Commit 8ba7a8a5 authored by Yunhua Koglin's avatar Yunhua Koglin Committed by Rucha Deshpande
Browse files

Implement the Journal, part of the Journal Transaction, and the Storage interfaces

commit bd12e513 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Sun Jan 17 2021 23:18:41 GMT-0600 (Central Standard Time) 

    Revert "update packages with aws-sdk"

This reverts commit fa7ba482aba74830daee7935a46daaf2cbdc3d33.


commit 64740dd9 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Sun Jan 17 2021 22:37:08 GMT-0600 (Central Standard Time) 

    add storage and dynamodb


commit 739900b4 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Fri Jan 15 2021 16:16:21 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service into kogliny


commit 5f969e3a 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Fri Jan 15 2021 15:47:49 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service into kogliny


commit 0ac28b3b 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Wed Jan 13 2021 11:35:28 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service into kogliny


commit fa7ba482 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Tue Jan 12 2021 09:26:08 GMT-0600 (Central Standard Time) 

    update packages with aws-sdk
parent 17bfc576
......@@ -13,12 +13,13 @@
// limitations under the License.
import { TenantModel } from '../../../services/tenant';
import { AbstractJournal, IJournalQueryModel, IJournalTransaction, JournalFactory } from '../../journal';
import { AbstractJournal, AbstractJournalTransaction, IJournalQueryModel, IJournalTransaction, JournalFactory } from '../../journal';
import { AWSConfig } from './config';
import AWS from 'aws-sdk/global';
import DynamoDB from 'aws-sdk/clients/dynamodb';
import DynamoDB, { ScanInput } from 'aws-sdk/clients/dynamodb';
import aws from 'aws-sdk';
import { PromiseResult } from 'aws-sdk/lib/request';
const converter = aws.DynamoDB.Converter;
@JournalFactory.register('aws')
......@@ -78,14 +79,50 @@ export class AWSDynamoDbDAO extends AbstractJournal {
}
public createQuery(namespace: string, kind: string): IJournalQueryModel {
    // Build a DynamoDB-backed query model for the given namespace/kind.
    // FIX: removed the stale "//not implemented / return undefined;" stub
    // that preceded this line and made the real implementation unreachable.
    return new AWSDynamoDbQuery(namespace, kind);
}
/**
 * Execute a query built by createQuery() as a DynamoDB Scan against the
 * datasets table, paging through all results.
 *
 * FIX: removed the stale "return undefined;" stub that made the entire
 * implementation below unreachable, and deleted the commented-out sample
 * statement.
 *
 * @param query model produced by createQuery(); must be an AWSDynamoDbQuery
 * @returns tuple of [all matched entities, { endCursor }] — endCursor is
 *          always undefined here because the loop only exits once the scan
 *          is exhausted (no LastEvaluatedKey left)
 */
public async runQuery(query: IJournalQueryModel): Promise<[any[], { endCursor?: string }]> {
    const dbQuery = (query as AWSDynamoDbQuery);
    const statement = dbQuery.getQueryStatement(AWSConfig.DATASETS_KIND);
    console.log('query ' + JSON.stringify(statement));
    const db = new DynamoDB.DocumentClient();
    let scanResults: any[] = [];
    let items: PromiseResult<DynamoDB.DocumentClient.ScanOutput, AWS.AWSError>;
    // A single Scan call returns at most 1 MB of data, so keep paging with
    // ExclusiveStartKey until DynamoDB stops returning a LastEvaluatedKey.
    do {
        items = await db.scan(statement).promise();
        const results = items.Items.map((row) => {
            if (!row.data) {
                // row stored without a wrapping 'data' attribute
                return row;
            }
            // Rows wrap the entity in a 'data' attribute; unwrap it and
            // restore the journal KEY property, which is persisted under its
            // stringified name (presumably because DynamoDB cannot store a
            // symbol-keyed attribute — TODO confirm against save()).
            if (row.data[this.KEY.toString()]) {
                row.data[this.KEY] = row.data[this.KEY.toString()];
                delete row.data[this.KEY.toString()];
            }
            return row.data;
        });
        scanResults = scanResults.concat(results);
        statement.ExclusiveStartKey = items.LastEvaluatedKey;
    } while (items.LastEvaluatedKey !== undefined);
    return Promise.resolve([scanResults, { endCursor: items.LastEvaluatedKey }]);
}
public createKey(specs: any): object {
const kind0 = specs.path[0];
const partitionKey = specs.path[1];
......@@ -95,13 +132,189 @@ export class AWSDynamoDbDAO extends AbstractJournal {
}
public getTransaction(): IJournalTransaction {
    // Open a new in-memory transaction that queues writes against this DAO.
    // FIX: removed the stale "return undefined;" stub that made the real
    // implementation unreachable.
    return new AWSDynamoDbTransactionDAO(this);
}
public getQueryFilterSymbolContains(): string {
    // Operator token consumed by AWSDynamoDbQuery.filter() to express a
    // substring ("contains") match in a DynamoDB filter expression.
    return 'contains';
}
}
declare type OperationType = 'save' | 'delete';

/**
 * A single queued journal operation ('save' or 'delete') together with the
 * entity or key it applies to. Operations are buffered by the transaction
 * object and replayed against the owning DAO on commit().
 */
export class AWSDynamoDbTransactionOperation {

    public type: OperationType;
    public entityOrKey: any;

    public constructor(type: OperationType, entityOrKey: any) {
        this.type = type;
        this.entityOrKey = entityOrKey;
    }
}
/**
 * DynamoDB journal transaction. save/delete operations are queued in memory
 * and only applied to the owning AWSDynamoDbDAO on commit(); reads (get,
 * createQuery, runQuery) delegate straight to the owner and therefore do NOT
 * observe queued-but-uncommitted writes.
 */
export class AWSDynamoDbTransactionDAO extends AbstractJournalTransaction {

    public KEY = null;
    public queuedOperations: AWSDynamoDbTransactionOperation[] = [];
    private owner: AWSDynamoDbDAO;

    public constructor(owner: AWSDynamoDbDAO) {
        super();
        this.owner = owner;
        this.KEY = this.owner.KEY;
    }

    // Queue a save; it is applied on commit().
    public async save(entity: any): Promise<void> {
        console.log('aws Transaction Save ' + JSON.stringify(entity));
        this.queuedOperations.push(new AWSDynamoDbTransactionOperation('save', entity));
        await Promise.resolve();
    }

    // Read-through to the owner DAO (ignores queued, uncommitted writes).
    public async get(key: any): Promise<[any | any[]]> {
        console.log('aws Transaction get ' + JSON.stringify(key));
        return await this.owner.get(key);
    }

    // Queue a delete; it is applied on commit().
    public async delete(key: any): Promise<void> {
        console.log('aws Transaction delete ' + JSON.stringify(key));
        this.queuedOperations.push(new AWSDynamoDbTransactionOperation('delete', key));
        await Promise.resolve();
    }

    public createQuery(namespace: string, kind: string): IJournalQueryModel {
        console.log('aws Transaction createQuery ' + namespace + kind);
        return this.owner.createQuery(namespace, kind);
    }

    public async runQuery(query: IJournalQueryModel): Promise<[any[], { endCursor?: string }]> {
        console.log('aws Transaction runQuery ' + JSON.stringify(query));
        return await this.owner.runQuery(query);
    }

    // Start the transaction; rejects if operations are already queued.
    // NOTE(review): rejects with a plain string rather than an Error — kept
    // as-is for backward compatibility with existing callers.
    public async run(): Promise<void> {
        console.log('aws Transaction run ');
        if (this.queuedOperations.length) {
            await Promise.reject('Transaction is already in use.');
        }
        else {
            this.queuedOperations = [];
            return Promise.resolve();
        }
    }

    // Discard every queued operation without applying it.
    public async rollback(): Promise<void> {
        console.log('aws Transaction rollback ');
        this.queuedOperations = [];
        return Promise.resolve();
    }

    // Apply queued operations in FIFO order, then clear the queue.
    // NOTE(review): this is not atomic — a failure mid-way leaves earlier
    // operations already applied.
    public async commit(): Promise<void> {
        console.log('aws Transaction commit ');
        for (const operation of this.queuedOperations) {
            if (operation.type === 'save') {
                await this.owner.save(operation.entityOrKey);
            }
            if (operation.type === 'delete') {
                await this.owner.delete(operation.entityOrKey);
            }
        }
        this.queuedOperations = [];
        return Promise.resolve();
    }

    public getQueryFilterSymbolContains(): string {
        // FIX: a stale "return undefined;" stub preceded this return, so the
        // method always returned undefined and the real value was dead code.
        // It now matches AWSDynamoDbDAO.getQueryFilterSymbolContains().
        return 'contains';
    }
}
declare type Operator = '=' | '<' | '>' | '<=' | '>=' | 'HAS_ANCESTOR' | 'CONTAINS';

/**
 * Query model that accumulates filter/limit/select calls into a DynamoDB
 * ScanInput; AWSDynamoDbDAO.runQuery() executes the resulting statement.
 */
export class AWSDynamoDbQuery implements IJournalQueryModel {

    public namespace: string;
    public kind: string;
    public queryStatement: ScanInput;

    public constructor(namespace: string, kind: string) {
        this.namespace = namespace;
        this.kind = kind;
        this.queryStatement = { TableName: kind, FilterExpression: '', ExpressionAttributeNames: {}, ExpressionAttributeValues: {} };
    }

    filter(property: string, value: {}): IJournalQueryModel;
    filter(property: string, operator: Operator, value: {}): IJournalQueryModel;
    filter(property: string, operator?: Operator, value?: {}): IJournalQueryModel {
        if (operator === 'CONTAINS') {
            // FIX: join with ' AND ' like the comparison branch below;
            // previously a second filter produced an invalid expression
            // such as '#a=:a contains(#b,:b)'.
            if (!!(this.queryStatement.FilterExpression)) {
                this.queryStatement.FilterExpression += ' AND ';
            }
            this.queryStatement.FilterExpression += 'contains(#' + property + ',:' + property + ')';
            this.queryStatement.ExpressionAttributeNames['#' + property] = property;
            this.queryStatement.ExpressionAttributeValues[':' + property] = value;
            return this;
        }
        // two-argument form filter(property, value): default to equality
        if (value === undefined) {
            value = operator;
            operator = '=';
        }
        if (operator === undefined) {
            operator = '=';
        }
        if (value === undefined) {
            value = '';
        }
        if (operator === 'HAS_ANCESTOR') {
            throw new Error('HAS_ANCESTOR operator is not supported in query filters.');
        }
        if (!!(this.queryStatement.FilterExpression)) {
            this.queryStatement.FilterExpression += ' AND ';
        }
        this.queryStatement.FilterExpression += '#' + property + operator + ':' + property;
        this.queryStatement.ExpressionAttributeNames['#' + property] = property;
        this.queryStatement.ExpressionAttributeValues[':' + property] = value;
        return this;
    }

    // Continuation paging is not supported for DynamoDB scans yet; the start
    // token is logged and ignored (FIX: repaired the truncated log text).
    start(start: string | Buffer): IJournalQueryModel {
        if (start instanceof Buffer) {
            throw new Error('Type \'Buffer\' is not supported for DynamoDB Continuation while paging.');
        }
        console.log('NOT SUPPORTED aws start createQuery ' + start);
        return this;
    }

    // Cap the number of items evaluated by the scan.
    limit(n: number): IJournalQueryModel {
        this.queryStatement.Limit = n;
        return this;
    }

    // Grouping is not supported by DynamoDB scans; logged and ignored.
    groupBy(fieldNames: string | string[]): IJournalQueryModel {
        console.log('NOT SUPPORTED aws groupBy createQuery ' + fieldNames);
        return this;
    }

    // Restrict the returned attributes (DynamoDB ProjectionExpression).
    select(fieldNames: string | string[]): IJournalQueryModel {
        // FIX: ProjectionExpression is never set by the constructor, so the
        // previous '.length' check threw a TypeError on the first select();
        // initialize it lazily and only prepend ',' on subsequent calls.
        if (this.queryStatement.ProjectionExpression) {
            this.queryStatement.ProjectionExpression += ',';
        } else {
            this.queryStatement.ProjectionExpression = '';
        }
        if (typeof fieldNames === 'string') {
            this.queryStatement.ProjectionExpression += fieldNames;
        } else {
            this.queryStatement.ProjectionExpression += fieldNames.join(',');
        }
        return this;
    }

    // Finalize the statement for execution: drop clauses left empty (DynamoDB
    // rejects empty expressions/maps) and point TableName at the
    // environment-qualified table.
    public getQueryStatement(tableName: string): ScanInput {
        if (Object.entries(this.queryStatement.ExpressionAttributeNames).length === 0) {
            delete this.queryStatement.ExpressionAttributeNames;
            delete this.queryStatement.ExpressionAttributeValues;
        }
        if (this.queryStatement.FilterExpression.length === 0) {
            delete this.queryStatement.FilterExpression;
        }
        this.queryStatement.TableName = AWSConfig.SERVICE_ENV + '-SeismicStore.' + tableName;
        return this.queryStatement;
    }
}
\ No newline at end of file
......@@ -15,6 +15,6 @@
export { AWSConfig } from './config';
export { AWSStorage } from './storage';
export { AWSCredentials } from './credentials';
export { AWSDynamoDbDAO} from './dynamodb';
export { AWSDynamoDbDAO, AWSDynamoDbTransactionDAO, AWSDynamoDbQuery } from './dynamodb';
export { AwsTrace } from './trace';
export { AWSDataEcosystemServices } from './dataecosystem';
\ No newline at end of file
......@@ -12,49 +12,179 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import {AbstractStorage, StorageFactory} from '../../storage';
import { AWSConfig } from './config';
import { AbstractStorage, StorageFactory } from '../../storage';
import { TenantModel } from '../../../services/tenant';
import AWS from 'aws-sdk/global';
import S3 from "aws-sdk/clients/s3";
@StorageFactory.register('aws')
export class AWSStorage extends AbstractStorage {
async bucketExists(bucketName: string): Promise<boolean> {
return undefined;
private projectID: string;
private BUCKET_PREFIX = 'ss-' + AWSConfig.SERVICE_ENV;
private s3: S3;
public constructor(tenant: TenantModel) {
super();
this.projectID = tenant.gcpid;
AWS.config.update({ region: AWSConfig.AWS_REGION });
// Create S3 service object
this.s3 = new S3({ apiVersion: '2006-03-01' });
}
// tslint:disable-next-line:max-line-length
async copy(bucketIn: string, prefixIn: string, bucketOut: string, prefixOut: string, ownerEmail: string): Promise<void> {
return undefined;
// generate a random bucket name
public randomBucketName(): string {
let suffix = Math.random().toString(36).substring(2, 16);
suffix = suffix + Math.random().toString(36).substring(2, 16);
suffix = suffix.substr(0, 16);
return this.BUCKET_PREFIX + '-' + suffix;
}
// tslint:disable-next-line:max-line-length
async createBucket(bucketName: string, location: string, storageClass: string, adminACL: string, editorACL: string, viewerACL: string): Promise<void> {
return undefined;
// Create a new bucket
public async createBucket(
bucketName: string,
location: string, storageClass: string,
adminACL: string, editorACL: string, viewerACL: string): Promise<void> {
const create_bucket_params = {
Bucket: bucketName,
};
try {
await this.s3.createBucket(create_bucket_params).promise();
} catch (err) {
console.log(err.code + ": " + err.message);
}
// var params = {
// Bucket: bucketName,
// GrantFullControl: adminACL,
// GrantWrite: editorACL,
// GrantRead: viewerACL
// };
// await this.s3.putBucketAcl(params).promise();
}
async deleteBucket(bucketName: string): Promise<void>;
deleteBucket(bucketName: string): void;
deleteBucket(bucketName: string): Promise<void> | void {
return undefined;
// Delete a bucket
public async deleteBucket(bucketName: string, force = false): Promise<void> {
if (force) {
await this.deleteFiles(bucketName);
}
const delete_bucket_params = { Bucket: bucketName };
try {
await this.s3.deleteBucket(delete_bucket_params).promise();
} catch (err) {
console.log(err.code + ": " + err.message);
}
}
// Delete all files in a bucket
public async deleteFiles(bucketName: string): Promise<void> {
console.log("start to delete all files in " + bucketName);
const params = { Bucket: bucketName };
const listedObjects = await this.s3.listObjectsV2(params).promise();
if (listedObjects.Contents.length === 0)
return;
async deleteFiles(bucketName: string): Promise<void> {
return undefined;
const deleteParams = {
Bucket: bucketName,
Delete: { Objects: [] }
};
listedObjects.Contents.forEach(({ Key }) => {
deleteParams.Delete.Objects.push({ Key });
});
await this.s3.deleteObjects(deleteParams).promise();
if (listedObjects.IsTruncated) //continue delete files as there are more...
await this.deleteFiles(bucketName);
}
async deleteObject(bucketName: string, objectName: string): Promise<void> {
return undefined;
// save an object/file to a bucket, object name contains path
public async saveObject(bucketName: string, objectName: string, data: string): Promise<void> {
const params = { Bucket: bucketName, Key: objectName, Body: data };
try {
this.s3.putObject(params).promise();
} catch (err) {
console.log(err.code + ": " + err.message);
}
}
async deleteObjects(bucketName: string, prefix: string): Promise<void> {
return undefined;
// delete an object from a bucket
public async deleteObject(bucketName: string, objectName: string): Promise<void> {
const params = { Bucket: bucketName, Key: objectName };
try {
this.s3.deleteObject(params).promise();
} catch (err) {
console.log(err.code + ": " + err.message);
}
}
// delete multiple objects, prefix should end with /
public async deleteObjects(bucketName: string, prefix: string, async: boolean = false): Promise<void> {
console.log("start deleteObjects in " + prefix + " in bucket " + bucketName);
const params = { Bucket: bucketName, Prefix: prefix };
const listedObjects = await this.s3.listObjectsV2(params).promise();
randomBucketName(): string {
return '';
if (listedObjects.Contents.length === 0) return;
const deleteParams = {
Bucket: bucketName,
Delete: { Objects: [] }
};
listedObjects.Contents.forEach(({ Key }) => {
deleteParams.Delete.Objects.push({ Key });
});
await this.s3.deleteObjects(deleteParams).promise();
if (listedObjects.IsTruncated) //continue delete files as there are more...
await this.deleteObjects(bucketName, prefix);
}
// copy multiple objects (skip the dummy file)
public async copy(bucketIn: string, prefixIn: string, bucketOut: string, prefixOut: string, ownerEmail: string) {
if (prefixIn) {
prefixIn += '/';
while (prefixIn.indexOf('//') !== -1) {
prefixIn = prefixIn.replace('//', '/');
}
}
if (prefixOut) {
prefixOut += '/';
while (prefixOut.indexOf('//') !== -1) {
prefixOut = prefixOut.replace('//', '/');
}
}
const copyCalls = [];
const params = { Bucket: bucketIn, Prefix: prefixIn };
const files = await this.s3.listObjects(params).promise();
async saveObject(bucketName: string, objectName: string, data: string): Promise<void> {
return undefined;
for (const file of files[0]) {
const params = {
Bucket: bucketOut,
CopySource: bucketIn + '/' + file.Key,
Key: file.Key.replace(prefixIn, prefixOut)
};
copyCalls.push(this.s3.copyObject(params));
}
await Promise.all(copyCalls);
}
// check if a bucket exist
public async bucketExists(bucketName: string): Promise<boolean> {
const bucket_params = { Bucket: bucketName };
try {
await this.s3.headBucket(bucket_params).promise();
return true;
} catch (err) {
if (err.statusCode === 404) { //404 if the bucket does not exist
return false;
}
}
return true;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment