Commit 67e0d9e4 authored by Diego Molteni's avatar Diego Molteni
Browse files

reapplied transactional model

parent 5621f223
Pipeline #61891 failed with stages
in 13 minutes and 37 seconds
......@@ -2504,6 +2504,15 @@
"@types/redis": "*"
}
},
"@types/redlock": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/@types/redlock/-/redlock-4.0.2.tgz",
"integrity": "sha512-3MMTCWOXrfQFU8dLAbQWDOsftnFagQxmUkfR5KK/DB/zKPUh0ZzPFkNV84nfw1yMFYLfd4MgITGT+XolYd8d1w==",
"dev": true,
"requires": {
"@types/bluebird": "*"
}
},
"@types/request": {
"version": "2.48.4",
"resolved": "https://registry.npmjs.org/@types/request/-/request-2.48.4.tgz",
......@@ -9123,11 +9132,6 @@
"bluebird": "^3.3.3"
}
},
"redlock-async": {
"version": "3.1.2-fix.2",
"resolved": "https://registry.npmjs.org/redlock-async/-/redlock-async-3.1.2-fix.2.tgz",
"integrity": "sha512-4lnHE+UPoz3TFkhUqnVsdSMybG8vE7aKYxkfby0F4sSKu9N7J1tJ3LBAA1hEzTeJ97JI7nTw5pOBrPhwOtlN9w=="
},
"release-zalgo": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/release-zalgo/-/release-zalgo-1.0.0.tgz",
......
......@@ -23,7 +23,7 @@ import { PaginatedDatasetList } from './model';
export class DatasetDAO {
public static async register(
journalClient: IJournal | IJournalTransaction, datasetEntity: {key: object, data: DatasetModel}) {
journalClient: IJournal | IJournalTransaction, datasetEntity: { key: object, data: DatasetModel }) {
datasetEntity.data.ctag = Utils.makeID(16);
await journalClient.save(datasetEntity);
}
......@@ -32,12 +32,25 @@ export class DatasetDAO {
const datasetEntityKey = journalClient.createKey({
namespace: Config.SEISMIC_STORE_NS + '-' + dataset.tenant + '-' + dataset.subproject,
path: [Config.DATASETS_KIND],
enforcedKey: dataset.path.slice(0,-1) + '/' + dataset.name
enforcedKey: dataset.path.slice(0, -1) + '/' + dataset.name
});
const [entity] = await journalClient.get(datasetEntityKey);
return entity ? await this.fixOldModel(entity, dataset.tenant, dataset.subproject) : entity;
}
public static async getByKeyTransactional(
journalClient: IJournal,
journalClientTransactional: IJournalTransaction,
dataset: DatasetModel): Promise<DatasetModel> {
const datasetEntityKey = journalClient.createKey({
namespace: Config.SEISMIC_STORE_NS + '-' + dataset.tenant + '-' + dataset.subproject,
path: [Config.DATASETS_KIND],
enforcedKey: dataset.path.slice(0, -1) + '/' + dataset.name
});
const [entity] = await journalClientTransactional.get(datasetEntityKey);
return entity ? await this.fixOldModel(entity, dataset.tenant, dataset.subproject) : entity;
}
public static async get(
journalClient: IJournal | IJournalTransaction,
dataset: DatasetModel): Promise<[DatasetModel, any]> {
......@@ -62,7 +75,7 @@ export class DatasetDAO {
}
public static async updateAll(
journalClient: IJournal | IJournalTransaction, datasets: {data: DatasetModel, key: any}[]) {
journalClient: IJournal | IJournalTransaction, datasets: { data: DatasetModel, key: any }[]) {
datasets.forEach(dataset => { dataset.data.ctag = Utils.makeID(16); });
await journalClient.save(datasets);
}
......@@ -178,30 +191,30 @@ export class DatasetDAO {
public static async listDatasets(
journalClient: IJournal | IJournalTransaction,
tenant: string, subproject: string, pagination?: PaginationModel):
Promise<{ datasets: {data: DatasetModel, key: any}[], nextPageCursor: string }> {
Promise<{ datasets: { data: DatasetModel, key: any }[], nextPageCursor: string }> {
const output: any = { datasets: [], nextPageCursor: undefined };
const output: any = { datasets: [], nextPageCursor: undefined };
// Retrieve the content datasets
let query = journalClient.createQuery(
Config.SEISMIC_STORE_NS + '-' + tenant + '-' + subproject, Config.DATASETS_KIND);
// Retrieve the content datasets
let query = journalClient.createQuery(
Config.SEISMIC_STORE_NS + '-' + tenant + '-' + subproject, Config.DATASETS_KIND);
if (pagination && pagination.cursor) query = query.start(pagination.cursor);
if (pagination && pagination.limit) query = query.limit(pagination.limit);
if (pagination && pagination.cursor) query = query.start(pagination.cursor);
if (pagination && pagination.limit) query = query.limit(pagination.limit);
const [datasetEntities, info] = (
await journalClient.runQuery(query)) as [DatasetModel[], {endCursor?: string}];
const [datasetEntities, info] = (
await journalClient.runQuery(query)) as [DatasetModel[], { endCursor?: string }];
if (datasetEntities.length !== 0) {
output.datasets = datasetEntities.map((entity) => {
return {data: entity, key: entity[journalClient.KEY]};
})
if (pagination) {
output.nextPageCursor = info.endCursor;
}
if (datasetEntities.length !== 0) {
output.datasets = datasetEntities.map((entity) => {
return { data: entity, key: entity[journalClient.KEY] };
})
if (pagination) {
output.nextPageCursor = info.endCursor;
}
}
return output;
return output;
}
public static async listContent(
......@@ -232,7 +245,7 @@ export class DatasetDAO {
(entity) => (entity.path as string).substr(dataset.path.length));
results.directories = results.directories.map(
(entity) => entity.substr(0, entity.indexOf('/') + 1)).filter(
(elem, index, self) => index === self.indexOf(elem) );
(elem, index, self) => index === self.indexOf(elem));
}
return results;
......
......@@ -107,9 +107,12 @@ export class DatasetHandler {
let writeLockSession: IWriteLockSession;
const journalClient = JournalFactoryTenantClient.get(tenant);
const journalClientTransaction = journalClient.getTransaction();
try {
await journalClientTransaction.run();
if (dataset.acls) {
const subprojectMetadata = await SubProjectDAO.get(journalClient, tenant.name, subproject.name);
const subprojectAccessPolicy = subprojectMetadata.access_policy;
......@@ -129,8 +132,8 @@ export class DatasetHandler {
// if the call is idempotent return the dataset value
if (writeLockSession.idempotent) {
const alreadyRegisteredDataset = subproject.enforce_key ?
await DatasetDAO.getByKey(journalClient, dataset) :
(await DatasetDAO.get(journalClient, dataset))[0];
await DatasetDAO.getByKeyTransactional(journalClient, journalClientTransaction, dataset) :
(await DatasetDAO.get(journalClientTransaction, dataset))[0];
if (alreadyRegisteredDataset) {
await Locker.removeWriteLock(writeLockSession, true); // Keep the lock session
return alreadyRegisteredDataset;
......@@ -163,8 +166,8 @@ export class DatasetHandler {
]);
const datasetAlreadyExist = subproject.enforce_key ?
await DatasetDAO.getByKey(journalClient, dataset) :
(await DatasetDAO.get(journalClient, dataset))[0];
await DatasetDAO.getByKeyTransactional(journalClient, journalClientTransaction, dataset) :
(await DatasetDAO.get(journalClientTransaction, dataset))[0];
// check if dataset already exist
if (datasetAlreadyExist) {
......@@ -221,7 +224,7 @@ export class DatasetHandler {
// save the dataset entity
await Promise.all([
DatasetDAO.register(journalClient, { key: datasetEntityKey, data: dataset }),
DatasetDAO.register(journalClientTransaction, { key: datasetEntityKey, data: dataset }),
(seismicmeta && (FeatureFlags.isEnabled(Feature.SEISMICMETA_STORAGE))) ?
DESStorage.insertRecord(req.headers.authorization,
[seismicmeta], tenant.esd, req[Config.DE_FORWARD_APPKEY]) : undefined,
......@@ -236,12 +239,14 @@ export class DatasetHandler {
dataset.sbit_count = 1;
// release the mutex and keep the lock session
await journalClientTransaction.commit();
await Locker.removeWriteLock(writeLockSession, true);
return dataset;
} catch (err) {
// release the mutex and unlock the resource
await journalClientTransaction.rollback();
await Locker.removeWriteLock(writeLockSession);
throw (err);
......@@ -602,8 +607,8 @@ export class DatasetHandler {
req.headers.authorization, datasetOUT.seismicmeta_guid,
tenant.esd, req[Config.DE_FORWARD_APPKEY]);
for (const keyx of Object.keys(seismicmeta)) {
seismicmetaDE[keyx] = seismicmeta[keyx];
for (const keySeismicMeta of Object.keys(seismicmeta)) {
seismicmetaDE[keySeismicMeta] = seismicmeta[keySeismicMeta];
}
datasetOUT.seismicmeta_guid = seismicmeta.id;
......
......@@ -15,7 +15,7 @@
// ============================================================================
import Redis from 'ioredis';
import Redlock from 'redlock-async';
import Redlock from 'redlock';
import { Config, LoggerFactory } from '../../cloud';
import { Error, Utils } from '../../shared';
......@@ -32,9 +32,9 @@ export class Locker {
private static EXP_READLOCK = 3600; // after 1h readlock entry will be removed
private static TIME_5MIN = 300; // exp time margin to use in the main read locks
private static redisClient;
private static redisSubscriptionClient;
private static redlock;
private static redisClient: Redis.Redis;
private static redisSubscriptionClient: Redis.Redis;
private static redlock: Redlock;
public static getWriteLockTTL(): number { return this.EXP_WRITELOCK; };
public static getReadLockTTL(): number { return this.EXP_READLOCK; };
......@@ -63,7 +63,7 @@ export class Locker {
/* Retry failed requests only once*/
maxRetriesPerRequest: 5,
/* Exponential backoff retry strategy */
/* Exponential backOff retry strategy */
retryStrategy: this.retryStrategy,
/* Max time for a single command after which a timeout occurs.
......@@ -110,7 +110,7 @@ export class Locker {
});
}
// This will automaticcally remove the wid entries from the main read lock
// This will automatically remove the wid entries from the main read lock
this.redisSubscriptionClient.on('message', async (channel, key) => {
if (channel === '__keyevent@0__:expired') {
await Locker.unlockReadLockSession(
......@@ -120,7 +120,7 @@ export class Locker {
}
});
this.redisClient.on('error', (error) => {
this.redisClient.on('error', (error: any) => {
LoggerFactory.build(Config.CLOUDPROVIDER).error(error);
});
......@@ -160,10 +160,10 @@ export class Locker {
return entity ? entity.startsWith('rms') ? entity.substr(4).split(':') : entity : undefined;
}
private static async setLock(key: string, value: string[] | string, exptime: number): Promise<string> {
private static async setLock(key: string, value: string[] | string, expTime: number): Promise<string> {
return value ? typeof (value) === 'string' ?
await this.set(key, value as string, exptime) :
await this.set(key, 'rms:' + (value as string[]).join(':'), exptime) : undefined;
await this.set(key, value as string, expTime) :
await this.set(key, 'rms:' + (value as string[]).join(':'), expTime) : undefined;
}
private static async get(key: string): Promise<string> {
......@@ -172,9 +172,9 @@ export class Locker {
});
}
private static async set(key: string, value: string, exptime: number): Promise<string> {
private static async set(key: string, value: string, expTime: number): Promise<string> {
return new Promise((resolve, reject) => {
this.redisClient.setex(key, exptime, value, (err, res) => { err ? reject(err) : resolve(res); });
this.redisClient.setex(key, expTime, value, (err, res) => { err ? reject(err) : resolve(res); });
});
}
......@@ -191,11 +191,11 @@ export class Locker {
}
// create a write lock for new resources. This is a locking operation!
// it place the mutex on the required resource!!! (the caller shold remove the mutex)
// it place the mutex on the required resource!!! (the caller should remove the mutex)
public static async createWriteLock(lockKey: string, idempotentWriteLock?: string): Promise<IWriteLockSession> {
// const datasetPath = dataset.tenant + '/' + dataset.subproject + dataset.path + dataset.name;
const cachelock = await this.acquireMutex(lockKey);
const cacheLock = await this.acquireMutex(lockKey);
const lockValue = (await Locker.getLock(lockKey));
// idempotency requirement
......@@ -204,17 +204,17 @@ export class Locker {
'The provided idempotency key, for a write-lock operation, must start with the \'W\' letter'));
}
// if the lockValue is not present in the rediscache,
// if the lockValue is not present in the redis cache,
// create the [KEY,VALUE] = [datasetPath, wid(sbit)] pair in the redis cache
if (!lockValue) {
const lockValueNew = idempotentWriteLock || this.generateWriteLockID();
await this.set(lockKey, lockValueNew, this.EXP_WRITELOCK);
return { idempotent: false, wid: lockValueNew, mutex: cachelock, key: lockKey };
return { idempotent: false, wid: lockValueNew, mutex: cacheLock, key: lockKey };
}
// check if writelock already exist and match the input one (idempotent call)
if (idempotentWriteLock && lockValue === idempotentWriteLock) {
return { idempotent: true, wid: idempotentWriteLock, mutex: cachelock, key: lockKey };
return { idempotent: true, wid: idempotentWriteLock, mutex: cacheLock, key: lockKey };
}
throw (Error.make(Error.Status.LOCKED,
......@@ -244,17 +244,17 @@ export class Locker {
'The provided idempotency key, for a write-lock operation, must start with the \'W\' letter'));
}
const cachelock = await this.acquireMutex(lockKey);
const cacheLock = await this.acquireMutex(lockKey);
const lockValue = (await Locker.getLock(lockKey));
// Already write locked but the idempotentWriteLock match the once in cache (idempotent call)
if (lockValue && idempotentWriteLock && lockValue === idempotentWriteLock) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: idempotentWriteLock, cnt: 0 };
}
if (lockValue && wid && wid !== lockValue && this.isWriteLock(lockValue)) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for write with different id ' + Error.get423WriteLockReason()));
}
......@@ -268,7 +268,7 @@ export class Locker {
// create a new write lock and save in cache
const lockID = idempotentWriteLock || this.generateWriteLockID();
await Locker.set(lockKey, lockID, this.EXP_WRITELOCK);
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: lockID, cnt: 1 };
}
......@@ -278,7 +278,7 @@ export class Locker {
// wid not specified - impossible lock
if (!wid) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for ' + (this.isWriteLock(lockValue) ?
'write ' + Error.get423WriteLockReason() :
......@@ -287,19 +287,19 @@ export class Locker {
// write locked and different wid
if (this.isWriteLock(lockValue) && wid !== lockValue) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for write with different id ' + Error.get423WriteLockReason()));
}
if (!this.isWriteLock(lockValue) && lockValue.indexOf(wid) === -1) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for read with different ids ' + + Error.get423ReadLockReason()));
}
// Trusted Open
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: wid, cnt: this.isWriteLock(lockValue) ? 1 : (lockValue as string[]).length };
}
......@@ -314,12 +314,12 @@ export class Locker {
}
// const datasetPath = dataset.tenant + '/' + dataset.subproject + dataset.path + dataset.name;
const cachelock = await this.acquireMutex(lockKey);
const cacheLock = await this.acquireMutex(lockKey);
const lockValue = (await Locker.getLock(lockKey));
if (lockValue && idempotentReadLock && !this.isWriteLock(lockValue) &&
(lockValue as string[]).indexOf(idempotentReadLock) > -1) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: idempotentReadLock, cnt: (lockValue as string[]).length };
}
......@@ -327,25 +327,25 @@ export class Locker {
// wid not specified -> error locked for write
if (!wid) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for write ' + Error.get423WriteLockReason()));
}
// wid different -> error different wid
if (wid !== lockValue) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for write with different wid ' + Error.get423WriteLockReason()));
}
// wid match -> TRUSTED OPEN
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: lockValue, cnt: 1 };
}
if (lockValue && wid && lockValue.indexOf(wid) === -1) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is locked for read with different ids ' + Error.get423ReadLockReason()));
}
......@@ -361,9 +361,9 @@ export class Locker {
await Locker.setLock(lockKey + '/' + lockID, lockID, this.EXP_READLOCK);
await Locker.setLock(lockKey, [lockID], this.EXP_READLOCK + this.TIME_5MIN);
// when the session key expired i have to remove the wid/lockid from the main read lock
this.redisSubscriptionClient.subscribe('__keyevent@0__:expired', lockKey + '/' + lockID);
await this.redisSubscriptionClient.subscribe('__keyevent@0__:expired', lockKey + '/' + lockID);
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: lockID, cnt: 1 };
}
......@@ -377,21 +377,21 @@ export class Locker {
(lockValue as string[]).push(lockID);
await Locker.setLock(lockKey + '/' + lockID, lockID, this.EXP_READLOCK);
await Locker.setLock(lockKey, lockValue, this.EXP_READLOCK + this.TIME_5MIN);
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
// when the session key expired i have to remove the wid/lockid from the main read lock
this.redisSubscriptionClient.subscribe('__keyevent@0__:expired', lockKey + '/' + lockID);
await this.redisSubscriptionClient.subscribe('__keyevent@0__:expired', lockKey + '/' + lockID);
return { id: lockID, cnt: (lockValue as string[]).length };
}
// wid present and found in read lock ids -> TRUSTED OPEN
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: wid, cnt: (lockValue as string[]).length };
}
public static async unlock(lockKey: string, wid?: string): Promise<ILock> {
const cachelock = await this.acquireMutex(lockKey);
const cacheLock = await this.acquireMutex(lockKey);
const lockValue = (await Locker.getLock(lockKey));
......@@ -400,14 +400,14 @@ export class Locker {
if (this.isWriteLock(lockValue)) {
// wrong close id
if (lockValue !== wid) {
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.NOT_FOUND,
lockKey + ' has been locked with different ID'));
}
// unlock in cache
await Locker.del(lockKey);
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: null, cnt: 0 };
}
}
......@@ -430,13 +430,13 @@ export class Locker {
// remove main lock from cache
await Locker.del(lockKey);
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: null, cnt: 0 };
}
// dataset already unlocked
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: null, cnt: 0 };
}
......@@ -447,11 +447,11 @@ export class Locker {
if (lockValue) {
// read locked
const lockindex = lockValue.indexOf(wid);
const lockIndex = lockValue.indexOf(wid);
// wrong close id
if (lockindex === -1) {
await this.releaseMutex(cachelock, lockKey);
if (lockIndex === -1) {
await this.releaseMutex(cacheLock, lockKey);
throw (Error.make(Error.Status.NOT_FOUND,
lockKey + ' has been locked with different IDs'));
}
......@@ -465,7 +465,7 @@ export class Locker {
} else {
await Locker.del(lockKey);
}
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return {
cnt: lockValueNew.length > 0 ? lockValueNew.length : 0,
id: lockValueNew.length > 0 ? lockValueNew.join(',') : null,
......@@ -477,13 +477,13 @@ export class Locker {
// ------------------------------------------------
// case 2: dataset already unlocked
await this.releaseMutex(cachelock, lockKey);
await this.releaseMutex(cacheLock, lockKey);
return { id: null, cnt: 0 };
}
public static async unlockReadLockSession(key: string, wid: string) {
const cachelock = await this.acquireMutex(key);
const cacheLock = await this.acquireMutex(key);
const lockValue = (await Locker.getLock(key));
if (lockValue && !this.isWriteLock(lockValue)) {
......@@ -498,7 +498,7 @@ export class Locker {
}
}
await this.releaseMutex(cachelock, key);
await this.releaseMutex(cacheLock, key);
}
......@@ -506,8 +506,8 @@ export class Locker {
public static async acquireMutex(key: string): Promise<any> {
try {
const cachelock = await this.redlock.lock('locks:' + key, this.TTL);
return cachelock;
const cacheLock = await this.redlock.lock('locks:' + key, this.TTL);
return cacheLock;
} catch (error) {
throw Error.make(Error.Status.LOCKED, key +
' cannot be locked at the moment. Please try again shortly. ' +
......@@ -516,10 +516,10 @@ export class Locker {
}
public static async releaseMutex(cachelock: any, key: string): Promise<void> {
public static async releaseMutex(cacheLock: any, key: string): Promise<void> {
try {
await this.redlock.unlock(cachelock);
await this.redlock.unlock(cacheLock);
} catch (error) {
throw Error.make(Error.Status.LOCKED, key +
' cannot be unlocked at the moment. ' +
......
......@@ -15,7 +15,7 @@
// ============================================================================
import redis from 'redis-mock';
import Redlock from 'redlock-async';
import Redlock from 'redlock';
import sinon from 'sinon';
import { google