From 1c64c1191d53d4674b3b6fcf81f5885d5c452942 Mon Sep 17 00:00:00 2001
From: "Rustam Lotsmanenko (EPAM)" <rustam_lotsmanenko@epam.com>
Date: Tue, 25 Apr 2023 14:01:54 +0000
Subject: [PATCH] tweak params to improve reindex performance

---
 .../publish/ReprocessingTaskPublisher.java    | 63 ++++++++++++++-----
 .../src/main/resources/application.properties |  2 +-
 2 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java
index 4caa625e5..5a1efac6f 100644
--- a/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java
+++ b/provider/indexer-gc/src/main/java/org/opengroup/osdu/indexer/provider/gcp/common/publish/ReprocessingTaskPublisher.java
@@ -1,6 +1,6 @@
 /*
- *  Copyright 2020-2022 Google LLC
- *  Copyright 2020-2022 EPAM Systems, Inc
+ *  Copyright 2020-2023 Google LLC
+ *  Copyright 2020-2023 EPAM Systems, Inc
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -17,19 +17,18 @@
 
 package org.opengroup.osdu.indexer.provider.gcp.common.publish;
 
-import static org.opengroup.osdu.core.common.Constants.REINDEX_RELATIVE_URL;
-import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL;
-
 import com.google.gson.Gson;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.PostConstruct;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.jetbrains.annotations.NotNull;
 import org.opengroup.osdu.core.common.model.http.DpsHeaders;
 import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
-import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
 import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
 import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
 import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
@@ -47,9 +46,9 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
 
   private final Gson gson = new Gson();
 
-  private final OqmDriver driver;
+  private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
 
-  private final TenantInfo tenantInfo;
+  private final OqmDriver driver;
 
   private final IndexerMessagingConfigProperties properties;
 
@@ -64,23 +63,54 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
   }
 
   public void createWorkerTask(String payload, DpsHeaders headers) {
-    publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, 0l, headers);
+    publishRecordsChangedTask(payload, headers);
   }
 
   public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
-    publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, countdownMillis, headers);
+    DpsHeaders headersCopy = DpsHeaders.createFromMap(headers.getHeaders());
+    scheduledExecutorService.schedule(
+        () -> {
+          try {
+            publishRecordsChangedTask(payload, headersCopy);
+          } catch (Exception e) {
+            // If error or exception not caught, executor will die out silently.
+            log.error("The exception was thrown during scheduled event publishing!", e);
+            throw e;
+          } catch (Throwable e) {
+            log.error("The Error was thrown during scheduled event publishing!", e);
+            throw e;
+          }
+        },
+        countdownMillis,
+        TimeUnit.MILLISECONDS
+    );
   }
 
   public void createReIndexTask(String payload, DpsHeaders headers) {
-    publishReindexTask(REINDEX_RELATIVE_URL, payload, 0l, headers);
+    publishReindexTask(payload, headers);
   }
 
   public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
-    publishReindexTask(REINDEX_RELATIVE_URL, payload, countdownMillis, headers);
+    DpsHeaders headersCopy = DpsHeaders.createFromMap(headers.getHeaders());
+    scheduledExecutorService.schedule(
+        () -> {
+          try {
+            publishReindexTask(payload, headersCopy);
+          } catch (Exception e) {
+            // If error or exception not caught, executor will die out silently.
+            log.error("The exception was thrown during scheduled event publishing!", e);
+            throw e;
+          } catch (Throwable e) {
+            log.error("The Error was thrown during scheduled event publishing!", e);
+            throw e;
+          }
+        },
+        countdownMillis,
+        TimeUnit.MILLISECONDS
+    );
   }
 
-  private void publishReindexTask(String url, String payload, Long countdownMillis,
-      DpsHeaders headers) {
+  private void publishReindexTask(String payload, DpsHeaders headers) {
     OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
         .build();
     Map<String, String> attributes = getAttributesFromHeaders(headers);
@@ -89,8 +119,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
     driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
   }
 
-  private void publishRecordsChangedTask(String url, String payload, Long countdownMillis,
-      DpsHeaders headers) {
+  private void publishRecordsChangedTask(String payload, DpsHeaders headers) {
     OqmDestination oqmDestination = OqmDestination.builder()
         .partitionId(headers.getPartitionId())
         .build();
@@ -112,7 +141,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
   private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) {
     Map<String, String> attributes = new HashMap<>();
     attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail());
-    attributes.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName());
+    attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionId());
     attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
     headers.addCorrelationIdIfMissing();
     attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
diff --git a/provider/indexer-gc/src/main/resources/application.properties b/provider/indexer-gc/src/main/resources/application.properties
index a5e5077aa..c9933e687 100644
--- a/provider/indexer-gc/src/main/resources/application.properties
+++ b/provider/indexer-gc/src/main/resources/application.properties
@@ -24,7 +24,7 @@ cron-empty-index-cleanup-threshold-days=7
 DEFAULT_DATA_COUNTRY=US
 gae-service=indexer
 security.https.certificate.trust=false
-storage-records-by-kind-batch-size=20
+storage-records-by-kind-batch-size=1000
 storage-records-batch-size=20
 
 REDIS_SEARCH_PORT=6379
-- 
GitLab