Commit 2f7455bc authored by Yunhua Koglin's avatar Yunhua Koglin Committed by Rucha Deshpande
Browse files

update Journal and Journal Transaction implementation

commit a0794cf7 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Wed Jan 20 2021 16:59:17 GMT-0600 (Central Standard Time) 

    add entry for app table, fix id for dataset


commit 7013750b 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Wed Jan 20 2021 14:56:20 GMT-0600 (Central Standard Time) 

    fix Journal, JournalTransaction and JournalQuery in dynamodb


commit a916a370 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Wed Jan 20 2021 14:54:11 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service into kogliny


commit bd12e513 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Sun Jan 17 2021 23:18:41 GMT-0600 (Central Standard Time) 

    Revert "update packages with aws-sdk"

This reverts commit fa7ba482aba74830daee7935a46daaf2cbdc3d33.


commit 64740dd9 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Sun Jan 17 2021 22:37:08 GMT-0600 (Central Standard Time) 

    add storage and dynamodb


commit 739900b4 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Fri Jan 15 2021 16:16:21 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service into kogliny


commit 5f969e3a 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Fri Jan 15 2021 15:47:49 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service into kogliny


commit 0ac28b3b 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Wed Jan 13 2021 11:35:28 GMT-0600 (Central Standard Time) 

    Merge branch 'dev' of codecommit://os-seismic-store-service in


commit fa7ba482 
Author: Yunhua Koglin <kogliny@amazon.com> 
Date: Tue Jan 12 2021 09:26:08 GMT-0600 (Central Standard Time) 

    update packages with aws-sdk
parent 8ba7a8a5
......@@ -20,16 +20,20 @@ import DynamoDB, { ScanInput } from 'aws-sdk/clients/dynamodb';
import aws from 'aws-sdk';
import { PromiseResult } from 'aws-sdk/lib/request';
import { Utils } from '../../../shared/utils';
const converter = aws.DynamoDB.Converter;
@JournalFactory.register('aws')
export class AWSDynamoDbDAO extends AbstractJournal {
public KEY = Symbol('id');
private dataPartition: string; //tenant name
private tenant: TenantModel;
public constructor(tenant: TenantModel) {
super();
this.tenant = tenant;
this.dataPartition = tenant.esd.indexOf('.') !== -1 ? tenant.esd.split('.')[0] : tenant.esd;
AWS.config.update({ region: AWSConfig.AWS_REGION });
}
public async save(datasetEntity: any): Promise<void> {
......@@ -38,10 +42,27 @@ export class AWSDynamoDbDAO extends AbstractJournal {
}
for (const entity of datasetEntity) {
const item = entity.data;
//id attribute is used by aws as partitionKey.
//For tenant, id = name, for subproject, id=tenant:name; for dataset, id=tenant:subproject:name:path; for app, id=tenant:email
var strs = entity.key.partitionKey.split(':');
if(entity.key.table_kind == AWSConfig.DATASETS_KIND && strs.length == 2){
//fill in data name and path to the key
entity.key.partitionKey = entity.key.partitionKey+':'+item.name+':'+item.path;
}
if(entity.key.table_kind == AWSConfig.APPS_KIND){
item['tenant'] = strs[0]; //add tenant entry for App table
}
item['id'] = entity.key.partitionKey;
if (entity.ctag) {
item['ctag'] = entity.ctag;
}
//save extra info as this property will be consumed by the service to identify a data record
item[this.KEY.toString()] = entity.key;
const itemMarshall = converter.marshall(item);
console.log('from table ' + entity.key.kind + ' save ' + JSON.stringify(itemMarshall));
console.log('from table ' + entity.key.table_name + ' save ' + JSON.stringify(itemMarshall));
const para = {
TableName: entity.key.kind,
TableName: entity.key.table_name,
Item: itemMarshall
};
const db = new DynamoDB({});
......@@ -50,11 +71,12 @@ export class AWSDynamoDbDAO extends AbstractJournal {
}
public async get(key: any): Promise<[any | any[]]> {
const item = { 'name': key.partitionKey };
const item = { 'id': key.partitionKey };
const itemMarshall = converter.marshall(item);
console.log('from table ' + key.kind + ' get ' + JSON.stringify(itemMarshall));
console.log('from table ' + key.table_name + ' get ' + JSON.stringify(itemMarshall));
const params = {
TableName: key.kind,
TableName: key.table_name,
Key: itemMarshall
};
const db = new DynamoDB({});
......@@ -62,16 +84,19 @@ export class AWSDynamoDbDAO extends AbstractJournal {
const ret = converter.unmarshall(data.Item);
if (Object.keys(ret).length === 0)
return [undefined];
else
else {
//remove aws specific attribute id
delete ret['id'];
return [ret];
}
}
public async delete(key: any): Promise<void> {
const item = { 'name': key.partitionKey };
const item = { 'id': key.partitionKey };
const itemMarshall = converter.marshall(item);
console.log('from table ' + key.kind + ' delete ' + JSON.stringify(itemMarshall));
console.log('from table ' + key.table_name + ' delete ' + JSON.stringify(itemMarshall));
const params = {
TableName: key.kind,
TableName: key.table_name,
Key: itemMarshall
};
const db = new DynamoDB({});
......@@ -84,20 +109,8 @@ export class AWSDynamoDbDAO extends AbstractJournal {
public async runQuery(query: IJournalQueryModel): Promise<[any[], { endCursor?: string }]> {
const dbQuery = (query as AWSDynamoDbQuery);
const statement = dbQuery.getQueryStatement(AWSConfig.DATASETS_KIND);
// const statement = {
// TableName : 'osdu-kogliny-SeismicStore.datasets',
// FilterExpression: "#path=:path",
// KeyConditionExpression: '#name=:name',
// ExpressionAttributeNames: {
// "#name": "name",
// "#path": "path"
// },
// ExpressionAttributeValues: {
// ":name": 'yk-2dataset',
// ":path": '/a/b/c/'
// }
// };
const statement = dbQuery.getQueryStatement(dbQuery.kind);
console.log('query ' + JSON.stringify(statement));
const db = new DynamoDB.DocumentClient();
var scanResults = [];
......@@ -105,30 +118,37 @@ export class AWSDynamoDbDAO extends AbstractJournal {
do {
items = await db.scan(statement).promise();
const results = items.Items.map(result => {
if (!result.data) {
return result;
} else {
if (result.data[this.KEY.toString()]) {
result.data[this.KEY] = result.data[this.KEY.toString()];
delete result.data[this.KEY.toString()];
return result.data;
} else {
return result.data;
}
var ret = {};
ret = result;
//update object property for service (dao.ts) to consume
if (ret[this.KEY.toString()]) {
ret[this.KEY] = result[this.KEY.toString()];
}
return ret;
});
scanResults = scanResults.concat(results);
statement.ExclusiveStartKey = items.LastEvaluatedKey;
} while (typeof items.LastEvaluatedKey !== "undefined");
return Promise.resolve( [scanResults, {endCursor: items.LastEvaluatedKey}]);
return Promise.resolve([scanResults, { endCursor: items.LastEvaluatedKey }]);
}
public createKey(specs: any): object {
const kind0 = specs.path[0];
const partitionKey = specs.path[1];
const kind = AWSConfig.SERVICE_ENV + '-' + 'SeismicStore.' + kind0;
//kind is the table name
return { partitionKey, kind };
const table_kind = specs.path[0];
const name = specs.path[1]; //our key
var partitionKey = name; // partitionKey
var strs = specs.namespace.split('-');
if (table_kind === AWSConfig.SUBPROJECTS_KIND) {
partitionKey = strs[strs.length - 1] + ':' + partitionKey; //tenant:subprojet for id
}
if (table_kind === AWSConfig.DATASETS_KIND) {
partitionKey = strs[strs.length - 2] + ':' + strs[strs.length - 1]; //tenant:subprojet for id
}
if (table_kind === AWSConfig.APPS_KIND) {
partitionKey = strs[strs.length - 1] + ':' + name; //tenant:subprojet for id
}
const table_name = AWSConfig.SERVICE_ENV + '-' + 'SeismicStore.' + specs.path[0];
return { table_name, name, table_kind, partitionKey };
}
public getTransaction(): IJournalTransaction {
......@@ -267,12 +287,15 @@ export class AWSDynamoDbQuery implements IJournalQueryModel {
if (operator === 'HAS_ANCESTOR') {
throw new Error('HAS_ANCESTOR operator is not supported in query filters.');
}
if (!!(this.queryStatement.FilterExpression))
if (!!(this.queryStatement.FilterExpression)) {
this.queryStatement.FilterExpression += ' AND ';
}
this.queryStatement.FilterExpression += '#' + property + operator + ':' + property;
this.queryStatement.ExpressionAttributeNames['#' + property] = property;
this.queryStatement.ExpressionAttributeValues[':' + property] = value;
return this;
}
......@@ -280,7 +303,6 @@ export class AWSDynamoDbQuery implements IJournalQueryModel {
if (start instanceof Buffer) {
throw new Error('Type \'Buffer\' is not supported for DynamoDB Continuation while paging.');
}
// this.queryStatement.ExclusiveStartKey = start as DynamoDB.String;
console.log('NOT SUPPOR aws start createQuery ' + start);
return this;
}
......@@ -306,13 +328,47 @@ export class AWSDynamoDbQuery implements IJournalQueryModel {
return this;
}
public getQueryStatement(tableName: string): ScanInput {
//since we have one table for all datasets, we need to add more filters to return dataset specific for that tenant/subproject
var strs = this.namespace.split('-');
if (this.kind === AWSConfig.DATASETS_KIND || this.kind === AWSConfig.SUBPROJECTS_KIND) { //one table, filter on tenant
if (!!(this.queryStatement.FilterExpression)) {
this.queryStatement.FilterExpression += ' AND ';
}
const t_property: string = 'tenant';
var value = {}; value = this.kind === AWSConfig.DATASETS_KIND ? strs[strs.length - 2] : strs[strs.length - 1];
this.queryStatement.FilterExpression += '#' + t_property + '=' + ':' + t_property;
this.queryStatement.ExpressionAttributeNames['#' + t_property] = t_property;
this.queryStatement.ExpressionAttributeValues[':' + t_property] = value;
}
if (this.kind === AWSConfig.DATASETS_KIND) { // one table, filter on subproject too
this.queryStatement.FilterExpression += ' AND ';
const t_property: string = 'subproject';
var value = {}; value = strs[strs.length - 1];
this.queryStatement.FilterExpression += '#' + t_property + '=' + ':' + t_property;
this.queryStatement.ExpressionAttributeNames['#' + t_property] = t_property;
this.queryStatement.ExpressionAttributeValues[':' + t_property] = value;
}
if (this.kind === AWSConfig.APPS_KIND) { // one table, filter on tenant
if (!!(this.queryStatement.FilterExpression)) {
this.queryStatement.FilterExpression += ' AND ';
}
const t_property: string = 'tenant';
var value = {}; value = strs[strs.length - 1];
this.queryStatement.FilterExpression += '#' + t_property + '=' + ':' + t_property;
this.queryStatement.ExpressionAttributeNames['#' + t_property] = t_property;
this.queryStatement.ExpressionAttributeValues[':' + t_property] = value;
}
if (this.queryStatement.FilterExpression.length === 0)
delete this.queryStatement.FilterExpression;
//delete empty objects in query parameters
if (Object.entries(this.queryStatement.ExpressionAttributeNames).length === 0) {
delete this.queryStatement.ExpressionAttributeNames;
delete this.queryStatement.ExpressionAttributeValues;
}
if (this.queryStatement.FilterExpression.length === 0)
delete this.queryStatement.FilterExpression;
this.queryStatement.TableName = AWSConfig.SERVICE_ENV + '-SeismicStore.' + tableName;
return this.queryStatement;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment