Skip to content
Snippets Groups Projects
Commit 30b1475b authored by Derek Hudson's avatar Derek Hudson
Browse files

Aws improve s3 performance

parent ad566eef
No related branches found
No related tags found
1 merge request!896Aws improve s3 performance
Showing
with 116 additions and 18 deletions
......@@ -627,6 +627,7 @@ The following software have components provided under the terms of this license:
- Azure Java Client Runtime for ARM (from https://github.com/Azure/autorest-clientruntime-for-java)
- Azure Java Client Runtime for AutoRest (from https://github.com/Azure/autorest-clientruntime-for-java)
- Checker Qual (from https://checkerframework.org)
- Extensions on Apache Proton-J library (from https://github.com/Azure/qpid-proton-j-extensions)
- JGraphT - Core (from https://repo1.maven.org/maven2/org/jgrapht/jgrapht-core)
- JUL to SLF4J bridge (from http://www.slf4j.org)
- Jackson-core (from http://wiki.fasterxml.com/JacksonHome, https://github.com/FasterXML/jackson-core)
......@@ -682,7 +683,6 @@ The following software have components provided under the terms of this license:
- mockito-junit-jupiter (from https://github.com/mockito/mockito)
- msal4j (from https://github.com/AzureAD/microsoft-authentication-library-for-java)
- msal4j-persistence-extension (from https://github.com/AzureAD/microsoft-authentication-extensions-for-java, https://github.com/AzureAD/microsoft-authentication-library-for-java)
- qpid-proton-j-extensions (from https://github.com/Azure/qpid-proton-j-extensions)
========================================================================
MPL-1.1
......
......@@ -25,6 +25,7 @@ import org.opengroup.osdu.core.common.model.storage.RecordMetadata;
import org.opengroup.osdu.core.common.model.storage.RecordProcessing;
import org.opengroup.osdu.core.common.model.storage.TransferInfo;
import org.opengroup.osdu.storage.provider.aws.security.UserAccessService;
import org.opengroup.osdu.storage.provider.aws.util.WorkerThreadPool;
import org.opengroup.osdu.storage.provider.interfaces.ICloudStorage;
import org.opengroup.osdu.core.common.util.Crc32c;
import org.opengroup.osdu.storage.provider.aws.util.s3.RecordProcessor;
......@@ -35,7 +36,6 @@ import org.apache.http.HttpStatus;
import org.opengroup.osdu.storage.provider.interfaces.IRecordsMetadataRepository;
import org.opengroup.osdu.storage.util.CollaborationUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;
import static org.apache.commons.codec.binary.Base64.encodeBase64;
......@@ -48,9 +48,6 @@ import java.util.stream.Collectors;
@Repository
public class CloudStorageImpl implements ICloudStorage {
@Value("${aws.s3.max-record-threads}")
private int maxNumOfRecordThreads;
@Inject
private S3RecordClient s3RecordClient;
......@@ -66,6 +63,8 @@ public class CloudStorageImpl implements ICloudStorage {
@Inject
private IRecordsMetadataRepository<String> recordsMetadataRepository;
@Inject
private WorkerThreadPool threadPool;
@Inject
private DpsHeaders headers;
......@@ -87,7 +86,7 @@ public class CloudStorageImpl implements ICloudStorage {
recordProcessing.getRecordData().setMeta(arrayMeta);
}
RecordProcessor recordProcessor = new RecordProcessor(recordProcessing, s3RecordClient, dataPartition);
CompletableFuture<RecordProcessor> future = CompletableFuture.supplyAsync(recordProcessor::call);
CompletableFuture<RecordProcessor> future = CompletableFuture.supplyAsync(recordProcessor::call, threadPool.getThreadPool());
futures.add(future);
}
......
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.storage.provider.aws.util;
import org.opengroup.osdu.core.common.logging.DefaultLogger;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Service
public class WorkerThreadPool {
private static final JaxRsDpsLog logger = new JaxRsDpsLog(new DefaultLogger(), new DpsHeaders());
private static final int DEFAULT_THREADS = 1000;
@Autowired
public WorkerThreadPool(@Value("${aws.worker-threads}") int numberOfThreads) {
if (numberOfThreads <= 0) {
logger.error(String.format("Illegal `aws.worker-threads` value: %d. Using default %d threads instead.", numberOfThreads, DEFAULT_THREADS));
numberOfThreads = DEFAULT_THREADS;
}
threadPool = Executors.newFixedThreadPool(numberOfThreads);
logger.info(String.format("Created the Worker Thread Pool with %d threads", numberOfThreads));
}
private final ExecutorService threadPool;
public ExecutorService getThreadPool() {
return threadPool;
}
}
......@@ -20,6 +20,7 @@ import org.opengroup.osdu.core.common.model.storage.RecordMetadata;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.storage.provider.aws.util.WorkerThreadPool;
import org.springframework.stereotype.Component;
import jakarta.inject.Inject;
......@@ -36,6 +37,9 @@ public class RecordsUtil {
private S3RecordClient s3RecordClient;
@Inject
private WorkerThreadPool threadPool;
@Inject
private DpsHeaders headers;
......@@ -54,7 +58,7 @@ public class RecordsUtil {
try {
for (Map.Entry<String, String> object : objects.entrySet()) {
GetRecordFromVersionTask task = new GetRecordFromVersionTask(s3RecordClient, object.getKey(), object.getValue(), dataPartition);
CompletableFuture<GetRecordFromVersionTask> future = CompletableFuture.supplyAsync(task::call);
CompletableFuture<GetRecordFromVersionTask> future = CompletableFuture.supplyAsync(task::call, threadPool.getThreadPool());
futures.add(future);
}
......@@ -101,7 +105,7 @@ public class RecordsUtil {
try {
for (RecordMetadata recordMetadata: recordMetadatas) {
GetRecordTask task = new GetRecordTask(s3RecordClient, map, recordMetadata, dataPartition);
CompletableFuture<GetRecordTask> future = CompletableFuture.supplyAsync(task::call);
CompletableFuture<GetRecordTask> future = CompletableFuture.supplyAsync(task::call, threadPool.getThreadPool());
futures.add(future);
}
......
......@@ -42,6 +42,8 @@ aws.osduVersion=${OSDU_VERSION}
aws.s3.max-record-threads=2000
aws.worker-threads=${WORKER_THREADS:1000}
#Tenant Specific S3 Bucket Configuration
aws.s3.recordsBucket.ssm.relativePath=${RECORDS_BUCKET_SSM_RELATIVE_PATH:services/core/storage/s3DataBucket}
......
......@@ -25,6 +25,7 @@ import org.opengroup.osdu.core.common.model.storage.RecordMetadata;
import org.opengroup.osdu.core.common.model.storage.RecordProcessing;
import org.opengroup.osdu.core.common.model.storage.TransferInfo;
import org.opengroup.osdu.storage.provider.aws.security.UserAccessService;
import org.opengroup.osdu.storage.provider.aws.util.WorkerThreadPool;
import org.opengroup.osdu.storage.provider.aws.util.s3.RecordsUtil;
import org.opengroup.osdu.storage.provider.aws.util.s3.S3RecordClient;
import org.opengroup.osdu.core.common.util.Crc32c;
......@@ -34,6 +35,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.test.util.ReflectionTestUtils;
import java.nio.charset.StandardCharsets;
import java.util.*;
......@@ -67,9 +69,6 @@ class CloudStorageImplTest {
@Mock
private RecordsUtil recordsUtil;
@Mock
private ExecutorService threadPool;
@Mock
private UserAccessService userAccessService;
......@@ -97,6 +96,7 @@ class CloudStorageImplTest {
@Mock
private TransferInfo transfer;
private final WorkerThreadPool threadPool = new WorkerThreadPool(10);
private String dataPartition = "dummyPartitionName";
private String mockRecord = "{\"data\":{\"id\":\"test\"}, \"meta\":null, \"modifyUser\":null, \"modifyTime\":0}";
......@@ -116,7 +116,8 @@ class CloudStorageImplTest {
doNothing().when(record).setId(userId);
doNothing().when(record).addGcsPath(1);
records.add(record);
ReflectionTestUtils.setField(repo, "threadPool", threadPool);
when(headers.getPartitionIdWithFallbackToAccountId()).thenReturn(dataPartition);
}
......@@ -157,7 +158,7 @@ class CloudStorageImplTest {
bytes = checksumGenerator.getValueAsBytes();
String expectedHash = new String(encodeBase64(bytes));
when(recordsUtil.getRecordsValuesById(Mockito.eq(records)))
when(recordsUtil.getRecordsValuesById(records))
.thenReturn(mapRecords);
// act
......@@ -170,7 +171,7 @@ class CloudStorageImplTest {
@Test
void delete(){
// arrange
Mockito.doNothing().when(s3RecordClient).deleteRecord(Mockito.eq(path), Mockito.eq(dataPartition));
Mockito.doNothing().when(s3RecordClient).deleteRecord(path, dataPartition);
when(record.hasVersion()).thenReturn(true);
List<String> list = new ArrayList<String>();
......@@ -181,7 +182,7 @@ class CloudStorageImplTest {
repo.delete(record);
// assert
verify(s3RecordClient, Mockito.times(1)).deleteRecord(eq(path), eq(dataPartition));
verify(s3RecordClient, Mockito.times(1)).deleteRecord(path, dataPartition);
}
@Test
......
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.storage.provider.aws.util;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
class WorkerThreadPoolTest {
@Test
void should_reuse_executor_if_exists() {
WorkerThreadPool workerPool = new WorkerThreadPool(7);
ExecutorService service = workerPool.getThreadPool();
assertNotNull(service);
assertEquals(service, workerPool.getThreadPool());
}
@Test
void should_not_throw_exception_if_numberOfThreads_is_invalid() {
WorkerThreadPool workerPool = new WorkerThreadPool(-1);
assertNotNull(workerPool.getThreadPool());
}
}
......@@ -24,10 +24,13 @@ import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.RecordMetadata;
import org.opengroup.osdu.storage.provider.aws.util.WorkerThreadPool;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
......@@ -48,9 +51,6 @@ class RecordUtilsTest {
@Mock
private S3RecordClient s3RecordClient;
@Mock
private ExecutorService threadPool;
@Mock
private DpsHeaders headers;
......@@ -60,11 +60,13 @@ class RecordUtilsTest {
@Mock
private RecordMetadata recordMetadata;
private final WorkerThreadPool threadPool = new WorkerThreadPool(10);
private String dataPartition = "dummyPartitionName";
@BeforeEach
void setuUp() {
openMocks(this);
ReflectionTestUtils.setField(recordsUtil, "threadPool", threadPool);
when(headers.getPartitionIdWithFallbackToAccountId()).thenReturn(dataPartition);
}
......
spring.data.mongodb.port=27019
aws.s3.max-record-threads=2000
aws.worker-threads=1000
#testing context configuration
spring.main.allow-bean-definition-overriding=true
repository.implementation=mongodb
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment