Commit cc365d61 authored by harshit aggarwal's avatar harshit aggarwal
Browse files

Adding query API

parent 1be5c3f8
......@@ -60,7 +60,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
new MessageHandlerOptions(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxConcurrentCalls()),
false,
Duration.ofSeconds(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxLockRenewDurationInSeconds())),
Duration.ofSeconds(1)
Duration.ofSeconds(60)
),
executorService);
......
package org.opengroup.osdu.wks.service;
import java.util.List;
import java.util.Optional;
import org.opengroup.osdu.core.common.model.indexer.Records;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
public interface StorageService {
......@@ -9,4 +11,6 @@ public interface StorageService {
Optional<String> getRecord(String id);
Records getRecordBatch(List<String> ids) throws ApplicationException;
}
package org.opengroup.osdu.wks.service;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import io.datatree.Tree;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.indexer.Records;
import org.opengroup.osdu.core.common.model.storage.RecordIds;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
......@@ -16,6 +25,8 @@ import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.exceptions.AuthorizationException;
import org.opengroup.osdu.wks.model.RequestParameters;
import org.opengroup.osdu.wks.model.RestResponse;
import org.opengroup.osdu.wks.model.SchemaInfo;
import org.opengroup.osdu.wks.model.SchemaInfoResponse;
import org.opengroup.osdu.wks.provider.interfaces.UserCredential;
import org.opengroup.osdu.wks.util.RestClient;
import org.slf4j.Logger;
......@@ -81,12 +92,12 @@ public class StorageServiceImpl implements StorageService {
/**
* This method returns the raw records in batch for the given record ids
*
* @param id of the record to be retrieved.
* @param ids List of record ids to be retrieved.
* @return of type {@link Optional} {@link String} which contains the raw record
* @throws ApplicationException
*/
@Override
public getRecordBatch getRecordBatch(List<String> ids) {
public Records getRecordBatch(List<String> ids) throws ApplicationException {
String tenantName = requestIdentity.getDataPartitionId();
String queryRecordUrl = serviceHostConfiguration.getStorageServiceUrl().concat(StorageConstants.POST_BATCH_GET_STORAGE_ENDPOINT);
......@@ -103,7 +114,10 @@ public class StorageServiceImpl implements StorageService {
RestResponse response = restClient.processRequest(requestParameters);
return Optional.ofNullable(response.getMessage());
verifyResponseStatus(response, HttpStatus.SC_OK);
return extractResponseBody(response);
}
/**
......@@ -134,7 +148,7 @@ public class StorageServiceImpl implements StorageService {
requestParameters.setDataPartitionId(tenantName);
RestResponse response = restClient.processRequest(requestParameters);
verifyResponseStatus(response);
verifyResponseStatus(response, HttpStatus.SC_CREATED);
return response.getMessage();
}
......@@ -148,10 +162,25 @@ private String getAuthKey(TenantInfo tenantInfo) {
}
}
private void verifyResponseStatus(RestResponse response) throws ApplicationException {
if (response.getStatus() != HttpStatus.SC_CREATED) {
private void verifyResponseStatus(RestResponse response, int status) throws ApplicationException {
if (response.getStatus() != status) {
LOGGER.warn(response.getException());
throw new ApplicationException(response.getException());
}
}
private Records extractResponseBody(RestResponse response) throws ApplicationException {
String bulkStorageData = response.getMessage();
if (Strings.isNullOrEmpty(bulkStorageData)) {
throw new AppException(HttpStatus.SC_NOT_FOUND, "Invalid request", "Storage service returned empty response");
}
Type recordsListType = new TypeToken<Records>() {
}.getType();
Records records = this.gson.fromJson(bulkStorageData, recordsListType);
return records;
}
}
......@@ -8,9 +8,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import com.google.gson.Gson;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.opengroup.osdu.core.common.model.storage.PubSubInfo;
import org.opengroup.osdu.core.common.model.indexer.Records;
import org.opengroup.osdu.wks.config.RequestIdentity;
import org.opengroup.osdu.wks.config.ThreadScopeContextHolder;
import org.opengroup.osdu.wks.constants.Constants;
......@@ -42,6 +43,8 @@ public class WKSServiceImpl implements WKSService {
private static final String CREATE = "create";
private static final String ID = "id";
private static final String KIND = "kind";
private final Gson gson = new Gson();
@Autowired
private MappingService mappingService;
......@@ -86,8 +89,8 @@ public class WKSServiceImpl implements WKSService {
Map<String, List<MappingsModel>> mappingsMap = retrieveMappingsMap(rawRecordsDetailsForTransformation);
// Updating mappings with latest minor and patch versions
updateMappingsWithLatestMinorAndPatchVersions(mappingsMap);
getRawRecordsFromStorage(rawRecordsDetails);
transformRecords(rawRecordsDetailsForTransformation, mappingsMap, dataPartitionId, correlationId);
Map<String, Tree> rawRecordsMap = getRawRecordsFromStorage(rawRecordsDetails);
transformRecords(rawRecordsDetailsForTransformation, mappingsMap, rawRecordsMap, dataPartitionId, correlationId);
}
} finally {
ThreadScopeContextHolder.getContext().clear();
......@@ -123,16 +126,18 @@ public class WKSServiceImpl implements WKSService {
private void transformRecords(RawRecordDetails[] rawRecordsDetails, Map<String, List<MappingsModel>> mappingsMap,
String dataPartitionId, String correlationId) throws ApplicationException {
Map<String, Tree>rawRecordsMap, String dataPartitionId, String correlationId) throws ApplicationException {
List<Tree> wksRecordTreeList = new ArrayList<>();
List<RelationshipStatus> relationshipStatusList = new ArrayList<>();
List<>
for (RawRecordDetails rawRecord : rawRecordsDetails) {
try {
List<MappingsModel> mappingsList = retrieveMappings(mappingsMap, rawRecord);
LOGGER.info(Constants.TRANSFORMATION_STARTED, rawRecord.getId(), mappingsList.size());
Tree rawRecordTree = retrieveRawRecordTree(rawRecord);
Tree rawRecordTree = rawRecordsMap.get(rawRecord.getId());
if(rawRecordTree == null) {
throw new BadRequestException(Constants.RAW_RECORD_NOT_PRESENT);
}
for (MappingsModel mappings: mappingsList) {
......@@ -193,7 +198,7 @@ public class WKSServiceImpl implements WKSService {
LOGGER.info("Schemas for WKS fetched with latest minor and patch versions");
}
private Map<String, Tree> getRawRecordsFromStorage(RawRecordDetails[] rawRecordDetails) {
private Map<String, Tree> getRawRecordsFromStorage(RawRecordDetails[] rawRecordDetails) throws ApplicationException {
// Storage batch API supports maximum 20 Records
final int BATCH_SIZE = 20;
......@@ -202,11 +207,18 @@ public class WKSServiceImpl implements WKSService {
for(int i=0; i<rawRecordDetails.length; i += BATCH_SIZE) {
RawRecordDetails[] batch = Arrays.copyOfRange(rawRecordDetails, i, Math.min(rawRecordDetails.length, i + BATCH_SIZE));
List<String> ids = new ArrayList<>();
for(RawRecordDetails rawRecordDetail : rawRecordDetails) {
for(RawRecordDetails rawRecordDetail : batch) {
ids.add(rawRecordDetail.getId());
}
Records records = storageService.getRecordBatch(ids);
for (Records.Entity rawRecord : records.getRecords()) {
rawRecords.put(rawRecord.getId(), convertJsonStringToTree(this.gson.toJson(rawRecord)));
}
}
return rawRecords;
}
......@@ -239,23 +251,23 @@ public class WKSServiceImpl implements WKSService {
LOGGER.info(message);
}
private Tree retrieveRawRecordTree(RawRecordDetails rawRecord)
throws ApplicationException, BadRequestException {
Optional<String> optionalRawRecord = storageService.getRecord(rawRecord.getId());
if (!optionalRawRecord.isPresent()) {
LOGGER.warn(Constants.RAW_RECORD_NOT_PRESENT);
throw new BadRequestException(Constants.RAW_RECORD_NOT_PRESENT);
}
String rawRecordJsonStr = optionalRawRecord.get();
Tree rawRecordTree = convertJsonStringToTree(rawRecordJsonStr);
if (rawRecordTree.isEmpty()) {
LOGGER.warn(Constants.RAW_RECORD_NOT_PRESENT);
throw new ApplicationException(Constants.RAW_RECORD_NOT_PRESENT);
}
return rawRecordTree;
}
// private Tree retrieveRawRecordTree(RawRecordDetails rawRecord)
// throws ApplicationException, BadRequestException {
//
// Optional<String> optionalRawRecord = storageService.getRecord(rawRecord.getId());
// if (!optionalRawRecord.isPresent()) {
// LOGGER.warn(Constants.RAW_RECORD_NOT_PRESENT);
// throw new BadRequestException(Constants.RAW_RECORD_NOT_PRESENT);
// }
//
// String rawRecordJsonStr = optionalRawRecord.get();
// Tree rawRecordTree = convertJsonStringToTree(rawRecordJsonStr);
// if (rawRecordTree.isEmpty()) {
// LOGGER.warn(Constants.RAW_RECORD_NOT_PRESENT);
// throw new ApplicationException(Constants.RAW_RECORD_NOT_PRESENT);
// }
// return rawRecordTree;
// }
private boolean isEmptyMappings(List<MappingsModel> mappings) {
return mappings == null || mappings.isEmpty();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment