diff --git a/NOTICE b/NOTICE index ba7891fad64d315704b373871343c6a9acbd3352..fd99064bb80fce2ee2fc8dd661221ed67a901829 100644 --- a/NOTICE +++ b/NOTICE @@ -17,7 +17,6 @@ The following software have components provided under the terms of this license: - Apache Commons CLI (from https://commons.apache.org/proper/commons-cli/, https://repo1.maven.org/maven2/commons-cli/commons-cli) - Apache Geronimo JMS Spec 2.0 (from http://geronimo.apache.org/maven/${siteId}/${version}) -- Apache Log4j Core (from https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core) - Apache Log4j JUL Adapter (from https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-jul) - Apache Maven Invoker (from https://repo1.maven.org/maven2/org/apache/maven/shared/maven-invoker) - Apache Maven Reporting API (from https://repo1.maven.org/maven2/org/apache/maven/reporting/maven-reporting-api) @@ -682,6 +681,7 @@ BSD-2-Clause The following software have components provided under the terms of this license: - API Common (from https://github.com/googleapis, https://github.com/googleapis/api-common-java, https://repo1.maven.org/maven2/com/google/api/api-common) +- Apache Log4j Core (from https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-core) - Apache Lucene (module: memory) (from https://lucene.apache.org/) - Apache Lucene (module: misc) (from https://lucene.apache.org/, https://repo1.maven.org/maven2/org/apache/lucene/lucene-misc) - Apache Lucene (module: spatial-extras) (from https://lucene.apache.org/, https://repo1.maven.org/maven2/org/apache/lucene/lucene-spatial-extras) @@ -963,8 +963,6 @@ The following software have components provided under the terms of this license: - HK2 core module (from https://repo1.maven.org/maven2/org/glassfish/hk2/hk2-core) - Jakarta Annotations API (from https://projects.eclipse.org/projects/ee4j.ca) - Jakarta RESTful WS API (from https://github.com/eclipse-ee4j/jaxrs-api) -- Java Architecture for XML Binding (from http://jaxb.java.net/, https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api) -- JavaBeans Activation Framework (from <http://java.sun.com/javase/technologies/desktop/javabeans/jaf/index.jsp>, http://java.sun.com/javase/technologies/desktop/javabeans/jaf/index.jsp, https://repo1.maven.org/maven2/com/sun/activation/javax.activation) - RabbitMQ Java Client (from http://www.rabbitmq.com, https://www.rabbitmq.com) - ServiceLocator Default Implementation (from https://repo1.maven.org/maven2/org/glassfish/hk2/hk2-locator) - aopalliance-repackaged (from https://repo1.maven.org/maven2/org/glassfish/hk2/external/aopalliance-repackaged) @@ -1026,7 +1024,6 @@ GPL-3.0-only The following software have components provided under the terms of this license: - Jakarta Annotations API (from https://projects.eclipse.org/projects/ee4j.ca) -- Java Architecture for XML Binding (from http://jaxb.java.net/, https://repo1.maven.org/maven2/javax/xml/bind/jaxb-api) ======================================================================== GPL-3.0-or-later diff --git a/provider/indexer-gc/docs/anthos/README.md b/provider/indexer-gc/docs/anthos/README.md index 9800216a3e1cdb6ccc86f5537f108206e179dfd2..868411924d670d8e4ea9a512b53b81d7a1f1b1a8 100644 --- a/provider/indexer-gc/docs/anthos/README.md +++ b/provider/indexer-gc/docs/anthos/README.md @@ -203,15 +203,15 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H RabbitMq should have exchanges and queues with names and configs: -| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | -|----------------------------------|-----------------------------------------------------------------------------|---------------------------|----------------------------------------------------------------------| +| EXCHANGE NAME | EXCHANGE CONFIG | Target queue name | Target queue config | +|----------------------------------|-----------------------------------------------------------------------------|----------------------------|----------------------------------------------------------------------| | indexing-progress | `Type fanout` <br/>`durable: true` | (Consumer not implemented) | (Consumer not implemented) | -| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| indexer-records-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | -| indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| records-changed | `Type fanout` <br/>`durable: true` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-records-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-records-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| reprocess | `Type fanout` <br/>`durable: true` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-reprocess-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-reprocess | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| schema-changed | `Type fanout` <br/>`durable: true` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | +| indexer-schema-changed-exchange | `Type x-delayed-message` <br/>`durable: true`<br/>`x-delayed-type: fanout` | indexer-schema-changed | `x-delivery-limit: 5`<br/>`x-queue-type: quorum`<br/>`durable: true` | ## Keycloak configuration diff --git a/provider/indexer-gc/docs/gc/README.md b/provider/indexer-gc/docs/gc/README.md index 8edb09a7137f11a0a93a89ffab304faa2a81e472..11e5ce1f958906aa2ac9aca4d1f15ee88a297ea2 100644 --- a/provider/indexer-gc/docs/gc/README.md +++ b/provider/indexer-gc/docs/gc/README.md @@ -14,21 +14,22 @@ Must have: Defined in default application property file but possible to override: -| name | value | description | sensitive? | source | -|------------------------------------|---------------------------------------------------------------------------|---------------------------------------------------------------------------|------------|--------------------------------------------------------------| -| `LOG_PREFIX` | `service` | Logging prefix | no | - | -| `LOG_LEVEL` | `****` | Logging level | no | - | -| `SECURITY_HTTPS_CERTIFICATE_TRUST` | ex `false` | Elastic client connection uses TrustSelfSignedStrategy(), if it is 'true' | false | output of infrastructure deployment | -| `REDIS_SEARCH_HOST` | ex `127.0.0.1` | Redis host | no | | -| `REDIS_SEARCH_PORT` | ex `6379` | Redis host port | no | | -| `REDIS_SEARCH_PASSWORD` | ex `*****` | Redis host password | yes | | -| `REDIS_SEARCH_WITH_SSL` | ex `true` or `false` | Redis host ssl config | no | | -| `REDIS_SEARCH_EXPIRATION` | ex `30` | Redis cache expiration in seconds | no | | -| `PARTITION_HOST` | ex `https://partition.com` | Partition host | no | output of infrastructure deployment | -| `ENTITLEMENTS_HOST` | ex `https://entitlements.com` | Entitlements host | no | output of infrastructure deployment | -| `STORAGE_HOST` | ex `https://storage.com` | Storage host | no | output of infrastructure deployment | -| `SCHEMA_BASE_HOST` | ex `https://schema.com` | Schema service host | no | output of infrastructure deployment | -| `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | <https://console.cloud.google.com/iam-admin/serviceaccounts> | +| name | value | description | sensitive? | source | +|------------------------------------|------------------------------------------|---------------------------------------------------------------------------------------------------------------------|------------|--------------------------------------------------------------| +| `LOG_PREFIX` | `service` | Logging prefix | no | - | +| `LOG_LEVEL` | `****` | Logging level | no | - | +| `SECURITY_HTTPS_CERTIFICATE_TRUST` | ex `false` | Elastic client connection uses TrustSelfSignedStrategy(), if it is 'true' | false | output of infrastructure deployment | +| `REDIS_SEARCH_HOST` | ex `127.0.0.1` | Redis host | no | | +| `REDIS_SEARCH_PORT` | ex `6379` | Redis host port | no | | +| `REDIS_SEARCH_PASSWORD` | ex `*****` | Redis host password | yes | | +| `REDIS_SEARCH_WITH_SSL` | ex `true` or `false` | Redis host ssl config | no | | +| `REDIS_SEARCH_EXPIRATION` | ex `30` | Redis cache expiration in seconds | no | | +| `PARTITION_HOST` | ex `https://partition.com` | Partition host | no | output of infrastructure deployment | +| `ENTITLEMENTS_HOST` | ex `https://entitlements.com` | Entitlements host | no | output of infrastructure deployment | +| `STORAGE_HOST` | ex `https://storage.com` | Storage host | no | output of infrastructure deployment | +| `SCHEMA_BASE_HOST` | ex `https://schema.com` | Schema service host | no | output of infrastructure deployment | +| `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | <https://console.cloud.google.com/iam-admin/serviceaccounts> | +| `DEAD_LETTERING_ENABLED` | ex `true` or `false` | Dead lettering configuration validation, if enabled, then service will require configured dead lettering in Pubsub. | no | <https://console.cloud.google.com/cloudpubsub/topic/list> | These variables define service behavior, and are used to switch between `Reference` or `Google Cloud` environments, their overriding and usage in mixed mode was not tested. Usage of spring profiles is preferred. diff --git a/provider/indexer-gc/pom.xml b/provider/indexer-gc/pom.xml index 634032280d30488dc829882408020011869d670d..54513c202ec58b097a8b1080ce7aaae4f5e02b64 100644 --- a/provider/indexer-gc/pom.xml +++ b/provider/indexer-gc/pom.xml @@ -21,7 +21,7 @@ <dependency> <groupId>com.fasterxml.jackson</groupId> <artifactId>jackson-bom</artifactId> - <version>2.14.2</version> + <version>2.15.0</version> <type>pom</type> <scope>import</scope> </dependency> @@ -34,6 +34,7 @@ </dependency> </dependencies> </dependencyManagement> + <dependencies> <dependency> <groupId>org.opengroup.osdu</groupId> @@ -156,10 +157,11 @@ <version>5.3.22</version> </dependency> + <!-- Mappers --> <dependency> <groupId>org.opengroup.osdu</groupId> <artifactId>oqm</artifactId> - <version>0.21.0-rc3</version> + <version>0.21.0-rc5</version> </dependency> </dependencies> diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java similarity index 62% rename from provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java rename to provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java index acc4ecd7a9fa8fec98cf531077e993f44a95f919..55157916adf14e8221dbf01bd0991c2d42ad88f9 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/OqmSubscriberManager.java @@ -15,13 +15,15 @@ * limitations under the License. */ -package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; +package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.*; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import javax.annotation.Nullable; @@ -33,25 +35,45 @@ public class OqmSubscriberManager { private final OqmDriver driver; - private OqmSubscription getOrCreateSubscriptionForTenant(TenantInfo tenantInfo, String topicName, String subscriptionName) { - log.info("OQM: provisioning tenant {}:", tenantInfo.getDataPartitionId()); + private OqmSubscription getSubscriptionForTenant(TenantInfo tenantInfo, String topicName, String subscriptionName) { + String dataPartitionId = tenantInfo.getDataPartitionId(); + log.info("OQM: provisioning tenant {}:", dataPartitionId); log.info("OQM: check for topic {} existence:", topicName); - OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)) - .orElse(null); + OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)).orElse(null); + if (topic == null) { - log.info("OQM: check for topic {} existence: ABSENT. Skipped", topicName); - throw new RuntimeException(); + log.error("OQM: check for topic: {}, tenant: {} existence: ABSENT.", topicName, + dataPartitionId); + throw new AppException( + HttpStatus.INTERNAL_SERVER_ERROR.value(), + "Required topic not exists.", + String.format( + "Required topic not exists. Create topic: %s for tenant: %s and restart service.", + topicName, dataPartitionId + ) + ); } log.info("OQM: check for topic {} existence: PRESENT", topicName); OqmSubscription subscription = getSubscription(tenantInfo, topic, subscriptionName); if (subscription == null) { - subscription = createSubscription(tenantInfo, topic, subscriptionName); - } else { - log.info("OQM: check for subscription {} existence: PRESENT", subscriptionName); + log.error( + "OQM: check for subscription {}, tenant: {} existence: ABSENT.", + subscriptionName, + dataPartitionId + ); + throw new AppException( + HttpStatus.INTERNAL_SERVER_ERROR.value(), + "Required subscription not exists.", + String.format( + "Required subscription not exists. Create subscription: %s for tenant: %s and restart service.", + subscriptionName, + dataPartitionId + ) + ); } - log.info("OQM: provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); + log.info("OQM: provisioning tenant {}: COMPLETED.", dataPartitionId); return subscription; } @@ -68,17 +90,8 @@ public class OqmSubscriberManager { .orElse(null); } - private OqmSubscription createSubscription(TenantInfo tenantInfo, OqmTopic topic, String subscriptionName) { - log.info("OQM: check for subscription {} existence: ABSENT. Will create.", subscriptionName); - OqmSubscription request = OqmSubscription.builder() - .topic(topic) - .name(subscriptionName) - .build(); - return driver.createAndGetSubscription(request, getDestination(tenantInfo)); - } - public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver, OqmSubscriberThroughput throughput) { - OqmSubscription subscriptionForTenant = getOrCreateSubscriptionForTenant(tenantInfo, topicName, subscriptionName); + OqmSubscription subscriptionForTenant = getSubscriptionForTenant(tenantInfo, topicName, subscriptionName); log.info("OQM: registering Subscriber for subscription {}", subscriptionName); OqmDestination destination = getDestination(tenantInfo); diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java similarity index 86% rename from provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java rename to provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java index 9d3a7789d585a6575073719655c6bb2251e343a2..03376bcb6bb5054b99eb29bf9e5ebe49510306a2 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/initialization/TenantSubscriberConfiguration.java @@ -1,6 +1,6 @@ /* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; +package org.opengroup.osdu.indexer.provider.gcp.indexing.initialization; import java.util.Collection; import javax.annotation.PostConstruct; @@ -27,6 +27,10 @@ import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriberThroughput; import org.opengroup.osdu.indexer.api.RecordIndexerApi; import org.opengroup.osdu.indexer.api.ReindexApi; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RecordsChangedMessageReceiver; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.RepressorMessageReceiver; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.SchemaChangedMessageReceiver; import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; import org.springframework.stereotype.Component; @@ -43,7 +47,7 @@ public class TenantSubscriberConfiguration { private final OqmSubscriberManager subscriberManager; private final ITenantFactory tenantInfoFactory; private final TokenProvider tokenProvider; - private final ThreadDpsHeaders headers; + private final ThreadDpsHeaders headers; private final RecordIndexerApi recordIndexerApi; private final ReindexApi reindexApi; diff --git a/provider/indexer-gc/src/main/resources/application-gcp.properties b/provider/indexer-gc/src/main/resources/application-gcp.properties index 8ac15c3f94a0bb7addf3ffcd4fead8bcbd2ffa5e..2e6be18d1a00c96e97bb3567da10699f5201428e 100644 --- a/provider/indexer-gc/src/main/resources/application-gcp.properties +++ b/provider/indexer-gc/src/main/resources/application-gcp.properties @@ -1,3 +1,4 @@ oqmDriver=pubsub service.token.provider=GCP partition-auth-enabled=true +dead-lettering-required=true