diff --git a/NOTICE b/NOTICE index 121425b665c270d20637483071524fa8a324ffae..60b4118a46591bfa65113b5bb1ca7e3039c55b93 100644 --- a/NOTICE +++ b/NOTICE @@ -374,7 +374,7 @@ The following software have components provided under the terms of this license: - Byte Buddy Java agent (from https://repo1.maven.org/maven2/net/bytebuddy/byte-buddy-agent) - ClassMate (from http://github.com/cowtowncoder/java-classmate) - Cloud Key Management Service (KMS) API (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-cloudkms) -- Cloud Storage JSON API (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) +- Cloud Storage JSON API v1-rev20230301-2.0.0 (from https://repo1.maven.org/maven2/com/google/apis/google-api-services-storage) - CloudWatch Metrics for AWS Java SDK (from https://aws.amazon.com/sdkforjava) - Cobertura (from http://cobertura.sourceforge.net) - Cobertura Limited Runtime (from http://cobertura.sourceforge.net) diff --git a/devops/aws/chart/Chart.yaml b/devops/aws/chart/Chart.yaml index 2afc719c5cbe3a2ee8118ac57d60a09b73b51be9..26a517fbf20e5b43df5efa0db457cd6b3c196df3 100644 --- a/devops/aws/chart/Chart.yaml +++ b/devops/aws/chart/Chart.yaml @@ -1,12 +1,11 @@ apiVersion: v2 name: "os-indexer" version: __CHART_VERSION__ -kubeVersion: ">= 1.21.x-x-x < 1.24.x-x-x" description: Indexer Helm Chart for Kubernetes type: application appVersion: __VERSION__ dependencies: - name: osdu-aws-lib - version: 0.2.0 - repository: __HELM_REPO__/osdu-aws-lib/ + version: ~0 + repository: __HELM_REPO__ deprecated: false diff --git a/devops/aws/chart/values.yaml b/devops/aws/chart/values.yaml index 1712d51d2117277efc176e126580d962658e5d77..1a880c4f40ddb9203cc0d30abc7bc10de5cfb3f3 100644 --- a/devops/aws/chart/values.yaml +++ b/devops/aws/chart/values.yaml @@ -69,10 +69,10 @@ environmentVariables: replicaCount: 1 resources: limits: - memory: 900M + memory: 1200M requests: cpu: 500m - memory: 900M + memory: 1200M autoscaling: enabled: true minReplicas: 1 diff --git a/devops/gc/deploy/README.md b/devops/gc/deploy/README.md index 17d79036d4cee7fea38c582b35d54525f4005130..e2d9e0b9c1f8642f077e1d5139375a44d28ee686 100644 --- a/devops/gc/deploy/README.md +++ b/devops/gc/deploy/README.md @@ -25,6 +25,13 @@ Packages are only needed for installation from a local computer. Before installing deploy Helm chart you need to install [configmap Helm chart](../configmap). First you need to set variables in **values.yaml** file using any code editor. Some of the values are prefilled, but you need to specify some values as well. You can find more information about them below. +### Global variables + +| Name | Description | Type | Default |Required | +|------|-------------|------|---------|---------| +**global.domain** | your domain for the external endpoint, ex `example.com` | string | - | yes +**global.onPremEnabled** | whether on-prem is enabled | boolean | false | yes + ### Configmap variables | Name | Description | Type | Default |Required | @@ -61,8 +68,6 @@ First you need to set variables in **values.yaml** file using any code editor. S **conf.elasticSecretName** | secret for elastic | string | `indexer-elastic-secret` | yes **conf.keycloakSecretName** | secret for keycloak | string | `indexer-keycloak-secret` | yes **conf.rabbitmqSecretName** | secret for rabbitmq | string | `rabbitmq-secret` | yes -**conf.onPremEnabled** | whether on-prem is enabled | boolean | false | yes -**conf.domain** | your domain | string | - | yes **conf.indexerRedisSecretName** | indexer Redis secret that contains redis password with REDIS_PASSWORD key | string | `indexer-redis-secret` | yes ### ISTIO variables diff --git a/devops/gc/deploy/templates/deployment.yaml b/devops/gc/deploy/templates/deployment.yaml index cfd362ebc14567bc617e7ed710cba93156b1f5a3..e42da5931b5959ba4850e0ecdb5d0e82b54aa7c2 100644 --- a/devops/gc/deploy/templates/deployment.yaml +++ b/devops/gc/deploy/templates/deployment.yaml @@ -41,7 +41,7 @@ spec: name: {{ .Values.conf.configmap | quote }} - secretRef: name: {{ .Values.conf.elasticSecretName | quote }} - {{- if .Values.conf.onPremEnabled }} + {{- if .Values.global.onPremEnabled }} - secretRef: name: {{ .Values.conf.keycloakSecretName | quote }} - secretRef: diff --git a/devops/gc/deploy/templates/service-account.yaml b/devops/gc/deploy/templates/service-account.yaml index f0771985cd5da9db77781dc3ab635e52378b3fb6..eec72f12889eec74d3de43ae6527a860bcd876ea 100644 --- a/devops/gc/deploy/templates/service-account.yaml +++ b/devops/gc/deploy/templates/service-account.yaml @@ -1,4 +1,4 @@ -{{- if .Values.conf.onPremEnabled }} +{{- if .Values.global.onPremEnabled }} apiVersion: v1 kind: ServiceAccount metadata: diff --git a/devops/gc/deploy/templates/service.yaml b/devops/gc/deploy/templates/service.yaml index 45bd505383e8204e40ebf5188814bc4a707b6c34..f985dd5f7a65eff8c6c0475c13d0298b9770ec00 100644 --- a/devops/gc/deploy/templates/service.yaml +++ b/devops/gc/deploy/templates/service.yaml @@ -1,7 +1,7 @@ apiVersion: v1 kind: Service metadata: - {{- if not .Values.conf.onPremEnabled }} + {{- if not .Values.global.onPremEnabled }} annotations: cloud.google.com/neg: '{"ingress": true}' {{- end }} diff --git a/devops/gc/deploy/templates/virtual-service.yaml b/devops/gc/deploy/templates/virtual-service.yaml index ed9507c9788b4fb94ba2c75e2faae7a6ba49ada8..8903b299b9577d298288734003857dcfe5877709 100644 --- a/devops/gc/deploy/templates/virtual-service.yaml +++ b/devops/gc/deploy/templates/virtual-service.yaml @@ -5,10 +5,10 @@ metadata: namespace: {{ .Release.Namespace | quote }} spec: hosts: - {{- if and .Values.conf.domain .Values.conf.onPremEnabled }} - - {{ printf "osdu.%s" .Values.conf.domain | quote }} - {{- else if .Values.conf.domain }} - - {{ .Values.conf.domain | quote }} + {{- if and .Values.global.domain .Values.global.onPremEnabled }} + - {{ printf "osdu.%s" .Values.global.domain | quote }} + {{- else if .Values.global.domain }} + - {{ .Values.global.domain | quote }} {{- else }} - "*" {{- end }} diff --git a/devops/gc/deploy/values.yaml b/devops/gc/deploy/values.yaml index b71731641d6454c02d5373b65d32c44ad72bfca5..b657f2f62e2402cf28e70ccbec7d519bf682f6b4 100644 --- a/devops/gc/deploy/values.yaml +++ b/devops/gc/deploy/values.yaml @@ -2,6 +2,10 @@ # This is a YAML-formatted file. # Declare variables to be passed into your templates. +global: + domain: "" + onPremEnabled: false + data: # Configmap entitlementsHost: "http://entitlements" @@ -30,8 +34,6 @@ conf: elasticSecretName: "indexer-elastic-secret" keycloakSecretName: "indexer-keycloak-secret" rabbitmqSecretName: "rabbitmq-secret" - onPremEnabled: false - domain: "" indexerRedisSecretName: "indexer-redis-secret" istio: diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index c804c056b87c5689f9c7dc203e303ae4801d21bd..8e2895fc2b0807e9ad6fedcc2a09f7b060ab27af 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -125,7 +125,7 @@ public class IndexerServiceImpl implements IndexerService { // get upsert records Map<String, Map<String, OperationType>> upsertRecordMap = RecordInfo.getUpsertRecordIds(recordInfos); if (upsertRecordMap != null && !upsertRecordMap.isEmpty()) { - List<String> upsertFailureRecordIds = processUpsertRecords(upsertRecordMap); + List<String> upsertFailureRecordIds = processUpsertRecords(upsertRecordMap, recordInfos); retryRecordIds.addAll(upsertFailureRecordIds); } @@ -191,7 +191,7 @@ public class IndexerServiceImpl implements IndexerService { } } - private List<String> processUpsertRecords(Map<String, Map<String, OperationType>> upsertRecordMap) throws Exception { + private List<String> processUpsertRecords(Map<String, Map<String, OperationType>> upsertRecordMap, List<RecordInfo> recordChangedInfos) throws Exception { // get schema for kind Map<String, IndexSchema> schemas = this.getSchema(upsertRecordMap); @@ -203,7 +203,7 @@ public class IndexerServiceImpl implements IndexerService { if (recordIds.isEmpty()) return new LinkedList<>(); // get records via storage api - Records storageRecords = this.storageService.getStorageRecords(recordIds); + Records storageRecords = this.storageService.getStorageRecords(recordIds, recordChangedInfos); List<String> failedOrRetryRecordIds = new LinkedList<>(storageRecords.getMissingRetryRecords()); // map storage records to indexer payload diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java index bfbb5c6d5a0faf2cf1b62ec031697141f0f649ef..54a19e807079d6076b855e89ea2707f88308e038 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageService.java @@ -15,6 +15,7 @@ package org.opengroup.osdu.indexer.service; import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.indexer.Records; @@ -25,7 +26,7 @@ import java.util.List; public interface StorageService { - Records getStorageRecords(List<String> ids) throws AppException, URISyntaxException; + Records getStorageRecords(List<String> ids, List<RecordInfo> recordChangedInfos) throws AppException, URISyntaxException; RecordQueryResponse getRecordsByKind(RecordReindexRequest request) throws URISyntaxException; diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java index 89c13fcb1fbc6b0da27795b5577a491eac8bf3dd..1b20cfb14cd9128dcb1595d9a608d6c4ed1da4b1 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/StorageServiceImpl.java @@ -20,23 +20,28 @@ import com.google.api.client.http.HttpMethods; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.gson.Gson; - import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import org.apache.http.HttpStatus; import org.opengroup.osdu.core.common.http.FetchServiceHttpRequest; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.opengroup.osdu.core.common.http.IUrlFetchService; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.HttpResponse; import org.opengroup.osdu.core.common.model.http.RequestStatus; -import org.opengroup.osdu.core.common.model.indexer.*; +import org.opengroup.osdu.core.common.model.indexer.IndexingStatus; +import org.opengroup.osdu.core.common.model.indexer.JobStatus; +import org.opengroup.osdu.core.common.model.indexer.OperationType; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; +import org.opengroup.osdu.core.common.model.indexer.Records; +import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute; import org.opengroup.osdu.core.common.model.storage.ConversionStatus; import org.opengroup.osdu.core.common.model.storage.RecordIds; -import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.core.common.http.IUrlFetchService; -import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; -import org.apache.http.HttpStatus; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.springframework.stereotype.Component; @@ -52,8 +57,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import static org.opengroup.osdu.core.common.model.http.DpsHeaders.FRAME_OF_REFERENCE; import static org.opengroup.osdu.core.common.Constants.SLB_FRAME_OF_REFERENCE_VALUE; +import static org.opengroup.osdu.core.common.model.http.DpsHeaders.FRAME_OF_REFERENCE; @Component public class StorageServiceImpl implements StorageService { @@ -74,15 +79,17 @@ public class StorageServiceImpl implements StorageService { private IndexerConfigurationProperties configurationProperties; @Override - public Records getStorageRecords(List<String> ids) throws AppException, URISyntaxException { + public Records getStorageRecords(List<String> ids, List<RecordInfo> recordChangedInfos) throws AppException, URISyntaxException { List<Records.Entity> valid = new ArrayList<>(); List<String> notFound = new ArrayList<>(); List<ConversionStatus> conversionStatuses = new ArrayList<>(); List<String> missingRetryRecordIds = new ArrayList<>(); + Map<String, String> recordChangedMap = recordChangedInfos.stream().collect(Collectors.toMap(RecordInfo::getId, RecordInfo::getKind, (a, b) -> b)); + Map<String, String> validRecordKindPatchMap = getValidRecordKindPatchMap(recordChangedInfos); List<List<String>> batch = Lists.partition(ids, configurationProperties.getStorageRecordsBatchSize()); for (List<String> recordsBatch : batch) { - Records storageOut = this.getRecords(recordsBatch); + Records storageOut = this.getRecords(recordsBatch, recordChangedMap, validRecordKindPatchMap); valid.addAll(storageOut.getRecords()); notFound.addAll(storageOut.getNotFound()); conversionStatuses.addAll(storageOut.getConversionStatuses()); @@ -91,11 +98,10 @@ public class StorageServiceImpl implements StorageService { return Records.builder().records(valid).notFound(notFound).conversionStatuses(conversionStatuses).missingRetryRecords(missingRetryRecordIds).build(); } - protected Records getRecords(List<String> ids) throws URISyntaxException { + protected Records getRecords(List<String> ids, Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap) throws URISyntaxException { // e.g. {"records":["test:10"]} String body = this.gson.toJson(RecordIds.builder().records(ids).build()); -// Map<String, String> headers = this.requestInfo.getHeadersMap(); DpsHeaders headers = this.requestInfo.getHeaders(); headers.put(FRAME_OF_REFERENCE, SLB_FRAME_OF_REFERENCE_VALUE); FetchServiceHttpRequest request = FetchServiceHttpRequest @@ -105,10 +111,10 @@ public class StorageServiceImpl implements StorageService { .headers(headers) .body(body).build(); HttpResponse response = this.urlFetchService.sendRequest(request); - return this.validateStorageResponse(response, ids); + return this.validateStorageResponse(response, ids, recordChangedMap, validRecordKindPatchMap); } - private Records validateStorageResponse(HttpResponse response, List<String> ids) { + private Records validateStorageResponse(HttpResponse response, List<String> ids, Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap) { String bulkStorageData = response.getBody(); // retry entire payload -- storage service returned empty response @@ -129,6 +135,7 @@ public class StorageServiceImpl implements StorageService { // no retry possible, update record status as failed -- storage service cannot locate records if (!records.getNotFound().isEmpty()) { + jaxRsDpsLog.error(records.getNotFound().size() + " records were not found. Full list: " + records.getNotFound()); this.jobStatus.addOrUpdateRecordStatus(records.getNotFound(), IndexingStatus.FAIL, RequestStatus.INVALID_RECORD, "Storage service records not found", String.format("Storage service records not found: %s", String.join(",", records.getNotFound()))); } @@ -144,8 +151,13 @@ public class StorageServiceImpl implements StorageService { throw new AppException(HttpStatus.SC_NOT_FOUND, "Invalid request", "Storage service error"); } + // validate kind to avoid data duplication + List<String> staleRecords = getStaleRecordsUpdate(recordChangedMap, validRecordKindPatchMap, validRecords); + List<Records.Entity> indexableRecords = validateKind(validRecords, staleRecords); + records.setRecords(indexableRecords); + Map<String, List<String>> conversionStatus = getConversionErrors(records.getConversionStatuses()); - for (Records.Entity storageRecord : validRecords) { + for (Records.Entity storageRecord : indexableRecords) { String recordId = storageRecord.getId(); if (conversionStatus.get(recordId) == null) { continue; @@ -156,8 +168,8 @@ public class StorageServiceImpl implements StorageService { } // retry missing records -- storage did not return response for all RecordChangeMessage record-ids - if (records.getTotalRecordCount() != ids.size()) { - List<String> missingRecords = this.getMissingRecords(records, ids); + if ((records.getRecords().size() + records.getNotFound().size() + staleRecords.size()) != ids.size()) { + List<String> missingRecords = this.getMissingRecords(records, ids, staleRecords); records.setMissingRetryRecords(missingRecords); this.jobStatus.addOrUpdateRecordStatus(missingRecords, IndexingStatus.FAIL, HttpStatus.SC_NOT_FOUND, "Partial response received from Storage service - missing records", String.format("Partial response received from Storage service: %s", String.join(",", missingRecords))); } @@ -165,12 +177,13 @@ public class StorageServiceImpl implements StorageService { return records; } - private List<String> getMissingRecords(Records records, List<String> ids) { + private List<String> getMissingRecords(Records records, List<String> ids, List<String> staleRecords) { List<String> validRecordIds = records.getRecords().stream().map(Records.Entity::getId).collect(Collectors.toList()); List<String> invalidRecordsIds = records.getNotFound(); List<String> requestedIds = new ArrayList<>(ids); requestedIds.removeAll(validRecordIds); requestedIds.removeAll(invalidRecordsIds); + requestedIds.removeAll(staleRecords); return requestedIds; } @@ -187,6 +200,54 @@ public class StorageServiceImpl implements StorageService { return errorsByRecordId; } + private List<Records.Entity> validateKind(List<Records.Entity> validRecords, List<String> staleRecords) { + List<Records.Entity> indexableRecords = new ArrayList<>(); + if (!staleRecords.isEmpty()) { + for (Records.Entity record : validRecords) { + if (staleRecords.contains(record.getId())) { + continue; + } + indexableRecords.add(record); + } + jaxRsDpsLog.warning(String.format("stale records found with older kind, skipping indexing | record ids: %s", String.join(" | ", staleRecords))); + } else { + indexableRecords.addAll(validRecords); + } + return indexableRecords; + } + + private List<String> getStaleRecordsUpdate(Map<String, String> recordChangedMap, Map<String, String> validRecordKindPatchMap, List<Records.Entity> validRecords) { + List<String> staleRecords = new ArrayList<>(); + for (Records.Entity storageRecord : validRecords) { + String kindOnStorage = storageRecord.getKind(); + String kindOnMessage = recordChangedMap.get(storageRecord.getId()); + if (validRecordKindPatchMap.containsKey(storageRecord.getId())) { + continue; + } + if (!kindOnStorage.equals(kindOnMessage)) { + staleRecords.add(storageRecord.getId()); + } + } + return staleRecords; + } + + /* + * Gets valid kind patch map, previousVersionKind is included on the record update message in such cases + * */ + private Map<String, String> getValidRecordKindPatchMap(List<RecordInfo> recordChangedInfos) { + Map<String, String> out = new HashMap<>(); + for (RecordInfo msg : recordChangedInfos) { + OperationType op = OperationType.valueOf(msg.getOp()); + if (op != OperationType.update) { + continue; + } + if (!Strings.isNullOrEmpty(msg.getPreviousVersionKind())) { + out.put(msg.getId(), msg.getPreviousVersionKind()); + } + } + return out; + } + @Override public RecordQueryResponse getRecordsByKind(RecordReindexRequest reindexRequest) throws URISyntaxException { Map<String, String> queryParams = new HashMap<>(); @@ -196,8 +257,8 @@ public class StorageServiceImpl implements StorageService { queryParams.put("cursor", reindexRequest.getCursor()); } - if(requestInfo == null) - throw new AppException(HttpStatus.SC_NO_CONTENT, "Invalid header", "header can't be null"); + if (requestInfo == null) + throw new AppException(HttpStatus.SC_NO_CONTENT, "Invalid header", "header can't be null"); FetchServiceHttpRequest request = FetchServiceHttpRequest.builder() .httpMethod(HttpMethods.GET) @@ -233,6 +294,6 @@ public class StorageServiceImpl implements StorageService { HttpResponse response = this.urlFetchService.sendRequest(request); JsonObject asJsonObject = new JsonParser().parse(response.getBody()).getAsJsonObject(); JsonElement results = asJsonObject.get("results"); - return response.getResponseCode() != HttpStatus.SC_OK ? null : this.gson.fromJson(results,List.class); + return response.getResponseCode() != HttpStatus.SC_OK ? null : this.gson.fromJson(results, List.class); } } diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java index 285b384a017b4eefd044dc3e58e1434b1ad0543c..194fdab46f77aa9ca67834f640a3ba13337e25c7 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/IndexerServiceImplTest.java @@ -142,7 +142,7 @@ public class IndexerServiceImplTest { validRecords.add(Records.Entity.builder().id(recordId2).kind(kind2).data(storageData).build()); List<ConversionStatus> conversionStatus = new LinkedList<>(); Records storageRecords = Records.builder().records(validRecords).conversionStatuses(conversionStatus).build(); - when(this.storageService.getStorageRecords(any())).thenReturn(storageRecords); + when(this.storageService.getStorageRecords(any(), any())).thenReturn(storageRecords); // setup elastic, index and mapped document when(this.indicesService.createIndex(any(), any(), any(), any(), any())).thenReturn(true); diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java index 42791ea57c2541a2073ee87df7033bb6da3758f1..e64574d7a6452f704361f80ea66884232b506c0a 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceImplTest.java @@ -1,54 +1,313 @@ +// Copyright 2017-2023, Schlumberger +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.indexer.service; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Spy; -import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.core.common.http.IUrlFetchService; +import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; +import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.HttpResponse; +import org.opengroup.osdu.core.common.model.indexer.IndexingStatus; +import org.opengroup.osdu.core.common.model.indexer.JobStatus; +import org.opengroup.osdu.core.common.model.indexer.OperationType; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; +import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; +import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.indexer.Records; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; +import org.springframework.http.HttpStatus; +import org.springframework.test.context.junit4.SpringRunner; +import java.lang.reflect.Type; import java.net.URISyntaxException; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@RunWith(MockitoJUnitRunner.class) +@RunWith(SpringRunner.class) public class StorageServiceImplTest { - - @Spy - private ObjectMapper objectMapper = new ObjectMapper(); @Mock private IUrlFetchService urlFetchService; @Mock + private JobStatus jobStatus; + @Mock + private JaxRsDpsLog log; + @Mock private IRequestInfo requestInfo; @Mock private IndexerConfigurationProperties configurationProperties; + @Spy + private ObjectMapper objectMapper = new ObjectMapper(); @InjectMocks private StorageServiceImpl sut; + private List<String> ids; + private static final String RECORD_ID1 = "tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465"; + private static final String RECORDS_ID2 = "tenant1:doc:15e790a69beb4d789b1f979e2af2e813"; + + @Before + public void setup() { + + String recordChangedMessages = "[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"purge\"}," + + "{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"create\"}]"; + + when(this.requestInfo.getHeadersMap()).thenReturn(new HashMap<>()); + when(this.requestInfo.getHeaders()).thenReturn(new DpsHeaders()); + + Type listType = new TypeToken<List<RecordInfo>>() { + }.getType(); + + List<RecordInfo> msgs = (new Gson()).fromJson(recordChangedMessages, listType); + jobStatus.initialize(msgs); + ids = Arrays.asList(RECORD_ID1, RECORDS_ID2); + + when(configurationProperties.getStorageRecordsBatchSize()).thenReturn(20); + } + @Test public void should_parse_long_integer_values_as_integer_types() throws URISyntaxException { when(this.requestInfo.getHeaders()).thenReturn(new DpsHeaders()); - String body = "{\"records\":[{\"id\":\"id1\",\"version\":0,\"data\":{\"long_int\":1000000000000000000000000,\"int\":123}}],\"notFound\":[],\"conversionStatuses\":[],\"missingRetryRecords\":[]}"; + String body = "{\"records\":[{\"id\":\"id1\",\"kind\":\"tenant:test:test:1.0.0\",\"version\":0,\"data\":{\"long_int\":1000000000000000000000000,\"int\":123}}],\"notFound\":[],\"conversionStatuses\":[],\"missingRetryRecords\":[]}"; + Map<String, String> recordChangedMap = new HashMap<>(); + Map<String, String> validRecordKindPatchMap = new HashMap<>(); + recordChangedMap.put("id1", "tenant:test:test:1.0.0"); HttpResponse httpResponse = mock(HttpResponse.class); when(httpResponse.getBody()).thenReturn(body); when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); - Records rec = sut.getRecords(Collections.singletonList("id1")); + Records rec = sut.getRecords(Collections.singletonList("id1"), recordChangedMap, validRecordKindPatchMap); assertEquals(1, rec.getRecords().size()); assertEquals("1000000000000000000000000", rec.getRecords().get(0).getData().get("long_int").toString()); assertEquals("123", rec.getRecords().get(0).getData().get("int").toString()); } + + @Test + public void should_return404_givenNullData_getValidStorageRecordsTest() throws URISyntaxException { + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(null); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + + should_return404_getValidStorageRecordsTest(); + } + + @Test + public void should_return404_givenEmptyData_getValidStorageRecordsTest() throws URISyntaxException { + + String emptyDataFromStorage = "{\"records\":[],\"notFound\":[]}"; + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(emptyDataFromStorage); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + + should_return404_getValidStorageRecordsTest(); + } + + @Test + public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsTest() throws URISyntaxException { + + String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"], \"conversionStatuses\": []}"; + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("testid").kind("tenant:test:test:1.0.0").op(OperationType.create.getValue()).build()); + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + Records storageRecords = this.sut.getStorageRecords(ids, recordChangeInfos); + + assertEquals(1, storageRecords.getRecords().size()); + } + + @Test + public void should_returnOneValidRecords_givenRecord_withValidKindUpdate_getStorageRecordsTest() throws URISyntaxException { + + String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.2.0\"}],\"notFound\":[], \"conversionStatuses\": []}"; + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("testid").kind("tenant:test:test:1.2.0").op(OperationType.update.getValue()).previousVersionKind("tenant:test:test:1.1.0").build()); + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + Records storageRecords = this.sut.getStorageRecords(ids, recordChangeInfos); + + assertEquals(1, storageRecords.getRecords().size()); + } + + @Test + public void should_returnZeroRecords_givenStaleMessage_getStorageRecordsTest() throws URISyntaxException { + + String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.2.0\"}],\"notFound\":[], \"conversionStatuses\": []}"; + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("testid").kind("tenant:test:test:1.1.0").op(OperationType.update.getValue()).build()); + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + Records storageRecords = this.sut.getStorageRecords(ids, recordChangeInfos); + + assertEquals(0, storageRecords.getRecords().size()); + verify(this.log).warning("stale records found with older kind, skipping indexing | record ids: testid"); + } + + @Test + public void should_logMissingRecord_given_storageMissedRecords() throws URISyntaxException { + + String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[]}"; + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465").kind("tenant:test:test:1.0.0").op(OperationType.update.getValue()).build()); + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); + Records storageRecords = this.sut.getStorageRecords(ids, recordChangeInfos); + + assertEquals(1, storageRecords.getRecords().size()); + verify(this.jobStatus).addOrUpdateRecordStatus(singletonList(RECORDS_ID2), IndexingStatus.FAIL, HttpStatus.NOT_FOUND.value(), "Partial response received from Storage service - missing records", "Partial response received from Storage service: tenant1:doc:15e790a69beb4d789b1f979e2af2e813"); + } + + @Test + public void should_returnValidJobStatus_givenFailedUnitsConversion_processRecordChangedMessageTest() throws URISyntaxException { + String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[],\"conversionStatuses\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"status\":\"ERROR\",\"errors\":[\"crs conversion failed\"]}]}"; + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("tenant1:doc:15e790a69beb4d789b1f979e2af2e813").kind("tenant:test:test:1.0.0").op(OperationType.update.getValue()).build()); + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); + Records storageRecords = this.sut.getStorageRecords(singletonList(RECORDS_ID2), recordChangeInfos); + + assertEquals(1, storageRecords.getRecords().size()); + verify(this.jobStatus).addOrUpdateRecordStatus(RECORDS_ID2, IndexingStatus.WARN, HttpStatus.BAD_REQUEST.value(), "crs conversion failed", String.format("record-id: %s | %s", "tenant1:doc:15e790a69beb4d789b1f979e2af2e813", "crs conversion failed")); + } + + @Test + public void should_returnValidResponse_givenValidRecordQueryRequest_getRecordListByKind() throws Exception { + + RecordReindexRequest recordReindexRequest = RecordReindexRequest.builder().kind("tenant:test:test:1.0.0").cursor("100").build(); + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setBody(new Gson().toJson(recordReindexRequest, RecordReindexRequest.class)); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + + RecordQueryResponse recordQueryResponse = this.sut.getRecordsByKind(recordReindexRequest); + + assertEquals("100", recordQueryResponse.getCursor()); + assertNull(recordQueryResponse.getResults()); + } + + @Test + public void should_returnValidResponse_givenValidKind_getSchemaByKind() throws Exception { + + String validSchemaFromStorage = "{" + + " \"kind\": \"tenant:test:test:1.0.0\"," + + " \"schema\": [" + + " {" + + " \"path\": \"msg\"," + + " \"kind\": \"string\"" + + " }," + + " {" + + " \"path\": \"references.entity\"," + + " \"kind\": \"string\"" + + " }" + + " ]," + + " \"ext\": null" + + "}"; + String kind = "tenant:test:test:1.0.0"; + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(HttpStatus.OK.value()); + httpResponse.setBody(validSchemaFromStorage); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + + String recordSchemaResponse = this.sut.getStorageSchema(kind); + + assertNotNull(recordSchemaResponse); + } + + @Test + public void should_returnNullResponse_givenAbsentKind_getSchemaByKind() throws Exception { + + String kind = "tenant:test:test:1.0.0"; + + HttpResponse httpResponse = new HttpResponse(); + httpResponse.setResponseCode(HttpStatus.NOT_FOUND.value()); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + + String recordSchemaResponse = this.sut.getStorageSchema(kind); + + assertNull(recordSchemaResponse); + } + + @Test + public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsWithInvalidConversionTest() throws URISyntaxException { + + String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"],\"conversionStatuses\": [{\"id\":\"testid\",\"status\":\"ERROR\",\"errors\":[\"conversion error occurred\"] } ]}"; + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("testid").kind("tenant:test:test:1.0.0").op(OperationType.update.getValue()).build()); + + HttpResponse httpResponse = mock(HttpResponse.class); + when(httpResponse.getBody()).thenReturn(validDataFromStorage); + + when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); + Records storageRecords = this.sut.getStorageRecords(ids, recordChangeInfos); + + assertEquals(1, storageRecords.getRecords().size()); + + assertEquals(1, storageRecords.getConversionStatuses().get(0).getErrors().size()); + + assertEquals("conversion error occurred", storageRecords.getConversionStatuses().get(0).getErrors().get(0)); + } + + private void should_return404_getValidStorageRecordsTest() { + List<RecordInfo> recordChangeInfos = Arrays.asList(RecordInfo.builder().id("testid").kind("tenant:test:test:1.0.0").op(OperationType.update.getValue()).build()); + try { + this.sut.getStorageRecords(ids, recordChangeInfos); + fail("Should throw exception"); + } catch (AppException e) { + assertEquals(HttpStatus.NOT_FOUND.value(), e.getError().getCode()); + } catch (Exception e) { + fail("Should not throw this exception" + e.getMessage()); + } + } } \ No newline at end of file diff --git a/provider/indexer-azure/pom.xml b/provider/indexer-azure/pom.xml index 9f527bd1e6c81cf687726c5a5715fdede7b2878f..3bd861b7b2895955b711bb2ea809934427ecf82f 100644 --- a/provider/indexer-azure/pom.xml +++ b/provider/indexer-azure/pom.xml @@ -41,7 +41,7 @@ <nimbus-jose-jwt.version>8.2</nimbus-jose-jwt.version> <indexer-core.version>0.20.0-SNAPSHOT</indexer-core.version> <spring-security-jwt.version>1.1.1.RELEASE</spring-security-jwt.version> - <osdu.corelibazure.version>0.19.0-rc8</osdu.corelibazure.version> + <osdu.corelibazure.version>0.20.0-rc5</osdu.corelibazure.version> <os-core-common.version>0.19.0-rc6</os-core-common.version> <reactor-netty.version>0.9.12.RELEASE</reactor-netty.version> <java-jwt.version>3.8.1</java-jwt.version> diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/di/PublisherConfig.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/di/PublisherConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..9c14375ec8816a88e30a56259a3a4d5d9e60f08f --- /dev/null +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/di/PublisherConfig.java @@ -0,0 +1,28 @@ +// Copyright © Schlumberger +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opengroup.osdu.indexer.azure.di; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Getter +public class PublisherConfig { + + @Value("${azure.publisher.batchsize:50}") + private Integer pubSubBatchSize; +} + diff --git a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java index f7cabb70c42148fac7ddf806c811d3976b80aa91..639f6e4a0e7fbf9b2bd56656d68e593df70b18c7 100644 --- a/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java +++ b/provider/indexer-azure/src/main/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzure.java @@ -15,8 +15,10 @@ package org.opengroup.osdu.indexer.azure.util; import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; -import com.google.gson.*; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import com.microsoft.azure.servicebus.Message; import lombok.extern.java.Log; import org.apache.http.HttpStatus; @@ -29,6 +31,7 @@ import org.opengroup.osdu.core.common.model.indexer.RecordInfo; import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.indexer.azure.di.PublisherConfig; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.opengroup.osdu.indexer.service.StorageService; import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; @@ -41,10 +44,10 @@ import javax.inject.Inject; import javax.inject.Named; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.stream.Collectors; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION; @@ -72,6 +75,8 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { @Inject private RequestInfoImpl requestInfo; + @Autowired + private PublisherConfig publisherConfig; @Override public void createWorkerTask(String payload, DpsHeaders headers) { @@ -99,7 +104,7 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { private void publishAllRecordsToServiceBus(String payload, DpsHeaders headers) { // fetch all the remaining records // This logic is temporary and would be updated to call the storage service async. - // Currently the storage client can't be called out of request scope hence making the + // Currently, the storage client can't be called out of request scope hence making the // storage calls sync here Gson gson = new Gson(); RecordReindexRequest recordReindexRequest = gson.fromJson(payload, RecordReindexRequest.class); @@ -116,17 +121,20 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { recordQueryResponse = this.storageService.getRecordsByKind(recordReindexRequest); if (recordQueryResponse.getResults() != null && recordQueryResponse.getResults().size() != 0) { - List<RecordInfo> records = recordQueryResponse.getResults().stream() - .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); + List<List<String>> batch = Lists.partition(recordQueryResponse.getResults(), publisherConfig.getPubSubBatchSize()); + for (List<String> recordsBatch : batch) { + List<RecordInfo> records = recordsBatch.stream() + .map(record -> RecordInfo.builder().id(record).kind(recordKind).op(OperationType.create.name()).build()).collect(Collectors.toList()); - Map<String, String> attributes = new HashMap<>(); - attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); - attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + Map<String, String> attributes = new HashMap<>(); + attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); + attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); - String recordChangedMessagePayload = gson.toJson(recordChangedMessages); - createTask(recordChangedMessagePayload, headers); + RecordChangedMessages recordChangedMessages = RecordChangedMessages.builder().data(gson.toJson(records)).attributes(attributes).build(); + String recordChangedMessagePayload = gson.toJson(recordChangedMessages); + createTask(recordChangedMessagePayload, headers); + } } } while (!Strings.isNullOrEmpty(recordQueryResponse.getCursor()) && recordQueryResponse.getResults().size() == configurationProperties.getStorageRecordsByKindBatchSize()); @@ -178,8 +186,8 @@ public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder { private List<RecordInfo> parseRecordsAsJSON(String inputPayload) { Gson gson = new Gson(); - Type type = new TypeToken<List<RecordInfo>>(){}.getType(); + Type type = new TypeToken<List<RecordInfo>>() {}.getType(); List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type); - return recordInfoList; + return recordInfoList; } } diff --git a/provider/indexer-azure/src/main/resources/application.properties b/provider/indexer-azure/src/main/resources/application.properties index ba855b47b9e34e9720a1be7d51b26ef02d44abe8..d2dd19ac5bc19b00222901d71b828f970198f90a 100644 --- a/provider/indexer-azure/src/main/resources/application.properties +++ b/provider/indexer-azure/src/main/resources/application.properties @@ -46,7 +46,7 @@ STORAGE_QUERY_RECORD_HOST=${storage_service_url}/query/records STORAGE_QUERY_KINDS_HOST=${storage_service_url}/query/kinds STORAGE_QUERY_RECORD_FOR_CONVERSION_HOST=${storage_service_url}/query/records:batch STORAGE_RECORDS_BATCH_SIZE=20 -STORAGE_RECORDS_BY_KIND_BATCH_SIZE=100 +STORAGE_RECORDS_BY_KIND_BATCH_SIZE=1000 INDEXER_QUEUE_HOST=http://127.0.0.1:9000 diff --git a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/StorageServiceTest.java b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/StorageServiceTest.java deleted file mode 100644 index 218488386affee78476390c4ba1081774ce6a3a5..0000000000000000000000000000000000000000 --- a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/service/StorageServiceTest.java +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright 2017-2019, Schlumberger -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexer.azure.service; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.opengroup.osdu.core.common.model.indexer.IndexingStatus; -import org.opengroup.osdu.core.common.model.indexer.RecordInfo; -import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; -import org.opengroup.osdu.core.common.model.indexer.Records; -import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.indexer.service.StorageServiceImpl; -import org.opengroup.osdu.core.common.model.indexer.JobStatus; -import org.opengroup.osdu.core.common.model.http.HttpResponse; -import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; -import org.opengroup.osdu.core.common.http.IUrlFetchService; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.springframework.http.HttpStatus; -import org.springframework.test.context.junit4.SpringRunner; - -import java.lang.reflect.Type; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import static java.util.Collections.singletonList; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.when; - -@Ignore -@RunWith(SpringRunner.class) -public class StorageServiceTest { - - @Mock - private IUrlFetchService urlFetchService; - @Mock - private JobStatus jobStatus; - @Mock - private JaxRsDpsLog log; - @Mock - private IRequestInfo requestInfo; - @InjectMocks - private StorageServiceImpl sut; - - private List<String> ids; - private static final String RECORD_ID1 = "tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465"; - private static final String RECORDS_ID2 = "tenant1:doc:15e790a69beb4d789b1f979e2af2e813"; - - @Before - public void setup() { - - String recordChangedMessages = "[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"purge\"}," + - "{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"create\"}]"; - - when(this.requestInfo.getHeadersMap()).thenReturn(new HashMap<>()); - - Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); - - List<RecordInfo> msgs = (new Gson()).fromJson(recordChangedMessages, listType); - jobStatus.initialize(msgs); - ids = Arrays.asList(RECORD_ID1, RECORDS_ID2); - } - - @Test - public void should_return404_givenNullData_getValidStorageRecordsTest() throws URISyntaxException { - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(null); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - should_return404_getValidStorageRecordsTest(); - } - - @Test - public void should_return404_givenEmptyData_getValidStorageRecordsTest() throws URISyntaxException { - - String emptyDataFromStorage = "{\"records\":[],\"notFound\":[]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(emptyDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - should_return404_getValidStorageRecordsTest(); - } - - @Test - public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsTest() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - } - - @Test - public void should_logMissingRecord_given_storageMissedRecords() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - verify(this.jobStatus).addOrUpdateRecordStatus(singletonList(RECORDS_ID2), IndexingStatus.FAIL, org.apache.http.HttpStatus.SC_NOT_FOUND, "Partial response received from Storage service - missing records", "Partial response received from Storage service: tenant1:doc:15e790a69beb4d789b1f979e2af2e813"); - } - - @Test - public void should_returnValidJobStatus_givenFailedUnitsConversion_processRecordChangedMessageTest() throws URISyntaxException { - String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[],\"conversionStatuses\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"status\":\"ERROR\",\"errors\":[\"crs conversion failed\"]}]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(singletonList("tenant1:doc:15e790a69beb4d789b1f979e2af2e813")); - - assertEquals(1, storageRecords.getRecords().size()); - verify(this.jobStatus).addOrUpdateRecordStatus("tenant1:doc:15e790a69beb4d789b1f979e2af2e813", IndexingStatus.WARN, org.apache.http.HttpStatus.SC_BAD_REQUEST, "crs conversion failed", String.format("record-id: %s | %s", "tenant1:doc:15e790a69beb4d789b1f979e2af2e813", "crs conversion failed")); - } - - @Test - public void should_returnValidResponse_givenValidRecordQueryRequest_getRecordListByKind() throws Exception { - - RecordReindexRequest recordReindexRequest = RecordReindexRequest.builder().kind("tenant:test:test:1.0.0").cursor("100").build(); - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setBody(new Gson().toJson(recordReindexRequest, RecordReindexRequest.class)); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - RecordQueryResponse recordQueryResponse = this.sut.getRecordsByKind(recordReindexRequest); - - assertEquals("100", recordQueryResponse.getCursor()); - assertNull(recordQueryResponse.getResults()); - } - - @Test - public void should_returnValidResponse_givenValidKind_getSchemaByKind() throws Exception { - - String validSchemaFromStorage = "{" + - " \"kind\": \"tenant:test:test:1.0.0\"," + - " \"schema\": [" + - " {" + - " \"path\": \"msg\"," + - " \"kind\": \"string\"" + - " }," + - " {" + - " \"path\": \"references.entity\"," + - " \"kind\": \"string\"" + - " }" + - " ]," + - " \"ext\": null" + - "}"; - String kind = "tenant:test:test:1.0.0"; - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setResponseCode(HttpStatus.OK.value()); - httpResponse.setBody(validSchemaFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - String recordSchemaResponse = this.sut.getStorageSchema(kind); - - assertNotNull(recordSchemaResponse); - } - - @Test - public void should_returnNullResponse_givenAbsentKind_getSchemaByKind() throws Exception { - - String kind = "tenant:test:test:1.0.0"; - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setResponseCode(HttpStatus.NOT_FOUND.value()); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - String recordSchemaResponse = this.sut.getStorageSchema(kind); - - assertNull(recordSchemaResponse); - } - - @Test - public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsWithInvalidConversionTest() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"],\"conversionStatuses\": [{\"id\":\"testid\",\"status\":\"ERROR\",\"errors\":[\"conversion error occurred\"] } ]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - - assertEquals(1, storageRecords.getConversionStatuses().get(0).getErrors().size()); - - assertEquals("conversion error occurred", storageRecords.getConversionStatuses().get(0).getErrors().get(0)); - } - - private void should_return404_getValidStorageRecordsTest() { - try { - this.sut.getStorageRecords(ids); - fail("Should throw exception"); - } catch (AppException e) { - assertEquals(HttpStatus.NOT_FOUND.value(), e.getError().getCode()); - } catch (Exception e) { - fail("Should not throw this exception" + e.getMessage()); - } - } -} diff --git a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java index e1333d34f6cb7e1d3bc5ede5a642ecb8680d930c..559fc9875db254a2896cbc804e0e0e4d963922f7 100644 --- a/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java +++ b/provider/indexer-azure/src/test/java/org/opengroup/osdu/indexer/azure/util/IndexerQueueTaskBuilderAzureTest.java @@ -1,41 +1,37 @@ package org.opengroup.osdu.indexer.azure.util; -import com.microsoft.azure.servicebus.TopicClient; import com.microsoft.azure.servicebus.primitives.ServiceBusException; -import org.junit.jupiter.api.BeforeEach; +import org.junit.Before; import org.junit.Test; -import org.mockito.Spy; import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; +import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import org.opengroup.osdu.azure.servicebus.ITopicClientFactory; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; import org.opengroup.osdu.core.common.model.http.AppException; import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; -import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; +import org.opengroup.osdu.indexer.azure.di.PublisherConfig; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.opengroup.osdu.indexer.service.StorageService; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.util.ReflectionTestUtils; -import javax.inject.Inject; -import javax.inject.Named; + import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.opengroup.osdu.core.common.model.http.DpsHeaders.AUTHORIZATION; @RunWith(MockitoJUnitRunner.class) public class IndexerQueueTaskBuilderAzureTest { - private String payload="{payload : value }"; + private String payload = "{payload : value }"; private static String partitionId = "opendes"; private static String correlationId = "correlationId"; private static String serviceBusReindexTopicNameField = "serviceBusReindexTopicName"; @@ -51,6 +47,9 @@ public class IndexerQueueTaskBuilderAzureTest { @Mock private JaxRsDpsLog logger; + @Mock + private PublisherConfig publisherConfig; + @Mock DpsHeaders dpsHeaders; @@ -63,8 +62,13 @@ public class IndexerQueueTaskBuilderAzureTest { @InjectMocks IndexerQueueTaskBuilderAzure sut; + @Before + public void setup() { + when(this.publisherConfig.getPubSubBatchSize()).thenReturn(50); + } + @Test - public void createWorkerTask_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, NoSuchFieldException { + public void createWorkerTask_should_invoke_correctMethods() throws ServiceBusException, InterruptedException { when(dpsHeaders.getPartitionIdWithFallbackToAccountId()).thenReturn(partitionId); when(dpsHeaders.getPartitionId()).thenReturn(partitionId); when(dpsHeaders.getCorrelationId()).thenReturn(correlationId); @@ -79,8 +83,8 @@ public class IndexerQueueTaskBuilderAzureTest { } @Test - public void createWorkerTaskWithCountDown_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, NoSuchFieldException { - Long milliseconds=8000L; + public void createWorkerTaskWithCountDown_should_invoke_correctMethods() throws ServiceBusException, InterruptedException { + Long milliseconds = 8000L; when(dpsHeaders.getPartitionIdWithFallbackToAccountId()).thenReturn(partitionId); when(dpsHeaders.getPartitionId()).thenReturn(partitionId); when(dpsHeaders.getCorrelationId()).thenReturn(correlationId); @@ -95,13 +99,12 @@ public class IndexerQueueTaskBuilderAzureTest { } @Test(expected = AppException.class) - public void createReIndexTask_InvalidParameter_ShouldThrowException() - { - sut.createReIndexTask(payload,dpsHeaders); + public void createReIndexTask_InvalidParameter_ShouldThrowException() { + sut.createReIndexTask(payload, dpsHeaders); } @Test - public void createReIndexTaskWithEmptyStorageResponse_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, NoSuchFieldException, URISyntaxException { + public void createReIndexTaskWithEmptyStorageResponse_should_invoke_correctMethods() throws URISyntaxException { Long milliseconds = 8000L; RecordQueryResponse recordQueryResponse = new RecordQueryResponse(); when(requestInfo.checkOrGetAuthorizationHeader()).thenReturn(authorisedHeader); @@ -110,17 +113,17 @@ public class IndexerQueueTaskBuilderAzureTest { sut.createReIndexTask(payload, milliseconds, dpsHeaders); - verify(requestInfo,times(1)).checkOrGetAuthorizationHeader(); + verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); - verify(storageService,times(1)).getRecordsByKind(any()); + verify(storageService, times(1)).getRecordsByKind(any()); verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); } @Test - public void createReIndexTaskWithNonEmptyStorageResponse_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, NoSuchFieldException, URISyntaxException { + public void createReIndexTaskWithNonEmptyStorageResponse_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, URISyntaxException { Long milliseconds = 8000L; RecordQueryResponse recordQueryResponse = new RecordQueryResponse(); - List<String> res = Arrays.asList("r1","r2","r3"); + List<String> res = Arrays.asList("r1", "r2", "r3"); recordQueryResponse.setResults(res); when(dpsHeaders.getPartitionIdWithFallbackToAccountId()).thenReturn(partitionId); when(dpsHeaders.getCorrelationId()).thenReturn(correlationId); @@ -131,9 +134,9 @@ public class IndexerQueueTaskBuilderAzureTest { sut.createReIndexTask(payload, milliseconds, dpsHeaders); - verify(requestInfo,times(1)).checkOrGetAuthorizationHeader(); + verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); - verify(storageService,times(1)).getRecordsByKind(any()); + verify(storageService, times(1)).getRecordsByKind(any()); verify(dpsHeaders, times(6)).getPartitionIdWithFallbackToAccountId(); verify(dpsHeaders, times(3)).getCorrelationId(); verify(dpsHeaders, times(2)).addCorrelationIdIfMissing(); @@ -141,7 +144,31 @@ public class IndexerQueueTaskBuilderAzureTest { } @Test - public void createReIndexTaskWithCountdown_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, NoSuchFieldException, URISyntaxException { + public void createReIndexTaskWithNonEmptyStorageResponse_1KBatch_should_invoke_correctMethods() throws ServiceBusException, InterruptedException, URISyntaxException { + Long milliseconds = 8000L; + RecordQueryResponse recordQueryResponse = new RecordQueryResponse(); + List<String> res = Arrays.asList("r1", "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", "r16", "r17", "r18", "r19", "r20", "r21", "r22", "r23", "r24", "r25", "r26", "r27", "r28", "r29", "r30", "r31", "r32", "r33", "r34", "r35", "r36", "r37", "r38", "r39", "r40", "r41", "r42", "r43", "r44", "r45", "r46", "r47", "r48", "r49", "r50", "r51", "r52", "r53", "r54", "r55", "r56", "r57", "r58", "r59", "r60", "r61", "r62", "r63", "r64", "r65", "r66", "r67", "r68", "r69", "r70", "r71", "r72", "r73", "r74", "r75", "r76", "r77", "r78", "r79", "r80", "r81", "r82", "r83", "r84", "r85", "r86", "r87", "r88", "r89", "r90", "r91", "r92", "r93", "r94", "r95", "r96", "r97", "r98", "r99", "r100", "r101", "r102", "r103", "r104", "r105", "r106", "r107", "r108", "r109", "r110", "r111", "r112", "r113", "r114", "r115", "r116", "r117", "r118", "r119", "r120", "r121", "r122", "r123", "r124", "r125", "r126", "r127", "r128", "r129", "r130", "r131", "r132", "r133", "r134", "r135", "r136", "r137", "r138", "r139", "r140", "r141", "r142", "r143", "r144", "r145", "r146", "r147", "r148", "r149", "r150", "r151", "r152", "r153", "r154", "r155", "r156", "r157", "r158", "r159", "r160", "r161", "r162", "r163", "r164", "r165", "r166", "r167", "r168", "r169", "r170", "r171", "r172", "r173", "r174", "r175", "r176", "r177", "r178", "r179", "r180", "r181", "r182", "r183", "r184", "r185", "r186", "r187", "r188", "r189", "r190", "r191", "r192", "r193", "r194", "r195", "r196", "r197", "r198", "r199", "r200", "r201", "r202", "r203", "r204", "r205", "r206", "r207", "r208", "r209", "r210", "r211", "r212", "r213", "r214", "r215", "r216", "r217", "r218", "r219", "r220", "r221", "r222", "r223", "r224", "r225", "r226", "r227", "r228", "r229", "r230", "r231", "r232", "r233", "r234", "r235", "r236", "r237", "r238", "r239", "r240", "r241", "r242", "r243", "r244", "r245", "r246", "r247", "r248", "r249", "r250", "r251", "r252", "r253", "r254", "r255", "r256", "r257", "r258", "r259", "r260", "r261", "r262", "r263", "r264", "r265", "r266", "r267", "r268", "r269", "r270", "r271", "r272", "r273", "r274", "r275", "r276", "r277", "r278", "r279", "r280", "r281", "r282", "r283", "r284", "r285", "r286", "r287", "r288", "r289", "r290", "r291", "r292", "r293", "r294", "r295", "r296", "r297", "r298", "r299", "r300", "r301", "r302", "r303", "r304", "r305", "r306", "r307", "r308", "r309", "r310", "r311", "r312", "r313", "r314", "r315", "r316", "r317", "r318", "r319", "r320", "r321", "r322", "r323", "r324", "r325", "r326", "r327", "r328", "r329", "r330", "r331", "r332", "r333", "r334", "r335", "r336", "r337", "r338", "r339", "r340", "r341", "r342", "r343", "r344", "r345", "r346", "r347", "r348", "r349", "r350", "r351", "r352", "r353", "r354", "r355", "r356", "r357", "r358", "r359", "r360", "r361", "r362", "r363", "r364", "r365", "r366", "r367", "r368", "r369", "r370", "r371", "r372", "r373", "r374", "r375", "r376", "r377", "r378", "r379", "r380", "r381", "r382", "r383", "r384", "r385", "r386", "r387", "r388", "r389", "r390", "r391", "r392", "r393", "r394", "r395", "r396", "r397", "r398", "r399", "r400", "r401", "r402", "r403", "r404", "r405", "r406", "r407", "r408", "r409", "r410", "r411", "r412", "r413", "r414", "r415", "r416", "r417", "r418", "r419", "r420", "r421", "r422", "r423", "r424", "r425", "r426", "r427", "r428", "r429", "r430", "r431", "r432", "r433", "r434", "r435", "r436", "r437", "r438", "r439", "r440", "r441", "r442", "r443", "r444", "r445", "r446", "r447", "r448", "r449", "r450", "r451", "r452", "r453", "r454", "r455", "r456", "r457", "r458", "r459", "r460", "r461", "r462", "r463", "r464", "r465", "r466", "r467", "r468", "r469", "r470", "r471", "r472", "r473", "r474", "r475", "r476", "r477", "r478", "r479", "r480", "r481", "r482", "r483", "r484", "r485", "r486", "r487", "r488", "r489", "r490", "r491", "r492", "r493", "r494", "r495", "r496", "r497", "r498", "r499", "r500", "r501", "r502", "r503", "r504", "r505", "r506", "r507", "r508", "r509", "r510", "r511", "r512", "r513", "r514", "r515", "r516", "r517", "r518", "r519", "r520", "r521", "r522", "r523", "r524", "r525", "r526", "r527", "r528", "r529", "r530", "r531", "r532", "r533", "r534", "r535", "r536", "r537", "r538", "r539", "r540", "r541", "r542", "r543", "r544", "r545", "r546", "r547", "r548", "r549", "r550", "r551", "r552", "r553", "r554", "r555", "r556", "r557", "r558", "r559", "r560", "r561", "r562", "r563", "r564", "r565", "r566", "r567", "r568", "r569", "r570", "r571", "r572", "r573", "r574", "r575", "r576", "r577", "r578", "r579", "r580", "r581", "r582", "r583", "r584", "r585", "r586", "r587", "r588", "r589", "r590", "r591", "r592", "r593", "r594", "r595", "r596", "r597", "r598", "r599", "r600", "r601", "r602", "r603", "r604", "r605", "r606", "r607", "r608", "r609", "r610", "r611", "r612", "r613", "r614", "r615", "r616", "r617", "r618", "r619", "r620", "r621", "r622", "r623", "r624", "r625", "r626", "r627", "r628", "r629", "r630", "r631", "r632", "r633", "r634", "r635", "r636", "r637", "r638", "r639", "r640", "r641", "r642", "r643", "r644", "r645", "r646", "r647", "r648", "r649", "r650", "r651", "r652", "r653", "r654", "r655", "r656", "r657", "r658", "r659", "r660", "r661", "r662", "r663", "r664", "r665", "r666", "r667", "r668", "r669", "r670", "r671", "r672", "r673", "r674", "r675", "r676", "r677", "r678", "r679", "r680", "r681", "r682", "r683", "r684", "r685", "r686", "r687", "r688", "r689", "r690", "r691", "r692", "r693", "r694", "r695", "r696", "r697", "r698", "r699", "r700", "r701", "r702", "r703", "r704", "r705", "r706", "r707", "r708", "r709", "r710", "r711", "r712", "r713", "r714", "r715", "r716", "r717", "r718", "r719", "r720", "r721", "r722", "r723", "r724", "r725", "r726", "r727", "r728", "r729", "r730", "r731", "r732", "r733", "r734", "r735", "r736", "r737", "r738", "r739", "r740", "r741", "r742", "r743", "r744", "r745", "r746", "r747", "r748", "r749", "r750", "r751", "r752", "r753", "r754", "r755", "r756", "r757", "r758", "r759", "r760", "r761", "r762", "r763", "r764", "r765", "r766", "r767", "r768", "r769", "r770", "r771", "r772", "r773", "r774", "r775", "r776", "r777", "r778", "r779", "r780", "r781", "r782", "r783", "r784", "r785", "r786", "r787", "r788", "r789", "r790", "r791", "r792", "r793", "r794", "r795", "r796", "r797", "r798", "r799", "r800", "r801", "r802", "r803", "r804", "r805", "r806", "r807", "r808", "r809", "r810", "r811", "r812", "r813", "r814", "r815", "r816", "r817", "r818", "r819", "r820", "r821", "r822", "r823", "r824", "r825", "r826", "r827", "r828", "r829", "r830", "r831", "r832", "r833", "r834", "r835", "r836", "r837", "r838", "r839", "r840", "r841", "r842", "r843", "r844", "r845", "r846", "r847", "r848", "r849", "r850", "r851", "r852", "r853", "r854", "r855", "r856", "r857", "r858", "r859", "r860", "r861", "r862", "r863", "r864", "r865", "r866", "r867", "r868", "r869", "r870", "r871", "r872", "r873", "r874", "r875", "r876", "r877", "r878", "r879", "r880", "r881", "r882", "r883", "r884", "r885", "r886", "r887", "r888", "r889", "r890", "r891", "r892", "r893", "r894", "r895", "r896", "r897", "r898", "r899", "r900", "r901", "r902", "r903", "r904", "r905", "r906", "r907", "r908", "r909", "r910", "r911", "r912", "r913", "r914", "r915", "r916", "r917", "r918", "r919", "r920", "r921", "r922", "r923", "r924", "r925", "r926", "r927", "r928", "r929", "r930", "r931", "r932", "r933", "r934", "r935", "r936", "r937", "r938", "r939", "r940", "r941", "r942", "r943", "r944", "r945", "r946", "r947", "r948", "r949", "r950", "r951", "r952", "r953", "r954", "r955", "r956", "r957", "r958", "r959", "r960", "r961", "r962", "r963", "r964", "r965", "r966", "r967", "r968", "r969", "r970", "r971", "r972", "r973", "r974", "r975", "r976", "r977", "r978", "r979", "r980", "r981", "r982", "r983", "r984", "r985", "r986", "r987", "r988", "r989", "r990", "r991", "r992", "r993", "r994", "r995", "r996", "r997", "r998", "r999", "r1000"); + recordQueryResponse.setResults(res); + when(dpsHeaders.getPartitionIdWithFallbackToAccountId()).thenReturn(partitionId); + when(dpsHeaders.getCorrelationId()).thenReturn(correlationId); + when(dpsHeaders.getPartitionId()).thenReturn(partitionId); + when(requestInfo.checkOrGetAuthorizationHeader()).thenReturn(authorisedHeader); + when(storageService.getRecordsByKind(any())).thenReturn(recordQueryResponse); + ReflectionTestUtils.setField(sut, serviceBusReindexTopicNameField, serviceBusReindexTopicNameValue); + + sut.createReIndexTask(payload, milliseconds, dpsHeaders); + + verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); + verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); + verify(storageService, times(1)).getRecordsByKind(any()); + verify(dpsHeaders, times(120)).getPartitionIdWithFallbackToAccountId(); + verify(dpsHeaders, times(60)).getCorrelationId(); + verify(dpsHeaders, times(21)).addCorrelationIdIfMissing(); + verify(topicClientFactory, times(20)).getClient(partitionId, serviceBusReindexTopicNameValue); + } + + @Test + public void createReIndexTaskWithCountdown_should_invoke_correctMethods() throws URISyntaxException { Long milliseconds = 8000L; RecordQueryResponse recordQueryResponse = new RecordQueryResponse(); when(requestInfo.checkOrGetAuthorizationHeader()).thenReturn(authorisedHeader); @@ -150,9 +177,9 @@ public class IndexerQueueTaskBuilderAzureTest { sut.createReIndexTask(payload, milliseconds, dpsHeaders); - verify(requestInfo,times(1)).checkOrGetAuthorizationHeader(); + verify(requestInfo, times(1)).checkOrGetAuthorizationHeader(); verify(dpsHeaders, times(1)).put(AUTHORIZATION, authorisedHeader); - verify(storageService,times(1)).getRecordsByKind(any()); + verify(storageService, times(1)).getRecordsByKind(any()); verify(dpsHeaders, times(1)).addCorrelationIdIfMissing(); } } diff --git a/provider/indexer-gc/docs/anthos/README.md b/provider/indexer-gc/docs/anthos/README.md index 70276024394e1452e3380c89cefc0e2603ea66df..9800216a3e1cdb6ccc86f5537f108206e179dfd2 100644 --- a/provider/indexer-gc/docs/anthos/README.md +++ b/provider/indexer-gc/docs/anthos/README.md @@ -49,7 +49,7 @@ Note that properties can be set in Partition as `sensitive` in that case in prop This variable should be present in environment of service that need that variable. Example: -``` +```json "elasticsearch.port": { "sensitive": false, <- value not sensitive "value": "9243" <- will be used as is. @@ -60,6 +60,17 @@ Example: } ``` +## Indexer account configuration +Google cloud OSDU platform doesn't use a single Tenant account which provides access to all groups for each service, +instead, separate accounts should be used. But the Indexer should have access to all data groups, no matter when they were created. +To achieve that add an Indexer account to the partition configuration: +```json + "indexer.service.account": { + "sensitive": false, + "value": "indexer@service.local" + } +``` +Related issue: https://community.opengroup.org/osdu/platform/system/storage/-/issues/153 ## Elasticsearch configuration diff --git a/provider/indexer-gc/docs/gc/README.md b/provider/indexer-gc/docs/gc/README.md index 4fbcefb066ae0e3c91dca47eed5a39f69ff77dc5..8edb09a7137f11a0a93a89ffab304faa2a81e472 100644 --- a/provider/indexer-gc/docs/gc/README.md +++ b/provider/indexer-gc/docs/gc/README.md @@ -63,7 +63,7 @@ It is possible, but not necessary to adjust consumer throughput via Partition se <https://community.opengroup.org/osdu/platform/system/lib/cloud/gcp/oqm/-/blob/master/src/main/java/org/opengroup/osdu/core/gcp/oqm/driver/pubsub/config/PsThroughputConfiguration.java> -``` +```json "max.sub.parallel.streams": { "sensitive": false, "value": 2 @@ -85,7 +85,7 @@ This variable should be present in environment of service that need that variabl Example: -``` +```json "elasticsearch.port": { "sensitive": false, <- value not sensitive "value": "9243" <- will be used as is. @@ -98,6 +98,18 @@ Example: There is no hardcode in services, all behaviour defined by sensitivity of property. +## Indexer account configuration +Google cloud OSDU platform doesn't use a single Tenant account which provides access to all groups for each service, +instead, separate accounts should be used. But the Indexer should have access to all data groups, no matter when they were created. +To achieve that add an Indexer account to the partition configuration: +```json + "indexer.service.account": { + "sensitive": false, + "value": "indexer@service.local" + } +``` +Related issue: https://community.opengroup.org/osdu/platform/system/storage/-/issues/153 + ## Elasticsearch configuration **prefix:** `elasticsearch` diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java index 973372f431e5d29c333affcd43932736132dbe16..3c73f93c1d893dea4be35ee1191eafee23546946 100644 --- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java +++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/indexing/processing/IndexerOqmMessageReceiver.java @@ -1,6 +1,6 @@ /* - * Copyright 2020-2022 Google LLC - * Copyright 2020-2022 EPAM Systems, Inc + * Copyright 2020-2023 Google LLC + * Copyright 2020-2023 EPAM Systems, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.opengroup.osdu.indexer.provider.gcp.indexing.processing; import com.google.common.base.Strings; -import java.io.IOException; + import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -42,60 +42,61 @@ public abstract class IndexerOqmMessageReceiver implements OqmMessageReceiver { @Override public void receiveMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier) { - log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), - oqmMessage.getAttributes()); - boolean acked = false; + log.info("OQM message: {} - {} - {}", oqmMessage.getId(), oqmMessage.getData(), oqmMessage.getAttributes()); + if (!validInput(oqmMessage)) { + log.error("Not valid event payload, event will not be processed."); + oqmAckReplier.ack(); + return; + } + try { - if (!validInput(oqmMessage)) { - log.warn("Not valid event payload, event will not be processed."); - oqmAckReplier.ack(); - return; - } DpsHeaders headers = getHeaders(oqmMessage); // Filling thread context required by the core services. dpsHeaders.setThreadContext(headers.getHeaders()); sendMessage(oqmMessage); - acked = true; + oqmAckReplier.ack(); } catch (AppException appException) { int statusCode = appException.getError().getCode(); if (statusCode > 199 && statusCode < 300 && statusCode != RequestStatus.INVALID_RECORD) { - log.info( - "Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.", - appException - ); - acked = true; + skipMessage(oqmMessage, oqmAckReplier, appException); } else { - //It is possible to get both AppException with wrapped in original Exception or the original Exception without any wrapper - Exception exception = Optional.ofNullable(appException.getOriginalException()).orElse(appException); - log.warn( - "Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", - exception - ); + rescheduleMessage(oqmMessage, oqmAckReplier, getException(appException)); } } catch (Exception exception) { - log.error( - "Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", - exception - ); + rescheduleMessage(oqmMessage, oqmAckReplier, exception); } finally { - if (!acked) { - oqmAckReplier.nack(); - } else { - oqmAckReplier.ack(); - } // Cleaning thread context after processing is finished and the thread dies out. ThreadScopeContextHolder.currentThreadScopeAttributes().clear(); } } + private static void skipMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier, AppException appException) { + log.info("Event id : " + oqmMessage.getId() + ", was not processed, and will NOT be rescheduled.", appException); + oqmAckReplier.ack(); + } + + private static void rescheduleMessage(OqmMessage oqmMessage, OqmAckReplier oqmAckReplier, Exception exception) { + log.error("Event id : " + oqmMessage.getId() + ", was not processed, and will BE rescheduled.", exception); + oqmAckReplier.nack(); + } + + /** + * It is possible to get both AppException with wrapped in original Exception + * or the original Exception without any wrapper. + */ + @NotNull + private static Exception getException(AppException appException) { + return Optional.ofNullable(appException.getOriginalException()).orElse(appException); + } + private boolean validInput(OqmMessage oqmMessage) { boolean isValid = true; if (Strings.isNullOrEmpty(oqmMessage.getData()) || oqmMessage.getData().equals("{}")) { - log.warn("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); + log.error("Message body is empty, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); isValid = false; } if (oqmMessage.getAttributes() == null || oqmMessage.getAttributes().size() == 0) { - log.warn("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); + log.error("Attribute map not found, message id: {}, attributes: {}", oqmMessage.getId(), oqmMessage.getAttributes()); isValid = false; } return isValid; diff --git a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceTest.java b/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceTest.java deleted file mode 100644 index 3ff767ebe5c4f5cb638cef9eeb8e440eecd7db19..0000000000000000000000000000000000000000 --- a/provider/indexer-gc/src/test/java/org/opengroup/osdu/indexer/service/StorageServiceTest.java +++ /dev/null @@ -1,254 +0,0 @@ -// Copyright 2017-2019, Schlumberger -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.opengroup.osdu.indexer.service; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.Spy; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.indexer.IndexingStatus; -import org.opengroup.osdu.core.common.model.indexer.JobStatus; -import org.opengroup.osdu.core.common.model.indexer.RecordInfo; -import org.opengroup.osdu.core.common.model.http.HttpResponse; -import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; -import org.opengroup.osdu.core.common.http.IUrlFetchService; -import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; -import org.opengroup.osdu.core.common.model.indexer.Records; -import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; -import org.springframework.http.HttpStatus; -import org.springframework.test.context.junit4.SpringRunner; - -import java.lang.reflect.Type; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import static java.util.Collections.singletonList; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.when; - -@RunWith(SpringRunner.class) -public class StorageServiceTest { - - @Mock - private IUrlFetchService urlFetchService; - @Mock - private JobStatus jobStatus; - @Mock - private JaxRsDpsLog log; - @Mock - private IRequestInfo requestInfo; - @Mock - private IndexerConfigurationProperties configurationProperties; - @Spy - private ObjectMapper objectMapper = new ObjectMapper(); - @InjectMocks - private StorageServiceImpl sut; - - private List<String> ids; - private static final String RECORD_ID1 = "tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465"; - private static final String RECORDS_ID2 = "tenant1:doc:15e790a69beb4d789b1f979e2af2e813"; - - @Before - public void setup() { - - String recordChangedMessages = "[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"purge\"}," + - "{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"create\"}]"; - - when(this.requestInfo.getHeadersMap()).thenReturn(new HashMap<>()); - when(this.requestInfo.getHeaders()).thenReturn(new DpsHeaders()); - - Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); - - List<RecordInfo> msgs = (new Gson()).fromJson(recordChangedMessages, listType); - jobStatus.initialize(msgs); - ids = Arrays.asList(RECORD_ID1, RECORDS_ID2); - - when(configurationProperties.getStorageRecordsBatchSize()).thenReturn(20); - } - - @Test - public void should_return404_givenNullData_getValidStorageRecordsTest() throws URISyntaxException { - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(null); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - should_return404_getValidStorageRecordsTest(); - } - - @Test - public void should_return404_givenEmptyData_getValidStorageRecordsTest() throws URISyntaxException { - - String emptyDataFromStorage = "{\"records\":[],\"notFound\":[]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(emptyDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - should_return404_getValidStorageRecordsTest(); - } - - @Test - public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsTest() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"], \"conversionStatuses\": []}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - } - - @Test - public void should_logMissingRecord_given_storageMissedRecords() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - verify(this.jobStatus).addOrUpdateRecordStatus(singletonList(RECORDS_ID2), IndexingStatus.FAIL, HttpStatus.NOT_FOUND.value(), "Partial response received from Storage service - missing records", "Partial response received from Storage service: tenant1:doc:15e790a69beb4d789b1f979e2af2e813"); - } - - @Test - public void should_returnValidJobStatus_givenFailedUnitsConversion_processRecordChangedMessageTest() throws URISyntaxException { - String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[],\"conversionStatuses\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"status\":\"ERROR\",\"errors\":[\"crs conversion failed\"]}]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(singletonList(RECORDS_ID2)); - - assertEquals(1, storageRecords.getRecords().size()); - verify(this.jobStatus).addOrUpdateRecordStatus(RECORDS_ID2, IndexingStatus.WARN, HttpStatus.BAD_REQUEST.value(), "crs conversion failed", String.format("record-id: %s | %s", "tenant1:doc:15e790a69beb4d789b1f979e2af2e813", "crs conversion failed")); - } - - @Test - public void should_returnValidResponse_givenValidRecordQueryRequest_getRecordListByKind() throws Exception { - - RecordReindexRequest recordReindexRequest = RecordReindexRequest.builder().kind("tenant:test:test:1.0.0").cursor("100").build(); - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setBody(new Gson().toJson(recordReindexRequest, RecordReindexRequest.class)); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - RecordQueryResponse recordQueryResponse = this.sut.getRecordsByKind(recordReindexRequest); - - assertEquals("100", recordQueryResponse.getCursor()); - assertNull(recordQueryResponse.getResults()); - } - - @Test - public void should_returnValidResponse_givenValidKind_getSchemaByKind() throws Exception { - - String validSchemaFromStorage = "{" + - " \"kind\": \"tenant:test:test:1.0.0\"," + - " \"schema\": [" + - " {" + - " \"path\": \"msg\"," + - " \"kind\": \"string\"" + - " }," + - " {" + - " \"path\": \"references.entity\"," + - " \"kind\": \"string\"" + - " }" + - " ]," + - " \"ext\": null" + - "}"; - String kind = "tenant:test:test:1.0.0"; - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setResponseCode(HttpStatus.OK.value()); - httpResponse.setBody(validSchemaFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - String recordSchemaResponse = this.sut.getStorageSchema(kind); - - assertNotNull(recordSchemaResponse); - } - - @Test - public void should_returnNullResponse_givenAbsentKind_getSchemaByKind() throws Exception { - - String kind = "tenant:test:test:1.0.0"; - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setResponseCode(HttpStatus.NOT_FOUND.value()); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - String recordSchemaResponse = this.sut.getStorageSchema(kind); - - assertNull(recordSchemaResponse); - } - - @Test - public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsWithInvalidConversionTest() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"],\"conversionStatuses\": [{\"id\":\"testid\",\"status\":\"ERROR\",\"errors\":[\"conversion error occurred\"] } ]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - - assertEquals(1, storageRecords.getConversionStatuses().get(0).getErrors().size()); - - assertEquals("conversion error occurred", storageRecords.getConversionStatuses().get(0).getErrors().get(0)); - } - - private void should_return404_getValidStorageRecordsTest() { - try { - this.sut.getStorageRecords(ids); - fail("Should throw exception"); - } catch (AppException e) { - assertEquals(HttpStatus.NOT_FOUND.value(), e.getError().getCode()); - } catch (Exception e) { - fail("Should not throw this exception" + e.getMessage()); - } - } -} diff --git a/provider/indexer-ibm/pom.xml b/provider/indexer-ibm/pom.xml index 787b45c6de6af106eb6b1f4e708b903bbffd5dbc..c550b2a8642e6352f4b85e926d491434d52b33d1 100644 --- a/provider/indexer-ibm/pom.xml +++ b/provider/indexer-ibm/pom.xml @@ -31,7 +31,7 @@ <packaging>jar</packaging> <properties> - <tomcat.embed.core.version>9.0.68</tomcat.embed.core.version> + <tomcat.embed.core.version>9.0.69</tomcat.embed.core.version> <os-core-lib-ibm.version>0.16.0-rc1</os-core-lib-ibm.version> <spring-webmvc.version>5.3.22</spring-webmvc.version> <jackson-databind.version>2.13.4.2</jackson-databind.version> @@ -87,7 +87,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-codec</artifactId> - <version>4.1.68.Final</version> + <version>4.1.86.Final</version> </dependency> <dependency> <groupId>org.opengroup.osdu</groupId> diff --git a/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/StorageServiceTest.java b/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/StorageServiceTest.java deleted file mode 100644 index dfbdb4396509ef1678d845fa12f1b9515654c120..0000000000000000000000000000000000000000 --- a/provider/indexer-ibm/src/test/java/org/opengroup/osdu/indexer/ibm/service/StorageServiceTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* Licensed Materials - Property of IBM */ -/* (c) Copyright IBM Corp. 2020. All Rights Reserved.*/ - -package org.opengroup.osdu.indexer.ibm.service; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.opengroup.osdu.core.common.model.indexer.IndexingStatus; -import org.opengroup.osdu.core.common.model.indexer.RecordInfo; -import org.opengroup.osdu.core.common.model.indexer.RecordQueryResponse; -import org.opengroup.osdu.core.common.model.indexer.RecordReindexRequest; -import org.opengroup.osdu.core.common.model.indexer.Records; -import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.indexer.service.StorageServiceImpl; -import org.opengroup.osdu.core.common.model.indexer.JobStatus; -import org.opengroup.osdu.core.common.model.http.HttpResponse; -import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; -import org.opengroup.osdu.core.common.http.IUrlFetchService; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.springframework.http.HttpStatus; -import org.springframework.test.context.junit4.SpringRunner; - -import java.lang.reflect.Type; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import static java.util.Collections.singletonList; -import static org.junit.Assert.*; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.powermock.api.mockito.PowerMockito.when; - -@Ignore -@RunWith(SpringRunner.class) -public class StorageServiceTest { - - @Mock - private IUrlFetchService urlFetchService; - @Mock - private JobStatus jobStatus; - @Mock - private JaxRsDpsLog log; - @Mock - private IRequestInfo requestInfo; - @InjectMocks - private StorageServiceImpl sut; - - private List<String> ids; - private static final String RECORD_ID1 = "tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465"; - private static final String RECORDS_ID2 = "tenant1:doc:15e790a69beb4d789b1f979e2af2e813"; - - @Before - public void setup() { - - String recordChangedMessages = "[{\"id\":\"tenant1:doc:1dbf528e0e0549cab7a08f29fbfc8465\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"purge\"}," + - "{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"kind\":\"tenant1:testindexer1528919679710:well:1.0.0\",\"op\":\"create\"}]"; - - when(this.requestInfo.getHeadersMap()).thenReturn(new HashMap<>()); - - Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); - - List<RecordInfo> msgs = (new Gson()).fromJson(recordChangedMessages, listType); - jobStatus.initialize(msgs); - ids = Arrays.asList(RECORD_ID1, RECORDS_ID2); - } - - @Test - public void should_return404_givenNullData_getValidStorageRecordsTest() throws URISyntaxException { - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(null); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - should_return404_getValidStorageRecordsTest(); - } - - @Test - public void should_return404_givenEmptyData_getValidStorageRecordsTest() throws URISyntaxException { - - String emptyDataFromStorage = "{\"records\":[],\"notFound\":[]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(emptyDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - should_return404_getValidStorageRecordsTest(); - } - - @Test - public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsTest() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - } - - @Test - public void should_returnValidResponse_givenValidRecordQueryRequest_getRecordListByKind() throws Exception { - - RecordReindexRequest recordReindexRequest = RecordReindexRequest.builder().kind("tenant:test:test:1.0.0").cursor("100").build(); - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setBody(new Gson().toJson(recordReindexRequest, RecordReindexRequest.class)); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - RecordQueryResponse recordQueryResponse = this.sut.getRecordsByKind(recordReindexRequest); - - assertEquals("100", recordQueryResponse.getCursor()); - assertNull(recordQueryResponse.getResults()); - } - - @Test - public void should_returnValidJobStatus_givenFailedUnitsConversion_processRecordChangedMessageTest() throws URISyntaxException { - String validDataFromStorage = "{\"records\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[],\"conversionStatuses\":[{\"id\":\"tenant1:doc:15e790a69beb4d789b1f979e2af2e813\",\"status\":\"ERROR\",\"errors\":[\"crs conversion failed\"]}]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(singletonList("tenant1:doc:15e790a69beb4d789b1f979e2af2e813")); - - assertEquals(1, storageRecords.getRecords().size()); - verify(this.jobStatus).addOrUpdateRecordStatus(RECORDS_ID2, IndexingStatus.WARN, HttpStatus.BAD_REQUEST.value(), "crs conversion failed", String.format("record-id: %s | %s", "tenant1:doc:15e790a69beb4d789b1f979e2af2e813", "crs conversion failed")); - } - - @Test - public void should_returnValidResponse_givenValidKind_getSchemaByKind() throws Exception { - - String validSchemaFromStorage = "{" + - " \"kind\": \"tenant:test:test:1.0.0\"," + - " \"schema\": [" + - " {" + - " \"path\": \"msg\"," + - " \"kind\": \"string\"" + - " }," + - " {" + - " \"path\": \"references.entity\"," + - " \"kind\": \"string\"" + - " }" + - " ]," + - " \"ext\": null" + - "}"; - String kind = "tenant:test:test:1.0.0"; - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setResponseCode(HttpStatus.OK.value()); - httpResponse.setBody(validSchemaFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - String recordSchemaResponse = this.sut.getStorageSchema(kind); - - assertNotNull(recordSchemaResponse); - } - - @Test - public void should_returnNullResponse_givenAbsentKind_getSchemaByKind() throws Exception { - - String kind = "tenant:test:test:1.0.0"; - - HttpResponse httpResponse = new HttpResponse(); - httpResponse.setResponseCode(HttpStatus.NOT_FOUND.value()); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - - String recordSchemaResponse = this.sut.getStorageSchema(kind); - - assertNull(recordSchemaResponse); - } - - @Test - public void should_returnOneValidRecords_givenValidData_getValidStorageRecordsWithInvalidConversionTest() throws URISyntaxException { - - String validDataFromStorage = "{\"records\":[{\"id\":\"testid\", \"version\":1, \"kind\":\"tenant:test:test:1.0.0\"}],\"notFound\":[\"invalid1\"],\"conversionStatuses\": [{\"id\":\"testid\",\"status\":\"ERROR\",\"errors\":[\"conversion error occurred\"] } ]}"; - - HttpResponse httpResponse = mock(HttpResponse.class); - Mockito.when(httpResponse.getBody()).thenReturn(validDataFromStorage); - - when(this.urlFetchService.sendRequest(ArgumentMatchers.any())).thenReturn(httpResponse); - Records storageRecords = this.sut.getStorageRecords(ids); - - assertEquals(1, storageRecords.getRecords().size()); - - assertEquals(1, storageRecords.getConversionStatuses().get(0).getErrors().size()); - - assertEquals("conversion error occurred", storageRecords.getConversionStatuses().get(0).getErrors().get(0)); - } - - private void should_return404_getValidStorageRecordsTest() { - try { - this.sut.getStorageRecords(ids); - fail("Should throw exception"); - } catch (AppException e) { - assertEquals(HttpStatus.NOT_FOUND, e.getError().getCode()); - } catch (Exception e) { - fail("Should not throw this exception" + e.getMessage()); - } - } -}