Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Enrichment
wks
Commits
d8078b82
Commit
d8078b82
authored
Aug 18, 2020
by
harshit aggarwal
Browse files
using slf4j mdc context
parent
9460ba0a
Pipeline
#6069
failed with stage
in 38 seconds
Changes
8
Pipelines
2
Hide whitespace changes
Inline
Side-by-side
git
0 → 100644
View file @
d8078b82
provider/wks-azure/src/main/java/org/opengroup/osdu/wks/provider/azure/pubsub/MessageHandler.java
View file @
d8078b82
...
...
@@ -26,14 +26,8 @@ public class MessageHandler implements IMessageHandler {
@Override
public
CompletableFuture
<
Void
>
onMessageAsync
(
IMessage
message
)
{
boolean
ack
=
this
.
processWKSTransform
.
initiateWksTransformation
(
message
);
if
(
ack
)
{
return
receiveClient
.
completeAsync
(
message
.
getLockToken
());
}
else
{
return
receiveClient
.
abandonAsync
(
message
.
getLockToken
());
}
this
.
processWKSTransform
.
initiateWksTransformation
(
message
);
return
this
.
receiveClient
.
completeAsync
(
message
.
getLockToken
());
}
@Override
...
...
provider/wks-azure/src/main/java/org/opengroup/osdu/wks/provider/azure/pubsub/ProcessWKSTransform.java
View file @
d8078b82
...
...
@@ -38,10 +38,10 @@ public class ProcessWKSTransform {
@Autowired
private
MDCContextMap
mdcContextMap
;
public
boolean
initiateWksTransformation
(
IMessage
message
)
{
public
void
initiateWksTransformation
(
IMessage
message
)
{
String
dataPartitionId
=
message
.
getProperties
().
get
(
DpsHeaders
.
DATA_PARTITION_ID
).
toString
();
String
correlationId
=
message
.
getProperties
().
get
(
DpsHeaders
.
CORRELATION_ID
).
toString
();
//
MDC.setContextMap(mdcContextMap.getContextMap(correlationId, dataPartitionId));
MDC
.
setContextMap
(
mdcContextMap
.
getContextMap
(
correlationId
,
dataPartitionId
));
try
{
RawRecordDetails
[]
rawRecordDetails
=
retrieveDataFromMessage
(
message
);
...
...
@@ -52,11 +52,9 @@ public class ProcessWKSTransform {
}
catch
(
ApplicationException
e
)
{
logger
.
error
(
azureBootstrapConfig
.
getLogPrefix
(),
String
.
format
(
"Application Error Reason: %s, pubsub message id: %s"
,
e
.
getErrorMsg
(),
message
.
getMessageId
()),
Collections
.
emptyMap
());
// MDC.clear();
return
false
;
}
// MDC.clear();
return
true
;
MDC
.
clear
()
;
}
private
RawRecordDetails
[]
retrieveDataFromMessage
(
IMessage
message
)
throws
BadRequestException
{
...
...
provider/wks-azure/src/main/java/org/opengroup/osdu/wks/provider/azure/pubsub/SubscriptionManagerImpl.java
View file @
d8078b82
...
...
@@ -5,7 +5,6 @@ import com.microsoft.azure.servicebus.SubscriptionClient;
import
com.microsoft.azure.servicebus.primitives.ServiceBusException
;
import
org.opengroup.osdu.azure.logging.Slf4JLogger
;
import
org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig
;
import
org.opengroup.osdu.wks.provider.azure.utils.MDCAwareThreadPoolExecutor
;
import
org.opengroup.osdu.wks.provider.interfaces.SubscriptionManager
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
...
...
@@ -13,8 +12,6 @@ import java.time.Duration;
import
java.util.Collections
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.TimeUnit
;
@Component
public
class
SubscriptionManagerImpl
implements
SubscriptionManager
{
...
...
@@ -39,10 +36,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
SubscriptionClient
subscriptionClient
=
this
.
subscriptionClientFactory
.
getSubscriptionClient
();
ExecutorService
executorService
=
Executors
.
newFixedThreadPool
(
Integer
.
parseUnsignedInt
(
azureBootstrapConfig
.
getNThreads
()));
//
// int nThreads = Integer.parseUnsignedInt(azureBootstrapConfig.getNThreads());
// ExecutorService executorService = new MDCAwareThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
//
registerMessageHandler
(
subscriptionClient
,
executorService
);
}
...
...
provider/wks-azure/src/main/java/org/opengroup/osdu/wks/provider/azure/utils/MDCAwareThreadPoolExecutor.java
deleted
100644 → 0
View file @
9460ba0a
package
org.opengroup.osdu.wks.provider.azure.utils
;
import
org.slf4j.MDC
;
import
java.util.Map
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
public
class
MDCAwareThreadPoolExecutor
extends
ThreadPoolExecutor
{
public
MDCAwareThreadPoolExecutor
(
int
corePoolSize
,
int
maximumPoolSize
,
long
keepAliveTime
,
TimeUnit
unit
,
BlockingQueue
<
Runnable
>
workQueue
)
{
super
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
unit
,
workQueue
);
}
@Override
public
void
execute
(
Runnable
command
)
{
super
.
execute
(
wrapWithMdcContext
(
command
));
}
private
Runnable
wrapWithMdcContext
(
Runnable
task
)
{
Map
<
String
,
String
>
contextMap
=
MDC
.
getCopyOfContextMap
();
return
()
->
{
setMDCContext
(
contextMap
);
try
{
task
.
run
();
}
finally
{
MDC
.
clear
();
}
};
}
private
void
setMDCContext
(
Map
<
String
,
String
>
contextMap
)
{
MDC
.
clear
();
if
(
contextMap
!=
null
)
{
MDC
.
setContextMap
(
contextMap
);
}
}
}
provider/wks-azure/src/test/java/org/opengroup/osdu/wks/provider/azure/di/AzureBootstrapConfigTest.java
View file @
d8078b82
...
...
@@ -7,8 +7,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import
org.mockito.InjectMocks
;
import
org.mockito.Mock
;
import
org.mockito.junit.jupiter.MockitoExtension
;
import
org.opengroup.osdu.azure.util.AzureServicePrincipal
;
import
org.opengroup.osdu.wks.provider.azure.utils.MDCContextMap
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.
assertEquals
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.
assertNotNull
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.
assertThrows
;
import
static
org
.
mockito
.
Mockito
.
doReturn
;
...
...
@@ -25,7 +28,7 @@ public class AzureBootstrapConfigTest {
private
SecretClient
kv
;
@Test
public
void
kvSecret
_c
hecksForNullResponse
()
{
public
void
kvSecret
C
hecksForNullResponse
()
{
doReturn
(
null
).
when
(
kv
).
getSecret
(
"secret-name"
);
...
...
@@ -37,7 +40,7 @@ public class AzureBootstrapConfigTest {
}
@Test
public
void
kvSecret
_c
hecksForNullValueWithinResponse
()
{
public
void
kvSecret
C
hecksForNullValueWithinResponse
()
{
doReturn
(
null
).
when
(
secret
).
getValue
();
doReturn
(
secret
).
when
(
kv
).
getSecret
(
"secret-name"
);
...
...
@@ -49,7 +52,7 @@ public class AzureBootstrapConfigTest {
}
@Test
public
void
config
_r
eturnsCorrectSecret
_c
osmosKey
()
{
public
void
config
R
eturnsCorrectSecret
C
osmosKey
()
{
doReturn
(
"cosmos-key-secret"
).
when
(
secret
).
getValue
();
doReturn
(
secret
).
when
(
kv
).
getSecret
(
"cosmos-primary-key"
);
...
...
@@ -58,12 +61,28 @@ public class AzureBootstrapConfigTest {
}
@Test
public
void
config
_r
eturnsCorrectSecret
_c
osmosEndpoint
()
{
public
void
config
R
eturnsCorrectSecret
C
osmosEndpoint
()
{
doReturn
(
"cosmos-endpoint-secret"
).
when
(
secret
).
getValue
();
doReturn
(
secret
).
when
(
kv
).
getSecret
(
"cosmos-endpoint"
);
String
secretValue
=
bootstrapConfig
.
cosmosEndpoint
(
kv
);
assertEquals
(
"cosmos-endpoint-secret"
,
secretValue
);
}
@Test
public
void
azureServicePrincipalShouldNotBeNull
()
{
AzureServicePrincipal
azureServicePrincipal
=
bootstrapConfig
.
getAzureServicePrincipal
();
assertNotNull
(
azureServicePrincipal
);
}
@Test
public
void
MDCContextMapShouldNotBeNull
()
{
MDCContextMap
mdcContextMap
=
bootstrapConfig
.
mdcContextMap
();
assertNotNull
(
mdcContextMap
);
}
}
provider/wks-azure/src/test/java/org/opengroup/osdu/wks/provider/azure/pubsub/MessageHandlerTest.java
View file @
d8078b82
...
...
@@ -11,7 +11,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import
java.util.UUID
;
import
static
org
.
mockito
.
Mockito
.
do
Return
;
import
static
org
.
mockito
.
Mockito
.
do
Nothing
;
import
static
org
.
mockito
.
Mockito
.
times
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
mockito
.
Mockito
.
when
;
...
...
@@ -40,18 +40,10 @@ public class MessageHandlerTest {
@Test
public
void
shouldInvokeCompleteAsync
()
{
do
Return
(
true
).
when
(
processWKSTransform
).
initiateWksTransformation
(
message
);
do
Nothing
(
).
when
(
processWKSTransform
).
initiateWksTransformation
(
message
);
messageHandler
.
onMessageAsync
(
message
);
verify
(
subscriptionClient
,
times
(
1
)).
completeAsync
(
uuid
);
verify
(
processWKSTransform
,
times
(
1
)).
initiateWksTransformation
(
message
);
}
@Test
public
void
shouldInvokeAbandonAsync
()
{
doReturn
(
false
).
when
(
processWKSTransform
).
initiateWksTransformation
(
message
);
messageHandler
.
onMessageAsync
(
message
);
verify
(
subscriptionClient
,
times
(
1
)).
abandonAsync
(
uuid
);
verify
(
processWKSTransform
,
times
(
1
)).
initiateWksTransformation
(
message
);
}
}
provider/wks-azure/src/test/java/org/opengroup/osdu/wks/provider/azure/pubsub/ProcessWKSTransformTest.java
View file @
d8078b82
...
...
@@ -28,14 +28,13 @@ import org.opengroup.osdu.wks.exceptions.ApplicationException;
import
org.opengroup.osdu.wks.exceptions.BadRequestException
;
import
org.opengroup.osdu.wks.model.RawRecordDetails
;
import
org.opengroup.osdu.wks.provider.azure.di.AzureBootstrapConfig
;
import
org.opengroup.osdu.wks.provider.azure.utils.MDCContextMap
;
import
org.opengroup.osdu.wks.service.WKSService
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Map
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.
assertFalse
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.
assertTrue
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
eq
;
import
static
org
.
mockito
.
Mockito
.
doNothing
;
...
...
@@ -55,6 +54,7 @@ public class ProcessWKSTransformTest {
private
static
final
String
noDataMessage
=
"{\"message\":{\"data\":[],\"account-id\":\"opendes\",\"data-partition-id\":\"opendes\",\"correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85\"}}"
;
private
static
final
String
logPrefix
=
"log-prefix"
;
private
static
final
String
messageId
=
"message-id"
;
@InjectMocks
private
ProcessWKSTransform
processWKSTransform
;
...
...
@@ -70,6 +70,9 @@ public class ProcessWKSTransformTest {
@Mock
private
Slf4JLogger
logger
;
@Mock
private
MDCContextMap
mdcContextMap
;
@BeforeEach
public
void
init
()
{
...
...
@@ -79,13 +82,13 @@ public class ProcessWKSTransformTest {
}
@Test
public
void
shouldR
eturnTrueWhen
BadRequestExceptionInRetrieveDataFromMessage
()
{
public
void
shouldR
aise
BadRequestExceptionInRetrieveDataFromMessage
()
{
when
(
message
.
getMessageBody
()).
thenReturn
(
getMessageBody
(
emptyMessage
));
boolean
result
=
processWKSTransform
.
initiateWksTransformation
(
message
);
processWKSTransform
.
initiateWksTransformation
(
message
);
assertTrue
(
result
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
verify
(
message
,
times
(
1
)).
getMessageId
();
...
...
@@ -94,13 +97,13 @@ public class ProcessWKSTransformTest {
}
@Test
public
void
shouldR
eturnTrueWhen
BadRequestExceptionInGetRecordsChangeData
()
{
public
void
shouldR
aise
BadRequestExceptionInGetRecordsChangeData
()
{
when
(
message
.
getMessageBody
()).
thenReturn
(
getMessageBody
(
noDataMessage
));
boolean
result
=
processWKSTransform
.
initiateWksTransformation
(
message
);
processWKSTransform
.
initiateWksTransformation
(
message
);
assertTrue
(
result
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
verify
(
message
,
times
(
1
)).
getMessageId
();
...
...
@@ -109,14 +112,14 @@ public class ProcessWKSTransformTest {
}
@Test
public
void
shouldR
eturnFalseOn
ApplicationException
()
throws
BadRequestException
,
ApplicationException
{
public
void
shouldR
aise
ApplicationException
()
throws
BadRequestException
,
ApplicationException
{
when
(
message
.
getMessageBody
()).
thenReturn
(
getMessageBody
(
validMessage
));
doThrow
(
ApplicationException
.
class
).
when
(
wksService
).
transform
(
any
(
RawRecordDetails
[].
class
),
eq
(
dataPartitionId
),
eq
(
correlationId
));
boolean
result
=
processWKSTransform
.
initiateWksTransformation
(
message
);
processWKSTransform
.
initiateWksTransformation
(
message
);
assertFalse
(
result
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
verify
(
message
,
times
(
1
)).
getMessageId
();
...
...
@@ -125,14 +128,14 @@ public class ProcessWKSTransformTest {
}
@Test
public
void
should
ReturnTrueOn
SuccessfulWKSTransform
()
throws
BadRequestException
,
ApplicationException
{
public
void
shouldSuccessful
lyPerform
WKSTransform
()
throws
BadRequestException
,
ApplicationException
{
when
(
message
.
getMessageBody
()).
thenReturn
(
getMessageBody
(
validMessage
));
doNothing
().
when
(
wksService
).
transform
(
any
(
RawRecordDetails
[].
class
),
eq
(
dataPartitionId
),
eq
(
correlationId
));
boolean
result
=
processWKSTransform
.
initiateWksTransformation
(
message
);
processWKSTransform
.
initiateWksTransformation
(
message
);
assertTrue
(
result
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment