Commit cb144ac7 authored by Kishore Battula's avatar Kishore Battula
Browse files

Updatating azure commons in workflow service to 0.0.24 version

parent 6b86c5dc
Pipeline #7412 failed with stage
in 1 minute and 47 seconds
......@@ -31,7 +31,7 @@
<properties>
<azure.version>2.1.7</azure.version>
<osdu.azurecore.version>0.0.6-SNAPSHOT</osdu.azurecore.version>
<osdu.azurecore.version>0.0.24</osdu.azurecore.version>
<azure.appservice.resourcegroup></azure.appservice.resourcegroup>
<azure.appservice.plan></azure.appservice.plan>
<azure.appservice.appname></azure.appservice.appname>
......@@ -47,6 +47,17 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
......
......@@ -34,15 +34,6 @@ public class AzureBootstrapConfig {
@Value("${azure.keyvault.url}")
private String keyVaultURL;
@Value("${azure.cosmosdb.database}")
private String cosmosDBName;
@Value("${azure.cosmosdb.ingestionstrategy.collection}")
private String ingestionStrategyCollectionName;
@Value("${azure.cosmosdb.workflowstatus.collection}")
private String workflowStatusCollectionName;
@Value("${azure.airflow.url}")
private String airflowURL;
......@@ -84,18 +75,4 @@ public class AzureBootstrapConfig {
public String cosmosKey(SecretClient kv) {
return KeyVaultFacade.getSecretWithValidation(kv, "cosmos-primary-key");
}
@Bean
@Named("INGESTION_STRATEGY_CONTAINER")
CosmosContainer ingestionStrategyContainer(final CosmosClient cosmosClient) {
Validators.checkNotNull(cosmosClient, "Cosmos client cannot be null");
return cosmosClient.getDatabase(cosmosDBName).getContainer(ingestionStrategyCollectionName);
}
@Bean
@Named("WORKFLOW_STATUS_CONTAINER")
CosmosContainer workflowStatusContainer(final CosmosClient cosmosClient) {
Validators.checkNotNull(cosmosClient, "Cosmos client cannot be null");
return cosmosClient.getDatabase(cosmosDBName).getContainer(workflowStatusCollectionName);
}
}
package org.opengroup.osdu.workflow.provider.azure.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties("azure.cosmosdb")
@Configuration
@Getter
@Setter
public class CosmosConfig {
private String database;
private String ingestionStrategyCollection;
private String workflowStatusCollection;
}
......@@ -22,9 +22,12 @@ import java.util.logging.Logger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.azure.CosmosFacade;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.core.common.model.WorkflowType;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.workflow.model.IngestionStrategy;
import org.opengroup.osdu.workflow.provider.azure.config.AzureBootstrapConfig;
import org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig;
import org.opengroup.osdu.workflow.provider.azure.model.IngestionStrategyDoc;
import org.opengroup.osdu.workflow.provider.interfaces.IIngestionStrategyRepository;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -42,20 +45,26 @@ public class IngestionStrategyRepository implements IIngestionStrategyRepository
private static Logger logger = Logger.getLogger(IngestionStrategyRepository.class.getName());
@Autowired
@Named("INGESTION_STRATEGY_CONTAINER")
private CosmosContainer ingestionStrategyContainer;
private CosmosStore cosmosStore;
@Autowired
private DpsHeaders dpsHeaders;
@Autowired
private CosmosConfig cosmosConfig;
@Override
public IngestionStrategy findByWorkflowTypeAndDataTypeAndUserId(WorkflowType workflowType,
String dataType, String userId) {
logger.log(Level.INFO, String.format("Requesting dag selection. Workflow type: {%s}, Data type: {%s}, User id: {%s}",
workflowType, dataType, userId));
Optional<IngestionStrategyDoc> document = CosmosFacade.findItem(
ingestionStrategyContainer,
String.format("%s-%s", workflowType.toString().toLowerCase(), dataType.toLowerCase()),
workflowType.toString().toLowerCase(),
IngestionStrategyDoc.class);
Optional<IngestionStrategyDoc> document = cosmosStore.findItem(
dpsHeaders.getPartitionId(),
cosmosConfig.getDatabase(),
cosmosConfig.getIngestionStrategyCollection(),
String.format("%s-%s", workflowType.toString().toLowerCase(), dataType.toLowerCase()),
workflowType.toString().toLowerCase(),
IngestionStrategyDoc.class);
IngestionStrategyDoc ingestionStrategyDoc = !document.isPresent() ? null : document.get();
......
......@@ -20,10 +20,12 @@ import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.azure.CosmosFacade;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.workflow.exception.WorkflowNotFoundException;
import org.opengroup.osdu.workflow.model.WorkflowStatus;
import org.opengroup.osdu.workflow.model.WorkflowStatusType;
import org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig;
import org.opengroup.osdu.workflow.provider.azure.model.WorkflowStatusDoc;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowStatusRepository;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -41,21 +43,28 @@ public class WorkflowStatusRepository implements IWorkflowStatusRepository {
private static Logger logger = Logger.getLogger(WorkflowStatusRepository.class.getName());
@Autowired
@Named("WORKFLOW_STATUS_CONTAINER")
private CosmosContainer workflowStatusContainer;
private CosmosConfig cosmosConfig;
@Autowired
private CosmosStore cosmosStore;
@Autowired
private DpsHeaders dpsHeaders;
@Override
public WorkflowStatus findWorkflowStatus(String workflowId) {
logger.log(Level.INFO, String.format("Requesting workflow status by workflow Id :{%s}",
workflowId));
Optional<WorkflowStatusDoc> document = CosmosFacade.findItem(
workflowStatusContainer,
workflowId,
workflowId,
WorkflowStatusDoc.class);
Optional<WorkflowStatusDoc> document = cosmosStore.findItem(
dpsHeaders.getPartitionId(),
cosmosConfig.getDatabase(),
cosmosConfig.getWorkflowStatusCollection(),
workflowId,
workflowId,
WorkflowStatusDoc.class);
WorkflowStatusDoc workflowStatusDoc = !document.isPresent() ? null : document.get();
WorkflowStatusDoc workflowStatusDoc = !document.isPresent() ? null : document.get();
if(workflowStatusDoc == null) {
throw new WorkflowNotFoundException(
......@@ -74,15 +83,18 @@ public class WorkflowStatusRepository implements IWorkflowStatusRepository {
workflowStatus));
Optional<WorkflowStatusDoc> existingDoc = CosmosFacade.findItem(
workflowStatusContainer,
workflowStatus.getWorkflowId(),
workflowStatus.getWorkflowId(),
WorkflowStatusDoc.class);
Optional<WorkflowStatusDoc> existingDoc = cosmosStore.findItem(
dpsHeaders.getPartitionId(),
cosmosConfig.getDatabase(),
cosmosConfig.getWorkflowStatusCollection(),
workflowStatus.getWorkflowId(),
workflowStatus.getWorkflowId(),
WorkflowStatusDoc.class);
if (!existingDoc.isPresent()) {
WorkflowStatusDoc newStatusDoc = buildWorkflowStatusDoc(workflowStatus);
CosmosFacade.upsertItem(workflowStatusContainer, newStatusDoc);
cosmosStore.upsertItem(dpsHeaders.getPartitionId(), cosmosConfig.getDatabase(),
cosmosConfig.getWorkflowStatusCollection(), newStatusDoc);
}
logger.log(Level.INFO, String.format("Fetch saved workflow status: {%s}", workflowStatus));
......@@ -96,11 +108,13 @@ public class WorkflowStatusRepository implements IWorkflowStatusRepository {
logger.log(Level.INFO, String.format("Update workflow status for workflow id: {%s}, new status: {%s}",
workflowId, workflowStatusType));
Optional<WorkflowStatusDoc> existingDoc = CosmosFacade.findItem(
workflowStatusContainer,
workflowId,
workflowId,
WorkflowStatusDoc.class);
Optional<WorkflowStatusDoc> existingDoc = cosmosStore.findItem(
dpsHeaders.getPartitionId(),
cosmosConfig.getDatabase(),
cosmosConfig.getWorkflowStatusCollection(),
workflowId,
workflowId,
WorkflowStatusDoc.class);
WorkflowStatusDoc workflowStatusDoc = !existingDoc.isPresent() ? null : existingDoc.get();
......@@ -112,7 +126,8 @@ public class WorkflowStatusRepository implements IWorkflowStatusRepository {
logger.log(Level.INFO, String.format("Found workflow status : {%s}", workflowStatusDoc));
workflowStatusDoc.workflowStatusType = WorkflowStatusType.valueOf(workflowStatusType.toString());
CosmosFacade.upsertItem(workflowStatusContainer, workflowStatusDoc);
cosmosStore.upsertItem(dpsHeaders.getPartitionId(), cosmosConfig.getDatabase(),
cosmosConfig.getWorkflowStatusCollection(), workflowStatusDoc);
WorkflowStatus workflowStatus = buildWorkflowStatus(workflowStatusDoc);
logger.log(Level.INFO, String.format("Updated workflow status : {%s}", workflowStatus));
......
......@@ -21,8 +21,8 @@ azure.activedirectory.AppIdUri=api://${azure.activedirectory.client-id}
# Azure CosmosDB configuration
azure.cosmosdb.database=${cosmosdb_database}
azure.cosmosdb.ingestionstrategy.collection=IngestionStrategy
azure.cosmosdb.workflowstatus.collection=WorkflowStatus
azure.cosmosdb.ingestionStrategyCollection=IngestionStrategy
azure.cosmosdb.workflowStatusCollection=WorkflowStatus
# Azure KeyVault configuration
azure.keyvault.url=${KEYVAULT_URI}
......@@ -35,3 +35,8 @@ spring.application.name=workflow-azure
azure.airflow.url=${airflow_url}
azure.airflow.username=${airflow_username}
azure.airflow.password=${airflow_password}
# Logging
logging.transaction.enabled=true
logging.slf4jlogger.enabled=true
logging.mdccontext.enabled=true
......@@ -11,46 +11,52 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.core.common.model.WorkflowType;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.workflow.model.IngestionStrategy;
import org.opengroup.osdu.workflow.provider.azure.WorkflowApplication;
import org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig;
import org.opengroup.osdu.workflow.provider.azure.model.IngestionStrategyDoc;
import org.springframework.boot.test.context.SpringBootTest;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
import javax.inject.Named;
import java.io.IOException;
import java.util.Optional;
@RunWith(MockitoJUnitRunner.class)
@SpringBootTest(classes = {WorkflowApplication.class})
public class IngestionStrategyRepositoryTest {
@Mock
private CosmosItem cosmosItem;
private static final String DATABASE_NAME = "someDatabase";
private static final String INGESTION_STRATEGY_COLLECTION_NAME = "someIngestionStrategyName";
private static final String PARTITION_ID = "somePartition";
@Mock
private CosmosItemResponse cosmosResponse;
private CosmosStore cosmosStore;
@Mock
private CosmosItemProperties cosmosItemProperties;
private CosmosConfig cosmosConfig;
@Mock
@Named("INGESTION_STRATEGY_CONTAINER")
private CosmosContainer ingestionStrategyContainer;
private DpsHeaders dpsHeaders;
@InjectMocks
private IngestionStrategyRepository repository;
@Before
public void initMocks() throws Exception {
doReturn(cosmosItem).when(ingestionStrategyContainer).getItem(any(), any());
doReturn(cosmosResponse).when(cosmosItem).read(any());
doReturn(cosmosItemProperties).when(cosmosResponse).getProperties();
when(cosmosConfig.getDatabase()).thenReturn(DATABASE_NAME);
when(cosmosConfig.getIngestionStrategyCollection()).thenReturn(INGESTION_STRATEGY_COLLECTION_NAME);
when(dpsHeaders.getPartitionId()).thenReturn(PARTITION_ID);
}
@Test
......@@ -59,9 +65,13 @@ public class IngestionStrategyRepositoryTest {
ingestionStrategyDoc.setDagName("osdu_python_sdk_well_log_ingestion");
ingestionStrategyDoc.setDataType("well_log");
ingestionStrategyDoc.setWorkflowType(WorkflowType.OSDU.name());
doReturn(ingestionStrategyDoc)
.when(cosmosItemProperties)
.getObject(any());
final String dataType = "well_log";
when(cosmosStore.findItem(eq(PARTITION_ID),
eq(DATABASE_NAME),
eq(INGESTION_STRATEGY_COLLECTION_NAME),
eq(String.format("%s-%s", WorkflowType.OSDU.toString().toLowerCase(), dataType.toLowerCase())),
eq(WorkflowType.OSDU.toString().toLowerCase()),
eq(IngestionStrategyDoc.class))).thenReturn(Optional.of(ingestionStrategyDoc));
IngestionStrategy ingestionStrategy = repository.findByWorkflowTypeAndDataTypeAndUserId(WorkflowType.OSDU, "well_log", "");
Assert.assertNotNull(ingestionStrategy);
......@@ -71,37 +81,18 @@ public class IngestionStrategyRepositoryTest {
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenRecordNotFound() throws CosmosClientException {
doThrow(NullPointerException.class)
.when(cosmosItem)
.read(any());
repository.findByWorkflowTypeAndDataTypeAndUserId(WorkflowType.OSDU, "test", "");
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenDataTypeAndWorkflowTypeNotFound() throws Throwable {
doThrow(NotFoundException.class)
.when(cosmosItem)
.read(any());
repository.findByWorkflowTypeAndDataTypeAndUserId(WorkflowType.OSDU, "well_log111", "opendes11");
Assert.assertFalse(throwException());
}
@Test(expected = NullPointerException.class)
public void shouldThrowExceptionWhenDocumentisMalformed() throws IOException {
doThrow(IOException.class)
.when(cosmosItemProperties)
.getObject(any());
repository.findByWorkflowTypeAndDataTypeAndUserId(WorkflowType.OSDU, "well_log111", "opendes11");
Assert.assertFalse(throwException());
@Test
public void shouldReturnNullWhenRecordNotFound() throws CosmosClientException {
when(cosmosStore.findItem(any(), any(), any(), any(), any(), any()))
.thenReturn(Optional.empty());
Assert.assertNull(repository.findByWorkflowTypeAndDataTypeAndUserId(WorkflowType.OSDU, "test", ""));
}
@Test(expected = AppException.class)
public void shouldThrowExceptionWhenCosmosException() throws CosmosClientException {
doThrow(CosmosClientException.class)
.when(cosmosItem)
.read(any());
doThrow(AppException.class)
.when(cosmosStore)
.findItem(any(), any(), any(), any(), any(), any());
repository.findByWorkflowTypeAndDataTypeAndUserId(WorkflowType.INGEST, "well_log", "");
}
......
......@@ -10,21 +10,30 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.azure.CosmosStore;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.workflow.exception.WorkflowNotFoundException;
import org.opengroup.osdu.workflow.model.WorkflowStatus;
import org.opengroup.osdu.workflow.model.WorkflowStatusType;
import org.opengroup.osdu.workflow.provider.azure.WorkflowApplication;
import org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig;
import org.opengroup.osdu.workflow.provider.azure.model.WorkflowStatusDoc;
import org.springframework.boot.test.context.SpringBootTest;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import javax.inject.Named;
import java.io.IOException;
import java.util.Optional;
@RunWith(MockitoJUnitRunner.class)
......@@ -32,33 +41,40 @@ import java.io.IOException;
public class WorkflowStatusRepositoryTest {
private static final String TEST_WORKFLOW_ID = "test-workflow-id";
private static final String TEST_AIRFLOW_RUN_ID = "test-airflow-run-id";
private static final String DATABASE_NAME = "someDatabase";
private static final String WORKFLOW_STATUS_COLLECTION_NAME = "someWorkflowStatusName";
private static final String PARTITION_ID = "somePartition";
@Mock
private CosmosItem cosmosItem;
@Mock
private CosmosItemResponse cosmosResponse;
private CosmosStore cosmosStore;
@Mock
private CosmosItemProperties cosmosItemProperties;
private CosmosConfig cosmosConfig;
@Mock
@Named("WORKFLOW_STATUS_CONTAINER")
private CosmosContainer workflowStatusContainer;
private DpsHeaders dpsHeaders;
@InjectMocks
private WorkflowStatusRepository workflowStatusRepository;
@Before
public void initMocks() throws Exception {
doReturn(cosmosItem).when(workflowStatusContainer).getItem(any(), any());
doReturn(cosmosResponse).when(cosmosItem).read(any());
doReturn(cosmosItemProperties).when(cosmosResponse).getProperties();
when(cosmosConfig.getDatabase()).thenReturn(DATABASE_NAME);
when(cosmosConfig.getWorkflowStatusCollection()).thenReturn(WORKFLOW_STATUS_COLLECTION_NAME);
when(dpsHeaders.getPartitionId()).thenReturn(PARTITION_ID);
}
@Test
public void shouldFindWorkflowStatusByWorkflowId() throws IOException {
WorkflowStatusDoc workflowStatusDoc = createWorkflowStatusDocWithStatusFinished();
doReturn(workflowStatusDoc)
.when(cosmosItemProperties)
.getObject(any());
WorkflowStatus workflowStatus = workflowStatusRepository.findWorkflowStatus("TestWorkflowId");
when(cosmosStore.findItem(
eq(PARTITION_ID),
eq(DATABASE_NAME),
eq(WORKFLOW_STATUS_COLLECTION_NAME),
eq(TEST_WORKFLOW_ID),
eq(TEST_WORKFLOW_ID),
eq(WorkflowStatusDoc.class))).thenReturn(Optional.of(workflowStatusDoc));
WorkflowStatus workflowStatus = workflowStatusRepository.findWorkflowStatus(TEST_WORKFLOW_ID);
Assert.assertNotNull(workflowStatus);
Assert.assertEquals(getWorkflowStatus().getAirflowRunId(), workflowStatusDoc.getAirflowRunId());
Assert.assertEquals(getWorkflowStatus().getWorkflowId(), workflowStatusDoc.getWorkflowId());
......@@ -68,17 +84,25 @@ public class WorkflowStatusRepositoryTest {
@Test(expected = WorkflowNotFoundException.class)
public void shouldThrowExceptionWhenWorkflowNotFound() throws CosmosClientException {
doThrow(NotFoundException.class)
.when(cosmosItem)
.read(any());
workflowStatusRepository.findWorkflowStatus("InvalidWorkflowId");
when(cosmosStore.findItem(
eq(PARTITION_ID),
eq(DATABASE_NAME),
eq(WORKFLOW_STATUS_COLLECTION_NAME),
eq(TEST_WORKFLOW_ID),
eq(TEST_WORKFLOW_ID),
eq(WorkflowStatusDoc.class))).thenReturn(Optional.empty());
workflowStatusRepository.findWorkflowStatus(TEST_WORKFLOW_ID);
}
@Test(expected = AppException.class)
public void shouldThrowExceptionWhenCosmosException() throws CosmosClientException {
doThrow(CosmosClientException.class)
.when(cosmosItem)
.read(any());
when(cosmosStore.findItem(
eq(PARTITION_ID),
eq(DATABASE_NAME),
eq(WORKFLOW_STATUS_COLLECTION_NAME),
eq(TEST_WORKFLOW_ID),
eq(TEST_WORKFLOW_ID),
eq(WorkflowStatusDoc.class))).thenThrow(new AppException(500, "", ""));
workflowStatusRepository.findWorkflowStatus(TEST_WORKFLOW_ID);
}
......@@ -97,40 +121,37 @@ public class WorkflowStatusRepositoryTest {
.airflowRunId(TEST_AIRFLOW_RUN_ID)
.workflowStatusType(WorkflowStatusType.FINISHED)
.build();
doReturn(createWorkflowStatusDocWithStatusFinished())
.when(cosmosItemProperties)
.getObject(any());
WorkflowStatus status = workflowStatusRepository.saveWorkflowStatus(workflowstatus);
Assert.assertNotNull(status);
Assert.assertEquals(status.getWorkflowId(), workflowstatus.getWorkflowId());
Assert.assertEquals(status.getAirflowRunId(), workflowstatus.getAirflowRunId());
}
when(cosmosStore.findItem(
eq(PARTITION_ID),
eq(DATABASE_NAME),
eq(WORKFLOW_STATUS_COLLECTION_NAME),
eq(TEST_WORKFLOW_ID),
eq(TEST_WORKFLOW_ID),
eq(WorkflowStatusDoc.class))).thenReturn(Optional.empty());
doNothing().when(cosmosStore).upsertItem(
eq(PARTITION_ID),
eq(DATABASE_NAME),
eq(WORKFLOW_STATUS_COLLECTION_NAME),
any());
@Test
public void saveWorkflowStatus_return_exising_status_from_collection() throws CosmosClientException {
WorkflowStatus workflowstatus = WorkflowStatus.builder()
.workflowId(TEST_WORKFLOW_ID)
.airflowRunId(TEST_AIRFLOW_RUN_ID)
.workflowStatusType(WorkflowStatusType.SUBMITTED)
.build();
doThrow(NotFoundException.class)
.when(cosmosItem)
.read(any());
WorkflowStatus status = workflowStatusRepository.saveWorkflowStatus(workflowstatus);
Assert.assertNotNull(status);
Assert.assertEquals(status.getWorkflowId(), workflowstatus.getWorkflowId());
Assert.assertEquals(status.getAirflowRunId(), workflowstatus.getAirflowRunId());
}
@Test
public void updateWorkflowStatus() throws IOException {