Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Enrichment
wks
Commits
a1d563f0
Commit
a1d563f0
authored
Jan 04, 2021
by
Hema Vishnu Pola [Microsoft]
Browse files
Merge branch 'azure/EventGridChanges' into 'master'
Azure: Taking Event Grid changes See merge request
!24
parents
0e1d3db0
cb561cf4
Pipeline
#20523
passed with stages
in 26 minutes and 23 seconds
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
devops/azure/chart/values.yaml
View file @
a1d563f0
...
...
@@ -13,8 +13,8 @@
# limitations under the License.
azure
:
servicebusSubscription
:
wkssubscription
servicebusTopic
:
recordstopic
servicebusSubscription
:
eg_sb_
wkssubscription
servicebusTopic
:
recordstopic
eg
storageContainer
:
osdu-wks-mappings
image
:
branch
:
master
...
...
provider/wks-azure/src/main/java/org/opengroup/osdu/wks/provider/azure/pubsub/ProcessWKSTransform.java
View file @
a1d563f0
...
...
@@ -26,6 +26,7 @@ public class ProcessWKSTransform {
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
ProcessWKSTransform
.
class
);
private
static
final
String
DATA
=
"data"
;
private
static
final
String
ID
=
"id"
;
@Autowired
private
WKSService
wKSService
;
...
...
@@ -38,14 +39,29 @@ public class ProcessWKSTransform {
public
void
initiateWksTransformation
(
IMessage
message
)
{
try
{
String
dataPartitionId
=
message
.
getProperties
().
get
(
DpsHeaders
.
DATA_PARTITION_ID
).
toString
();
String
correlationId
=
message
.
getProperties
().
get
(
DpsHeaders
.
CORRELATION_ID
).
toString
();
String
messageBody
=
new
String
(
message
.
getMessageBody
().
getBinaryData
().
get
(
0
),
UTF_8
);
JsonElement
jsonRoot
=
JsonParser
.
parseString
(
messageBody
);
JsonElement
messageData
=
jsonRoot
.
getAsJsonObject
().
get
(
DATA
);
String
messageId
=
jsonRoot
.
getAsJsonObject
().
get
(
ID
).
getAsString
();
message
.
setMessageId
(
messageId
);
String
dataPartitionId
=
messageData
.
getAsJsonObject
().
get
(
DpsHeaders
.
DATA_PARTITION_ID
).
getAsString
();
String
correlationId
=
messageData
.
getAsJsonObject
().
get
(
DpsHeaders
.
CORRELATION_ID
).
getAsString
();
MDC
.
setContextMap
(
mdcContextMap
.
getContextMap
(
correlationId
,
dataPartitionId
));
dpsHeaders
.
setThreadContext
(
dataPartitionId
,
correlationId
);
RawRecordDetails
[]
rawRecordDetails
=
retrieveDataFromMessage
(
message
);
String
dataPayload
=
messageData
.
getAsJsonObject
().
get
(
DATA
).
toString
();
RawRecordDetails
[]
rawRecordDetails
=
getRecordsChangeData
(
dataPayload
);
LOGGER
.
info
(
"Transformation started for message with id: "
+
message
.
getMessageId
());
wKSService
.
transform
(
rawRecordDetails
,
dataPartitionId
,
correlationId
);
LOGGER
.
info
(
"Transformation completed for message with id: "
+
message
.
getMessageId
());
}
catch
(
BadRequestException
e
)
{
LOGGER
.
error
(
"Bad Request Reason: {}, pubsub message id: {}"
,
e
.
getErrorMsg
(),
message
.
getMessageId
(),
e
);
...
...
@@ -63,21 +79,6 @@ public class ProcessWKSTransform {
MDC
.
clear
();
}
private
RawRecordDetails
[]
retrieveDataFromMessage
(
IMessage
message
)
throws
BadRequestException
{
LOGGER
.
info
(
"Transformation started for message with id: "
+
message
.
getMessageId
());
String
messageData
=
new
String
(
message
.
getMessageBody
().
getBinaryData
().
get
(
0
),
UTF_8
);
JsonElement
jsonRoot
=
JsonParser
.
parseString
(
messageData
);
JsonElement
msg
=
jsonRoot
.
getAsJsonObject
().
get
(
"message"
);
if
(
msg
==
null
)
{
throw
new
BadRequestException
(
HttpStatus
.
BAD_REQUEST
,
"Invalid record change message, message object not found"
);
}
String
dataValue
=
msg
.
getAsJsonObject
().
get
(
DATA
).
toString
();
return
getRecordsChangeData
(
dataValue
);
}
private
RawRecordDetails
[]
getRecordsChangeData
(
String
data
)
throws
BadRequestException
{
if
(
data
.
equals
(
"[]"
))
{
...
...
provider/wks-azure/src/test/java/org/opengroup/osdu/wks/provider/azure/pubsub/ProcessWKSTransformTest.java
View file @
a1d563f0
...
...
@@ -23,7 +23,6 @@ import org.mockito.InjectMocks;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.jupiter.MockitoExtension
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.opengroup.osdu.wks.exceptions.ApplicationException
;
import
org.opengroup.osdu.wks.exceptions.BadRequestException
;
import
org.opengroup.osdu.wks.model.RawRecordDetails
;
...
...
@@ -32,8 +31,6 @@ import org.opengroup.osdu.wks.provider.azure.utils.MDCContextMap;
import
org.opengroup.osdu.wks.service.WKSService
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Map
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
eq
;
...
...
@@ -50,9 +47,9 @@ public class ProcessWKSTransformTest {
private
static
final
String
dataPartitionId
=
"opendes"
;
private
static
final
String
correlationId
=
"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85"
;
private
static
final
String
emptyMessage
=
"{}"
;
private
static
final
String
validMessage
=
"{\"
message
\":{\"data\":[{\"id\":\"opendes:
at
:wellbore-
b3Blbm
\",\"kind\":\"opendes:
a
t:wellbore:1.0.0\",\"op\":\"create\"}],\"account-id\":\"opendes\",\"data-partition-id\":\"opendes\",\"
correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85
\"}
}
"
;
private
static
final
String
noDataMessage
=
"{\"
message\":{\"data\":[],\"account-id\":\"opendes
\",\"data-partition-id\":\"opendes\",\"
correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85
\"}
}
"
;
private
static
final
String
messageId
=
"
message-id
"
;
private
static
final
String
validMessage
=
"{\"
id\":\"40cc96f5-85b9-4923-9a5b-c27f67a3e815\",\"subject\":\"RecordsChanged\",\"data
\":{\"data\":[{\"id\":\"opendes:
ww
:wellbore-
35b7eecfa2c35145053b150f47461bea630a.slb.wks.wellbore.2
\",\"kind\":\"opendes:
tes
t:wellbore
_har
:1.0.0\",\"op\":\"create\"}],\"account-id\":\"opendes\",\"
correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85\",\"
data-partition-id\":\"opendes\"
}
,\"
eventType\":\"RecordsChanged\",\"dataVersion\":\"1.0\",\"metadataVersion\":\"1\",\"eventTime\":\"2020-12-09T13:52:57.292Z\",\"topic\":\"/subscriptions/929e9ae0-7bb1-4563-a200-9863fe27cae4/resourceGroups/osdu-mvp-idcmvpdp1-i2or-rg/providers/Microsoft.EventGrid/topics/osdu-mvp-idcmvpd-i2or-grid-recordstopic
\"}"
;
private
static
final
String
noDataMessage
=
"{\"
id\":\"40cc96f5-85b9-4923-9a5b-c27f67a3e815\",\"subject\":\"RecordsChanged\",\"data\":{\"data\":[],\"account-id\":\"opendes\",\"correlation-id\":\"908fcf8d-30c5-4c74-a0ae-ab47b48b7a85
\",\"data-partition-id\":\"opendes\"
}
,\"
eventType\":\"RecordsChanged\",\"dataVersion\":\"1.0\",\"metadataVersion\":\"1\",\"eventTime\":\"2020-12-09T13:52:57.292Z\",\"topic\":\"/subscriptions/929e9ae0-7bb1-4563-a200-9863fe27cae4/resourceGroups/osdu-mvp-idcmvpdp1-i2or-rg/providers/Microsoft.EventGrid/topics/osdu-mvp-idcmvpd-i2or-grid-recordstopic
\"}"
;
private
static
final
String
messageId
=
"
40cc96f5-85b9-4923-9a5b-c27f67a3e815
"
;
@InjectMocks
private
ProcessWKSTransform
processWKSTransform
;
...
...
@@ -71,10 +68,8 @@ public class ProcessWKSTransformTest {
@BeforeEach
public
void
init
()
{
when
(
message
.
getProperties
()).
thenReturn
(
getMessageProperties
());
lenient
().
when
(
message
.
getMessageId
()).
thenReturn
(
messageId
);
doNothing
().
when
(
dpsHeaders
).
setThreadContext
(
dataPartitionId
,
correlationId
);
lenient
().
doNothing
().
when
(
dpsHeaders
).
setThreadContext
(
dataPartitionId
,
correlationId
);
}
@Test
...
...
@@ -84,10 +79,8 @@ public class ProcessWKSTransformTest {
processWKSTransform
.
initiateWksTransformation
(
message
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
verify
(
message
,
times
(
2
)).
getMessageId
();
verify
(
message
,
times
(
1
)).
getMessageId
();
}
@Test
...
...
@@ -98,9 +91,10 @@ public class ProcessWKSTransformTest {
processWKSTransform
.
initiateWksTransformation
(
message
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
verify
(
message
,
times
(
2
)).
getMessageId
();
verify
(
message
,
times
(
1
)).
getMessageId
();
verify
(
message
,
times
(
1
)).
setMessageId
(
messageId
);
}
@Test
...
...
@@ -112,9 +106,9 @@ public class ProcessWKSTransformTest {
processWKSTransform
.
initiateWksTransformation
(
message
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
verify
(
message
,
times
(
2
)).
getMessageId
();
verify
(
message
,
times
(
1
)).
setMessageId
(
messageId
);
}
@Test
...
...
@@ -126,15 +120,8 @@ public class ProcessWKSTransformTest {
processWKSTransform
.
initiateWksTransformation
(
message
);
verify
(
mdcContextMap
,
times
(
1
)).
getContextMap
(
correlationId
,
dataPartitionId
);
verify
(
message
,
times
(
2
)).
getProperties
();
verify
(
message
,
times
(
1
)).
getMessageBody
();
}
private
Map
<
String
,
Object
>
getMessageProperties
()
{
Map
<
String
,
Object
>
properties
=
new
HashMap
<>();
properties
.
put
(
DpsHeaders
.
DATA_PARTITION_ID
,
dataPartitionId
);
properties
.
put
(
DpsHeaders
.
CORRELATION_ID
,
correlationId
);
return
properties
;
verify
(
message
,
times
(
1
)).
setMessageId
(
messageId
);
}
private
MessageBody
getMessageBody
(
String
messageValue
)
{
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment