Skip to content
Snippets Groups Projects
Commit 1c64c119 authored by Rustam Lotsmanenko (EPAM)'s avatar Rustam Lotsmanenko (EPAM) Committed by Riabokon Stanislav(EPAM)[GCP]
Browse files

tweak params to improve reindex performance

parent 986f857c
No related branches found
No related tags found
1 merge request!527tweak params to improve reindex performance
/* /*
* Copyright 2020-2022 Google LLC * Copyright 2020-2023 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc * Copyright 2020-2023 EPAM Systems, Inc
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -17,19 +17,18 @@ ...@@ -17,19 +17,18 @@
package org.opengroup.osdu.indexer.provider.gcp.common.publish; package org.opengroup.osdu.indexer.provider.gcp.common.publish;
import static org.opengroup.osdu.core.common.Constants.REINDEX_RELATIVE_URL;
import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL;
import com.google.gson.Gson; import com.google.gson.Gson;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver; import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination; import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage; import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
...@@ -47,9 +46,9 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -47,9 +46,9 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
private final Gson gson = new Gson(); private final Gson gson = new Gson();
private final OqmDriver driver; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final TenantInfo tenantInfo; private final OqmDriver driver;
private final IndexerMessagingConfigProperties properties; private final IndexerMessagingConfigProperties properties;
...@@ -64,23 +63,54 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -64,23 +63,54 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
} }
public void createWorkerTask(String payload, DpsHeaders headers) { public void createWorkerTask(String payload, DpsHeaders headers) {
publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, 0l, headers); publishRecordsChangedTask(payload, headers);
} }
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) { public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, countdownMillis, headers); DpsHeaders headersCopy = DpsHeaders.createFromMap(headers.getHeaders());
scheduledExecutorService.schedule(
() -> {
try {
publishRecordsChangedTask(payload, headersCopy);
} catch (Exception e) {
// If error or exception not caught, executor will die out silently.
log.error("The exception was thrown during scheduled event publishing!", e);
throw e;
} catch (Throwable e) {
log.error("The Error was thrown during scheduled event publishing!", e);
throw e;
}
},
countdownMillis,
TimeUnit.MILLISECONDS
);
} }
public void createReIndexTask(String payload, DpsHeaders headers) { public void createReIndexTask(String payload, DpsHeaders headers) {
publishReindexTask(REINDEX_RELATIVE_URL, payload, 0l, headers); publishReindexTask(payload, headers);
} }
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) { public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
publishReindexTask(REINDEX_RELATIVE_URL, payload, countdownMillis, headers); DpsHeaders headersCopy = DpsHeaders.createFromMap(headers.getHeaders());
scheduledExecutorService.schedule(
() -> {
try {
publishReindexTask(payload, headersCopy);
} catch (Exception e) {
// If error or exception not caught, executor will die out silently.
log.error("The exception was thrown during scheduled event publishing!", e);
throw e;
} catch (Throwable e) {
log.error("The Error was thrown during scheduled event publishing!", e);
throw e;
}
},
countdownMillis,
TimeUnit.MILLISECONDS
);
} }
private void publishReindexTask(String url, String payload, Long countdownMillis, private void publishReindexTask(String payload, DpsHeaders headers) {
DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId()) OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
.build(); .build();
Map<String, String> attributes = getAttributesFromHeaders(headers); Map<String, String> attributes = getAttributesFromHeaders(headers);
...@@ -89,8 +119,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -89,8 +119,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination); driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
} }
private void publishRecordsChangedTask(String url, String payload, Long countdownMillis, private void publishRecordsChangedTask(String payload, DpsHeaders headers) {
DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder() OqmDestination oqmDestination = OqmDestination.builder()
.partitionId(headers.getPartitionId()) .partitionId(headers.getPartitionId())
.build(); .build();
...@@ -112,7 +141,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder { ...@@ -112,7 +141,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) { private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) {
Map<String, String> attributes = new HashMap<>(); Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail()); attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail());
attributes.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName()); attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId()); attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing(); headers.addCorrelationIdIfMissing();
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
......
...@@ -24,7 +24,7 @@ cron-empty-index-cleanup-threshold-days=7 ...@@ -24,7 +24,7 @@ cron-empty-index-cleanup-threshold-days=7
DEFAULT_DATA_COUNTRY=US DEFAULT_DATA_COUNTRY=US
gae-service=indexer gae-service=indexer
security.https.certificate.trust=false security.https.certificate.trust=false
storage-records-by-kind-batch-size=20 storage-records-by-kind-batch-size=1000
storage-records-batch-size=20 storage-records-batch-size=20
REDIS_SEARCH_PORT=6379 REDIS_SEARCH_PORT=6379
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment