Commit 5e0816ea authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM)
Browse files

Add airflow version for info endpoint, enable stable API for gcp


Signed-off-by: Rustam Lotsmanenko (EPAM)'s avatarRustam_Lotsmanenko <Rustam_Lotsmanenko@epam.com>
parent a6df370b
Pipeline #78941 failed with stages
in 46 minutes and 9 seconds
......@@ -15,4 +15,5 @@ data:
OSDU_AIRFLOW_URL: "{{ .Values.data.gcp_airflow_url }}"
WORKFLOW_ADMIN_ACCOUNT: "{{ .Values.data.workflow_admin_account }}"
SHARED_TENANT_NAME: "{{ .Values.data.shared_tenant_name }}"
OSDU_AIRFLOW_VERSION2: "{{ .Values.data.airflow_version_2 }}"
......@@ -11,6 +11,7 @@ data:
gcp_airflow_url: ""
workflow_admin_account: ""
shared_tenant_name: ""
airflow_version_2: "true"
conf:
configmap: "workflow-config"
......
/*
* Copyright 2020-2021 Google LLC
* Copyright 2020-2021 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.osm.repository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.gcp.osm.service.Context;
import org.opengroup.osdu.workflow.model.WorkflowStatus;
import org.opengroup.osdu.workflow.provider.gcp.config.WorkflowPropertiesConfiguration;
import org.opengroup.osdu.workflow.provider.gcp.osm.config.IDestinationProvider;
import org.opengroup.osdu.workflow.provider.gcp.repository.IWorkflowStatusRepository;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@RequiredArgsConstructor
public class GcpOsmWorkflowStatusRepository implements IWorkflowStatusRepository {
private final WorkflowPropertiesConfiguration workflowConfig;
private final IDestinationProvider destinationProvider;
private final Context context;
private final TenantInfo tenantInfo;
@Override
public WorkflowStatus saveWorkflowStatus(WorkflowStatus workflowStatus) {
log.info("Saving workflow status. Workflow status id : {}", workflowStatus.getWorkflowId());
return context.upsertAndGet(workflowStatus, this.destinationProvider.getDestination(tenantInfo,
workflowConfig.getWorkflowStatusKind()));
}
}
/*
* Copyright 2020-2021 Google LLC
* Copyright 2020-2021 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.repository;
import org.opengroup.osdu.workflow.model.WorkflowStatus;
public interface IWorkflowStatusRepository {
WorkflowStatus saveWorkflowStatus(WorkflowStatus workflowStatus);
}
......@@ -17,7 +17,6 @@
package org.opengroup.osdu.workflow.provider.gcp.service;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.opengroup.osdu.workflow.logging.LoggerUtils.getTruncatedData;
......@@ -25,28 +24,14 @@ import static org.opengroup.osdu.workflow.logging.LoggerUtils.getTruncatedData;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.DatastoreException;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.Transaction;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.legal.PersistenceException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.multitenancy.IDatastoreFactory;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.model.ClientResponse;
import org.opengroup.osdu.workflow.model.WorkflowEngineRequest;
import org.opengroup.osdu.workflow.model.WorkflowStatusType;
import org.opengroup.osdu.workflow.provider.gcp.config.WorkflowPropertiesConfiguration;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
......@@ -55,18 +40,8 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class ComposerIaapClient {
private static final String KEY_AIRFLOW_RUN_ID = "AirflowRunID";
private static final String KEY_WORKFLOW_ID = "WorkflowID";
private static final String KEY_RUN_ID = "RunID";
private static final String KEY_STATUS = "Status";
private final GoogleIapHelper googleIapHelper;
private final AirflowConfig airflowConfig;
private final WorkflowPropertiesConfiguration workflowConfig;
private final TenantInfo tenantInfo;
private final Map<String, Datastore> tenantRepositories = new HashMap<>();
private final IDatastoreFactory datastoreFactory;
private final ITenantFactory tenantFactory;
public ClientResponse sendAirflowRequest(
String httpMethod, String url, String stringData, WorkflowEngineRequest rq) {
......@@ -81,9 +56,7 @@ public class ComposerIaapClient {
this.googleIapHelper.buildIapRequest(url, iapClientId, httpMethod, stringData);
HttpResponse response = httpRequest.execute();
String content = IOUtils.toString(response.getContent(), UTF_8);
if (HttpMethod.POST.name().equals(httpMethod) && response.getStatusCode() == 200) {
saveWorkflowStatus(content, rq);
}
return ClientResponse.builder()
.contentEncoding(response.getContentEncoding())
.contentType(response.getContentType())
......@@ -103,56 +76,4 @@ public class ComposerIaapClient {
}
}
private void saveWorkflowStatus(String content, WorkflowEngineRequest rq) {
log.info("Saving workflow status. Workflow status id : {}", rq.getWorkflowId());
String airflowRunId = getRunIdFromResponse(content);
if (isNullOrEmpty(airflowRunId)) {
airflowRunId = rq.getRunId();
}
Datastore ds = getDatastore();
Transaction txn = ds.newTransaction();
try {
Entity entity = buildStatusEntity(airflowRunId, rq.getWorkflowName(), rq.getRunId());
txn.put(entity);
txn.commit();
} catch (DatastoreException ex) {
throw new PersistenceException(ex.getCode(), ex.getMessage(), ex.getReason());
} finally {
if (txn.isActive()) {
txn.rollback();
}
}
}
private Entity buildStatusEntity(String airflowRunId,
String workflowName, String runId) {
Datastore ds = getDatastore();
Key newKey = ds.newKeyFactory()
.setKind(workflowConfig.getWorkflowStatusKind())
.newKey(runId);
return Entity.newBuilder(newKey)
.set(KEY_WORKFLOW_ID, workflowName)
.set(KEY_AIRFLOW_RUN_ID, airflowRunId)
.set(KEY_RUN_ID, runId)
.set(KEY_STATUS, WorkflowStatusType.SUBMITTED.name()).build();
}
private String getRunIdFromResponse(String response) {
String[] responsePath = response.split(" ");
if (responsePath.length < 10) {
log.warn(String.format("Incorrect response from airflow. Response: %s", response));
return "";
}
return responsePath[6].replace(",", "");
}
private Datastore getDatastore() {
String tenantName = this.tenantInfo.getName();
if (!this.tenantRepositories.containsKey(tenantName)) {
TenantInfo info = this.tenantFactory.getTenantInfo(tenantName);
Datastore datastore = this.datastoreFactory.getDatastore(info);
this.tenantRepositories.put(tenantName, datastore);
}
return this.tenantRepositories.get(tenantName);
}
}
......@@ -17,12 +17,15 @@
package org.opengroup.osdu.gcp.workflow.workflow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.opengroup.osdu.workflow.consts.TestConstants.CREATE_WORKFLOW_RUN_URL;
import static org.opengroup.osdu.workflow.consts.TestConstants.CREATE_WORKFLOW_URL;
import static org.opengroup.osdu.workflow.consts.TestConstants.CREATE_WORKFLOW_WORKFLOW_NAME;
import static org.opengroup.osdu.workflow.consts.TestConstants.GET_DETAILS_WORKFLOW_RUN_URL;
import static org.opengroup.osdu.workflow.util.PayloadBuilder.buildCreateWorkflowRunValidPayloadWithGivenRunId;
import static org.opengroup.osdu.workflow.util.PayloadBuilder.buildUpdateWorkflowPayload;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
......@@ -30,10 +33,14 @@ import com.google.gson.JsonElement;
import com.sun.jersey.api.client.ClientResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.opengroup.osdu.gcp.workflow.util.HTTPClientGCP;
import org.opengroup.osdu.workflow.workflow.v3.WorkflowRunV3IntegrationTests;
......@@ -87,7 +94,46 @@ public class TestWorkflowRunV3Integration extends WorkflowRunV3IntegrationTests
sendDeleteRequest(url);
}
protected ClientResponse sendWorkflowRunFinishedUpdateRequest(String workflowName, String runId) throws Exception {
/*
* this test case will not work for airflow 2.0
*/
@Override
@Test
@Disabled
public void triggerWorkflowRun_should_returnBadRequest_when_givenDuplicateRunId() throws Exception {
//super.triggerWorkflowRun_should_returnBadRequest_when_givenDuplicateRunId();
}
@Override
@Test
public void triggerWorkflowRun_should_returnConflict_when_givenDuplicateRunId_with_airflow2_stable_API()
throws Exception {
String workflowResponseBody = createWorkflow();
Map<String, String> workflowInfo =
new ObjectMapper().readValue(workflowResponseBody, HashMap.class);
createdWorkflows.add(workflowInfo);
String workflowRunResponseBody = createWorkflowRun();
Map<String, String> workflowRunInfo =
new ObjectMapper().readValue(workflowRunResponseBody, HashMap.class);
createdWorkflowRuns.add(workflowRunInfo);
String duplicateRunIdPayload = buildCreateWorkflowRunValidPayloadWithGivenRunId(
workflowRunInfo.get(WORKFLOW_RUN_ID_FIELD));
ClientResponse duplicateRunIdResponse = client.send(
HttpMethod.POST,
String.format(CREATE_WORKFLOW_RUN_URL, CREATE_WORKFLOW_WORKFLOW_NAME),
duplicateRunIdPayload,
headers,
client.getAccessToken()
);
assertEquals(org.apache.http.HttpStatus.SC_CONFLICT, duplicateRunIdResponse.getStatus());
}
protected ClientResponse sendWorkflowRunFinishedUpdateRequest(String workflowName, String runId)
throws Exception {
return client.send(
HttpMethod.PUT,
String.format(GET_DETAILS_WORKFLOW_RUN_URL, workflowName,
......
package org.opengroup.osdu.workflow.di;
import java.util.ArrayList;
import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.core.common.info.ConnectedOuterServicesBuilder;
import org.opengroup.osdu.core.common.model.info.ConnectedOuterService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.opengroup.osdu.workflow.service.AirflowV2WorkflowEngineServiceImpl;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnMissingBean(type = "ConnectedOuterServicesBuilder")
@RequiredArgsConstructor
@ConditionalOnProperty(name = "osdu.airflow.version2", havingValue = "true", matchIfMissing = false)
public class CloudConnectedOuterServicesBuilder implements ConnectedOuterServicesBuilder {
// TODO Need to implement this functionality after upgrading Airflow version,
// currently Airflow doesn't have public version info endpoint.
public static final String AIRFLOW = "Airflow";
private final AirflowV2WorkflowEngineServiceImpl airflowV2WorkflowEngineService;
@Override
public List<ConnectedOuterService> buildConnectedOuterServices() {
return new ArrayList<>();
String airflowVersion = airflowV2WorkflowEngineService.getAirflowVersion();
return ImmutableList.of(
ConnectedOuterService.builder()
.name(AIRFLOW)
.version(airflowVersion)
.build()
);
}
}
package org.opengroup.osdu.workflow.service;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static java.lang.String.format;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.model.*;
import org.opengroup.osdu.workflow.model.AirflowGetDAGRunStatus;
import org.opengroup.osdu.workflow.model.ClientResponse;
import org.opengroup.osdu.workflow.model.TriggerWorkflowResponse;
import org.opengroup.osdu.workflow.model.WorkflowEngineRequest;
import org.opengroup.osdu.workflow.model.WorkflowStatusType;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import static java.lang.String.format;
@Service
@Slf4j
@ConditionalOnProperty(name = "osdu.airflow.version2", havingValue = "true", matchIfMissing=false)
public class AirflowV2WorkflowEngineServiceImpl implements IWorkflowEngineService {
private static final String RUN_ID_PARAMETER_NAME_STABLE = "dag_run_id";
private static final String RUN_ID_PARAMETER_NAME_STABLE = "dag_run_id";
private static final String AIRFLOW_EXECUTION_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
private static final String AIRFLOW_PAYLOAD_PARAMETER_NAME = "conf";
private static final String EXECUTION_DATE_PARAMETER_NAME = "execution_date";
private static final String TRIGGER_AIRFLOW_ENDPOINT_STABLE = "api/v1/dags/%s/dagRuns";
private static final String AIRFLOW_RUN_ENDPOINT_STABLE = "api/v1/dags/%s/dagRuns/%s";
private static final String AIRFLOW_VERSION_ENDPOINT = "api/v1/version";
private static final String NOT_AVAILABLE = "N/A";
public static final String VERSION = "version";
private static final String AIRFLOW_TRIGGER_DAG_ERROR_MESSAGE =
"Failed to trigger workflow with id %s and name %s";
private static final String AIRFLOW_WORKFLOW_RUN_NOT_FOUND =
"No WorkflowRun executed for Workflow: %s on %s ";
private final Client restClient;
private final Client restClient;
private final AirflowConfig airflowConfig;
public AirflowV2WorkflowEngineServiceImpl(Client restClient, AirflowConfig airflowConfig){
......@@ -131,6 +137,30 @@ public class AirflowV2WorkflowEngineServiceImpl implements IWorkflowEngineServic
}
}
public String getAirflowVersion() {
ClientResponse clientResponse =
callAirflow(
HttpMethod.GET,
AIRFLOW_VERSION_ENDPOINT,
null,
null,
null);
try {
ObjectMapper om = new ObjectMapper();
String body = clientResponse.getResponseBody().toString();
JsonNode jsonNode = om.readValue(body, JsonNode.class);
if (jsonNode.has(VERSION)) {
return jsonNode.get(VERSION).asText();
} else {
return NOT_AVAILABLE;
}
} catch (JsonProcessingException e) {
log.error("Unable to Process(Parse, Generate) JSON value. Airflow response: {}.",
clientResponse);
return NOT_AVAILABLE;
}
}
protected ClientResponse callAirflow(String httpMethod, String apiEndpoint, String body,
WorkflowEngineRequest rq, String errorMessage) {
String url = format("%s/%s", airflowConfig.getUrl(), apiEndpoint);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment