Skip to content
Snippets Groups Projects
Commit ff2ab9ae authored by Zhibin Mai's avatar Zhibin Mai
Browse files
parents 7ce05779 735d157c
No related branches found
No related tags found
1 merge request!572Implement Redis cache in Azure for two kinds of object caches that are...
Pipeline #197358 failed
Showing
with 243 additions and 166 deletions
...@@ -201,6 +201,8 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H ...@@ -201,6 +201,8 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H
#### Exchanges and queues configuration #### Exchanges and queues configuration
![Screenshot](./pics/indexer.png)
RabbitMq should have exchanges and queues with names and configs: RabbitMq should have exchanges and queues with names and configs:
| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | | EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config |
...@@ -212,6 +214,8 @@ RabbitMq should have exchanges and queues with names and configs: ...@@ -212,6 +214,8 @@ RabbitMq should have exchanges and queues with names and configs:
| indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | | indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | | schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | | indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| reindex | `Type fanout` <br/>`durable: true` | indexer-reindex | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| indexer-reindex-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reindex | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
## Keycloak configuration ## Keycloak configuration
......
provider/indexer-gc/docs/anthos/pics/indexer.png

515 KiB

...@@ -42,17 +42,21 @@ Usage of spring profiles is preferred. ...@@ -42,17 +42,21 @@ Usage of spring profiles is preferred.
## Pubsub configuration ## Pubsub configuration
![Screenshot](../anthos/pics/indexer.png)
Pubsub should have topics and subscribers with names and configs: Pubsub should have topics and subscribers with names and configs:
| TOPIC NAME | Subscription name | Subscription config | | TOPIC NAME | Subscription name | Subscription config |
|-----------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| |----------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | | indexing-progress | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` | | records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` |
| records-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) | | records-changed-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | | reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| reprocess-dead-letter | (Consumer not implemented) | (Consumer not implemented) | | indexer-reprocess-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | | schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| schema-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) | | schema-changed-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
| reindex | indexer-reindex | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| reindex-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
### Additional throughput configuration for PubSub subscription consumer via Partition service ### Additional throughput configuration for PubSub subscription consumer via Partition service
......
...@@ -27,7 +27,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; ...@@ -27,7 +27,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.model.Constants; import org.opengroup.osdu.indexer.model.Constants;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -52,16 +52,16 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -52,16 +52,16 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
private final OqmDriver driver; private final OqmDriver driver;
private final IndexerMessagingConfigProperties properties; private final MessagingConfigProperties properties;
private OqmTopic reprocessOqmTopic; private OqmTopic reprocessOqmTopic;
private OqmTopic recordsChangedTopic; private OqmTopic reindexTopic;
@PostConstruct @PostConstruct
public void setUp() { public void setUp() {
reprocessOqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build(); reprocessOqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build();
recordsChangedTopic = OqmTopic.builder().name(properties.getRecordsChangedTopicName()).build(); reindexTopic = OqmTopic.builder().name(properties.getReindexTopicName()).build();
} }
public void createWorkerTask(String payload, DpsHeaders headers) { public void createWorkerTask(String payload, DpsHeaders headers) {
...@@ -112,15 +112,6 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -112,15 +112,6 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
); );
} }
private void publishReindexTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
.build();
Map<String, String> attributes = getAttributesFromHeaders(headers);
OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build();
log.info("Reprocessing task: {} ,has been published.", oqmMessage);
driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
}
private void publishRecordsChangedTask(String payload, DpsHeaders headers) { private void publishRecordsChangedTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder() OqmDestination oqmDestination = OqmDestination.builder()
.partitionId(headers.getPartitionId()) .partitionId(headers.getPartitionId())
...@@ -141,8 +132,17 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -141,8 +132,17 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
.attributes(attributes) .attributes(attributes)
.build(); .build();
log.info("Reindex task: {} ,has been published.", oqmMessage);
driver.publish(oqmMessage, reindexTopic, oqmDestination);
}
private void publishReindexTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
.build();
Map<String, String> attributes = getAttributesFromHeaders(headers);
OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build();
log.info("Reprocessing task: {} ,has been published.", oqmMessage); log.info("Reprocessing task: {} ,has been published.", oqmMessage);
driver.publish(oqmMessage, recordsChangedTopic, oqmDestination); driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
} }
@NotNull @NotNull
......
...@@ -31,7 +31,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; ...@@ -31,7 +31,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component; ...@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
public class StatusPublisherImpl implements IPublisher { public class StatusPublisherImpl implements IPublisher {
private final OqmDriver driver; private final OqmDriver driver;
private final IndexerMessagingConfigProperties properties; private final MessagingConfigProperties properties;
private final JsonSerializer<JobStatus> statusJsonSerializer; private final JsonSerializer<JobStatus> statusJsonSerializer;
private OqmTopic oqmTopic; private OqmTopic oqmTopic;
private Gson gson; private Gson gson;
......
/* /*
* Copyright 2020-2022 Google LLC * Copyright 2020-2023 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc * Copyright 2020-2023 EPAM Systems, Inc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; package org.opengroup.osdu.indexer.provider.gcp.indexing.config;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
...@@ -26,12 +26,12 @@ import org.springframework.context.annotation.Configuration; ...@@ -26,12 +26,12 @@ import org.springframework.context.annotation.Configuration;
@Getter @Getter
@ConfigurationProperties @ConfigurationProperties
@Configuration @Configuration
public class IndexerMessagingConfigProperties { public class MessagingConfigProperties {
@Deprecated
private String defaultRelativeIndexerWorkerUrl;
private String recordsChangedTopicName; private String recordsChangedTopicName;
private String schemaChangedTopicName; private String schemaChangedTopicName;
private String defaultRelativeIndexerWorkerUrl;
private String reprocessTopicName;
private String statusChangedTopicName; private String statusChangedTopicName;
private String reprocessTopicName;
private String reindexTopicName;
} }
...@@ -17,96 +17,100 @@ ...@@ -17,96 +17,100 @@
package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization; package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriber;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Nullable;
@Service @Service
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class OqmSubscriberManager { public class OqmSubscriberManager {
private final OqmDriver driver; private final OqmDriver driver;
private OqmSubscription getSubscriptionForTenant(TenantInfo tenantInfo, String topicName, String subscriptionName) { public void registerSubscriber(String dataPartitionId, String topicName, String subscriptionName,
String dataPartitionId = tenantInfo.getDataPartitionId(); OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) {
log.info("OQM: provisioning tenant {}:", dataPartitionId); OqmSubscription subscriptionForTenant = getSubscriptionForTenant(dataPartitionId, topicName, subscriptionName);
log.info("OQM: check for topic {} existence:", topicName); log.info("OQM: registering Subscriber for subscription {}", subscriptionName);
OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)).orElse(null);
if (topic == null) { OqmDestination destination = getDestination(dataPartitionId);
log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName, OqmSubscriber subscriber = OqmSubscriber.builder()
dataPartitionId); .subscription(subscriptionForTenant)
throw new AppException( .messageReceiver(messageReceiver)
HttpStatus.INTERNAL_SERVER_ERROR.value(), .throughput(throughput)
"Required topic not exists.", .build();
String.format(
"Required topic not exists. Create topic: %s for tenant: %s and restart service.",
topicName, dataPartitionId
)
);
}
log.info("OQM: check for topic {} existence: PRESENT", topicName); driver.subscribe(subscriber, destination);
OqmSubscription subscription = getSubscription(tenantInfo, topic, subscriptionName); log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName);
}
if (subscription == null) { private OqmSubscription getSubscriptionForTenant(String dataPartitionId, String topicName, String subscriptionName) {
log.error( log.info("OQM: provisioning tenant {}:", dataPartitionId);
"OQM: check for subscription {}, tenant: {} existence: ABSENT.", log.info("OQM: check for topic {} existence:", topicName);
subscriptionName, OqmTopic topic = driver.getTopic(topicName, getDestination(dataPartitionId)).orElse(null);
dataPartitionId
);
throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required subscription not exists.",
String.format(
"Required subscription not exists. Create subscription: %s for tenant: %s and restart service.",
subscriptionName,
dataPartitionId
)
);
}
log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId);
return subscription;
}
@Nullable if (topic == null) {
private OqmSubscription getSubscription(TenantInfo tenantInfo, OqmTopic topic, String subscriptionName) { log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName, dataPartitionId);
log.info("OQM: check for subscription {} existence:", subscriptionName); throw new AppException(
OqmSubscriptionQuery query = OqmSubscriptionQuery.builder() HttpStatus.INTERNAL_SERVER_ERROR.value(),
.namePrefix(subscriptionName) "Required topic not exists.",
.subscriberable(true) String.format(
.build(); "Required topic not exists. Create topic: %s for tenant: %s and restart service.",
return driver topicName, dataPartitionId
.listSubscriptions(topic, query, getDestination(tenantInfo)).stream() )
.findAny() );
.orElse(null);
} }
public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) { log.info("OQM: check for topic {} existence: PRESENT", topicName);
OqmSubscription subscriptionForTenant = getSubscriptionForTenant(tenantInfo, topicName, subscriptionName); OqmSubscription subscription = getSubscription(dataPartitionId, topic, subscriptionName);
log.info("OQM: registering Subscriber for subscription {}", subscriptionName);
OqmDestination destination = getDestination(tenantInfo);
OqmSubscriber subscriber = OqmSubscriber.builder() if (subscription == null) {
.subscription(subscriptionForTenant) log.error(
.messageReceiver(messageReceiver) "OQM: check for subscription {}, tenant: {} existence: ABSENT.",
.throughput(throughput) subscriptionName,
.build(); dataPartitionId
driver.subscribe(subscriber, destination); );
log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName); throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required subscription not exists.",
String.format(
"Required subscription not exists. Create subscription: %s for tenant: %s and restart service.",
subscriptionName,
dataPartitionId
)
);
} }
log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId);
return subscription;
}
private OqmDestination getDestination(TenantInfo tenantInfo) { @Nullable
return OqmDestination.builder() private OqmSubscription getSubscription(String dataPartitionId, OqmTopic topic, String subscriptionName) {
.partitionId(tenantInfo.getDataPartitionId()) log.info("OQM: check for subscription {} existence:", subscriptionName);
.build(); OqmSubscriptionQuery query = OqmSubscriptionQuery.builder()
} .namePrefix(subscriptionName)
.subscriberable(true)
.build();
return driver
.listSubscriptions(topic, query, getDestination(dataPartitionId)).stream()
.findAny()
.orElse(null);
}
private OqmDestination getDestination(String dataPartitionId) {
return OqmDestination.builder()
.partitionId(dataPartitionId)
.build();
}
} }
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization; package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization;
import java.util.Collection;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -27,9 +26,10 @@ import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; ...@@ -27,9 +26,10 @@ import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput;
import org.opengroup.osdu.indexer.api.RecordIndexerApi; import org.opengroup.osdu.indexer.api.RecordIndexerApi;
import org.opengroup.osdu.indexer.api.ReindexApi; import org.opengroup.osdu.indexer.api.ReindexApi;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RecordsChangedMessageReceiver; import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RecordsChangedMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RepressorMessageReceiver; import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.ReindexMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.ReprocessorMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.SchemaChangedMessageReceiver; import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.SchemaChangedMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -42,55 +42,63 @@ import org.springframework.stereotype.Component; ...@@ -42,55 +42,63 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor @RequiredArgsConstructor
public class TenantSubscriberConfiguration { public class TenantSubscriberConfiguration {
private static final String SUBSCRIPTION_PREFIX = "indexer-"; private static final String SUBSCRIPTION_PREFIX = "indexer-";
private final IndexerMessagingConfigProperties properties; private final MessagingConfigProperties properties;
private final OqmSubscriberManager subscriberManager; private final OqmSubscriberManager subscriberManager;
private final ITenantFactory tenantInfoFactory; private final ITenantFactory tenantInfoFactory;
private final TokenProvider tokenProvider; private final TokenProvider tokenProvider;
private final ThreadDpsHeaders headers; private final ThreadDpsHeaders headers;
private final RecordIndexerApi recordIndexerApi; private final RecordIndexerApi recordIndexerApi;
private final ReindexApi reindexApi; private final ReindexApi reindexApi;
/** /**
* Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the same message broker(The same RabbitMQ * Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the
* instance, or the same GCP project Pub/Sub) then only one subscriber in this broker will be used. * same message broker(The same RabbitMQ instance, or the same GCP project Pub/Sub) then only one subscriber in this
*/ * broker will be used.
@PostConstruct */
void postConstruct() { @PostConstruct
log.info("OqmSubscriberManager provisioning STARTED"); void postConstruct() {
String recordsChangedTopicName = properties.getRecordsChangedTopicName(); log.info("OqmSubscriberManager provisioning STARTED");
String reprocessTopicName = properties.getReprocessTopicName(); String recordsChangedTopicName = properties.getRecordsChangedTopicName();
String schemaChangedTopicName = properties.getSchemaChangedTopicName(); String schemaChangedTopicName = properties.getSchemaChangedTopicName();
String reprocessTopicName = properties.getReprocessTopicName();
String reindexTopicName = properties.getReindexTopicName();
Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo(); for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
String dataPartitionId = tenantInfo.getDataPartitionId();
for (TenantInfo tenantInfo : tenantInfos) { subscriberManager.registerSubscriber(
subscriberManager.registerSubscriber( dataPartitionId,
tenantInfo, recordsChangedTopicName,
recordsChangedTopicName, getSubscriptionName(recordsChangedTopicName),
getSubscriptionName(recordsChangedTopicName), new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi),
new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), OqmSubscriberThroughput.MAX
OqmSubscriberThroughput.MAX );
); subscriberManager.registerSubscriber(
subscriberManager.registerSubscriber( dataPartitionId,
tenantInfo, schemaChangedTopicName,
reprocessTopicName, getSubscriptionName(schemaChangedTopicName),
getSubscriptionName(reprocessTopicName), new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi),
new RepressorMessageReceiver(headers, tokenProvider, reindexApi), OqmSubscriberThroughput.MIN
OqmSubscriberThroughput.MIN );
); subscriberManager.registerSubscriber(
subscriberManager.registerSubscriber( dataPartitionId,
tenantInfo, reprocessTopicName,
schemaChangedTopicName, getSubscriptionName(reprocessTopicName),
getSubscriptionName(schemaChangedTopicName), new ReprocessorMessageReceiver(headers, tokenProvider, reindexApi),
new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), OqmSubscriberThroughput.MIN
OqmSubscriberThroughput.MIN );
); subscriberManager.registerSubscriber(
} dataPartitionId,
log.info("OqmSubscriberManager provisioning COMPLETED"); reindexTopicName,
getSubscriptionName(reindexTopicName),
new ReindexMessageReceiver(headers, tokenProvider, recordIndexerApi),
OqmSubscriberThroughput.MAX
);
} }
log.info("OqmSubscriberManager provisioning COMPLETED");
}
private String getSubscriptionName(String topicName) { private String getSubscriptionName(String topicName) {
return SUBSCRIPTION_PREFIX + topicName; return SUBSCRIPTION_PREFIX + topicName;
} }
} }
\ No newline at end of file
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.indexer.provider.gcp.indexing.processing;
import java.time.LocalDateTime;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.auth.TokenProvider;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.indexer.api.RecordIndexerApi;
import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
import org.springframework.http.ResponseEntity;
@Slf4j
public class ReindexMessageReceiver extends IndexerOqmMessageReceiver {
private final RecordIndexerApi recordIndexerApi;
public ReindexMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, RecordIndexerApi recordIndexerApi) {
super(dpsHeaders, tokenProvider);
this.recordIndexerApi = recordIndexerApi;
}
@Override
protected void sendMessage(OqmMessage oqmMessage) throws Exception {
RecordChangedMessages indexWorkerRequestBody = getIndexWorkerRequestBody(oqmMessage);
log.debug("Reindex job message body: {}", indexWorkerRequestBody);
ResponseEntity<JobStatus> jobStatusResponse = recordIndexerApi.indexWorker(indexWorkerRequestBody);
log.debug("Job status: {}", jobStatusResponse);
}
private RecordChangedMessages getIndexWorkerRequestBody(OqmMessage request) {
RecordChangedMessages recordChangedMessages = new RecordChangedMessages();
recordChangedMessages.setMessageId(dpsHeaders.getCorrelationId());
recordChangedMessages.setData(request.getData());
recordChangedMessages.setAttributes(request.getAttributes());
recordChangedMessages.setPublishTime(LocalDateTime.now().toString());
return recordChangedMessages;
}
}
/* /*
* Copyright 2020-2022 Google LLC * Copyright 2020-2023 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc * Copyright 2020-2023 EPAM Systems, Inc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -27,13 +27,12 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; ...@@ -27,13 +27,12 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@Slf4j @Slf4j
public class RepressorMessageReceiver extends IndexerOqmMessageReceiver { public class ReprocessorMessageReceiver extends IndexerOqmMessageReceiver {
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final ReindexApi reindexApi; private final ReindexApi reindexApi;
public RepressorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, public ReprocessorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, ReindexApi reindexApi) {
ReindexApi reindexApi) {
super(dpsHeaders, tokenProvider); super(dpsHeaders, tokenProvider);
this.reindexApi = reindexApi; this.reindexApi = reindexApi;
} }
......
...@@ -20,13 +20,15 @@ kinds-redis-database=1 ...@@ -20,13 +20,15 @@ kinds-redis-database=1
cron-index-cleanup-threshold-days=3 cron-index-cleanup-threshold-days=3
cron-empty-index-cleanup-threshold-days=7 cron-empty-index-cleanup-threshold-days=7
#indexer service config # Indexer service config
propertyResolver.strategy=partition
DEFAULT_DATA_COUNTRY=US DEFAULT_DATA_COUNTRY=US
gae-service=indexer gae-service=indexer
security.https.certificate.trust=false security.https.certificate.trust=false
storage-records-by-kind-batch-size=1000 storage-records-by-kind-batch-size=1000
storage-records-batch-size=20 storage-records-batch-size=20
# External services config
REDIS_SEARCH_PORT=6379 REDIS_SEARCH_PORT=6379
REDIS_SEARCH_HOST=redis-cache-search REDIS_SEARCH_HOST=redis-cache-search
...@@ -51,9 +53,9 @@ SCHEMA_PATH=/api/schema-service/v1/schema ...@@ -51,9 +53,9 @@ SCHEMA_PATH=/api/schema-service/v1/schema
SCHEMA_HOST=${SCHEMA_BASE_HOST}${SCHEMA_PATH} SCHEMA_HOST=${SCHEMA_BASE_HOST}${SCHEMA_PATH}
SEARCH_HOST=${SEARCH_BASE_HOST}/api/search/v2 SEARCH_HOST=${SEARCH_BASE_HOST}/api/search/v2
# OQM config
records-changed-topic-name=records-changed records-changed-topic-name=records-changed
schema-changed-topic-name=schema-changed schema-changed-topic-name=schema-changed
reprocess-topic-name=reprocess
status-changed-topic-name=indexing-progress status-changed-topic-name=indexing-progress
reprocess-topic-name=reprocess
propertyResolver.strategy=partition reindex-topic-name=reindex
\ No newline at end of file \ No newline at end of file
...@@ -35,7 +35,7 @@ import org.opengroup.osdu.indexer.api.ReindexApi; ...@@ -35,7 +35,7 @@ import org.opengroup.osdu.indexer.api.ReindexApi;
import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
@RunWith(Theories.class) @RunWith(Theories.class)
public class RepressorMessageReceiverTest { public class ReprocessorMessageReceiverTest {
protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class);
...@@ -45,11 +45,11 @@ public class RepressorMessageReceiverTest { ...@@ -45,11 +45,11 @@ public class RepressorMessageReceiverTest {
private ReindexApi reindexApi = Mockito.mock(ReindexApi.class); private ReindexApi reindexApi = Mockito.mock(ReindexApi.class);
private RepressorMessageReceiver receiver; private ReprocessorMessageReceiver receiver;
@Before @Before
public void setUp() { public void setUp() {
receiver = new RepressorMessageReceiver(dpsHeaders, tokenProvider, reindexApi); receiver = new ReprocessorMessageReceiver(dpsHeaders, tokenProvider, reindexApi);
} }
@DataPoints("VALID_EVENTS") @DataPoints("VALID_EVENTS")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment