Commit 5a194280 authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM)
Browse files

Add a version of Airflow into an endpoint 'info' for Workflow Service


Signed-off-by: Rustam Lotsmanenko (EPAM)'s avatarRustam_Lotsmanenko <Rustam_Lotsmanenko@epam.com>
parent a6df370b
Pipeline #80697 failed with stages
in 42 seconds
package org.opengroup.osdu.workflow.di;
import java.util.ArrayList;
import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.core.common.info.ConnectedOuterServicesBuilder;
import org.opengroup.osdu.core.common.model.info.ConnectedOuterService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.opengroup.osdu.workflow.service.AirflowV2WorkflowEngineServiceImpl;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@Component
@ConditionalOnMissingBean(type = "ConnectedOuterServicesBuilder")
@RequiredArgsConstructor
@ConditionalOnProperty(name = "osdu.airflow.version2", havingValue = "true", matchIfMissing = false)
public class CloudConnectedOuterServicesBuilder implements ConnectedOuterServicesBuilder {
// TODO Need to implement this functionality after upgrading Airflow version,
// currently Airflow doesn't have public version info endpoint.
public static final String AIRFLOW = "Airflow";
private final AirflowV2WorkflowEngineServiceImpl airflowV2WorkflowEngineService;
@Override
public List<ConnectedOuterService> buildConnectedOuterServices() {
return new ArrayList<>();
String airflowVersion = airflowV2WorkflowEngineService.getAirflowVersion();
return ImmutableList.of(
ConnectedOuterService.builder()
.name(AIRFLOW)
.version(airflowVersion)
.build()
);
}
}
package org.opengroup.osdu.workflow.service;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import static java.lang.String.format;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.workflow.config.AirflowConfig;
import org.opengroup.osdu.workflow.model.*;
import org.opengroup.osdu.workflow.model.AirflowGetDAGRunStatus;
import org.opengroup.osdu.workflow.model.ClientResponse;
import org.opengroup.osdu.workflow.model.TriggerWorkflowResponse;
import org.opengroup.osdu.workflow.model.WorkflowEngineRequest;
import org.opengroup.osdu.workflow.model.WorkflowStatusType;
import org.opengroup.osdu.workflow.provider.interfaces.IWorkflowEngineService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import static java.lang.String.format;
@Service
@Slf4j
@ConditionalOnProperty(name = "osdu.airflow.version2", havingValue = "true", matchIfMissing=false)
public class AirflowV2WorkflowEngineServiceImpl implements IWorkflowEngineService {
private static final String RUN_ID_PARAMETER_NAME_STABLE = "dag_run_id";
private static final String RUN_ID_PARAMETER_NAME_STABLE = "dag_run_id";
private static final String AIRFLOW_EXECUTION_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss";
private static final String AIRFLOW_PAYLOAD_PARAMETER_NAME = "conf";
private static final String EXECUTION_DATE_PARAMETER_NAME = "execution_date";
private static final String TRIGGER_AIRFLOW_ENDPOINT_STABLE = "api/v1/dags/%s/dagRuns";
private static final String AIRFLOW_RUN_ENDPOINT_STABLE = "api/v1/dags/%s/dagRuns/%s";
private static final String AIRFLOW_VERSION_ENDPOINT = "api/v1/version";
private static final String NOT_AVAILABLE = "N/A";
public static final String VERSION = "version";
private static final String AIRFLOW_TRIGGER_DAG_ERROR_MESSAGE =
"Failed to trigger workflow with id %s and name %s";
private static final String AIRFLOW_WORKFLOW_RUN_NOT_FOUND =
"No WorkflowRun executed for Workflow: %s on %s ";
private final Client restClient;
private final Client restClient;
private final AirflowConfig airflowConfig;
public AirflowV2WorkflowEngineServiceImpl(Client restClient, AirflowConfig airflowConfig){
......@@ -131,6 +137,32 @@ public class AirflowV2WorkflowEngineServiceImpl implements IWorkflowEngineServic
}
}
public String getAirflowVersion() {
ClientResponse clientResponse =
callAirflow(
HttpMethod.GET,
AIRFLOW_VERSION_ENDPOINT,
null,
null,
null);
try {
ObjectMapper om = new ObjectMapper();
String body = clientResponse.getResponseBody().toString();
JsonNode jsonNode = om.readValue(body, JsonNode.class);
if (jsonNode.has(VERSION)) {
return jsonNode.get(VERSION).asText();
} else {
log.error("Unable to locate version in Airflow response. Airflow response: {}.",
clientResponse);
return NOT_AVAILABLE;
}
} catch (JsonProcessingException e) {
log.error("Unable to Process(Parse, Generate) JSON value. Airflow response: {}.",
clientResponse);
return NOT_AVAILABLE;
}
}
protected ClientResponse callAirflow(String httpMethod, String apiEndpoint, String body,
WorkflowEngineRequest rq, String errorMessage) {
String url = format("%s/%s", airflowConfig.getUrl(), apiEndpoint);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment