Commit aabf027f authored by Varunkumar Manohar's avatar Varunkumar Manohar Committed by Diego Molteni
Browse files

feat: update redis client to ioredis and add exponential retry strategy with...

feat: update redis client to ioredis and add exponential retry strategy with client side command timeout
parent 0bf4ab4b
......@@ -58,7 +58,6 @@ The following software have components provided under the terms of this license:
- cluster-key-slot (from https://www.npmjs.com/package/cluster-key-slot)
- cryptography (from https://github.com/pyca/cryptography)
- denque (from https://www.npmjs.com/package/denque)
- denque (from https://www.npmjs.com/package/denque)
- detect-libc (from https://www.npmjs.com/package/detect-libc)
- ecdsa-sig-formatter (from https://www.npmjs.com/package/ecdsa-sig-formatter)
- eventid (from https://www.npmjs.com/package/eventid)
......@@ -382,6 +381,7 @@ The following software have components provided under the terms of this license:
- @types/express-jwt (from https://www.npmjs.com/package/@types/express-jwt)
- @types/express-serve-static-core (from https://www.npmjs.com/package/@types/express-serve-static-core)
- @types/express-unless (from https://www.npmjs.com/package/@types/express-unless)
- @types/ioredis (from https://www.npmjs.com/package/@types/ioredis)
- @types/long (from https://www.npmjs.com/package/@types/long)
- @types/mime (from https://www.npmjs.com/package/@types/mime)
- @types/node (from https://www.npmjs.com/package/@types/node)
......@@ -478,6 +478,7 @@ The following software have components provided under the terms of this license:
- debug (from https://www.npmjs.com/package/debug)
- debug (from https://www.npmjs.com/package/debug)
- debug (from https://www.npmjs.com/package/debug)
- debug (from https://www.npmjs.com/package/debug)
- debuglog (from https://www.npmjs.com/package/debuglog)
- decamelize (from https://www.npmjs.com/package/decamelize)
- deep-extend (from https://www.npmjs.com/package/deep-extend)
......@@ -726,7 +727,6 @@ The following software have components provided under the terms of this license:
- readable-stream (from https://www.npmjs.com/package/readable-stream)
- readable-stream (from https://www.npmjs.com/package/readable-stream)
- readable-stream (from https://www.npmjs.com/package/readable-stream)
- redis (from https://www.npmjs.com/package/redis)
- redis-commands (from https://www.npmjs.com/package/redis-commands)
- redis-errors (from https://www.npmjs.com/package/redis-errors)
- redis-parser (from https://www.npmjs.com/package/redis-parser)
......
......@@ -2139,10 +2139,9 @@
}
},
"@types/ioredis": {
"version": "4.17.11",
"resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.17.11.tgz",
"integrity": "sha512-XAg9kOLyClyyP2BjfO9NWQmDDJFTNxvz2qkE+BMBfQoAN0/o6X53X2gefgfm4bn89A9462DyzNEL1q23oeCRSg==",
"dev": true,
"version": "4.26.6",
"resolved": "https://registry.npmjs.org/@types/ioredis/-/ioredis-4.26.6.tgz",
"integrity": "sha512-Q9ydXL/5Mot751i7WLCm9OGTj5jlW3XBdkdEW21SkXZ8Y03srbkluFGbM3q8c+vzPW30JOLJ+NsZWHoly0+13A==",
"requires": {
"@types/node": "*"
}
......@@ -3885,9 +3884,9 @@
"integrity": "sha1-hMbhWbgZBP3KWaDvRM2HDTElD5o="
},
"denque": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/denque/-/denque-1.4.1.tgz",
"integrity": "sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ=="
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/denque/-/denque-1.5.0.tgz",
"integrity": "sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ=="
},
"depd": {
"version": "1.1.2",
......@@ -5865,12 +5864,12 @@
}
},
"ioredis": {
"version": "4.23.0",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.23.0.tgz",
"integrity": "sha512-R5TDCODwnEH3J3A5TSoB17+6a+SeJTtIOW6vsy5Q1yag/AM8FejHjZC5R2O1QepSXV8hwOnGSm/4buJc/LeXTQ==",
"version": "4.27.6",
"resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.27.6.tgz",
"integrity": "sha512-6W3ZHMbpCa8ByMyC1LJGOi7P2WiOKP9B3resoZOVLDhi+6dDBOW+KNsRq3yI36Hmnb2sifCxHX+YSarTeXh48A==",
"requires": {
"cluster-key-slot": "^1.1.0",
"debug": "^4.1.1",
"debug": "^4.3.1",
"denque": "^1.1.0",
"lodash.defaults": "^4.2.0",
"lodash.flatten": "^4.4.0",
......@@ -5878,13 +5877,13 @@
"redis-commands": "1.7.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0",
"standard-as-callback": "^2.0.1"
"standard-as-callback": "^2.1.0"
},
"dependencies": {
"debug": {
"version": "4.3.1",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.1.tgz",
"integrity": "sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==",
"version": "4.3.2",
"resolved": "https://registry.npmjs.org/debug/-/debug-4.3.2.tgz",
"integrity": "sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw==",
"requires": {
"ms": "2.1.2"
}
......@@ -5893,11 +5892,6 @@
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/p-map/-/p-map-2.1.0.tgz",
"integrity": "sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw=="
},
"redis-commands": {
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.7.0.tgz",
"integrity": "sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ=="
}
}
},
......@@ -8821,24 +8815,6 @@
"picomatch": "^2.2.1"
}
},
"redis": {
"version": "3.1.2",
"resolved": "https://registry.npmjs.org/redis/-/redis-3.1.2.tgz",
"integrity": "sha512-grn5KoZLr/qrRQVwoSkmzdbw6pwF+/rwODtrOr6vuBRiR/f3rjSTGupbF90Zpqm2oenix8Do6RV7pYEkGwlKkw==",
"requires": {
"denque": "^1.5.0",
"redis-commands": "^1.7.0",
"redis-errors": "^1.2.0",
"redis-parser": "^3.0.0"
},
"dependencies": {
"denque": {
"version": "1.5.0",
"resolved": "https://registry.npmjs.org/denque/-/denque-1.5.0.tgz",
"integrity": "sha512-CYiCSgIF1p6EUByQPlGkKnP1M9g0ZV3qMIrqMqZqdwazygIA/YP2vrbcyl1h/WppKJTdl1F85cXIle+394iDAQ=="
}
}
},
"redis-commands": {
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.7.0.tgz",
......@@ -9590,9 +9566,9 @@
"integrity": "sha1-VHxws0fo0ytOEI6hoqFZ5f3eGcA="
},
"standard-as-callback": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.0.1.tgz",
"integrity": "sha512-NQOxSeB8gOI5WjSaxjBgog2QFw55FV8TkS6Y07BiB3VJ8xNTvUYm0wl0s8ObgQ5NhdpnNfigMIKjgPESzgr4tg=="
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz",
"integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A=="
},
"statuses": {
"version": "1.5.0",
......
......@@ -14,16 +14,16 @@
// limitations under the License.
// ============================================================================
import Redis from 'ioredis';
import Redlock from 'redlock-async';
import { Config } from '../../cloud';
import { Config, LoggerFactory } from '../../cloud';
import { Error, Utils } from '../../shared';
// lock interface (this is the cache entry)
interface ILock { id: string; cnt: number; }
// Write Lock interface
export interface IWriteLockSession {idempotent: boolean, wid: string, mutex: any, key: string};
export interface IWriteLockSession { idempotent: boolean, wid: string, mutex: any, key: string; };
export class Locker {
......@@ -32,14 +32,18 @@ export class Locker {
private static EXP_READLOCK = 3600; // after 1h readlock entry will be removed
private static TIME_5MIN = 300; // exp time margin to use in the main read locks
// the redis client
private static redisClient;
private static redisSubscriptionClient;
private static redlock;
public static getWriteLockTTL(): number { return this.EXP_WRITELOCK };
public static getReadLockTTL(): number { return this.EXP_READLOCK };
public static getMutexTTL(): number { return this.TTL };
public static getWriteLockTTL(): number { return this.EXP_WRITELOCK; };
public static getReadLockTTL(): number { return this.EXP_READLOCK; };
public static getMutexTTL(): number { return this.TTL; };
// Exponential Retry strategy in event of an error
private static retryStrategy = (times: number) => {
return Math.pow(2, times) + Math.random() * 100;
};
public static async init() {
......@@ -48,35 +52,61 @@ export class Locker {
this.redisClient = redis.createClient();
this.redisSubscriptionClient = redis.createClient();
} else {
const redis = require('redis');
if(Config.LOCKSMAP_REDIS_INSTANCE_KEY) {
if (Config.LOCKSMAP_REDIS_INSTANCE_KEY) {
Config.LOCKSMAP_REDIS_INSTANCE_TLS_DISABLE ?
this.redisClient = redis.createClient({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
auth_pass: Config.LOCKSMAP_REDIS_INSTANCE_KEY
}):
this.redisClient = redis.createClient({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
auth_pass: Config.LOCKSMAP_REDIS_INSTANCE_KEY,
tls: {servername: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS}}
);
this.redisSubscriptionClient = redis.createClient({
this.redisClient = new Redis({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
password: Config.LOCKSMAP_REDIS_INSTANCE_KEY,
/* Retry failed requests only once*/
maxRetriesPerRequest: 5,
/* Exponential backoff retry strategy */
retryStrategy: this.retryStrategy,
/* Max time for a single command after which a timeout occurs.
Without this the client waits indefinitely*/
commandTimeout: 2000 //
}) :
this.redisClient = new Redis({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
password: Config.LOCKSMAP_REDIS_INSTANCE_KEY,
tls: { servername: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS },
maxRetriesPerRequest: 5,
retryStrategy: this.retryStrategy,
commandTimeout: 2000
}
);
this.redisSubscriptionClient = new Redis({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
auth_pass: Config.LOCKSMAP_REDIS_INSTANCE_KEY,
tls: {servername: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS}}
);
password: Config.LOCKSMAP_REDIS_INSTANCE_KEY,
tls: { servername: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS },
maxRetriesPerRequest: 5,
retryStrategy: this.retryStrategy,
commandTimeout: 2000
});
}
else {
this.redisClient = redis.createClient({
this.redisClient = new Redis({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
maxRetriesPerRequest: 5,
retryStrategy: this.retryStrategy,
commandTimeout: 2000
});
this.redisSubscriptionClient = redis.createClient({
this.redisSubscriptionClient = new Redis({
host: Config.LOCKSMAP_REDIS_INSTANCE_ADDRESS,
port: Config.LOCKSMAP_REDIS_INSTANCE_PORT,
maxRetriesPerRequest: 5,
retryStrategy: this.retryStrategy,
commandTimeout: 2000
});
}
......@@ -89,6 +119,11 @@ export class Locker {
);
}
});
this.redisClient.on('error', (error) => {
LoggerFactory.build(Config.CLOUDPROVIDER).error(error);
});
}
// initialize the locker
......@@ -164,7 +199,7 @@ export class Locker {
const lockValue = (await Locker.getLock(lockKey));
// idempotency requirement
if(idempotentWriteLock && !idempotentWriteLock.startsWith('W')) {
if (idempotentWriteLock && !idempotentWriteLock.startsWith('W')) {
throw (Error.make(Error.Status.BAD_REQUEST,
'The provided idempotency key, for a write-lock operation, must start with the \'W\' letter'));
}
......@@ -174,26 +209,26 @@ export class Locker {
if (!lockValue) {
const lockValueNew = idempotentWriteLock || this.generateWriteLockID();
await this.set(lockKey, lockValueNew, this.EXP_WRITELOCK);
return {idempotent: false, wid: lockValueNew, mutex: cachelock, key: lockKey};
return { idempotent: false, wid: lockValueNew, mutex: cachelock, key: lockKey };
}
// check if writelock already exist and match the input one (idempotent call)
if(idempotentWriteLock && lockValue === idempotentWriteLock) {
return {idempotent: true, wid: idempotentWriteLock, mutex: cachelock, key: lockKey};
if (idempotentWriteLock && lockValue === idempotentWriteLock) {
return { idempotent: true, wid: idempotentWriteLock, mutex: cachelock, key: lockKey };
}
throw (Error.make(Error.Status.LOCKED,
lockKey + ' is ' + (this.isWriteLock(lockValue) ?
('write locked ') + Error.get423WriteLockReason():
('write locked ') + Error.get423WriteLockReason() :
('read locked ' + + Error.get423ReadLockReason()))));
}
// remove both lock and mutex
public static async removeWriteLock(writeLockSession: IWriteLockSession, keepTheLock = false): Promise<void> {
if(writeLockSession && writeLockSession.mutex) {
if (writeLockSession && writeLockSession.mutex) {
await Locker.releaseMutex(writeLockSession.mutex, writeLockSession.key);
}
if(!keepTheLock) {
if (!keepTheLock) {
if (writeLockSession && writeLockSession.wid) {
await Locker.del(writeLockSession.key);
}
......@@ -204,7 +239,7 @@ export class Locker {
public static async acquireWriteLock(lockKey: string, idempotentWriteLock: string, wid?: string): Promise<ILock> {
// idempotency requirement
if(idempotentWriteLock && !idempotentWriteLock.startsWith('W')) {
if (idempotentWriteLock && !idempotentWriteLock.startsWith('W')) {
throw (Error.make(Error.Status.BAD_REQUEST,
'The provided idempotency key, for a write-lock operation, must start with the \'W\' letter'));
}
......@@ -213,9 +248,9 @@ export class Locker {
const lockValue = (await Locker.getLock(lockKey));
// Already write locked but the idempotentWriteLock match the once in cache (idempotent call)
if(lockValue && idempotentWriteLock && lockValue === idempotentWriteLock) {
if (lockValue && idempotentWriteLock && lockValue === idempotentWriteLock) {
await this.releaseMutex(cachelock, lockKey);
return {id: idempotentWriteLock, cnt: 0};
return { id: idempotentWriteLock, cnt: 0 };
}
if (lockValue && wid && wid !== lockValue && this.isWriteLock(lockValue)) {
......@@ -273,7 +308,7 @@ export class Locker {
public static async acquireReadLock(lockKey: string, idempotentReadLock?: string, wid?: string): Promise<ILock> {
// idempotency requirement
if(idempotentReadLock && !idempotentReadLock.startsWith('R')) {
if (idempotentReadLock && !idempotentReadLock.startsWith('R')) {
throw (Error.make(Error.Status.BAD_REQUEST,
'The provided idempotency key, for a read-lock operation, must start with the \'R\' letter'));
}
......@@ -282,7 +317,7 @@ export class Locker {
const cachelock = await this.acquireMutex(lockKey);
const lockValue = (await Locker.getLock(lockKey));
if(lockValue && idempotentReadLock && !this.isWriteLock(lockValue) &&
if (lockValue && idempotentReadLock && !this.isWriteLock(lockValue) &&
(lockValue as string[]).indexOf(idempotentReadLock) > -1) {
await this.releaseMutex(cachelock, lockKey);
return { id: idempotentReadLock, cnt: (lockValue as string[]).length };
......
......@@ -14,10 +14,10 @@
// limitations under the License.
// ============================================================================
import redis from 'redis';
import Redis from 'ioredis';
import { Config } from '../cloud';
let _cacheCore: CacheCore;
export class Cache<T = string> {
......@@ -26,7 +26,7 @@ export class Cache<T = string> {
private _keyTag: string;
constructor(keyTag?: string) {
if(!_cacheCore) {
if (!_cacheCore) {
initSharedCache();
}
this._keyTag = keyTag;
......@@ -52,7 +52,12 @@ export class Cache<T = string> {
class CacheCore {
private _redisClient: redis.RedisClient;
private _redisClient: any;
// retry strategy
private static retryStrategy = (times: number) => {
return Math.pow(2, times) + Math.random() * 100;
};
constructor() {
......@@ -60,48 +65,60 @@ class CacheCore {
Config.UTEST ?
require('redis-mock').createClient() :
Config.DES_REDIS_INSTANCE_KEY ? Config.DES_REDIS_INSTANCE_TLS_DISABLE ?
redis.createClient({
new Redis({
host: Config.DES_REDIS_INSTANCE_ADDRESS,
port: Config.DES_REDIS_INSTANCE_PORT,
auth_pass: Config.DES_REDIS_INSTANCE_KEY,
password: Config.DES_REDIS_INSTANCE_KEY,
retryStrategy: CacheCore.retryStrategy,
maxRetriesPerRequest: 5,
commandTimeout: 2000
}) :
redis.createClient({
new Redis({
host: Config.DES_REDIS_INSTANCE_ADDRESS,
port: Config.DES_REDIS_INSTANCE_PORT,
auth_pass: Config.DES_REDIS_INSTANCE_KEY,
tls: { servername: Config.DES_REDIS_INSTANCE_ADDRESS }
password: Config.DES_REDIS_INSTANCE_KEY,
tls: { servername: Config.DES_REDIS_INSTANCE_ADDRESS },
retryStrategy: CacheCore.retryStrategy,
maxRetriesPerRequest: 5,
commandTimeout: 2000
}) :
redis.createClient({
new Redis({
host: Config.DES_REDIS_INSTANCE_ADDRESS,
port: Config.DES_REDIS_INSTANCE_PORT,
})
retryStrategy: CacheCore.retryStrategy,
maxRetriesPerRequest: 5,
commandTimeout: 2000
});
}
public async _del(key: string): Promise<void> {
return new Promise((resolve, reject) => {
this._redisClient.del(key, (err) => {
err ? reject(err) : resolve(); });
err ? reject(err) : resolve();
});
});
}
public async _get(key: string): Promise<any> {
return new Promise((resolve, reject) => {
this._redisClient.get(key, (err, res) => {
err ? reject(err) : resolve(res ? JSON.parse(res).value : undefined); });
err ? reject(err) : resolve(res ? JSON.parse(res).value : undefined);
});
});
}
public async _set(key: string, value: any, expireTime:number): Promise<void> {
public async _set(key: string, value: any, expireTime: number): Promise<void> {
return new Promise((resolve, reject) => {
this._redisClient.setex(key, expireTime, JSON.stringify({value}), (err) => {
err ? reject(err) : resolve(); });
this._redisClient.setex(key, expireTime, JSON.stringify({ value }), (err) => {
err ? reject(err) : resolve();
});
});
}
}
export function initSharedCache() {
if(!_cacheCore) {
if (!_cacheCore) {
_cacheCore = new CacheCore();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment