Skip to content
Snippets Groups Projects
Commit 264dd189 authored by Aman Verma's avatar Aman Verma
Browse files

using header directly

parent c25f8586
No related branches found
No related tags found
1 merge request!112Moving the while loop to iterate over storage records in azure code
Pipeline #30191 passed
......@@ -74,18 +74,16 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
createTask(payload, headers.getPartitionIdWithFallbackToAccountId(), headers.getCorrelationId());
createTask(payload, headers);
}
@Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
String dataPartitionId = headers.getPartitionIdWithFallbackToAccountId();
String correlationId = headers.getCorrelationId();
publishAllRecordsToServiceBus(payload, dataPartitionId, correlationId);
publishAllRecordsToServiceBus(payload, headers);
}
private Boolean publishAllRecordsToServiceBus(String payload, String dataPartitionId, String correlationId) {
private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) {
// fetch all the remaining records
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
......@@ -104,26 +102,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
attributes.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
attributes.put(DpsHeaders.CORRELATION_ID, correlationId);
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, dataPartitionId, correlationId);
createTask(recordChangedMessagePayload, headers);
}
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsBatchSize());
} catch (AppException e) {
throw e;
} catch (Exception e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Unknown error", "An unknown error has occurred.", e);
}
return true;
}
private void createTask(String payload, String dataPartitionId, String correlationId) {
private void createTask(String payload, DpsHeaders headers) {
Gson gson = new Gson();
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
......@@ -131,9 +127,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
Map<String, Object> properties = new HashMap<>();
// properties
properties.put(DpsHeaders.ACCOUNT_ID, dataPartitionId);
properties.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
properties.put(DpsHeaders.CORRELATION_ID, dataPartitionId);
properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing();
properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
message.setProperties(properties);
// data
......@@ -142,9 +139,9 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
// add all to body {"message": {"data":[], "id":...}}
JsonObject jo = new JsonObject();
jo.add("data", gson.toJsonTree(recordInfos));
jo.addProperty(DpsHeaders.ACCOUNT_ID, dataPartitionId);
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
jo.addProperty(DpsHeaders.CORRELATION_ID, correlationId);
jo.addProperty(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
JsonObject jomsg = new JsonObject();
jomsg.add("message", jo);
......@@ -152,13 +149,15 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
message.setContentType("application/json");
try {
logger.info("Indexer publishes message to Service Bus " + correlationId);
topicClientFactory.getClient(dataPartitionId, serviceBusTopic).send(message);
logger.info("Indexer publishes message to Service Bus " + headers.getCorrelationId());
topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private List<RecordInfo> parseRecordsAsJSON(String inputPayload) {
Gson gson = new Gson();
Type type = new TypeToken<List<RecordInfo>>(){}.getType();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment