Commit 872a77ae authored by Alok Joshi's avatar Alok Joshi
Browse files

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/system/lib/cloud/azure/os-core-lib-azure into policy_feature_flag
parents 3f81c3f1 bcdb280c
......@@ -151,6 +151,7 @@ The following software have components provided under the terms of this license:
- Spring Core (from https://github.com/spring-projects/spring-framework)
- Spring Data Core (from )
- Spring Expression Language (SpEL) (from https://github.com/spring-projects/spring-framework)
- Spring TestContext Framework (from https://github.com/spring-projects/spring-framework)
- Spring Transaction (from https://github.com/spring-projects/spring-framework)
- Spring Web (from https://github.com/spring-projects/spring-framework)
- Spring Web MVC (from https://github.com/spring-projects/spring-framework)
......
......@@ -272,6 +272,18 @@
<artifactId>documentdb-bulkexecutor</artifactId>
<version>${documentdb-bulkexecutor.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.8.RELEASE</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
......
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/***
* Custom Executors class that returns a custom thread pool executor {@link CustomThreadPoolExecutor}.
*/
public final class CustomExecutors {
/***
* Private constructor -- this class should never be instantiated.
*/
private CustomExecutors() {
}
/***
* Returning new custom thread pool executor.
* @param nThreads corresponding to corePoolSize and maxPoolSize.
* @return instance of {@link CustomThreadPoolExecutor}
*/
public static ExecutorService newFixedThreadPool(final int nThreads) {
return new CustomThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.concurrency;
import org.springframework.web.context.request.RequestContextHolder;
import javax.validation.constraints.NotNull;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/***
* A custom thread pool executor that executes code in the new thread after copying the MDC and request context.
*/
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
/***
* Calling constructor of the super class.
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
*/
public CustomThreadPoolExecutor(
final int corePoolSize,
final int maximumPoolSize,
final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue
) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
/**
* Calling execute method of super class after wrapping the executable code with MDC and thread context.
* @param command
*/
@Override
public void execute(final @NotNull Runnable command) {
super.execute(CustomThreadPoolExecutorUtil.wrapWithContext(command, RequestContextHolder.currentRequestAttributes()));
}
}
package org.opengroup.osdu.azure.concurrency;
import org.jetbrains.annotations.NotNull;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.slf4j.MDC;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.Map;
/***
* Utils class for CustomThreadPoolExecutor {@link CustomThreadPoolExecutor}.
*/
public final class CustomThreadPoolExecutorUtil {
private static final String LOGGER_NAME = CustomThreadPoolExecutorUtil.class.getName();
/***
* Constructor marked as private so that this class is never instantiated.
*/
private CustomThreadPoolExecutorUtil() {
}
/***
* Copy the MDC and request context to the Runnable object being returned.
* @param task instance of Runnable class.
* @param context the RequestAttributes to be copied to new thread.
* @return Runnable instance wrapped with request context and MDC.
*/
public static Runnable wrapWithContext(final @NotNull Runnable task, final RequestAttributes context) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
//save the current MDC context
setMDCContext(contextMap);
//save the current request attributes
setRequestContext(context);
try {
task.run();
} finally {
// once the task is complete, clear MDC
MDC.clear();
RequestContextHolder.resetRequestAttributes();
}
};
}
/***
* Set the MDC.
* @param contextMap the MDC map to be copied.
*/
private static void setMDCContext(final Map<String, String> contextMap) {
MDC.clear();
if (contextMap != null) {
MDC.setContextMap(contextMap);
} else {
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn("Cannot set MDC as it is null");
}
}
/***
* Set the Request Attributes.
* @param context the Attributes to be copied.
*/
private static void setRequestContext(final RequestAttributes context) {
RequestContextHolder.resetRequestAttributes();
if (context != null) {
RequestContextHolder.setRequestAttributes(context);
} else {
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn("Cannot set RequestAttributes as they are null");
}
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.di;
import com.microsoft.azure.documentdb.internal.Utils;
import lombok.Getter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -18,8 +17,7 @@ public class CosmosBulkExecutorConfiguration {
*/
@Bean
public int documentClientMaxPoolSize() {
String prop = System.getProperty("DOCUMENT_CLIENT_MAX_POOL_SIZE");
return prop == null ? Utils.getConcurrencyFactor() * 100 : Integer.parseInt(prop);
return Integer.parseInt(System.getenv().getOrDefault("DOCUMENT_CLIENT_MAX_POOL_SIZE", "100"));
}
/**
......@@ -27,6 +25,6 @@ public class CosmosBulkExecutorConfiguration {
*/
@Bean
public Integer bulkExecutorMaxRUs() {
return Integer.valueOf(System.getProperty("BULK_EXECUTOR_MAX_RUS", "4000"));
return Integer.parseInt(System.getenv().getOrDefault("BULK_EXECUTOR_MAX_RUS", "4000"));
}
}
package org.opengroup.osdu.azure.concurrency;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomExecutorsTest {
private long keepAliveTime = 0L;
private int nThreads = 1;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@Test
public void newFixedThreadPool_ShouldReturnExecutorServiceInstanceWithCorrectFields() {
ExecutorService executorService = CustomExecutors.newFixedThreadPool(nThreads);
Assertions.assertEquals(nThreads,((ThreadPoolExecutor)executorService).getCorePoolSize());
Assertions.assertEquals(nThreads,((ThreadPoolExecutor)executorService).getMaximumPoolSize());
Assertions.assertEquals(keepAliveTime,((ThreadPoolExecutor)executorService).getKeepAliveTime(timeUnit));
}
}
package org.opengroup.osdu.azure.concurrency;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
import org.slf4j.MDC;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.fail;
public class CustomThreadPoolExecutorUtilTest {
private String testMDCKey = "testKey";
private String testMDCValue = "testValue";
private MockHttpServletRequest mockRequest;
private RequestAttributes expectedAttributes;
@BeforeEach
public void setup() {
MockitoAnnotations.initMocks(this);
mockRequest= new MockHttpServletRequest();
expectedAttributes = new ServletRequestAttributes(mockRequest);
}
@Test
public void wrapWithContext_ShouldReturn_RunnableObjectWithCorrectContext() {
// 1) clear the current context
RequestContextHolder.resetRequestAttributes();
MDC.clear();
// 2) set test context
RequestContextHolder.setRequestAttributes(expectedAttributes);
Map<String,String> testMap = new HashMap<String,String>();
testMap.put(testMDCKey,testMDCValue);
MDC.setContextMap(testMap);
// 3) define Runnable object, this is the one which needs to be wrapped with context
Runnable testTask = () -> {
// 5) verify that the correct context is present in the task being executed
String mdcValue = MDC.getCopyOfContextMap().get(testMDCKey);
RequestAttributes actualAttributes = RequestContextHolder.getRequestAttributes();
Assertions.assertEquals(testMDCValue,mdcValue);
Assertions.assertEquals(expectedAttributes,actualAttributes);
};
// 4) Call executor service to execute the testTask
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future result = executorService.submit(CustomThreadPoolExecutorUtil.wrapWithContext(testTask,expectedAttributes));
// 6) Fail the test in case of exception thrown in testTask explicitly because
// if the Assertions fail within 'testTask', the failure is logged but the test does not fail.
try {
result.get();
}
catch (Exception e) {
fail(e);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment