Skip to content
Snippets Groups Projects

add validation and ignore out of order message when kind is updated

Merged Neelesh Thakur requested to merge ignore-outdated-messages into master
2 unresolved threads
9 files
+ 354
753
Compare changes
  • Side-by-side
  • Inline
Files
9
@@ -125,7 +125,7 @@ public class IndexerServiceImpl implements IndexerService {
// get upsert records
Map<String, Map<String, OperationType>> upsertRecordMap = RecordInfo.getUpsertRecordIds(recordInfos);
if (upsertRecordMap != null && !upsertRecordMap.isEmpty()) {
List<String> upsertFailureRecordIds = processUpsertRecords(upsertRecordMap);
List<String> upsertFailureRecordIds = processUpsertRecords(upsertRecordMap, recordInfos);
retryRecordIds.addAll(upsertFailureRecordIds);
}
@@ -191,7 +191,7 @@ public class IndexerServiceImpl implements IndexerService {
}
}
private List<String> processUpsertRecords(Map<String, Map<String, OperationType>> upsertRecordMap) throws Exception {
private List<String> processUpsertRecords(Map<String, Map<String, OperationType>> upsertRecordMap, List<RecordInfo> recordChangedInfos) throws Exception {
// get schema for kind
Map<String, IndexSchema> schemas = this.getSchema(upsertRecordMap);
@@ -203,7 +203,7 @@ public class IndexerServiceImpl implements IndexerService {
if (recordIds.isEmpty()) return new LinkedList<>();
// get records via storage api
Records storageRecords = this.storageService.getStorageRecords(recordIds);
Records storageRecords = this.storageService.getStorageRecords(recordIds, recordChangedInfos);
List<String> failedOrRetryRecordIds = new LinkedList<>(storageRecords.getMissingRetryRecords());
// map storage records to indexer payload
Loading