Skip to content
Snippets Groups Projects
Commit 07ad6a51 authored by Riabokon Stanislav(EPAM)[GCP]'s avatar Riabokon Stanislav(EPAM)[GCP]
Browse files

Merge branch 'schemas_processing' into 'master'

Added schemas processing

See merge request !452
parents 5f60e383 b7b5d126
No related branches found
No related tags found
1 merge request!452Added schemas processing
Pipeline #158697 failed
......@@ -190,13 +190,15 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H
RabbitMq should have exchanges and queues with names and configs:
| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config |
|-------------------|-------------------------------------|----------------------------|----------------------------------------------------------------------|
| indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config |
|------------------------------|-------------------------------------|--------------------------------------|--------------------------------------------------------------------------------------------------------------------------------|
| indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: records-changed-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| records-changed-dead-letter | `Type fanout` <br/>`durable: true` | indexer-records-changed-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` |
| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: reprocess-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| reprocess-dead-letter | `Type fanout` <br/>`durable: true` | indexer-reprocess-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` |
| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-dead-letter-exchange: schema-changed-dead-letter`<br/>`x-queue-type: quorum`<br/>`durable: true` |
| schema-changed-dead-letter | `Type fanout` <br/>`durable: true` | indexer-schema-changed-dead-letter | `x-queue-type: classic`<br/>`durable: true`<br/>`x-message-ttl: 604800000` |
## Keycloak configuration
......
......@@ -45,12 +45,15 @@ Usage of spring profiles is preferred.
Pubsub should have topics and subscribers with names and configs:
| TOPIC NAME | Subscription name | Subscription config |
|-------------------|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| indexing-progress | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | indexer-records-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds` |
| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds` |
| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds` |
| TOPIC NAME | Subscription name | Subscription config |
|-----------------------------|------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| indexing-progress | (Consumer not implemented) | (Consumer not implemented) |
| records-changed | indexer-records-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| records-changed-dead-letter | indexer-records-changed | `Subscription message retention duration: 7 days` |
| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| reprocess-dead-letter | indexer-reprocess-dead-letter | `Subscription message retention duration: 7 days` |
| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` |
| schema-changed-dead-letter | indexer-schema-changed-dead-letter | `Subscription message retention duration: 7 days` |
### Properties set in Partition service:
......
......@@ -29,7 +29,6 @@ import org.springframework.context.annotation.Configuration;
public class IndexerMessagingConfigProperties {
private String recordsChangedTopicName;
//TODO schema-changed events consuming not implemented
private String schemaChangedTopicName;
private String defaultRelativeIndexerWorkerUrl;
private String reprocessTopicName;
......
......@@ -39,6 +39,8 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.thread.ThreadScopeContex
@RequiredArgsConstructor
public class IndexerOqmMessageReceiver implements OqmMessageReceiver {
private static final String SCHEMA_SERVICE_NAME = "schema";
private final ThreadDpsHeaders dpsHeaders;
private final SubscriptionConsumer consumer;
private final TokenProvider tokenProvider;
......@@ -87,9 +89,12 @@ public class IndexerOqmMessageReceiver implements OqmMessageReceiver {
private boolean sendMessage(OqmMessage oqmMessage) {
CloudTaskRequest cloudTaskRequest;
String serviceName = oqmMessage.getAttributes().get("service");
JsonElement jsonElement = JsonParser.parseString(oqmMessage.getData());
if (jsonElement.isJsonArray()) {
if (SCHEMA_SERVICE_NAME.equalsIgnoreCase(serviceName)) {
cloudTaskRequest = getCloudTaskRequestProducedBySchemaService(oqmMessage);
} else if (jsonElement.isJsonArray()) {
cloudTaskRequest = getCloudTaskRequestProducedByStorageService(oqmMessage);
} else {
cloudTaskRequest = getCloudTaskRequestProducedByIndexerService(oqmMessage);
......@@ -117,6 +122,13 @@ public class IndexerOqmMessageReceiver implements OqmMessageReceiver {
.build();
}
private CloudTaskRequest getCloudTaskRequestProducedBySchemaService(OqmMessage oqmMessage) {
return CloudTaskRequest.builder()
.url(SCHEMA_SERVICE_NAME)
.message(gson.toJson(oqmMessage))
.build();
}
@NotNull
private DpsHeaders getHeaders(OqmMessage oqmMessage) {
DpsHeaders headers = new DpsHeaders();
......
......@@ -28,6 +28,7 @@ import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.JobStatus;
import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest;
import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages;
import org.opengroup.osdu.core.common.model.search.CloudTaskRequest;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.indexer.api.RecordIndexerApi;
......@@ -40,6 +41,8 @@ import org.springframework.stereotype.Component;
@RequiredArgsConstructor
public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer {
private static final String SCHEMA_RELATIVE_URL = "schema";
private final DpsHeaders dpsHeaders;
private final RecordIndexerApi recordIndexerApi;
private final ReindexApi reindexApi;
......@@ -60,6 +63,11 @@ public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer
log.debug("Reindex job message body: {}", reindexBody);
ResponseEntity<?> reindexResponse = reindexApi.reindex(reindexBody, false);
log.info("Reindex job status: {}", reindexResponse);
} else if (url.equals(SCHEMA_RELATIVE_URL)) {
SchemaChangedMessages schemaChangedMessage = getSchemaWorkerRequestBody(request);
log.debug("Schema changed job message body: {}", schemaChangedMessage);
ResponseEntity<?> schemaChangeResponse = recordIndexerApi.schemaWorker(schemaChangedMessage);
log.info("Schema changed job status: {}", schemaChangeResponse);
}
return true;
} catch (AppException e) {
......@@ -91,4 +99,11 @@ public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer
return recordChangedMessages;
}
private SchemaChangedMessages getSchemaWorkerRequestBody(CloudTaskRequest request) {
SchemaChangedMessages schemaChangedMessage = this.gson.fromJson(request.getMessage(), SchemaChangedMessages.class);
schemaChangedMessage.setMessageId(dpsHeaders.getCorrelationId());
schemaChangedMessage.setPublishTime(LocalDateTime.now().toString());
return schemaChangedMessage;
}
}
......@@ -52,15 +52,18 @@ public class TenantSubscriberConfiguration {
log.info("OqmSubscriberManager provisioning STARTED");
IndexerOqmMessageReceiver recordsChangedMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider);
IndexerOqmMessageReceiver reprocessOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider);
IndexerOqmMessageReceiver schemaOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider);
String recordsChangedTopicName = properties.getRecordsChangedTopicName();
String reprocessTopicName = properties.getReprocessTopicName();
String schemaChangedTopicName = properties.getSchemaChangedTopicName();
Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo();
for (TenantInfo tenantInfo : tenantInfos) {
subscriberManager.registerSubscriber(tenantInfo, recordsChangedTopicName, getSubscriptionName(recordsChangedTopicName), recordsChangedMessageReceiver);
subscriberManager.registerSubscriber(tenantInfo, reprocessTopicName, getSubscriptionName(reprocessTopicName), reprocessOqmMessageReceiver);
subscriberManager.registerSubscriber(tenantInfo, schemaChangedTopicName, getSubscriptionName(schemaChangedTopicName), schemaOqmMessageReceiver);
}
log.info("OqmSubscriberManager provisioning COMPLETED");
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment