Commit 1be5c3f8 authored by harshit aggarwal's avatar harshit aggarwal
Browse files

adding support for batch retrieve using query api

parent b9e1a4a5
......@@ -3,4 +3,5 @@ package org.opengroup.osdu.wks.constants;
public class StorageConstants {
public static final String PUT_STORAGE_ENDPOINT = "/records";
public static final String GET_STORAGE_ENDPOINT = "/records";
public static final String POST_BATCH_GET_STORAGE_ENDPOINT = "/query/records:batch";
}
package org.opengroup.osdu.wks.service;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import com.google.gson.Gson;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.model.storage.RecordIds;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.wks.config.RequestIdentity;
......@@ -25,6 +28,7 @@ import org.springframework.stereotype.Service;
public class StorageServiceImpl implements StorageService {
private final static Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
private final static String WKS_SUFFIX = "wks-source";
private final Gson gson = new Gson();
private final RestClient restClient;
private final ServiceHostConfiguration serviceHostConfiguration;
......@@ -74,6 +78,34 @@ public class StorageServiceImpl implements StorageService {
return Optional.ofNullable(response.getMessage());
}
/**
* This method returns the raw records in batch for the given record ids
*
* @param id of the record to be retrieved.
* @return of type {@link Optional} {@link String} which contains the raw record
* @throws ApplicationException
*/
@Override
public getRecordBatch getRecordBatch(List<String> ids) {
String tenantName = requestIdentity.getDataPartitionId();
String queryRecordUrl = serviceHostConfiguration.getStorageServiceUrl().concat(StorageConstants.POST_BATCH_GET_STORAGE_ENDPOINT);
String body = this.gson.toJson(RecordIds.builder().records(ids).build());
RequestParameters requestParameters = new RequestParameters();
TenantInfo tenantInfo = tenantFactory.getTenantInfo(tenantName);
requestParameters.setUrl(queryRecordUrl);
requestParameters.setMethod(HttpMethod.POST);
requestParameters.setAuthKey(getAuthKey(tenantInfo));
requestParameters.setCorrelationId(requestIdentity.getCorrelationId());
requestParameters.setDataPartitionId(tenantName);
requestParameters.setPayload(body);
RestResponse response = restClient.processRequest(requestParameters);
return Optional.ofNullable(response.getMessage());
}
/**
* This method upload wks transformed records
*
......
......@@ -2,6 +2,7 @@ package org.opengroup.osdu.wks.service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -9,6 +10,7 @@ import java.util.Optional;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.opengroup.osdu.core.common.model.storage.PubSubInfo;
import org.opengroup.osdu.wks.config.RequestIdentity;
import org.opengroup.osdu.wks.config.ThreadScopeContextHolder;
import org.opengroup.osdu.wks.constants.Constants;
......@@ -84,6 +86,7 @@ public class WKSServiceImpl implements WKSService {
Map<String, List<MappingsModel>> mappingsMap = retrieveMappingsMap(rawRecordsDetailsForTransformation);
// Updating mappings with latest minor and patch versions
updateMappingsWithLatestMinorAndPatchVersions(mappingsMap);
getRawRecordsFromStorage(rawRecordsDetails);
transformRecords(rawRecordsDetailsForTransformation, mappingsMap, dataPartitionId, correlationId);
}
} finally {
......@@ -123,6 +126,7 @@ public class WKSServiceImpl implements WKSService {
String dataPartitionId, String correlationId) throws ApplicationException {
List<Tree> wksRecordTreeList = new ArrayList<>();
List<RelationshipStatus> relationshipStatusList = new ArrayList<>();
List<>
for (RawRecordDetails rawRecord : rawRecordsDetails) {
try {
......@@ -189,6 +193,23 @@ public class WKSServiceImpl implements WKSService {
LOGGER.info("Schemas for WKS fetched with latest minor and patch versions");
}
private Map<String, Tree> getRawRecordsFromStorage(RawRecordDetails[] rawRecordDetails) {
// Storage batch API supports maximum 20 Records
final int BATCH_SIZE = 20;
Map<String, Tree> rawRecords = new HashMap<>();
for(int i=0; i<rawRecordDetails.length; i += BATCH_SIZE) {
RawRecordDetails[] batch = Arrays.copyOfRange(rawRecordDetails, i, Math.min(rawRecordDetails.length, i + BATCH_SIZE));
List<String> ids = new ArrayList<>();
for(RawRecordDetails rawRecordDetail : rawRecordDetails) {
ids.add(rawRecordDetail.getId());
}
}
}
private void initializeMessageContext(String dataPartitionId, String correlationId) {
requestIdentity.setDataPartitionId(dataPartitionId);
requestIdentity.setCorrelationId(correlationId);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment