diff --git a/NOTICE b/NOTICE index c560b8a4b549d7d67fa7c03fb5aaee1de314a1d4..41aa0c80abd97c03d386557f96e77e9e731743fa 100644 --- a/NOTICE +++ b/NOTICE @@ -508,6 +508,7 @@ The following software have components provided under the terms of this license: - Mojo's Maven plugin for Cobertura (from http://mojo.codehaus.org/cobertura-maven-plugin/) - MongoDB Driver (from https://www.mongodb.com/) - MongoDB Java Driver (from http://mongodb.org/, http://www.mongodb.org, https://www.mongodb.com/) +- NanoHttpd-Core (from https://repo1.maven.org/maven2/org/nanohttpd/nanohttpd) - Netty Reactive Streams Implementation (from https://repo1.maven.org/maven2/com/typesafe/netty/netty-reactive-streams) - Netty/Buffer (from https://repo1.maven.org/maven2/io/netty/netty-buffer) - Netty/Codec (from https://repo1.maven.org/maven2/io/netty/netty-codec) @@ -773,6 +774,7 @@ The following software have components provided under the terms of this license: - Microsoft Application Insights Java SDK Web Module (from https://github.com/Microsoft/ApplicationInsights-Java) - Microsoft Application Insights Log4j 2 Appender (from https://github.com/Microsoft/ApplicationInsights-Java) - Mockito (from http://mockito.org, http://www.mockito.org, https://github.com/mockito/mockito) +- NanoHttpd-Core (from https://repo1.maven.org/maven2/org/nanohttpd/nanohttpd) - Netty/Codec/HTTP (from https://repo1.maven.org/maven2/io/netty/netty-codec-http) - Plexus Common Utilities (from http://plexus.codehaus.org/plexus-utils, https://repo1.maven.org/maven2/org/codehaus/plexus/plexus-utils) - PostgreSQL JDBC Driver @@ -1093,6 +1095,7 @@ LGPL-2.1-only The following software have components provided under the terms of this license: - Cobertura (from http://cobertura.sourceforge.net) +- Java Native Access (from https://github.com/java-native-access/jna, https://github.com/twall/jna) - Java Native Access Platform (from https://github.com/java-native-access/jna) - Logback Classic Module (from http://logback.qos.ch, https://repo1.maven.org/maven2/ch/qos/logback/logback-classic) - Logback Contrib :: JSON :: Classic (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-classic) @@ -1316,6 +1319,7 @@ The following software have components provided under the terms of this license: - JBoss Logging 3 (from http://www.jboss.org) - JSON in Java (from https://github.com/douglascrockford/JSON-java) - LatencyUtils (from http://latencyutils.github.io/LatencyUtils/) +- Microsoft Azure client library for Blob Storage (from https://github.com/Azure/azure-sdk-for-java) - PostgreSQL JDBC Driver - jersey-core-common (from https://repo1.maven.org/maven2/org/glassfish/jersey/core/jersey-common) diff --git a/provider/indexer-gc/docs/gc/README.md b/provider/indexer-gc/docs/gc/README.md index bfbbd9fbc5729535119ee38f8e164d2859499b65..7c390dea4b1046123997f174c8f41165c90c53e4 100644 --- a/provider/indexer-gc/docs/gc/README.md +++ b/provider/indexer-gc/docs/gc/README.md @@ -14,21 +14,21 @@ Must have: Defined in default application property file but possible to override: -| name | value | description | sensitive? | source | -|------------------------------------|---------------------------------------------------------------------------|---------------------------------------------------------------------------|------------|------------------------------------------------------------| -| `LOG_PREFIX` | `service` | Logging prefix | no | - | -| `LOG_LEVEL` | `****` | Logging level | no | - | -| `SECURITY_HTTPS_CERTIFICATE_TRUST` | ex `false` | Elastic client connection uses TrustSelfSignedStrategy(), if it is 'true' | false | output of infrastructure deployment | -| `REDIS_SEARCH_HOST` | ex `127.0.0.1` | Redis host | no | | -| `REDIS_SEARCH_PORT` | ex `6379` | Redis host port | no | | -| `REDIS_SEARCH_PASSWORD` | ex `*****` | Redis host password | yes | | -| `REDIS_SEARCH_WITH_SSL` | ex `true` or `false` | Redis host ssl config | no | | -| `REDIS_SEARCH_EXPIRATION` | ex `30` | Redis cache expiration in seconds | no | | -| `PARTITION_HOST` | ex `https://partition.com` | Partition host | no | output of infrastructure deployment | -| `ENTITLEMENTS_HOST` | ex `https://entitlements.com` | Entitlements host | no | output of infrastructure deployment | -| `STORAGE_HOST` | ex `https://storage.com` | Storage host | no | output of infrastructure deployment | -| `INDEXER_QUEUE_HOST` | ex `http://indexer-queue/api/indexer-queue/v1/_dps/task-handlers/enqueue` | Indexer-Queue host endpoint used for reprocessing tasks | no | output of infrastructure deployment | -| `SCHEMA_BASE_HOST` | ex `https://schema.com` | Schema service host | no | output of infrastructure deployment | +| name | value | description | sensitive? | source | +|------------------------------------|---------------------------------------------------------------------------|---------------------------------------------------------------------------|------------|--------------------------------------------------------------| +| `LOG_PREFIX` | `service` | Logging prefix | no | - | +| `LOG_LEVEL` | `****` | Logging level | no | - | +| `SECURITY_HTTPS_CERTIFICATE_TRUST` | ex `false` | Elastic client connection uses TrustSelfSignedStrategy(), if it is 'true' | false | output of infrastructure deployment | +| `REDIS_SEARCH_HOST` | ex `127.0.0.1` | Redis host | no | | +| `REDIS_SEARCH_PORT` | ex `6379` | Redis host port | no | | +| `REDIS_SEARCH_PASSWORD` | ex `*****` | Redis host password | yes | | +| `REDIS_SEARCH_WITH_SSL` | ex `true` or `false` | Redis host ssl config | no | | +| `REDIS_SEARCH_EXPIRATION` | ex `30` | Redis cache expiration in seconds | no | | +| `PARTITION_HOST` | ex `https://partition.com` | Partition host | no | output of infrastructure deployment | +| `ENTITLEMENTS_HOST` | ex `https://entitlements.com` | Entitlements host | no | output of infrastructure deployment | +| `STORAGE_HOST` | ex `https://storage.com` | Storage host | no | output of infrastructure deployment | +| `INDEXER_QUEUE_HOST` | ex `http://indexer-queue/api/indexer-queue/v1/_dps/task-handlers/enqueue` | Indexer-Queue host endpoint used for reprocessing tasks | no | output of infrastructure deployment | +| `SCHEMA_BASE_HOST` | ex `https://schema.com` | Schema service host | no | output of infrastructure deployment | | `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | <https://console.cloud.google.com/iam-admin/serviceaccounts> | These variables define service behavior, and are used to switch between `Reference` or `Google Cloud` environments, their overriding and usage in mixed mode was not tested. @@ -44,15 +44,41 @@ Usage of spring profiles is preferred. Pubsub should have topics and subscribers with names and configs: -| TOPIC NAME | Subscription name | Subscription config | -|-----------------------------|------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | -| records-changed | indexer-records-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | -| records-changed-dead-letter | indexer-records-changed | `Subscription message retention duration: 7 days` | -| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | -| reprocess-dead-letter | indexer-reprocess-dead-letter | `Subscription message retention duration: 7 days` | -| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | -| schema-changed-dead-letter | indexer-schema-changed-dead-letter | `Subscription message retention duration: 7 days` | +| TOPIC NAME | Subscription name | Subscription config | +|-----------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| indexing-progress | (Consumer not implemented) | (Consumer not implemented) | +| records-changed | indexer-records-changed | `Maximum delivery attempts: 10`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 0 seconds`<br/>`Maximum backoff duration: 30 seconds`<br/>`Grant forwarding permissions for dead letter` | +| records-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) | +| reprocess | indexer-reprocess | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| reprocess-dead-letter | (Consumer not implemented) | (Consumer not implemented) | +| schema-changed | indexer-schema-changed | `Maximum delivery attempts: 5`<br/>`Retry policy: Retry after exponential backoff delay`<br/>`Minimum backoff duration: 10 seconds`<br/>`Maximum backoff duration: 600 seconds`<br/>`Grant forwarding permissions for dead letter` | +| schema-changed-dead-letter | (Consumer not implemented) | (Consumer not implemented) | + +### Additional throughput configuration for PubSub subscription consumer via Partition service + +It is possible, but not necessary to adjust consumer throughput via Partition service, there are 3 levels of consumers: + +*MIN* - for mildly consumers, defaults(streams = 1, threads = 2, outstanding elements = 20) +*MID* - for consumers with the average load, defaults(streams = 2, threads = 2, outstanding elements = 40) +*MAX* - for maximum loaded consumers, defaults(streams = 2, threads = 5, outstanding elements = 100) + +https://community.opengroup.org/osdu/platform/system/lib/cloud/gcp/oqm/-/blob/master/src/main/java/org/opengroup/osdu/core/gcp/oqm/driver/pubsub/config/PsThroughputConfiguration.java + +``` + "max.sub.parallel.streams": { + "sensitive": false, + "value": 2 + }, + "max.sub.thread.per.stream": { + "sensitive": false, + "value": 5 + }, + "max.sub.max.outstanding.elements": { + "sensitive": true, + "value": 100 + } +``` + ### Properties set in Partition service diff --git a/provider/indexer-gc/pom.xml b/provider/indexer-gc/pom.xml index 6e27bb6bcadcb845ddf0009f7ec35d354d94a40d..b9e656058533af7c7ae99ca37f6af6b4ff7720b7 100644 --- a/provider/indexer-gc/pom.xml +++ b/provider/indexer-gc/pom.xml @@ -23,7 +23,7 @@ <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>core-lib-gcp</artifactId> - <version>0.19.0-rc3</version> + <version>0.19.0-rc7</version> </dependency> <dependency> <groupId>org.opengroup.osdu.indexer</groupId> diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/JobStatusJsonSerializer.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/JobStatusJsonSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..01d88736ff10b20a61547db5931aa0b6868fad26 --- /dev/null +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/JobStatusJsonSerializer.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.common.publish; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import java.lang.reflect.Type; +import org.opengroup.osdu.core.common.model.indexer.JobStatus; +import org.springframework.stereotype.Component; + +@Component +public class JobStatusJsonSerializer implements JsonSerializer<JobStatus> { + + private final Gson gson = new Gson(); + + @Override + public JsonElement serialize(JobStatus src, Type typeOfSrc, JsonSerializationContext context) { + JsonObject jsonObject = new JsonObject(); + jsonObject.add("recordsStatus", this.gson.toJsonTree(src.getStatusesList())); + jsonObject.add("debugInfo", this.gson.toJsonTree(src.getDebugInfos())); + return jsonObject; + } +} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java index db8bb3259bb7774a00a93dc4b856a0a52d11bf17..4caa625e53d6687f4362c6892182dae7f2676407 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java @@ -28,7 +28,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; @@ -45,61 +45,77 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { - private final OqmDriver driver; - - private final TenantInfo tenantInfo; - - private final IndexerMessagingConfigProperties properties; - - private OqmTopic oqmTopic; - - @PostConstruct - public void setUp() { - oqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build(); - } - - public void createWorkerTask(String payload, DpsHeaders headers) { - createTask(WORKER_RELATIVE_URL, payload, 0l, headers); - } - - public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { - createTask(WORKER_RELATIVE_URL, payload, countdownMillis, headers); - } - - public void createReIndexTask(String payload, DpsHeaders headers) { - createTask(REINDEX_RELATIVE_URL, payload, 0l, headers); - } - - public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { - createTask(REINDEX_RELATIVE_URL, payload, countdownMillis, headers); - } - - private void createTask(String url, String payload, Long countdownMillis, DpsHeaders headers) { - CloudTaskRequest cloudTaskRequest = CloudTaskRequest.builder() - .message(payload) - .url(url) - .initialDelayMillis(countdownMillis) - .build(); - - OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()).build(); - - Map<String, String> attributes = getAttributesFromHeaders(headers); - - String json = new Gson().toJson(cloudTaskRequest); - - OqmMessage oqmMessage = OqmMessage.builder().data(json).attributes(attributes).build(); - log.info("Reprocessing task: {} ,has been published.", oqmMessage); - driver.publish(oqmMessage, oqmTopic, oqmDestination); - } - - @NotNull - private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail()); - attributes.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName()); - attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - headers.addCorrelationIdIfMissing(); - attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - return attributes; - } + private final Gson gson = new Gson(); + + private final OqmDriver driver; + + private final TenantInfo tenantInfo; + + private final IndexerMessagingConfigProperties properties; + + private OqmTopic reprocessOqmTopic; + + private OqmTopic recordsChangedTopic; + + @PostConstruct + public void setUp() { + reprocessOqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build(); + recordsChangedTopic = OqmTopic.builder().name(properties.getRecordsChangedTopicName()).build(); + } + + public void createWorkerTask(String payload, DpsHeaders headers) { + publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, 0l, headers); + } + + public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { + publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, countdownMillis, headers); + } + + public void createReIndexTask(String payload, DpsHeaders headers) { + publishReindexTask(REINDEX_RELATIVE_URL, payload, 0l, headers); + } + + public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { + publishReindexTask(REINDEX_RELATIVE_URL, payload, countdownMillis, headers); + } + + private void publishReindexTask(String url, String payload, Long countdownMillis, + DpsHeaders headers) { + OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()) + .build(); + Map<String, String> attributes = getAttributesFromHeaders(headers); + OqmMessage oqmMessage = OqmMessage.builder().data(payload).attributes(attributes).build(); + log.info("Reprocessing task: {} ,has been published.", oqmMessage); + driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination); + } + + private void publishRecordsChangedTask(String url, String payload, Long countdownMillis, + DpsHeaders headers) { + OqmDestination oqmDestination = OqmDestination.builder() + .partitionId(headers.getPartitionId()) + .build(); + + RecordChangedMessages recordChangedMessages = gson.fromJson(payload, + RecordChangedMessages.class); + + OqmMessage oqmMessage = OqmMessage.builder() + .id(headers.getCorrelationId()) + .data(recordChangedMessages.getData()) + .attributes(getAttributesFromHeaders(headers)) + .build(); + + log.info("Reprocessing task: {} ,has been published.", oqmMessage); + driver.publish(oqmMessage, recordsChangedTopic, oqmDestination); + } + + @NotNull + private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail()); + attributes.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + headers.addCorrelationIdIfMissing(); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + return attributes; + } } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java index 5e1787f341606e15d3f34fe7cef3061be6bd1be0..4551656f75da06d7f66f58004de9f0c4262c1352 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java @@ -19,12 +19,12 @@ package org.opengroup.osdu.indexer.provider.gcp.common.publish; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; +import com.google.gson.JsonSerializer; import java.util.HashMap; import java.util.Map; import javax.annotation.PostConstruct; import lombok.RequiredArgsConstructor; -import lombok.extern.java.Log; +import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.indexer.JobStatus; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; @@ -35,25 +35,30 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessag import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.springframework.stereotype.Component; -@Log +@Slf4j @Component @RequiredArgsConstructor public class StatusPublisherImpl implements IPublisher { private final OqmDriver driver; private final IndexerMessagingConfigProperties properties; + private final JsonSerializer<JobStatus> statusJsonSerializer; private OqmTopic oqmTopic; + private Gson gson; @PostConstruct public void setUp() { - oqmTopic = OqmTopic.builder().name(properties.getStatusChangedTopicName()).build(); + this.oqmTopic = OqmTopic.builder().name(properties.getStatusChangedTopicName()).build(); + this.gson = new GsonBuilder() + .registerTypeHierarchyAdapter(JobStatus.class, statusJsonSerializer) + .create(); } - @Override + @Override public void publishStatusChangedTagsToTopic(DpsHeaders headers, JobStatus indexerBatchStatus) { OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()).build(); - String json = generatePubSubMessage(indexerBatchStatus); + String json = this.gson.toJson(indexerBatchStatus); Map<String, String> attributes = getAttributes(headers); OqmMessage oqmMessage = OqmMessage.builder().data(json).attributes(attributes).build(); @@ -68,10 +73,4 @@ public class StatusPublisherImpl implements IPublisher { attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); return attributes; } - - private String generatePubSubMessage(JobStatus jobStatus) { - Gson gson = new GsonBuilder().create(); - JsonElement statusChangedTagsJson = gson.toJsonTree(jobStatus, JobStatus.class); - return statusChangedTagsJson.toString(); - } } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java index a728ad5fb5aab37ab0d94a3e217a08d68b7a2172..3d19f8e6fb68bc5a22a1ffa80a6ea306da1714f0 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java @@ -17,18 +17,16 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; -import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; - import com.google.common.base.Strings; -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; +import java.io.IOException; +import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.common.model.http.RequestStatus; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; @@ -37,19 +35,15 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.thread.ThreadScopeContex @Slf4j @RequiredArgsConstructor -public class IndexerOqmMessageReceiver implements OqmMessageReceiver { - - private static final String SCHEMA_SERVICE_NAME = "schema"; +public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver { - private final ThreadDpsHeaders dpsHeaders; - private final SubscriptionConsumer consumer; + protected final ThreadDpsHeaders dpsHeaders; private final TokenProvider tokenProvider; - private final Gson gson = new Gson(); - @Override public void receiveMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) { - log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), oqmMessage.getAttributes()); + log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), + oqmMessage.getAttributes()); boolean acked = false; try { if (!validInput(oqmMessage)) { @@ -60,9 +54,31 @@ public class IndexerOqmMessageReceiver implements OqmMessageReceiver { DpsHeaders headers = getHeaders(oqmMessage); // Filling thread context required by the core services. dpsHeaders.setThreadContext(headers.getHeaders()); - acked = sendMessage(oqmMessage); - } catch (Exception e) { - log.error("Error occurred during message receiving: ", e); + sendMessage(oqmMessage); + acked = true; + } catch (AppException appException) { + int statusCode = appException.getError().getCode(); + if (statusCode > 199 && statusCode < 300 && statusCode != RequestStatus.INVALID_RECORD) { + log.info( + "Event : {}, was not processed, and will NOT be rescheduled.", + oqmMessage, + appException + ); + acked = true; + } else { + //It is possible to get both AppException with wrapped in original Exception or the original Exception without any wrapper + Exception exception = Optional.ofNullable(appException.getOriginalException()).orElse(appException); + log.warn( + "Event : {}, was not processed, and will BE rescheduled.", + oqmMessage, + exception + ); + } + } catch (Exception exception) { + log.error( + "Error, Event : {}, was not processed, and will BE rescheduled.", + oqmMessage, + exception); } finally { if (!acked) { oqmAckReplier.nack(); @@ -87,55 +103,11 @@ public class IndexerOqmMessageReceiver implements OqmMessageReceiver { return isValid; } - private boolean sendMessage(OqmMessage oqmMessage) { - CloudTaskRequest cloudTaskRequest; - String serviceName = oqmMessage.getAttributes().get("service"); - JsonElement jsonElement = JsonParser.parseString(oqmMessage.getData()); - - if (SCHEMA_SERVICE_NAME.equalsIgnoreCase(serviceName)) { - cloudTaskRequest = getCloudTaskRequestProducedBySchemaService(oqmMessage); - } else if (jsonElement.isJsonArray()) { - cloudTaskRequest = getCloudTaskRequestProducedByStorageService(oqmMessage); - } else { - cloudTaskRequest = getCloudTaskRequestProducedByIndexerService(oqmMessage); - } - - return consumer.consume(cloudTaskRequest); - } - - /** - * @param oqmMessage produced by Indexer packs messages in org.opengroup.osdu.core.common.model.search.CloudTaskRequest - * @return CloudTaskRequest as it was packed by Indexer - */ - private CloudTaskRequest getCloudTaskRequestProducedByIndexerService(OqmMessage oqmMessage) { - return this.gson.fromJson(oqmMessage.getData(), CloudTaskRequest.class); - } - - /** - * @param oqmMessage produced by Storage packs messages in array of org.opengroup.osdu.core.common.model.storage.PubSubInfo ; - * @return CloudTaskRequest with array of PubSubInfo's packed in message property - */ - private CloudTaskRequest getCloudTaskRequestProducedByStorageService(OqmMessage oqmMessage) { - return CloudTaskRequest.builder() - .url(WORKER_RELATIVE_URL) - .message(gson.toJson(oqmMessage)) - .build(); - } - - private CloudTaskRequest getCloudTaskRequestProducedBySchemaService(OqmMessage oqmMessage) { - return CloudTaskRequest.builder() - .url(SCHEMA_SERVICE_NAME) - .message(gson.toJson(oqmMessage)) - .build(); - } + protected abstract void sendMessage(OqmMessage oqmMessage) throws Exception; @NotNull private DpsHeaders getHeaders(OqmMessage oqmMessage) { - DpsHeaders headers = new DpsHeaders(); - headers.getHeaders().put("data-partition-id", oqmMessage.getAttributes().get("data-partition-id")); - headers.getHeaders().put("correlation-id", oqmMessage.getAttributes().get("correlation-id")); - headers.getHeaders().put("account-id", oqmMessage.getAttributes().get("account-id")); - headers.getHeaders().put("user", oqmMessage.getAttributes().get("user")); + DpsHeaders headers = DpsHeaders.createFromMap(oqmMessage.getAttributes()); headers.getHeaders().put("authorization", "Bearer " + tokenProvider.getIdToken()); return headers; } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java index b69491029f4bc9febcbfe0b3119f7c3e3eeaf418..b7f6bfd7d5687e1f663db57b5f64d6a13bc7cd10 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java @@ -25,6 +25,7 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriber; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; @@ -81,7 +82,7 @@ public class OqmSubscriberManager { return driver.createAndGetSubscription(request, getDestination(tenantInfo)); } - public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver) { + public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) { OqmSubscription subscriptionForTenant = getOrCreateSubscriptionForTenant(tenantInfo, topicName, subscriptionName); log.info("OQM: registering Subscriber for subscription {}", subscriptionName); OqmDestination destination = getDestination(tenantInfo); @@ -89,6 +90,7 @@ public class OqmSubscriberManager { OqmSubscriber subscriber = OqmSubscriber.builder() .subscription(subscriptionForTenant) .messageReceiver(messageReceiver) + .throughput(throughput) .build(); driver.subscribe(subscriber, destination); log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName); diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedMessageReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..9589701a4f7b2f030a8b35750a6cef822b694500 --- /dev/null +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedMessageReceiver.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import java.time.LocalDateTime; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.indexer.JobStatus; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; +import org.springframework.http.ResponseEntity; + +@Slf4j +public class RecordsChangedMessageReceiver extends IndexerOqmMessageReceiver { + + private final RecordIndexerApi recordIndexerApi; + + public RecordsChangedMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, + RecordIndexerApi recordIndexerApi) { + super(dpsHeaders, tokenProvider); + this.recordIndexerApi = recordIndexerApi; + } + + @Override + protected void sendMessage(OqmMessage oqmMessage) throws Exception { + RecordChangedMessages indexWorkerRequestBody = getIndexWorkerRequestBody(oqmMessage); + log.debug("Job message body: {}", indexWorkerRequestBody); + ResponseEntity<JobStatus> jobStatusResponse = recordIndexerApi.indexWorker( + indexWorkerRequestBody); + log.debug("Job status: {}", jobStatusResponse); + } + + private RecordChangedMessages getIndexWorkerRequestBody(OqmMessage request) { + RecordChangedMessages recordChangedMessages = new RecordChangedMessages(); + recordChangedMessages.setMessageId(dpsHeaders.getCorrelationId()); + recordChangedMessages.setData(request.getData()); + recordChangedMessages.setAttributes(request.getAttributes()); + recordChangedMessages.setPublishTime(LocalDateTime.now().toString()); + return recordChangedMessages; + } +} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java deleted file mode 100644 index b9529df021e1222424f392a74b8464824c313537..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; - -import static org.opengroup.osdu.core.common.Constants.REINDEX_RELATIVE_URL; -import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; - -import com.google.gson.Gson; -import java.time.LocalDateTime; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.indexer.JobStatus; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; -import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; -import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; -import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; -import org.opengroup.osdu.indexer.api.RecordIndexerApi; -import org.opengroup.osdu.indexer.api.ReindexApi; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -@RequiredArgsConstructor -public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer { - - private static final String SCHEMA_RELATIVE_URL = "schema"; - - private final DpsHeaders dpsHeaders; - private final RecordIndexerApi recordIndexerApi; - private final ReindexApi reindexApi; - private final Gson gson = new Gson(); - - public boolean consume(CloudTaskRequest request) { - String url = request.getUrl(); - log.debug("Incoming async processing task: {} with headers: {}", request, dpsHeaders.getHeaders()); - - try { - if (url.equals(WORKER_RELATIVE_URL)) { - RecordChangedMessages indexWorkerRequestBody = getIndexWorkerRequestBody(request, dpsHeaders); - log.debug("Job message body: {}", indexWorkerRequestBody); - ResponseEntity<JobStatus> jobStatusResponse = recordIndexerApi.indexWorker(indexWorkerRequestBody); - log.info("Job status: {}", jobStatusResponse); - } else if (url.equals(REINDEX_RELATIVE_URL)) { - RecordReindexRequest reindexBody = getReindexBody(request); - log.debug("Reindex job message body: {}", reindexBody); - ResponseEntity<?> reindexResponse = reindexApi.reindex(reindexBody, false); - log.info("Reindex job status: {}", reindexResponse); - } else if (url.equals(SCHEMA_RELATIVE_URL)) { - SchemaChangedMessages schemaChangedMessage = getSchemaWorkerRequestBody(request); - log.debug("Schema changed job message body: {}", schemaChangedMessage); - ResponseEntity<?> schemaChangeResponse = recordIndexerApi.schemaWorker(schemaChangedMessage); - log.info("Schema changed job status: {}", schemaChangeResponse); - } - return true; - } catch (AppException e) { - int statusCode = e.getError().getCode(); - if (statusCode > 199 && statusCode < 300) { - log.info("Event : {}, with headers: {} was not processed, with AppException: {} and will not be rescheduled", request, dpsHeaders.getHeaders(), - e); - return true; - } else { - log.warn("Event : {}, with headers: {} was not processed, with AppException: {}, stack trace: {} and will be rescheduled", request, - dpsHeaders.getHeaders(), e.getOriginalException(), e.getOriginalException().getStackTrace()); - return false; - } - } catch (Exception e) { - log.error("Error, Event : {}, with headers: {} was not processed, and will be rescheduled, reason: {}, stack trace: {}", request, - dpsHeaders.getHeaders(), e, e.getStackTrace()); - return false; - } - } - - private RecordReindexRequest getReindexBody(CloudTaskRequest request) { - return this.gson.fromJson(request.getMessage(), RecordReindexRequest.class); - } - - private RecordChangedMessages getIndexWorkerRequestBody(CloudTaskRequest request, DpsHeaders dpsHeaders) { - RecordChangedMessages recordChangedMessages = this.gson.fromJson(request.getMessage(), RecordChangedMessages.class); - recordChangedMessages.setMessageId(dpsHeaders.getCorrelationId()); - recordChangedMessages.setPublishTime(LocalDateTime.now().toString()); - return recordChangedMessages; - } - - private SchemaChangedMessages getSchemaWorkerRequestBody(CloudTaskRequest request) { - SchemaChangedMessages schemaChangedMessage = this.gson.fromJson(request.getMessage(), SchemaChangedMessages.class); - schemaChangedMessage.setMessageId(dpsHeaders.getCorrelationId()); - schemaChangedMessage.setPublishTime(LocalDateTime.now().toString()); - return schemaChangedMessage; - } - -} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..ccfe7a214004595ffdc9b29026a095d65db33015 --- /dev/null +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiver.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import com.google.gson.Gson; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.api.ReindexApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; +import org.springframework.http.ResponseEntity; + +@Slf4j +public class RepressorMessageReceiver extends IndexerOqmMessageReceiver { + + private final Gson gson = new Gson(); + private final ReindexApi reindexApi; + + public RepressorMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, + ReindexApi reindexApi) { + super(dpsHeaders, tokenProvider); + this.reindexApi = reindexApi; + } + + @Override + protected void sendMessage(OqmMessage oqmMessage) throws Exception { + RecordReindexRequest reindexBody = getReindexBody(oqmMessage); + log.debug("Reindex job message body: {}", reindexBody); + ResponseEntity<?> reindexResponse = reindexApi.reindex(reindexBody, false); + log.debug("Reindex job status: {}", reindexResponse); + } + + private RecordReindexRequest getReindexBody(OqmMessage request) { + return this.gson.fromJson(request.getData(), RecordReindexRequest.class); + } +} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SchemaChangedMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SchemaChangedMessageReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..cde2d8954c08e47f5b0e62f86ebbb732578044de --- /dev/null +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SchemaChangedMessageReceiver.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import com.google.gson.Gson; +import java.io.IOException; +import java.time.LocalDateTime; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.indexer.SchemaChangedMessages; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; +import org.springframework.http.ResponseEntity; + +@Slf4j +public class SchemaChangedMessageReceiver extends IndexerOqmMessageReceiver { + + private final RecordIndexerApi recordIndexerApi; + private final Gson gson = new Gson(); + + public SchemaChangedMessageReceiver(ThreadDpsHeaders dpsHeaders, TokenProvider tokenProvider, + RecordIndexerApi recordIndexerApi) { + super(dpsHeaders, tokenProvider); + this.recordIndexerApi = recordIndexerApi; + } + + @Override + protected void sendMessage(OqmMessage oqmMessage) throws Exception { + SchemaChangedMessages schemaChangedMessage = getSchemaWorkerRequestBody(oqmMessage); + log.debug("Schema changed job message body: {}", schemaChangedMessage); + ResponseEntity<?> schemaChangeResponse = recordIndexerApi.schemaWorker(schemaChangedMessage); + log.debug("Schema changed job status: {}", schemaChangeResponse); + } + + private SchemaChangedMessages getSchemaWorkerRequestBody(OqmMessage oqmMessage) { + SchemaChangedMessages schemaChangedMessages = new SchemaChangedMessages(); + schemaChangedMessages.setMessageId(dpsHeaders.getCorrelationId()); + schemaChangedMessages.setData(oqmMessage.getData()); + schemaChangedMessages.setAttributes(oqmMessage.getAttributes()); + schemaChangedMessages.setPublishTime(LocalDateTime.now().toString()); + return schemaChangedMessages; + } +} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SubscriptionConsumer.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SubscriptionConsumer.java deleted file mode 100644 index 00dd163c7890edf9c68005f3afd4000cc7bc3c91..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SubscriptionConsumer.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; - -import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; - -public interface SubscriptionConsumer { - - boolean consume(CloudTaskRequest request); -} diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java index 51c5597e81791f4f753a590c68e15b9601c0174c..9d3a7789d585a6575073719655c6bb2251e343a2 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java @@ -24,6 +24,9 @@ import lombok.extern.slf4j.Slf4j; import org.opengroup.osdu.core.auth.TokenProvider; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.api.ReindexApi; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; import org.springframework.stereotype.Component; @@ -40,8 +43,9 @@ public class TenantSubscriberConfiguration { private final OqmSubscriberManager subscriberManager; private final ITenantFactory tenantInfoFactory; private final TokenProvider tokenProvider; - private final SubscriptionConsumer consumer; - private final ThreadDpsHeaders headers; + private final ThreadDpsHeaders headers; + private final RecordIndexerApi recordIndexerApi; + private final ReindexApi reindexApi; /** * Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the same message broker(The same RabbitMQ @@ -50,10 +54,6 @@ public class TenantSubscriberConfiguration { @PostConstruct void postConstruct() { log.info("OqmSubscriberManager provisioning STARTED"); - IndexerOqmMessageReceiver recordsChangedMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); - IndexerOqmMessageReceiver reprocessOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); - IndexerOqmMessageReceiver schemaOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); - String recordsChangedTopicName = properties.getRecordsChangedTopicName(); String reprocessTopicName = properties.getReprocessTopicName(); String schemaChangedTopicName = properties.getSchemaChangedTopicName(); @@ -61,9 +61,27 @@ public class TenantSubscriberConfiguration { Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo(); for (TenantInfo tenantInfo : tenantInfos) { - subscriberManager.registerSubscriber(tenantInfo, recordsChangedTopicName, getSubscriptionName(recordsChangedTopicName), recordsChangedMessageReceiver); - subscriberManager.registerSubscriber(tenantInfo, reprocessTopicName, getSubscriptionName(reprocessTopicName), reprocessOqmMessageReceiver); - subscriberManager.registerSubscriber(tenantInfo, schemaChangedTopicName, getSubscriptionName(schemaChangedTopicName), schemaOqmMessageReceiver); + subscriberManager.registerSubscriber( + tenantInfo, + recordsChangedTopicName, + getSubscriptionName(recordsChangedTopicName), + new RecordsChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), + OqmSubscriberThroughput.MAX + ); + subscriberManager.registerSubscriber( + tenantInfo, + reprocessTopicName, + getSubscriptionName(reprocessTopicName), + new RepressorMessageReceiver(headers, tokenProvider, reindexApi), + OqmSubscriberThroughput.MIN + ); + subscriberManager.registerSubscriber( + tenantInfo, + schemaChangedTopicName, + getSubscriptionName(schemaChangedTopicName), + new SchemaChangedMessageReceiver(headers, tokenProvider, recordIndexerApi), + OqmSubscriberThroughput.MIN + ); } log.info("OqmSubscriberManager provisioning COMPLETED"); } diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java index 139ab8386029b108c2d3c55b310483403f0ef597..c2e5fef7361b8477f2fc606b7025d9ccd21769d8 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java @@ -17,12 +17,56 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.thread; -import org.springframework.context.support.SimpleThreadScope; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectFactory; +import org.springframework.beans.factory.config.Scope; -public class ThreadScope extends SimpleThreadScope { +@Slf4j +public class ThreadScope implements Scope { - @Override - public void registerDestructionCallback(String name, Runnable callback) { - ThreadScopeContextHolder.currentThreadScopeAttributes().registerRequestDestructionCallback(name, callback); + public Object get(String name, ObjectFactory<?> factory) { + log.trace("Get bean:{} with factory: {} current Thread: {}", name, factory, + Thread.currentThread().getName()); + Object result = null; + Map<String, Object> hBeans = ThreadScopeContextHolder.currentThreadScopeAttributes() + .getBeanMap(); + if (!hBeans.containsKey(name)) { + result = factory.getObject(); + log.trace( + "No bean in context with name: {} factory provisioning result is: {} current Thread: {}", + name, result, Thread.currentThread().getName()); + hBeans.put(name, result); + } else { + result = hBeans.get(name); } + + return result; + } + + public Object remove(String name) { + log.trace("Removing bean : {} current Thread: {}", name, Thread.currentThread().getName()); + Object result = null; + Map<String, Object> hBeans = ThreadScopeContextHolder.currentThreadScopeAttributes() + .getBeanMap(); + if (hBeans.containsKey(name)) { + result = hBeans.get(name); + hBeans.remove(name); + } + + return result; + } + + public void registerDestructionCallback(String name, Runnable callback) { + ThreadScopeContextHolder.currentThreadScopeAttributes().registerRequestDestructionCallback(name, callback); + } + + public Object resolveContextualObject(String key) { + return null; + } + + public String getConversationId() { + return Thread.currentThread().getName(); + } } + diff --git a/provider/indexer-gc/src/main/resources/application.properties b/provider/indexer-gc/src/main/resources/application.properties index 97b51a86fc5888af5f762c9150618314254a075a..721d5d180b4acc19d9ac61a0c525a5683980c8c4 100644 --- a/provider/indexer-gc/src/main/resources/application.properties +++ b/provider/indexer-gc/src/main/resources/application.properties @@ -4,7 +4,7 @@ server.servlet.contextPath=/api/indexer/v2 #log config LOG_PREFIX=indexer -logging.level.org.springframework.web=${LOG_LEVEL:DEBUG} +logging.level.org.springframework=${LOG_LEVEL:DEBUG} #jvm config JAVA_OPTS=-Xms3072m -Xmx3072m diff --git a/provider/indexer-gc/src/main/resources/logback.xml b/provider/indexer-gc/src/main/resources/logback.xml index 3d4fbc039ba4367897cba9a478d4ed44a2059b76..cdfa2e625e54da4cfb64929655942bfd3b739cbb 100644 --- a/provider/indexer-gc/src/main/resources/logback.xml +++ b/provider/indexer-gc/src/main/resources/logback.xml @@ -1,7 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> - <logger name="org.opengroup.osdu" level="DEBUG"/> + <logger name="org.opengroup.osdu" level="${LOG_LEVEL}"/> <springProfile name="local"> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java index 6a72833f40b6cc63665d51b62cc36bf62e5444c2..9ca74b0822616d47ec3fd4f285598e0e071c2e19 100644 --- a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java +++ b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java @@ -1,6 +1,6 @@ /* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +20,9 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; import java.util.List; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.Before; import org.junit.experimental.theories.DataPoints; import org.junit.experimental.theories.FromDataPoints; @@ -38,7 +31,6 @@ import org.junit.experimental.theories.Theory; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.opengroup.osdu.core.auth.TokenProvider; -import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; @@ -46,30 +38,24 @@ import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; @RunWith(Theories.class) public class IndexerOqmMessageReceiverTest { - private final Gson gson = new Gson(); + protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); - private ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); + protected TokenProvider tokenProvider = Mockito.mock(TokenProvider.class); - private SubscriptionConsumer consumer = Mockito.mock(SubscriptionConsumer.class); + protected OqmAckReplier ackReplier = Mockito.mock(OqmAckReplier.class); - private TokenProvider tokenProvider = Mockito.mock(TokenProvider.class); - - private OqmAckReplier ackReplier = Mockito.mock(OqmAckReplier.class); - - private IndexerOqmMessageReceiver receiver; + protected IndexerOqmMessageReceiver receiver; @Before - public void setUp() { - receiver = new IndexerOqmMessageReceiver(dpsHeaders, consumer, tokenProvider); - } - - @DataPoints("VALID_EVENTS") - public static List<ImmutablePair> validEvents() { - return ImmutableList.of( - ImmutablePair.of("/test-events/storage-index-event.json", "/test-events/formatted-as-cloud-task-storage-event.json"), - ImmutablePair.of("/test-events/indexer-reprocess-event.json", "/test-events/formatted-as-cloud-task-indexer-reprocess-event.json"), - ImmutablePair.of("/test-events/reindex-event.json", "/test-events/formatted-as-cloud-task-reindex-event.json") - ); + public void setUp(){ + IndexerOqmMessageReceiver indexerOqmMessageReceiver = new IndexerOqmMessageReceiver( + dpsHeaders, tokenProvider) { + @Override + protected void sendMessage(OqmMessage oqmMessage) throws Exception { + //do nothing + } + }; + receiver = Mockito.spy(indexerOqmMessageReceiver); } @DataPoints("NOT_VALID_EVENTS") @@ -81,34 +67,11 @@ public class IndexerOqmMessageReceiverTest { } @Theory - public void shouldReceiveValidEvent(@FromDataPoints("VALID_EVENTS") ImmutablePair<String, String> pair) { - when(consumer.consume(any())).thenReturn(true); - OqmMessage oqmMessage = readEventFromFile(pair.getLeft()); - CloudTaskRequest cloudTaskRequest = readCloudTaskFromFile(pair.getRight()); + public void shouldNotConsumeNotValidEvent(@FromDataPoints("NOT_VALID_EVENTS") String fileName) + throws Exception { + OqmMessage oqmMessage = ReadFromFileUtil.readEventFromFile(fileName); receiver.receiveMessage(oqmMessage, ackReplier); - verify(consumer).consume(cloudTaskRequest); verify(ackReplier).ack(); - } - - @Theory - public void shouldNotConsumeNotValidEvent(@FromDataPoints("NOT_VALID_EVENTS") String fileName) { - OqmMessage oqmMessage = readEventFromFile(fileName); - receiver.receiveMessage(oqmMessage, ackReplier); - verify(ackReplier).ack(); - verify(consumer, never()).consume(any()); - } - - private OqmMessage readEventFromFile(String filename) { - InputStream resourceAsStream = this.getClass().getResourceAsStream(filename); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); - JsonReader reader = new JsonReader(bufferedReader); - return gson.fromJson(reader, OqmMessage.class); - } - - private CloudTaskRequest readCloudTaskFromFile(String filename) { - InputStream resourceAsStream = this.getClass().getResourceAsStream(filename); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); - JsonReader reader = new JsonReader(bufferedReader); - return gson.fromJson(reader, CloudTaskRequest.class); + verify(receiver, never()).sendMessage(any()); } } diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReadFromFileUtil.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReadFromFileUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..6a44900e75b9eea3e002cfe09d61c3aaf84a059d --- /dev/null +++ b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/ReadFromFileUtil.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; + +public class ReadFromFileUtil { + + private final static Gson gson = new Gson(); + + public static OqmMessage readEventFromFile(String filename) { + InputStream resourceAsStream = ReadFromFileUtil.class.getResourceAsStream(filename); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); + JsonReader reader = new JsonReader(bufferedReader); + return gson.fromJson(reader, OqmMessage.class); + } + +} diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedMessageReceiverTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedMessageReceiverTest.java new file mode 100644 index 0000000000000000000000000000000000000000..72540d02f6fce90d93bb9997740d56d993c9f416 --- /dev/null +++ b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedMessageReceiverTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.junit.Before; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; + +@RunWith(Theories.class) +public class RecordsChangedMessageReceiverTest { + + protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); + + protected TokenProvider tokenProvider = Mockito.mock(TokenProvider.class); + + protected OqmAckReplier ackReplier = Mockito.mock(OqmAckReplier.class); + + private RecordIndexerApi recordIndexerApi = Mockito.mock(RecordIndexerApi.class); + + private RecordsChangedMessageReceiver receiver; + + @Before + public void setUp() { + receiver = new RecordsChangedMessageReceiver(dpsHeaders, tokenProvider, recordIndexerApi); + } + + @DataPoints("VALID_EVENTS") + public static List<String> validEvents() { + return ImmutableList.of( + "/test-events/storage-index-event.json" + ); + } + + @Theory + public void shouldReceiveValidEvent( + @FromDataPoints("VALID_EVENTS") String fileName) throws Exception { + OqmMessage oqmMessage = ReadFromFileUtil.readEventFromFile(fileName); + receiver.receiveMessage(oqmMessage, ackReplier); + verify(ackReplier).ack(); + } +} diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumerTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumerTest.java deleted file mode 100644 index 6ade2840f51dd77d0cc5bfe2028e9db6e1714932..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumerTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableList; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import org.junit.Before; -import org.junit.experimental.theories.DataPoints; -import org.junit.experimental.theories.FromDataPoints; -import org.junit.experimental.theories.Theories; -import org.junit.experimental.theories.Theory; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; -import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; -import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; -import org.opengroup.osdu.indexer.api.RecordIndexerApi; -import org.opengroup.osdu.indexer.api.ReindexApi; - -@RunWith(Theories.class) -public class RecordsChangedSubscriptionConsumerTest { - - private final Gson gson = new Gson(); - - private DpsHeaders dpsHeaders = Mockito.mock(DpsHeaders.class); - - private RecordIndexerApi recordIndexerApi = Mockito.mock(RecordIndexerApi.class); - - private ReindexApi reindexApi = Mockito.mock(ReindexApi.class); - - private RecordsChangedSubscriptionConsumer consumer; - - @Before - public void setUp() { - consumer = new RecordsChangedSubscriptionConsumer(dpsHeaders, recordIndexerApi, reindexApi); - } - - @DataPoints("REINDEX_TASKS") - public static ImmutableList<String> reindexEvents() { - return ImmutableList.of( - "/test-events/formatted-as-cloud-task-reindex-event.json" - ); - } - - @DataPoints("INDEX_TASKS") - public static ImmutableList<String> indexEvents() { - return ImmutableList.of( - "/test-events/formatted-as-cloud-task-indexer-reprocess-event.json", - "/test-events/formatted-as-cloud-task-storage-event.json" - ); - } - - @Theory - public void shouldProcessReindexEvents(@FromDataPoints("REINDEX_TASKS") String fileName) throws IOException { - CloudTaskRequest cloudTaskRequest = readCloudTaskFromFile(fileName); - consumer.consume(cloudTaskRequest); - RecordReindexRequest recordReindexRequest = gson.fromJson(cloudTaskRequest.getMessage(), RecordReindexRequest.class); - verify(reindexApi).reindex(recordReindexRequest, false); - } - - @Theory - public void shouldProcessIndexEvents(@FromDataPoints("INDEX_TASKS") String fileName) throws Exception { - CloudTaskRequest cloudTaskRequest = readCloudTaskFromFile(fileName); - consumer.consume(cloudTaskRequest); - ArgumentCaptor<RecordChangedMessages> captor = ArgumentCaptor.forClass(RecordChangedMessages.class); - verify(recordIndexerApi).indexWorker(captor.capture()); - RecordChangedMessages expectedMessages = this.gson.fromJson(cloudTaskRequest.getMessage(), RecordChangedMessages.class); - RecordChangedMessages actualMessages = captor.getValue(); - assertEquals(expectedMessages.getData(),actualMessages.getData()); - } - - private CloudTaskRequest readCloudTaskFromFile(String filename) { - InputStream resourceAsStream = this.getClass().getResourceAsStream(filename); - BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); - JsonReader reader = new JsonReader(bufferedReader); - return gson.fromJson(reader, CloudTaskRequest.class); - } -} diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiverTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiverTest.java new file mode 100644 index 0000000000000000000000000000000000000000..be752140196c8601cd17d2271dc1b2baef4411ef --- /dev/null +++ b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RepressorMessageReceiverTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.junit.Before; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.api.ReindexApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; + +@RunWith(Theories.class) +public class RepressorMessageReceiverTest { + + protected ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); + + protected TokenProvider tokenProvider = Mockito.mock(TokenProvider.class); + + protected OqmAckReplier ackReplier = Mockito.mock(OqmAckReplier.class); + + private ReindexApi reindexApi = Mockito.mock(ReindexApi.class); + + private RepressorMessageReceiver receiver; + + @Before + public void setUp() { + receiver = new RepressorMessageReceiver(dpsHeaders, tokenProvider, reindexApi); + } + + @DataPoints("VALID_EVENTS") + public static List<String> validEvents() { + return ImmutableList.of( + "/test-events/reindex-event.json" + ); + } + + @Theory + public void shouldReceiveValidEvent( + @FromDataPoints("VALID_EVENTS") String fileName) throws Exception { + OqmMessage oqmMessage = ReadFromFileUtil.readEventFromFile(fileName); + receiver.receiveMessage(oqmMessage, ackReplier); + verify(ackReplier).ack(); + } +} diff --git a/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-indexer-reprocess-event.json b/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-indexer-reprocess-event.json deleted file mode 100644 index 4494af3d6b6003e1018d9af85989a5f28a76b118..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-indexer-reprocess-event.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "url": "/api/indexer/v2/_dps/task-handlers/index-worker", - "message": "{\"data\":\"[{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AgencyTrust\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AmendingWorkingInterestClarification\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssetSaleAndPurchase\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssetSalePurchase\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssignmentNovation\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssignmentNovationAmending\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:CommonStreamOperating\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:Concession\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ConfidentialDisclosure\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:Confidentiality\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ConstructionOwnershipOperating\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ConsultantBusinessDevelopment\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ContractWellOperating\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:CrossConveyedPooling\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:CrossconveyedPooling\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataExchange\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataLicence\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataPurchase\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataRoom\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataSubscription\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"correlation-id\":\"fbe4a2b4-b3be-48f6-bc2a-b7dc068bce62\",\"data-partition-id\":\"osdu\"}}", - "initialDelayMillis": 30000 -} diff --git a/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-reindex-event.json b/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-reindex-event.json deleted file mode 100644 index 23bfbba0a9c10cdb43b660a4cf35a98b9abe5211..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-reindex-event.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "url": "/api/indexer/v2/_dps/task-handlers/reindex-worker", - "message": "{\"kind\":\"osdu:wks:reference-data--AgreementType:1.0.0\",\"cursor\":\"12345\\u003d\\u003d\"}", - "initialDelayMillis": 30000 -} diff --git a/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-storage-event.json b/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-storage-event.json deleted file mode 100644 index 91c3a19749086fdf31673189c2b3698622a7fc1f..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/test/resources/test-events/formatted-as-cloud-task-storage-event.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "url": "/api/indexer/v2/_dps/task-handlers/index-worker", - "message": "{\"id\":\"6405379623020546\",\"data\":\"[{\\\"id\\\":\\\"osdu:query:3b4ee153-48c4-4026-955a-59644ddaf3110\\\",\\\"kind\\\":\\\"osdu:ds:query:1.0.1670526387956\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"account-id\":\"osdu\",\"data-partition-id\":\"osdu\",\"user\":\"gcp-integration-tester@nice-etching-277309.iam.gserviceaccount.com\",\"correlation-id\":\"c4e8b544-ad55-44e0-a6c8-90afb25aaad9\",\"googclient_deliveryattempt\":\"1\"}}", - "initialDelayMillis": 0 -}