Commit 97ba6165 authored by harshit aggarwal's avatar harshit aggarwal
Browse files

initial commit

parent 99e6a215
Pipeline #18483 failed with stages
in 1 minute and 59 seconds
......@@ -13,8 +13,8 @@
# limitations under the License.
azure:
servicebusSubscription: wkssubscription
servicebusTopic: recordstopic
servicebusSubscription: eg_sb_wkssubscription
servicebusTopic: recordstopiceg
storageContainer: osdu-wks-mappings
image:
branch: master
......
......@@ -26,6 +26,7 @@ public class ProcessWKSTransform {
private final static Logger LOGGER = LoggerFactory.getLogger(ProcessWKSTransform.class);
private static final String DATA = "data";
private static final String ID = "id";
@Autowired
private WKSService wKSService;
......@@ -38,14 +39,29 @@ public class ProcessWKSTransform {
public void initiateWksTransformation(IMessage message) {
try {
String dataPartitionId = message.getProperties().get(DpsHeaders.DATA_PARTITION_ID).toString();
String correlationId = message.getProperties().get(DpsHeaders.CORRELATION_ID).toString();
String messageBody = new String(message.getMessageBody().getBinaryData().get(0), UTF_8);
JsonElement jsonRoot = JsonParser.parseString(messageBody);
JsonElement messageData = jsonRoot.getAsJsonObject().get(DATA);
String messageId = jsonRoot.getAsJsonObject().get(ID).getAsString();
message.setMessageId(messageId);
String dataPartitionId = messageData.getAsJsonObject().get(DpsHeaders.DATA_PARTITION_ID).getAsString();
String correlationId = messageData.getAsJsonObject().get(DpsHeaders.CORRELATION_ID).getAsString();
MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId));
dpsHeaders.setThreadContext(dataPartitionId,correlationId);
RawRecordDetails[] rawRecordDetails = retrieveDataFromMessage(message);
String dataPayload = messageData.getAsJsonObject().get(DATA).toString();
RawRecordDetails[] rawRecordDetails = getRecordsChangeData(dataPayload);
LOGGER.info("Transformation started for message with id: " + message.getMessageId());
wKSService.transform(rawRecordDetails, dataPartitionId, correlationId);
LOGGER.info("Transformation completed for message with id: " + message.getMessageId());
} catch (BadRequestException e) {
LOGGER.error("Bad Request Reason: {}, pubsub message id: {}", e.getErrorMsg(),
message.getMessageId());
......@@ -63,21 +79,6 @@ public class ProcessWKSTransform {
MDC.clear();
}
private RawRecordDetails[] retrieveDataFromMessage(IMessage message) throws BadRequestException {
LOGGER.info("Transformation started for message with id: " + message.getMessageId());
String messageData = new String(message.getMessageBody().getBinaryData().get(0), UTF_8);
JsonElement jsonRoot = JsonParser.parseString(messageData);
JsonElement msg = jsonRoot.getAsJsonObject().get("message");
if (msg == null) {
throw new BadRequestException(HttpStatus.BAD_REQUEST,"Invalid record change message, message object not found");
}
String dataValue = msg.getAsJsonObject().get(DATA).toString();
return getRecordsChangeData(dataValue);
}
private RawRecordDetails[] getRecordsChangeData(String data) throws BadRequestException {
if (data.equals("[]")) {
......
......@@ -23,7 +23,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.wks.exceptions.ApplicationException;
import org.opengroup.osdu.wks.exceptions.BadRequestException;
import org.opengroup.osdu.wks.model.RawRecordDetails;
......@@ -32,8 +31,6 @@ import org.opengroup.osdu.wks.provider.azure.utils.MDCContextMap;
import org.opengroup.osdu.wks.service.WKSService;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
......@@ -50,9 +47,9 @@ public class ProcessWKSTransformTest {
private static final String dataPartitionId = "opendes";
private static final String correlationId = "908fcf8d-30c5-4c74-a0ae-ab47b48b7a85";
private static final String emptyMessage = "{}";
private static final String validMessage = "{\"message\":{\"data\":[{\"id\":\"opendes:at:wellbore-b3Blbm\",\"kind\":\"opendes:at:wellbore:1.0.0\",\"op\":\"create\"}],\"account-id\":\"opendes\",\"data-partition-id\":\"opendes\",\"correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85\"}}";
private static final String noDataMessage = "{\"message\":{\"data\":[],\"account-id\":\"opendes\",\"data-partition-id\":\"opendes\",\"correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85\"}}";
private static final String messageId = "message-id";
private static final String validMessage = "{\"id\":\"40cc96f5-85b9-4923-9a5b-c27f67a3e815\",\"subject\":\"RecordsChanged\",\"data\":{\"data\":[{\"id\":\"opendes:ww:wellbore-35b7eecfa2c35145053b150f47461bea630a.slb.wks.wellbore.2\",\"kind\":\"opendes:test:wellbore_har:1.0.0\",\"op\":\"create\"}],\"account-id\":\"opendes\",\"correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85\",\"data-partition-id\":\"opendes\"},\"eventType\":\"RecordsChanged\",\"dataVersion\":\"1.0\",\"metadataVersion\":\"1\",\"eventTime\":\"2020-12-09T13:52:57.292Z\",\"topic\":\"/subscriptions/929e9ae0-7bb1-4563-a200-9863fe27cae4/resourceGroups/osdu-mvp-idcmvpdp1-i2or-rg/providers/Microsoft.EventGrid/topics/osdu-mvp-idcmvpd-i2or-grid-recordstopic\"}";
private static final String noDataMessage = "{\"id\":\"40cc96f5-85b9-4923-9a5b-c27f67a3e815\",\"subject\":\"RecordsChanged\",\"data\":{\"data\":[],\"account-id\":\"opendes\",\"correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85\",\"data-partition-id\":\"opendes\"},\"eventType\":\"RecordsChanged\",\"dataVersion\":\"1.0\",\"metadataVersion\":\"1\",\"eventTime\":\"2020-12-09T13:52:57.292Z\",\"topic\":\"/subscriptions/929e9ae0-7bb1-4563-a200-9863fe27cae4/resourceGroups/osdu-mvp-idcmvpdp1-i2or-rg/providers/Microsoft.EventGrid/topics/osdu-mvp-idcmvpd-i2or-grid-recordstopic\"}";
private static final String messageId = "40cc96f5-85b9-4923-9a5b-c27f67a3e815";
@InjectMocks
private ProcessWKSTransform processWKSTransform;
......@@ -71,10 +68,8 @@ public class ProcessWKSTransformTest {
@BeforeEach
public void init() {
when(message.getProperties()).thenReturn(getMessageProperties());
lenient().when(message.getMessageId()).thenReturn(messageId);
doNothing().when(dpsHeaders).setThreadContext(dataPartitionId, correlationId);
lenient().doNothing().when(dpsHeaders).setThreadContext(dataPartitionId, correlationId);
}
@Test
......@@ -84,10 +79,8 @@ public class ProcessWKSTransformTest {
processWKSTransform.initiateWksTransformation(message);
verify(mdcContextMap, times(1)).getContextMap(correlationId, dataPartitionId);
verify(message, times(2)).getProperties();
verify(message, times(1)).getMessageBody();
verify(message, times(2)).getMessageId();
verify(message, times(1)).getMessageId();
}
@Test
......@@ -98,9 +91,10 @@ public class ProcessWKSTransformTest {
processWKSTransform.initiateWksTransformation(message);
verify(mdcContextMap, times(1)).getContextMap(correlationId, dataPartitionId);
verify(message, times(2)).getProperties();
verify(message, times(1)).getMessageBody();
verify(message, times(2)).getMessageId();
verify(message, times(1)).getMessageId();
verify(message, times(1)).setMessageId(messageId);
}
@Test
......@@ -112,9 +106,9 @@ public class ProcessWKSTransformTest {
processWKSTransform.initiateWksTransformation(message);
verify(mdcContextMap, times(1)).getContextMap(correlationId, dataPartitionId);
verify(message, times(2)).getProperties();
verify(message, times(1)).getMessageBody();
verify(message, times(2)).getMessageId();
verify(message, times(1)).setMessageId(messageId);
}
@Test
......@@ -126,15 +120,8 @@ public class ProcessWKSTransformTest {
processWKSTransform.initiateWksTransformation(message);
verify(mdcContextMap, times(1)).getContextMap(correlationId, dataPartitionId);
verify(message, times(2)).getProperties();
verify(message, times(1)).getMessageBody();
}
private Map<String, Object> getMessageProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
properties.put(DpsHeaders.CORRELATION_ID, correlationId);
return properties;
verify(message, times(1)).setMessageId(messageId);
}
private MessageBody getMessageBody(String messageValue) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment