diff --git a/NOTICE b/NOTICE index 64ba63354013eb46bea8daede7c784ba7e469257..a9185f05b3d932692be38c0fe44f268bc5a54e78 100644 --- a/NOTICE +++ b/NOTICE @@ -615,7 +615,7 @@ The following software have components provided under the terms of this license: - Zipkin Reporter: Core (from https://repo1.maven.org/maven2/io/zipkin/reporter2/zipkin-reporter) - Zipkin v2 (from https://repo1.maven.org/maven2/io/zipkin/zipkin2/zipkin) - aalto-xml (from https://github.com/FasterXML/aalto-xml, https://repo1.maven.org/maven2/com/fasterxml/aalto-xml) -- aggs-matrix-stats (from https://github.com/elastic/elasticsearch) +- aggs-matrix-stats (from https://github.com/elastic/elasticsearch.git) - compiler (from http://github.com/spullara/mustache.java) - datastore-v1-proto-client (from https://repo1.maven.org/maven2/com/google/cloud/datastore/datastore-v1-proto-client) - documentdb-bulkexecutor (from http://azure.microsoft.com/en-us/services/documentdb/) @@ -649,9 +649,9 @@ The following software have components provided under the terms of this license: - jersey-media-json-jackson (from https://repo1.maven.org/maven2/org/glassfish/jersey/media/jersey-media-json-jackson) - jose4j (from https://bitbucket.org/b_c/jose4j/) - json-path (from http://code.google.com/p/json-path/, https://github.com/jayway/JsonPath) -- lang-mustache (from https://github.com/elastic/elasticsearch) +- lang-mustache (from https://github.com/elastic/elasticsearch.git) - lettuce (from http://github.com/mp911de/lettuce/wiki, https://github.com/lettuce-io/lettuce-core/wiki) -- mapper-extras (from https://github.com/elastic/elasticsearch) +- mapper-extras (from https://github.com/elastic/elasticsearch.git) - micrometer-core (from https://github.com/micrometer-metrics/micrometer) - micrometer-registry-azure-monitor (from https://github.com/micrometer-metrics/micrometer) - micrometer-registry-prometheus (from https://github.com/micrometer-metrics/micrometer) @@ -660,7 +660,7 @@ The following software have components provided under the terms of this license: - org.conscrypt:conscrypt-openjdk-uber (from https://conscrypt.org/) - org.opentest4j:opentest4j (from https://github.com/ota4j-team/opentest4j) - org.xmlunit:xmlunit-core (from http://www.xmlunit.org/, https://www.xmlunit.org/) -- parent-join (from https://github.com/elastic/elasticsearch) +- parent-join (from https://github.com/elastic/elasticsearch.git) - perfmark:perfmark-api (from https://github.com/perfmark/perfmark) - proto-google-cloud-datastore-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-datastore/proto-google-cloud-datastore-v1) - proto-google-cloud-iamcredentials-v1 (from https://github.com/googleapis/google-cloud-java, https://github.com/googleapis/java-iamcredentials/proto-google-cloud-iamcredentials-v1, https://repo1.maven.org/maven2/com/google/api/grpc/proto-google-cloud-iamcredentials-v1) @@ -668,7 +668,7 @@ The following software have components provided under the terms of this license: - proto-google-cloud-pubsub-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-pubsub/proto-google-cloud-pubsub-v1) - proto-google-common-protos (from https://github.com/googleapis/api-client-staging, https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-common-protos) - proto-google-iam-v1 (from https://github.com/googleapis/googleapis, https://github.com/googleapis/java-iam/proto-google-iam-v1) -- rank-eval (from https://github.com/elastic/elasticsearch) +- rank-eval (from https://github.com/elastic/elasticsearch.git) - resilience4j (from https://github.com/resilience4j/resilience4j, https://resilience4j.readme.io, ttps://resilience4j.readme.io) - rest (from https://github.com/elastic/elasticsearch) - rest-high-level (from https://github.com/elastic/elasticsearch) diff --git a/provider/indexer-gcp/docs/anthos/README.md b/provider/indexer-gcp/docs/anthos/README.md index 07c4a0dd4aeb4967f107f724c231f6a5d5bbc16c..161d29778edc9b36c9cbb4dd20fa979163414b31 100644 --- a/provider/indexer-gcp/docs/anthos/README.md +++ b/provider/indexer-gcp/docs/anthos/README.md @@ -29,8 +29,8 @@ Defined in default application property file but possible to override: | `PARTITION_HOST` | ex `https://partition.com` | Partition host | no | output of infrastructure deployment | | `ENTITLEMENTS_HOST` | ex `https://entitlements.com` | Entitlements host | no | output of infrastructure deployment | | `STORAGE_HOST` | ex `https://storage.com` | Storage host | no | output of infrastructure deployment | -| `INDEXER_QUEUE_HOST` | ex `http://indexer-queue/api/indexer-queue/v1/_dps/task-handlers/enqueue` | Indexer-Queue host endpoint used for reprocessing tasks| no | output of infrastructure deployment | | `SCHEMA_BASE_HOST` | ex `https://schema.com` | Schema service host | no | output of infrastructure deployment | +| `RABBITMQ_RETRY_LIMIT` | ex `3` | Limit number of retry attempts | no | output of infrastructure deployment | These variables define service behavior, and are used to switch between `anthos` or `gcp` environments, their overriding and usage in mixed mode was not tested. Usage of spring profiles is preferred. @@ -188,11 +188,33 @@ curl -L -X PATCH 'https://dev.osdu.club/api/partition/v1/partitions/opendes' -H #### Exchanges and queues configuration -At RabbitMq should be created exchange with name: +At RabbitMq should be created exchanges and queues with names: -**name:** `indexing-progress` +**Exchange name:** `indexing-progress` - +**Exchange config** +`Type fanout` +`durable: true` + +**Target queue name** `indexer-records-changed` + +**Target queue config** +`x-delivery-limit: 5` +`x-queue-type: quorum` +`durable: true` + +**Exchange name:** `reprocess` + +**Exchange config** +`Type fanout` +`durable: true` + +**Target queue name** `indexer-reprocess` + +**Target queue config** +`x-delivery-limit: 5` +`x-queue-type: quorum` +`durable: true` ## Keycloak configuration diff --git a/provider/indexer-gcp/docs/gcp/README.md b/provider/indexer-gcp/docs/gcp/README.md index 757a558b2962e39a11d8654050706ec0abc1ae16..e08fb94b496aef4b8cbfbc71a6f9b8b7a17d9e4e 100644 --- a/provider/indexer-gcp/docs/gcp/README.md +++ b/provider/indexer-gcp/docs/gcp/README.md @@ -27,7 +27,6 @@ Defined in default application property file but possible to override: | `PARTITION_HOST` | ex `https://partition.com` | Partition host | no | output of infrastructure deployment | | `ENTITLEMENTS_HOST` | ex `https://entitlements.com` | Entitlements host | no | output of infrastructure deployment | | `STORAGE_HOST` | ex `https://storage.com` | Storage host | no | output of infrastructure deployment | -| `INDEXER_QUEUE_HOST` | ex `http://indexer-queue/api/indexer-queue/v1/_dps/task-handlers/enqueue` | Indexer-Queue host endpoint used for reprocessing tasks | no | output of infrastructure deployment | | `SCHEMA_BASE_HOST` | ex `https://schema.com` | Schema service host | no | output of infrastructure deployment | | `GOOGLE_APPLICATION_CREDENTIALS` | ex `/path/to/directory/service-key.json` | Service account credentials, you only need this if running locally | yes | https://console.cloud.google.com/iam-admin/serviceaccounts | @@ -42,9 +41,17 @@ Usage of spring profiles is preferred. ## Pubsub configuration: -At Pubsub should be created topic with name: +At Pubsub should be created topics and subscribers with names: -**name:** `indexing-progress` +**Topic name:** `indexing-progress` + +**Topic name:** `records-changed` + +**Subscriber name** `indexer-records-changed` + +**Topic name:** `reprocess` + +**Subscriber name** `indexer-reprocess` ### Properties set in Partition service: diff --git a/provider/indexer-gcp/pom.xml b/provider/indexer-gcp/pom.xml index 5dcfece8a13fb3376de4fb07c3c47e7bdc51ebb1..85109ee61a9b4d5b5508a471ab5096d704bdc782 100644 --- a/provider/indexer-gcp/pom.xml +++ b/provider/indexer-gcp/pom.xml @@ -12,7 +12,7 @@ <artifactId>indexer-gcp</artifactId> <version>0.19.0-SNAPSHOT</version> <name>indexer-gcp</name> - <description>Indexer Service GCP App Engine</description> + <description>Indexer Service GCP</description> <packaging>jar</packaging> <dependencies> @@ -147,7 +147,7 @@ <configuration> <classifier>spring-boot</classifier> <mainClass> - org.opengroup.osdu.indexer.IndexerGcpApplication + org.opengroup.osdu.indexer.provider.gcp.IndexerGcpApplication </mainClass> </configuration> </execution> diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/IndexerGcpApplication.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/IndexerGcpApplication.java deleted file mode 100644 index 6ee70305369eba6f03ef966ce1d442f336eaa87b..0000000000000000000000000000000000000000 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/IndexerGcpApplication.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package org.opengroup.osdu.indexer; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; -import org.springframework.context.annotation.ComponentScan; -import org.springframework.context.annotation.Configuration; - -@SpringBootApplication(exclude = { SecurityAutoConfiguration.class, ManagementWebSecurityAutoConfiguration.class }) -@Configuration -@ComponentScan({"org.opengroup.osdu"}) -public class IndexerGcpApplication { - - public static void main(String[] args) { - SpringApplication.run(IndexerGcpApplication.class, args); - } - -} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/ServletInitializer.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/ServletInitializer.java deleted file mode 100644 index dd76cbdb7cc7fa44954c59b4a0e69088dc532f55..0000000000000000000000000000000000000000 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/ServletInitializer.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package org.opengroup.osdu.indexer; - -import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; - -public class ServletInitializer extends SpringBootServletInitializer { - @Override - protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { - return application.sources(IndexerGcpApplication.class); - } -} - diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/EntitlementsClientFactory.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/EntitlementsClientFactory.java deleted file mode 100644 index 37e23c94763621e6b06123f2e51cdfd74f7f1507..0000000000000000000000000000000000000000 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/EntitlementsClientFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2017-2019, Schlumberger -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexer.di; - -import org.opengroup.osdu.core.common.entitlements.EntitlementsAPIConfig; -import org.opengroup.osdu.core.common.entitlements.EntitlementsFactory; -import org.opengroup.osdu.core.common.entitlements.IEntitlementsFactory; -import org.opengroup.osdu.core.common.http.json.HttpResponseBodyMapper; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.beans.factory.config.AbstractFactoryBean; -import org.springframework.context.annotation.Lazy; -import org.springframework.stereotype.Component; -import org.springframework.web.context.annotation.RequestScope; - -@Component -@RequestScope -@Lazy -public class EntitlementsClientFactory extends AbstractFactoryBean<IEntitlementsFactory> { - - @Value("${AUTHORIZE_API}") - private String authorizeApi; - - @Value("${AUTHORIZE_API_KEY:}") - private String authorizeApiKey; - - @Autowired - private HttpResponseBodyMapper mapper; - - @Override - protected IEntitlementsFactory createInstance() throws Exception { - - return new EntitlementsFactory(EntitlementsAPIConfig - .builder() - .rootUrl(authorizeApi) - .apiKey(authorizeApiKey) - .build(), - mapper); - } - - @Override - public Class<?> getObjectType() { - return IEntitlementsFactory.class; - } -} \ No newline at end of file diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/IndexerGcpApplication.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/IndexerGcpApplication.java new file mode 100644 index 0000000000000000000000000000000000000000..418959415ebd132ccc18f9c7089169a91f09be3e --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/IndexerGcpApplication.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp; + +import org.opengroup.osdu.indexer.provider.gcp.indexing.config.CustomContextConfiguration; +import org.opengroup.osdu.indexer.provider.gcp.web.config.WebAppMainContextConfiguration; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.ConfigurableApplicationContext; + +@SpringBootConfiguration +public class IndexerGcpApplication { + + public static void main(String[] args) { + SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(IndexerGcpApplication.class) + .sources(IndexerGcpApplication.class) + .web(WebApplicationType.NONE) + .child(CustomContextConfiguration.class) + .web(WebApplicationType.NONE) + .child(WebAppMainContextConfiguration.class) + .web(WebApplicationType.SERVLET); + + ConfigurableApplicationContext context = springApplicationBuilder.run(args); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/AttributesCache.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/AttributesCache.java similarity index 56% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/AttributesCache.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/AttributesCache.java index 0f66e377bb331e53a55b16c5cfceae4895dd9684..9b909930bc2dc49700f69e917e1ef4feaf71a444 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/AttributesCache.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/AttributesCache.java @@ -1,36 +1,38 @@ -// Copyright 2017-2019, Schlumberger -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexer.cache; +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.common.cache; +import java.util.Set; +import javax.inject.Inject; import org.opengroup.osdu.core.common.cache.RedisCache; import org.opengroup.osdu.core.common.provider.interfaces.IAttributesCache; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.stereotype.Component; -import javax.inject.Inject; -import java.util.Set; - @Component -public class AttributesCache implements IAttributesCache<String,Set>, AutoCloseable { +public class AttributesCache implements IAttributesCache<String, Set>, AutoCloseable { private RedisCache<String, Set> cache; @Inject public AttributesCache(final IndexerConfigurationProperties properties) { cache = new RedisCache(properties.getRedisSearchHost(), Integer.parseInt(properties.getRedisSearchPort()), - properties.getElasticCacheExpiration() * 60, String.class, Boolean.class); + properties.getElasticCacheExpiration() * 60, String.class, Boolean.class); } @Override @@ -57,4 +59,4 @@ public class AttributesCache implements IAttributesCache<String,Set>, AutoClosea public void close() { this.cache.close(); } -} \ No newline at end of file +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/ElasticCredentialsCache.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/ElasticCredentialsCache.java similarity index 69% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/ElasticCredentialsCache.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/ElasticCredentialsCache.java index 83ee124a964a148dcaba28b7b0e88d7c7a111b2f..9f72882a0e61d73158102f00c7f3a3d414b56c2e 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/ElasticCredentialsCache.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/ElasticCredentialsCache.java @@ -1,21 +1,21 @@ /* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.opengroup.osdu.indexer.cache; +package org.opengroup.osdu.indexer.provider.gcp.common.cache; import com.google.gson.Gson; import java.util.Objects; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/IndexCache.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/IndexCache.java similarity index 57% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/IndexCache.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/IndexCache.java index 7f7911db5d70cf5a5c34c77c828fa67e68adbc8d..736f3b663b25bec4463fc32753abecb71dc17ae4 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/IndexCache.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/IndexCache.java @@ -1,37 +1,37 @@ /* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.opengroup.osdu.indexer.cache; +package org.opengroup.osdu.indexer.provider.gcp.common.cache; +import javax.inject.Inject; import org.opengroup.osdu.core.common.cache.RedisCache; import org.opengroup.osdu.core.common.provider.interfaces.IIndexCache; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.stereotype.Component; -import javax.inject.Inject; - @Component public class IndexCache implements IIndexCache<String, Boolean>, AutoCloseable { + private RedisCache<String, Boolean> cache; @Inject public IndexCache(final IndexerConfigurationProperties properties) { cache = new RedisCache<>(properties.getRedisSearchHost(), Integer.parseInt(properties.getRedisSearchPort()), - properties.getElasticCacheExpiration() * 60, String.class, Boolean.class); + properties.getElasticCacheExpiration() * 60, String.class, Boolean.class); } @Override diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/JwtCache.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/JwtCache.java similarity index 61% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/JwtCache.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/JwtCache.java index 4ba364f5bd4523a44e07f74ec5dcd18c10f4ed11..ef546c586a32ad4987711c100fc2ec68460525da 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/JwtCache.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/JwtCache.java @@ -1,41 +1,42 @@ /* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.opengroup.osdu.indexer.cache; +package org.opengroup.osdu.indexer.provider.gcp.common.cache; +import javax.inject.Inject; import org.opengroup.osdu.core.common.cache.RedisCache; import org.opengroup.osdu.core.common.model.search.IdToken; import org.opengroup.osdu.core.common.provider.interfaces.IJwtCache; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.stereotype.Component; -import javax.inject.Inject; - @Component public class JwtCache implements IJwtCache<String, IdToken>, AutoCloseable { + RedisCache<String, IdToken> cache; // google service account id_token can be requested only for 1 hr private static final int EXPIRED_AFTER = 59; + @Inject public JwtCache(final IndexerConfigurationProperties properties) { cache = new RedisCache<>(properties.getRedisSearchHost(), Integer.parseInt(properties.getRedisSearchPort()), - EXPIRED_AFTER * 60, String.class, IdToken.class); + EXPIRED_AFTER * 60, String.class, IdToken.class); } @Override diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/KindsCache.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/KindsCache.java similarity index 56% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/KindsCache.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/KindsCache.java index e20d2f3ca240d33b44cc3bb9261593cae657f8ee..3e8c072c6aa969ee33ba81de41af96611ef277da 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/KindsCache.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/KindsCache.java @@ -1,39 +1,39 @@ /* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.opengroup.osdu.indexer.cache; +package org.opengroup.osdu.indexer.provider.gcp.common.cache; +import java.util.Set; +import javax.inject.Inject; import org.opengroup.osdu.core.common.cache.RedisCache; import org.opengroup.osdu.core.common.provider.interfaces.IKindsCache; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.stereotype.Component; -import javax.inject.Inject; -import java.util.Set; - @Component public class KindsCache implements IKindsCache<String, Set>, AutoCloseable { + private RedisCache<String, Set> cache; @Inject public KindsCache(final IndexerConfigurationProperties properties) { cache = new RedisCache<>(properties.getRedisSearchHost(), Integer.parseInt(properties.getRedisSearchPort()), - properties.getKindsCacheExpiration() * 60, - properties.getKindsRedisDatabase(), String.class, Set.class); + properties.getKindsCacheExpiration() * 60, + properties.getKindsRedisDatabase(), String.class, Set.class); } @Override diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/SchemaCache.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/SchemaCache.java similarity index 57% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/SchemaCache.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/SchemaCache.java index 0ee20756541b14b4c8a6d94fb153317722c233df..a3b139f3df6659341863e524a4b7d39fde4d88fe 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/cache/SchemaCache.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/cache/SchemaCache.java @@ -1,37 +1,37 @@ /* - Copyright 2020 Google LLC - Copyright 2020 EPAM Systems, Inc - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.opengroup.osdu.indexer.cache; +package org.opengroup.osdu.indexer.provider.gcp.common.cache; +import javax.inject.Inject; import org.opengroup.osdu.core.common.cache.RedisCache; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.opengroup.osdu.indexer.provider.interfaces.ISchemaCache; import org.springframework.stereotype.Component; -import javax.inject.Inject; - @Component public class SchemaCache implements ISchemaCache<String, String>, AutoCloseable { + private RedisCache<String, String> cache; @Inject public SchemaCache(final IndexerConfigurationProperties properties) { cache = new RedisCache<>(properties.getRedisSearchHost(), Integer.parseInt(properties.getRedisSearchPort()), - properties.getElasticCacheExpiration() * 60, String.class, String.class); + properties.getElasticCacheExpiration() * 60, String.class, String.class); } @Override diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/ElasticSearchConfig.java similarity index 96% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/ElasticSearchConfig.java index 0f9cc1f4d3ea4319c729af434fd5710203b30141..9d10958a0d02ab35e64b6c5b44b4fe27553695bd 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfig.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/ElasticSearchConfig.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.indexer.di; +package org.opengroup.osdu.indexer.provider.gcp.common.di; import org.opengroup.osdu.core.common.partition.IPartitionProvider; import org.opengroup.osdu.core.common.provider.interfaces.IElasticRepository; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfigurationProperties.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/ElasticSearchConfigurationProperties.java similarity index 94% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfigurationProperties.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/ElasticSearchConfigurationProperties.java index 586ce1ab9e9127282066a2e0fddf651836d12f18..08b162fd535c8b35637141fa133888aa0d557ecd 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/ElasticSearchConfigurationProperties.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/ElasticSearchConfigurationProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.indexer.di; +package org.opengroup.osdu.indexer.provider.gcp.common.di; import lombok.Getter; import lombok.Setter; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/EntitlementsClientFactory.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/EntitlementsClientFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..24f9cc19ff4f51d87bac4f31608eea9b0bb50ca7 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/di/EntitlementsClientFactory.java @@ -0,0 +1,58 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.common.di; + +import org.opengroup.osdu.core.common.entitlements.EntitlementsAPIConfig; +import org.opengroup.osdu.core.common.entitlements.EntitlementsFactory; +import org.opengroup.osdu.core.common.entitlements.IEntitlementsFactory; +import org.opengroup.osdu.core.common.http.json.HttpResponseBodyMapper; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.beans.factory.config.AbstractFactoryBean; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +@Component +@Lazy +public class EntitlementsClientFactory extends AbstractFactoryBean<IEntitlementsFactory> { + + @Value("${AUTHORIZE_API}") + private String authorizeApi; + + @Value("${AUTHORIZE_API_KEY:}") + private String authorizeApiKey; + + @Autowired + private HttpResponseBodyMapper mapper; + + @Override + protected IEntitlementsFactory createInstance() throws Exception { + + return new EntitlementsFactory(EntitlementsAPIConfig + .builder() + .rootUrl(authorizeApi) + .apiKey(authorizeApiKey) + .build(), + mapper); + } + + @Override + public Class<?> getObjectType() { + return IEntitlementsFactory.class; + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/package-info.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..391c0383a79f364f2ad3465bb3842abcc9d51dfc --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This module is used to provide common bean configuration, for both web app context that serves user request, and non-web app context that process indexing + * events. + */ +package org.opengroup.osdu.indexer.provider.gcp.common; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java new file mode 100644 index 0000000000000000000000000000000000000000..db8bb3259bb7774a00a93dc4b856a0a52d11bf17 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java @@ -0,0 +1,105 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.common.publish; + +import static org.opengroup.osdu.core.common.Constants.REINDEX_RELATIVE_URL; +import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; + +import com.google.gson.Gson; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; +import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Component; + +@Slf4j +@Primary +@Component +@RequiredArgsConstructor +public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { + + private final OqmDriver driver; + + private final TenantInfo tenantInfo; + + private final IndexerMessagingConfigProperties properties; + + private OqmTopic oqmTopic; + + @PostConstruct + public void setUp() { + oqmTopic = OqmTopic.builder().name(properties.getReprocessTopicName()).build(); + } + + public void createWorkerTask(String payload, DpsHeaders headers) { + createTask(WORKER_RELATIVE_URL, payload, 0l, headers); + } + + public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { + createTask(WORKER_RELATIVE_URL, payload, countdownMillis, headers); + } + + public void createReIndexTask(String payload, DpsHeaders headers) { + createTask(REINDEX_RELATIVE_URL, payload, 0l, headers); + } + + public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { + createTask(REINDEX_RELATIVE_URL, payload, countdownMillis, headers); + } + + private void createTask(String url, String payload, Long countdownMillis, DpsHeaders headers) { + CloudTaskRequest cloudTaskRequest = CloudTaskRequest.builder() + .message(payload) + .url(url) + .initialDelayMillis(countdownMillis) + .build(); + + OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()).build(); + + Map<String, String> attributes = getAttributesFromHeaders(headers); + + String json = new Gson().toJson(cloudTaskRequest); + + OqmMessage oqmMessage = OqmMessage.builder().data(json).attributes(attributes).build(); + log.info("Reprocessing task: {} ,has been published.", oqmMessage); + driver.publish(oqmMessage, oqmTopic, oqmDestination); + } + + @NotNull + private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail()); + attributes.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + headers.addCorrelationIdIfMissing(); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + return attributes; + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/publish/PublisherImpl.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java similarity index 60% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/publish/PublisherImpl.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java index 2a35a96f5684f9e0c1227d549c0419da067c8118..5e1787f341606e15d3f34fe7cef3061be6bd1be0 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/publish/PublisherImpl.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/StatusPublisherImpl.java @@ -1,24 +1,28 @@ -// Copyright 2017-2019, Schlumberger -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -package org.opengroup.osdu.indexer.publish; +package org.opengroup.osdu.indexer.provider.gcp.common.publish; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import java.util.HashMap; import java.util.Map; +import javax.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; import org.opengroup.osdu.core.common.model.http.DpsHeaders; @@ -27,26 +31,28 @@ import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.opengroup.osdu.indexer.provider.gcp.indexing.processing.IndexerMessagingConfigProperties; import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.springframework.stereotype.Component; -import org.springframework.web.context.annotation.RequestScope; @Log @Component -@RequestScope @RequiredArgsConstructor -public class PublisherImpl implements IPublisher { - - private static final String TOPIC_ID = "indexing-progress"; +public class StatusPublisherImpl implements IPublisher { private final OqmDriver driver; + private final IndexerMessagingConfigProperties properties; + private OqmTopic oqmTopic; - private final OqmTopic oqmTopic = OqmTopic.builder().name(TOPIC_ID).build(); + @PostConstruct + public void setUp() { + oqmTopic = OqmTopic.builder().name(properties.getStatusChangedTopicName()).build(); + } @Override public void publishStatusChangedTagsToTopic(DpsHeaders headers, JobStatus indexerBatchStatus) { - OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()) - .build(); + OqmDestination oqmDestination = + OqmDestination.builder().partitionId(headers.getPartitionId()).build(); String json = generatePubSubMessage(indexerBatchStatus); Map<String, String> attributes = getAttributes(headers); @@ -68,4 +74,4 @@ public class PublisherImpl implements IPublisher { JsonElement statusChangedTagsJson = gson.toJsonTree(jobStatus, JobStatus.class); return statusChangedTagsJson.toString(); } -} \ No newline at end of file +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/ServiceAccountJwtGcpClientImpl.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/util/ServiceAccountJwtGcpClientImpl.java similarity index 95% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/ServiceAccountJwtGcpClientImpl.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/util/ServiceAccountJwtGcpClientImpl.java index 0e3821c64259a8e8c8b894c646862c1ec7806b27..698046499b105c845ce0c4d3d780be56afc833e1 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/ServiceAccountJwtGcpClientImpl.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/util/ServiceAccountJwtGcpClientImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.opengroup.osdu.indexer.util; +package org.opengroup.osdu.indexer.provider.gcp.common.util; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/CustomContextConfiguration.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/CustomContextConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..ce24926c2d575e63430649ce5d545f2e701b3b23 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/CustomContextConfiguration.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.config; + +import java.util.Arrays; +import javax.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.indexer.IndexerApplication; +import org.opengroup.osdu.indexer.ServerletInitializer; +import org.opengroup.osdu.indexer.provider.gcp.web.config.WebAppMainContextConfiguration; +import org.opengroup.osdu.indexer.provider.gcp.web.security.GcpSecurityConfig; +import org.opengroup.osdu.indexer.swagger.SwaggerConfiguration; +import org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration; +import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration; +import org.springframework.boot.autoconfigure.security.servlet.SecurityFilterAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.FilterType; +import org.springframework.context.annotation.PropertySource; + +@Slf4j +@Configuration +@EnableConfigurationProperties +@PropertySource("classpath:application.properties") +@RequiredArgsConstructor +@ComponentScan( + value = {"org.opengroup.osdu"}, + excludeFilters = { + @ComponentScan.Filter( + type = FilterType.ASSIGNABLE_TYPE, + value = { + WebAppMainContextConfiguration.class, + IndexerApplication.class, + ServerletInitializer.class, + SwaggerConfiguration.class, + GcpSecurityConfig.class, + SecurityAutoConfiguration.class, + ManagementWebSecurityAutoConfiguration.class, + SecurityFilterAutoConfiguration.class + }) + }) +public class CustomContextConfiguration { + + private final ApplicationContext applicationContext; + + @PostConstruct + public void setUp() { + log.debug("Messaging context initialized with id: {}.", applicationContext.getId()); + log.debug("Messaging context status: {}.", applicationContext); + String[] allBeansNames = applicationContext.getBeanDefinitionNames(); + log.debug("Messaging context beans definitions: {}.", Arrays.toString(allBeansNames)); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/ScopeModifierPostProcessor.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/ScopeModifierPostProcessor.java new file mode 100644 index 0000000000000000000000000000000000000000..f1b4ce0b80bbbd86d947b2f486fd97238b30a22a --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/config/ScopeModifierPostProcessor.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.config; + +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.indexer.provider.gcp.indexing.thread.ThreadScope; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class ScopeModifierPostProcessor implements BeanFactoryPostProcessor { + + public static final String SCOPE_THREAD = "scope_thread"; + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory factory) throws BeansException { + factory.registerScope(SCOPE_THREAD, new ThreadScope()); + + for (String beanName : factory.getBeanDefinitionNames()) { + BeanDefinition beanDef = factory.getBeanDefinition(beanName); + if (Objects.equals(beanDef.getScope(), "request")) { + beanDef.setScope(SCOPE_THREAD); + log.debug("Scope has been overridden for bean: {}", beanDef.getBeanClassName()); + } + } + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/package-info.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..4cc667aa15649f7e9022b31c829f95078f479270 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/package-info.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This module is used to override request scope bean configuration enforced by the common code, to untie indexing from the web environment and allow async task + * processing in a pulling manner. And configure non web context, which should not process user requests. + * <p> + * As a replacement for @RequestScope, ThreadScope is used, implementation based on SimpleThreadScope provided by Spring. Beans configuration provided by the + * common code is overriden with help of BeanFactoryPostProcessor. + * </p> + */ +package org.opengroup.osdu.indexer.provider.gcp.indexing; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..321db939f35b31503d02c4db08acf9513765dd16 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerMessagingConfigProperties.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Setter +@Getter +@ConfigurationProperties +@Configuration +public class IndexerMessagingConfigProperties { + + private String recordsChangedTopicName; + //TODO schema-changed events consuming not implemented + private String schemaChangedTopicName; + private String defaultRelativeIndexerWorkerUrl; + private String reprocessTopicName; + private String statusChangedTopicName; + +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java new file mode 100644 index 0000000000000000000000000000000000000000..0e3e0fd92ed859373da063573508480fbe1d3a33 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java @@ -0,0 +1,130 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; +import org.opengroup.osdu.indexer.provider.gcp.indexing.thread.ThreadScopeContextHolder; + +@Slf4j +@RequiredArgsConstructor +public class IndexerOqmMessageReceiver implements OqmMessageReceiver { + + private final ThreadDpsHeaders dpsHeaders; + private final SubscriptionConsumer consumer; + private final TokenProvider tokenProvider; + private final Gson gson = new Gson(); + + + @Override + public void receiveMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) { + log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), oqmMessage.getAttributes()); + boolean acked = false; + try { + if (!validInput(oqmMessage)) { + log.warn("Not valid event payload, event will not be processed."); + oqmAckReplier.ack(); + return; + } + DpsHeaders headers = getHeaders(oqmMessage); + // Filling thread context required by the core services. + dpsHeaders.setThreadContext(headers.getHeaders()); + acked = sendMessage(oqmMessage); + } catch (Exception e) { + log.error("Error occurred during message receiving: ", e); + } finally { + if (!acked) { + oqmAckReplier.nack(); + } else { + oqmAckReplier.ack(); + } + // Cleaning thread context after processing is finished and the thread dies out. + ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); + } + } + + private boolean validInput(OqmMessage oqmMessage) { + boolean isValid = true; + if (Strings.isNullOrEmpty(oqmMessage.getData()) || oqmMessage.getData().equals("{}")) { + log.warn("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); + isValid = false; + } + if (oqmMessage.getAttributes() == null || oqmMessage.getAttributes().size() == 0) { + log.warn("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); + isValid = false; + } + return isValid; + } + + private boolean sendMessage(OqmMessage oqmMessage) { + CloudTaskRequest cloudTaskRequest; + JsonElement jsonElement = JsonParser.parseString(oqmMessage.getData()); + + if (jsonElement.isJsonArray()) { + cloudTaskRequest = getCloudTaskRequestProducedByStorageService(oqmMessage); + } else { + cloudTaskRequest = getCloudTaskRequestProducedByIndexerService(oqmMessage); + } + + return consumer.consume(cloudTaskRequest); + } + + /** + * @param oqmMessage produced by Indexer packs messages in org.opengroup.osdu.core.common.model.search.CloudTaskRequest + * @return CloudTaskRequest as it was packed by Indexer + */ + private CloudTaskRequest getCloudTaskRequestProducedByIndexerService(OqmMessage oqmMessage) { + return this.gson.fromJson(oqmMessage.getData(), CloudTaskRequest.class); + } + + /** + * @param oqmMessage produced by Storage packs messages in array of org.opengroup.osdu.core.common.model.storage.PubSubInfo ; + * @return CloudTaskRequest with array of PubSubInfo's packed in message property + */ + private CloudTaskRequest getCloudTaskRequestProducedByStorageService(OqmMessage oqmMessage) { + return CloudTaskRequest.builder() + .url(WORKER_RELATIVE_URL) + .message(gson.toJson(oqmMessage)) + .build(); + } + + @NotNull + private DpsHeaders getHeaders(OqmMessage oqmMessage) { + DpsHeaders headers = new DpsHeaders(); + headers.getHeaders().put("data-partition-id", oqmMessage.getAttributes().get("data-partition-id")); + headers.getHeaders().put("correlation-id", oqmMessage.getAttributes().get("correlation-id")); + headers.getHeaders().put("account-id", oqmMessage.getAttributes().get("account-id")); + headers.getHeaders().put("user", oqmMessage.getAttributes().get("user")); + headers.getHeaders().put("authorization", "Bearer " + tokenProvider.getIdToken()); + return headers; + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java new file mode 100644 index 0000000000000000000000000000000000000000..b69491029f4bc9febcbfe0b3119f7c3e3eeaf418 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/OqmSubscriberManager.java @@ -0,0 +1,102 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.Nullable; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessageReceiver; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriber; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscription; +import org.opengroup.osdu.core.gcp.oqm.model.OqmSubscriptionQuery; +import org.opengroup.osdu.core.gcp.oqm.model.OqmTopic; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +@RequiredArgsConstructor +public class OqmSubscriberManager { + + private final OqmDriver driver; + + private OqmSubscription getOrCreateSubscriptionForTenant(TenantInfo tenantInfo, String topicName, String subscriptionName) { + log.info("OQM: provisioning tenant {}:", tenantInfo.getDataPartitionId()); + log.info("OQM: check for topic {} existence:", topicName); + OqmTopic topic = driver.getTopic(topicName, getDestination(tenantInfo)) + .orElse(null); + if (topic == null) { + log.info("OQM: check for topic {} existence: ABSENT. Skipped", topicName); + throw new RuntimeException(); + } + + log.info("OQM: check for topic {} existence: PRESENT", topicName); + OqmSubscription subscription = getSubscription(tenantInfo, topic, subscriptionName); + + if (subscription == null) { + subscription = createSubscription(tenantInfo, topic, subscriptionName); + } else { + log.info("OQM: check for subscription {} existence: PRESENT", subscriptionName); + } + log.info("OQM: provisioning tenant {}: COMPLETED.", tenantInfo.getDataPartitionId()); + return subscription; + } + + @Nullable + private OqmSubscription getSubscription(TenantInfo tenantInfo, OqmTopic topic, String subscriptionName) { + log.info("OQM: check for subscription {} existence:", subscriptionName); + OqmSubscriptionQuery query = OqmSubscriptionQuery.builder() + .namePrefix(subscriptionName) + .subscriberable(true) + .build(); + return driver + .listSubscriptions(topic, query, getDestination(tenantInfo)).stream() + .findAny() + .orElse(null); + } + + private OqmSubscription createSubscription(TenantInfo tenantInfo, OqmTopic topic, String subscriptionName) { + log.info("OQM: check for subscription {} existence: ABSENT. Will create.", subscriptionName); + OqmSubscription request = OqmSubscription.builder() + .topic(topic) + .name(subscriptionName) + .build(); + return driver.createAndGetSubscription(request, getDestination(tenantInfo)); + } + + public void registerSubscriber(TenantInfo tenantInfo, String topicName, String subscriptionName, OqmMessageReceiver messageReceiver) { + OqmSubscription subscriptionForTenant = getOrCreateSubscriptionForTenant(tenantInfo, topicName, subscriptionName); + log.info("OQM: registering Subscriber for subscription {}", subscriptionName); + OqmDestination destination = getDestination(tenantInfo); + + OqmSubscriber subscriber = OqmSubscriber.builder() + .subscription(subscriptionForTenant) + .messageReceiver(messageReceiver) + .build(); + driver.subscribe(subscriber, destination); + log.info("OQM: provisioning subscription {}: Subscriber REGISTERED.", subscriptionName); + } + + private OqmDestination getDestination(TenantInfo tenantInfo) { + return OqmDestination.builder() + .partitionId(tenantInfo.getDataPartitionId()) + .build(); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..5fb535367258b07f840970a073c9c870de22d27b --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumer.java @@ -0,0 +1,94 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import static org.opengroup.osdu.core.common.Constants.REINDEX_RELATIVE_URL; +import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL; + +import com.google.gson.Gson; +import java.time.LocalDateTime; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.JobStatus; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.api.ReindexApi; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class RecordsChangedSubscriptionConsumer implements SubscriptionConsumer { + + private final DpsHeaders dpsHeaders; + private final RecordIndexerApi recordIndexerApi; + private final ReindexApi reindexApi; + private final Gson gson = new Gson(); + + public boolean consume(CloudTaskRequest request) { + String url = request.getUrl(); + log.debug("Incoming async processing task: {} with headers: {}", request, dpsHeaders.getHeaders()); + + try { + if (url.equals(WORKER_RELATIVE_URL)) { + RecordChangedMessages indexWorkerRequestBody = getIndexWorkerRequestBody(request, dpsHeaders); + log.debug("Job message body: {}", indexWorkerRequestBody); + ResponseEntity<JobStatus> jobStatusResponse = recordIndexerApi.indexWorker(indexWorkerRequestBody); + log.info("Job status: {}", jobStatusResponse); + } else if (url.equals(REINDEX_RELATIVE_URL)) { + RecordReindexRequest reindexBody = getReindexBody(request); + log.debug("Reindex job message body: {}", reindexBody); + ResponseEntity<?> reindexResponse = reindexApi.reindex(reindexBody, false); + log.info("Reindex job status: {}", reindexResponse); + } + return true; + } catch (AppException e) { + int statusCode = e.getError().getCode(); + if (statusCode > 199 && statusCode < 300) { + log.info("Event : {}, with headers: {} was not processed, with AppException: {} and will not be rescheduled", request, dpsHeaders.getHeaders(), + e); + return true; + } else { + log.warn("Event : {}, with headers: {} was not processed, with AppException: {}, stack trace: {} and will be rescheduled", request, + dpsHeaders.getHeaders(), e.getOriginalException(), e.getOriginalException().getStackTrace()); + return false; + } + } catch (Exception e) { + log.error("Error, Event : {}, with headers: {} was not processed, and will be rescheduled, reason: {}, stack trace: {}", request, + dpsHeaders.getHeaders(), e, e.getStackTrace()); + return false; + } + } + + private RecordReindexRequest getReindexBody(CloudTaskRequest request) { + return this.gson.fromJson(request.getMessage(), RecordReindexRequest.class); + } + + private RecordChangedMessages getIndexWorkerRequestBody(CloudTaskRequest request, DpsHeaders dpsHeaders) { + RecordChangedMessages recordChangedMessages = this.gson.fromJson(request.getMessage(), RecordChangedMessages.class); + recordChangedMessages.setMessageId(dpsHeaders.getCorrelationId()); + recordChangedMessages.setPublishTime(LocalDateTime.now().toString()); + return recordChangedMessages; + } + +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SubscriptionConsumer.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SubscriptionConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..00dd163c7890edf9c68005f3afd4000cc7bc3c91 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/SubscriptionConsumer.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; + +public interface SubscriptionConsumer { + + boolean consume(CloudTaskRequest request); +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..3da0b0ab19fe9b106c148257acc810b1c128c9ec --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/TenantSubscriberConfiguration.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import java.util.Collection; +import javax.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.tenant.TenantInfo; +import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; +import org.springframework.stereotype.Component; + +/** + * Subscription configuration class. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class TenantSubscriberConfiguration { + + private static final String SUBSCRIPTION_PREFIX = "indexer-"; + private final IndexerMessagingConfigProperties properties; + private final OqmSubscriberManager subscriberManager; + private final ITenantFactory tenantInfoFactory; + private final TokenProvider tokenProvider; + private final SubscriptionConsumer consumer; + private final ThreadDpsHeaders headers; + + /** + * Tenant configurations provided by the Partition service will be used to configure subscribers. If tenants use the same message broker(The same RabbitMQ + * instance, or the same GCP project Pub/Sub) then only one subscriber in this broker will be used. + */ + @PostConstruct + void postConstruct() { + log.info("OqmSubscriberManager provisioning STARTED"); + IndexerOqmMessageReceiver recordsChangedMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); + IndexerOqmMessageReceiver reprocessOqmMessageReceiver = new IndexerOqmMessageReceiver(headers, consumer, tokenProvider); + + String recordsChangedTopicName = properties.getRecordsChangedTopicName(); + String reprocessTopicName = properties.getReprocessTopicName(); + + Collection<TenantInfo> tenantInfos = tenantInfoFactory.listTenantInfo(); + + for (TenantInfo tenantInfo : tenantInfos) { + subscriberManager.registerSubscriber(tenantInfo, recordsChangedTopicName, getSubscriptionName(recordsChangedTopicName), recordsChangedMessageReceiver); + subscriberManager.registerSubscriber(tenantInfo, reprocessTopicName, getSubscriptionName(reprocessTopicName), reprocessOqmMessageReceiver); + } + log.info("OqmSubscriberManager provisioning COMPLETED"); + } + + private String getSubscriptionName(String topicName) { + return SUBSCRIPTION_PREFIX + topicName; + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/scope/ThreadDpsHeaders.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/scope/ThreadDpsHeaders.java new file mode 100644 index 0000000000000000000000000000000000000000..3e9f007ceb7c5ab75f6c1dfcb40cc53ad8648c51 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/scope/ThreadDpsHeaders.java @@ -0,0 +1,45 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.scope; + +import static org.springframework.context.annotation.ScopedProxyMode.TARGET_CLASS; + +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.indexer.provider.gcp.indexing.config.ScopeModifierPostProcessor; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Component; + +@Slf4j +@Primary +@Component +@Scope(value = ScopeModifierPostProcessor.SCOPE_THREAD, proxyMode = TARGET_CLASS) +@RequiredArgsConstructor +public class ThreadDpsHeaders extends DpsHeaders { + + private final TokenProvider tokenProvider; + + public void setThreadContext(Map<String, String> headers) { + this.put(DpsHeaders.AUTHORIZATION, "Bearer " + tokenProvider.getIdToken()); + this.addFromMap(headers); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java new file mode 100644 index 0000000000000000000000000000000000000000..139ab8386029b108c2d3c55b310483403f0ef597 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScope.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.thread; + +import org.springframework.context.support.SimpleThreadScope; + +public class ThreadScope extends SimpleThreadScope { + + @Override + public void registerDestructionCallback(String name, Runnable callback) { + ThreadScopeContextHolder.currentThreadScopeAttributes().registerRequestDestructionCallback(name, callback); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScopeAttributes.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScopeAttributes.java new file mode 100644 index 0000000000000000000000000000000000000000..7a9ac113c207b536d3b5df147dd3e22771b35583 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScopeAttributes.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.thread; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.lang.NonNull; + +@Slf4j +@RequiredArgsConstructor +public class ThreadScopeAttributes { + + protected final Map<String, Object> hBeans = new HashMap(); + protected final Map<String, Runnable> hRequestDestructionCallbacks = new LinkedHashMap(); + + protected final Map<String, Object> getBeanMap() { + return this.hBeans; + } + + protected final void registerRequestDestructionCallback( + @NonNull String name, @NonNull Runnable callback) { + log.trace("Registering callback for: {} on runnable: {}", name, callback); + this.hRequestDestructionCallbacks.put(name, callback); + } + + public final void clear() { + this.processDestructionCallbacks(); + this.hBeans.clear(); + } + + private void processDestructionCallbacks() { + for (Map.Entry<String, Runnable> mapEntry : this.hRequestDestructionCallbacks.entrySet()) { + Runnable callback = mapEntry.getValue(); + log.trace( + "Performing destruction callback for: {} on thread: {}", + mapEntry.getKey(), + Thread.currentThread().getName()); + callback.run(); + } + this.hRequestDestructionCallbacks.clear(); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScopeContextHolder.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScopeContextHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..24d7c2ef69103f9181a620ac7e88bcd1f464cb26 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/thread/ThreadScopeContextHolder.java @@ -0,0 +1,49 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.thread; + +public class ThreadScopeContextHolder { + + private static final ThreadLocal<ThreadScopeAttributes> threadScopeAttributesHolder = + new InheritableThreadLocal<ThreadScopeAttributes>() { + @Override + protected ThreadScopeAttributes initialValue() { + return new ThreadScopeAttributes(); + } + }; + + private ThreadScopeContextHolder() { + } + + public static ThreadScopeAttributes getThreadScopeAttributes() { + return threadScopeAttributesHolder.get(); + } + + public static void setThreadScopeAttributes(ThreadScopeAttributes accessor) { + threadScopeAttributesHolder.set(accessor); + } + + public static ThreadScopeAttributes currentThreadScopeAttributes() throws IllegalStateException { + ThreadScopeAttributes accessor = threadScopeAttributesHolder.get(); + if (accessor == null) { + throw new IllegalStateException("No thread scoped attributes."); + } else { + return accessor; + } + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/config/WebAppMainContextConfiguration.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/config/WebAppMainContextConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..ffb314b7e171ffed3fd0d6093a58106f98aa22c1 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/config/WebAppMainContextConfiguration.java @@ -0,0 +1,75 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.web.config; + +import java.util.Arrays; +import javax.annotation.PostConstruct; +import javax.servlet.http.HttpServletRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.http.DpsHeaderFactory; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.indexer.IndexerApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.FilterType; +import org.springframework.context.annotation.Primary; +import org.springframework.context.annotation.PropertySource; +import org.springframework.web.context.annotation.RequestScope; + +@Slf4j +@Configuration +@EnableAutoConfiguration +@PropertySource("classpath:application.properties") +@RequiredArgsConstructor +@ComponentScan( + value = {"org.opengroup.osdu"}, + excludeFilters = { + @ComponentScan.Filter( + type = FilterType.ASSIGNABLE_TYPE, + value = { + IndexerApplication.class + }), + @ComponentScan.Filter( + type = FilterType.REGEX, + pattern = {"org\\.opengroup\\.osdu\\.indexer\\.provider\\.gcp\\.indexing\\..*"} + ) + } +) +public class WebAppMainContextConfiguration { + + private final ApplicationContext applicationContext; + + @PostConstruct + public void setUp() { + log.debug("Main web app context initialized with id: {}.", applicationContext.getId()); + log.debug("Main web app context status: {}.", applicationContext); + String[] allBeansNames = applicationContext.getBeanDefinitionNames(); + log.debug("Main web app context beans definitions: {}.", Arrays.toString(allBeansNames)); + } + + @Primary + @Bean + @RequestScope + public DpsHeaders dpsHeaders(HttpServletRequest request) { + return new DpsHeaderFactory(request); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/TenantInfoService.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/di/TenantInfoService.java similarity index 56% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/TenantInfoService.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/di/TenantInfoService.java index 84b3ca3597ebb4a046ce39c2d9588f56f6bc4e7c..07570f46ccc0e0026684b08913feeb148cfb2344 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/di/TenantInfoService.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/di/TenantInfoService.java @@ -1,9 +1,8 @@ -package org.opengroup.osdu.indexer.di; +package org.opengroup.osdu.indexer.provider.gcp.web.di; import java.util.ArrayList; import java.util.List; import javax.inject.Inject; - import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.tenant.TenantInfo; import org.opengroup.osdu.core.common.multitenancy.ITenantInfoService; @@ -15,19 +14,19 @@ import org.springframework.web.context.annotation.RequestScope; @Component public class TenantInfoService implements ITenantInfoService { - @Inject - private ITenantFactory tenantFactory; + @Inject + private ITenantFactory tenantFactory; - @Inject - private DpsHeaders headers; + @Inject + private DpsHeaders headers; - @Override - public TenantInfo getTenantInfo() { - return tenantFactory.getTenantInfo(headers.getPartitionId()); - } + @Override + public TenantInfo getTenantInfo() { + return tenantFactory.getTenantInfo(headers.getPartitionId()); + } - @Override - public List<TenantInfo> getAllTenantInfos() { - return new ArrayList<>(tenantFactory.listTenantInfo()); - } -} \ No newline at end of file + @Override + public List<TenantInfo> getAllTenantInfos() { + return new ArrayList<>(tenantFactory.listTenantInfo()); + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/middleware/IndexFilter.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/middleware/IndexFilter.java similarity index 91% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/middleware/IndexFilter.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/middleware/IndexFilter.java index 525ccfa928b56f1bd5d08402621a8ffb331cedd5..2ff2b6ba6137c89f1e9381be873dc430ad69edc4 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/middleware/IndexFilter.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/middleware/IndexFilter.java @@ -15,9 +15,22 @@ limitations under the License. */ -package org.opengroup.osdu.indexer.middleware; +package org.opengroup.osdu.indexer.provider.gcp.web.middleware; import com.google.common.base.Strings; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import javax.inject.Inject; +import javax.servlet.Filter; +import javax.servlet.FilterChain; +import javax.servlet.FilterConfig; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import lombok.extern.java.Log; import org.apache.http.HttpStatus; import org.opengroup.osdu.core.common.http.ResponseHeaders; @@ -29,15 +42,6 @@ import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.http.HttpMethod; import org.springframework.stereotype.Component; -import javax.inject.Inject; -import javax.servlet.*; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - @Log @Component public class IndexFilter implements Filter { @@ -60,7 +64,7 @@ public class IndexFilter implements Filter { @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) - throws IOException, ServletException { + throws IOException, ServletException { HttpServletRequest httpRequest = (HttpServletRequest) servletRequest; String uri = httpRequest.getRequestURI().toLowerCase(); @@ -75,7 +79,7 @@ public class IndexFilter implements Filter { } filterChain.doFilter(servletRequest, servletResponse); - + HttpServletResponse httpResponse = (HttpServletResponse) servletResponse; Map<String, List<Object>> standardHeaders = ResponseHeaders.STANDARD_RESPONSE_HEADERS; for (Map.Entry<String, List<Object>> header : standardHeaders.entrySet()) { @@ -92,7 +96,9 @@ public class IndexFilter implements Filter { } private void checkWorkerApiAccess(IRequestInfo requestInfo) { - if (requestInfo.isTaskQueueRequest()) return; + if (requestInfo.isTaskQueueRequest()) { + return; + } throw AppException.createForbidden("invalid user agent, AppEngine Task Queue only"); } diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/package-info.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..45e225e75466a19c067498cbfb312e058771dcda --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This module is used to keep minimum required bean configuration in the web environment required for request processing. All beans will be placed in a + * separate context. + */ +package org.opengroup.osdu.indexer.provider.gcp.web; diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/security/GcpSecurityConfig.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/security/GcpSecurityConfig.java similarity index 87% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/security/GcpSecurityConfig.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/security/GcpSecurityConfig.java index 47b42552ccdef36a19103027cdde783bd4d50359..c4242b232c3987150cf85697fed184c983189065 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/security/GcpSecurityConfig.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/security/GcpSecurityConfig.java @@ -15,7 +15,7 @@ limitations under the License. */ -package org.opengroup.osdu.indexer.security; +package org.opengroup.osdu.indexer.provider.gcp.web.security; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity; @@ -33,15 +33,15 @@ public class GcpSecurityConfig extends WebSecurityConfigurerAdapter { protected void configure(HttpSecurity http) throws Exception { http.httpBasic().disable() - .csrf().disable(); //disable default authN. AuthN handled by endpoints proxy + .csrf().disable(); //disable default authN. AuthN handled by endpoints proxy } @Override public void configure(WebSecurity web) throws Exception { web.ignoring().antMatchers("/api-docs") - .antMatchers("/info") - .antMatchers("/swagger"); + .antMatchers("/info") + .antMatchers("/swagger"); } } diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/util/AppExceptionHandler.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/util/AppExceptionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..a766de2693d24cbc2ac119796e198d2881692363 --- /dev/null +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/util/AppExceptionHandler.java @@ -0,0 +1,58 @@ +package org.opengroup.osdu.indexer.provider.gcp.web.util; + +import java.util.Objects; +import lombok.extern.slf4j.Slf4j; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.ControllerAdvice; +import org.springframework.web.bind.annotation.ExceptionHandler; + +@ControllerAdvice +@Slf4j +public class AppExceptionHandler { + + @ExceptionHandler(AppException.class) + public ResponseEntity<Object> handleAppExceptions(AppException e) { + return this.getErrorResponse(e); + } + + private ResponseEntity<Object> getErrorResponse(AppException e) { + + String exceptionMsg = Objects.nonNull(e.getOriginalException()) + ? e.getOriginalException().getMessage() + : e.getError().getMessage(); + + Integer errorCode = e.getError().getCode(); + + if (errorCode > 499) { + log.error(exceptionMsg, e.getOriginalException()); + } else { + log.warn(exceptionMsg, e.getOriginalException()); + } + + HttpStatus status = Objects.nonNull(HttpStatus.resolve(errorCode)) + ? HttpStatus.resolve(errorCode) + : resolveNotSupportedStatus(errorCode); + + return new ResponseEntity<>(e.getError(), status); + } + + //Currently not all codes provided from core can be resolved by HttpStatus + //example org.opengroup.osdu.core.common.model.http.RequestStatus have not supported by HttpStatus codes + private HttpStatus resolveNotSupportedStatus(int statusCode) { + if (statusCode > 99 && statusCode < 200) { + return HttpStatus.CONTINUE; + } + if (statusCode > 199 && statusCode < 300) { + return HttpStatus.NO_CONTENT; + } + if (statusCode > 299 && statusCode < 400) { + return HttpStatus.MULTIPLE_CHOICES; + } + if (statusCode > 399 && statusCode < 500) { + return HttpStatus.BAD_REQUEST; + } + return HttpStatus.INTERNAL_SERVER_ERROR; + } +} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/RequestInfoImpl.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/util/RequestInfoImpl.java similarity index 97% rename from provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/RequestInfoImpl.java rename to provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/util/RequestInfoImpl.java index 360f576f68021a2fdb398db4d5c368284069bc2e..af047854e9a9ae254379650685d70c8b6b02cc72 100644 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/RequestInfoImpl.java +++ b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/provider/gcp/web/util/RequestInfoImpl.java @@ -15,7 +15,7 @@ limitations under the License. */ -package org.opengroup.osdu.indexer.util; +package org.opengroup.osdu.indexer.provider.gcp.web.util; import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION; @@ -38,12 +38,10 @@ import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.core.gcp.model.CloudTaskHeaders; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.stereotype.Component; -import org.springframework.web.context.annotation.RequestScope; @Log @Component -@RequestScope public class RequestInfoImpl implements IRequestInfo { @Inject diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/AppExceptionHandler.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/AppExceptionHandler.java deleted file mode 100644 index ebfe87ef077c4bbb244a1b68ba94d9138bfc0753..0000000000000000000000000000000000000000 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/AppExceptionHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.opengroup.osdu.indexer.util; - -import java.util.Objects; -import lombok.extern.slf4j.Slf4j; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.ControllerAdvice; -import org.springframework.web.bind.annotation.ExceptionHandler; - -@ControllerAdvice -@Slf4j -public class AppExceptionHandler { - - @ExceptionHandler(AppException.class) - public ResponseEntity<Object> handleAppExceptions(AppException e) { - return this.getErrorResponse(e); - } - - private ResponseEntity<Object> getErrorResponse(AppException e) { - - String exceptionMsg = Objects.nonNull(e.getOriginalException()) - ? e.getOriginalException().getMessage() - : e.getError().getMessage(); - - Integer errorCode = e.getError().getCode(); - - if (errorCode > 499) { - log.error(exceptionMsg, e.getOriginalException()); - } else { - log.warn(exceptionMsg, e.getOriginalException()); - } - - HttpStatus status = Objects.nonNull(HttpStatus.resolve(errorCode)) - ? HttpStatus.resolve(errorCode) - : resolveNotSupportedStatus(errorCode); - - return new ResponseEntity<>(e.getError(), status); - } - - //Currently not all codes provided from core can be resolved by HttpStatus - //example org.opengroup.osdu.core.common.model.http.RequestStatus have not supported by HttpStatus codes - private HttpStatus resolveNotSupportedStatus(int statusCode) { - if (statusCode > 99 && statusCode < 200) { - return HttpStatus.CONTINUE; - } - if (statusCode > 199 && statusCode < 300) { - return HttpStatus.NO_CONTENT; - } - if (statusCode > 299 && statusCode < 400) { - return HttpStatus.MULTIPLE_CHOICES; - } - if (statusCode > 399 && statusCode < 500) { - return HttpStatus.BAD_REQUEST; - } - return HttpStatus.INTERNAL_SERVER_ERROR; - } -} diff --git a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/DpsHeaderFactoryGcp.java b/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/DpsHeaderFactoryGcp.java deleted file mode 100644 index 75b7b744f9e26ce7fb58315ccb14c28b34eef42a..0000000000000000000000000000000000000000 --- a/provider/indexer-gcp/src/main/java/org/opengroup/osdu/indexer/util/DpsHeaderFactoryGcp.java +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2017-2019, Schlumberger -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexer.util; - -import java.util.Collections; -import java.util.Map; -import java.util.stream.Collectors; - -import javax.servlet.http.HttpServletRequest; -import javax.inject.Inject; - -import com.google.common.base.Strings; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; - -import org.opengroup.osdu.core.gcp.model.CloudTaskHeaders; -import org.opengroup.osdu.core.gcp.util.TraceIdExtractor; -import org.springframework.context.annotation.Primary; -import org.springframework.stereotype.Component; -import org.springframework.web.context.annotation.RequestScope; - -@Component -@RequestScope -@Primary -public class DpsHeaderFactoryGcp extends DpsHeaders { - - @Inject - public DpsHeaderFactoryGcp(HttpServletRequest request) { - - Map<String, String> headers = Collections - .list(request.getHeaderNames()) - .stream() - .collect(Collectors.toMap(h -> h, request::getHeader)); - - String traceContext = headers.get(CloudTaskHeaders.CLOUD_TRACE_CONTEXT); - - if(!Strings.isNullOrEmpty(traceContext)){ - headers.put(CloudTaskHeaders.TRACE_ID, TraceIdExtractor.getTraceId(traceContext)); - } - - this.addFromMap(headers); - - // Add Correlation ID if missing - this.addCorrelationIdIfMissing(); - } -} \ No newline at end of file diff --git a/provider/indexer-gcp/src/main/resources/application-anthos.properties b/provider/indexer-gcp/src/main/resources/application-anthos.properties index 67e63d39f7862bed24f541b7594661665b2214b1..e33d2787bfea0e4f92edfd0276745c590fcc4078 100644 --- a/provider/indexer-gcp/src/main/resources/application-anthos.properties +++ b/provider/indexer-gcp/src/main/resources/application-anthos.properties @@ -4,3 +4,4 @@ partition-auth-enabled=false openid.provider-url= openid.provider-client-id= openid.provider-client-secret= +rabbitmq.retry.limit=3 diff --git a/provider/indexer-gcp/src/main/resources/application.properties b/provider/indexer-gcp/src/main/resources/application.properties index bc53e7b9fc9bb11dcc343aad82aa59188d1a2137..0618f86a7b9ed9baea29cd761d75a8016cfd6110 100644 --- a/provider/indexer-gcp/src/main/resources/application.properties +++ b/provider/indexer-gcp/src/main/resources/application.properties @@ -56,6 +56,8 @@ SCHEMA_BASE_HOST=http://schema SCHEMA_PATH=/api/schema-service/v1/schema SCHEMA_HOST=${SCHEMA_BASE_HOST}${SCHEMA_PATH} -INDEXER_QUEUE_BASE_HOST=http://indexer-queue -INDEXER_QUEUE_PATH=/api/indexer-queue/v1/_dps/task-handlers/enqueue -INDEXER_QUEUE_HOST=${INDEXER_QUEUE_BASE_HOST}${INDEXER_QUEUE_PATH} +records-changed-topic-name=records-changed +schema-changed-topic-name=schema-changed +reprocess-topic-name=reprocess +status-changed-topic-name=indexing-progress + diff --git a/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/middleware/IndexFilterTest.java b/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/middleware/IndexFilterTest.java index bd698e00c523ed700c606dd08e9547db85221ba8..ddda259b30974da2e4495a3db0f023cc2abc57c1 100644 --- a/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/middleware/IndexFilterTest.java +++ b/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/middleware/IndexFilterTest.java @@ -1,3 +1,20 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.indexer.middleware; import org.junit.Test; @@ -7,47 +24,68 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.indexer.provider.gcp.web.middleware.IndexFilter; import javax.servlet.FilterChain; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import java.io.IOException; import java.util.Collections; @RunWith(MockitoJUnitRunner.class) public class IndexFilterTest { - @InjectMocks - private IndexFilter indexFilter; - - @Mock - private DpsHeaders dpsHeaders; - - @Test - public void shouldSetCorrectResponseHeaders() throws IOException, ServletException { - HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class); - HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class); - FilterChain filterChain = Mockito.mock(FilterChain.class); - Mockito.when(httpServletRequest.getRequestURI()).thenReturn("https://test.com"); - Mockito.when(httpServletRequest.getMethod()).thenReturn("POST"); - Mockito.when(dpsHeaders.getCorrelationId()).thenReturn("correlation-id-value"); - - indexFilter.doFilter(httpServletRequest, httpServletResponse, filterChain); - - Mockito.verify(httpServletResponse).addHeader("Access-Control-Allow-Origin", Collections.singletonList("*").toString()); - Mockito.verify(httpServletResponse).addHeader("Access-Control-Allow-Headers", Collections.singletonList("origin, content-type, accept, authorization, data-partition-id, correlation-id, appkey").toString()); - Mockito.verify(httpServletResponse).addHeader("Access-Control-Allow-Methods", Collections.singletonList("GET, POST, PUT, DELETE, OPTIONS, HEAD, PATCH").toString()); - Mockito.verify(httpServletResponse).addHeader("Access-Control-Allow-Credentials", Collections.singletonList("true").toString()); - Mockito.verify(httpServletResponse).addHeader("X-Frame-Options", Collections.singletonList("DENY").toString()); - Mockito.verify(httpServletResponse).addHeader("X-XSS-Protection", Collections.singletonList("1; mode=block").toString()); - Mockito.verify(httpServletResponse).addHeader("X-Content-Type-Options", Collections.singletonList("nosniff").toString()); - Mockito.verify(httpServletResponse).addHeader("Cache-Control", Collections.singletonList("no-cache, no-store, must-revalidate").toString()); - Mockito.verify(httpServletResponse).addHeader("Content-Security-Policy", Collections.singletonList("default-src 'self'").toString()); - Mockito.verify(httpServletResponse).addHeader("Strict-Transport-Security", Collections.singletonList("max-age=31536000; includeSubDomains").toString()); - Mockito.verify(httpServletResponse).addHeader("Expires", Collections.singletonList("0").toString()); - Mockito.verify(httpServletResponse).addHeader("correlation-id", "correlation-id-value"); - Mockito.verify(filterChain).doFilter(httpServletRequest, httpServletResponse); - } + @InjectMocks private IndexFilter indexFilter; + + @Mock private DpsHeaders dpsHeaders; + + @Test + public void shouldSetCorrectResponseHeaders() throws IOException, ServletException { + HttpServletRequest httpServletRequest = Mockito.mock(HttpServletRequest.class); + HttpServletResponse httpServletResponse = Mockito.mock(HttpServletResponse.class); + FilterChain filterChain = Mockito.mock(FilterChain.class); + Mockito.when(httpServletRequest.getRequestURI()).thenReturn("https://test.com"); + Mockito.when(httpServletRequest.getMethod()).thenReturn("POST"); + Mockito.when(dpsHeaders.getCorrelationId()).thenReturn("correlation-id-value"); + + indexFilter.doFilter(httpServletRequest, httpServletResponse, filterChain); + + Mockito.verify(httpServletResponse) + .addHeader("Access-Control-Allow-Origin", Collections.singletonList("*").toString()); + Mockito.verify(httpServletResponse) + .addHeader( + "Access-Control-Allow-Headers", + Collections.singletonList( + "origin, content-type, accept, authorization, data-partition-id, correlation-id, appkey") + .toString()); + Mockito.verify(httpServletResponse) + .addHeader( + "Access-Control-Allow-Methods", + Collections.singletonList("GET, POST, PUT, DELETE, OPTIONS, HEAD, PATCH").toString()); + Mockito.verify(httpServletResponse) + .addHeader( + "Access-Control-Allow-Credentials", Collections.singletonList("true").toString()); + Mockito.verify(httpServletResponse) + .addHeader("X-Frame-Options", Collections.singletonList("DENY").toString()); + Mockito.verify(httpServletResponse) + .addHeader("X-XSS-Protection", Collections.singletonList("1; mode=block").toString()); + Mockito.verify(httpServletResponse) + .addHeader("X-Content-Type-Options", Collections.singletonList("nosniff").toString()); + Mockito.verify(httpServletResponse) + .addHeader( + "Cache-Control", + Collections.singletonList("no-cache, no-store, must-revalidate").toString()); + Mockito.verify(httpServletResponse) + .addHeader( + "Content-Security-Policy", Collections.singletonList("default-src 'self'").toString()); + Mockito.verify(httpServletResponse) + .addHeader( + "Strict-Transport-Security", + Collections.singletonList("max-age=31536000; includeSubDomains").toString()); + Mockito.verify(httpServletResponse) + .addHeader("Expires", Collections.singletonList("0").toString()); + Mockito.verify(httpServletResponse).addHeader("correlation-id", "correlation-id-value"); + Mockito.verify(filterChain).doFilter(httpServletRequest, httpServletResponse); + } } diff --git a/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java b/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6a72833f40b6cc63665d51b62cc36bf62e5444c2 --- /dev/null +++ b/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiverTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.Before; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.opengroup.osdu.core.auth.TokenProvider; +import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.gcp.oqm.model.OqmAckReplier; +import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; +import org.opengroup.osdu.indexer.provider.gcp.indexing.scope.ThreadDpsHeaders; + +@RunWith(Theories.class) +public class IndexerOqmMessageReceiverTest { + + private final Gson gson = new Gson(); + + private ThreadDpsHeaders dpsHeaders = Mockito.mock(ThreadDpsHeaders.class); + + private SubscriptionConsumer consumer = Mockito.mock(SubscriptionConsumer.class); + + private TokenProvider tokenProvider = Mockito.mock(TokenProvider.class); + + private OqmAckReplier ackReplier = Mockito.mock(OqmAckReplier.class); + + private IndexerOqmMessageReceiver receiver; + + @Before + public void setUp() { + receiver = new IndexerOqmMessageReceiver(dpsHeaders, consumer, tokenProvider); + } + + @DataPoints("VALID_EVENTS") + public static List<ImmutablePair> validEvents() { + return ImmutableList.of( + ImmutablePair.of("/test-events/storage-index-event.json", "/test-events/formatted-as-cloud-task-storage-event.json"), + ImmutablePair.of("/test-events/indexer-reprocess-event.json", "/test-events/formatted-as-cloud-task-indexer-reprocess-event.json"), + ImmutablePair.of("/test-events/reindex-event.json", "/test-events/formatted-as-cloud-task-reindex-event.json") + ); + } + + @DataPoints("NOT_VALID_EVENTS") + public static List<String> notValidEvents() { + return ImmutableList.of( + "/test-events/empty-data-event.json", + "/test-events/empty-attributes-event.json" + ); + } + + @Theory + public void shouldReceiveValidEvent(@FromDataPoints("VALID_EVENTS") ImmutablePair<String, String> pair) { + when(consumer.consume(any())).thenReturn(true); + OqmMessage oqmMessage = readEventFromFile(pair.getLeft()); + CloudTaskRequest cloudTaskRequest = readCloudTaskFromFile(pair.getRight()); + receiver.receiveMessage(oqmMessage, ackReplier); + verify(consumer).consume(cloudTaskRequest); + verify(ackReplier).ack(); + } + + @Theory + public void shouldNotConsumeNotValidEvent(@FromDataPoints("NOT_VALID_EVENTS") String fileName) { + OqmMessage oqmMessage = readEventFromFile(fileName); + receiver.receiveMessage(oqmMessage, ackReplier); + verify(ackReplier).ack(); + verify(consumer, never()).consume(any()); + } + + private OqmMessage readEventFromFile(String filename) { + InputStream resourceAsStream = this.getClass().getResourceAsStream(filename); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); + JsonReader reader = new JsonReader(bufferedReader); + return gson.fromJson(reader, OqmMessage.class); + } + + private CloudTaskRequest readCloudTaskFromFile(String filename) { + InputStream resourceAsStream = this.getClass().getResourceAsStream(filename); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); + JsonReader reader = new JsonReader(bufferedReader); + return gson.fromJson(reader, CloudTaskRequest.class); + } +} diff --git a/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumerTest.java b/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6ade2840f51dd77d0cc5bfe2028e9db6e1714932 --- /dev/null +++ b/provider/indexer-gcp/src/test/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/RecordsChangedSubscriptionConsumerTest.java @@ -0,0 +1,103 @@ +/* + * Copyright 2020-2022 Google LLC + * Copyright 2020-2022 EPAM Systems, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import org.junit.Before; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.FromDataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.search.CloudTaskRequest; +import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.indexer.api.RecordIndexerApi; +import org.opengroup.osdu.indexer.api.ReindexApi; + +@RunWith(Theories.class) +public class RecordsChangedSubscriptionConsumerTest { + + private final Gson gson = new Gson(); + + private DpsHeaders dpsHeaders = Mockito.mock(DpsHeaders.class); + + private RecordIndexerApi recordIndexerApi = Mockito.mock(RecordIndexerApi.class); + + private ReindexApi reindexApi = Mockito.mock(ReindexApi.class); + + private RecordsChangedSubscriptionConsumer consumer; + + @Before + public void setUp() { + consumer = new RecordsChangedSubscriptionConsumer(dpsHeaders, recordIndexerApi, reindexApi); + } + + @DataPoints("REINDEX_TASKS") + public static ImmutableList<String> reindexEvents() { + return ImmutableList.of( + "/test-events/formatted-as-cloud-task-reindex-event.json" + ); + } + + @DataPoints("INDEX_TASKS") + public static ImmutableList<String> indexEvents() { + return ImmutableList.of( + "/test-events/formatted-as-cloud-task-indexer-reprocess-event.json", + "/test-events/formatted-as-cloud-task-storage-event.json" + ); + } + + @Theory + public void shouldProcessReindexEvents(@FromDataPoints("REINDEX_TASKS") String fileName) throws IOException { + CloudTaskRequest cloudTaskRequest = readCloudTaskFromFile(fileName); + consumer.consume(cloudTaskRequest); + RecordReindexRequest recordReindexRequest = gson.fromJson(cloudTaskRequest.getMessage(), RecordReindexRequest.class); + verify(reindexApi).reindex(recordReindexRequest, false); + } + + @Theory + public void shouldProcessIndexEvents(@FromDataPoints("INDEX_TASKS") String fileName) throws Exception { + CloudTaskRequest cloudTaskRequest = readCloudTaskFromFile(fileName); + consumer.consume(cloudTaskRequest); + ArgumentCaptor<RecordChangedMessages> captor = ArgumentCaptor.forClass(RecordChangedMessages.class); + verify(recordIndexerApi).indexWorker(captor.capture()); + RecordChangedMessages expectedMessages = this.gson.fromJson(cloudTaskRequest.getMessage(), RecordChangedMessages.class); + RecordChangedMessages actualMessages = captor.getValue(); + assertEquals(expectedMessages.getData(),actualMessages.getData()); + } + + private CloudTaskRequest readCloudTaskFromFile(String filename) { + InputStream resourceAsStream = this.getClass().getResourceAsStream(filename); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(resourceAsStream)); + JsonReader reader = new JsonReader(bufferedReader); + return gson.fromJson(reader, CloudTaskRequest.class); + } +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/empty-attributes-event.json b/provider/indexer-gcp/src/test/resources/test-events/empty-attributes-event.json new file mode 100644 index 0000000000000000000000000000000000000000..dedbede7da290e9f474b6541c3047cc9ec83a28a --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/empty-attributes-event.json @@ -0,0 +1,5 @@ +{ + "id": "6523155266469533", + "data": "{\"url\":\"/api/indexer/v2/_dps/task-handlers/reindex-worker\",\"message\":\"{\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"cursor\\\":\\\"12345\\\\u003d\\\\u003d\\\"}\",\"initialDelayMillis\":30000}", + "attributes": {} +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/empty-data-event.json b/provider/indexer-gcp/src/test/resources/test-events/empty-data-event.json new file mode 100644 index 0000000000000000000000000000000000000000..e81f8e0413a527bbc627f4cbe5fc851fcd5a6eb7 --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/empty-data-event.json @@ -0,0 +1,11 @@ +{ + "id": "6524321148950995", + "data": "", + "attributes": { + "correlation-id": "fbe4a2b4-b3be-48f6-bc2a-b7dc068bce62", + "account-id": "osdu", + "data-partition-id": "osdu", + "user": "gcp-integration-tester@nice-etching-277309.iam.gserviceaccount.com", + "googclient_deliveryattempt": "1" + } +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-indexer-reprocess-event.json b/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-indexer-reprocess-event.json new file mode 100644 index 0000000000000000000000000000000000000000..4494af3d6b6003e1018d9af85989a5f28a76b118 --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-indexer-reprocess-event.json @@ -0,0 +1,5 @@ +{ + "url": "/api/indexer/v2/_dps/task-handlers/index-worker", + "message": "{\"data\":\"[{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AgencyTrust\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AmendingWorkingInterestClarification\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssetSaleAndPurchase\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssetSalePurchase\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssignmentNovation\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:AssignmentNovationAmending\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:CommonStreamOperating\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:Concession\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ConfidentialDisclosure\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:Confidentiality\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ConstructionOwnershipOperating\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ConsultantBusinessDevelopment\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:ContractWellOperating\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:CrossConveyedPooling\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:CrossconveyedPooling\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataExchange\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataLicence\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataPurchase\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataRoom\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"},{\\\"id\\\":\\\"osdu:reference-data--AgreementType:DataSubscription\\\",\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"correlation-id\":\"fbe4a2b4-b3be-48f6-bc2a-b7dc068bce62\",\"data-partition-id\":\"osdu\"}}", + "initialDelayMillis": 30000 +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-reindex-event.json b/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-reindex-event.json new file mode 100644 index 0000000000000000000000000000000000000000..23bfbba0a9c10cdb43b660a4cf35a98b9abe5211 --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-reindex-event.json @@ -0,0 +1,5 @@ +{ + "url": "/api/indexer/v2/_dps/task-handlers/reindex-worker", + "message": "{\"kind\":\"osdu:wks:reference-data--AgreementType:1.0.0\",\"cursor\":\"12345\\u003d\\u003d\"}", + "initialDelayMillis": 30000 +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-storage-event.json b/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-storage-event.json new file mode 100644 index 0000000000000000000000000000000000000000..91c3a19749086fdf31673189c2b3698622a7fc1f --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/formatted-as-cloud-task-storage-event.json @@ -0,0 +1,5 @@ +{ + "url": "/api/indexer/v2/_dps/task-handlers/index-worker", + "message": "{\"id\":\"6405379623020546\",\"data\":\"[{\\\"id\\\":\\\"osdu:query:3b4ee153-48c4-4026-955a-59644ddaf3110\\\",\\\"kind\\\":\\\"osdu:ds:query:1.0.1670526387956\\\",\\\"op\\\":\\\"create\\\"}]\",\"attributes\":{\"account-id\":\"osdu\",\"data-partition-id\":\"osdu\",\"user\":\"gcp-integration-tester@nice-etching-277309.iam.gserviceaccount.com\",\"correlation-id\":\"c4e8b544-ad55-44e0-a6c8-90afb25aaad9\",\"googclient_deliveryattempt\":\"1\"}}", + "initialDelayMillis": 0 +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/indexer-reprocess-event.json b/provider/indexer-gcp/src/test/resources/test-events/indexer-reprocess-event.json new file mode 100644 index 0000000000000000000000000000000000000000..3ec3b31acdc096613cd564aac0b2238d638bfc00 --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/indexer-reprocess-event.json @@ -0,0 +1,11 @@ +{ + "id": "6524321148950995", + "data": "{\"url\":\"/api/indexer/v2/_dps/task-handlers/index-worker\",\"message\":\"{\\\"data\\\":\\\"[{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:AgencyTrust\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:AmendingWorkingInterestClarification\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:AssetSaleAndPurchase\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:AssetSalePurchase\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:AssignmentNovation\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:AssignmentNovationAmending\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:CommonStreamOperating\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:Concession\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:ConfidentialDisclosure\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:Confidentiality\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:ConstructionOwnershipOperating\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:ConsultantBusinessDevelopment\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:ContractWellOperating\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:CrossConveyedPooling\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:CrossconveyedPooling\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:DataExchange\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:DataLicence\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:DataPurchase\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:DataRoom\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"},{\\\\\\\"id\\\\\\\":\\\\\\\"osdu:reference-data--AgreementType:DataSubscription\\\\\\\",\\\\\\\"kind\\\\\\\":\\\\\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\\\\\",\\\\\\\"op\\\\\\\":\\\\\\\"create\\\\\\\"}]\\\",\\\"attributes\\\":{\\\"correlation-id\\\":\\\"fbe4a2b4-b3be-48f6-bc2a-b7dc068bce62\\\",\\\"data-partition-id\\\":\\\"osdu\\\"}}\",\"initialDelayMillis\":30000}", + "attributes": { + "correlation-id": "fbe4a2b4-b3be-48f6-bc2a-b7dc068bce62", + "account-id": "osdu", + "data-partition-id": "osdu", + "user": "gcp-integration-tester@nice-etching-277309.iam.gserviceaccount.com", + "googclient_deliveryattempt": "1" + } +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/reindex-event.json b/provider/indexer-gcp/src/test/resources/test-events/reindex-event.json new file mode 100644 index 0000000000000000000000000000000000000000..601e981be59935f99ad32a2bda99aab324fa5654 --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/reindex-event.json @@ -0,0 +1,11 @@ +{ + "id": "6523155266469533", + "data": "{\"url\":\"/api/indexer/v2/_dps/task-handlers/reindex-worker\",\"message\":\"{\\\"kind\\\":\\\"osdu:wks:reference-data--AgreementType:1.0.0\\\",\\\"cursor\\\":\\\"12345\\\\u003d\\\\u003d\\\"}\",\"initialDelayMillis\":30000}", + "attributes": { + "data-partition-id": "osdu", + "account-id": "osdu", + "correlation-id": "46086b67-541a-449e-ae75-5e52659a775c", + "user": "gcp-integration-tester@nice-etching-277309.iam.gserviceaccount.com", + "googclient_deliveryattempt": "1" + } +} diff --git a/provider/indexer-gcp/src/test/resources/test-events/storage-index-event.json b/provider/indexer-gcp/src/test/resources/test-events/storage-index-event.json new file mode 100644 index 0000000000000000000000000000000000000000..c84a1e28fe3f5b0fba11c25b06e0e46ce1051baa --- /dev/null +++ b/provider/indexer-gcp/src/test/resources/test-events/storage-index-event.json @@ -0,0 +1,11 @@ +{ + "id": "6405379623020546", + "data": "[{\"id\":\"osdu:query:3b4ee153-48c4-4026-955a-59644ddaf3110\",\"kind\":\"osdu:ds:query:1.0.1670526387956\",\"op\":\"create\"}]", + "attributes": { + "account-id": "osdu", + "data-partition-id": "osdu", + "user": "gcp-integration-tester@nice-etching-277309.iam.gserviceaccount.com", + "correlation-id": "c4e8b544-ad55-44e0-a6c8-90afb25aaad9", + "googclient_deliveryattempt": "1" + } +}