Commit 63f26b16 authored by Rostislav Vatolin [SLB]'s avatar Rostislav Vatolin [SLB]
Browse files

Get TTL for cache from partition service

parent 1f725b74
......@@ -32,4 +32,4 @@ config:
domain: contoso.com
redis:
ttl:
seconds: 1
\ No newline at end of file
seconds: 1
......@@ -5,6 +5,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@ComponentScan({
"org.opengroup.osdu.core",
......@@ -13,6 +14,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
})
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class EntitlementsV2Application {
public static void main(String[] args) {
Class<?>[] sources = new Class<?>[]{
......
......@@ -2,7 +2,7 @@ package org.opengroup.osdu.entitlements.v2.azure.service;
import io.github.resilience4j.retry.Retry;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.cache.RedisCache;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.entitlements.v2.azure.service.metrics.hitsnmisses.HitsNMissesMetricService;
......@@ -17,7 +17,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
......@@ -26,7 +25,8 @@ import java.util.concurrent.TimeUnit;
public class GroupCacheServiceAzure implements GroupCacheService {
private final JaxRsDpsLog log;
private final RetrieveGroupRepo retrieveGroupRepo;
private final ICache<String, ParentReferences> redisGroupCache;
private final RedisCache<String, ParentReferences> redisGroupCache;
private final PartitionCacheTtlService partitionCacheTtlService;
private final HitsNMissesMetricService metricService;
private final RedissonClient redissonClient;
private final Retry retry;
......@@ -61,7 +61,8 @@ public class GroupCacheServiceAzure implements GroupCacheService {
if (locked) {
metricService.sendMissesMetric();
ParentReferences parentReferences = rebuildCache(requesterId, partitionId);
redisGroupCache.put(key, parentReferences);
long ttlOfKey = partitionCacheTtlService.getCacheTtlOfPartition(partitionId);
redisGroupCache.put(key, ttlOfKey, parentReferences);
return parentReferences.getParentReferencesOfUser();
} else {
ParentReferences parentReferences = Retry.decorateSupplier(retry, () -> redisGroupCache.get(key)).get();
......
package org.opengroup.osdu.entitlements.v2.azure.service;
import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.azure.util.AzureServicePrincipleTokenService;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.partition.IPartitionFactory;
import org.opengroup.osdu.core.common.partition.IPartitionProvider;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.opengroup.osdu.core.common.partition.PartitionInfo;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@Service
@RequiredArgsConstructor
public class PartitionCacheTtlService {
    private static final String LOGGER_NAME = "CronJobLogger";
    private static final String CACHE_TTL_PROPERTY_NAME = "ent-cache-ttl";
    // Gson is thread-safe; reuse one instance instead of allocating a new one per partition per refresh
    private static final Gson GSON = new Gson();
    /**
     * Logger that does not depend on request data, so it is safe to use from a scheduled job
     */
    private final ILogger logger;
    private final IPartitionFactory partitionFactory;
    private final AzureServicePrincipleTokenService tokenService;
    // Default cache ttl in seconds, used when a partition does not define "ent-cache-ttl"
    @Value("${app.redis.ttl.seconds}")
    private int cacheTtl;
    // Data partition id -> cache ttl in milliseconds; refreshed periodically by refreshTtlInfo()
    private final ConcurrentMap<String, Long> ttlPerDataPartition = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        refreshTtlInfo();
    }

    /**
     * Returns the cache ttl in milliseconds for the given data partition id.
     * Falls back to the configured default ({@code app.redis.ttl.seconds})
     * when the partition is unknown.
     */
    public long getCacheTtlOfPartition(String dataPartitionId) {
        return ttlPerDataPartition.getOrDefault(dataPartitionId, 1000L * cacheTtl);
    }

    /**
     * Refreshes the ttl info cache every 5 minutes by asking the partition service
     * for the "ent-cache-ttl" property of every known data partition.
     */
    @Scheduled(cron = "0 */5 * ? * *")
    private void refreshTtlInfo() {
        logger.info(LOGGER_NAME, "Starting a scheduled cron job to update cache ttls for data partitions", null);
        DpsHeaders dpsHeaders = new DpsHeaders();
        dpsHeaders.put(DpsHeaders.AUTHORIZATION, "Bearer " + tokenService.getAuthorizationToken());
        dpsHeaders.addCorrelationIdIfMissing();
        IPartitionProvider provider = partitionFactory.create(dpsHeaders);
        getDataPartitionIds(provider).forEach(dataPartitionId -> getPartitionInfo(provider, dataPartitionId)
                .ifPresent(partitionInfo -> ttlPerDataPartition.put(dataPartitionId, extractTtlMs(partitionInfo))));
    }

    /**
     * Extracts the ttl in milliseconds from the partition properties, falling back
     * to the configured default when the "ent-cache-ttl" property is absent.
     */
    private long extractTtlMs(PartitionInfo partitionInfo) {
        if (!partitionInfo.getProperties().containsKey(CACHE_TTL_PROPERTY_NAME)) {
            return 1000L * cacheTtl;
        }
        // Property values are opaque objects; go through Gson's tree model to read
        // the numeric "value" field of the property
        return GSON.toJsonTree(partitionInfo.getProperties())
                .getAsJsonObject()
                .get(CACHE_TTL_PROPERTY_NAME)
                .getAsJsonObject()
                .get("value").getAsLong();
    }

    /**
     * Fetches the PartitionInfo for the given data partition id.
     * Returns an empty Optional (and logs a warning) on failure.
     */
    private Optional<PartitionInfo> getPartitionInfo(IPartitionProvider provider, String dataPartitionId) {
        try {
            return Optional.of(provider.get(dataPartitionId));
        } catch (PartitionException e) {
            logger.warning(LOGGER_NAME, "Couldn't get PartitionInfo from partition service for partition id: " + dataPartitionId, e, null);
            return Optional.empty();
        }
    }

    /**
     * Lists all data partition ids from the partition service.
     * Returns an empty list (and logs a warning) on failure.
     */
    private List<String> getDataPartitionIds(IPartitionProvider provider) {
        try {
            return provider.list();
        } catch (PartitionException e) {
            logger.warning(LOGGER_NAME, "Couldn't get data partition ids from partition service", e, null);
            return Collections.emptyList();
        }
    }
}
......@@ -10,7 +10,7 @@ import org.junit.runner.RunWith;
import org.mockito.AdditionalAnswers;
import org.mockito.Mock;
import org.mockito.stubbing.Answer;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.cache.RedisCache;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.entitlements.v2.azure.service.metrics.hitsnmisses.HitsNMissesMetricService;
......@@ -33,7 +33,6 @@ import org.springframework.test.context.junit4.SpringRunner;
import redis.embedded.RedisServer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
......@@ -48,6 +47,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
@SpringBootTest
......@@ -83,9 +83,11 @@ public class GroupCacheServiceAzureTest {
@MockBean
private HitsNMissesMetricService metricService;
@MockBean
private ICache<String, ParentReferences> redisGroupCache;
private RedisCache<String, ParentReferences> redisGroupCache;
@MockBean
private JaxRsDpsLog log;
@MockBean
private PartitionCacheTtlService partitionCacheTtlService;
@Mock
private ParentTreeDto parentTreeDto;
......@@ -128,6 +130,7 @@ public class GroupCacheServiceAzureTest {
parentReferences.setParentReferencesOfUser(parents);
requester = EntityNode.createMemberNodeForNewUser("requesterId", "dp");
when(partitionCacheTtlService.getCacheTtlOfPartition("dp")).thenReturn(2000L);
}
@Test
......@@ -138,9 +141,9 @@ public class GroupCacheServiceAzureTest {
Set<ParentReference> result = this.sut.getFromPartitionCache("requesterId", "dp");
assertEquals(this.parents, result);
verify(this.retrieveGroupRepo, times(1)).loadAllParents(this.requester);
verify(this.redisGroupCache, times(1)).put("requesterId-dp", this.parentReferences);
verify(this.metricService, times(1)).sendMissesMetric();
verify(this.retrieveGroupRepo).loadAllParents(this.requester);
verify(this.redisGroupCache).put("requesterId-dp", 2000L, this.parentReferences);
verify(this.metricService).sendMissesMetric();
}
@Test
......@@ -149,7 +152,7 @@ public class GroupCacheServiceAzureTest {
Set<ParentReference> result = this.sut.getFromPartitionCache("requesterId", "dp");
assertEquals(this.parents, result);
verify(this.retrieveGroupRepo, times(0)).loadAllParents(this.requester);
verifyNoInteractions(this.retrieveGroupRepo);
verify(this.metricService, times(1)).sendHitsMetric();
}
......
package org.opengroup.osdu.entitlements.v2.azure.service;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.azure.util.AzureServicePrincipleTokenService;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.partition.IPartitionFactory;
import org.opengroup.osdu.core.common.partition.IPartitionProvider;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.opengroup.osdu.core.common.partition.PartitionInfo;
import org.opengroup.osdu.core.common.partition.Property;
import org.powermock.reflect.Whitebox;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@RunWith(MockitoJUnitRunner.class)
public class PartitionCacheTtlServiceTest {
    @Mock
    private ILogger logger;
    @Mock
    private AzureServicePrincipleTokenService azureServicePrincipleTokenService;
    @Mock
    private IPartitionFactory partitionFactory;
    @InjectMocks
    private PartitionCacheTtlService partitionCacheTtlService;

    @Test
    public void shouldReturnTtlOfKnownDataPartitionId() {
        // Seed the internal ttl map directly so no partition-service call is needed
        ConcurrentMap<String, Long> ttlPerDataPartition = new ConcurrentHashMap<>();
        ttlPerDataPartition.put("common", 2000L);
        Whitebox.setInternalState(partitionCacheTtlService, "ttlPerDataPartition", ttlPerDataPartition);
        long actual = partitionCacheTtlService.getCacheTtlOfPartition("common");
        Assert.assertEquals(2000L, actual);
    }

    @Test
    public void shouldReturnDefaultTtlOfUnknownDataPartitionId() {
        // cacheTtl is in seconds; an unknown partition falls back to cacheTtl * 1000 ms
        Whitebox.setInternalState(partitionCacheTtlService, "cacheTtl", 3);
        long actual = partitionCacheTtlService.getCacheTtlOfPartition("common");
        Assert.assertEquals(3000L, actual);
    }

    @Test
    public void shouldDoInitSuccessfully() throws Exception {
        // Happy path: init() refreshes ttl info and logs only the start message
        Mockito.when(azureServicePrincipleTokenService.getAuthorizationToken()).thenReturn("token");
        IPartitionProvider provider = Mockito.mock(IPartitionProvider.class);
        Mockito.when(partitionFactory.create(Mockito.any(DpsHeaders.class))).thenReturn(provider);
        Mockito.when(provider.list()).thenReturn(Collections.singletonList("dp1"));
        Long ttl = 2000L;
        Map<String, Property> properties = new HashMap<>();
        Property property = new Property();
        property.setValue(ttl);
        properties.put("ent-cache-ttl", property);
        PartitionInfo partitionInfo = PartitionInfo.builder().properties(properties).build();
        Mockito.when(provider.get("dp1")).thenReturn(partitionInfo);
        Whitebox.invokeMethod(partitionCacheTtlService, "init");
        Mockito.verify(logger).info("CronJobLogger", "Starting a scheduled cron job to update cache ttls for data partitions", null);
        Mockito.verifyNoMoreInteractions(logger);
    }

    @Test
    public void shouldLogWarningWhenNoDpIdsGotFromPartitionService() throws Exception {
        // Failure listing data partitions is logged as a warning, not rethrown
        Mockito.when(azureServicePrincipleTokenService.getAuthorizationToken()).thenReturn("token");
        IPartitionProvider provider = Mockito.mock(IPartitionProvider.class);
        Mockito.when(partitionFactory.create(Mockito.any(DpsHeaders.class))).thenReturn(provider);
        PartitionException exception = new PartitionException("error", null);
        Mockito.when(provider.list()).thenThrow(exception);
        Whitebox.invokeMethod(partitionCacheTtlService, "init");
        Mockito.verify(logger).info("CronJobLogger", "Starting a scheduled cron job to update cache ttls for data partitions", null);
        Mockito.verify(logger).warning("CronJobLogger", "Couldn't get data partition ids from partition service", exception, null);
        Mockito.verifyNoMoreInteractions(logger);
    }

    @Test
    public void shouldLogWarningWhenPartitionInfoNotGotFromPartitionService() throws Exception {
        // Failure fetching one partition's info is logged as a warning, not rethrown
        Mockito.when(azureServicePrincipleTokenService.getAuthorizationToken()).thenReturn("token");
        IPartitionProvider provider = Mockito.mock(IPartitionProvider.class);
        Mockito.when(partitionFactory.create(Mockito.any(DpsHeaders.class))).thenReturn(provider);
        PartitionException exception = new PartitionException("error", null);
        Mockito.when(provider.list()).thenReturn(Collections.singletonList("dp1"));
        Mockito.when(provider.get("dp1")).thenThrow(exception);
        Whitebox.invokeMethod(partitionCacheTtlService, "init");
        Mockito.verify(logger).info("CronJobLogger", "Starting a scheduled cron job to update cache ttls for data partitions", null);
        Mockito.verify(logger).warning("CronJobLogger", "Couldn't get PartitionInfo from partition service for partition id: dp1", exception, null);
        Mockito.verifyNoMoreInteractions(logger);
    }
}
......@@ -12,6 +12,7 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.cache.RedisCache;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
......@@ -21,6 +22,7 @@ import org.opengroup.osdu.entitlements.v2.api.DeleteGroupApi;
import org.opengroup.osdu.entitlements.v2.api.DeleteMemberApi;
import org.opengroup.osdu.entitlements.v2.auth.AuthorizationService;
import org.opengroup.osdu.entitlements.v2.azure.AzureAppProperties;
import org.opengroup.osdu.entitlements.v2.azure.service.PartitionCacheTtlService;
import org.opengroup.osdu.entitlements.v2.logging.AuditLogger;
import org.opengroup.osdu.entitlements.v2.model.GroupType;
import org.opengroup.osdu.entitlements.v2.model.ParentReference;
......@@ -94,7 +96,7 @@ public class CreateMembershipsWorkflowSinglePartitionTest {
@MockBean
private AuthorizationService authService;
@MockBean
private ICache<String, ParentReferences> redisGroupCache;
private RedisCache<String, ParentReferences> redisGroupCache;
@MockBean
private RedissonClient redissonClient;
@Mock
......@@ -103,6 +105,8 @@ public class CreateMembershipsWorkflowSinglePartitionTest {
private Retry retry;
@MockBean
private HitsNMissesMetricService metricService;
@MockBean
private PartitionCacheTtlService partitionCacheTtlService;
@Before
public void before() throws InterruptedException {
......@@ -118,6 +122,7 @@ public class CreateMembershipsWorkflowSinglePartitionTest {
when(authService.isAuthorized(any(), any())).thenReturn(true);
when(redissonClient.getLock(any())).thenReturn(cacheLock);
when(cacheLock.tryLock(anyLong(), anyLong(), any())).thenReturn(true);
when(partitionCacheTtlService.getCacheTtlOfPartition("common")).thenReturn(0L);
}
@Test
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment