Skip to content
Snippets Groups Projects
Commit aef37808 authored by Zhibin Mai's avatar Zhibin Mai
Browse files

Update IndexerQueueTaskBuilder for Azure and GCP to forward the new attribute 'ancestry_kinds

parent 4340bd62
No related branches found
No related tags found
Loading
Pipeline #163811 failed
......@@ -162,6 +162,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
jo.addProperty(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
// Append the ancestry kinds used to prevent circular chasing
if(attributes.containsKey(Constants.ANCESTRY_KINDS)) {
jo.addProperty(Constants.ANCESTRY_KINDS, attributes.get(Constants.ANCESTRY_KINDS));
}
......
......@@ -34,6 +34,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.model.Constants;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.context.annotation.Primary;
......@@ -98,10 +99,16 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
RecordChangedMessages recordChangedMessages = gson.fromJson(payload,
RecordChangedMessages.class);
Map<String, String> attributes = getAttributesFromHeaders(headers);
// Append the ancestry kinds used to prevent circular chasing
if(recordChangedMessages.getAttributes().containsKey(Constants.ANCESTRY_KINDS)) {
attributes.put(Constants.ANCESTRY_KINDS, recordChangedMessages.getAttributes().get(Constants.ANCESTRY_KINDS));
}
OqmMessage oqmMessage = OqmMessage.builder()
.id(headers.getCorrelationId())
.data(recordChangedMessages.getData())
.attributes(getAttributesFromHeaders(headers))
.attributes(attributes)
.build();
log.info("Reprocessing task: {} ,has been published.", oqmMessage);
......
......@@ -49,6 +49,7 @@ STORAGE_SCHEMA_HOST=${STORAGE_API}/schemas
SCHEMA_BASE_HOST=http://schema
SCHEMA_PATH=/api/schema-service/v1/schema
SCHEMA_HOST=${SCHEMA_BASE_HOST}${SCHEMA_PATH}
SEARCH_HOST=${SEARCH_BASE_HOST}/api/search/v2
records-changed-topic-name=records-changed
schema-changed-topic-name=schema-changed
......
......@@ -27,6 +27,7 @@ CRON_INDEX_CLEANUP_THRESHOLD_DAYS=3
CRON_EMPTY_INDEX_CLEANUP_THRESHOLD_DAYS=7
SCHEMA_HOST=${HOST}/api/schema-service/v1/schema
SEARCH_HOST=${search_service_url}/api/search/v2
storage_service_url=http://localhost:8082
#storage_service_url=https://os-storage-ibm-osdu-r2.osduadev-a1c3eaf78a86806e299f5f3f207556f0-0000.us-south.containers.appdomain.cloud
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment