Commit 0edca7f2 authored by Diego Molteni

locker re-enabled

parent 9af7ce41
Pipeline #33771 failed with stages in 3 minutes and 14 seconds
@@ -122,26 +122,29 @@ export class DatasetHandler {
const userInput = await DatasetParser.register(req);
const dataset = userInput[0];
const seismicmeta = userInput[1];
- // let writeLockSession: IWriteLockSession;
+ let writeLockSession: IWriteLockSession;
// const journalClient = JournalFactoryTenantClient.get(tenant);
// const transaction = journalClient.getTransaction();
- // try {
+ try {
// await transaction.run();
// attempt to acquire a mutex on the dataset name and set the lock for the dataset in redis
// a mutex is applied on the resource on the shared cache (removed at the end of the method)
- // writeLockSession = await Locker.createWriteLock(
- // dataset, req.headers['x-seismic-dms-lockid'] as string);
+ writeLockSession = await Locker.createWriteLock(
+ dataset, req.headers['x-seismic-dms-lockid'] as string);
// if the call is idempotent return the dataset value
- // if(writeLockSession.idempotent) {
+ if(writeLockSession.idempotent) {
// const alreadyRegisteredDataset = (await DatasetDAO.get(journalClient, dataset))[0];
- // await Locker.removeWriteLock(writeLockSession, true); // Keep the lock session
- // return alreadyRegisteredDataset;
- // }
+ const alreadyRegisteredDataset = await this._cache.get(
+ Config.SEISMIC_STORE_NS + ':' + Config.DATASETS_KIND + ':' + dataset.tenant + ':' +
+ dataset.subproject + ':' + dataset.path + ':' + dataset.name);
+ await Locker.removeWriteLock(writeLockSession, true); // Keep the lock session
+ return alreadyRegisteredDataset;
+ }
// const spkey = journalClient.createKey({
// namespace: Config.SEISMIC_STORE_NS + '-' + tenant.name,
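Note on the re-enabled block above: Locker.createWriteLock receives the optional x-seismic-dms-lockid header, so a client retrying a registration with the same lock id is detected as idempotent instead of being rejected. A minimal sketch of that behaviour, assuming an in-memory map in place of the shared cache and an assumed WriteLockSession shape (only the idempotent flag is visible in this diff):

    // Sketch: idempotent write-lock acquisition (assumed semantics).
    interface WriteLockSession { idempotent: boolean; wid: string; }

    const registerLocks = new Map<string, string>(); // datasetPath -> write-lock id (cache stand-in)

    function createWriteLock(datasetPath: string, callerLockId?: string): WriteLockSession {
        const current = registerLocks.get(datasetPath);
        if (current) {
            // a retry carrying the same lock id is idempotent; anyone else is rejected
            if (callerLockId && callerLockId === current) {
                return { idempotent: true, wid: current };
            }
            throw new Error('dataset ' + datasetPath + ' is locked');
        }
        const wid = callerLockId ?? 'W' + Math.random().toString(16).slice(2);
        registerLocks.set(datasetPath, wid);
        return { idempotent: false, wid };
    }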
@@ -205,9 +208,7 @@ export class DatasetHandler {
const key = Config.SEISMIC_STORE_NS + ':' + Config.DATASETS_KIND + ':' + dataset.tenant + ':' +
dataset.subproject + ':' + dataset.path + ':' + dataset.name
- const res = await this._cache.get(key);
- if(res) {
+ if(await this._cache.get(key)) {
throw (Error.make(Error.Status.ALREADY_EXISTS,
'The dataset ' + Config.SDPATHPREFIX + dataset.tenant + '/' +
dataset.subproject + dataset.path + dataset.name + ' already exists'));
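The cache key above is the namespace, the kind, and the full dataset path joined with ':' separators, so existence can be tested with a single cache read. A sketch of the same check, with placeholder values standing in for Config.SEISMIC_STORE_NS and Config.DATASETS_KIND (assumptions, not the real configuration):

    // Sketch: build the ':'-separated cache key and refuse duplicate registration.
    interface DatasetPath { tenant: string; subproject: string; path: string; name: string; }

    function datasetCacheKey(ns: string, kind: string, d: DatasetPath): string {
        return [ns, kind, d.tenant, d.subproject, d.path, d.name].join(':');
    }

    function assertNotRegistered(cache: Map<string, unknown>, key: string, sdPath: string): void {
        if (cache.get(key)) {
            throw new Error('The dataset ' + sdPath + ' already exists');
        }
    }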
@@ -271,20 +272,18 @@ export class DatasetHandler {
dataset.ctag = dataset.ctag + tenant.gcpid + ';' + DESUtils.getDataPartitionID(tenant.esd);
// release the mutex and keep the lock session
- // await Locker.removeWriteLock(writeLockSession, true);
+ await Locker.removeWriteLock(writeLockSession, true);
// await transaction.commit();
- dataset.sbit = 'W123123412341234';
- dataset.sbit_count = 1;
return dataset;
- // } catch (err) {
+ } catch (err) {
// release the mutex and unlock the resource
- // await Locker.removeWriteLock(writeLockSession);
+ await Locker.removeWriteLock(writeLockSession);
// await transaction.rollback();
- // throw (err);
+ throw (err);
- // }
+ }
}
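The try/catch above releases the short-lived mutex on both paths but keeps the lock session only on success (the second argument to removeWriteLock); on error the resource is fully unlocked. A self-contained sketch of that discipline, with signatures assumed from the calls visible in this diff:

    // Sketch: success keeps the write lock, failure releases it.
    const lockTable = new Map<string, string>();

    function removeWriteLock(datasetPath: string, keepLock = false): void {
        if (!keepLock) {
            lockTable.delete(datasetPath); // error path: unlock the resource
        }
        // on both paths the cache mutex guarding the entry is released
    }

    async function registerWithLock(datasetPath: string, work: () => Promise<void>): Promise<void> {
        lockTable.set(datasetPath, 'W-lock'); // write lock acquired
        try {
            await work();
            removeWriteLock(datasetPath, true); // success: keep the lock session
        } catch (err) {
            removeWriteLock(datasetPath);       // failure: release the lock too
            throw err;
        }
    }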
@@ -390,14 +389,14 @@ export class DatasetHandler {
// Retrieve the dataset path information
const datasetIn = DatasetParser.delete(req);
- // // ensure is not write locked
- // if(!Config.SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS) {
- // if (Locker.isWriteLock(await Locker.getLockFromModel(datasetIn))) {
- // throw (Error.make(Error.Status.LOCKED,
- // 'The dataset ' + Config.SDPATHPREFIX + datasetIn.tenant + '/' +
- // datasetIn.subproject + datasetIn.path + datasetIn.name + ' is write locked'));
- // }
- // }
+ // ensure is not write locked
+ if(!Config.SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS) {
+ if (Locker.isWriteLock(await Locker.getLockFromModel(datasetIn))) {
+ throw (Error.make(Error.Status.LOCKED,
+ 'The dataset ' + Config.SDPATHPREFIX + datasetIn.tenant + '/' +
+ datasetIn.subproject + datasetIn.path + datasetIn.name + ' is write locked'));
+ }
+ }
// init datastore client
// const journalClient = JournalFactoryTenantClient.get(tenant);
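The re-enabled guard above refuses mutable operations on a write-locked dataset unless the skip flag is configured. A standalone sketch of the same gate; note the 'W' prefix test is an assumption suggested by the 'W123123412341234' stub in this commit, not a verified production rule:

    // Sketch: refuse delete/patch while a write lock is present.
    const SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS = false;

    function isWriteLock(lockId: string | undefined): boolean {
        return !!lockId && lockId.startsWith('W'); // assumed id convention
    }

    function assertMutable(datasetPath: string, lockId: string | undefined): void {
        if (SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS) { return; }
        if (isWriteLock(lockId)) {
            throw new Error('The dataset ' + datasetPath + ' is write locked');
        }
    }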
@@ -475,6 +474,7 @@ export class DatasetHandler {
// remove any remaining locks (this should be removed with SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS)
// await Locker.unlock(journalClient, dataset)
+ await Locker.unlock(undefined, dataset)
// } catch (err) {
// throw (err);
@@ -489,7 +489,7 @@ export class DatasetHandler {
const [datasetIN, seismicmeta, newName, wid] = DatasetParser.patch(req);
// retrieve datastore client
- const journalClient = JournalFactoryTenantClient.get(tenant);
+ // const journalClient = JournalFactoryTenantClient.get(tenant);
// const transaction = journalClient.getTransaction();
// return immediately if it is a simple close with empty body (no patch to apply)
@@ -497,11 +497,9 @@ export class DatasetHandler {
// Retrieve the dataset metadata
// const dataset = (await DatasetDAO.get(journalClient, datasetIN))[0];
- const keyx = Config.SEISMIC_STORE_NS + ':' + Config.DATASETS_KIND + ':' + datasetIN.tenant + ':' +
+ const datasetKey = Config.SEISMIC_STORE_NS + ':' + Config.DATASETS_KIND + ':' + datasetIN.tenant + ':' +
datasetIN.subproject + ':' + datasetIN.path + ':' + datasetIN.name
- const dataset = await this._cache.get(keyx);
+ const dataset = await this._cache.get(datasetKey);
// check if the dataset does not exist
if (!dataset) {
@@ -512,23 +510,25 @@ export class DatasetHandler {
// // unlock the dataset
- // const unlockRes = await Locker.unlock(journalClient, datasetIN, wid)
- // dataset.sbit = unlockRes.id;
- // dataset.sbit_count = unlockRes.cnt;
+ const unlockRes = await Locker.unlock(undefined, datasetIN, wid)
+ dataset.sbit = unlockRes.id;
+ dataset.sbit_count = unlockRes.cnt;
return dataset;
}
// unlock the dataset for close operation (and patch)
// const lockres = wid ? await Locker.unlock(journalClient, datasetIN, wid) : { id: null, cnt: 0 };
+ const lockres = wid ? await Locker.unlock(undefined, datasetIN, wid) : { id: null, cnt: 0 };
- // // ensure nobody got the lock between the close and the mutex acquisition
- // if(!Config.SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS) {
- // if (Locker.isWriteLock(await Locker.getLockFromModel(datasetIN))) {
- // throw (Error.make(Error.Status.LOCKED,
- // 'The dataset ' + Config.SDPATHPREFIX + datasetIN.tenant + '/' +
- // datasetIN.subproject + datasetIN.path + datasetIN.name + ' is write locked'));
- // }
- // }
+ // ensure nobody got the lock between the close and the mutex acquisition
+ if(!Config.SKIP_WRITE_LOCK_CHECK_ON_MUTABLE_OPERATIONS) {
+ if (Locker.isWriteLock(await Locker.getLockFromModel(datasetIN))) {
+ throw (Error.make(Error.Status.LOCKED,
+ 'The dataset ' + Config.SDPATHPREFIX + datasetIN.tenant + '/' +
+ datasetIN.subproject + datasetIN.path + datasetIN.name + ' is write locked'));
+ }
+ }
// try {
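In the close/patch flow above, unlock returns the surviving lock state ({ id, cnt }), which the handler copies back onto dataset.sbit and dataset.sbit_count. A sketch of that contract; the counting rule here is assumed (a write lock or last read session unlocks to null/0, an earlier read session just decrements the count):

    // Sketch: unlock returns the remaining lock entry for the dataset path.
    interface ILock { id: string | null; cnt: number; }
    const lockCache = new Map<string, ILock>();

    function unlock(datasetPath: string, wid?: string): ILock {
        const lock = lockCache.get(datasetPath);
        if (!lock || !wid) { return { id: null, cnt: 0 }; }
        if (lock.cnt <= 1) {                  // last (or only) holder: clear the entry
            lockCache.delete(datasetPath);
            return { id: null, cnt: 0 };
        }
        const remaining: ILock = { id: lock.id, cnt: lock.cnt - 1 };
        lockCache.set(datasetPath, remaining); // one read session released
        return remaining;
    }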
@@ -723,10 +723,10 @@ export class DatasetHandler {
tenant.esd, req[Config.DE_FORWARD_APPKEY]) : undefined]);
// attach lock information
- // if (wid) {
- // datasetOUT.sbit = lockres.id;
- // datasetOUT.sbit_count = lockres.cnt;
- // }
+ if (wid) {
+ datasetOUT.sbit = lockres.id;
+ datasetOUT.sbit_count = lockres.cnt;
+ }
// await transaction.commit();
// attach the gcpid for fast check
@@ -19,7 +19,7 @@ import { DatasetModel } from '.';
import { IJournal, IJournalTransaction } from '../../cloud';
import { Config } from '../../cloud';
import { Error, Utils } from '../../shared';
- import { DatasetDAO } from './dao';
+ // import { DatasetDAO } from './dao';
// lock interface (this is the cache entry)
interface ILock { id: string; cnt: number; }
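The ILock entry above is the entire cache-side lock state. A sketch of how both lock flavours can be expressed with it; the 'W'/'R' id prefixes and the exclusive-vs-shared counting are assumptions consistent with the stub ids and read/write wording in this commit:

    // Sketch: write locks are exclusive, read locks are shared and counted.
    interface ILock { id: string; cnt: number; }

    const writeLock: ILock = { id: 'W123123412341234', cnt: 1 }; // single exclusive holder
    const readLock: ILock = { id: 'R98765', cnt: 3 };            // three concurrent readers

    function isWriteLockEntry(lock: ILock): boolean {
        return lock.id.startsWith('W'); // assumed prefix convention
    }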
@@ -237,19 +237,19 @@ export class Locker {
if (!lockValue) {
// check if the dataset is invalid
- const datasetOut = await DatasetDAO.get(journalClient, dataset);
- if (datasetOut[0].sbit) { // write-locked (invalid file)
- await this.releaseMutex(cachelock, datasetPath);
- throw (Error.make(Error.Status.BAD_REQUEST,
- 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
- }
+ // const datasetOut = await DatasetDAO.get(journalClient, dataset);
+ // if (datasetOut[0].sbit) { // write-locked (invalid file)
+ // await this.releaseMutex(cachelock, datasetPath);
+ // throw (Error.make(Error.Status.BAD_REQUEST,
+ // 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
+ // }
// create a new write lock and save in cache and journalClient
const lockID = idempotentWriteLock || this.generateWriteLockID();
await Locker.set(datasetPath, lockID, this.EXP_WRITELOCK);
- datasetOut[0].sbit = lockID;
- datasetOut[0].sbit_count = 1;
- await DatasetDAO.update(journalClient, datasetOut[0], datasetOut[1]);
+ // datasetOut[0].sbit = lockID;
+ // datasetOut[0].sbit_count = 1;
+ // await DatasetDAO.update(journalClient, datasetOut[0], datasetOut[1]);
await this.releaseMutex(cachelock, datasetPath);
return { id: lockID, cnt: 1 };
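With the DAO round-trip commented out above, the write lock now lives only in the cache: take the dataset mutex, verify no lock value exists, store the (possibly idempotent) lock id with the EXP_WRITELOCK expiry, release the mutex. A self-contained sketch; the TTL value is illustrative, expiry is emulated with a deadline (the real code delegates it to the cache), and a real mutex would retry rather than fail fast:

    // Sketch: cache-only write-lock creation under a per-dataset mutex.
    const EXP_WRITELOCK = 3600; // seconds (illustrative value)

    interface Entry { id: string; expiresAt: number; }
    const cache = new Map<string, Entry>();
    const mutexes = new Set<string>();

    function generateWriteLockID(): string {
        return 'W' + Math.random().toString(16).slice(2);
    }

    function createWriteLock(datasetPath: string, idempotentWriteLock?: string): { id: string; cnt: number } {
        if (mutexes.has(datasetPath)) { throw new Error('mutex busy on ' + datasetPath); }
        mutexes.add(datasetPath); // acquire the mutex
        try {
            const entry = cache.get(datasetPath);
            if (entry && entry.expiresAt > Date.now()) {
                throw new Error('dataset ' + datasetPath + ' is already locked');
            }
            const lockID = idempotentWriteLock || generateWriteLockID();
            cache.set(datasetPath, { id: lockID, expiresAt: Date.now() + EXP_WRITELOCK * 1000 });
            return { id: lockID, cnt: 1 };
        } finally {
            mutexes.delete(datasetPath); // release the mutex on every path
        }
    }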
@@ -341,12 +341,12 @@ export class Locker {
if (!lockValue) {
// check if the dataset is invalid
- const datasetOut = (await DatasetDAO.get(journalClient, dataset))[0];
- if (datasetOut.sbit) { // write-locked (invalid file)
- await this.releaseMutex(cachelock, datasetPath);
- throw (Error.make(Error.Status.BAD_REQUEST,
- 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
- }
+ // const datasetOut = (await DatasetDAO.get(journalClient, dataset))[0];
+ // if (datasetOut.sbit) { // write-locked (invalid file)
+ // await this.releaseMutex(cachelock, datasetPath);
+ // throw (Error.make(Error.Status.BAD_REQUEST,
+ // 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
+ // }
// create a new read lock session and a new main read lock
const lockID = idempotentReadLock || this.generateReadLockID();
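The read path mirrors the write path: when no lock value exists, a new read-lock session plus a main read lock are created, again without the DAO validity check. A sketch of shared read locking; the per-session id list is an assumption consistent with the "read lock session" wording in this diff:

    // Sketch: shared read locks as a main entry plus per-session ids.
    interface ReadLock { id: string; cnt: number; sessions: string[]; }
    const readLocks = new Map<string, ReadLock>();

    function generateReadLockID(): string {
        return 'R' + Math.random().toString(16).slice(2);
    }

    function acquireReadLock(datasetPath: string, idempotentReadLock?: string): ReadLock {
        const sessionID = idempotentReadLock || generateReadLockID();
        let lock = readLocks.get(datasetPath);
        if (!lock) {
            lock = { id: generateReadLockID(), cnt: 0, sessions: [] }; // new main read lock
            readLocks.set(datasetPath, lock);
        }
        lock.sessions.push(sessionID); // each reader holds its own session lock
        lock.cnt = lock.sessions.length;
        return lock;
    }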
@@ -402,12 +402,12 @@ export class Locker {
}
// unlock in datastore
- const datasetUpdate = (await DatasetDAO.get(journalClient, dataset));
- if (datasetUpdate[0]) {
- datasetUpdate[0].sbit = null;
- datasetUpdate[0].sbit_count = 0;
- await DatasetDAO.update(journalClient, datasetUpdate[0], datasetUpdate[1]);
- }
+ // const datasetUpdate = (await DatasetDAO.get(journalClient, dataset));
+ // if (datasetUpdate[0]) {
+ // datasetUpdate[0].sbit = null;
+ // datasetUpdate[0].sbit_count = 0;
+ // await DatasetDAO.update(journalClient, datasetUpdate[0], datasetUpdate[1]);
+ // }
// unlock in cache
await Locker.del(datasetPath);
@@ -426,13 +426,13 @@ export class Locker {
if (lockValue) {
// if write locked remove lock from journalClient
- if (this.isWriteLock(lockValue)) {
- const datasetTmp1 = (await DatasetDAO.get(journalClient, dataset));
- if (datasetTmp1[0]) {
- datasetTmp1[0].sbit = null;
- await DatasetDAO.update(journalClient, datasetTmp1[0], datasetTmp1[1]);
- }
- }
+ // if (this.isWriteLock(lockValue)) {
+ // const datasetTmp1 = (await DatasetDAO.get(journalClient, dataset));
+ // if (datasetTmp1[0]) {
+ // datasetTmp1[0].sbit = null;
+ // await DatasetDAO.update(journalClient, datasetTmp1[0], datasetTmp1[1]);
+ // }
+ // }
// if read locked remove all session read locks
if (!this.isWriteLock(lockValue)) {
@@ -449,14 +449,14 @@ export class Locker {
}
// check if invalid
- if (!skipInvalid) {
- const datasetTmp2 = (await DatasetDAO.get(journalClient, dataset))[0];
- if (datasetTmp2 && datasetTmp2.sbit) {
- await this.releaseMutex(cachelock, datasetPath);
- throw (Error.make(Error.Status.NOT_FOUND,
- 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
- }
- }
+ // if (!skipInvalid) {
+ // const datasetTmp2 = (await DatasetDAO.get(journalClient, dataset))[0];
+ // if (datasetTmp2 && datasetTmp2.sbit) {
+ // await this.releaseMutex(cachelock, datasetPath);
+ // throw (Error.make(Error.Status.NOT_FOUND,
+ // 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
+ // }
+ // }
// dataset already unlocked
await this.releaseMutex(cachelock, datasetPath);
@@ -500,16 +500,16 @@ export class Locker {
// ------------------------------------------------
// if dataset not lock in cache
- const datasetOut = (await DatasetDAO.get(journalClient, dataset))[0];
+ // const datasetOut = (await DatasetDAO.get(journalClient, dataset))[0];
// case 1: invalid dataset
- if (!skipInvalid) {
- if (datasetOut.sbit) {
- await this.releaseMutex(cachelock, datasetPath);
- throw (Error.make(Error.Status.NOT_FOUND,
- 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
- }
- }
+ // if (!skipInvalid) {
+ // if (datasetOut.sbit) {
+ // await this.releaseMutex(cachelock, datasetPath);
+ // throw (Error.make(Error.Status.NOT_FOUND,
+ // 'The dataset ' + datasetPath + ' is invalid and can only be deleted'));
+ // }
+ // }
// case 2: dataset already unlocked
await this.releaseMutex(cachelock, datasetPath);
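Every branch in this class brackets its work between acquiring a per-dataset mutex and releaseMutex(cachelock, datasetPath). A sketch of that discipline with a token-checked release; the token check is an assumption (it is the standard guard against freeing a mutex another caller now owns, not something this diff shows):

    // Sketch: acquire/release mutex discipline used throughout Locker.
    const mutexTable = new Map<string, string>();

    function acquireMutex(datasetPath: string): string {
        if (mutexTable.has(datasetPath)) { throw new Error('mutex busy: ' + datasetPath); }
        const token = Math.random().toString(16).slice(2); // the "cachelock" handle
        mutexTable.set(datasetPath, token);
        return token;
    }

    function releaseMutex(token: string, datasetPath: string): void {
        if (mutexTable.get(datasetPath) === token) {
            mutexTable.delete(datasetPath); // only the owner can release
        }
    }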