diff --git a/provider/indexer-gcp/docs/anthos/README.md b/provider/indexer-gcp/docs/anthos/README.md index 324e84255a74acadf343d578abe243a51834e79e..ea9413fd186bdf7792c59aaad4ba807238ca4bbf 100644 --- a/provider/indexer-gcp/docs/anthos/README.md +++ b/provider/indexer-gcp/docs/anthos/README.md @@ -190,13 +190,15 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H RabbitMq should have exchanges and queues with names and configs: -| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | -|-------------------|-------------------------------------|----------------------------|----------------------------------------------------------------------| -| indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) | -| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | - +| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | +|------------------------------|-------------------------------------|--------------------------------------|--------------------------------------------------------------------------------------------------------------------------------| +| indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) | +| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: records-changed-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| records-changed-dead-letter | `Type fanout` <br/>`durable: true` | indexer-records-changed-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` | +| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: reprocess-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| reprocess-dead-letter | `Type fanout` <br/>`durable: true` | indexer-reprocess-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` | +| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: schema-changed-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| schema-changed-dead-letter | `Type fanout` <br/>`durable: true` | indexer-schema-changed-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` | ## Keycloak configuration diff --git a/provider/indexer-gcp/docs/gcp/README.md b/provider/indexer-gcp/docs/gcp/README.md index 6cbeb52ec1190610a728eeafd618c24348d05313..d5a5fa81c5c185c01ea136fe17a5fa673be1cf8e 100644 --- a/provider/indexer-gcp/docs/gcp/README.md +++ b/provider/indexer-gcp/docs/gcp/README.md @@ -45,12 +45,15 @@ Usage of spring profiles is preferred. Pubsub should have topics and subscribers with names and configs: -| TOPIC NAME | Subscription name | Subscription config | -|-------------------|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | -| records-changed | indexer-records-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds` | -| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds` | -| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds` | +| TOPIC NAME | Subscription name | Subscription config | +|-----------------------------|------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | +| records-changed | indexer-records-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| records-changed-dead-letter | indexer-records-changed | `Subscription message retention duration: 7 days` | +| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| reprocess-dead-letter | indexer-reprocess-dead-letter | `Subscription message retention duration: 7 days` | +| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| schema-changed-dead-letter | indexer-schema-changed-dead-letter | `Subscription message retention duration: 7 days` | ### Properties set in Partition service: diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java index 321db939f35b31503d02c4db08acf9513765dd16..04dd82069daa7e9210566136573f81cc583bc1e1 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java @@ -29,7 +29,6 @@ import org.springframework.context.annotation.Configuration; public class IndexerMessagingConfigProperties { private String recordsChangedTopicName; - //TODO schema-changed events consuming not implemented private String schemaChangedTopicName; private String defaultRelativeIndexerWorkerUrl; private String reprocessTopicName; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java index 0e3e0fd92ed859373da063573508480fbe1d3a33..a728ad5fb5aab37ab0d94a3e217a08d68b7a2172 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java @@ -39,6 +39,8 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.thread.ThreadScopeContex @RequiredArgsConstructor public class IndexerOqmMessageReceiver implements OqmMessageReceiver { + private static final String SCHEMA_SERVICE_NAME = "schema"; + private final ThreadDpsHeaders dpsHeaders; private final SubscriptionConsumer consumer; private final TokenProvider tokenProvider; @@ -87,9 +89,12 @@ public class IndexerOqmMessageReceiver implements OqmMessageReceiver { private boolean sendMessage(OqmMessage oqmMessage) { CloudTaskRequest cloudTaskRequest; + String serviceName = oqmMessage.getAttributes().get("service"); JsonElement jsonElement = JsonParser.parseString(oqmMessage.getData()); - if (jsonElement.isJsonArray()) { + if (SCHEMA_SERVICE_NAME.equalsIgnoreCase(serviceName)) { + cloudTaskRequest = getCloudTaskRequestProducedBySchemaService(oqmMessage); + } else if (jsonElement.isJsonArray()) { cloudTaskRequest = getCloudTaskRequestProducedByStorageService(oqmMessage); } else { cloudTaskRequest = getCloudTaskRequestProducedByIndexerService(oqmMessage); @@ -117,6 +122,13 @@ public class IndexerOqmMessageReceiver implements OqmMessageReceiver { .build(); } + private CloudTaskRequest getCloudTaskRequestProducedBySchemaService(OqmMessage oqmMessage) { + return CloudTaskRequest.builder() + .url(SCHEMA_SERVICE_NAME) + .message(gson.toJson(oqmMessage)) + .build(); + } + @NotNull private DpsHeaders getHeaders(OqmMessage oqmMessage) { DpsHeaders headers = new DpsHeaders(); diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java index 5fb535367258b07f840970a073c9c870de22d27b..b9529df021e1222424f392a74b8464824c313537 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java @@ -28,6 +28,7 @@ import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.indexer.JobStatus; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.indexer.api.RecordIndexerApi; @@ -40,6 +41,8 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer { + private static final String SCHEMA_RELATIVE_URL = "schema"; + private final DpsHeaders dpsHeaders; private final RecordIndexerApi recordIndexerApi; private final ReindexApi reindexApi; @@ -60,6 +63,11 @@ public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer log.debug("Reindex job message body: {}", reindexBody); ResponseEntity<?> reindexResponse = reindexApi.reindex(reindexBody, false); log.info("Reindex job status: {}", reindexResponse); + } else if (url.equals(SCHEMA_RELATIVE_URL)) { + SchemaChangedMessages schemaChangedMessage = getSchemaWorkerRequestBody(request); + log.debug("Schema changed job message body: {}", schemaChangedMessage); + ResponseEntity<?> schemaChangeResponse = recordIndexerApi.schemaWorker(schemaChangedMessage); + log.info("Schema changed job status: {}", schemaChangeResponse); } return true; } catch (AppException e) { @@ -91,4 +99,11 @@ public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer return recordChangedMessages; } + private SchemaChangedMessages getSchemaWorkerRequestBody(CloudTaskRequest request) { + SchemaChangedMessages schemaChangedMessage = this.gson.fromJson(request.getMessage(), SchemaChangedMessages.class); + schemaChangedMessage.setMessageId(dpsHeaders.getCorrelationId()); + schemaChangedMessage.setPublishTime(LocalDateTime.now().toString()); + return schemaChangedMessage; + } + } diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java index 3da0b0ab19fe9b106c148257acc810b1c128c9ec..51c5597e81791f4f753a590c68e15b9601c0174c 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java @@ -52,15 +52,18 @@ public class TenantSubscriberConfiguration { log.info("OqmSubscriberManager provisioning STARTED"); IndexerOqmMessageReceiver recordsChangedMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); IndexerOqmMessageReceiver reprocessOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); + IndexerOqmMessageReceiver schemaOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); String recordsChangedTopicName = properties.getRecordsChangedTopicName(); String reprocessTopicName = properties.getReprocessTopicName(); + String schemaChangedTopicName = properties.getSchemaChangedTopicName(); Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo(); for (TenantInfo tenantInfo : tenantInfos) { subscriberManager.registerSubscriber(tenantInfo, recordsChangedTopicName, getSubscriptionName(recordsChangedTopicName), recordsChangedMessageReceiver); subscriberManager.registerSubscriber(tenantInfo, reprocessTopicName, getSubscriptionName(reprocessTopicName), reprocessOqmMessageReceiver); + subscriberManager.registerSubscriber(tenantInfo, schemaChangedTopicName, getSubscriptionName(schemaChangedTopicName), schemaOqmMessageReceiver); } log.info("OqmSubscriberManager provisioning COMPLETED"); }