Skip to content
Snippets Groups Projects
Commit 93e75c5d authored by Riabokon Stanislav(EPAM)[GCP]'s avatar Riabokon Stanislav(EPAM)[GCP]
Browse files

Merge branch 'gc-increase-records-batch' into 'master'

tweak params to improve reindex performance

See merge request !527
parents 986f857c 1c64c119
No related branches found
No related tags found
1 merge request!527tweak params to improve reindex performance
Pipeline #181350 failed
/*
* Copyright 2020-2022 Google LLC
* Copyright 2020-2022 EPAM Systems, Inc
* Copyright 2020-2023 Google LLC
* Copyright 2020-2023 EPAM Systems, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -17,19 +17,18 @@
package org.opengroup.osdu.indexer.provider.gcp.common.publish;
import static org.opengroup.osdu.core.common.Constants.REINDEX_RELATIVE_URL;
import static org.opengroup.osdu.core.common.Constants.WORKER_RELATIVE_URL;
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.oqm.model.OqmMessage;
......@@ -47,9 +46,9 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
private final Gson gson = new Gson();
private final OqmDriver driver;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final TenantInfo tenantInfo;
private final OqmDriver driver;
private final IndexerMessagingConfigProperties properties;
......@@ -64,23 +63,54 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
}
public void createWorkerTask(String payload, DpsHeaders headers) {
publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, 0l, headers);
publishRecordsChangedTask(payload, headers);
}
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
publishRecordsChangedTask(WORKER_RELATIVE_URL, payload, countdownMillis, headers);
DpsHeaders headersCopy = DpsHeaders.createFromMap(headers.getHeaders());
scheduledExecutorService.schedule(
() -> {
try {
publishRecordsChangedTask(payload, headersCopy);
} catch (Exception e) {
// If error or exception not caught, executor will die out silently.
log.error("The exception was thrown during scheduled event publishing!", e);
throw e;
} catch (Throwable e) {
log.error("The Error was thrown during scheduled event publishing!", e);
throw e;
}
},
countdownMillis,
TimeUnit.MILLISECONDS
);
}
public void createReIndexTask(String payload, DpsHeaders headers) {
publishReindexTask(REINDEX_RELATIVE_URL, payload, 0l, headers);
publishReindexTask(payload, headers);
}
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
publishReindexTask(REINDEX_RELATIVE_URL, payload, countdownMillis, headers);
DpsHeaders headersCopy = DpsHeaders.createFromMap(headers.getHeaders());
scheduledExecutorService.schedule(
() -> {
try {
publishReindexTask(payload, headersCopy);
} catch (Exception e) {
// If error or exception not caught, executor will die out silently.
log.error("The exception was thrown during scheduled event publishing!", e);
throw e;
} catch (Throwable e) {
log.error("The Error was thrown during scheduled event publishing!", e);
throw e;
}
},
countdownMillis,
TimeUnit.MILLISECONDS
);
}
private void publishReindexTask(String url, String payload, Long countdownMillis,
DpsHeaders headers) {
private void publishReindexTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder().partitionId(headers.getPartitionId())
.build();
Map<String, String> attributes = getAttributesFromHeaders(headers);
......@@ -89,8 +119,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
driver.publish(oqmMessage, reprocessOqmTopic, oqmDestination);
}
private void publishRecordsChangedTask(String url, String payload, Long countdownMillis,
DpsHeaders headers) {
private void publishRecordsChangedTask(String payload, DpsHeaders headers) {
OqmDestination oqmDestination = OqmDestination.builder()
.partitionId(headers.getPartitionId())
.build();
......@@ -112,7 +141,7 @@ public class ReprocessingTaskPublisher extends IndexerQueueTaskBuilder {
private Map<String, String> getAttributesFromHeaders(DpsHeaders headers) {
Map<String, String> attributes = new HashMap<>();
attributes.put(DpsHeaders.USER_EMAIL, headers.getUserEmail());
attributes.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName());
attributes.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionId());
attributes.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing();
attributes.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
......
......@@ -24,7 +24,7 @@ cron-empty-index-cleanup-threshold-days=7
DEFAULT_DATA_COUNTRY=US
gae-service=indexer
security.https.certificate.trust=false
storage-records-by-kind-batch-size=20
storage-records-by-kind-batch-size=1000
storage-records-batch-size=20
REDIS_SEARCH_PORT=6379
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment