Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
osdu-airflow-lib
Commits
a4f05895
Commit
a4f05895
authored
Nov 12, 2021
by
Siarhei Khaletski (EPAM)
🚩
Browse files
Merge branch 'GONRG-3119_Python_SDK_Usage' into 'master'
GONRG-3119: Python SDK Usage See merge request
!7
parents
f3fc8463
7e33abc6
Pipeline
#76881
passed with stages
in 2 minutes and 6 seconds
Changes
7
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
osdu_airflow/operators/ensure_manifest_integrity.py
View file @
a4f05895
...
...
@@ -40,7 +40,6 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
*
args
,
**
kwargs
):
"""Init base operator and obtain base urls from Airflow Variables."""
super
().
__init__
(
*
args
,
**
kwargs
)
self
.
search_url
=
Variable
.
get
(
'core__service__search__url'
)
self
.
whitelist_ref_patterns
=
Variable
.
get
(
'core__config__reference_patterns_whitelist'
,
default_var
=
None
)
self
.
previous_task_id
=
previous_task_id
self
.
_show_skipped_ids
=
Variable
.
get
(
...
...
@@ -58,7 +57,6 @@ class EnsureManifestIntegrityOperator(BaseOperator, ReceivingContextMixin):
file_source_validator
=
FileSourceValidator
()
manifest_integrity
=
ManifestIntegrity
(
self
.
search_url
,
token_refresher
,
file_source_validator
,
payload_context
,
...
...
osdu_airflow/operators/process_manifest_r2.py
View file @
a4f05895
...
...
@@ -30,7 +30,7 @@ from urllib.error import HTTPError
import
requests
import
tenacity
from
airflow.models
import
BaseOperator
,
Variable
from
osdu_
ingestion.libs
.auth.authorization
import
authorize
from
osdu_
api
.auth.authorization
import
authorize
from
osdu_ingestion.libs.refresh_token
import
AirflowTokenRefresher
config
=
configparser
.
RawConfigParser
()
...
...
osdu_airflow/operators/process_manifest_r3.py
View file @
a4f05895
...
...
@@ -33,7 +33,6 @@ from osdu_ingestion.libs.handle_file import FileHandler
from
osdu_ingestion.libs.process_manifest_r3
import
ManifestProcessor
from
osdu_ingestion.libs.processors.single_manifest_processor
import
SingleManifestProcessor
from
osdu_ingestion.libs.refresh_token
import
AirflowTokenRefresher
from
osdu_ingestion.libs.search_client
import
SearchClient
from
osdu_ingestion.libs.source_file_check
import
SourceFileChecker
from
osdu_ingestion.libs.types
import
ManifestType
from
osdu_ingestion.libs.validation.validate_file_source
import
FileSourceValidator
...
...
@@ -62,8 +61,6 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
super
().
__init__
(
*
args
,
**
kwargs
)
self
.
previous_task_id
=
previous_task_id
self
.
batch_number
=
batch_number
self
.
schema_service_url
=
Variable
.
get
(
'core__service__schema__url'
)
self
.
search_service_url
=
Variable
.
get
(
'core__service__search__url'
)
self
.
storage_url
=
Variable
.
get
(
'core__service__storage__url'
)
self
.
file_service_host
=
Variable
.
get
(
'core__service__file__host'
)
self
.
batch_count
=
int
(
Variable
.
get
(
"core__ingestion__batch_count"
,
"3"
))
...
...
@@ -137,25 +134,21 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
token_refresher
=
AirflowTokenRefresher
()
file_handler
=
FileHandler
(
self
.
file_service_host
,
token_refresher
,
payload_context
)
file_source_validator
=
FileSourceValidator
()
search_client
=
SearchClient
(
self
.
search_service_url
,
token_refresher
,
payload_context
)
source_file_checker
=
SourceFileChecker
()
referential_integrity_validator
=
ManifestIntegrity
(
self
.
search_service_url
,
token_refresher
,
file_source_validator
,
payload_context
)
manifest_processor
=
ManifestProcessor
(
storage_url
=
self
.
storage_url
,
file_handler
=
file_handler
,
token_refresher
=
token_refresher
,
context
=
payload_context
,
source_file_checker
=
source_file_checker
,
)
validator
=
SchemaValidator
(
self
.
schema_service_url
,
token_refresher
,
payload_context
,
data_types_with_surrogate_ids
=
DATA_TYPES_WITH_SURROGATE_KEYS
,
...
...
osdu_airflow/operators/validate_manifest_schema.py
View file @
a4f05895
...
...
@@ -40,10 +40,8 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
@
apply_defaults
def
__init__
(
self
,
previous_task_id
:
str
=
None
,
*
args
,
**
kwargs
):
"""Init base operator and obtain base urls from Airflow Variables."""
super
().
__init__
(
*
args
,
**
kwargs
)
self
.
previous_task_id
=
previous_task_id
self
.
schema_service_url
=
Variable
.
get
(
'core__service__schema__url'
)
self
.
_show_skipped_ids
=
Variable
.
get
(
'core__config__show_skipped_ids'
,
default_var
=
False
)
...
...
@@ -63,7 +61,6 @@ class ValidateManifestSchemaOperator(BaseOperator, ReceivingContextMixin):
token_refresher
=
AirflowTokenRefresher
()
schema_validator
=
SchemaValidator
(
self
.
schema_service_url
,
token_refresher
,
payload_context
,
surrogate_key_fields_paths
=
SURROGATE_KEYS_PATHS
,
...
...
osdu_airflow/tests/osdu_api.ini
0 → 100644
View file @
a4f05895
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[environment]
data_partition_id
=
opendes
storage_url
=
https://blah/api/storage/v2
partition_url
=
https://blah/api/storage/v2
search_url
=
https://blah/api/search/v2
legal_url
=
https://blah/api/legal/v1
data_workflow_url
=
https://blah/api/data-workflow/v1
entitlements_url
=
https://blah/api/entitlements/v1
file_dms_url
=
https://blah/api/filedms/v2
dataset_url
=
https://blah/api/dataset-registry/v1
schema_url
=
https://blah/api/schema-service/v1
ingestion_workflow_url
=
stub
use_service_principal
=
no
[provider]
name
=
provider_test
osdu_airflow/tests/unit_tests.sh
View file @
a4f05895
#!/bin/sh
export
CLOUD_PROVIDER
=
"provider_test"
export
OSDU_API_CONFIG_INI
=
"
`
dirname
$0
`
/osdu_api.ini"
airflow db init
...
...
requirements-dev.txt
View file @
a4f05895
...
...
@@ -6,7 +6,7 @@ requests==2.25.1
tenacity==6.2.0
--extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.1
2
.0
osdu-api~=0.1
3
.0
.dev241
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion~=0.1
2
.0
osdu-ingestion~=0.1
3
.0
.dev68
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment