Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Enrichment
wks
Commits
35a0a15b
Commit
35a0a15b
authored
Feb 16, 2021
by
SGupta79
Browse files
Code formatting
parent
c0c0f517
Pipeline
#26837
passed with stages
in 32 minutes and 35 seconds
Changes
4
Pipelines
4
Hide whitespace changes
Inline
Side-by-side
provider/wks-azure/src/main/java/org/opengroup/osdu/wks/provider/azure/pubsub/ProcessWKSTransform.java
View file @
35a0a15b
...
...
@@ -27,107 +27,103 @@ import static java.nio.charset.StandardCharsets.UTF_8;
@Component
public
class
ProcessWKSTransform
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
ProcessWKSTransform
.
class
);
private
static
final
String
DATA
=
"data"
;
private
static
final
String
ID
=
"id"
;
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
ProcessWKSTransform
.
class
);
private
static
final
String
DATA
=
"data"
;
private
static
final
String
ID
=
"id"
;
@Autowired
private
WKSService
wKSService
;
@Autowired
private
WKSService
wKSService
;
@Autowired
private
MDCContextMap
mdcContextMap
;
@Autowired
private
MDCContextMap
mdcContextMap
;
@Autowired
private
ThreadDpsHeaders
dpsHeaders
;
@Autowired
private
ThreadDpsHeaders
dpsHeaders
;
@Autowired
private
SliLogger
sliLogger
;
@Autowired
private
SliLogger
sliLogger
;
public
void
initiateWksTransformation
(
IMessage
message
)
{
public
void
initiateWksTransformation
(
IMessage
message
)
{
String
dataPartitionId
=
null
;
String
correlationId
=
null
;
String
dataPartitionId
=
null
;
String
correlationId
=
null
;
boolean
transformationSucceeded
=
false
;
boolean
transformationSucceeded
=
false
;
try
{
try
{
String
messageBody
=
new
String
(
message
.
getMessageBody
().
getBinaryData
().
get
(
0
),
UTF_8
);
JsonElement
jsonRoot
=
JsonParser
.
parseString
(
messageBody
);
JsonElement
messageData
=
jsonRoot
.
getAsJsonObject
().
get
(
DATA
);
String
messageBody
=
new
String
(
message
.
getMessageBody
().
getBinaryData
().
get
(
0
),
UTF_8
);
JsonElement
jsonRoot
=
JsonParser
.
parseString
(
messageBody
);
JsonElement
messageData
=
jsonRoot
.
getAsJsonObject
().
get
(
DATA
);
String
messageId
=
jsonRoot
.
getAsJsonObject
().
get
(
ID
).
getAsString
();
message
.
setMessageId
(
messageId
);
String
messageId
=
jsonRoot
.
getAsJsonObject
().
get
(
ID
).
getAsString
();
message
.
setMessageId
(
messageId
);
dataPartitionId
=
messageData
.
getAsJsonObject
().
get
(
DpsHeaders
.
DATA_PARTITION_ID
).
getAsString
();
correlationId
=
messageData
.
getAsJsonObject
().
get
(
DpsHeaders
.
CORRELATION_ID
).
getAsString
();
dataPartitionId
=
messageData
.
getAsJsonObject
().
get
(
DpsHeaders
.
DATA_PARTITION_ID
).
getAsString
();
correlationId
=
messageData
.
getAsJsonObject
().
get
(
DpsHeaders
.
CORRELATION_ID
).
getAsString
();
MDC
.
setContextMap
(
mdcContextMap
.
getContextMap
(
correlationId
,
dataPartitionId
));
dpsHeaders
.
setThreadContext
(
dataPartitionId
,
correlationId
);
MDC
.
setContextMap
(
mdcContextMap
.
getContextMap
(
correlationId
,
dataPartitionId
));
dpsHeaders
.
setThreadContext
(
dataPartitionId
,
correlationId
);
String
dataPayload
=
messageData
.
getAsJsonObject
().
get
(
DATA
).
toString
();
RawRecordDetails
[]
rawRecordDetails
=
getRecordsChangeData
(
dataPayload
);
String
dataPayload
=
messageData
.
getAsJsonObject
().
get
(
DATA
).
toString
();
RawRecordDetails
[]
rawRecordDetails
=
getRecordsChangeData
(
dataPayload
);
LOGGER
.
info
(
"Transformation started for message with id: {} containing {} Records"
,
message
.
getMessageId
(),
rawRecordDetails
.
length
);
LOGGER
.
info
(
"Transformation started for message with id: {} containing {} Records"
,
message
.
getMessageId
(),
rawRecordDetails
.
length
);
wKSService
.
transform
(
rawRecordDetails
,
dataPartitionId
,
correlationId
);
wKSService
.
transform
(
rawRecordDetails
,
dataPartitionId
,
correlationId
);
LOGGER
.
info
(
"Transformation completed for message with id: {}"
,
message
.
getMessageId
());
transformationSucceeded
=
true
;
LOGGER
.
info
(
"Transformation completed for message with id: {}"
,
message
.
getMessageId
());
transformationSucceeded
=
true
;
}
catch
(
BadRequestException
e
)
{
LOGGER
.
warn
(
"Bad Request Reason: {}, pubsub message id: {}"
,
e
.
getErrorMsg
(),
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
BAD_REQUEST
.
value
()).
build
());
}
catch
(
BadRequestException
e
)
{
LOGGER
.
warn
(
"Bad Request Reason: {}, pubsub message id: {}"
,
e
.
getErrorMsg
(),
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
BAD_REQUEST
.
value
()).
build
());
}
catch
(
ApplicationException
e
)
{
LOGGER
.
warn
(
"Application Error Reason: {}, pubsub message id: {}"
,
e
.
getErrorMsg
(),
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
ApplicationException
e
)
{
LOGGER
.
warn
(
"Application Error Reason: {}, pubsub message id: {}"
,
e
.
getErrorMsg
(),
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
NullPointerException
e
)
{
}
catch
(
NullPointerException
e
)
{
LOGGER
.
error
(
"Invalid format for message with id: {}"
,
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
LOGGER
.
error
(
"Invalid format for message with id: {}"
,
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
JsonSyntaxException
e
)
{
LOGGER
.
error
(
"Error occurred in payload parsing with id: {}"
,
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"Exception encountered processing the message with id: {}"
,
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
JsonSyntaxException
e
)
{
LOGGER
.
error
(
"Error occurred in payload parsing with id: {}"
,
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
}
catch
(
Exception
e
)
{
if
(!
transformationSucceeded
)
{
LOGGER
.
info
(
"Transformation failed for message with id: {}"
,
message
.
getMessageId
());
}
LOGGER
.
error
(
"Exception encountered processing the message with id: {}"
,
message
.
getMessageId
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
message
.
getMessageId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
ThreadScopeContextHolder
.
getContext
().
clear
();
MDC
.
clear
();
}
}
private
RawRecordDetails
[]
getRecordsChangeData
(
String
data
)
throws
BadRequestException
{
if
(!
transformationSucceeded
)
{
LOGGER
.
info
(
"Transformation failed for message with id: {}"
,
message
.
getMessageId
());
}
if
(
data
.
equals
(
"[]"
))
{
throw
new
BadRequestException
(
HttpStatus
.
BAD_REQUEST
,
"Invalid record change message, message data not found"
);
}
ThreadScopeContextHolder
.
getContext
().
clear
();
MDC
.
clear
();
}
private
RawRecordDetails
[]
getRecordsChangeData
(
String
data
)
throws
BadRequestException
{
if
(
data
.
equals
(
"[]"
))
{
throw
new
BadRequestException
(
HttpStatus
.
BAD_REQUEST
,
"Invalid record change message, message data not found"
);
}
Gson
gson
=
new
Gson
();
return
gson
.
fromJson
(
data
,
RawRecordDetails
[].
class
);
}
Gson
gson
=
new
Gson
();
return
gson
.
fromJson
(
data
,
RawRecordDetails
[].
class
);
}
}
wks-core/src/main/java/org/opengroup/osdu/wks/logging/SliLogger.java
View file @
35a0a15b
...
...
@@ -31,7 +31,7 @@ public class SliLogger {
jsonPayload
.
put
(
DpsHeaders
.
CORRELATION_ID
,
payload
.
getCorrelationId
());
return
jsonPayload
;
}
private
Map
<
String
,
Object
>
createSliPayload
(
SliLoggerPayload
payload
)
{
Map
<
String
,
Object
>
jsonPayload
=
new
HashMap
<>();
jsonPayload
.
put
(
"status"
,
payload
.
getStatus
());
...
...
wks-core/src/main/java/org/opengroup/osdu/wks/logging/SliLoggerPayload.java
View file @
35a0a15b
...
...
@@ -6,10 +6,10 @@ import lombok.Data;
@Data
@Builder
public
class
SliLoggerPayload
{
private
String
recordId
;
private
int
statusCode
;
private
Boolean
status
;
private
String
dataPartitionId
;
private
String
correlationId
;
private
String
recordId
;
private
int
statusCode
;
private
Boolean
status
;
private
String
dataPartitionId
;
private
String
correlationId
;
}
wks-core/src/main/java/org/opengroup/osdu/wks/service/WKSServiceImpl.java
View file @
35a0a15b
...
...
@@ -64,10 +64,9 @@ public class WKSServiceImpl implements WKSService {
@Autowired
private
SchemaService
schemaService
;
@Autowired
private
SliLogger
sliLogger
;
/**
* This method applies the WKS transformation on the valid raw records
...
...
@@ -87,7 +86,7 @@ public class WKSServiceImpl implements WKSService {
RawRecordDetails
[]
rawRecordsDetailsForTransformation
=
findValidRawRecordsDetailsForTransformation
(
rawRecordsDetails
);
if
(
rawRecordsDetailsForTransformation
.
length
>
0
)
{
if
(
rawRecordsDetailsForTransformation
.
length
>
0
)
{
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
=
retrieveMappingsMap
(
rawRecordsDetailsForTransformation
);
// Updating mappings with latest minor and patch versions
updateMappingsWithLatestMinorAndPatchVersions
(
mappingsMap
);
...
...
@@ -114,8 +113,7 @@ public class WKSServiceImpl implements WKSService {
}
if
(
invalidOpCodeCount
>
0
)
{
LOGGER
.
info
(
Constants
.
NOT_CREATE_OR_UPDATE_OPERATION
,
invalidOpCodeCount
,
rawRecordsDetails
.
length
);
LOGGER
.
info
(
Constants
.
NOT_CREATE_OR_UPDATE_OPERATION
,
invalidOpCodeCount
,
rawRecordsDetails
.
length
);
}
if
(
invalidSourceCount
>
0
)
{
...
...
@@ -125,9 +123,8 @@ public class WKSServiceImpl implements WKSService {
return
rawRecordsDetailsList
.
toArray
(
new
RawRecordDetails
[
0
]);
}
private
void
transformRecords
(
RawRecordDetails
[]
rawRecordsDetails
,
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
,
String
dataPartitionId
,
String
correlationId
)
throws
ApplicationException
{
String
dataPartitionId
,
String
correlationId
)
throws
ApplicationException
{
List
<
Tree
>
wksRecordTreeList
=
new
ArrayList
<>();
List
<
RelationshipStatus
>
relationshipStatusList
=
new
ArrayList
<>();
for
(
RawRecordDetails
rawRecord
:
rawRecordsDetails
)
{
...
...
@@ -137,7 +134,7 @@ public class WKSServiceImpl implements WKSService {
LOGGER
.
info
(
Constants
.
TRANSFORMATION_STARTED
,
rawRecord
.
getId
(),
mappingsList
.
size
());
Tree
rawRecordTree
=
retrieveRawRecordTree
(
rawRecord
);
for
(
MappingsModel
mappings:
mappingsList
)
{
for
(
MappingsModel
mappings
:
mappingsList
)
{
try
{
TransformRequest
transformRequest
=
new
TransformRequest
(
rawRecordTree
,
mappings
);
...
...
@@ -157,28 +154,26 @@ public class WKSServiceImpl implements WKSService {
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
OK
.
value
()).
build
());
}
catch
(
ApplicationException
e
)
{
LOGGER
.
warn
(
Constants
.
TRANSFORMATION_FAILED_FOR_GIVEN_WKS_KIND
,
rawRecord
.
getId
(),
mappings
.
getTargetSchemaKind
(),
e
.
getErrorMsg
(),
e
);
}
catch
(
ApplicationException
e
)
{
LOGGER
.
warn
(
Constants
.
TRANSFORMATION_FAILED_FOR_GIVEN_WKS_KIND
,
rawRecord
.
getId
(),
mappings
.
getTargetSchemaKind
(),
e
.
getErrorMsg
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
rawRecord
.
getId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
Exception
e
)
{
LOGGER
.
warn
(
Constants
.
TRANSFORMATION_FAILED_FOR_GIVEN_WKS_KIND
,
rawRecord
.
getId
(),
mappings
.
getTargetSchemaKind
(),
e
.
getMessage
(),
e
);
}
catch
(
Exception
e
)
{
LOGGER
.
warn
(
Constants
.
TRANSFORMATION_FAILED_FOR_GIVEN_WKS_KIND
,
rawRecord
.
getId
(),
mappings
.
getTargetSchemaKind
(),
e
.
getMessage
(),
e
);
}
}
}
catch
(
ApplicationException
e
)
{
LOGGER
.
warn
(
Constants
.
TRANFORMATION_FAILED
,
rawRecord
.
getId
(),
e
.
getErrorMsg
(),
e
);
sliLogger
.
writeLog
(
SliLoggerPayload
.
builder
().
status
(
false
).
recordId
(
rawRecord
.
getId
())
.
correlationId
(
correlationId
).
dataPartitionId
(
dataPartitionId
)
.
statusCode
(
HttpStatus
.
INTERNAL_SERVER_ERROR
.
value
()).
build
());
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LOGGER
.
warn
(
Constants
.
TRANFORMATION_FAILED
,
rawRecord
.
getId
(),
e
.
getMessage
(),
e
);
}
}
...
...
@@ -187,19 +182,21 @@ public class WKSServiceImpl implements WKSService {
saveAllWksRecords
(
wksRecordTreeList
,
dataPartitionId
,
correlationId
);
LOGGER
.
info
(
"Successfully saved {} WKS records in storage"
,
wksRecordTreeList
.
size
());
if
(!
relationshipStatusList
.
isEmpty
())
{
if
(!
relationshipStatusList
.
isEmpty
())
{
statusStoreService
.
createOrUpdateStoreEntry
(
relationshipStatusList
);
LOGGER
.
info
(
"Successfully saved relationship status for records for which WKS were not found"
);
}
}
}
private
void
updateMappingsWithLatestMinorAndPatchVersions
(
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
)
throws
ApplicationException
{
private
void
updateMappingsWithLatestMinorAndPatchVersions
(
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
)
throws
ApplicationException
{
for
(
Map
.
Entry
<
String
,
List
<
MappingsModel
>>
entry
:
mappingsMap
.
entrySet
())
{
List
<
MappingsModel
>
mappingsModelList
=
new
ArrayList
<>();
for
(
MappingsModel
mappingsModel
:
entry
.
getValue
())
{
List
<
String
>
schemaKindsWithLatestMinorAndPatchVersions
=
schemaService
.
getSchemas
(
mappingsModel
.
getTargetSchemaKind
());
List
<
String
>
schemaKindsWithLatestMinorAndPatchVersions
=
schemaService
.
getSchemas
(
mappingsModel
.
getTargetSchemaKind
());
mappingsModel
.
setTargetSchemaKind
(
schemaKindsWithLatestMinorAndPatchVersions
.
get
(
0
));
mappingsModelList
.
add
(
mappingsModel
);
}
...
...
@@ -213,8 +210,8 @@ public class WKSServiceImpl implements WKSService {
requestIdentity
.
setCorrelationId
(
correlationId
);
}
private
List
<
MappingsModel
>
retrieveMappings
(
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
,
RawRecordDetails
rawRecord
)
throws
ApplicationException
{
private
List
<
MappingsModel
>
retrieveMappings
(
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
,
RawRecordDetails
rawRecord
)
throws
ApplicationException
{
List
<
MappingsModel
>
mappings
=
mappingsMap
.
get
(
rawRecord
.
getKind
());
if
(
isEmptyMappings
(
mappings
))
{
LOGGER
.
warn
(
Constants
.
NO_MAPPING_FOR_KIND
,
rawRecord
.
getKind
());
...
...
@@ -232,13 +229,12 @@ public class WKSServiceImpl implements WKSService {
private
void
saveAllWksRecords
(
List
<
Tree
>
wksRecordTreeList
,
String
dataPartitionId
,
String
correlationId
)
throws
ApplicationException
{
String
message
=
this
.
storageService
.
putRecords
(
convertTreeListToJsonStrArr
(
wksRecordTreeList
),
dataPartitionId
,
correlationId
);
String
message
=
this
.
storageService
.
putRecords
(
convertTreeListToJsonStrArr
(
wksRecordTreeList
),
dataPartitionId
,
correlationId
);
LOGGER
.
info
(
message
);
}
private
Tree
retrieveRawRecordTree
(
RawRecordDetails
rawRecord
)
throws
ApplicationException
,
BadRequestException
{
private
Tree
retrieveRawRecordTree
(
RawRecordDetails
rawRecord
)
throws
ApplicationException
,
BadRequestException
{
Optional
<
String
>
optionalRawRecord
=
storageService
.
getRecord
(
rawRecord
.
getId
());
if
(!
optionalRawRecord
.
isPresent
())
{
...
...
@@ -259,11 +255,12 @@ public class WKSServiceImpl implements WKSService {
return
mappings
==
null
||
mappings
.
isEmpty
();
}
private
Map
<
String
,
List
<
MappingsModel
>>
retrieveMappingsMap
(
RawRecordDetails
[]
rawRecordsDetails
)
throws
ApplicationException
{
private
Map
<
String
,
List
<
MappingsModel
>>
retrieveMappingsMap
(
RawRecordDetails
[]
rawRecordsDetails
)
throws
ApplicationException
{
Map
<
String
,
List
<
MappingsModel
>>
mappingsMap
=
new
HashMap
<>();
for
(
RawRecordDetails
recordDetails
:
rawRecordsDetails
)
{
if
(
mappingsMap
.
get
(
recordDetails
.
getKind
())
==
null
)
{
for
(
RawRecordDetails
recordDetails
:
rawRecordsDetails
)
{
if
(
mappingsMap
.
get
(
recordDetails
.
getKind
())
==
null
)
{
List
<
MappingsModel
>
mappings
=
this
.
mappingService
.
getMapping
(
recordDetails
.
getKind
());
mappingsMap
.
put
(
recordDetails
.
getKind
(),
mappings
);
}
...
...
@@ -318,10 +315,10 @@ public class WKSServiceImpl implements WKSService {
private
boolean
isParametersToBeUpdated
(
RelationshipStatus
relationshipStatus
)
{
return
relationshipStatus
!=
null
&&
relationshipStatus
.
getRawRecordId
()
!=
null
;
}
private
String
[]
convertTreeListToJsonStrArr
(
List
<
Tree
>
wksRecordTreeList
)
{
List
<
String
>
wksRecordsJsonStrList
=
new
ArrayList
<>();
for
(
Tree
tree
:
wksRecordTreeList
)
{
for
(
Tree
tree
:
wksRecordTreeList
)
{
wksRecordsJsonStrList
.
add
(
tree
.
toString
());
}
return
wksRecordsJsonStrList
.
toArray
(
new
String
[
0
]);
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment