Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
Ingestion Workflow
Commits
8ec67579
Commit
8ec67579
authored
Sep 23, 2021
by
Aalekh Jain
Browse files
Refactoring
parent
4a8facd4
Pipeline
#67378
failed with stages
in 68 minutes and 53 seconds
Changes
3
Pipelines
3
Hide whitespace changes
Inline
Side-by-side
provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowMetadataRepository.java
View file @
8ec67579
...
...
@@ -3,51 +3,58 @@ package org.opengroup.osdu.workflow.provider.azure.repository;
import
com.azure.cosmos.CosmosException
;
import
com.azure.cosmos.models.CosmosQueryRequestOptions
;
import
com.azure.cosmos.models.SqlQuerySpec
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.slf4j.Slf4j
;
import
org.opengroup.osdu.azure.cosmosdb.CosmosStore
;
import
org.opengroup.osdu.core.common.logging.JaxRsDpsLog
;
import
org.opengroup.osdu.core.common.model.http.AppException
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.opengroup.osdu.workflow.exception.ResourceConflictException
;
import
org.opengroup.osdu.workflow.exception.WorkflowNotFoundException
;
import
org.opengroup.osdu.workflow.model.WorkflowMetadata
;
import
org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig
;
import
org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig
;
import
org.opengroup.osdu.workflow.provider.azure.model.WorkflowMetadataDoc
;
import
org.opengroup.osdu.workflow.provider.interfaces.IWorkflowMetadataRepository
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Repository
;
import
java.util.List
;
import
java.util.Optional
;
@Component
public
class
WorkflowMetadataRepository
extends
BaseWorkflowMetadataRepository
implements
IWorkflowMetadataRepository
{
import
static
org
.
opengroup
.
osdu
.
workflow
.
provider
.
azure
.
utils
.
WorkflowMetadataUtils
.*;
@Slf4j
@Repository
@RequiredArgsConstructor
public
class
WorkflowMetadataRepository
implements
IWorkflowMetadataRepository
{
private
static
final
String
LOGGER_NAME
=
WorkflowMetadataRepository
.
class
.
getName
();
private
static
final
String
KEY_DAG_CONTENT
=
"dagContent"
;
private
static
final
String
KEY_DAG_TYPE
=
"dagType"
;
@Autowired
private
CosmosConfig
cosmosConfig
;
private
final
CosmosConfig
cosmosConfig
;
@Autowired
private
CosmosStore
cosmosStore
;
private
final
CosmosStore
cosmosStore
;
@Autowired
private
DpsHeaders
dpsHeaders
;
private
final
DpsHeaders
dpsHeaders
;
@Autowired
private
JaxRsDpsLog
logger
;
private
final
JaxRsDpsLog
logger
;
@Autowired
private
AzureWorkflowEngineConfig
workflowEngineConfig
;
private
final
AzureWorkflowEngineConfig
workflowEngineConfig
;
@Override
public
WorkflowMetadata
createWorkflow
(
final
WorkflowMetadata
workflowMetadata
)
{
final
WorkflowMetadataDoc
workflowMetadataDoc
=
buildWorkflowMetadataDoc
(
workflowMetadata
);
final
WorkflowMetadataDoc
workflowMetadataDoc
=
buildWorkflowMetadataDoc
(
workflowEngineConfig
,
workflowMetadata
);
try
{
cosmosStore
.
createItem
(
dpsHeaders
.
getPartitionId
(),
cosmosConfig
.
getDatabase
(),
cosmosConfig
.
getWorkflowMetadataCollection
(),
workflowMetadataDoc
.
getPartitionKey
(),
workflowMetadataDoc
);
}
catch
(
AppException
e
)
{
handleCreateWorkflowException
(
e
,
workflowMetadataDoc
.
getWorkflowName
());
String
workflowName
=
workflowMetadataDoc
.
getWorkflowName
();
if
(
e
.
getError
().
getCode
()
==
409
)
{
final
String
errorMessage
=
String
.
format
(
"Workflow with name %s already exists"
,
workflowName
);
logger
.
error
(
LOGGER_NAME
,
errorMessage
);
throw
new
ResourceConflictException
(
workflowName
,
errorMessage
);
}
else
{
throw
e
;
}
}
return
buildWorkflowMetadata
(
workflowMetadataDoc
);
}
...
...
@@ -64,7 +71,9 @@ public class WorkflowMetadataRepository extends BaseWorkflowMetadataRepository i
WorkflowMetadataDoc
.
class
);
if
(
null
==
workflowMetadataDoc
||
!
workflowMetadataDoc
.
isPresent
())
{
handleGetWorkflowException
(
workflowName
);
final
String
errorMessage
=
String
.
format
(
"Workflow: %s doesn't exist"
,
workflowName
);
logger
.
error
(
LOGGER_NAME
,
errorMessage
);
throw
new
WorkflowNotFoundException
(
errorMessage
);
}
return
buildWorkflowMetadata
(
workflowMetadataDoc
.
get
());
}
...
...
provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/repository/WorkflowSystemMetadataRepository.java
View file @
8ec67579
...
...
@@ -2,41 +2,38 @@ package org.opengroup.osdu.workflow.provider.azure.repository;
import
com.azure.cosmos.models.CosmosQueryRequestOptions
;
import
com.azure.cosmos.models.SqlQuerySpec
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.slf4j.Slf4j
;
import
org.opengroup.osdu.azure.cosmosdb.CosmosStore
;
import
org.opengroup.osdu.core.common.logging.JaxRsDpsLog
;
import
org.opengroup.osdu.core.common.model.http.AppException
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.opengroup.osdu.workflow.exception.ResourceConflictException
;
import
org.opengroup.osdu.workflow.exception.WorkflowNotFoundException
;
import
org.opengroup.osdu.workflow.model.WorkflowMetadata
;
import
org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig
;
import
org.opengroup.osdu.workflow.provider.azure.config.CosmosConfig
;
import
org.opengroup.osdu.workflow.provider.azure.model.WorkflowMetadataDoc
;
import
org.opengroup.osdu.workflow.provider.interfaces.IWorkflowSystemMetadataRepository
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Repository
;
import
java.util.List
;
import
java.util.Optional
;
@Component
public
class
WorkflowSystemMetadataRepository
extends
BaseWorkflowMetadataRepository
implements
IWorkflowSystemMetadataRepository
{
private
static
final
String
LOGGER_NAME
=
WorkflowMetadataRepository
.
class
.
getName
();
private
static
final
String
KEY_DAG_CONTENT
=
"dagContent"
;
private
static
final
String
KEY_DAG_TYPE
=
"dagType"
;
import
static
org
.
opengroup
.
osdu
.
workflow
.
provider
.
azure
.
utils
.
WorkflowMetadataUtils
.*;
@Autowired
private
CosmosConfig
cosmosConfig
;
@Slf4j
@Repository
@RequiredArgsConstructor
public
class
WorkflowSystemMetadataRepository
implements
IWorkflowSystemMetadataRepository
{
private
static
final
String
LOGGER_NAME
=
WorkflowMetadataRepository
.
class
.
getName
();
@Autowired
private
CosmosStore
cosmosStore
;
private
final
CosmosConfig
cosmosConfig
;
@Autowired
private
DpsHeaders
dpsHeaders
;
private
final
CosmosStore
cosmosStore
;
@Autowired
private
JaxRsDpsLog
logger
;
private
final
JaxRsDpsLog
logger
;
@Autowired
private
AzureWorkflowEngineConfig
workflowEngineConfig
;
private
final
AzureWorkflowEngineConfig
workflowEngineConfig
;
@Override
public
WorkflowMetadata
getSystemWorkflow
(
final
String
workflowName
)
{
...
...
@@ -49,14 +46,16 @@ public class WorkflowSystemMetadataRepository extends BaseWorkflowMetadataReposi
WorkflowMetadataDoc
.
class
);
if
(
null
==
workflowSystemMetadataDoc
||
!
workflowSystemMetadataDoc
.
isPresent
())
{
handleGetWorkflowException
(
workflowName
);
final
String
errorMessage
=
String
.
format
(
"Workflow: %s doesn't exist"
,
workflowName
);
logger
.
error
(
LOGGER_NAME
,
errorMessage
);
throw
new
WorkflowNotFoundException
(
errorMessage
);
}
return
buildWorkflowMetadata
(
workflowSystemMetadataDoc
.
get
());
}
@Override
public
WorkflowMetadata
createSystemWorkflow
(
final
WorkflowMetadata
workflowMetadata
)
{
final
WorkflowMetadataDoc
workflowMetadataDoc
=
buildWorkflowMetadataDoc
(
workflowMetadata
);
final
WorkflowMetadataDoc
workflowMetadataDoc
=
buildWorkflowMetadataDoc
(
workflowEngineConfig
,
workflowMetadata
);
try
{
cosmosStore
.
createItem
(
cosmosConfig
.
getSystemdatabase
(),
...
...
@@ -65,7 +64,14 @@ public class WorkflowSystemMetadataRepository extends BaseWorkflowMetadataReposi
workflowMetadataDoc
);
}
catch
(
AppException
e
)
{
handleCreateWorkflowException
(
e
,
workflowMetadataDoc
.
getWorkflowName
());
String
workflowName
=
workflowMetadataDoc
.
getWorkflowName
();
if
(
e
.
getError
().
getCode
()
==
409
)
{
final
String
errorMessage
=
String
.
format
(
"Workflow with name %s already exists"
,
workflowName
);
logger
.
error
(
LOGGER_NAME
,
errorMessage
);
throw
new
ResourceConflictException
(
workflowName
,
errorMessage
);
}
else
{
throw
e
;
}
}
return
buildWorkflowMetadata
(
workflowMetadataDoc
);
}
...
...
provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/
repository/Base
WorkflowMetadata
Repository
.java
→
provider/workflow-azure/src/main/java/org/opengroup/osdu/workflow/provider/azure/
utils/
WorkflowMetadata
Utils
.java
View file @
8ec67579
package
org.opengroup.osdu.workflow.provider.azure.
repository
;
package
org.opengroup.osdu.workflow.provider.azure.
utils
;
import
com.azure.cosmos.models.SqlParameter
;
import
com.azure.cosmos.models.SqlQuerySpec
;
import
org.opengroup.osdu.core.common.logging.JaxRsDpsLog
;
import
org.opengroup.osdu.core.common.model.http.AppException
;
import
org.opengroup.osdu.workflow.exception.ResourceConflictException
;
import
org.opengroup.osdu.workflow.exception.WorkflowNotFoundException
;
import
org.opengroup.osdu.workflow.model.WorkflowMetadata
;
import
org.opengroup.osdu.workflow.provider.azure.config.AzureWorkflowEngineConfig
;
import
org.opengroup.osdu.workflow.provider.azure.model.WorkflowMetadataDoc
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
public
class
BaseWorkflowMetadataRepository
{
private
static
final
String
LOGGER_NAME
=
BaseWorkflowMetadataRepository
.
class
.
getName
();
@Autowired
private
JaxRsDpsLog
logger
;
@Autowired
private
AzureWorkflowEngineConfig
workflowEngineConfig
;
public
class
WorkflowMetadataUtils
{
protected
static
final
String
KEY_DAG_CONTENT
=
"dagContent"
;
p
rotected
WorkflowMetadataDoc
buildWorkflowMetadataDoc
(
final
WorkflowMetadata
workflowMetadata
)
{
p
ublic
static
WorkflowMetadataDoc
buildWorkflowMetadataDoc
(
AzureWorkflowEngineConfig
workflowEngineConfig
,
final
WorkflowMetadata
workflowMetadata
)
{
// If we need to save multiple versions of workflow, then choose id as guid and get becomes a query.
// This is to avoid conflicts. Only one combination of Id and partition key should exist.
Map
<
String
,
Object
>
registrationInstructionForMetadata
=
...
...
@@ -51,7 +38,7 @@ public class BaseWorkflowMetadataRepository {
.
registrationInstructions
(
registrationInstructionForMetadata
).
build
();
}
p
rotected
WorkflowMetadata
buildWorkflowMetadata
(
final
WorkflowMetadataDoc
workflowMetadataDoc
)
{
p
ublic
static
WorkflowMetadata
buildWorkflowMetadata
(
final
WorkflowMetadataDoc
workflowMetadataDoc
)
{
return
WorkflowMetadata
.
builder
()
.
workflowId
(
workflowMetadataDoc
.
getId
())
.
workflowName
(
workflowMetadataDoc
.
getWorkflowName
())
...
...
@@ -63,7 +50,7 @@ public class BaseWorkflowMetadataRepository {
.
registrationInstructions
(
workflowMetadataDoc
.
getRegistrationInstructions
()).
build
();
}
p
rotected
SqlQuerySpec
buildSqlQuerySpecForGetAllWorkflow
(
String
prefix
)
{
p
ublic
static
SqlQuerySpec
buildSqlQuerySpecForGetAllWorkflow
(
String
prefix
)
{
SqlQuerySpec
sqlQuerySpec
;
if
(
prefix
!=
null
&&
!(
prefix
.
isEmpty
()))
{
SqlParameter
prefixParameter
=
new
SqlParameter
(
"@prefix"
,
prefix
);
...
...
@@ -78,27 +65,11 @@ public class BaseWorkflowMetadataRepository {
return
sqlQuerySpec
;
}
p
rotected
List
<
WorkflowMetadata
>
convertWorkflowMetadataDocsToWorkflowMetadataList
(
List
<
WorkflowMetadataDoc
>
workflowMetadataDocs
)
{
p
ublic
static
List
<
WorkflowMetadata
>
convertWorkflowMetadataDocsToWorkflowMetadataList
(
List
<
WorkflowMetadataDoc
>
workflowMetadataDocs
)
{
List
<
WorkflowMetadata
>
workflowMetadataList
=
new
ArrayList
<>();
for
(
WorkflowMetadataDoc
workflowMetadataDoc:
workflowMetadataDocs
)
{
workflowMetadataList
.
add
(
buildWorkflowMetadata
(
workflowMetadataDoc
));
}
return
workflowMetadataList
;
}
protected
void
handleGetWorkflowException
(
String
workflowName
)
{
final
String
errorMessage
=
String
.
format
(
"Workflow: %s doesn't exist"
,
workflowName
);
logger
.
error
(
LOGGER_NAME
,
errorMessage
);
throw
new
WorkflowNotFoundException
(
errorMessage
);
}
protected
void
handleCreateWorkflowException
(
AppException
e
,
String
workflowName
)
{
if
(
e
.
getError
().
getCode
()
==
409
)
{
final
String
errorMessage
=
String
.
format
(
"Workflow with name %s already exists"
,
workflowName
);
logger
.
error
(
LOGGER_NAME
,
errorMessage
);
throw
new
ResourceConflictException
(
workflowName
,
errorMessage
);
}
else
{
throw
e
;
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment