Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
Data Workflow
Commits
56827812
Commit
56827812
authored
Oct 29, 2020
by
Matt Wise
Browse files
support passing auth token from incoming requests to airflow
parent
5b3d5df5
Pipeline
#13528
passed with stages
in 9 minutes and 37 seconds
Changes
7
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
data-workflow-core/src/main/java/org/opengroup/osdu/dataworkflow/provider/interfaces/ISubmitIngestService.java
View file @
56827812
...
...
@@ -17,6 +17,7 @@
package
org.opengroup.osdu.dataworkflow.provider.interfaces
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.opengroup.osdu.dataworkflow.model.StartWorkflowRequestV2
;
public
interface
ISubmitIngestService
{
...
...
@@ -27,5 +28,5 @@ public interface ISubmitIngestService {
* @param startWorkflowRequest workflow request parameters
* @return dummy boolean
*/
boolean
submitIngest
(
StartWorkflowRequestV2
startWorkflowRequest
);
boolean
submitIngest
(
StartWorkflowRequestV2
startWorkflowRequest
,
DpsHeaders
requestHeaders
);
}
data-workflow-core/src/main/java/org/opengroup/osdu/dataworkflow/service/WorkflowServiceImpl.java
View file @
56827812
...
...
@@ -55,7 +55,7 @@ public class WorkflowServiceImpl implements IWorkflowService {
updateRequestContextWithWorkflowId
(
request
,
workflowId
);
submitIngestService
.
submitIngest
(
request
);
submitIngestService
.
submitIngest
(
request
,
headers
);
workflowStatusRepository
.
saveWorkflowStatus
(
WorkflowStatus
.
builder
()
.
workflowId
(
workflowId
)
...
...
data-workflow-core/src/test/java/org/opengroup/osdu/dataworkflow/WorkflowMvcTest.java
View file @
56827812
...
...
@@ -106,7 +106,7 @@ public class WorkflowMvcTest {
.
inputParameters
(
inputParameters
)
.
build
();
given
(
submitIngestService
.
submitIngest
(
any
()))
given
(
submitIngestService
.
submitIngest
(
any
(),
any
()))
.
willReturn
(
Boolean
.
TRUE
);
given
(
workflowStatusRepository
.
saveWorkflowStatus
(
workflowStatusCaptor
.
capture
()))
...
...
data-workflow-core/src/test/java/org/opengroup/osdu/dataworkflow/service/WorkflowServiceImplTest.java
View file @
56827812
...
...
@@ -98,7 +98,7 @@ class WorkflowServiceImplTest {
then
(
startWorkflowResponse
.
getWorkflowId
()).
isNotNull
();
InOrder
inOrder
=
Mockito
.
inOrder
(
validationService
,
submitIngestService
,
workflowStatusRepository
);
inOrder
.
verify
(
validationService
).
validateStartWorkflowRequest
(
request
);
inOrder
.
verify
(
submitIngestService
).
submitIngest
(
request
);
inOrder
.
verify
(
submitIngestService
).
submitIngest
(
request
,
headers
);
inOrder
.
verify
(
workflowStatusRepository
).
saveWorkflowStatus
(
workflowStatusCaptor
.
capture
());
inOrder
.
verifyNoMoreInteractions
();
...
...
@@ -119,8 +119,7 @@ class WorkflowServiceImplTest {
DpsHeaders
headers
=
getMessageHeaders
();
doThrow
(
new
RuntimeException
(
TEST_EXCEPTION
)).
when
(
submitIngestService
)
.
submitIngest
(
eq
(
startWorkflowRequest
));
.
submitIngest
(
eq
(
startWorkflowRequest
),
eq
(
headers
));
// when
Throwable
thrown
=
catchThrowable
(()
->
workflowService
...
...
provider/data-workflow-aws/src/main/java/org/opengroup/osdu/dataworkflow/aws/service/AirflowClient.java
View file @
56827812
...
...
@@ -27,6 +27,7 @@ import java.util.HashMap;
import
java.util.Map
;
import
org.opengroup.osdu.core.common.model.http.AppException
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.springframework.http.HttpStatus
;
@Slf4j
...
...
@@ -38,9 +39,10 @@ public class AirflowClient {
* @param dagName
* @throws IOException
*/
public
void
makeRequestToAirflow
(
String
airflowDagURL
,
String
body
,
String
dagName
)
throws
IOException
{
public
void
makeRequestToAirflow
(
String
airflowDagURL
,
String
body
,
String
dagName
,
DpsHeaders
originalRequestHeaders
)
throws
IOException
{
Map
<
String
,
String
>
headers
=
new
HashMap
<>();
headers
.
put
(
"Content-Type"
,
"application/json"
);
headers
.
put
(
"Authorization"
,
originalRequestHeaders
.
getAuthorization
());
HttpURLConnection
connection
=
getConnection
(
body
,
headers
,
airflowDagURL
);
...
...
provider/data-workflow-aws/src/main/java/org/opengroup/osdu/dataworkflow/aws/service/SubmitIngestServiceImpl.java
View file @
56827812
...
...
@@ -18,6 +18,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.slf4j.Slf4j
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.opengroup.osdu.dataworkflow.exception.OsduRuntimeException
;
import
org.opengroup.osdu.dataworkflow.model.StartWorkflowRequestV2
;
import
org.opengroup.osdu.dataworkflow.provider.interfaces.ISubmitIngestService
;
...
...
@@ -58,7 +60,7 @@ public class SubmitIngestServiceImpl implements ISubmitIngestService {
* @return
*/
@Override
public
boolean
submitIngest
(
StartWorkflowRequestV2
startWorkflowRequest
)
{
public
boolean
submitIngest
(
StartWorkflowRequestV2
startWorkflowRequest
,
DpsHeaders
originalRequestHeaders
)
{
Map
<
String
,
Object
>
inputParameters
=
startWorkflowRequest
.
getInputParameters
();
...
...
@@ -72,7 +74,7 @@ public class SubmitIngestServiceImpl implements ISubmitIngestService {
String
serializedData
=
serializeData
(
inputParameters
);
try
{
String
airflowDagUrlStr
=
format
(
"%s/api/experimental/dags/%s/dag_runs"
,
airflowBaseUrl
,
startWorkflowRequest
.
getDagName
());
airflowClient
.
makeRequestToAirflow
(
airflowDagUrlStr
,
serializedData
,
startWorkflowRequest
.
getDagName
());
airflowClient
.
makeRequestToAirflow
(
airflowDagUrlStr
,
serializedData
,
startWorkflowRequest
.
getDagName
()
,
originalRequestHeaders
);
return
true
;
}
catch
(
IOException
e
)
{
throw
new
OsduRuntimeException
(
"Request execution exception"
,
e
);
...
...
provider/data-workflow-aws/src/test/java/org/opengroup/osdu/dataworkflow/aws/repository/SubmitIngestServiceImplTest.java
View file @
56827812
...
...
@@ -21,6 +21,7 @@ import org.mockito.Mock;
import
org.mockito.Mockito
;
import
org.mockito.internal.util.reflection.Whitebox
;
import
org.mockito.runners.MockitoJUnitRunner
;
import
org.opengroup.osdu.core.common.model.http.DpsHeaders
;
import
org.opengroup.osdu.dataworkflow.aws.WorkflowAwsApplication
;
import
org.opengroup.osdu.dataworkflow.aws.service.AirflowClient
;
import
org.opengroup.osdu.dataworkflow.aws.service.SubmitIngestServiceImpl
;
...
...
@@ -56,14 +57,14 @@ public class SubmitIngestServiceImplTest {
Whitebox
.
setInternalState
(
repo
,
"airflowBaseUrl"
,
airflowBaseUrl
);
Mockito
.
doNothing
().
when
(
airflowClient
).
makeRequestToAirflow
(
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
anyString
());
Mockito
.
doNothing
().
when
(
airflowClient
).
makeRequestToAirflow
(
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
anyString
(),
Mockito
.
any
());
DpsHeaders
headers
=
new
DpsHeaders
();
// Act
repo
.
submitIngest
(
request
);
repo
.
submitIngest
(
request
,
headers
);
// Assert
Mockito
.
verify
(
airflowClient
,
Mockito
.
times
(
1
))
.
makeRequestToAirflow
(
"test url/api/experimental/dags/dagTest/dag_runs"
,
"{\"replace_microseconds\":\"false\",\"test\":\"testing\",\"datsetRegistryIds\":[]}"
,
dagName
);
.
makeRequestToAirflow
(
"test url/api/experimental/dags/dagTest/dag_runs"
,
"{\"replace_microseconds\":\"false\",\"test\":\"testing\",\"datsetRegistryIds\":[]}"
,
dagName
,
headers
);
}
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment