Commit bcdb280c authored by Kishore Battula's avatar Kishore Battula
Browse files

Merge branch 'users/Vibhuti/CustomConcurrency' into 'master'

Context Aware Concurrency

See merge request !69
parents 9ee8fcf2 55181d43
Pipeline #22404 passed with stages
in 11 minutes and 7 seconds
......@@ -151,6 +151,7 @@ The following software have components provided under the terms of this license:
- Spring Core (from https://github.com/spring-projects/spring-framework)
- Spring Data Core (from )
- Spring Expression Language (SpEL) (from https://github.com/spring-projects/spring-framework)
- Spring TestContext Framework (from https://github.com/spring-projects/spring-framework)
- Spring Transaction (from https://github.com/spring-projects/spring-framework)
- Spring Web (from https://github.com/spring-projects/spring-framework)
- Spring Web MVC (from https://github.com/spring-projects/spring-framework)
......
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.0.60</version>
<version>0.0.61</version>
<name>core-lib-azure</name>
<properties>
......@@ -272,6 +272,18 @@
<artifactId>documentdb-bulkexecutor</artifactId>
<version>${documentdb-bulkexecutor.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.2.8.RELEASE</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<repositories>
......
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.concurrency;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/***
* Custom Executors class that returns a custom thread pool executor {@link CustomThreadPoolExecutor}.
*/
public final class CustomExecutors {
/***
* Private constructor -- this class should never be instantiated.
*/
private CustomExecutors() {
}
/***
* Returning new custom thread pool executor.
* @param nThreads corresponding to corePoolSize and maxPoolSize.
* @return instance of {@link CustomThreadPoolExecutor}
*/
public static ExecutorService newFixedThreadPool(final int nThreads) {
return new CustomThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
}
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.concurrency;
import org.springframework.web.context.request.RequestContextHolder;
import javax.validation.constraints.NotNull;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/***
* A custom thread pool executor that executes code in the new thread after copying the MDC and request context.
*/
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
/***
* Calling constructor of the super class.
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
*/
public CustomThreadPoolExecutor(
final int corePoolSize,
final int maximumPoolSize,
final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue
) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
/**
* Calling execute method of super class after wrapping the executable code with MDC and thread context.
* @param command
*/
@Override
public void execute(final @NotNull Runnable command) {
super.execute(CustomThreadPoolExecutorUtil.wrapWithContext(command, RequestContextHolder.currentRequestAttributes()));
}
}
package org.opengroup.osdu.azure.concurrency;
import org.jetbrains.annotations.NotNull;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.slf4j.MDC;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import java.util.Map;
/***
* Utils class for CustomThreadPoolExecutor {@link CustomThreadPoolExecutor}.
*/
public final class CustomThreadPoolExecutorUtil {
private static final String LOGGER_NAME = CustomThreadPoolExecutorUtil.class.getName();
/***
* Constructor marked as private so that this class is never instantiated.
*/
private CustomThreadPoolExecutorUtil() {
}
/***
* Copy the MDC and request context to the Runnable object being returned.
* @param task instance of Runnable class.
* @param context the RequestAttributes to be copied to new thread.
* @return Runnable instance wrapped with request context and MDC.
*/
public static Runnable wrapWithContext(final @NotNull Runnable task, final RequestAttributes context) {
Map<String, String> contextMap = MDC.getCopyOfContextMap();
return () -> {
//save the current MDC context
setMDCContext(contextMap);
//save the current request attributes
setRequestContext(context);
try {
task.run();
} finally {
// once the task is complete, clear MDC
MDC.clear();
RequestContextHolder.resetRequestAttributes();
}
};
}
/***
* Set the MDC.
* @param contextMap the MDC map to be copied.
*/
private static void setMDCContext(final Map<String, String> contextMap) {
MDC.clear();
if (contextMap != null) {
MDC.setContextMap(contextMap);
} else {
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn("Cannot set MDC as it is null");
}
}
/***
* Set the Request Attributes.
* @param context the Attributes to be copied.
*/
private static void setRequestContext(final RequestAttributes context) {
RequestContextHolder.resetRequestAttributes();
if (context != null) {
RequestContextHolder.setRequestAttributes(context);
} else {
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn("Cannot set RequestAttributes as they are null");
}
}
}
\ No newline at end of file
package org.opengroup.osdu.azure.concurrency;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomExecutorsTest {
private long keepAliveTime = 0L;
private int nThreads = 1;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
@Test
public void newFixedThreadPool_ShouldReturnExecutorServiceInstanceWithCorrectFields() {
ExecutorService executorService = CustomExecutors.newFixedThreadPool(nThreads);
Assertions.assertEquals(nThreads,((ThreadPoolExecutor)executorService).getCorePoolSize());
Assertions.assertEquals(nThreads,((ThreadPoolExecutor)executorService).getMaximumPoolSize());
Assertions.assertEquals(keepAliveTime,((ThreadPoolExecutor)executorService).getKeepAliveTime(timeUnit));
}
}
package org.opengroup.osdu.azure.concurrency;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
import org.slf4j.MDC;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.jupiter.api.Assertions.fail;
public class CustomThreadPoolExecutorUtilTest {
private String testMDCKey = "testKey";
private String testMDCValue = "testValue";
private MockHttpServletRequest mockRequest;
private RequestAttributes expectedAttributes;
@BeforeEach
public void setup() {
MockitoAnnotations.initMocks(this);
mockRequest= new MockHttpServletRequest();
expectedAttributes = new ServletRequestAttributes(mockRequest);
}
@Test
public void wrapWithContext_ShouldReturn_RunnableObjectWithCorrectContext() {
// 1) clear the current context
RequestContextHolder.resetRequestAttributes();
MDC.clear();
// 2) set test context
RequestContextHolder.setRequestAttributes(expectedAttributes);
Map<String,String> testMap = new HashMap<String,String>();
testMap.put(testMDCKey,testMDCValue);
MDC.setContextMap(testMap);
// 3) define Runnable object, this is the one which needs to be wrapped with context
Runnable testTask = () -> {
// 5) verify that the correct context is present in the task being executed
String mdcValue = MDC.getCopyOfContextMap().get(testMDCKey);
RequestAttributes actualAttributes = RequestContextHolder.getRequestAttributes();
Assertions.assertEquals(testMDCValue,mdcValue);
Assertions.assertEquals(expectedAttributes,actualAttributes);
};
// 4) Call executor service to execute the testTask
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future result = executorService.submit(CustomThreadPoolExecutorUtil.wrapWithContext(testTask,expectedAttributes));
// 6) Fail the test in case of exception thrown in testTask explicitly because
// if the Assertions fail within 'testTask', the failure is logged but the test does not fail.
try {
result.get();
}
catch (Exception e) {
fail(e);
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment