diff --git a/provider/indexer-gc/docs/anthos/README.md b/provider/indexer-gc/docs/anthos/README.md index bcbe2b3562471fa3270cfb906a1a36e6a8fa9381..e8755d005d7663280058ff087e742b09a12e415a 100644 --- a/provider/indexer-gc/docs/anthos/README.md +++ b/provider/indexer-gc/docs/anthos/README.md @@ -201,6 +201,8 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H #### Exchanges and queues configuration + + RabbitMq should have exchanges and queues with names and configs: | EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | @@ -212,6 +214,8 @@ RabbitMq should have exchanges and queues with names and configs: | indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | | schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | | indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| reindex | `Type fanout` <br/>`durable: true` | indexer-reindex | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-reindex-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reindex | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | ## Keycloak configuration diff --git a/provider/indexer-gc/docs/anthos/pics/indexer.png b/provider/indexer-gc/docs/anthos/pics/indexer.png new file mode 100644 index 0000000000000000000000000000000000000000..711fc590e19dab1bbcb1837521ad58d96fff81da Binary files /dev/null and b/provider/indexer-gc/docs/anthos/pics/indexer.png differ diff --git a/provider/indexer-gc/docs/gc/README.md b/provider/indexer-gc/docs/gc/README.md index c3944a2bdb546f766cd410caacd405d286b21a76..cac8344500587a8fbcf827ba972527f54f280499 100644 --- a/provider/indexer-gc/docs/gc/README.md +++ b/provider/indexer-gc/docs/gc/README.md @@ -42,17 +42,21 @@ Usage of spring profiles is preferred. ## Pubsub configuration + + Pubsub should have topics and subscribers with names and configs: -| TOPIC NAME | Subscription name | Subscription config | -|-----------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | -| records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` | -| records-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) | -| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | -| reprocess-dead-letter | (Consumer not implemented) | (Consumer not implemented) | -| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | -| schema-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) | +| TOPIC NAME | Subscription name | Subscription config | +|----------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | +| records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` | +| records-changed-dead-lettering | (Consumer not implemented) | (Consumer not implemented) | +| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| indexer-reprocess-dead-lettering | (Consumer not implemented) | (Consumer not implemented) | +| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| schema-changed-dead-lettering | (Consumer not implemented) | (Consumer not implemented) | +| reindex | indexer-reindex | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| reindex-dead-lettering | (Consumer not implemented) | (Consumer not implemented) | ### Additional throughput configuration for PubSub subscription consumer via Partition service diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java index ce93ee0f68c149b5f8f051d0b17cf74c754e093a..85a824f6241add7fd46f74f7bbabe365ba2c5ee4 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java @@ -27,7 +27,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.opengroup.osdu.indexer.model.Constants; -import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; +import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Component; @@ -52,16 +52,16 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { private final OqmDriver driver; - private final IndexerMessagingConfigProperties properties; + private final MessagingConfigProperties properties; private OqmTopic reprocessOqmTopic; - private OqmTopic recordsChangedTopic; + private OqmTopic reindexTopic; @PostConstruct public void setUp() { reprocessOqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build(); - recordsChangedTopic = OqmTopic.builder().name(properties.getRecordsChangedTopicName()).build(); + reindexTopic = OqmTopic.builder().name(properties.getReindexTopicName()).build(); } public void createWorkerTask(String payload, DpsHeaders headers) { @@ -112,15 +112,6 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ); } - private void publishReindexTask(String payload, DpsHeaders headers) { - OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()) - .build(); - Map<String, String> attributes = getAttributesFromHeaders(headers); - OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build(); - log.info("Reprocessing task: {} ,has been published.", oqmMessage); - driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination); - } - private void publishRecordsChangedTask(String payload, DpsHeaders headers) { OqmDestination oqmDestination = OqmDestination.builder() .partitionId(headers.getPartitionId()) @@ -141,8 +132,17 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { .attributes(attributes) .build(); + log.info("Reindex task: {} ,has been published.", oqmMessage); + driver.publish(oqmMessage, reindexTopic, oqmDestination); + } + + private void publishReindexTask(String payload, DpsHeaders headers) { + OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()) + .build(); + Map<String, String> attributes = getAttributesFromHeaders(headers); + OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build(); log.info("Reprocessing task: {} ,has been published.", oqmMessage); - driver.publish(oqmMessage, recordsChangedTopic, oqmDestination); + driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination); } @NotNull diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java index 4551656f75da06d7f66f58004de9f0c4262c1352..2cad9bb1dc87599c5a7408c194013ff8ec44cab9 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java @@ -31,7 +31,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; -import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; +import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties; import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.springframework.stereotype.Component; @@ -41,7 +41,7 @@ import org.springframework.stereotype.Component; public class StatusPublisherImpl implements IPublisher { private final OqmDriver driver; - private final IndexerMessagingConfigProperties properties; + private final MessagingConfigProperties properties; private final JsonSerializer<JobStatus> statusJsonSerializer; private OqmTopic oqmTopic; private Gson gson; diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/MessagingConfigProperties.java similarity index 81% rename from provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java rename to provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/MessagingConfigProperties.java index 04dd82069daa7e9210566136573f81cc583bc1e1..384768395976742f72174fae415e52e4bbd46302 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/MessagingConfigProperties.java @@ -1,6 +1,6 @@ /* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; +package org.opengroup.osdu.indexer.provider.gcp.indexing.config; import lombok.Getter; import lombok.Setter; @@ -26,12 +26,12 @@ import org.springframework.context.annotation.Configuration; @Getter @ConfigurationProperties @Configuration -public class IndexerMessagingConfigProperties { - +public class MessagingConfigProperties { + @Deprecated + private String defaultRelativeIndexerWorkerUrl; private String recordsChangedTopicName; private String schemaChangedTopicName; - private String defaultRelativeIndexerWorkerUrl; - private String reprocessTopicName; private String statusChangedTopicName; - + private String reprocessTopicName; + private String reindexTopicName; } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java index 55157916adf14e8221dbf01bd0991c2d42ad88f9..716dc5a80e5690e3d8d620e09e39683268200e40 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java @@ -17,96 +17,100 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization; +import javax.annotation.Nullable; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; -import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriber; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; -import javax.annotation.Nullable; - @Service @Slf4j @RequiredArgsConstructor public class OqmSubscriberManager { - private final OqmDriver driver; + private final OqmDriver driver; - private OqmSubscription getSubscriptionForTenant(TenantInfo tenantInfo, String topicName, String subscriptionName) { - String dataPartitionId = tenantInfo.getDataPartitionId(); - log.info("OQM: provisioning tenant {}:", dataPartitionId); - log.info("OQM: check for topic {} existence:", topicName); - OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)).orElse(null); + public void registerSubscriber(String dataPartitionId, String topicName, String subscriptionName, + OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) { + OqmSubscription subscriptionForTenant = getSubscriptionForTenant(dataPartitionId, topicName, subscriptionName); + log.info("OQM: registering Subscriber for subscription {}", subscriptionName); - if (topic == null) { - log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName, - dataPartitionId); - throw new AppException( - HttpStatus.INTERNAL_SERVER_ERROR.value(), - "Required topic not exists.", - String.format( - "Required topic not exists. Create topic: %s for tenant: %s and restart service.", - topicName, dataPartitionId - ) - ); - } + OqmDestination destination = getDestination(dataPartitionId); + OqmSubscriber subscriber = OqmSubscriber.builder() + .subscription(subscriptionForTenant) + .messageReceiver(messageReceiver) + .throughput(throughput) + .build(); - log.info("OQM: check for topic {} existence: PRESENT", topicName); - OqmSubscription subscription = getSubscription(tenantInfo, topic, subscriptionName); + driver.subscribe(subscriber, destination); + log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName); + } - if (subscription == null) { - log.error( - "OQM: check for subscription {}, tenant: {} existence: ABSENT.", - subscriptionName, - dataPartitionId - ); - throw new AppException( - HttpStatus.INTERNAL_SERVER_ERROR.value(), - "Required subscription not exists.", - String.format( - "Required subscription not exists. Create subscription: %s for tenant: %s and restart service.", - subscriptionName, - dataPartitionId - ) - ); - } - log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId); - return subscription; - } + private OqmSubscription getSubscriptionForTenant(String dataPartitionId, String topicName, String subscriptionName) { + log.info("OQM: provisioning tenant {}:", dataPartitionId); + log.info("OQM: check for topic {} existence:", topicName); + OqmTopic topic = driver.getTopic(topicName, getDestination(dataPartitionId)).orElse(null); - @Nullable - private OqmSubscription getSubscription(TenantInfo tenantInfo, OqmTopic topic, String subscriptionName) { - log.info("OQM: check for subscription {} existence:", subscriptionName); - OqmSubscriptionQuery query = OqmSubscriptionQuery.builder() - .namePrefix(subscriptionName) - .subscriberable(true) - .build(); - return driver - .listSubscriptions(topic, query, getDestination(tenantInfo)).stream() - .findAny() - .orElse(null); + if (topic == null) { + log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName, dataPartitionId); + throw new AppException( + HttpStatus.INTERNAL_SERVER_ERROR.value(), + "Required topic not exists.", + String.format( + "Required topic not exists. Create topic: %s for tenant: %s and restart service.", + topicName, dataPartitionId + ) + ); } - public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) { - OqmSubscription subscriptionForTenant = getSubscriptionForTenant(tenantInfo, topicName, subscriptionName); - log.info("OQM: registering Subscriber for subscription {}", subscriptionName); - OqmDestination destination = getDestination(tenantInfo); + log.info("OQM: check for topic {} existence: PRESENT", topicName); + OqmSubscription subscription = getSubscription(dataPartitionId, topic, subscriptionName); - OqmSubscriber subscriber = OqmSubscriber.builder() - .subscription(subscriptionForTenant) - .messageReceiver(messageReceiver) - .throughput(throughput) - .build(); - driver.subscribe(subscriber, destination); - log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName); + if (subscription == null) { + log.error( + "OQM: check for subscription {}, tenant: {} existence: ABSENT.", + subscriptionName, + dataPartitionId + ); + throw new AppException( + HttpStatus.INTERNAL_SERVER_ERROR.value(), + "Required subscription not exists.", + String.format( + "Required subscription not exists. Create subscription: %s for tenant: %s and restart service.", + subscriptionName, + dataPartitionId + ) + ); } + log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId); + return subscription; + } - private OqmDestination getDestination(TenantInfo tenantInfo) { - return OqmDestination.builder() - .partitionId(tenantInfo.getDataPartitionId()) - .build(); - } + @Nullable + private OqmSubscription getSubscription(String dataPartitionId, OqmTopic topic, String subscriptionName) { + log.info("OQM: check for subscription {} existence:", subscriptionName); + OqmSubscriptionQuery query = OqmSubscriptionQuery.builder() + .namePrefix(subscriptionName) + .subscriberable(true) + .build(); + return driver + .listSubscriptions(topic, query, getDestination(dataPartitionId)).stream() + .findAny() + .orElse(null); + } + + private OqmDestination getDestination(String dataPartitionId) { + return OqmDestination.builder() + .partitionId(dataPartitionId) + .build(); + } } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java index 03376bcb6bb5054b99eb29bf9e5ebe49510306a2..10c7722bf9c56a265bf45be14ba9d8fe681ea855 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java @@ -17,7 +17,6 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization; -import java.util.Collection; import javax.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -27,9 +26,10 @@ import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput; import org.opengroup.osdu.indexer.api.RecordIndexerApi; import org.opengroup.osdu.indexer.api.ReindexApi; -import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; +import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties; import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RecordsChangedMessageReceiver; -import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RepressorMessageReceiver; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.ReindexMessageReceiver; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.ReprocessorMessageReceiver; import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.SchemaChangedMessageReceiver; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; import org.springframework.stereotype.Component; @@ -42,55 +42,63 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class TenantSubscriberConfiguration { - private static final String SUBSCRIPTION_PREFIX = "indexer-"; - private final IndexerMessagingConfigProperties properties; - private final OqmSubscriberManager subscriberManager; - private final ITenantFactory tenantInfoFactory; - private final TokenProvider tokenProvider; - private final ThreadDpsHeaders headers; - private final RecordIndexerApi recordIndexerApi; - private final ReindexApi reindexApi; + private static final String SUBSCRIPTION_PREFIX = "indexer-"; + private final MessagingConfigProperties properties; + private final OqmSubscriberManager subscriberManager; + private final ITenantFactory tenantInfoFactory; + private final TokenProvider tokenProvider; + private final ThreadDpsHeaders headers; + private final RecordIndexerApi recordIndexerApi; + private final ReindexApi reindexApi; - /** - * Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the same message broker(The same RabbitMQ - * instance, or the same GCP project Pub/Sub) then only one subscriber in this broker will be used. - */ - @PostConstruct - void postConstruct() { - log.info("OqmSubscriberManager provisioning STARTED"); - String recordsChangedTopicName = properties.getRecordsChangedTopicName(); - String reprocessTopicName = properties.getReprocessTopicName(); - String schemaChangedTopicName = properties.getSchemaChangedTopicName(); + /** + * Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the + * same message broker(The same RabbitMQ instance, or the same GCP project Pub/Sub) then only one subscriber in this + * broker will be used. + */ + @PostConstruct + void postConstruct() { + log.info("OqmSubscriberManager provisioning STARTED"); + String recordsChangedTopicName = properties.getRecordsChangedTopicName(); + String schemaChangedTopicName = properties.getSchemaChangedTopicName(); + String reprocessTopicName = properties.getReprocessTopicName(); + String reindexTopicName = properties.getReindexTopicName(); - Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo(); - - for (TenantInfo tenantInfo : tenantInfos) { - subscriberManager.registerSubscriber( - tenantInfo, - recordsChangedTopicName, - getSubscriptionName(recordsChangedTopicName), - new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), - OqmSubscriberThroughput.MAX - ); - subscriberManager.registerSubscriber( - tenantInfo, - reprocessTopicName, - getSubscriptionName(reprocessTopicName), - new RepressorMessageReceiver(headers, tokenProvider, reindexApi), - OqmSubscriberThroughput.MIN - ); - subscriberManager.registerSubscriber( - tenantInfo, - schemaChangedTopicName, - getSubscriptionName(schemaChangedTopicName), - new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), - OqmSubscriberThroughput.MIN - ); - } - log.info("OqmSubscriberManager provisioning COMPLETED"); + for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) { + String dataPartitionId = tenantInfo.getDataPartitionId(); + subscriberManager.registerSubscriber( + dataPartitionId, + recordsChangedTopicName, + getSubscriptionName(recordsChangedTopicName), + new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), + OqmSubscriberThroughput.MAX + ); + subscriberManager.registerSubscriber( + dataPartitionId, + schemaChangedTopicName, + getSubscriptionName(schemaChangedTopicName), + new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), + OqmSubscriberThroughput.MIN + ); + subscriberManager.registerSubscriber( + dataPartitionId, + reprocessTopicName, + getSubscriptionName(reprocessTopicName), + new ReprocessorMessageReceiver(headers, tokenProvider, reindexApi), + OqmSubscriberThroughput.MIN + ); + subscriberManager.registerSubscriber( + dataPartitionId, + reindexTopicName, + getSubscriptionName(reindexTopicName), + new ReindexMessageReceiver(headers, tokenProvider, recordIndexerApi), + OqmSubscriberThroughput.MAX + ); } + log.info("OqmSubscriberManager provisioning COMPLETED"); + } - private String getSubscriptionName(String topicName) { - return SUBSCRIPTION_PREFIX + topicName; - } -} + private String getSubscriptionName(String topicName) { + return SUBSCRIPTION_PREFIX + topicName; + } +} \ No newline at end of file diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReindexMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReindexMessageReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..e00c9c0e767fbe018618cfa9b59c7a988ce1a1cf --- /dev/null +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReindexMessageReceiver.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import java.time.LocalDateTime; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.indexer.JobStatus; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; +import org.springframework.http.ResponseEntity; + +@Slf4j +public class ReindexMessageReceiver extends IndexerOqmMessageReceiver { + + private final RecordIndexerApi recordIndexerApi; + + public ReindexMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, RecordIndexerApi recordIndexerApi) { + super(dpsHeaders, tokenProvider); + this.recordIndexerApi = recordIndexerApi; + } + + @Override + protected void sendMessage(OqmMessage oqmMessage) throws Exception { + RecordChangedMessages indexWorkerRequestBody = getIndexWorkerRequestBody(oqmMessage); + log.debug("Reindex job message body: {}", indexWorkerRequestBody); + ResponseEntity<JobStatus> jobStatusResponse = recordIndexerApi.indexWorker(indexWorkerRequestBody); + log.debug("Job status: {}", jobStatusResponse); + } + + private RecordChangedMessages getIndexWorkerRequestBody(OqmMessage request) { + RecordChangedMessages recordChangedMessages = new RecordChangedMessages(); + recordChangedMessages.setMessageId(dpsHeaders.getCorrelationId()); + recordChangedMessages.setData(request.getData()); + recordChangedMessages.setAttributes(request.getAttributes()); + recordChangedMessages.setPublishTime(LocalDateTime.now().toString()); + return recordChangedMessages; + } +} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReprocessorMessageReceiver.java similarity index 86% rename from provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiver.java rename to provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReprocessorMessageReceiver.java index ccfe7a214004595ffdc9b29026a095d65db33015..c5d71abf72d70f3765dda5c2999b00b0c7870969 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiver.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReprocessorMessageReceiver.java @@ -1,6 +1,6 @@ /* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,13 +27,12 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; import org.springframework.http.ResponseEntity; @Slf4j -public class RepressorMessageReceiver extends IndexerOqmMessageReceiver { +public class ReprocessorMessageReceiver extends IndexerOqmMessageReceiver { private final Gson gson = new Gson(); private final ReindexApi reindexApi; - public RepressorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, - ReindexApi reindexApi) { + public ReprocessorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, ReindexApi reindexApi) { super(dpsHeaders, tokenProvider); this.reindexApi = reindexApi; } diff --git a/provider/indexer-gc/src/main/resources/application.properties b/provider/indexer-gc/src/main/resources/application.properties index d26678140c5b47019f09362a24de19abdbb01884..7259834e06820469324fabe055f78d305118979f 100644 --- a/provider/indexer-gc/src/main/resources/application.properties +++ b/provider/indexer-gc/src/main/resources/application.properties @@ -20,13 +20,15 @@ kinds-redis-database=1 cron-index-cleanup-threshold-days=3 cron-empty-index-cleanup-threshold-days=7 -#indexer service config +# Indexer service config +propertyResolver.strategy=partition DEFAULT_DATA_COUNTRY=US gae-service=indexer security.https.certificate.trust=false storage-records-by-kind-batch-size=1000 storage-records-batch-size=20 +# External services config REDIS_SEARCH_PORT=6379 REDIS_SEARCH_HOST=redis-cache-search @@ -51,9 +53,9 @@ SCHEMA_PATH=/api/schema-service/v1/schema SCHEMA_HOST=${SCHEMA_BASE_HOST}${SCHEMA_PATH} SEARCH_HOST=${SEARCH_BASE_HOST}/api/search/v2 +# OQM config records-changed-topic-name=records-changed schema-changed-topic-name=schema-changed -reprocess-topic-name=reprocess status-changed-topic-name=indexing-progress - -propertyResolver.strategy=partition \ No newline at end of file +reprocess-topic-name=reprocess +reindex-topic-name=reindex \ No newline at end of file diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiverTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReprocessorMessageReceiverTest.java similarity index 92% rename from provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiverTest.java rename to provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReprocessorMessageReceiverTest.java index be752140196c8601cd17d2271dc1b2baef4411ef..ebddfb3c23686379ca347e2699a7c00063df2d09 100644 --- a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiverTest.java +++ b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReprocessorMessageReceiverTest.java @@ -35,7 +35,7 @@ import org.opengroup.osdu.indexer.api.ReindexApi; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; @RunWith(Theories.class) -public class RepressorMessageReceiverTest { +public class ReprocessorMessageReceiverTest { protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); @@ -45,11 +45,11 @@ public class RepressorMessageReceiverTest { private ReindexApi reindexApi = Mockito.mock(ReindexApi.class); - private RepressorMessageReceiver receiver; + private ReprocessorMessageReceiver receiver; @Before public void setUp() { - receiver = new RepressorMessageReceiver(dpsHeaders, tokenProvider, reindexApi); + receiver = new ReprocessorMessageReceiver(dpsHeaders, tokenProvider, reindexApi); } @DataPoints("VALID_EVENTS")