Skip to content
Snippets Groups Projects
Commit 13257f01 authored by Zhibin Mai's avatar Zhibin Mai
Browse files

Merge branch 'limit_pub_sub_message_size_for_azure' into 'master'

Make sure that the indexing message sent to Azure service bus is not over-size

See merge request !597
parents 5ee4c7b6 09337de3
No related branches found
No related tags found
1 merge request!597Make sure that the indexing message sent to Azure service bus is not over-size
Pipeline #205343 failed
...@@ -21,6 +21,7 @@ import com.google.gson.Gson; ...@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import com.microsoft.azure.servicebus.Message; import com.microsoft.azure.servicebus.Message;
import lombok.extern.java.Log; import lombok.extern.java.Log;
import org.apache.commons.collections.CollectionUtils;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
...@@ -57,6 +58,7 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION ...@@ -57,6 +58,7 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION
@RequestScope @RequestScope
@Primary @Primary
public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
private Gson gson = new Gson();
@Autowired @Autowired
private ITopicClientFactory topicClientFactory; private ITopicClientFactory topicClientFactory;
...@@ -81,33 +83,40 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -81,33 +83,40 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Override @Override
public void createWorkerTask(String payload, DpsHeaders headers) { public void createWorkerTask(String payload, DpsHeaders headers) {
createTask(payload, headers); createWorkerTasks(payload, headers);
} }
@Override @Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing(); createWorkerTasks(payload, headers);
createTask(payload, headers);
} }
@Override @Override
public void createReIndexTask(String payload, DpsHeaders headers) { public void createReIndexTask(String payload, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
publishAllRecordsToServiceBus(payload, headers); publishAllRecordsToServiceBus(payload, headers);
} }
@Override @Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
headers.addCorrelationIdIfMissing();
publishAllRecordsToServiceBus(payload, headers); publishAllRecordsToServiceBus(payload, headers);
} }
private void createWorkerTasks(String payload, DpsHeaders headers) {
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData());
if(!CollectionUtils.isEmpty(recordInfos)) {
Map<String, String> attributes = (receivedPayload.getAttributes() != null)
? receivedPayload.getAttributes()
: new HashMap<>();
createTasks(recordInfos, attributes, headers);
}
}
private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) { private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) {
// fetch all the remaining records // fetch all the remaining records
// This logic is temporary and would be updated to call the storage service async. // This logic is temporary and would be updated to call the storage service async.
// Currently, the storage client can't be called out of request scope hence making the // Currently, the storage client can't be called out of request scope hence making the
// storage calls sync here // storage calls sync here
Gson gson = new Gson();
RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class);
final String recordKind = recordReindexRequest.getKind(); final String recordKind = recordReindexRequest.getKind();
RecordQueryResponse recordQueryResponse = null; RecordQueryResponse recordQueryResponse = null;
...@@ -121,21 +130,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -121,21 +130,10 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
} }
recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest);
if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) {
List<String> recordIds = recordQueryResponse.getResults();
List<List<String>> batch = Lists.partition(recordQueryResponse.getResults(), publisherConfig.getPubSubBatchSize()); List<RecordInfo> recordInfos = recordIds.stream()
for (List<String> recordsBatch : batch) { .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
List<RecordInfo> records = recordsBatch.stream() createTasks(recordInfos, new HashMap<>(), headers);
.map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList());
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build();
String recordChangedMessagePayload = gson.toJson(recordChangedMessages);
createTask(recordChangedMessagePayload, headers);
}
} }
} while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()); } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize());
...@@ -146,24 +144,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -146,24 +144,24 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
} }
} }
private void createTask(String payload, DpsHeaders headers) { private void createTasks(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) {
Gson gson = new Gson(); headers.addCorrelationIdIfMissing();
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class); List<List<RecordInfo>> batch = Lists.partition(recordInfos, publisherConfig.getPubSubBatchSize());
for (List<RecordInfo> recordsBatch : batch) {
createTask(recordsBatch, attributes, headers);
}
}
private void createTask(List<RecordInfo> recordInfos, Map<String, String> attributes, DpsHeaders headers) {
Message message = new Message(); Message message = new Message();
Map<String, Object> properties = new HashMap<>();
// properties // properties
Map<String, Object> properties = new HashMap<>();
properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing();
properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
message.setProperties(properties); message.setProperties(properties);
// data
List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData());
Map<String, String> attributes = receivedPayload.getAttributes();
// add all to body {"message": {"data":[], "id":...}} // add all to body {"message": {"data":[], "id":...}}
JsonObject jo = new JsonObject(); JsonObject jo = new JsonObject();
jo.add("data", gson.toJsonTree(recordInfos)); jo.add("data", gson.toJsonTree(recordInfos));
...@@ -171,7 +169,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -171,7 +169,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
// Append the ancestry kinds used to prevent circular chasing // Append the ancestry kinds used to prevent circular chasing
if(attributes != null && attributes.containsKey(Constants.ANCESTRY_KINDS)) { if(attributes.containsKey(Constants.ANCESTRY_KINDS)) {
jo.addProperty(Constants.ANCESTRY_KINDS, attributes.get(Constants.ANCESTRY_KINDS)); jo.addProperty(Constants.ANCESTRY_KINDS, attributes.get(Constants.ANCESTRY_KINDS));
} }
JsonObject jomsg = new JsonObject(); JsonObject jomsg = new JsonObject();
...@@ -191,7 +189,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { ...@@ -191,7 +189,6 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
} }
private List<RecordInfo> parseRecordsAsJSON(String inputPayload) { private List<RecordInfo> parseRecordsAsJSON(String inputPayload) {
Gson gson = new Gson();
Type type = new TypeToken<List<RecordInfo>>() {}.getType(); Type type = new TypeToken<List<RecordInfo>>() {}.getType();
List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type); List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type);
return recordInfoList; return recordInfoList;
......
...@@ -31,7 +31,12 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION ...@@ -31,7 +31,12 @@ import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class IndexerQueueTaskBuilderAzureTest { public class IndexerQueueTaskBuilderAzureTest {
private String payload = "{payload : value }"; private String payload = "{\n" +
" \"data\": \"[{\\\"id\\\":\\\"opendes:work-product-component--WellLog:566edebc-1a9f-4f4d-9a30-ed458e959ac7\\\",\\\"kind\\\":\\\"osdu:wks:work-product-component--WellLog:1.2.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"opendes:work-product-component--WellLog:84958febe54e4908a1703778e1918dae\\\",\\\"kind\\\":\\\"osdu:wks:work-product-component--WellLog:1.2.0\\\",\\\"op\\\":\\\"create\\\"}]\",\n" +
" \"attributes\": {\n" +
" \"data-partition-id\": \"opendes\"\n" +
" }\n" +
"}";
private static String partitionId = "opendes"; private static String partitionId = "opendes";
private static String correlationId = "correlationId"; private static String correlationId = "correlationId";
private static String serviceBusReindexTopicNameField = "serviceBusReindexTopicName"; private static String serviceBusReindexTopicNameField = "serviceBusReindexTopicName";
...@@ -92,7 +97,7 @@ public class IndexerQueueTaskBuilderAzureTest { ...@@ -92,7 +97,7 @@ public class IndexerQueueTaskBuilderAzureTest {
sut.createWorkerTask(payload, milliseconds, dpsHeaders); sut.createWorkerTask(payload, milliseconds, dpsHeaders);
verify(dpsHeaders, times(2)).addCorrelationIdIfMissing(); verify(dpsHeaders, times(1)).addCorrelationIdIfMissing();
verify(dpsHeaders, times(4)).getPartitionIdWithFallbackToAccountId(); verify(dpsHeaders, times(4)).getPartitionIdWithFallbackToAccountId();
verify(dpsHeaders, times(2)).getCorrelationId(); verify(dpsHeaders, times(2)).getCorrelationId();
verify(topicClientFactory, times(1)).getClient(partitionId, serviceBusReindexTopicNameValue); verify(topicClientFactory, times(1)).getClient(partitionId, serviceBusReindexTopicNameValue);
...@@ -116,7 +121,7 @@ public class IndexerQueueTaskBuilderAzureTest { ...@@ -116,7 +121,7 @@ public class IndexerQueueTaskBuilderAzureTest {
verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(requestInfo, times(1)).checkOrGetAuthorizationHeader();
verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader);
verify(storageService, times(1)).getRecordsByKind(any()); verify(storageService, times(1)).getRecordsByKind(any());
verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); verify(dpsHeaders, times(0)).addCorrelationIdIfMissing();
} }
@Test @Test
...@@ -137,9 +142,9 @@ public class IndexerQueueTaskBuilderAzureTest { ...@@ -137,9 +142,9 @@ public class IndexerQueueTaskBuilderAzureTest {
verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(requestInfo, times(1)).checkOrGetAuthorizationHeader();
verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader);
verify(storageService, times(1)).getRecordsByKind(any()); verify(storageService, times(1)).getRecordsByKind(any());
verify(dpsHeaders, times(6)).getPartitionIdWithFallbackToAccountId(); verify(dpsHeaders, times(4)).getPartitionIdWithFallbackToAccountId();
verify(dpsHeaders, times(3)).getCorrelationId(); verify(dpsHeaders, times(2)).getCorrelationId();
verify(dpsHeaders, times(2)).addCorrelationIdIfMissing(); verify(dpsHeaders, times(1)).addCorrelationIdIfMissing();
verify(topicClientFactory, times(1)).getClient(partitionId, serviceBusReindexTopicNameValue); verify(topicClientFactory, times(1)).getClient(partitionId, serviceBusReindexTopicNameValue);
} }
...@@ -161,9 +166,9 @@ public class IndexerQueueTaskBuilderAzureTest { ...@@ -161,9 +166,9 @@ public class IndexerQueueTaskBuilderAzureTest {
verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(requestInfo, times(1)).checkOrGetAuthorizationHeader();
verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader);
verify(storageService, times(1)).getRecordsByKind(any()); verify(storageService, times(1)).getRecordsByKind(any());
verify(dpsHeaders, times(120)).getPartitionIdWithFallbackToAccountId(); verify(dpsHeaders, times(80)).getPartitionIdWithFallbackToAccountId();
verify(dpsHeaders, times(60)).getCorrelationId(); verify(dpsHeaders, times(40)).getCorrelationId();
verify(dpsHeaders, times(21)).addCorrelationIdIfMissing(); verify(dpsHeaders, times(1)).addCorrelationIdIfMissing();
verify(topicClientFactory, times(20)).getClient(partitionId, serviceBusReindexTopicNameValue); verify(topicClientFactory, times(20)).getClient(partitionId, serviceBusReindexTopicNameValue);
} }
...@@ -180,6 +185,6 @@ public class IndexerQueueTaskBuilderAzureTest { ...@@ -180,6 +185,6 @@ public class IndexerQueueTaskBuilderAzureTest {
verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(requestInfo, times(1)).checkOrGetAuthorizationHeader();
verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader);
verify(storageService, times(1)).getRecordsByKind(any()); verify(storageService, times(1)).getRecordsByKind(any());
verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); verify(dpsHeaders, times(0)).addCorrelationIdIfMissing();
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment