Commit 1073fd7c authored by devesh bajpai's avatar devesh bajpai
Browse files

add operation handler

parent 85c15576
......@@ -8,7 +8,8 @@ import org.opengroup.osdu.azure.blobstorage.BlobStore;
import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.wks.config.RequestIdentity;
import org.opengroup.osdu.wks.constants.Constants;
import org.opengroup.osdu.wks.model.MappingInfo;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.model.mapping.MappingInfo;
import org.opengroup.osdu.wks.model.MappingsModel;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.provider.azure.di.CosmosContainerConfig;
......@@ -42,11 +43,13 @@ public class MappingStoreImpl implements MappingStore {
private RequestIdentity requestIdentity;
@Override
public List<MappingInfo> getMappingInfo(final String authority, final String entity, final String source, final String majorVersion) {
public List<MappingInfo> getMappingInfo(
final String authority, final String entity, final String source, final String majorVersion) {
String schemaKind = String.join(Constants.COLON_SEPARATOR, authority, source, entity, majorVersion);
SqlQuerySpec query = new SqlQuerySpec("SELECT * FROM c where c.sourceSchemaAuthority = @authority AND c.sourceEntityType = @entity AND " +
"c.sourceSchemaSource = @source AND c.sourceSchemaMajorVersion = @majorVersion AND c.sourceSchemaKind = @schemaKind");
SqlQuerySpec query = new SqlQuerySpec(
"SELECT * FROM c where c.sourceSchemaAuthority = @authority AND c.sourceEntityType = @entity AND "
+ "c.sourceSchemaSource = @source AND c.sourceSchemaMajorVersion = @majorVersion AND c.sourceSchemaKind = @schemaKind");
List<SqlParameter> pars = query.getParameters();
......@@ -57,29 +60,41 @@ public class MappingStoreImpl implements MappingStore {
pars.add(new SqlParameter("@schemaKind", schemaKind));
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
List<MappingInfo> mappingsDocList = new ArrayList<>(cosmosStore.queryItems(requestIdentity.getDataPartitionId(), cosmosContainerConfig.getDatabase(), cosmosContainerConfig.getMappingInfoContainer(), query, options, MappingInfo.class));
if(mappingsDocList.isEmpty()) {
LOGGER.info("Mapping information not found from Cosmos Db for the following parameters - Authority: {}, Source: {}, Entity: {}, MajorVersion: {}", authority, source, entity, majorVersion);
}
else {
LOGGER.info("Mapping information successfully fetched from Cosmos Db for the following parameters - Authority: {}, Source: {}, Entity: {}, MajorVersion: {}", authority, source, entity, majorVersion);
List<MappingInfo> mappingsDocList = new ArrayList<>(cosmosStore.queryItems(requestIdentity.getDataPartitionId(),
cosmosContainerConfig.getDatabase(),
cosmosContainerConfig.getMappingInfoContainer(),
query,
options,
MappingInfo.class));
if (mappingsDocList.isEmpty()) {
LOGGER.info(
"Mapping information not found from Cosmos Db for the following parameters - Authority: {}, Source: {}, Entity: {}, MajorVersion: {}",
authority,
source,
entity,
majorVersion);
} else {
LOGGER.info(
"Mapping information successfully fetched from Cosmos Db for the following parameters - Authority: {}, Source: {}, Entity: {}, MajorVersion: {}",
authority,
source,
entity,
majorVersion);
}
return mappingsDocList;
}
public List<MappingsModel> getMappingFilesFromStorage(List<MappingInfo> mappingsDocList) {
List<MappingsModel> mappingsList = new ArrayList<>();
public List<MappingDefinition> getMappingFilesFromStorage(List<MappingInfo> mappingsDocList) {
List<MappingDefinition> mappingsList = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
for (MappingInfo mappingInfoDoc : mappingsDocList) {
try {
String content = blobStore.readFromStorageContainer(
requestIdentity.getDataPartitionId(),
mappingInfoDoc.getFileName(),
azureBootstrapConfig.containerName());
mappingsList.add(mapper.readValue(content, MappingsModel.class));
String content = blobStore.readFromStorageContainer(requestIdentity.getDataPartitionId(),
mappingInfoDoc.getFileName(),
azureBootstrapConfig.containerName());
mappingsList.add(mapper.readValue(content, MappingDefinition.class));
} catch (Exception e) {
LOGGER.error("Error while processing mappings from blob store {}", e.getMessage(), e);
}
......@@ -87,4 +102,16 @@ public class MappingStoreImpl implements MappingStore {
return mappingsList;
}
public MappingDefinition getMappingDefinitionFromStorage(MappingInfo MappingInfo) {
ObjectMapper mapper = new ObjectMapper();
try {
String content = blobStore.readFromStorageContainer(requestIdentity.getDataPartitionId(),
MappingInfo.getFileName(),
azureBootstrapConfig.containerName());
return mapper.readValue(content, MappingDefinition.class);
} catch (Exception e) {
LOGGER.error("Error while processing mappings from blob store {}", e.getMessage(), e);
return new MappingDefinition();
}
}
}
......@@ -17,12 +17,16 @@ import org.opengroup.osdu.azure.cosmosdb.CosmosStore;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.wks.config.RequestIdentity;
import org.opengroup.osdu.wks.model.AttributeMappingModel;
import org.opengroup.osdu.wks.model.MappingInfo;
import org.opengroup.osdu.wks.model.mapping.Mapping;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.model.mapping.MappingInfo;
import org.opengroup.osdu.wks.model.MappingsModel;
import org.opengroup.osdu.wks.model.mapping.operation.Operation;
import org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig;
import org.opengroup.osdu.wks.provider.azure.di.CosmosContainerConfig;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
......@@ -119,14 +123,16 @@ public class MappingStoreImplTest {
@Test
public void shouldReturnMappingFilesFromBlobStorage() throws JsonProcessingException {
MappingsModel mappingsModel = new MappingsModel();
mappingsModel.setTargetSchemaKind(WKS_SCHEMA_KIND);
AttributeMappingModel attributeMappingModel = new AttributeMappingModel();
attributeMappingModel.setRawAttributeName(rawAttribute);
attributeMappingModel.setWksAttributeName(wksAttribute);
mappingsModel.setAttributeMappings(new AttributeMappingModel[]{attributeMappingModel});
MappingDefinition mappingDefinition = new MappingDefinition();
Mapping mapping = new Mapping();
Operation operation = new Operation();
operation.setSourceProperty("abc");
operation.setTargetProperty("def");
operation.setType("copy");
mapping.setOperations(Collections.singletonList(operation));
mappingDefinition.setMappings(Collections.singletonList(mapping));
ObjectMapper mapper = new ObjectMapper();
String content = mapper.writeValueAsString(mappingsModel);
String content = mapper.writeValueAsString(mappingDefinition);
List<MappingInfo> mappingInfoDocList = Arrays.asList(mappingInfoDoc1, mappingInfoDoc2);
doReturn("file1").when(mappingInfoDoc1).getFileName();
......@@ -135,10 +141,10 @@ public class MappingStoreImplTest {
when(blobStore.readFromStorageContainer(DpsHeaders.DATA_PARTITION_ID, "file1", containerName)).thenReturn(content);
when(blobStore.readFromStorageContainer(DpsHeaders.DATA_PARTITION_ID, "file2", containerName)).thenReturn(content);
List<MappingsModel> resultMappings = mappingStore.getMappingFilesFromStorage(mappingInfoDocList);
List<MappingDefinition> resultMappings = mappingStore.getMappingFilesFromStorage(mappingInfoDocList);
assertEquals(mappingsModel.toString(), resultMappings.get(0).toString());
assertEquals(mappingsModel.toString(), resultMappings.get(1).toString());
assertEquals(mappingDefinition.toString(), resultMappings.get(0).toString());
assertEquals(mappingDefinition.toString(), resultMappings.get(1).toString());
verify(blobStore, times(1)).readFromStorageContainer(DpsHeaders.DATA_PARTITION_ID, "file1", containerName);
verify(blobStore, times(1)).readFromStorageContainer(DpsHeaders.DATA_PARTITION_ID, "file2", containerName);
}
......@@ -152,7 +158,7 @@ public class MappingStoreImplTest {
when(blobStore.readFromStorageContainer(DpsHeaders.DATA_PARTITION_ID, "file1", containerName)).thenReturn(content);
List<MappingsModel> resultMappings = mappingStore.getMappingFilesFromStorage(mappingInfoDocList);
List<MappingDefinition> resultMappings = mappingStore.getMappingFilesFromStorage(mappingInfoDocList);
assertEquals(resultMappings.size(), 0);
verify(blobStore, times(1)).readFromStorageContainer(DpsHeaders.DATA_PARTITION_ID, "file1", containerName);
......
......@@ -7,7 +7,8 @@ import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobListOption;
import lombok.extern.java.Log;
import org.opengroup.osdu.wks.constants.Constants;
import org.opengroup.osdu.wks.model.MappingInfo;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.model.mapping.MappingInfo;
import org.opengroup.osdu.wks.model.MappingsModel;
import org.opengroup.osdu.wks.provider.interfaces.MappingStore;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -46,10 +47,15 @@ public class MappingStoreImpl implements MappingStore {
}
@Override
public List<MappingsModel> getMappingFilesFromStorage(List<MappingInfo> mappingsDocList) {
public List<MappingDefinition> getMappingFilesFromStorage(List<MappingInfo> mappingsDocList) {
return new ArrayList<>();
}
@Override
public MappingDefinition getMappingDefinitionFromStorage(MappingInfo mappingsDoc) {
return new MappingDefinition();
}
public MappingsModel getMapping(final String fileName) {
MappingsModel mappings = null;
try {
......@@ -94,4 +100,6 @@ public class MappingStoreImpl implements MappingStore {
return blobList.get(0);
}
}
package org.opengroup.osdu.wks.model;
import java.util.Map;
import io.datatree.Tree;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class DataBlockTransformationResult {
Tree transformedTree;
Map<String,String> mappedProperties;
}
package org.opengroup.osdu.wks.model;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import io.datatree.Tree;
import lombok.AllArgsConstructor;
import lombok.Getter;
......@@ -10,5 +11,5 @@ import lombok.Setter;
@AllArgsConstructor
public class TransformRequest {
private Tree rawRecordTree;
private MappingsModel mappingsModel;
private MappingDefinition mappingDefinition;
}
package org.opengroup.osdu.wks.model.mapping;
import java.util.List;
import org.opengroup.osdu.wks.model.mapping.operation.Operation;
import lombok.Data;
@Data
public class Mapping {
String identifier;
List<Operation> operations;
}
\ No newline at end of file
package org.opengroup.osdu.wks.model.mapping;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
@Data
public class MappingDefinition {
public static final Predicate<MappingDefinition> isMappingDefinitionNull = Objects::isNull;
public static final Predicate<MappingDefinition> isMappingsNull = mappingDefinition1 -> mappingDefinition1.getMappings() == null;
public static final Predicate<MappingDefinition> isMappingsEmpty = mappingDefinition1 -> mappingDefinition1.getMappings().isEmpty();
@JsonIgnoreProperties(ignoreUnknown = true)
private MappingInfo mappingInfo;
private MappingIdentity mappingIdentity;
private List<Mapping> mappings;
}
\ No newline at end of file
package org.opengroup.osdu.wks.model.mapping;
import org.opengroup.osdu.wks.model.SchemaIdentity;
import lombok.*;
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class MappingIdentity {
private String authority;
private SchemaIdentity sourceSchema;
private SchemaIdentity targetSchema;
}
package org.opengroup.osdu.wks.model;
package org.opengroup.osdu.wks.model.mapping;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.*;
import org.opengroup.osdu.wks.enums.MappingScope;
......@@ -15,6 +11,7 @@ import java.sql.Timestamp;
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Builder
public class MappingInfo {
private String id;
private String mappingId;
......
package org.opengroup.osdu.wks.model.mapping.operation;
import lombok.Data;
@Data
public class Operation {
String type;
String sourceProperty;
String targetProperty;
}
package org.opengroup.osdu.wks.model.mapping.operation;
import org.opengroup.osdu.wks.service.handler.operation.CopyOperationHandler;
import org.opengroup.osdu.wks.service.handler.operation.OperationHandler;
public enum OperationType {
COPY("COPY",new CopyOperationHandler()),
COPY_SUBSTRUCTURE("COPY SUBSTRUCTURE", new CopyOperationHandler());
private String value;
private OperationHandler operationHandler;
OperationType(String value,OperationHandler operationHandler)
{
this.value = value;
this.operationHandler = operationHandler;
}
public String getValue() {
return value;
}
public OperationHandler getHandler() {
return operationHandler;
}
private static OperationType getOperationByText(String operation) {
for (OperationType operationType : OperationType.values()) {
if (operationType.getValue().equalsIgnoreCase(operation)) {
return operationType;
}
}
return null;
}
public static OperationHandler getOperationHandler(String operation){
OperationType operationType = getOperationByText(operation);
if(operationType == null){
return null;
}
return operationType.getHandler();
}
}
package org.opengroup.osdu.wks.provider.interfaces;
import org.opengroup.osdu.wks.model.MappingInfo;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.model.mapping.MappingInfo;
import org.opengroup.osdu.wks.model.MappingsModel;
import java.util.List;
public interface MappingStore {
List<MappingInfo> getMappingInfo(String authorityName, String entityNamePrefix, String sourceName, String majorVersion);
List<MappingsModel> getMappingFilesFromStorage(List<MappingInfo> mappingsDocList);
MappingDefinition getMappingDefinitionFromStorage(MappingInfo mappingsInfo);
List<MappingDefinition> getMappingFilesFromStorage(List<MappingInfo> mappingsDocList);
}
......@@ -5,5 +5,5 @@ import org.opengroup.osdu.wks.exceptions.ApplicationException;
import java.util.List;
public interface ISchemaService {
public List<String> getSchemas(String kind) throws ApplicationException;
public List<String> getSchemas(String authority, String source, String entityType, String majorVersion) throws ApplicationException;
}
package org.opengroup.osdu.wks.service;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.model.MappingInfo;
import org.opengroup.osdu.wks.model.MappingsModel;
import java.util.List;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.model.mapping.MappingInfo;
public interface MappingService {
List<MappingsModel> getMapping(String kind) throws ApplicationException;
List<MappingDefinition> getMapping(String kind) throws ApplicationException;
List<MappingInfo> getMappingInfos(String authority, String entity, String source, String majorVersion);
......
package org.opengroup.osdu.wks.service;
import org.opengroup.osdu.wks.model.MappingInfo;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.model.mapping.MappingInfo;
import org.opengroup.osdu.wks.model.MappingsModel;
import org.opengroup.osdu.wks.provider.interfaces.MappingStore;
import org.opengroup.osdu.wks.util.KindUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
......@@ -20,7 +22,7 @@ public class MappingServiceImpl implements MappingService {
private KindUtil kindUtil;
@Override
public List<MappingsModel> getMapping(String kind) {
public List<MappingDefinition> getMapping(String kind) {
String authorityName = kindUtil.retrieveAuthorityName(kind);
String entityNamePrefix = kindUtil.retrieveEntityName(kind);
String sourceName = kindUtil.retrieveSourceName(kind);
......@@ -29,7 +31,7 @@ public class MappingServiceImpl implements MappingService {
if(mappingInfos.isEmpty()) {
return Collections.emptyList();
}
return mappingStore.getMappingFilesFromStorage(mappingInfos);
return getMappingDefinitions(mappingInfos);
}
@Override
......@@ -37,4 +39,14 @@ public class MappingServiceImpl implements MappingService {
return mappingStore.getMappingInfo(authority, entity, source, majorVersion);
}
private List<MappingDefinition> getMappingDefinitions(List<MappingInfo> mappingInfos){
List<MappingDefinition> mappingsList = new ArrayList<>();
for (MappingInfo mappingInfoDoc : mappingInfos) {
MappingDefinition mappingDefinition = mappingStore.getMappingDefinitionFromStorage(mappingInfoDoc);
mappingDefinition.setMappingInfo(mappingInfoDoc);
mappingsList.add(mappingDefinition);
}
return mappingsList;
}
}
......@@ -51,10 +51,10 @@ public class SchemaService implements ISchemaService{
@Autowired
private KindUtil kindUtil;
public List<String> getSchemas(String kind) throws ApplicationException {
public List<String> getSchemas(String authority, String source, String entityType, String majorVersion) throws ApplicationException {
RequestParameters requestParameters = new RequestParameters();
TenantInfo tenantInfo = tenantFactory.getTenantInfo(requestIdentity.getDataPartitionId());
requestParameters.setUrl(getRequestURL(kind));
requestParameters.setUrl(getRequestURL(authority, source, entityType, majorVersion));
requestParameters.setMethod(HttpMethod.GET);
requestParameters.setAppKey("");
requestParameters.setAuthKey(userCredential.getIdToken(tenantInfo));
......@@ -102,15 +102,13 @@ public class SchemaService implements ISchemaService{
}
}
private String getRequestURL(String kind) throws ApplicationException {
private String getRequestURL(String authority, String source, String entity, String majorVersion) throws ApplicationException {
try {
String[] kindArr = kind.split(Constants.COLON_SEPARATOR);
String majorVersion = kindUtil.getMajorVersionFromKind(kind).toString();
String schemaUrl = serviceConfig.getSchemaServiceUrl() + SchemaConstants.POST_SCHEMA_ENDPOINT;
String schemaUrl = serviceConfig.getSchemaServiceUrl() + SchemaConstants.POST_SCHEMA_ENDPOINT;
URIBuilder uriBuilder = new URIBuilder(schemaUrl);
uriBuilder.addParameter("authority", kindArr[0]);
uriBuilder.addParameter("source", kindArr[1]);
uriBuilder.addParameter("entityType", kindArr[2]);
uriBuilder.addParameter("authority", authority);
uriBuilder.addParameter("source", source);
uriBuilder.addParameter("entityType", entity);
uriBuilder.addParameter("schemaVersionMajor", majorVersion);
uriBuilder.addParameter("latestVersion", "true");
......
......@@ -14,11 +14,13 @@ import org.opengroup.osdu.wks.config.ThreadScopeContextHolder;
import org.opengroup.osdu.wks.constants.Constants;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.exceptions.BadRequestException;
import org.opengroup.osdu.wks.exceptions.NotFoundException;
import org.opengroup.osdu.wks.model.MappingsModel;
import org.opengroup.osdu.wks.model.RawRecordDetails;
import org.opengroup.osdu.wks.model.RelationshipStatus;
import org.opengroup.osdu.wks.model.TransformRequest;
import org.opengroup.osdu.wks.model.TransformationResult;
import org.opengroup.osdu.wks.model.mapping.MappingDefinition;
import org.opengroup.osdu.wks.provider.interfaces.StatusStoreService;
import org.opengroup.osdu.wks.util.KindUtil;
import org.opengroup.osdu.wks.util.WksTransformator;
......@@ -81,9 +83,9 @@ public class WKSServiceImpl implements WKSService {
rawRecordsDetails);
if(rawRecordsDetailsForTransformation.length > 0) {
Map<String, List<MappingsModel>> mappingsMap = retrieveMappingsMap(rawRecordsDetailsForTransformation);
// Updating mappings with latest minor and patch versions
updateMappingsWithLatestMinorAndPatchVersions(mappingsMap);
Map<String, List<MappingDefinition>> mappingsMap = retrieveMappingsMap(rawRecordsDetailsForTransformation);
// // Updating mappings with latest minor and patch versions
// updateMappingsWithLatestMinorAndPatchVersions(mappingsMap);
transformRecords(rawRecordsDetailsForTransformation, mappingsMap, dataPartitionId, correlationId);
}
} finally {
......@@ -119,17 +121,17 @@ public class WKSServiceImpl implements WKSService {
}
private void transformRecords(RawRecordDetails[] rawRecordsDetails, Map<String, List<MappingsModel>> mappingsMap,
private void transformRecords(RawRecordDetails[] rawRecordsDetails, Map<String, List<MappingDefinition>> mappingsMap,
String dataPartitionId, String correlationId) throws ApplicationException, BadRequestException {
List<Tree> wksRecordTreeList = new ArrayList<>();
List<RelationshipStatus> relationshipStatusList = new ArrayList<>();
for (RawRecordDetails rawRecord : rawRecordsDetails) {
try {
List<MappingsModel> mappingsList = retrieveMappings(mappingsMap, rawRecord);
List<MappingDefinition> mappingsList = retrieveMappings(mappingsMap, rawRecord);
Tree rawRecordTree = retrieveRawRecordTree(rawRecord);
for (MappingsModel mappings: mappingsList) {
for (MappingDefinition mappings: mappingsList) {
try {
TransformRequest transformRequest = new TransformRequest(rawRecordTree, mappings);
......@@ -147,7 +149,9 @@ public class WKSServiceImpl implements WKSService {
wksRecordTree.get(ID).asString(), wksRecordTree.get(KIND));
}
catch (ApplicationException e) {
LOGGER.warn(Constants.TRANSFORMATION_FAILED_FOR_GIVEN_WKS_KIND, rawRecord.getId(), mappings.getTargetSchemaKind(), e.getErrorMsg(), e);
LOGGER.warn(Constants.TRANSFORMATION_FAILED_FOR_GIVEN_WKS_KIND, rawRecord.getId(), e.getErrorMsg(), e);
} catch (NotFoundException e) {
e.printStackTrace();
}
}
......@@ -166,28 +170,28 @@ public class WKSServiceImpl implements WKSService {
LOGGER.info(Constants.TRANFORMATION_PROCESS_COMPLETED);
}
private void updateMappingsWithLatestMinorAndPatchVersions(Map<String, List<MappingsModel>> mappingsMap) throws ApplicationException {
for (Map.Entry<String, List<MappingsModel>> entry : mappingsMap.entrySet()) {
List<MappingsModel> mappingsModelList = new ArrayList<>();
for (MappingsModel mappingsModel : entry.getValue()) {
List<String> schemaKindsWithLatestMinorAndPatchVersions = schemaService.getSchemas(mappingsModel.getTargetSchemaKind());
mappingsModel.setTargetSchemaKind(schemaKindsWithLatestMinorAndPatchVersions.get(0));
mappingsModelList.add(mappingsModel);