Skip to content
Snippets Groups Projects
Commit 735d157c authored by Riabokon Stanislav(EPAM)[GCP]'s avatar Riabokon Stanislav(EPAM)[GCP]
Browse files

Merge branch 'reindex-topic' into 'master'

Added Reindex topic handler (GONRG-7281)

See merge request !578
parents 59d71531 30d431f1
No related branches found
No related tags found
1 merge request!578Added Reindex topic handler (GONRG-7281)
Pipeline #197201 failed
Showing
with 243 additions and 166 deletions
......@@ -201,6 +201,8 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H
#### Exchanges and queues configuration
![Screenshot](./pics/indexer.png)
RabbitMq should have exchanges and queues with names and configs:
| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config |
......@@ -212,6 +214,8 @@ RabbitMq should have exchanges and queues with names and configs:
| indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| reindex | `Type fanout` <br/>`durable: true` | indexer-reindex | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| indexer-reindex-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reindex | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
## Keycloak configuration
......
provider/indexer-gc/docs/anthos/pics/indexer.png

515 KiB

......@@ -42,17 +42,21 @@ Usage of spring profiles is preferred.
## Pubsub configuration
![Screenshot](../anthos/pics/indexer.png)
Pubsub should have topics and subscribers with names and configs:
| TOPIC NAME | Subscription name | Subscription config |
|-----------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| indexing-progress | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` |
| records-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) |
| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| reprocess-dead-letter | (Consumer not implemented) | (Consumer not implemented) |
| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| schema-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) |
| TOPIC NAME | Subscription name | Subscription config |
|----------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| indexing-progress | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` |
| records-changed-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| indexer-reprocess-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| schema-changed-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
| reindex | indexer-reindex | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| reindex-dead-lettering | (Consumer not implemented) | (Consumer not implemented) |
### Additional throughput configuration for PubSub subscription consumer via Partition service
......
......@@ -27,7 +27,7 @@ import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.model.Constants;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
......@@ -52,16 +52,16 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
private final OqmDriver driver;
private final IndexerMessagingConfigProperties properties;
private final MessagingConfigProperties properties;
private OqmTopic reprocessOqmTopic;
private OqmTopic recordsChangedTopic;
private OqmTopic reindexTopic;
@PostConstruct
public void setUp() {
reprocessOqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build();
recordsChangedTopic = OqmTopic.builder().name(properties.getRecordsChangedTopicName()).build();
reindexTopic = OqmTopic.builder().name(properties.getReindexTopicName()).build();
}
public void createWorkerTask(String payload, DpsHeaders headers) {
......@@ -112,15 +112,6 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
);
}
private void publishReindexTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
.build();
Map<String, String> attributes = getAttributesFromHeaders(headers);
OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build();
log.info("Reprocessing task: {} ,has been published.", oqmMessage);
driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
}
private void publishRecordsChangedTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder()
.partitionId(headers.getPartitionId())
......@@ -141,8 +132,17 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
.attributes(attributes)
.build();
log.info("Reindex task: {} ,has been published.", oqmMessage);
driver.publish(oqmMessage, reindexTopic, oqmDestination);
}
private void publishReindexTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
.build();
Map<String, String> attributes = getAttributesFromHeaders(headers);
OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build();
log.info("Reprocessing task: {} ,has been published.", oqmMessage);
driver.publish(oqmMessage, recordsChangedTopic, oqmDestination);
driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
}
@NotNull
......
......@@ -31,7 +31,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.springframework.stereotype.Component;
......@@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
public class StatusPublisherImpl implements IPublisher {
private final OqmDriver driver;
private final IndexerMessagingConfigProperties properties;
private final MessagingConfigProperties properties;
private final JsonSerializer<JobStatus> statusJsonSerializer;
private OqmTopic oqmTopic;
private Gson gson;
......
/*
* Copyright 2020-2022 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.opengroup.osdu.indexer.provider.gcp.indexing.processing;
package org.opengroup.osdu.indexer.provider.gcp.indexing.config;
import lombok.Getter;
import lombok.Setter;
......@@ -26,12 +26,12 @@ import org.springframework.context.annotation.Configuration;
@Getter
@ConfigurationProperties
@Configuration
public class IndexerMessagingConfigProperties {
public class MessagingConfigProperties {
@Deprecated
private String defaultRelativeIndexerWorkerUrl;
private String recordsChangedTopicName;
private String schemaChangedTopicName;
private String defaultRelativeIndexerWorkerUrl;
private String reprocessTopicName;
private String statusChangedTopicName;
private String reprocessTopicName;
private String reindexTopicName;
}
......@@ -17,96 +17,100 @@
package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriber;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery;
import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import javax.annotation.Nullable;
@Service
@Slf4j
@RequiredArgsConstructor
public class OqmSubscriberManager {
private final OqmDriver driver;
private final OqmDriver driver;
private OqmSubscription getSubscriptionForTenant(TenantInfo tenantInfo, String topicName, String subscriptionName) {
String dataPartitionId = tenantInfo.getDataPartitionId();
log.info("OQM: provisioning tenant {}:", dataPartitionId);
log.info("OQM: check for topic {} existence:", topicName);
OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)).orElse(null);
public void registerSubscriber(String dataPartitionId, String topicName, String subscriptionName,
OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) {
OqmSubscription subscriptionForTenant = getSubscriptionForTenant(dataPartitionId, topicName, subscriptionName);
log.info("OQM: registering Subscriber for subscription {}", subscriptionName);
if (topic == null) {
log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName,
dataPartitionId);
throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required topic not exists.",
String.format(
"Required topic not exists. Create topic: %s for tenant: %s and restart service.",
topicName, dataPartitionId
)
);
}
OqmDestination destination = getDestination(dataPartitionId);
OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(subscriptionForTenant)
.messageReceiver(messageReceiver)
.throughput(throughput)
.build();
log.info("OQM: check for topic {} existence: PRESENT", topicName);
OqmSubscription subscription = getSubscription(tenantInfo, topic, subscriptionName);
driver.subscribe(subscriber, destination);
log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName);
}
if (subscription == null) {
log.error(
"OQM: check for subscription {}, tenant: {} existence: ABSENT.",
subscriptionName,
dataPartitionId
);
throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required subscription not exists.",
String.format(
"Required subscription not exists. Create subscription: %s for tenant: %s and restart service.",
subscriptionName,
dataPartitionId
)
);
}
log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId);
return subscription;
}
private OqmSubscription getSubscriptionForTenant(String dataPartitionId, String topicName, String subscriptionName) {
log.info("OQM: provisioning tenant {}:", dataPartitionId);
log.info("OQM: check for topic {} existence:", topicName);
OqmTopic topic = driver.getTopic(topicName, getDestination(dataPartitionId)).orElse(null);
@Nullable
private OqmSubscription getSubscription(TenantInfo tenantInfo, OqmTopic topic, String subscriptionName) {
log.info("OQM: check for subscription {} existence:", subscriptionName);
OqmSubscriptionQuery query = OqmSubscriptionQuery.builder()
.namePrefix(subscriptionName)
.subscriberable(true)
.build();
return driver
.listSubscriptions(topic, query, getDestination(tenantInfo)).stream()
.findAny()
.orElse(null);
if (topic == null) {
log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName, dataPartitionId);
throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required topic not exists.",
String.format(
"Required topic not exists. Create topic: %s for tenant: %s and restart service.",
topicName, dataPartitionId
)
);
}
public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) {
OqmSubscription subscriptionForTenant = getSubscriptionForTenant(tenantInfo, topicName, subscriptionName);
log.info("OQM: registering Subscriber for subscription {}", subscriptionName);
OqmDestination destination = getDestination(tenantInfo);
log.info("OQM: check for topic {} existence: PRESENT", topicName);
OqmSubscription subscription = getSubscription(dataPartitionId, topic, subscriptionName);
OqmSubscriber subscriber = OqmSubscriber.builder()
.subscription(subscriptionForTenant)
.messageReceiver(messageReceiver)
.throughput(throughput)
.build();
driver.subscribe(subscriber, destination);
log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName);
if (subscription == null) {
log.error(
"OQM: check for subscription {}, tenant: {} existence: ABSENT.",
subscriptionName,
dataPartitionId
);
throw new AppException(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"Required subscription not exists.",
String.format(
"Required subscription not exists. Create subscription: %s for tenant: %s and restart service.",
subscriptionName,
dataPartitionId
)
);
}
log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId);
return subscription;
}
private OqmDestination getDestination(TenantInfo tenantInfo) {
return OqmDestination.builder()
.partitionId(tenantInfo.getDataPartitionId())
.build();
}
@Nullable
private OqmSubscription getSubscription(String dataPartitionId, OqmTopic topic, String subscriptionName) {
log.info("OQM: check for subscription {} existence:", subscriptionName);
OqmSubscriptionQuery query = OqmSubscriptionQuery.builder()
.namePrefix(subscriptionName)
.subscriberable(true)
.build();
return driver
.listSubscriptions(topic, query, getDestination(dataPartitionId)).stream()
.findAny()
.orElse(null);
}
private OqmDestination getDestination(String dataPartitionId) {
return OqmDestination.builder()
.partitionId(dataPartitionId)
.build();
}
}
......@@ -17,7 +17,6 @@
package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization;
import java.util.Collection;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -27,9 +26,10 @@ import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput;
import org.opengroup.osdu.indexer.api.RecordIndexerApi;
import org.opengroup.osdu.indexer.api.ReindexApi;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.gcp.indexing.config.MessagingConfigProperties;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RecordsChangedMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RepressorMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.ReindexMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.ReprocessorMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.SchemaChangedMessageReceiver;
import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
import org.springframework.stereotype.Component;
......@@ -42,55 +42,63 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class TenantSubscriberConfiguration {
private static final String SUBSCRIPTION_PREFIX = "indexer-";
private final IndexerMessagingConfigProperties properties;
private final OqmSubscriberManager subscriberManager;
private final ITenantFactory tenantInfoFactory;
private final TokenProvider tokenProvider;
private final ThreadDpsHeaders headers;
private final RecordIndexerApi recordIndexerApi;
private final ReindexApi reindexApi;
private static final String SUBSCRIPTION_PREFIX = "indexer-";
private final MessagingConfigProperties properties;
private final OqmSubscriberManager subscriberManager;
private final ITenantFactory tenantInfoFactory;
private final TokenProvider tokenProvider;
private final ThreadDpsHeaders headers;
private final RecordIndexerApi recordIndexerApi;
private final ReindexApi reindexApi;
/**
* Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the same message broker(The same RabbitMQ
* instance, or the same GCP project Pub/Sub) then only one subscriber in this broker will be used.
*/
@PostConstruct
void postConstruct() {
log.info("OqmSubscriberManager provisioning STARTED");
String recordsChangedTopicName = properties.getRecordsChangedTopicName();
String reprocessTopicName = properties.getReprocessTopicName();
String schemaChangedTopicName = properties.getSchemaChangedTopicName();
/**
* Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the
* same message broker(The same RabbitMQ instance, or the same GCP project Pub/Sub) then only one subscriber in this
* broker will be used.
*/
@PostConstruct
void postConstruct() {
log.info("OqmSubscriberManager provisioning STARTED");
String recordsChangedTopicName = properties.getRecordsChangedTopicName();
String schemaChangedTopicName = properties.getSchemaChangedTopicName();
String reprocessTopicName = properties.getReprocessTopicName();
String reindexTopicName = properties.getReindexTopicName();
Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo();
for (TenantInfo tenantInfo : tenantInfos) {
subscriberManager.registerSubscriber(
tenantInfo,
recordsChangedTopicName,
getSubscriptionName(recordsChangedTopicName),
new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi),
OqmSubscriberThroughput.MAX
);
subscriberManager.registerSubscriber(
tenantInfo,
reprocessTopicName,
getSubscriptionName(reprocessTopicName),
new RepressorMessageReceiver(headers, tokenProvider, reindexApi),
OqmSubscriberThroughput.MIN
);
subscriberManager.registerSubscriber(
tenantInfo,
schemaChangedTopicName,
getSubscriptionName(schemaChangedTopicName),
new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi),
OqmSubscriberThroughput.MIN
);
}
log.info("OqmSubscriberManager provisioning COMPLETED");
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
String dataPartitionId = tenantInfo.getDataPartitionId();
subscriberManager.registerSubscriber(
dataPartitionId,
recordsChangedTopicName,
getSubscriptionName(recordsChangedTopicName),
new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi),
OqmSubscriberThroughput.MAX
);
subscriberManager.registerSubscriber(
dataPartitionId,
schemaChangedTopicName,
getSubscriptionName(schemaChangedTopicName),
new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi),
OqmSubscriberThroughput.MIN
);
subscriberManager.registerSubscriber(
dataPartitionId,
reprocessTopicName,
getSubscriptionName(reprocessTopicName),
new ReprocessorMessageReceiver(headers, tokenProvider, reindexApi),
OqmSubscriberThroughput.MIN
);
subscriberManager.registerSubscriber(
dataPartitionId,
reindexTopicName,
getSubscriptionName(reindexTopicName),
new ReindexMessageReceiver(headers, tokenProvider, recordIndexerApi),
OqmSubscriberThroughput.MAX
);
}
log.info("OqmSubscriberManager provisioning COMPLETED");
}
private String getSubscriptionName(String topicName) {
return SUBSCRIPTION_PREFIX + topicName;
}
}
private String getSubscriptionName(String topicName) {
return SUBSCRIPTION_PREFIX + topicName;
}
}
\ No newline at end of file
/*
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.indexer.provider.gcp.indexing.processing;
import java.time.LocalDateTime;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.auth.TokenProvider;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
import org.opengroup.osdu.indexer.api.RecordIndexerApi;
import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
import org.springframework.http.ResponseEntity;
@Slf4j
public class ReindexMessageReceiver extends IndexerOqmMessageReceiver {
private final RecordIndexerApi recordIndexerApi;
public ReindexMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, RecordIndexerApi recordIndexerApi) {
super(dpsHeaders, tokenProvider);
this.recordIndexerApi = recordIndexerApi;
}
@Override
protected void sendMessage(OqmMessage oqmMessage) throws Exception {
RecordChangedMessages indexWorkerRequestBody = getIndexWorkerRequestBody(oqmMessage);
log.debug("Reindex job message body: {}", indexWorkerRequestBody);
ResponseEntity<JobStatus> jobStatusResponse = recordIndexerApi.indexWorker(indexWorkerRequestBody);
log.debug("Job status: {}", jobStatusResponse);
}
private RecordChangedMessages getIndexWorkerRequestBody(OqmMessage request) {
RecordChangedMessages recordChangedMessages = new RecordChangedMessages();
recordChangedMessages.setMessageId(dpsHeaders.getCorrelationId());
recordChangedMessages.setData(request.getData());
recordChangedMessages.setAttributes(request.getAttributes());
recordChangedMessages.setPublishTime(LocalDateTime.now().toString());
return recordChangedMessages;
}
}
/*
* Copyright 2020-2022 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -27,13 +27,12 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
import org.springframework.http.ResponseEntity;
@Slf4j
public class RepressorMessageReceiver extends IndexerOqmMessageReceiver {
public class ReprocessorMessageReceiver extends IndexerOqmMessageReceiver {
private final Gson gson = new Gson();
private final ReindexApi reindexApi;
public RepressorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider,
ReindexApi reindexApi) {
public ReprocessorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, ReindexApi reindexApi) {
super(dpsHeaders, tokenProvider);
this.reindexApi = reindexApi;
}
......
......@@ -20,13 +20,15 @@ kinds-redis-database=1
cron-index-cleanup-threshold-days=3
cron-empty-index-cleanup-threshold-days=7
#indexer service config
# Indexer service config
propertyResolver.strategy=partition
DEFAULT_DATA_COUNTRY=US
gae-service=indexer
security.https.certificate.trust=false
storage-records-by-kind-batch-size=1000
storage-records-batch-size=20
# External services config
REDIS_SEARCH_PORT=6379
REDIS_SEARCH_HOST=redis-cache-search
......@@ -51,9 +53,9 @@ SCHEMA_PATH=/api/schema-service/v1/schema
SCHEMA_HOST=${SCHEMA_BASE_HOST}${SCHEMA_PATH}
SEARCH_HOST=${SEARCH_BASE_HOST}/api/search/v2
# OQM config
records-changed-topic-name=records-changed
schema-changed-topic-name=schema-changed
reprocess-topic-name=reprocess
status-changed-topic-name=indexing-progress
propertyResolver.strategy=partition
\ No newline at end of file
reprocess-topic-name=reprocess
reindex-topic-name=reindex
\ No newline at end of file
......@@ -35,7 +35,7 @@ import org.opengroup.osdu.indexer.api.ReindexApi;
import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders;
@RunWith(Theories.class)
public class RepressorMessageReceiverTest {
public class ReprocessorMessageReceiverTest {
protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class);
......@@ -45,11 +45,11 @@ public class RepressorMessageReceiverTest {
private ReindexApi reindexApi = Mockito.mock(ReindexApi.class);
private RepressorMessageReceiver receiver;
private ReprocessorMessageReceiver receiver;
@Before
public void setUp() {
receiver = new RepressorMessageReceiver(dpsHeaders, tokenProvider, reindexApi);
receiver = new ReprocessorMessageReceiver(dpsHeaders, tokenProvider, reindexApi);
}
@DataPoints("VALID_EVENTS")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment