Commit a55a9ab6 authored by harshit aggarwal's avatar harshit aggarwal
Browse files

perf changes

parent ec36a1a5
Pipeline #21975 failed with stages
in 3 minutes and 42 seconds
......@@ -22,12 +22,16 @@ metadata:
spec:
scaleTargetRef:
deploymentName: {{ .Chart.Name }}
pollingInterval: 30 # Optional. Default: 30 seconds
cooldownPeriod: 60 # Optional. Default: 300 seconds
minReplicaCount: 1 # Optional. Default: 0
maxReplicaCount: 10 # Optional. Default: 100
triggers:
- type: azure-servicebus
metadata:
type: serviceBusTrigger
direction: in
name: message
subscriptionName: {{ .Values.azure.servicebusSubscription }}
topicName: {{ .Values.azure.servicebusTopic }}
connection: SERVICE_BUS
- type: azure-servicebus
metadata:
type: serviceBusTrigger
direction: in
name: message
subscriptionName: {{ .Values.azure.servicebusSubscription }}
topicName: {{ .Values.azure.servicebusTopic }}
connection: SERVICE_BUS
......@@ -38,6 +38,8 @@ public class ProcessWKSTransform {
private ThreadDpsHeaders dpsHeaders;
public void initiateWksTransformation(IMessage message) {
boolean transformationSucceeded = false;
try {
String messageBody = new String(message.getMessageBody().getBinaryData().get(0), UTF_8);
......@@ -56,11 +58,12 @@ public class ProcessWKSTransform {
String dataPayload = messageData.getAsJsonObject().get(DATA).toString();
RawRecordDetails[] rawRecordDetails = getRecordsChangeData(dataPayload);
LOGGER.info("Transformation started for message with id: " + message.getMessageId());
LOGGER.info("Transformation started for message with id: {} containing {} Records", message.getMessageId(), rawRecordDetails.length);
wKSService.transform(rawRecordDetails, dataPartitionId, correlationId);
LOGGER.info("Transformation completed for message with id: " + message.getMessageId());
LOGGER.info("Transformation completed for message with id: {}", message.getMessageId());
transformationSucceeded = true;
} catch (BadRequestException e) {
LOGGER.error("Bad Request Reason: {}, pubsub message id: {}", e.getErrorMsg(),
......@@ -75,6 +78,10 @@ public class ProcessWKSTransform {
LOGGER.error("Exception encountered processing the message with id: {}", message.getMessageId(), e);
}
if(!transformationSucceeded) {
LOGGER.info("Transformation failed for message with id: {}", message.getMessageId());
}
ThreadScopeContextHolder.getContext().clear();
MDC.clear();
}
......
......@@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
@Service
public class StorageServiceImpl implements StorageService {
private final static Logger LOGGER = LoggerFactory.getLogger(StorageServiceImpl.class);
private final static String WKS_SUFFIX = "wks-source";
@Autowired
private RestClient restClient;
......@@ -83,7 +84,9 @@ public class StorageServiceImpl implements StorageService {
// string or else it
// gives HTTP 404 Bad Request
requestParameters.setPayload(Arrays.asList(jsonStrArr).toString());
requestParameters.setCorrelationId(correlationId);
// This WKS_SUFFIX in correlation id separates the service bus messages received from
// records changed event from the ones generated by this /PUT API call by WKS service
requestParameters.setCorrelationId(correlationId + WKS_SUFFIX);
requestParameters.setDataPartitionId(tenantName);
RestResponse response = restClient.processRequest(requestParameters);
......
......@@ -163,7 +163,6 @@ public class WKSServiceImpl implements WKSService {
statusStoreService.createOrUpdateStoreEntry(relationshipStatusList);
}
}
LOGGER.info(Constants.TRANFORMATION_PROCESS_COMPLETED);
}
private void updateMappingsWithLatestMinorAndPatchVersions(Map<String, List<MappingsModel>> mappingsMap) throws ApplicationException {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment