Commit 710860e8 authored by kiranveerapaneni's avatar kiranveerapaneni
Browse files

Merge remote-tracking branch 'origin/master' into SystemApiToLeverageIstio

parents 3dd4cfe4 9aaf1ac8
Pipeline #73266 failed with stages
in 49 minutes
......@@ -53,7 +53,7 @@ include:
- project: "osdu/platform/ci-cd-pipelines"
ref: "master"
file: "cloud-providers/osdu-gcp-cloudrun.yml"
file: "cloud-providers/osdu-gcp-gke.yml"
- project: "osdu/platform/ci-cd-pipelines"
ref: "master"
......
This diff is collapsed.
......@@ -13,6 +13,7 @@
* [GET /v1/workflow/{workflow_name}/workflowRun](#get-v1workflowworkflow_nameworkflowrun)
* [GET /v1/workflow/{workflow_name}/workflowRun/{runId}](#get-v1workflowworkflow_nameworkflowrunrunid)
* [PUT /v1/workflow/{workflow_name}/workflowRun/{runId}](#put-v1workflowworkflow_nameworkflowrunrunid)
* [Airflow 2.0 support](#airflow-2-support)
* [Service Provider Interfaces](#workflow-service-provider-interfaces)
* [GCP implementation](#gcp-implementation)
* [Firestore](#firestore-collections)
......@@ -344,6 +345,22 @@ curl --location --request PUT 'https://{path}/v1/workflow/{workflow_name}/workfl
| status | `String` | Workflow status |
| submittedBy | `String` | User Id who started the workflow |
## Airflow 2 support
as per airflow community, airflow experimnetal API will be discontinued. with help of MR 160 we added airflow 2.0 stable api support. for more details please check mentioned MR. 160
#### Procedure to switch airflow 2.0 :
- add following properties
| Key | Value | Decription |
| ------ | ------ | ------ |
| osdu.airflow.version2 | true | if this property is missing or false, airflow 1 experimental api will be called
| osdu.airflow.username| `<_airflow_username_>` | airflow username if basic auth is enabled
| osdu.airflow.password | `<_airflow-password_>` | airflow password if basic auth is enabled
- override and disable integration test case ` org.opengroup.osdu.workflow.workflow.v3.WorkflowRunV3IntegrationTests.triggerWorkflowRun_should_returnBadRequest_when_givenDuplicateRunId()` in provider level.
- override and enable integration test case `org.opengroup.osdu.workflow.workflow.v3.WorkflowRunV3IntegrationTests.triggerWorkflowRun_should_returnConflict_when_givenDuplicateRunId_with_airflow2_stable_API()` in provider level
## Workflow Service Provider Interfaces
The Workflow service has several Service Provider Interfaces that the classes need to implement.
......
......@@ -4,9 +4,9 @@
data:
requests_cpu: "0.25"
requests_memory: "256M"
requests_memory: "1G"
limits_cpu: "1"
limits_memory: "1G"
limits_memory: "2G"
serviceAccountName: ""
image: ""
......
......@@ -49,7 +49,7 @@
<javax.inject.version>1</javax.inject.version>
<org.mapstruct.version>1.3.1.Final</org.mapstruct.version>
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
<os-core-common.version>0.12.0-SNAPSHOT</os-core-common.version>
<os-core-common.version>0.12.0-rc3</os-core-common.version>
<springfox.version>3.0.0</springfox.version>
</properties>
......
......@@ -31,7 +31,7 @@
<properties>
<azure.version>2.1.7</azure.version>
<osdu.corelibazure.version>0.11.0</osdu.corelibazure.version>
<osdu.corelibazure.version>0.12.0-rc10</osdu.corelibazure.version>
<azure.appservice.resourcegroup />
<azure.appservice.plan />
<azure.appservice.appname />
......
......@@ -27,8 +27,8 @@ import org.springframework.stereotype.Service;
import static java.lang.String.format;
@Service
@Primary
//@Service
//@Primary
@Slf4j
public class WorkflowEngineServiceImpl implements IWorkflowEngineService {
private static final String RUN_ID_PARAMETER_NAME = "run_id";
......
......@@ -36,7 +36,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<spring-boot.version>2.2.2.RELEASE</spring-boot.version>
<jersey.version>1.19.4</jersey.version>
<os-core-common.version>0.11.0-rc4</os-core-common.version>
<os-core-common.version>0.12.0-rc3</os-core-common.version>
</properties>
<dependencyManagement>
......
......@@ -19,7 +19,12 @@ package org.opengroup.osdu.workflow.workflow.v3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.jersey.api.client.ClientResponse;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.opengroup.osdu.workflow.util.HTTPClient;
import org.opengroup.osdu.workflow.util.v3.TestBase;
import org.springframework.http.HttpStatus;
......@@ -195,6 +200,11 @@ public abstract class WorkflowRunV3IntegrationTests extends TestBase {
createdWorkflowRuns.add(workflowRunInfo);
}
/*
* after switch to airflow 2.0 stable API, this has to be marked as ignore or
* removed. Airflow 2.x stable API will always throw 409 Conflicts instead of 400
* BAD REQUEST when you pass duplicate run id
*/
@Test
public void triggerWorkflowRun_should_returnBadRequest_when_givenDuplicateRunId() throws Exception {
String workflowResponseBody = createWorkflow();
......@@ -217,6 +227,34 @@ public abstract class WorkflowRunV3IntegrationTests extends TestBase {
assertEquals(org.apache.http.HttpStatus.SC_BAD_REQUEST, duplicateRunIdResponse.getStatus());
}
/*
* Enable this Integration test for airflow 2.0 stable API by removing
* '@Disbaled'
*/
@Test
@Disabled("Until switch to airflow 2.0 stable api")
public void triggerWorkflowRun_should_returnConflict_when_givenDuplicateRunId_with_airflow2_stable_API() throws Exception {
String workflowResponseBody = createWorkflow();
Map<String, String> workflowInfo = new ObjectMapper().readValue(workflowResponseBody, HashMap.class);
createdWorkflows.add(workflowInfo);
String workflowRunResponseBody = createWorkflowRun();
Map<String, String> workflowRunInfo = new ObjectMapper().readValue(workflowRunResponseBody, HashMap.class);
createdWorkflowRuns.add(workflowRunInfo);
String duplicateRunIdPayload = buildCreateWorkflowRunValidPayloadWithGivenRunId(workflowRunInfo.get(WORKFLOW_RUN_ID_FIELD));
ClientResponse duplicateRunIdResponse = client.send(
HttpMethod.POST,
String.format(CREATE_WORKFLOW_RUN_URL, CREATE_WORKFLOW_WORKFLOW_NAME),
duplicateRunIdPayload,
headers,
client.getAccessToken()
);
assertEquals(org.apache.http.HttpStatus.SC_CONFLICT, duplicateRunIdResponse.getStatus());
}
@Test
public void triggerWorkflowRun_should_return_WorkflowNotFound_when_givenInvalidWorkflowName() throws Exception {
......
......@@ -3,8 +3,12 @@ package org.opengroup.osdu.ibm.workflow.workflow.v3;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opengroup.osdu.workflow.consts.TestConstants.GET_DETAILS_WORKFLOW_RUN_URL;
import static org.opengroup.osdu.workflow.util.PayloadBuilder.buildCreateWorkflowRunValidPayloadWithGivenRunId;
import static org.opengroup.osdu.workflow.consts.TestConstants.CREATE_WORKFLOW_RUN_URL;
import static org.opengroup.osdu.workflow.consts.TestConstants.CREATE_WORKFLOW_WORKFLOW_NAME;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.ws.rs.HttpMethod;
......@@ -18,6 +22,7 @@ import org.opengroup.osdu.ibm.workflow.util.HTTPClientIBM;
import org.opengroup.osdu.workflow.workflow.v3.WorkflowRunV3IntegrationTests;
import org.springframework.http.HttpStatus;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.jersey.api.client.ClientResponse;
public class TestWorkflowRunV3Integration extends WorkflowRunV3IntegrationTests {
......@@ -28,9 +33,9 @@ public class TestWorkflowRunV3Integration extends WorkflowRunV3IntegrationTests
this.client = new HTTPClientIBM();
this.headers = client.getCommonHeader();
try {
deleteTestWorkflows(CREATE_WORKFLOW_WORKFLOW_NAME);
deleteTestWorkflows(CREATE_WORKFLOW_WORKFLOW_NAME);
} catch (Exception e) {
throw e;
throw e;
}
}
......@@ -49,18 +54,55 @@ public class TestWorkflowRunV3Integration extends WorkflowRunV3IntegrationTests
@Disabled
public void shouldReturn400WhenGetDetailsForSpecificWorkflowRunInstance() throws Exception {
String workflowId = UUID
.randomUUID().toString().replace("-", "");
String runId = UUID
.randomUUID().toString().replace("-", "");
ClientResponse getResponse = client.send(
HttpMethod.GET,
String.format(GET_DETAILS_WORKFLOW_RUN_URL, workflowId, runId),
null,
headers,
client.getAccessToken()
);
assertEquals(HttpStatus.NOT_FOUND, getResponse.getStatus());
.randomUUID().toString().replace("-", "");
String runId = UUID
.randomUUID().toString().replace("-", "");
ClientResponse getResponse = client.send(
HttpMethod.GET,
String.format(GET_DETAILS_WORKFLOW_RUN_URL, workflowId, runId),
null,
headers,
client.getAccessToken()
);
assertEquals(HttpStatus.NOT_FOUND, getResponse.getStatus());
}
/*
* this test case will not work for airflow 2.0, enable test case :
* triggerWorkflowRun_should_returnConflict_when_givenDuplicateRunId_with_airflow2_stable_API
*/
@Override
@Test
@Disabled
public void triggerWorkflowRun_should_returnBadRequest_when_givenDuplicateRunId() throws Exception {
//super.triggerWorkflowRun_should_returnBadRequest_when_givenDuplicateRunId();
}
@Override
@Test
public void triggerWorkflowRun_should_returnConflict_when_givenDuplicateRunId_with_airflow2_stable_API()
throws Exception {
String workflowResponseBody = createWorkflow();
Map<String, String> workflowInfo = new ObjectMapper().readValue(workflowResponseBody, HashMap.class);
createdWorkflows.add(workflowInfo);
String workflowRunResponseBody = createWorkflowRun();
Map<String, String> workflowRunInfo = new ObjectMapper().readValue(workflowRunResponseBody, HashMap.class);
createdWorkflowRuns.add(workflowRunInfo);
String duplicateRunIdPayload = buildCreateWorkflowRunValidPayloadWithGivenRunId(workflowRunInfo.get(WORKFLOW_RUN_ID_FIELD));
ClientResponse duplicateRunIdResponse = client.send(
HttpMethod.POST,
String.format(CREATE_WORKFLOW_RUN_URL, CREATE_WORKFLOW_WORKFLOW_NAME),
duplicateRunIdPayload,
headers,
client.getAccessToken()
);
assertEquals(org.apache.http.HttpStatus.SC_CONFLICT, duplicateRunIdResponse.getStatus());
}
private void deleteAllTestWorkflowRecords() {
......
......@@ -24,6 +24,7 @@ public class AirflowConfig {
private String password;
private boolean dagRunAbstractionEnabled;
private String controllerDagId;
private boolean version2;
public String getAppKey() {
if (Objects.isNull(username)) {
......
......@@ -6,6 +6,7 @@ import lombok.Data;
@Data
@Builder
public class WorkflowEngineRequest {
private String runId;
private String workflowId;
private String workflowName;
......@@ -15,4 +16,5 @@ public class WorkflowEngineRequest {
private long executionTimeStamp = System.currentTimeMillis();
private boolean isDeployedThroughWorkflowService;
private final boolean isSystemWorkflow;
}
package org.opengroup.osdu.workflow.service;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.model.*;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import static java.lang.String.format;
@Service
@Slf4j
@ConditionalOnProperty(name = "osdu.airflow.version2", havingValue = "true", matchIfMissing=false)
public class AirflowV2WorkflowEngineServiceImpl implements IWorkflowEngineService {
private static final String RUN_ID_PARAMETER_NAME_STABLE = "dag_run_id";
private static final String AIRFLOW_EXECUTION_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
private static final String AIRFLOW_PAYLOAD_PARAMETER_NAME = "conf";
private static final String EXECUTION_DATE_PARAMETER_NAME = "execution_date";
private static final String TRIGGER_AIRFLOW_ENDPOINT_STABLE = "api/v1/dags/%s/dagRuns";
private static final String AIRFLOW_RUN_ENDPOINT_STABLE = "api/v1/dags/%s/dagRuns/%s";
private static final String AIRFLOW_TRIGGER_DAG_ERROR_MESSAGE =
"Failed to trigger workflow with id %s and name %s";
private static final String AIRFLOW_WORKFLOW_RUN_NOT_FOUND =
"No WorkflowRun executed for Workflow: %s on %s ";
private final Client restClient;
private final AirflowConfig airflowConfig;
public AirflowV2WorkflowEngineServiceImpl(Client restClient, AirflowConfig airflowConfig){
this.restClient = restClient;
this.airflowConfig = airflowConfig;
}
@Override
public void createWorkflow(final WorkflowEngineRequest rq, final Map<String, Object> registrationInstruction) {
// This is not relevant for a default implementation
}
@Override
public void deleteWorkflow(final WorkflowEngineRequest rq) {
// This is not relevant for a default implementation
}
@Override
public void saveCustomOperator(String customOperatorDefinition, String fileName) {
//
}
@Override
public TriggerWorkflowResponse triggerWorkflow(WorkflowEngineRequest rq, Map<String, Object> context) {
log.info("IBM : Submitting ingestion with Airflow 2 with dagName: {}", rq.getDagName());
String url = "";
final JSONObject requestBody = new JSONObject();
requestBody.put(AIRFLOW_PAYLOAD_PARAMETER_NAME, context);
url = format(TRIGGER_AIRFLOW_ENDPOINT_STABLE, rq.getDagName());
requestBody.put(RUN_ID_PARAMETER_NAME_STABLE, rq.getRunId());
final String errMsg = format(AIRFLOW_TRIGGER_DAG_ERROR_MESSAGE, rq.getWorkflowId(), rq.getWorkflowName());
ClientResponse airflowRs = callAirflow(
HttpMethod.POST,
url,
requestBody.toString(),
rq,
errMsg
);
try {
ObjectMapper om = new ObjectMapper();
String body = airflowRs.getResponseBody().toString();
JsonNode jsonNode = om.readValue(body, JsonNode.class);
String execution_date = "";
String dag_run_id = "";
if(jsonNode.has(EXECUTION_DATE_PARAMETER_NAME))
execution_date = jsonNode.get(EXECUTION_DATE_PARAMETER_NAME).asText();
if(jsonNode.has(RUN_ID_PARAMETER_NAME_STABLE))
dag_run_id = jsonNode.get(RUN_ID_PARAMETER_NAME_STABLE).asText();
return new TriggerWorkflowResponse(execution_date, "", dag_run_id );
} catch (JsonProcessingException e) {
log.info("Airflow response: {}.", airflowRs);
final String error = "Unable to Process(Parse, Generate) JSON value";
throw new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), error, e.getMessage());
}
}
@Override
public WorkflowStatusType getWorkflowRunStatus(WorkflowEngineRequest rq) {
log.info("getting status of WorkflowRun of Workflow {} executed on {}", rq.getWorkflowName(),
rq.getExecutionTimeStamp());
final String executionDate = executionDate(rq.getExecutionTimeStamp());
String url = format(AIRFLOW_RUN_ENDPOINT_STABLE, rq.getWorkflowName(), rq.getRunId());
final String errMsg = String.format(AIRFLOW_WORKFLOW_RUN_NOT_FOUND, rq.getWorkflowName(), executionDate);
final ClientResponse response = callAirflow(
HttpMethod.GET,
url,
null,
rq,
errMsg);
try {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
final AirflowGetDAGRunStatus airflowResponse =
objectMapper.readValue(response.getResponseBody().toString(),
AirflowGetDAGRunStatus.class);
return airflowResponse.getStatusType();
} catch (JsonProcessingException e) {
final String errorMessage = format("Unable to Process Json Received. %s", e.getMessage());
log.error(errorMessage, e);
throw new AppException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Failed to Get Status from Airflow", errorMessage);
}
}
protected ClientResponse callAirflow(String httpMethod, String apiEndpoint, String body,
WorkflowEngineRequest rq, String errorMessage) {
String url = format("%s/%s", airflowConfig.getUrl(), apiEndpoint);
log.info("Calling airflow 2 endpoint {} with method {}", url, httpMethod);
WebResource webResource = restClient.resource(url);
com.sun.jersey.api.client.ClientResponse response = webResource
.type(MediaType.APPLICATION_JSON)
.header("Authorization", "Basic " + airflowConfig.getAppKey())
.method(httpMethod, com.sun.jersey.api.client.ClientResponse.class, body);
final int status = response.getStatus();
log.info("Received response status: {}.", status);
if (status != HttpStatus.OK.value()) {
String responseBody = response.getEntity(String.class);
throw new AppException(status, responseBody, errorMessage);
}
return ClientResponse.builder()
.contentType(String.valueOf(response.getType()))
.responseBody(response.getEntity(String.class))
.status(HttpStatus.OK)
.statusCode(response.getStatus())
.statusMessage(response.getStatusInfo().getReasonPhrase())
.build();
}
protected String executionDate(final Long executionTimeStamp){
Instant instant = Instant.ofEpochMilli(executionTimeStamp);
ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.of("UTC"));
return zonedDateTime.format(DateTimeFormatter.ofPattern(AIRFLOW_EXECUTION_DATE_FORMAT));
}
}
......@@ -20,7 +20,7 @@ import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.model.*;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
......@@ -28,6 +28,7 @@ import static java.lang.String.format;
@Service
@Slf4j
@ConditionalOnProperty(name = "osdu.airflow.version2", havingValue = "false", matchIfMissing=true)
public class AirflowWorkflowEngineServiceImpl implements IWorkflowEngineService {
private static final String RUN_ID_PARAMETER_NAME = "run_id";
private static final String AIRFLOW_EXECUTION_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
......
......@@ -241,13 +241,16 @@ public class WorkflowRunServiceImpl implements IWorkflowRunService {
if (activeStatusTypes.contains(workflowRun.getStatus())) {
final WorkflowMetadata workflowMetadata = getWorkflowByName(workflowRun.getWorkflowName());
final String workflowName = workflowMetadata.getWorkflowName();
final WorkflowEngineRequest rq = WorkflowEngineRequest.builder()
.runId(workflowRun.getRunId())
.workflowName(workflowName)
.executionTimeStamp(workflowRun.getStartTimeStamp())
.workflowEngineExecutionDate(workflowRun.getWorkflowEngineExecutionDate())
.dagName(getDagName(workflowMetadata))
.isSystemWorkflow(workflowMetadata.isSystemWorkflow())
.build();
final WorkflowStatusType currentStatusType = workflowEngineService.getWorkflowRunStatus(rq);
if (currentStatusType != workflowRun.getStatus() && currentStatusType != null) {
if (getCompletedStatusTypes().contains(currentStatusType)) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment