diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/cache/RecordChangeInfoCache.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/cache/RecordChangeInfoCache.java new file mode 100644 index 0000000000000000000000000000000000000000..67aa01f36c1c9d84c1b47ea44819ca074efaee08 --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/cache/RecordChangeInfoCache.java @@ -0,0 +1,27 @@ +/* + * Copyright © Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.cache; + +import org.opengroup.osdu.core.common.cache.VmCache; +import org.opengroup.osdu.indexer.model.RecordChangeInfo; +import org.springframework.stereotype.Component; + +@Component +public class RecordChangeInfoCache extends VmCache<String, RecordChangeInfo> { + public RecordChangeInfoCache() { + super(300, 1000); + } +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/RecordChangeInfo.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/RecordChangeInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..94dc1c62ed435796310e3545dabb91282d20cfba --- /dev/null +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/RecordChangeInfo.java @@ -0,0 +1,27 @@ +/* + * Copyright © Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.opengroup.osdu.indexer.model; + +import lombok.Data; +import org.opengroup.osdu.core.common.model.indexer.RecordInfo; + +import java.util.List; + +@Data +public class RecordChangeInfo { + private List<String> updatedProperties; + private RecordInfo recordInfo; +} diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaIdentity.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaIdentity.java index 2d616989b1a45e76f6625ca90da249afdd41c48d..0ad77f4229ce06827993d2b4090bd7da3c54ecd6 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaIdentity.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaIdentity.java @@ -1,3 +1,18 @@ +/* + * Copyright © Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.indexer.model; import com.fasterxml.jackson.annotation.JsonInclude; diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfo.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfo.java index 5fd31f7e8f6b111c1824047b7726558b7413a3e0..3cfa9ceb8cdb3e5378922b7303347134a662b6bf 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfo.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfo.java @@ -1,3 +1,18 @@ +/* + * Copyright © Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.indexer.model; import lombok.Data; diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfoResponse.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfoResponse.java index dd16652aab4e0dc6370bb4427b78067063c753cd..088d1a07b05d5e4a1c8d6883a87f93366dec8775 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfoResponse.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/SchemaInfoResponse.java @@ -1,3 +1,18 @@ +/* + * Copyright © Schlumberger + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.opengroup.osdu.indexer.model; import lombok.Data; diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/indexproperty/ParentChildRelatedObjectsSpec.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/indexproperty/ParentChildRelatedObjectsSpec.java index 67d28cd9901f237fc0385ae0c8f8d2e3f0bfc55a..837d27600b8e66078b81f4a01e05a62a4d26471f 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/indexproperty/ParentChildRelatedObjectsSpec.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/model/indexproperty/ParentChildRelatedObjectsSpec.java @@ -17,11 +17,19 @@ package org.opengroup.osdu.indexer.model.indexproperty; import lombok.Data; +import java.util.ArrayList; +import java.util.List; + @Data public class ParentChildRelatedObjectsSpec { private String parentKind; - private String parentObjectId; + private String parentObjectIdPath; private String childKind; + private List<String> childValuePaths; + + public ParentChildRelatedObjectsSpec() { + childValuePaths = new ArrayList<>(); + } @Override public boolean equals(Object another) { @@ -39,7 +47,7 @@ public class ParentChildRelatedObjectsSpec { stringBuilder.append("<>"); stringBuilder.append((childKind != null)? childKind : "__"); stringBuilder.append("<>"); - stringBuilder.append((parentObjectId != null)? parentObjectId : "__"); + stringBuilder.append((parentObjectIdPath != null)? parentObjectIdPath : "__"); return stringBuilder.toString().hashCode(); } } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index 9f6c737f43f89178a31ca8b41c388dc7de91d207..573b84e35973caf72fb3a7af5741a354ad5177f6 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -42,7 +42,6 @@ import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; -import org.opengroup.osdu.indexer.cache.RelatedObjectCache; import org.opengroup.osdu.indexer.logging.AuditLogger; import org.opengroup.osdu.indexer.model.indexproperty.PropertyConfigurations; import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; @@ -98,8 +97,6 @@ public class IndexerServiceImpl implements IndexerService { private JobStatus jobStatus; @Inject private PropertyConfigurationsService propertyConfigurationsService; - @Inject - private RelatedObjectCache relatedObjectCache; private DpsHeaders headers; @@ -148,9 +145,10 @@ public class IndexerServiceImpl implements IndexerService { retryAndEnqueueFailedRecords(recordInfos, retryRecordIds, message); } - Map<String, List<String>> processedKindIds = getProcessedKindIds(upsertRecordMap, deleteRecordMap, retryRecordIds); - if (!processedKindIds.isEmpty()) { - propertyConfigurationsService.updateAssociatedRecords(message, processedKindIds); + Map<String, List<String>> upsertKindIds = getUpsertKindIds(upsertRecordMap, retryRecordIds); + Map<String, List<String>> deleteKindIds = getDeleteRecordKindIds(deleteRecordMap, retryRecordIds); + if (!upsertKindIds.isEmpty() || !deleteKindIds.isEmpty()) { + propertyConfigurationsService.updateAssociatedRecords(message, upsertKindIds, deleteKindIds); } } catch (IOException e) { errorMessage = e.getMessage(); @@ -186,28 +184,32 @@ public class IndexerServiceImpl implements IndexerService { } } - private Map<String, List<String>> getProcessedKindIds(Map<String, Map<String, OperationType>> upsertRecordMap, - Map<String, List<String>> deleteRecordMap, List<String> retryRecordIds) { - Map<String, List<String>> processedKindIdsMap = new HashMap<>(); + private Map<String, List<String>> getUpsertKindIds(Map<String, Map<String, OperationType>> upsertRecordMap, List<String> retryRecordIds) { + Map<String, List<String>> upsertKindIds = new HashMap<>(); for (Map.Entry<String, Map<String, OperationType>> entry : upsertRecordMap.entrySet()) { String kind = entry.getKey(); - List<String> ids = processedKindIdsMap.containsKey(kind) ? processedKindIdsMap.get(kind) : new ArrayList<>(); + List<String> ids = upsertKindIds.containsKey(kind) ? upsertKindIds.get(kind) : new ArrayList<>(); List<String> processedIds = entry.getValue().keySet().stream().filter(id -> !retryRecordIds.contains(id)).collect(Collectors.toList()); ids.addAll(processedIds); if (!ids.isEmpty()) { - processedKindIdsMap.put(kind, ids); + upsertKindIds.put(kind, ids); } } + return upsertKindIds; + } + + private Map<String, List<String>> getDeleteRecordKindIds(Map<String, List<String>> deleteRecordMap, List<String> retryRecordIds) { + Map<String, List<String>> deletedRecordKindIdsMap = new HashMap<>(); for (Map.Entry<String, List<String>> entry : deleteRecordMap.entrySet()) { String kind = entry.getKey(); - List<String> ids = processedKindIdsMap.containsKey(kind) ? processedKindIdsMap.get(kind) : new ArrayList<>(); + List<String> ids = deletedRecordKindIdsMap.containsKey(kind) ? deletedRecordKindIdsMap.get(kind) : new ArrayList<>(); List<String> processedIds = entry.getValue().stream().filter(id -> !retryRecordIds.contains(id)).collect(Collectors.toList()); ids.addAll(processedIds); if (!ids.isEmpty()) { - processedKindIdsMap.put(kind, ids); + deletedRecordKindIdsMap.put(kind, ids); } } - return processedKindIdsMap; + return deletedRecordKindIdsMap; } private void processSchemaEvents(RestHighLevelClient restClient, @@ -334,7 +336,7 @@ public class IndexerServiceImpl implements IndexerService { dataMap = mergeDataFromPropertyConfiguration(storageRecord.getId(), dataMap, propertyConfigurations); } // We cache the dataMap in case the update of this object will trigger update of the related objects. - relatedObjectCache.put(storageRecord.getId(), dataMap); + propertyConfigurationsService.cacheDataRecord(storageRecord.getId(), storageRecord.getKind(), dataMap); document.setData(dataMap); } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsService.java index 1a1bb739cba3942e87cd35e90af879f3d31107ea..babf916c56c834e8baa7004eaaf38205236efcc9 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsService.java @@ -32,5 +32,7 @@ public interface PropertyConfigurationsService { String resolveConcreteKind(String kind); - void updateAssociatedRecords(RecordChangedMessages message, Map<String, List<String>> processedKindIdsMap); + void cacheDataRecord(String recordId, String kind, Map<String, Object> dataMap); + + void updateAssociatedRecords(RecordChangedMessages message, Map<String, List<String>> upsertKindIds, Map<String, List<String>> deleteKindIds); } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsServiceImpl.java index 6ba31a75d3b957e3212bf8a794937281eba11713..ca8210d41f14393f0f2571a2c50e4fe20f77abaf 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/PropertyConfigurationsServiceImpl.java @@ -27,10 +27,7 @@ import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.storage.Schema; import org.opengroup.osdu.core.common.model.storage.SchemaItem; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; -import org.opengroup.osdu.indexer.cache.KindCache; -import org.opengroup.osdu.indexer.cache.ParentChildRelatedObjectsSpecsCache; -import org.opengroup.osdu.indexer.cache.PropertyConfigurationsCache; -import org.opengroup.osdu.indexer.cache.RelatedObjectCache; +import org.opengroup.osdu.indexer.cache.*; import org.opengroup.osdu.indexer.config.IndexerConfigurationProperties; import org.opengroup.osdu.indexer.model.*; import org.opengroup.osdu.indexer.model.indexproperty.*; @@ -80,6 +77,8 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations @Inject private RelatedObjectCache relatedObjectCache; @Inject + private RecordChangeInfoCache recordChangeInfoCache; + @Inject private SearchService searchService; @Inject private SchemaService schemaService; @@ -244,13 +243,40 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations } @Override - public void updateAssociatedRecords(RecordChangedMessages message, Map<String, List<String>> processedKindIdsMap) { - if (processedKindIdsMap == null || processedKindIdsMap.isEmpty()) + public void cacheDataRecord(String recordId, String kind, Map<String, Object> dataMap) { + Map<String, Object> previousDataMap = this.getRelatedObjectData(kind, recordId); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setId(recordId); + recordInfo.setKind(kind); + RecordChangeInfo changedInfo = new RecordChangeInfo(); + changedInfo.setRecordInfo(recordInfo); + if (previousDataMap == null || previousDataMap.isEmpty()) { + recordInfo.setOp(OperationType.create.getValue()); + } else { + recordInfo.setOp(OperationType.update.getValue()); + List<String> changedProperties = PropertyUtil.getChangedProperties(previousDataMap, dataMap); + changedInfo.setUpdatedProperties(changedProperties); + } + recordChangeInfoCache.put(recordId, changedInfo); + relatedObjectCache.put(recordId, dataMap); + } + + @Override + public void updateAssociatedRecords(RecordChangedMessages message, Map<String, List<String>> upsertKindIds, Map<String, List<String>> deleteKindIds) { + if ((upsertKindIds == null || upsertKindIds.isEmpty()) && (deleteKindIds == null || deleteKindIds.isEmpty())) { return; + } + if (upsertKindIds == null) { + upsertKindIds = new HashMap<>(); + } + if (deleteKindIds == null) { + deleteKindIds = new HashMap<>(); + } Map<String, String> attributes = message.getAttributes(); final String ancestors = attributes.containsKey(Constants.ANCESTRY_KINDS) ? attributes.get(Constants.ANCESTRY_KINDS) : ""; - for (Map.Entry<String, List<String>> entry : processedKindIdsMap.entrySet()) { + Map<String, List<RecordChangeInfo>> recordChangeInfoMap = createRecordChangeInfoMap(upsertKindIds, deleteKindIds); + for (Map.Entry<String, List<RecordChangeInfo>> entry : recordChangeInfoMap.entrySet()) { String kind = entry.getKey(); String updatedAncestors = ancestors.isEmpty() ? kind : ancestors + ANCESTRY_KINDS_DELIMITER + kind; @@ -311,6 +337,53 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations return relatedObject; } + private Map<String, List<RecordChangeInfo>> createRecordChangeInfoMap(Map<String, List<String>> upsertKindIds, Map<String, List<String>> deleteKindIds) { + Map<String, List<RecordChangeInfo>> recordChangeInfoMap = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : upsertKindIds.entrySet()) { + String kind = entry.getKey(); + List<RecordChangeInfo> recordChangeInfoList; + if (recordChangeInfoMap.containsKey(kind)) { + recordChangeInfoList = recordChangeInfoMap.get(kind); + } else { + recordChangeInfoList = new ArrayList<>(); + recordChangeInfoMap.put(kind, recordChangeInfoList); + } + + for (String id : entry.getValue()) { + RecordChangeInfo changeInfo = recordChangeInfoCache.get(id); + if (changeInfo == null) { + changeInfo = new RecordChangeInfo(); + changeInfo.setRecordInfo(new RecordInfo()); + changeInfo.getRecordInfo().setKind(kind); + changeInfo.getRecordInfo().setId(id); + changeInfo.getRecordInfo().setOp(OperationType.create.getValue()); + } + recordChangeInfoList.add(changeInfo); + } + } + for (Map.Entry<String, List<String>> entry : deleteKindIds.entrySet()) { + String kind = entry.getKey(); + List<RecordChangeInfo> recordChangeInfoList; + if (recordChangeInfoMap.containsKey(kind)) { + recordChangeInfoList = recordChangeInfoMap.get(kind); + } else { + recordChangeInfoList = new ArrayList<>(); + recordChangeInfoMap.put(kind, recordChangeInfoList); + } + + for (String id : entry.getValue()) { + RecordChangeInfo changeInfo = new RecordChangeInfo(); + changeInfo.setRecordInfo(new RecordInfo()); + changeInfo.getRecordInfo().setKind(kind); + changeInfo.getRecordInfo().setId(id); + changeInfo.getRecordInfo().setOp(OperationType.delete.getValue()); + recordChangeInfoList.add(changeInfo); + } + } + + return recordChangeInfoMap; + } + private List<SchemaItem> getExtendedSchemaItems(Schema originalSchema, PropertyConfiguration configuration, PropertyPath propertyPath) { String relatedPropertyPath = PropertyUtil.removeDataPrefix(propertyPath.getValueExtraction().getValuePath()); if (relatedPropertyPath.contains(ARRAY_SYMBOL)) { // Nested @@ -501,7 +574,7 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations List<ParentChildRelatedObjectsSpec> specsList = parentChildRelatedObjectsSpecsCache.get(key); if (specsList == null) { - Set<ParentChildRelatedObjectsSpec> specs = new HashSet<>(); + Map<Integer, ParentChildRelatedObjectsSpec> specs = new HashMap<>(); List<PropertyConfigurations> configurationsList = searchParentKindConfigurations((kindWithMajor)); for (PropertyConfigurations configurations : configurationsList) { for (PropertyConfiguration configuration : configurations.getConfigurations()) { @@ -513,28 +586,34 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations if (propertyPath != null) { ParentChildRelatedObjectsSpec spec = new ParentChildRelatedObjectsSpec(); spec.setParentKind(configurations.getCode()); + spec.setParentObjectIdPath(propertyPath.getRelatedObjectsSpec().getRelatedObjectID()); spec.setChildKind(kindWithMajor); - spec.setParentObjectId(propertyPath.getRelatedObjectsSpec().getRelatedObjectID()); - specs.add(spec); + spec.getChildValuePaths().add(propertyPath.getValueExtraction().getValuePath()); + if (specs.containsKey(spec.hashCode())) { + specs.get(spec.hashCode()).getChildValuePaths().add(propertyPath.getValueExtraction().getValuePath()); + } else { + specs.put(spec.hashCode(), spec); + } } } } - specsList = new ArrayList<>(specs); + specsList = new ArrayList<>(specs.values()); parentChildRelatedObjectsSpecsCache.put(key, specsList); } return specsList; } - private void updateAssociatedParentRecords(String ancestors, String childKind, List<String> processedIds) { - if (processedIds == null || processedIds.isEmpty()) - return; - + private void updateAssociatedParentRecords(String ancestors, String childKind, List<RecordChangeInfo> childRecordChangeInfos) { List<ParentChildRelatedObjectsSpec> specList = getParentChildRelatedObjectsSpecs(childKind); Set ancestorSet = new HashSet<>(Arrays.asList(ancestors.split(ANCESTRY_KINDS_DELIMITER))); for (ParentChildRelatedObjectsSpec spec : specList) { - List<String> parentIds = searchUniqueParentIds(childKind, processedIds, spec.getParentObjectId()); + List childRecordIds = filterChildRecordIds(spec, childRecordChangeInfos); + if (childRecordIds.isEmpty()) + continue; + + List<String> parentIds = searchUniqueParentIds(childKind, childRecordIds, spec.getParentObjectIdPath()); if (parentIds.isEmpty()) continue; @@ -564,10 +643,64 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations } } - private void updateAssociatedChildrenRecords(String ancestors, List<String> processedIds) { - if (processedIds == null || processedIds.isEmpty()) - return; + private List<String> filterChildRecordIds(ParentChildRelatedObjectsSpec spec, List<RecordChangeInfo> childRecordChangeInfos) { + List<String> childRecordIds = new ArrayList<>(); + for (RecordChangeInfo recordChangeInfo : childRecordChangeInfos) { + if (recordChangeInfo.getRecordInfo().getOp().equals(OperationType.create.getValue()) || + recordChangeInfo.getRecordInfo().getOp().equals(OperationType.delete.getValue())) { + childRecordIds.add(recordChangeInfo.getRecordInfo().getId()); + } else if (recordChangeInfo.getRecordInfo().getOp().equals(OperationType.update.getValue())) { + List<String> updatedProperties = recordChangeInfo.getUpdatedProperties().stream().filter(updatedProperty -> { + for (String valuePath : spec.getChildValuePaths()) { + if (PropertyUtil.isPropertyPathMatched(valuePath, updatedProperty) || PropertyUtil.isPropertyPathMatched(updatedProperty, valuePath)) { + return true; + } + } + return false; + }).collect(Collectors.toList()); + if (!updatedProperties.isEmpty()) { + childRecordIds.add(recordChangeInfo.getRecordInfo().getId()); + } + } + } + return childRecordIds; + } + private boolean areExtendedPropertiesChanged(String childKind, List<RecordChangeInfo> parentRecordChangeInfos) { + if (parentRecordChangeInfos.stream().filter(info -> !info.getRecordInfo().getOp().equals(OperationType.update.getValue())).findFirst().orElse(null) != null) { + // If there is any OP of the parent record(s) that is not OperationType.update. It must be OperationType.delete in this case. Then the child record should be updated + return true; + } + + PropertyConfigurations propertyConfigurations = this.getPropertyConfiguration(childKind); + if(propertyConfigurations != null) { + for (PropertyConfiguration propertyConfiguration : propertyConfigurations.getConfigurations()) { + for (PropertyPath propertyPath : propertyConfiguration.getPaths().stream().filter( + p -> p.hasValidValueExtraction() && p.hasValidRelatedObjectsSpec()).collect(Collectors.toList())) { + String relatedObjectKind = propertyPath.getRelatedObjectsSpec().getRelatedObjectKind(); + String valuePath = PropertyUtil.removeDataPrefix(propertyPath.getValueExtraction().getValuePath()); + + // Find any parent record which has changed property that is extended by the child (kind) + RecordChangeInfo parentRecordChangeInfo = parentRecordChangeInfos.stream().filter(info -> { + if (PropertyUtil.areMajorKindsSame(info.getRecordInfo().getKind(), relatedObjectKind)) { + List<String> matchedProperties = info.getUpdatedProperties().stream().filter( + p -> PropertyUtil.isPropertyPathMatched(p, valuePath) || PropertyUtil.isPropertyPathMatched(valuePath, p)).collect(Collectors.toList()); + return !matchedProperties.isEmpty(); + } + return false; + }).findFirst().orElse(null); + if (parentRecordChangeInfo != null) { + return true; + } + } + } + } + + return false; + } + + private void updateAssociatedChildrenRecords(String ancestors, List<RecordChangeInfo> recordChangeInfos) { + List<String> processedIds = recordChangeInfos.stream().map(recordChangeInfo -> recordChangeInfo.getRecordInfo().getId()).collect(Collectors.toList()); String query = String.format("data.%s:(%s)", ASSOCIATED_IDENTITIES_PROPERTY, createIdsFilter(processedIds)); String kind = WILD_CARD_KIND; for (String ancestryKind : ancestors.split(ANCESTRY_KINDS_DELIMITER)) { @@ -581,18 +714,27 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations SearchRequest searchRequest = new SearchRequest(); searchRequest.setKind(kind); searchRequest.setQuery(query); - searchRequest.setReturnedFields(Arrays.asList("kind", "id")); + searchRequest.setReturnedFields(Arrays.asList("kind", "id", "data." + ASSOCIATED_IDENTITIES_PROPERTY)); List<RecordInfo> recordInfos = new ArrayList<>(); for (SearchRecord record : searchAllRecords(searchRequest)) { - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setKind(record.getKind()); - recordInfo.setId(record.getId()); - recordInfo.setOp(OperationType.update.getValue()); - recordInfos.add(recordInfo); + Map<String, Object> data = record.getData(); + if (!data.containsKey(ASSOCIATED_IDENTITIES_PROPERTY) || data.get(ASSOCIATED_IDENTITIES_PROPERTY) == null) + continue; - if (recordInfos.size() >= limit) { - createWorkerTask(ancestors, recordInfos); - recordInfos = new ArrayList<>(); + List<String> associatedParentIds = (List<String>) data.get(ASSOCIATED_IDENTITIES_PROPERTY); + List<RecordChangeInfo> associatedParentRecordChangeInfos = recordChangeInfos.stream().filter( + info -> associatedParentIds.contains(info.getRecordInfo().getId())).collect(Collectors.toList()); + if (areExtendedPropertiesChanged(record.getKind(), associatedParentRecordChangeInfos)) { + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setKind(record.getKind()); + recordInfo.setId(record.getId()); + recordInfo.setOp(OperationType.update.getValue()); + recordInfos.add(recordInfo); + + if (recordInfos.size() >= limit) { + createWorkerTask(ancestors, recordInfos); + recordInfos = new ArrayList<>(); + } } } if (!recordInfos.isEmpty()) { @@ -691,17 +833,17 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations return kindIds; } - private List<String> searchUniqueParentIds(String childKind, List<String> childRecordIds, String parentObjectId) { + private List<String> searchUniqueParentIds(String childKind, List<String> childRecordIds, String parentObjectIdPath) { Set<String> parentIds = new HashSet<>(); SearchRequest searchRequest = new SearchRequest(); searchRequest.setKind(childKind); String query = String.format("id: (%s)", createIdsFilter(childRecordIds)); - searchRequest.setReturnedFields(Arrays.asList(parentObjectId)); + searchRequest.setReturnedFields(Arrays.asList(parentObjectIdPath)); searchRequest.setQuery(query); - parentObjectId = PropertyUtil.removeDataPrefix(parentObjectId); + parentObjectIdPath = PropertyUtil.removeDataPrefix(parentObjectIdPath); for (SearchRecord record : searchAllRecords(searchRequest)) { - if (record.getData().containsKey(parentObjectId)) { - Object id = record.getData().get(parentObjectId); + if (record.getData().containsKey(parentObjectIdPath)) { + Object id = record.getData().get(parentObjectIdPath); if (id != null && !parentIds.contains(id)) { parentIds.add(id.toString()); } @@ -726,15 +868,18 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations searchRequest.setLimit(MAX_SEARCH_LIMIT); List<SearchRecord> allRecords = new ArrayList<>(); boolean done = false; + int offset = 0; try { while (!done) { - SearchResponse searchResponse = searchService.queryWithCursor(searchRequest); + searchRequest.setOffset(offset); + SearchResponse searchResponse = searchService.query(searchRequest); List<SearchRecord> results = searchResponse.getResults(); - if (results != null) { + if (results != null && results.size() > 0) { allRecords.addAll(results); } - if (searchResponse.getCursor() != null && results.size() == MAX_SEARCH_LIMIT) { - searchRequest.setCursor(searchResponse.getCursor()); + + if (results != null && results.size() == MAX_SEARCH_LIMIT) { + offset += MAX_SEARCH_LIMIT; } else { done = true; } @@ -745,6 +890,30 @@ public class PropertyConfigurationsServiceImpl implements PropertyConfigurations return allRecords; } + //TODO: need to find out why the queryWithCursor is unstable +// private List<SearchRecord> searchAllRecords(SearchRequest searchRequest) { +// searchRequest.setLimit(MAX_SEARCH_LIMIT); +// List<SearchRecord> allRecords = new ArrayList<>(); +// boolean done = false; +// try { +// while (!done) { +// SearchResponse searchResponse = searchService.queryWithCursor(searchRequest); +// List<SearchRecord> results = searchResponse.getResults(); +// if (results != null) { +// allRecords.addAll(results); +// } +// if (searchResponse.getCursor() != null && results.size() == MAX_SEARCH_LIMIT) { +// searchRequest.setCursor(searchResponse.getCursor()); +// } else { +// done = true; +// } +// } +// } catch (URISyntaxException e) { +// jaxRsDpsLog.error("Failed to call search service.", e); +// } +// return allRecords; +// } + private SearchRecord searchFirstRecord(SearchRequest searchRequest) { searchRequest.setLimit(1); try { diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/PropertyUtil.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/PropertyUtil.java index d26ab76941e279986c054190604333d86ea374ec..ab0453e6ba6603f66e02998b0f1290def886ba2a 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/PropertyUtil.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/util/PropertyUtil.java @@ -18,6 +18,7 @@ package org.opengroup.osdu.indexer.util; import com.google.api.client.util.Strings; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; +import org.opengroup.osdu.indexer.model.Kind; import java.util.*; @@ -41,6 +42,25 @@ public class PropertyUtil { return !Strings.isNullOrEmpty(propertyPath) && (propertyPath.startsWith(parentPropertyPath + PROPERTY_DELIMITER) || propertyPath.equals(parentPropertyPath)); } + public static boolean areMajorKindsSame(String leftKind, String rightKind) { + try { + Kind left = new Kind(leftKind); + Kind right = new Kind(rightKind); + + String[] leftVersions = left.getVersion().split("\\."); + String[] rightVersions = right.getVersion().split("\\."); + return left.getAuthority().equals(right.getAuthority()) && + left.getSource().equals(right.getSource()) && + left.getType().equals(right.getType()) && + leftVersions.length >= 1 && + rightVersions.length >= 1 && + leftVersions[0].equals(rightVersions[0]); + } + catch(Exception ex) { + return false; + } + } + public static String removeDataPrefix(String path) { if (!Strings.isNullOrEmpty(path) && path.startsWith(DATA_PREFIX)) return path.substring(DATA_PREFIX.length());