Commit 9337f8af authored by Abhishek Kumar's avatar Abhishek Kumar
Browse files

Merging latest code from osdu-r3-code

parents 617e39a2 f422faca
Pipeline #24612 failed with stages
in 4 minutes and 13 seconds
......@@ -27,7 +27,7 @@ Example:
```
"{\"id\":\"partition-test:osdudemo:test\", \"authorization\":\"token\", \"dataPartitionId\":\"partition-test\", \"steps\":[\"LOAD_FROM_BLOB\",\"ID\",\"LEGAL\", \"KIND\", \"ACL\", \"META\", \"STORE_TO_BLOB\"], \"source\": \"test/step1\", \"destination\": \"test/step2\", \"storageSas\":\"storage-sas\"}"```
```
Here is a list of all possible steps: LOAD_FROM_CSV, LOAD_FROM_BLOB, ACL, LEGAL, KIND, ID, TYPE_COERCION, META, STORE_TO_BLOB, STORE_TO_OSDU
Here is a list of all possible steps: LOAD_FROM_CSV, LOAD_FROM_BLOB, ACL, LEGAL, KIND, ID, TYPE_COERCION, META, UNIT, CRS, RELATIONSHIP, STORE_TO_BLOB, STORE_TO_OSDU
### Configure following environment variables.
```
......@@ -42,6 +42,8 @@ partition_service_endpoint
storage_service_endpoint
schema_service_endpoint
workflow_service_endpoint
search_service_endpoint
unit_service_endpoint
```
# Running Automated Integration Test
......@@ -53,7 +55,7 @@ Test can be executed from commandline using below command from the project root.
```
cd testing
export AZURE_STORAGE_CONTAINER={container};AZURE_CLIENT_ID={client};AZURE_CLIENT_SECRET={secret};AZURE_TENANT_ID={tenant};KEYVAULT_URI=https:{keyVaultUri};aad_client_id={addId};storage_service_endpoint={storageServiceEndpoint};schema_service_endpoint={schemaServiceEndpoint};partition_service_endpoint={partitionServiceEndpoint};workflow_service_endpoint={workflowServiceEndpoint}
export AZURE_STORAGE_CONTAINER={container};AZURE_CLIENT_ID={client};AZURE_CLIENT_SECRET={secret};AZURE_TENANT_ID={tenant};KEYVAULT_URI=https:{keyVaultUri};aad_client_id={addId};storage_service_endpoint={storageServiceEndpoint};schema_service_endpoint={schemaServiceEndpoint};partition_service_endpoint={partitionServiceEndpoint};search_service_endpoint={searchServiceEndpoint};unit_service_endpoint={unitServiceEndpoint}
mvn verify -Dmaven.test.skip=false -DDATA_PARTITION_ID={tenant} -DDMS_KIND={kind} -DDMS_ACL={acl} -DDMS_LEGAL_TAG={legalTag}
```
# Contribute
......
import airflow
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from json import dumps
from datetime import timedelta
# Default arguments applied to every task instance in this DAG
# (standard Airflow retry/notification settings).
default_args = {
'owner': 'airflow',
'depends_on_past': False,
# days_ago(0) makes the DAG schedulable immediately after deployment
'start_date': airflow.utils.dates.days_ago(0),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
# one automatic retry per task, five minutes apart
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Values pulled from the dag_run configuration at trigger time. These are
# Jinja templates rendered by Airflow when the task executes, not at module
# import — do not use them in plain Python expressions here.
record_id = "{{ dag_run.conf['runConfiguration']['id'] }}"
kind = "{{ dag_run.conf['runConfiguration']['kind'] }}"
authorization = "{{ dag_run.conf['authToken'] }}"
dataPartitionId = "{{ dag_run.conf['runConfiguration']['dataPartitionId'] }}"
run_id = "{{ dag_run.conf['runId'] }}"
# Full step list the parser executes, in order ("all steps" flow).
steps = ["LOAD_FROM_CSV", "TYPE_COERCION", "ID", "ACL", "LEGAL", "KIND", "META", "TAGS", "UNIT", "CRS", "RELATIONSHIP", "STORE_TO_OSDU"]
# Constants
# NOTE(review): DAG_INIT is not referenced anywhere in this file — confirm it
# is still needed or remove it.
DAG_INIT = "dag-init"
DAG_NAME = "csv_ingestion_all_steps"
CSV_PARSER = "csv-parser"
# NOTE(review): registry host and image tag are hard-coded to one specific
# build; consider sourcing these from an Airflow Variable for portability.
DOCKER_IMAGE = "osdur3mvpcrqa21xncr.azurecr.io/csv-parser-qa:c6d9b7123a5209f20eb7f470af7b79b8494a85e2"
NAMESPACE = "airflow"
# Payload handed to the csv-parser container as its single JSON argument.
# NOTE(review): `authorization` carries the caller's auth token, which ends up
# in the pod spec arguments — confirm that is acceptable for this deployment.
params = {
"id": record_id,
"kind": kind,
"authorization": authorization,
"dataPartitionId": dataPartitionId,
"steps": steps
}
# Environment for the parser pod. Service endpoints are templated from the
# Airflow Variable `dns_host`; credentials come from other Airflow Variables.
env_vars = {
"storage_service_endpoint": "https://{{ var.value.dns_host }}/api/storage/v2",
"schema_service_endpoint": "https://{{ var.value.dns_host }}/api/schema-service/v1",
"search_service_endpoint": "https://{{ var.value.dns_host }}/api/search/v2",
"partition_service_endpoint": "https://{{ var.value.dns_host }}/api/partition/v1",
"unit_service_endpoint": "https://{{ var.value.dns_host }}/api/unit/v2/unit/symbol",
"AZURE_STORAGE_CONTAINER": "file-persistent-area",
"AZURE_TENANT_ID": "{{ var.value.azure_tenant_id }}",
"AZURE_CLIENT_ID": "{{ var.value.azure_client_id }}",
# NOTE(review): the client secret is injected as a plain environment variable
# from an Airflow Variable — consider a secrets backend or a K8s Secret.
"AZURE_CLIENT_SECRET": "{{ var.value.azure_client_secret }}",
"aad_client_id": "{{ var.value.aad_client_id }}",
"KEYVAULT_URI": "{{ var.value.keyvault_uri }}",
"appinsights_key": "{{ var.value.appinsights_key }}"
}
# DAG is registered at module import; Airflow discovers the module-level `dag`.
# NOTE(review): no schedule_interval is set, so Airflow applies its default
# (daily) schedule — a trigger-only ingestion DAG usually wants
# schedule_interval=None; confirm against the deployment.
dag = DAG(DAG_NAME, default_args=default_args)
# Single-task flow: run the CSV parser container in the `airflow` namespace,
# passing the ingestion parameters as one JSON command-line argument.
csv_parser = KubernetesPodOperator(
namespace=NAMESPACE,
task_id=CSV_PARSER,
name=CSV_PARSER,
env_vars=env_vars,
arguments=[dumps(params)],
# clean the pod up after the task finishes (success or failure)
is_delete_operator_pod=True,
image=DOCKER_IMAGE, dag=dag)
......@@ -129,6 +129,55 @@
</execution>
</executions>
</plugin>
<!-- JaCoCo: instruments the unit tests, generates a coverage report during the
     `test` phase, and fails the build when line or branch coverage on the
     whole bundle drops below 90%. -->
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>0.8.4</version>
<configuration>
<excludes>
<!-- Plain data/model classes are excluded from coverage accounting. -->
<exclude>**/model/*</exclude>
</excludes>
</configuration>
<executions>
<!-- Attach the JaCoCo agent before the tests run. -->
<execution>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<!-- Produce the coverage report immediately after the test phase. -->
<execution>
<id>report</id>
<goals>
<goal>report</goal>
</goals>
<phase>test</phase>
</execution>
<!-- Enforce the minimum coverage thresholds; build fails below 90%. -->
<execution>
<id>check</id>
<goals>
<goal>check</goal>
</goals>
<configuration>
<rules>
<rule>
<element>BUNDLE</element>
<limits>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>0.90</minimum>
</limit>
<limit>
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.90</minimum>
</limit>
</limits>
</rule>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
......
......@@ -28,7 +28,7 @@ public class CsvIngestionRunner implements CommandLineRunner {
if (args.length != 1) {
throw new BadRequestException("Ingestion request argument is not passed");
}
System.out.println("ARGS " + args[0]);
IngestionRequest ingestionRequest = getIngestionRequest(args[0]);
validate(ingestionRequest);
......
......@@ -3,7 +3,7 @@ package org.opengroup.osdu.csvparser.descriptor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.csvparser.descriptor.model.IngestionDescriptor;
import org.opengroup.osdu.csvparser.exception.IngestionException;
import org.opengroup.osdu.csvparser.exception.model.IngestionException;
import org.opengroup.osdu.csvparser.storage.StorageClient;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Service;
......
......@@ -18,6 +18,11 @@ public class FileContentsDetails {
// "FrameOfReference" block from the ingestion manifest: a list of reference
// entries (unit / CRS / etc.) consumed by the UNIT and CRS handlers.
@JsonProperty("FrameOfReference")
private List<Map<String, Object>> frameOfReference;
// NOTE(review): no @JsonProperty here, so Jackson binds the literal field name
// "relatedNaturalKey" — confirm that matches the manifest schema's casing.
private List<Map<String, Map<String, Object>>> relatedNaturalKey;
@JsonProperty("TargetKind")
private String targetKind;
// NOTE(review): lower-camel "unitOfMeasure" is inconsistent with the
// Pascal-case names above ("TargetKind", "FrameOfReference") — verify schema.
@JsonProperty("unitOfMeasure")
private String unitOfMeasure;
}
......@@ -7,7 +7,6 @@ import lombok.NoArgsConstructor;
import org.opengroup.osdu.core.common.model.entitlements.Acl;
import org.opengroup.osdu.core.common.model.entitlements.validation.ValidAcl;
import org.opengroup.osdu.core.common.model.legal.Legal;
import org.opengroup.osdu.core.common.model.storage.validation.ValidKind;
import org.opengroup.osdu.core.common.model.storage.validation.ValidationDoc;
import javax.validation.Valid;
......@@ -31,8 +30,10 @@ public class IngestionDescriptor {
@NotNull(message = "Ingestion data cannot be empty")
private IngestionData data;
private Map<String, String> tags;
private Map<String, String> dataTypeMapping;
private Map<String, Integer> naturalKeys;
}
\ No newline at end of file
}
package org.opengroup.osdu.csvparser.exception.model;
import org.opengroup.osdu.core.common.exception.CoreException;
/**
 * Exception signalling a failure during CSV ingestion processing.
 * Thin wrapper over the OSDU core {@code CoreException} so ingestion failures
 * can be caught as a distinct type by the runner and handlers.
 */
public class IngestionException extends CoreException {
/** Creates the exception with a descriptive message. */
public IngestionException(String message) {
super(message);
}
/** Creates the exception with a message and the underlying cause (cause preserved). */
public IngestionException(String message, Throwable cause) {
super(message, cause);
}
}
package org.opengroup.osdu.csvparser.file;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.exception.UnauthorizedException;
import org.opengroup.osdu.csvparser.exception.model.IngestionException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import static org.springframework.http.HttpStatus.FORBIDDEN;
import static org.springframework.http.HttpStatus.UNAUTHORIZED;
@Slf4j
@RequiredArgsConstructor
@Service
public class FileService {

    /** Base endpoint of the OSDU File service (injected from configuration). */
    @Value("${FILE_SERVICE_ENDPOINT}")
    private String fileService;

    private final RestTemplate rest;

    /**
     * Resolves a signed download URL for the given file id via the File service
     * {@code /v1/files/{id}/downloadURL} endpoint.
     *
     * @param id             file record id to resolve
     * @param requestHeaders headers (including authorization) forwarded to the File service
     * @return the signed URL from the response's {@code SignedUrl} field
     * @throws UnauthorizedException on 401/403 from the File service
     * @throws IngestionException    on other client errors or a malformed/empty response
     */
    public String getDownloadURL(String id, HttpHeaders requestHeaders) throws RestClientException {
        try {
            JsonNode body = rest.exchange(
                    fileService + "/v1/files/" + id + "/downloadURL",
                    HttpMethod.GET,
                    new HttpEntity<>(requestHeaders), // typed entity instead of a raw HttpEntity
                    JsonNode.class).getBody();
            // Fix: previously the body was dereferenced unchecked; an empty 200 response
            // or a payload without "SignedUrl" produced an opaque NullPointerException.
            if (body == null || body.get("SignedUrl") == null) {
                throw new IngestionException("File service returned no SignedUrl for file id: " + id);
            }
            return body.get("SignedUrl").textValue();
        } catch (HttpClientErrorException e) {
            if (UNAUTHORIZED.equals(e.getStatusCode()) || FORBIDDEN.equals(e.getStatusCode())) {
                throw new UnauthorizedException("User not authorized error: " + e.getMessage(), e);
            }
            log.error("File service failed with error={}", e.getMessage());
            throw new IngestionException("File service failed error: " + e.getMessage(), e);
        }
    }

    /**
     * Opens a stream over the file contents: resolves the signed URL first,
     * then reads directly from it.
     *
     * @param id             file record id to download
     * @param requestHeaders headers forwarded to the File service
     * @return an open stream over the file bytes (caller is responsible for closing it)
     * @throws IngestionException if the URL cannot be resolved or the stream cannot be opened
     */
    public InputStream getFileContent(String id, HttpHeaders requestHeaders) throws IngestionException {
        try {
            String signedURL = getDownloadURL(id, requestHeaders);
            return new URL(signedURL).openStream();
        } catch (IOException e) {
            log.warn("Error reading file using the SaS URL: " + e.getMessage());
            throw new IngestionException("Error reading file from storage: " + e.getMessage(), e);
        } catch (UnauthorizedException | IngestionException e) {
            // NOTE(review): auth failures are deliberately re-wrapped as IngestionException
            // here, so callers of this method never observe UnauthorizedException directly.
            log.warn("Error processing the SaS URL: " + e.getMessage());
            throw new IngestionException("Error reading file from storage: " + e.getMessage(), e);
        }
    }
}
......@@ -14,6 +14,10 @@ public enum StepType {
ID,
TYPE_COERCION,
META,
TAGS,
UNIT,
CRS,
RELATIONSHIP,
STORE_TO_BLOB(false, true),
STORE_TO_OSDU(false, true);
......@@ -39,7 +43,7 @@ public enum StepType {
}
public static List<StepType> getDefaultFlow() {
return asList(LOAD_FROM_CSV, ACL, LEGAL, KIND, ID, TYPE_COERCION, META, STORE_TO_OSDU);
return asList(LOAD_FROM_CSV, ACL, LEGAL, KIND, ID, TYPE_COERCION, META, TAGS, UNIT, CRS, RELATIONSHIP, STORE_TO_OSDU);
}
}
package org.opengroup.osdu.csvparser.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.csvparser.blob.BlobStorage;
import org.opengroup.osdu.csvparser.handler.handlers.AclHandler;
import org.opengroup.osdu.csvparser.handler.handlers.BlobStorageHandler;
import org.opengroup.osdu.csvparser.handler.handlers.IdHandler;
import org.opengroup.osdu.csvparser.handler.handlers.KindHandler;
import org.opengroup.osdu.csvparser.handler.handlers.LegalHandler;
import org.opengroup.osdu.csvparser.handler.handlers.MetaHandler;
import org.opengroup.osdu.csvparser.handler.handlers.StorageHandler;
import org.opengroup.osdu.csvparser.handler.handlers.TypeCoercionHandler;
import org.opengroup.osdu.csvparser.handler.handlers.*;
import org.opengroup.osdu.csvparser.ingestion.IngestionContext;
import org.opengroup.osdu.csvparser.storage.StoringStrategy;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
......@@ -22,13 +15,12 @@ import static org.springframework.beans.factory.config.ConfigurableBeanFactory.S
@SuppressWarnings({"unused", "java:S3305"})
@Configuration
@RequiredArgsConstructor
public class HandlerConfig {
@Autowired
private BeanFactory factory;
private final BeanFactory factory;
@Autowired
private ObjectMapper mapper;
private final ObjectMapper mapper;
@Scope(SCOPE_PROTOTYPE)
@Bean
......@@ -66,6 +58,30 @@ public class HandlerConfig {
return new MetaHandler(context.getDescriptor());
}
/**
 * Prototype-scoped factory for the UNIT step handler: a fresh instance per
 * ingestion run, built from the run's descriptor and OSDU service headers.
 */
@Scope(SCOPE_PROTOTYPE)
@Bean
public UnitHandler getUnitHandler(IngestionContext context) {
return new UnitHandler(context.getDescriptor(), context.getOsduServiceHeaders());
}
/**
 * Prototype-scoped factory for the CRS step handler; only needs the run's
 * descriptor (no service headers).
 */
@Scope(SCOPE_PROTOTYPE)
@Bean
public CrsHandler getCrsHandler(IngestionContext context) {
return new CrsHandler(context.getDescriptor());
}
/**
 * Prototype-scoped factory for the RELATIONSHIP step handler, built from the
 * run's descriptor and OSDU service headers.
 */
@Scope(SCOPE_PROTOTYPE)
@Bean
public RelationshipHandler getRelationshipHandler(IngestionContext context) {
return new RelationshipHandler(context.getDescriptor(), context.getOsduServiceHeaders());
}
/**
 * Prototype-scoped factory for the TAGS step handler; only needs the run's
 * descriptor.
 */
@Scope(SCOPE_PROTOTYPE)
@Bean
public TagsHandler getTagsHandler(IngestionContext context) {
return new TagsHandler(context.getDescriptor());
}
@Scope(SCOPE_PROTOTYPE)
@Bean
public BlobStorageHandler getBlobStorageHandler(IngestionContext context) {
......
package org.opengroup.osdu.csvparser.handler;
/**
 * String constants shared by the ingestion step handlers: JSON field names and
 * frame-of-reference kinds used when building record meta blocks.
 */
public class HandlerConstants {
// Utility holder — never instantiated.
private HandlerConstants() {}
// Key identifying the kind of a frame-of-reference entry ("Unit", "CRS", ...).
public static final String KIND = "kind";
public static final String DATE_TIME = "DateTime";
// Frame-of-reference kinds handled by the UNIT and CRS steps.
public static final String KIND_UNIT = "Unit";
public static final String KIND_CRS = "CRS";
// NOTE(review): presumably the default unit-of-measure system name — confirm
// against the unit service contract.
public static final String DEFAULT_UOM = "Energistics_UoM";
public static final String PERSISTABLE_REFERENCE = "persistableReference";
// NOTE(review): the following appear to be fields of the unit-service
// response consumed by UnitHandler — verify against that handler.
public static final String DEPRECATION_INFO = "deprecationInfo";
public static final String NAME = "name";
public static final String STATE = "state";
public static final String IDENTICAL = "identical";
public static final String SUPERSEDED_BY_UNIT = "supersededByUnit";
// Key listing the record property names a frame-of-reference entry applies to.
public static final String PROPERTY_NAMES = "propertyNames";
}
package org.opengroup.osdu.csvparser.handler;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.csvparser.exception.IngestionException;
import org.opengroup.osdu.csvparser.exception.model.IngestionException;
import org.opengroup.osdu.csvparser.flow.model.StepType;
import org.opengroup.osdu.csvparser.handler.handlers.AclHandler;
import org.opengroup.osdu.csvparser.handler.handlers.BlobStorageHandler;
import org.opengroup.osdu.csvparser.handler.handlers.IdHandler;
import org.opengroup.osdu.csvparser.handler.handlers.KindHandler;
import org.opengroup.osdu.csvparser.handler.handlers.LegalHandler;
import org.opengroup.osdu.csvparser.handler.handlers.MetaHandler;
import org.opengroup.osdu.csvparser.handler.handlers.StorageHandler;
import org.opengroup.osdu.csvparser.handler.handlers.TypeCoercionHandler;
import org.opengroup.osdu.csvparser.handler.handlers.*;
import org.opengroup.osdu.csvparser.ingestion.IngestionContext;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.stereotype.Component;
......@@ -49,6 +42,22 @@ public class HandlerProvider {
handler = MetaHandler.class;
break;
case UNIT:
handler = UnitHandler.class;
break;
case CRS:
handler = CrsHandler.class;
break;
case RELATIONSHIP:
handler = RelationshipHandler.class;
break;
case TAGS:
handler = TagsHandler.class;
break;
case STORE_TO_BLOB:
handler = BlobStorageHandler.class;
break;
......
......@@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.csvparser.blob.BlobStorage;
import org.opengroup.osdu.csvparser.exception.IngestionException;
import org.opengroup.osdu.csvparser.exception.model.IngestionException;
import org.opengroup.osdu.csvparser.flow.model.StepType;
import org.opengroup.osdu.csvparser.handler.Handler;
......
package org.opengroup.osdu.csvparser.handler.handlers;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.csvparser.descriptor.model.ExtensionProperties;
import org.opengroup.osdu.csvparser.descriptor.model.FileContentsDetails;
import org.opengroup.osdu.csvparser.descriptor.model.IngestionData;
import org.opengroup.osdu.csvparser.descriptor.model.IngestionDescriptor;
import org.opengroup.osdu.csvparser.flow.model.StepType;
import org.opengroup.osdu.csvparser.handler.Handler;
import java.util.*;
import java.util.stream.Collectors;
import static org.opengroup.osdu.csvparser.flow.model.StepType.CRS;
import static org.opengroup.osdu.csvparser.handler.HandlerConstants.PROPERTY_NAMES;
import static org.opengroup.osdu.csvparser.handler.HandlerConstants.PERSISTABLE_REFERENCE;
import static org.opengroup.osdu.csvparser.handler.HandlerConstants.KIND;
import static org.opengroup.osdu.csvparser.handler.HandlerConstants.KIND_CRS;
@RequiredArgsConstructor
public class CrsHandler implements Handler {
private final IngestionDescriptor descriptor;
// Identifies this handler as the CRS step in the ingestion flow.
@Override
public StepType getStep() {
return CRS;
}
/**
 * Attaches CRS frame-of-reference meta blocks to the record: copies the CRS
 * entries from the descriptor, narrows each entry's propertyNames to the names
 * actually present on the record, drops entries without a usable persistable
 * reference, and prepends the result to any meta the record already carries.
 *
 * @param recordNumber ordinal of the record in the batch (unused by this step)
 * @param record       the record whose meta array is updated in place
 */
@Override
public void handle(String recordNumber, Record record) {
    List<Map<String, Object>> existingCrsProperties = getCrsProperties(descriptor);
    if (existingCrsProperties.isEmpty()) {
        return; // nothing to do when the descriptor declares no CRS references
    }
    List<Map<String, Object>> updatedCrsProperties = copyExistingProperty(existingCrsProperties);
    matchUpdatedPropertyNames(record, updatedCrsProperties);
    List<Map<String, Object>> crsMeta = filterMissingPersistableReference(updatedCrsProperties);
    // Simplified from Optional.ofNullable(record.getMeta()).isPresent() — a plain null check.
    if (record.getMeta() != null) {
        crsMeta.addAll(Arrays.asList(record.getMeta()));
    }
    record.setMeta(crsMeta.toArray(new Map[0]));
}
/**
 * Keeps only the CRS property entries that carry a usable persistable reference.
 */
private List<Map<String, Object>> filterMissingPersistableReference(List<Map<String, Object>> updatedCrsProperties) {
    return updatedCrsProperties.stream()
            .filter(this::isPersistableReferenceAvailable)
            .collect(Collectors.toList());
}
/**
 * True when the entry declares a non-empty {@code propertyNames} list and also
 * carries a {@code persistableReference} key.
 *
 * NOTE(review): if {@code propertyNames} is present but mapped to null, the raw
 * cast still throws NullPointerException — confirm upstream guarantees non-null.
 */
private boolean isPersistableReferenceAvailable(Map<String, Object> entry) {
    // Collapsed the if/return-true/return-false pattern into one boolean expression.
    return entry.containsKey(PROPERTY_NAMES)
            && !((List) entry.get(PROPERTY_NAMES)).isEmpty()
            && entry.containsKey(PERSISTABLE_REFERENCE);
}
/**
 * For every entry that declares propertyNames, replaces that list with the
 * subset of names actually valid on the given record (mutates entries in place).
 */
private void matchUpdatedPropertyNames(Record record, List<Map<String, Object>> updatedCrsProperties) {
    updatedCrsProperties.stream()
            .filter(entry -> entry.containsKey(PROPERTY_NAMES))
            .forEach(entry -> entry.put(PROPERTY_NAMES, getMatchingPropertyNames(record, entry)));
}
/**
 * Shallow-copies each property map so later mutation of the copies does not
 * touch the descriptor's original entries.
 */
private List<Map<String, Object>> copyExistingProperty(List<Map<String, Object>> existingCrsProperties) {
    return existingCrsProperties.stream()
            .map(original -> (Map<String, Object>) new HashMap<>(original))
            .collect(Collectors.toList());
}
/**
 * Returns the subset of the entry's declared property names that are present
 * and non-empty on this record.
 */
private List<String> getMatchingPropertyNames(Record record, Map<String, Object> existingMap) {
    List<String> declaredNames = (List<String>) existingMap.get(PROPERTY_NAMES);
    return declaredNames.stream()
            .filter(name -> isValidProperty(record, name))
            .collect(Collectors.toList());
}
// A property is "valid" when the record's data contains the key and its value
// renders to a non-empty string.
// NOTE(review): String.valueOf(null) yields the literal "null", so a key mapped
// to a null value still counts as valid here — confirm this is intended.
private boolean isValidProperty(Record record, String propertyName) {
return record.getData().containsKey(propertyName)
&& !Strings.isNullOrEmpty(String.valueOf(record.getData().get(propertyName)));
}
/**
 * Extracts the CRS entries from the descriptor's FrameOfReference list.
 * Safely traverses the optional data/extensionProperties/fileContentsDetails
 * chain, returning an empty list when any link is absent.
 */
private List<Map<String, Object>> getCrsProperties(IngestionDescriptor descriptor) {
    List<Map<String, Object>> descriptorReferenceProperties = Optional.ofNullable(descriptor.getData())
            .map(IngestionData::getExtensionProperties)
            .map(ExtensionProperties::getFileContentsDetails)
            .map(FileContentsDetails::getFrameOfReference)
            .orElseGet(ArrayList::new);
    return descriptorReferenceProperties.stream()
            // Fix: constant-first equals. The previous property.get(KIND).equals(KIND_CRS)
            // threw NullPointerException when "kind" was present but mapped to null;
            // such entries are now simply filtered out (absent keys were already skipped).
            .filter(property -> KIND_CRS.equals(property.get(KIND)))
            .collect(Collectors.toList());
}