diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImpl.java index e3cc866dd3c4c83db879341990eb2b9d482031af..e94cdbcf9c0f6649e9ea215fd07100ea0ccbd30a 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImpl.java @@ -57,6 +57,7 @@ public class AugmenterConfigurationServiceImpl implements AugmenterConfiguration "nested(data.Configurations, nested(data.Configurations.Paths, (RelatedObjectsSpec.RelationshipDirection: ChildToParent AND RelatedObjectsSpec.RelatedObjectKind:\"%s\")))"; private static final String HAS_CONFIGURATIONS_QUERY_FORMAT = "data.Code: \"%s\" OR nested(data.Configurations, nested(data.Configurations.Paths, (RelatedObjectsSpec.RelatedObjectKind:\"%s\")))"; private static final int MAX_SEARCH_LIMIT = 1000; + private static final int MAX_TOTAL_COUNT = 10000; private static final String PROPERTY_DELIMITER = "."; private static final String NESTED_OBJECT_DELIMITER = "[]."; @@ -890,7 +891,7 @@ public class AugmenterConfigurationServiceImpl implements AugmenterConfiguration searchRequest.setQuery(query); searchRequest.setReturnedFields(Arrays.asList("kind", "id", "data." + ASSOCIATED_IDENTITIES_PROPERTY)); List<RecordInfo> recordInfos = new ArrayList<>(); - for (SearchRecord searchRecord : searchRecordsWithCursor(searchRequest)) { + for (SearchRecord searchRecord : searchRecords(searchRequest)) { Map<String, Object> data = searchRecord.getData(); if (!data.containsKey(ASSOCIATED_IDENTITIES_PROPERTY) || data.get(ASSOCIATED_IDENTITIES_PROPERTY) == null) continue; @@ -1056,7 +1057,7 @@ public class AugmenterConfigurationServiceImpl implements AugmenterConfiguration searchRequest.setKind(kind); String query = String.format("%s: \"%s\"", childrenObjectField, parentId); searchRequest.setQuery(query); - return searchRecordsWithCursor(searchRequest); + return searchRecords(searchRequest); } private List<SearchRecord> searchRecordsWithCursor(SearchRequest searchRequest) { @@ -1064,23 +1065,24 @@ public class AugmenterConfigurationServiceImpl implements AugmenterConfiguration List<SearchRecord> allRecords = new ArrayList<>(); try { List<SearchRecord> results = null; + String cursor = null; do { SearchResponse searchResponse = searchService.queryWithCursor(searchRequest); results = searchResponse.getResults(); + cursor = searchResponse.getCursor(); if (!CollectionUtils.isEmpty(results)) { allRecords.addAll(results); - if (!Strings.isNullOrEmpty(searchResponse.getCursor()) && results.size() == MAX_SEARCH_LIMIT) { - searchRequest.setCursor(searchResponse.getCursor()); + if (!Strings.isNullOrEmpty(cursor)) { + searchRequest.setCursor(cursor); } } - } while(results != null && results.size() == MAX_SEARCH_LIMIT); + } while(results != null && cursor != null); } catch (URISyntaxException e) { jaxRsDpsLog.error(SEARCH_GENERAL_ERROR, e); } return allRecords; } - // The search without cursor can return max. 10,000 records private List<SearchRecord> searchRecords(SearchRequest searchRequest) { searchRequest.setLimit(MAX_SEARCH_LIMIT); int offset = 0; @@ -1089,6 +1091,11 @@ public class AugmenterConfigurationServiceImpl implements AugmenterConfiguration List<SearchRecord> results = null; do { SearchResponse searchResponse = searchService.query(searchRequest); + if(searchResponse.getTotalCount() >= MAX_TOTAL_COUNT) { + // The search without cursor can return max. 10,000 records + // If the totalCount reaches 10,1000 records, we should switch to using query with cursor + return searchRecordsWithCursor(searchRequest); + } results = searchResponse.getResults(); if (!CollectionUtils.isEmpty(results)) { allRecords.addAll(results); diff --git a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImplTest.java b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImplTest.java index 034393238cfa364cba74eee89c49392c4c13b85a..6a1d09dc22c5a3e9b91052f7f7bf4453f42e4b3d 100644 --- a/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImplTest.java +++ b/indexer-core/src/test/java/org/opengroup/osdu/indexer/service/AugmenterConfigurationServiceImplTest.java @@ -215,7 +215,7 @@ public class AugmenterConfigurationServiceImplTest { List<SearchRecord> childrenRecords = gson.fromJson(jsonText, type); SearchResponse response = new SearchResponse(); response.setResults(childrenRecords); - when(this.searchService.queryWithCursor(any())).thenReturn(response); + when(this.searchService.query(any())).thenReturn(response); Map<String, Object> extendedProperties = this.sut.getExtendedProperties("anyId", originalDataMap, propertyConfigurations); Map<String, Object> expectedExtendedProperties = getDataMap("wellbore_extended_data.json"); @@ -763,6 +763,7 @@ public class AugmenterConfigurationServiceImplTest { Assert.assertEquals(1, infoList.size()); Assert.assertEquals(childKind, infoList.get(0).getKind()); Assert.assertEquals(childId, infoList.get(0).getId()); + verify(this.searchService,times(0)).queryWithCursor(any()); } @Test @@ -788,6 +789,7 @@ public class AugmenterConfigurationServiceImplTest { // Verify verify(this.indexerQueueTaskBuilder,times(0)).createWorkerTask(any(), any(), any()); + verify(this.searchService,times(0)).queryWithCursor(any()); } @Test @@ -806,6 +808,7 @@ public class AugmenterConfigurationServiceImplTest { // Verify verify(this.indexerQueueTaskBuilder,times(0)).createWorkerTask(any(), any(), any()); + verify(this.searchService,times(0)).queryWithCursor(any()); } private void updateAssociatedRecords_updateAssociatedChildrenRecords_baseSetup() throws URISyntaxException { @@ -831,6 +834,133 @@ public class AugmenterConfigurationServiceImplTest { // No result } } + else if(searchRequest.getKind().toString().contains("osdu:wks:master-data--Well:1.")) { + // Return of searchUniqueParentIds(...) + SearchRecord searchRecord = new SearchRecord(); + Map<String, Object> childDataMap = new HashMap<>(); + childDataMap.put("AssociatedIdentities", Arrays.asList(parentId)); + searchRecord.setKind(childKind); + searchRecord.setId(childId); + searchRecord.setData(childDataMap); + searchResponse.setResults(Arrays.asList(searchRecord)); + } + else { + // This branch is a setup for test case: + // updateAssociatedRecords_updateAssociatedChildrenRecords_circularIndexing + throw new Exception("Unexpected search"); + } + return searchResponse; + }); + + // setup headers + DpsHeaders dpsHeaders = new DpsHeaders(); + dpsHeaders.put(DpsHeaders.AUTHORIZATION, "testAuth"); + dpsHeaders.put(DpsHeaders.DATA_PARTITION_ID, "opendes"); + dpsHeaders.put(DpsHeaders.CORRELATION_ID, "123"); + when(this.requestInfo.getHeadersWithDwdAuthZ()).thenReturn(dpsHeaders); + } + + @Test + public void updateAssociatedRecords_updateAssociatedChildrenRecords_with_cursor_query_for_updated_parentRecord_with_extendedPropertyChanged() throws URISyntaxException { + updateAssociatedRecords_updateAssociatedChildrenRecords_with_query_by_cursor_baseSetup(); + + RecordChangeInfo recordChangeInfo = new RecordChangeInfo(); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setKind(parentKind); + recordInfo.setId(parentId); + recordInfo.setOp(OperationType.update.getValue()); + recordChangeInfo.setRecordInfo(recordInfo); + recordChangeInfo.setUpdatedProperties(Arrays.asList("GeoPoliticalEntityName")); + when(this.recordChangeInfoCache.get(any())).thenReturn(recordChangeInfo); + + // Test + RecordChangedMessages recordChangedMessages = new RecordChangedMessages(); + recordChangedMessages.setAttributes(new HashMap<>()); + Map<String, List<String>> upsertKindIds = new HashMap<>(); + Map<String, List<String>> deleteKindIds = new HashMap<>(); + upsertKindIds.put(parentKind, Arrays.asList(parentId)); + this.sut.updateAssociatedRecords(recordChangedMessages, upsertKindIds, deleteKindIds); + + // Verify + ArgumentCaptor<String> payloadArgumentCaptor = ArgumentCaptor.forClass(String.class); + verify(this.indexerQueueTaskBuilder,times(1)).createWorkerTask(payloadArgumentCaptor.capture(), any(), any()); + + RecordChangedMessages newMessages = gson.fromJson(payloadArgumentCaptor.getValue(), RecordChangedMessages.class); + Type type = new TypeToken<List<RecordInfo>>() {}.getType(); + List<RecordInfo> infoList = gson.fromJson(newMessages.getData(), type); + Assert.assertEquals(parentKind, newMessages.getAttributes().get(Constants.ANCESTRY_KINDS)); + Assert.assertEquals(1, infoList.size()); + Assert.assertEquals(childKind, infoList.get(0).getKind()); + Assert.assertEquals(childId, infoList.get(0).getId()); + + verify(this.searchService,times(1)).queryWithCursor(any()); + } + + @Test + public void updateAssociatedRecords_updateAssociatedChildrenRecords_with_cursor_query_for_updated_parentRecord_without_extendedPropertyChanged() throws URISyntaxException { + updateAssociatedRecords_updateAssociatedChildrenRecords_with_query_by_cursor_baseSetup(); + + RecordChangeInfo recordChangeInfo = new RecordChangeInfo(); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setKind(parentKind); + recordInfo.setId(parentId); + recordInfo.setOp(OperationType.update.getValue()); + recordChangeInfo.setRecordInfo(recordInfo); + recordChangeInfo.setUpdatedProperties(Arrays.asList("abc")); + when(this.recordChangeInfoCache.get(any())).thenReturn(recordChangeInfo); + + // Test + RecordChangedMessages recordChangedMessages = new RecordChangedMessages(); + recordChangedMessages.setAttributes(new HashMap<>()); + Map<String, List<String>> upsertKindIds = new HashMap<>(); + Map<String, List<String>> deleteKindIds = new HashMap<>(); + upsertKindIds.put(parentKind, Arrays.asList(parentId)); + this.sut.updateAssociatedRecords(recordChangedMessages, upsertKindIds, deleteKindIds); + + // Verify + verify(this.indexerQueueTaskBuilder,times(0)).createWorkerTask(any(), any(), any()); + verify(this.searchService,times(1)).queryWithCursor(any()); + } + + private void updateAssociatedRecords_updateAssociatedChildrenRecords_with_query_by_cursor_baseSetup() throws URISyntaxException { + childKind = "osdu:wks:master-data--Well:1.0.0"; + childId = "anyChildId"; + parentKind = "osdu:wks:master-data--GeoPoliticalEntity:1.0.0"; + parentId = "anyParentId"; + + // Setup search response for searchService.query(...) + when(this.searchService.query(any())).thenAnswer(invocation -> { + SearchRequest searchRequest = invocation.getArgument(0); + SearchResponse searchResponse = new SearchResponse(); + if (searchRequest.getKind().toString().equals(augmenterConfigurationKind)) { + if (searchRequest.getQuery().contains("ChildToParent") || searchRequest.getQuery().contains("data.Code:")) { + // Return of getParentChildRelatedObjectsSpecs(...) or + // getPropertyConfigurations(...) + Map<String, Object> dataMap = getDataMap("well_configuration_record.json"); + SearchRecord searchRecord = new SearchRecord(); + searchRecord.setData(dataMap); + searchResponse.setResults(Arrays.asList(searchRecord)); + } else { + // Search ParentToChildren + // No result + } + } + else if(searchRequest.getKind().toString().contains("osdu:wks:master-data--Well:1.")) { + // Return of searchUniqueParentIds(...) + SearchRecord searchRecord = new SearchRecord(); + Map<String, Object> childDataMap = new HashMap<>(); + childDataMap.put("AssociatedIdentities", Arrays.asList(parentId)); + searchRecord.setKind(childKind); + searchRecord.setId(childId); + searchRecord.setData(childDataMap); + searchResponse.setResults(Arrays.asList(searchRecord)); + searchResponse.setTotalCount(10000); + } + else { + // This branch is a setup for test case: + // updateAssociatedRecords_updateAssociatedChildrenRecords_circularIndexing + throw new Exception("Unexpected search"); + } return searchResponse; });