From 090f80051b1a9768216c7fae9fed08b39848e8aa Mon Sep 17 00:00:00 2001 From: aalekhj Date: Mon, 7 Feb 2022 17:58:38 +0530 Subject: [PATCH 01/10] Enabled redis cache usage --- .../workflow-azure/src/main/resources/application.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/provider/workflow-azure/src/main/resources/application.properties b/provider/workflow-azure/src/main/resources/application.properties index 1ba5443b..b441869f 100644 --- a/provider/workflow-azure/src/main/resources/application.properties +++ b/provider/workflow-azure/src/main/resources/application.properties @@ -78,7 +78,7 @@ management.health.elasticsearch.enabled=false # Cache related configs # This is a temporary change to use VM Cache for service deployed in AKS and not just local # Set to false when issue with Redis Cache is resolved -runtime.env.local=${runtime_env_local:true} +runtime.env.local=${runtime_env_local:false} # Redis configuration osdu.azure.redis.redisPort=${redis_port:6380} -- GitLab From 25b361286b0b9288788f44e6fa720602c6ea2071 Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 12:50:27 +0530 Subject: [PATCH 02/10] Upgraded os core common version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 442b0d7c..b8efe4ce 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 1 1.3.1.Final 3.0.0-M4 - 0.13.0 + 0.14.0-SNAPSHOT 3.0.0 2.17.1 2.4.7 -- GitLab From 6a13830054db29bdeca3c8341b6a2c13a5328b4c Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 12:52:11 +0530 Subject: [PATCH 03/10] Updated ActiveDagRunsRedisCache.java --- .../azure/cache/ActiveDagRunsRedisCache.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsRedisCache.java b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsRedisCache.java index b41fb878..7bd91986 100644 --- a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsRedisCache.java +++ b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsRedisCache.java @@ -1,15 +1,33 @@ package org.opengroup.osdu.workflow.provider.azure.cache; +import com.lambdaworks.redis.codec.RedisCodec; +import org.opengroup.osdu.core.common.cache.JsonCodec; import org.opengroup.osdu.core.common.cache.RedisCache; import org.opengroup.osdu.workflow.provider.azure.config.RedisConfig; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @Component("ActiveDagRunsCache") @ConditionalOnProperty(value = "runtime.env.local", havingValue = "false", matchIfMissing = true) -public class ActiveDagRunsRedisCache extends RedisCache { +public class ActiveDagRunsRedisCache extends RedisCache implements IActiveDagRunsCache { public ActiveDagRunsRedisCache(final RedisConfig redisConfig) { super(redisConfig.getRedisHost(), redisConfig.getRedisPort(), redisConfig.getRedisPassword(), redisConfig.getActiveDagRunsTtl(), String.class, Integer.class); } + + @Override + public void incrementKey(String key) { + this.increment(key); + } + + @Override + public void decrementKey(String key) { + this.decrement(key); + } + + @Override + public RedisCodec getCodec(Class classOfK, Class classOfV) { + return new JsonCodec<>(classOfK, classOfV); + } } -- GitLab From 16bcec11043a818ea8133777f9841db09e500350 Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 12:52:34 +0530 Subject: [PATCH 04/10] Updated ActiveDagRunsVmCache.java --- .../azure/cache/ActiveDagRunsVmCache.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsVmCache.java b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsVmCache.java index f39ea3ae..d214cadd 100644 --- a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsVmCache.java +++ b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/cache/ActiveDagRunsVmCache.java @@ -1,6 +1,7 @@ package org.opengroup.osdu.workflow.provider.azure.cache; import org.opengroup.osdu.core.common.cache.VmCache; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -9,8 +10,20 @@ import static org.opengroup.osdu.workflow.provider.azure.consts.CacheConstants.A @Component("ActiveDagRunsCache") @ConditionalOnProperty(value = "runtime.env.local", havingValue = "true") -public class ActiveDagRunsVmCache extends VmCache { +public class ActiveDagRunsVmCache extends VmCache implements IActiveDagRunsCache { public ActiveDagRunsVmCache() { super(ACTIVE_DAG_RUNS_LOCAL_CACHE_EXPIRATION_SECONDS, ACTIVE_DAG_RUNS_LOCAL_CACHE_MAXIMUM_SIZE); } + + @Override + public void incrementKey(String key) { + Integer value = this.get(key); + this.put(key, value + 1); + } + + @Override + public void decrementKey(String key) { + Integer value = this.get(key); + this.put(key, value - 1); + } } -- GitLab From acc18d4e3887c54bcb72238f77d602437aa80c22 Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 13:31:14 +0530 Subject: [PATCH 05/10] Updated WorkflowRunRepository.java --- .../azure/repository/WorkflowRunRepository.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepository.java b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepository.java index e37e1400..ee6be702 100644 --- a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepository.java +++ b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepository.java @@ -8,7 +8,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpStatus; import org.opengroup.osdu.azure.cosmosdb.CosmosStore; import org.opengroup.osdu.azure.query.CosmosStorePageRequest; -import org.opengroup.osdu.core.common.cache.ICache; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.workflow.exception.WorkflowRunNotFoundException; @@ -17,6 +16,7 @@ import org.opengroup.osdu.workflow.model.WorkflowRunsPage; import org.opengroup.osdu.workflow.model.WorkflowStatusType; import org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig; import org.opengroup.osdu.workflow.provider.azure.consts.WorkflowRunConstants; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.opengroup.osdu.workflow.provider.azure.model.WorkflowRunDoc; import org.opengroup.osdu.workflow.provider.azure.utils.CursorUtils; import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowRunRepository; @@ -48,7 +48,7 @@ public class WorkflowRunRepository implements IWorkflowRunRepository { private final WorkflowTasksSharingRepository workflowTasksSharingRepository; @Qualifier("ActiveDagRunsCache") - private final ICache activeDagRunsCache; + private final IActiveDagRunsCache activeDagRunsCache; @Override public WorkflowRun saveWorkflowRun(final WorkflowRun workflowRun) { @@ -184,10 +184,9 @@ public class WorkflowRunRepository implements IWorkflowRunRepository { private void decrementActiveDagRunsCountInCache() { Integer numberOfActiveDagRuns = activeDagRunsCache.get(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY); // Update the cache count: decrementing the count by 1 - if (numberOfActiveDagRuns != null) { - numberOfActiveDagRuns = Math.max(0, numberOfActiveDagRuns - 1); - log.info("Decrementing the number of active dag runs in cache to {}", numberOfActiveDagRuns); - activeDagRunsCache.put(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY, numberOfActiveDagRuns); + if (numberOfActiveDagRuns != null && numberOfActiveDagRuns != 0) { + log.info("Decrementing the number of active dag runs in cache to {}", numberOfActiveDagRuns - 1); + activeDagRunsCache.decrementKey(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY); } } -- GitLab From ec320931c23a1dbc76f08db30086dc9c5f2e003d Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 13:31:42 +0530 Subject: [PATCH 06/10] Updated WorkflowEngineServiceImpl.java --- .../azure/service/WorkflowEngineServiceImpl.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImpl.java b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImpl.java index 813e2bbb..3ac153b5 100644 --- a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImpl.java +++ b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImpl.java @@ -9,7 +9,6 @@ import com.sun.jersey.api.client.WebResource; import org.json.JSONObject; import org.opengroup.osdu.azure.partition.PartitionInfoAzure; import org.opengroup.osdu.azure.partition.PartitionServiceClient; -import org.opengroup.osdu.core.common.cache.ICache; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.workflow.config.AirflowConfig; @@ -22,6 +21,7 @@ import org.opengroup.osdu.workflow.provider.azure.config.AirflowConfigResolver; import org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig; import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareConfig; import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareStore; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.opengroup.osdu.workflow.provider.azure.utils.airflow.IAirflowWorkflowEngineUtil; import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService; import org.slf4j.Logger; @@ -82,7 +82,7 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService { @Autowired @Qualifier("ActiveDagRunsCache") - private ICache activeDagRunsCache; + private IActiveDagRunsCache activeDagRunsCache; @Autowired private ActiveDagRunsConfig activeDagRunsConfig; @@ -257,9 +257,8 @@ public class WorkflowEngineServiceImpl implements IWorkflowEngineService { private void incrementActiveDagRunsCountInCache() { Integer numberOfActiveDagRuns = activeDagRunsCache.get(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY); if (numberOfActiveDagRuns != null) { - numberOfActiveDagRuns += 1; - LOGGER.info("Incrementing the number of active dag runs in cache to {}", numberOfActiveDagRuns); - activeDagRunsCache.put(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY, numberOfActiveDagRuns); + LOGGER.info("Incrementing the number of active dag runs in cache to {}", numberOfActiveDagRuns + 1); + activeDagRunsCache.incrementKey(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY); } } -- GitLab From d02482344eec9c3f7ac202bff9feb11b8c846c77 Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 13:32:07 +0530 Subject: [PATCH 07/10] Updated WorkflowRunRepositoryTest.java --- .../provider/azure/repository/WorkflowRunRepositoryTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java index fe7253ee..50621075 100644 --- a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java +++ b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java @@ -15,7 +15,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opengroup.osdu.azure.cosmosdb.CosmosStore; import org.opengroup.osdu.azure.query.CosmosStorePageRequest; -import org.opengroup.osdu.core.common.cache.ICache; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppError; import org.opengroup.osdu.core.common.model.http.AppException; @@ -25,6 +24,7 @@ import org.opengroup.osdu.workflow.model.WorkflowRun; import org.opengroup.osdu.workflow.model.WorkflowRunsPage; import org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig; import org.opengroup.osdu.workflow.provider.azure.consts.WorkflowRunConstants; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.opengroup.osdu.workflow.provider.azure.model.WorkflowRunDoc; import org.opengroup.osdu.workflow.provider.azure.utils.CursorUtils; import org.springframework.data.domain.Page; @@ -127,7 +127,7 @@ public class WorkflowRunRepositoryTest { private WorkflowTasksSharingRepository workflowTasksSharingRepository; @Mock - private ICache activeDagRunsCache; + private IActiveDagRunsCache activeDagRunsCache; @InjectMocks private WorkflowRunRepository workflowRunRepository; -- GitLab From be37c5685ffecdae75f15d17bf6e2d012fcfbcdf Mon Sep 17 00:00:00 2001 From: aalekhj Date: Thu, 3 Mar 2022 13:32:22 +0530 Subject: [PATCH 08/10] Added IActiveDagRunsCache.java --- .../provider/azure/interfaces/IActiveDagRunsCache.java | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/interfaces/IActiveDagRunsCache.java diff --git a/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/interfaces/IActiveDagRunsCache.java b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/interfaces/IActiveDagRunsCache.java new file mode 100644 index 00000000..af2573f9 --- /dev/null +++ b/provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/interfaces/IActiveDagRunsCache.java @@ -0,0 +1,8 @@ +package org.opengroup.osdu.workflow.provider.azure.interfaces; + +import org.opengroup.osdu.core.common.cache.ICache; + +public interface IActiveDagRunsCache extends ICache { + void incrementKey(K key); + void decrementKey(K key); +} -- GitLab From f7f5df1cb35d247dc4bea29ebbdab35715181687 Mon Sep 17 00:00:00 2001 From: Harshit Aggarwal Date: Fri, 6 May 2022 19:25:47 +0530 Subject: [PATCH 09/10] increment core common version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 9805a9ea..d7ef6dae 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 1 1.3.1.Final 3.0.0-M4 - 0.14.0-SNAPSHOT + 0.15.0-rc5 3.0.0 2.17.1 2.4.7 -- GitLab From 72e6f9537413cea0e0a67bc082546c2e938b1cb8 Mon Sep 17 00:00:00 2001 From: Harshit Aggarwal Date: Fri, 6 May 2022 20:58:22 +0530 Subject: [PATCH 10/10] fix ut --- .../azure/repository/WorkflowRunRepositoryTest.java | 2 +- .../azure/service/WorkflowEngineServiceImplTest.java | 7 ++++--- .../azure/service/WorkflowEngineServiceV2ImplTest.java | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java index 50621075..e49e5e7d 100644 --- a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java +++ b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowRunRepositoryTest.java @@ -243,7 +243,7 @@ public class WorkflowRunRepositoryTest { verify(cosmosConfig, times(2)).getWorkflowRunCollection(); verify(dpsHeaders, times(3)).getPartitionId(); verify(activeDagRunsCache).get(eq(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY)); - verify(activeDagRunsCache).put(eq(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY), eq(0)); + verify(activeDagRunsCache).decrementKey(eq(ACTIVE_DAG_RUNS_COUNT_CACHE_KEY)); assertThat(workflowRunDocArgumentCaptor.getValue().getStatus(), equalTo(response.getStatus().toString())); assertThat(workflowRunDocArgumentCaptor.getValue().getId(), equalTo(response.getRunId())); assertThat(workflowRunDocArgumentCaptor.getValue().getWorkflowName(), equalTo(response.getWorkflowId())); diff --git a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImplTest.java b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImplTest.java index 2b2bbd82..f8664061 100644 --- a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImplTest.java +++ b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceImplTest.java @@ -28,6 +28,7 @@ import org.opengroup.osdu.workflow.provider.azure.config.AirflowConfigResolver; import org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig; import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareConfig; import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareStore; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.opengroup.osdu.workflow.provider.azure.utils.airflow.AirflowV1WorkflowEngineUtil; import org.skyscreamer.jsonassert.JSONAssert; import org.springframework.jdbc.core.JdbcTemplate; @@ -148,7 +149,7 @@ public class WorkflowEngineServiceImplTest { private ClientResponse clientResponse; @Mock - private ICache activeDagRunsCache; + private IActiveDagRunsCache activeDagRunsCache; @Mock private ActiveDagRunsConfig activeDagRunsConfig; @@ -274,10 +275,10 @@ public class WorkflowEngineServiceImplTest { verify(airflowConfig).getAppKey(); verify(airflowConfig).isDagRunAbstractionEnabled(); verify(activeDagRunsCache, times(2)).get(eq(ACTIVE_DAG_RUNS_CACHE_KEY)); - verify(activeDagRunsCache, times(2)).put(eq(ACTIVE_DAG_RUNS_CACHE_KEY), numberOfActiveDagRunsCaptor.capture()); + verify(activeDagRunsCache, times(1)).put(eq(ACTIVE_DAG_RUNS_CACHE_KEY), numberOfActiveDagRunsCaptor.capture()); + verify(activeDagRunsCache, times(1)).incrementKey(eq(ACTIVE_DAG_RUNS_CACHE_KEY)); verify(activeDagRunsConfig).getThreshold(); assertEquals(0, numberOfActiveDagRunsCaptor.getAllValues().get(0)); - assertEquals(1, numberOfActiveDagRunsCaptor.getAllValues().get(1)); JSONAssert.assertEquals(AIRFLOW_INPUT, airflowInputCaptor.getValue(), true); } diff --git a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceV2ImplTest.java b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceV2ImplTest.java index eb959026..3b7b6043 100644 --- a/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceV2ImplTest.java +++ b/provider/workflow-azure/src/test/java/org/opengroup/osdu/workflow/provider/azure/service/WorkflowEngineServiceV2ImplTest.java @@ -28,6 +28,7 @@ import org.opengroup.osdu.workflow.provider.azure.config.AirflowConfigResolver; import org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig; import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareConfig; import org.opengroup.osdu.workflow.provider.azure.fileshare.FileShareStore; +import org.opengroup.osdu.workflow.provider.azure.interfaces.IActiveDagRunsCache; import org.opengroup.osdu.workflow.provider.azure.utils.airflow.AirflowV2WorkflowEngineUtil; import org.skyscreamer.jsonassert.JSONAssert; import org.springframework.jdbc.core.JdbcTemplate; @@ -135,7 +136,7 @@ public class WorkflowEngineServiceV2ImplTest { private ClientResponse clientResponse; @Mock - private ICache activeDagRunsCache; + private IActiveDagRunsCache activeDagRunsCache; @Mock private ActiveDagRunsConfig activeDagRunsConfig; @@ -249,10 +250,9 @@ public class WorkflowEngineServiceV2ImplTest { verify(airflowConfig).getAppKey(); verify(airflowConfig).isDagRunAbstractionEnabled(); verify(activeDagRunsCache, times(2)).get(eq(ACTIVE_DAG_RUNS_CACHE_KEY)); - verify(activeDagRunsCache, times(2)).put(eq(ACTIVE_DAG_RUNS_CACHE_KEY), numberOfActiveDagRunsCaptor.capture()); + verify(activeDagRunsCache, times(1)).put(eq(ACTIVE_DAG_RUNS_CACHE_KEY), numberOfActiveDagRunsCaptor.capture()); verify(activeDagRunsConfig).getThreshold(); assertEquals(0, numberOfActiveDagRunsCaptor.getAllValues().get(0)); - assertEquals(1, numberOfActiveDagRunsCaptor.getAllValues().get(1)); JSONAssert.assertEquals(AIRFLOW_INPUT, airflowInputCaptor.getValue(), true); } -- GitLab