Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
System
SDKs
Python SDK
Commits
43464edb
Commit
43464edb
authored
Sep 27, 2021
by
Yan Sushchynski (EPAM)
Browse files
GONRG-3452: Move Ingestion Logic from Python SDK
parent
113e1a63
Pipeline
#71300
passed with stages
in 49 seconds
Changes
112
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
View file @
43464edb
...
...
@@ -12,13 +12,6 @@ stages:
-
test
-
deploy
test-libs
:
stage
:
test
script
:
-
pip install -r requirements.txt
-
pip install -r requirements-dev.txt
-
python -m pytest ./osdu_api/test/libs-unit-tests
test-providers-gcp
:
stage
:
test
script
:
...
...
README.md
View file @
43464edb
...
...
@@ -9,7 +9,6 @@
*
*
[
Installation from Package Registry
](
#installation-from-package-registry
)
*
[
Testing
](
#testing
)
*
*
[
Running E2E Tests
](
#running-e2e-tests
)
*
*
[
Running Ingestion libs Tests
](
#running-ingestion-libs-tests
)
*
*
[
Running CSP Tests
](
#running-csp-tests
)
*
[
Licence
](
#licence
)
...
...
@@ -25,12 +24,6 @@ The Python SDK must be installed on the machine that uses OSDU services.
In OSDU R3 Prototype, the SDK encapsulates calls to the ODES Storage and Search services.
The SDK now provides different components for the ingestion process in
`osdu_api.libs`
folder. Among them:
-
validating OSDU entities against corresponding schemas;
-
ensuring referential integrity;
-
finding parent-child relationships between entities;
-
etc.
Also, in the
`osdu_api.providers`
folder the SDK provides common interfaces for writing cloud-specific implementations of authorization and
cloud storage access. CSP-specific code is stored in this
`osdu_api.providers`
folder.
...
...
@@ -91,15 +84,6 @@ Specify of end-services URLs into `tests/osdu_api.yaml` and run
pytest test
```
### Running ingestion libs tests
```shell
export CLOUD_PROVIDER=provider_test
pip install -r requirements.txt
pip install -r requirements-dev.txt
pytest ./osdu_api/test/libs-unit-tests
```
### Running CSP tests
```shell
...
...
VERSION
View file @
43464edb
0.1
0.1
0.1
2.0
osdu_api/
libs/
auth/README.MD
→
osdu_api/auth/README.MD
View file @
43464edb
...
...
@@ -38,7 +38,7 @@ The decorator uses an object of a user-defined implementation of the abstract cl
**strategy**
for refreshing the access token.
```
class
osdu_
api
.libs.auth.authorization.TokenRefresher()
classosdu_
ingestion
.libs.auth.authorization.TokenRefresher()
```
...
...
@@ -60,7 +60,7 @@ The following abstract method of the abstract class `TokenRefresher` must be imp
```
python
import
requests
from
osdu_
api
.libs.auth.authorization
import
authorize
,
TokenRefresher
from
osdu_
ingestion
.libs.auth.authorization
import
authorize
,
TokenRefresher
class
VendorRefreshTokenStrategy
(
TokenRefresher
):
...
...
osdu_api/
libs
/__init__.py
→
osdu_api/
auth
/__init__.py
View file @
43464edb
File moved
osdu_api/
libs/
auth/authorization.py
→
osdu_api/auth/authorization.py
View file @
43464edb
...
...
@@ -21,7 +21,7 @@ from typing import Callable, Union
import
requests
from
osdu_api.
lib
s.exceptions
import
TokenRefresherNotPresentError
from
osdu_api.
exception
s.exceptions
import
TokenRefresherNotPresentError
logger
=
logging
.
getLogger
()
...
...
osdu_api/
libs/valida
tion/__init__.py
→
osdu_api/
excep
tion
s
/__init__.py
View file @
43464edb
File moved
osdu_api/
libs/type
s.py
→
osdu_api/
exceptions/exception
s.py
View file @
43464edb
...
...
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from
typing
import
List
,
TypeVar
"""Exceptions module."""
ManifestType
=
TypeVar
(
"ManifestType"
,
dict
,
List
[
dict
])
class TokenRefresherNotPresentError(Exception):
    """Raised when no token refresher is present in the 'refresh_token' decorator."""
osdu_api/libs/auth/__init__.py
deleted
100644 → 0
View file @
113e1a63
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
osdu_api/libs/constants.py
deleted
100644 → 0
View file @
113e1a63
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Constants module."""
# Retry / timing settings (units are not stated here -- TODO confirm with callers).
RETRIES = 3
TIMEOUT = 1
WAIT = 10

FIRST_STORED_RECORD_INDEX = 0

# Paths to extend schema fields with surrogate keys
DATA_TYPES_WITH_SURROGATE_KEYS = (
    "dataset",
    "work-product",
    "work-product-component",
)
# Each entry is a sequence of keys (and list indexes) leading to a schema node.
SURROGATE_KEYS_PATHS = [
    (
        "definitions",
        "{{data-partition-id}}:wks:AbstractWPCGroupType:1.0.0",
        "properties",
        "Datasets",
        "items",
    ),
    (
        "definitions",
        "{{data-partition-id}}:wks:AbstractWPCGroupType:1.0.0",
        "properties",
        "Artefacts",
        "items",
        "properties",
        "ResourceID",
    ),
    (
        "properties",
        "data",
        "allOf",
        1,
        "properties",
        "Components",
        "items",
    ),
]

# Batch sizes for searching ids and saving records.
SEARCH_ID_BATCH_SIZE = 25
SAVE_RECORDS_BATCH_SIZE = 500

# Section names.
DATA_SECTION = "Data"
DATASETS_SECTION = "Datasets"
MASTER_DATA_SECTION = "MasterData"
REFERENCE_DATA_SECTION = "ReferenceData"
WORK_PRODUCT_SECTION = "WorkProduct"
WORK_PRODUCT_COMPONENTS_SECTION = "WorkProductComponents"
osdu_api/libs/context.py
deleted
100644 → 0
View file @
113e1a63
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Context module."""
import
dataclasses
@dataclasses.dataclass
class Context:
    """Class to store data-partition-id and AppKey."""

    data_partition_id: str
    app_key: str

    @classmethod
    def populate(cls, ctx: dict) -> 'Context':
        """
        Populates Context dataclass from dagrun.conf dict.

        The partition id is read from ``ctx['Payload']['data-partition-id']``;
        when that key is absent, the top-level ``ctx['dataPartitionId']`` is
        used instead. ``ctx`` itself is left unmodified.

        :param ctx: dagrun.conf-style dict holding a 'Payload' section
        :type ctx: dict
        :raises KeyError: if 'Payload', 'AppKey' or both partition-id keys
            are missing
        :return: populated Context
        :rtype: Context
        """
        # Look the payload up instead of popping it, so the caller's dict is
        # not mutated and can be passed to populate() again.
        ctx_payload = ctx['Payload']
        try:
            data_partition_id = ctx_payload['data-partition-id']
        except KeyError:
            # to support some DAGs payload interface
            data_partition_id = ctx['dataPartitionId']
        return cls(app_key=ctx_payload['AppKey'], data_partition_id=data_partition_id)
osdu_api/libs/exceptions.py
deleted
100644 → 0
View file @
113e1a63
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions module."""
from
typing
import
List
from
osdu_api.libs.utils
import
create_skipped_entity_info
class RecordsNotSearchableError(Exception):
    """Raised when the actual totalCount of records differs from the expected one."""
class PipelineFailedError(Exception):
    """Raised when the pipeline has failed."""
class EmptyManifestError(Exception):
    """Raised when the manifest field is empty."""
class GetSchemaError(Exception):
    """Raised when a schema cannot be found."""
class SRNNotFound(Exception):
    """Raised when an SRN cannot be found."""
class NotOSDUSchemaFormatError(Exception):
    """Raised when a schema does not correspond to the OSDU format."""
class FileSourceError(Exception):
    """Raised when no file exists under the given URI path."""
class UploadFileError(Exception):
    """Raised when an error occurs while uploading a file into OSDU."""
class TokenRefresherNotPresentError(Exception):
    """Raised when no token refresher is present in the 'refresh_token' decorator."""
class NoParentEntitySystemSRNError(Exception):
    """Raised when a parent entity has no system-generated SRN."""
class InvalidFileRecordData(Exception):
    """Raised when file data lacks mandatory fields."""
class GenericManifestSchemaError(Exception):
    """Raised when a generic manifest schema is not valid."""
class BaseEntityValidationError(Exception):
    """Base error for failed validations.

    The result of ``create_skipped_entity_info(entity, reason)`` is stored on
    the ``skipped_entity`` attribute.
    """

    def __init__(self, entity: dict, reason: str):
        """Record the skipped-entity info for ``entity`` and ``reason``.

        :param entity: The entity that failed validation
        :type entity: dict
        :param reason: Why the entity failed validation
        :type reason: str
        """
        skipped_info = create_skipped_entity_info(entity, reason)
        self.skipped_entity = skipped_info
class EntitySchemaValidationError(BaseEntityValidationError):
    """Raised when the validation against schemas has failed."""
class ValidationIntegrityError(BaseEntityValidationError):
    """Raised when an entity does not pass validation integrity."""
class DatasetValidationError(BaseEntityValidationError):
    """Raised when a dataset is not valid."""
class ProcessRecordError(BaseEntityValidationError):
    """Raised when a record is unprocessed."""
class ProcessRecordBatchError(BaseEntityValidationError):
    """Validation error covering a batch of entities.

    This ``__init__`` does not call the base initializer: it sets only
    ``skipped_entities`` (one skipped-entity info per entity), not
    ``skipped_entity``.
    """

    def __init__(self, entities: List[dict], reason: str):
        """Record skipped-entity info for every entity in ``entities``.

        :param entities: The entities of the batch
        :type entities: List[dict]
        :param reason: The reason shared by all entities of the batch
        :type reason: str
        """
        collected = []
        for item in entities:
            collected.append(create_skipped_entity_info(item, reason))
        self.skipped_entities = collected
osdu_api/libs/handle_file.py
deleted
100644 → 0
View file @
113e1a63
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides cloud specific File Handler implementations."""
import
dataclasses
import
io
import
json
import
logging
from
typing
import
List
,
Tuple
import
requests
import
tenacity
from
osdu_api.libs.auth.authorization
import
TokenRefresher
,
authorize
from
osdu_api.libs.constants
import
RETRIES
,
WAIT
from
osdu_api.libs.context
import
Context
from
osdu_api.libs.exceptions
import
InvalidFileRecordData
from
osdu_api.libs.mixins
import
HeadersMixin
from
osdu_api.providers
import
blob_storage
from
osdu_api.providers.types
import
BlobStorageClient
,
FileLikeObject
logger = logging.getLogger()

# Keyword arguments unpacked into the ``tenacity.retry`` decorators of this
# module: stop after RETRIES attempts, waiting a fixed WAIT between them.
RETRY_SETTINGS = dict(
    stop=tenacity.stop_after_attempt(RETRIES),
    wait=tenacity.wait_fixed(WAIT),
)
@dataclasses.dataclass()
class FileUploadUrlResponse:
    """Value holder for the fields of a File service uploadURL response."""

    # Fields are populated by the caller from the parsed service response.
    file_id: str
    signed_url: str
    file_source: str
@dataclasses.dataclass()
class FileDownloadUrlResponse:
    """Value holder for the fields of a File service downloadURL response."""

    # Fields are populated by the caller from the parsed service response.
    signed_url: str
    unsigned_url: str
    kind: str
class FileHandler(HeadersMixin):
    """Class to perform operations using OSDU File Service."""

    def __init__(self, file_service_host: str, token_refresher: TokenRefresher, context: Context,
                 blob_storage_client: BlobStorageClient = None):
        """File handler.

        :param file_service_host: Base OSDU File service url
        :type file_service_host: str
        :param token_refresher: Object to refresh tokens
        :type token_refresher: TokenRefresher
        :param context: The tenant context data
        :type context: Context
        :param blob_storage_client: Client used to download preloaded files;
            when falsy, ``blob_storage.get_client()`` is used instead
        :type blob_storage_client: BlobStorageClient
        """
        super().__init__(context)
        self._file_service_host = file_service_host
        self.token_refresher = token_refresher
        self._blob_storage_client = blob_storage_client or blob_storage.get_client()

    def _get_file_from_preload_path(self, preload_file_path: str,
                                    file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Get file from a preloaded path.

        :param preload_file_path: Full URI of the file to obtain
        :type preload_file_path: str
        :param file: File-like object handed to the blob storage client's
            ``download_to_file``
        :type file: FileLikeObject
        :return: Raw file data and content-type
        :rtype: Tuple[FileLikeObject, str]
        """
        return self._blob_storage_client.download_to_file(preload_file_path, file)

    @staticmethod
    def _verify_file_record_data(file_record_data: dict):
        """Perform simple verification of mandatory fields according to OSDU
        File Service.

        :param file_record_data: Data field of file_record
        :type file_record_data: dict
        :raises InvalidFileRecordData: When some of the mandatory fields is
            missing or empty
        """
        endian = file_record_data.get("Endian")
        # A missing "DatasetProperties"/"FileSourceInfo" section is reported as
        # InvalidFileRecordData (the documented contract), not as a KeyError.
        dataset_properties = file_record_data.get("DatasetProperties") or {}
        file_source_info = dataset_properties.get("FileSourceInfo") or {}
        file_source = file_source_info.get("FileSource")
        if not (endian and file_source):
            raise InvalidFileRecordData(f"Mandatory fields: Endian-{endian}"
                                        f"FileSource-{file_source}")

    @staticmethod
    def _handle_download_url_response(response: dict) -> FileDownloadUrlResponse:
        """
        Handle downloadURL according to file service version

        :param response: The response already load from json
        :type response: dict
        :raises KeyError: When neither the latest nor the legacy keys are present
        :return: FileDownloadUrlResponse filled properly
        :rtype: FileDownloadUrlResponse
        """
        try:
            # response got from latest version of File service
            return FileDownloadUrlResponse(signed_url=response["signedUrl"],
                                           unsigned_url=response["unsignedUrl"],
                                           kind=response["kind"])
        except KeyError:
            # response got from a legacy version of File service
            return FileDownloadUrlResponse(signed_url=response["SignedUrl"],
                                           unsigned_url=None,
                                           kind=None)

    @tenacity.retry(**RETRY_SETTINGS)
    @authorize()
    def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
        """Send a POST request with ``request_body`` to ``url``.

        :param headers: Request headers
        :type headers: dict
        :param url: Target url
        :type url: str
        :param request_body: Body of the request
        :type request_body: str
        :return: The response of the request
        :rtype: requests.Response
        """
        logger.debug("%s", request_body)
        response = requests.post(url, request_body, headers=headers)
        logger.debug(response.content)
        return response

    @tenacity.retry(**RETRY_SETTINGS)
    @authorize()
    def _send_get_request(self, headers: dict, url: str) -> requests.Response:
        """Send a GET request to ``url``.

        :param headers: Request headers
        :type headers: dict
        :param url: Target url
        :type url: str
        :return: The response of the request
        :rtype: requests.Response
        """
        response = requests.get(url, headers=headers)
        logger.debug(response)
        return response

    def _get_upload_signed_url(self, headers: dict) -> FileUploadUrlResponse:
        """Get FileID, SignedURL and FileSource using File Service uploadURL
        endpoint.

        :param headers: Request headers to pass to the final request issuer
        :type headers: dict
        :return: FileUploadUrlResponse with data from service
        :rtype: FileUploadUrlResponse
        """
        logger.debug("Getting upload signed url.")
        endpoint = f"{self._file_service_host}/v2/files/uploadURL"
        response = self._send_get_request(headers, endpoint).json()
        logger.debug("Signed url got.")
        location = response["Location"]
        return FileUploadUrlResponse(file_id=response["FileID"],
                                     signed_url=location["SignedURL"],
                                     file_source=location["FileSource"])

    def _get_download_signed_url(self, headers: dict, record_id: str) -> FileDownloadUrlResponse:
        """Get signedURL, unsignedURL and kind using File Service downloadURL
        endpoint.

        :param headers: Request headers to pass to the final request issuer
        :type headers: dict
        :param record_id: Unique id of the file record saved in the osdu system
        :type record_id: str
        :return: FileDownloadUrlResponse with signed and unsigned urls
        :rtype: FileDownloadUrlResponse
        """
        logger.debug("Getting download signed url.")
        endpoint = f"{self._file_service_host}/v2/files/{record_id}/downloadURL"
        response = self._send_get_request(headers, endpoint).json()
        logger.debug("Signed url got.")
        return self._handle_download_url_response(response)

    @tenacity.retry(**RETRY_SETTINGS)
    def _upload_file_request(self, headers: dict, signed_url: str, buffer: FileLikeObject):
        """Upload file via File service using signed_url.

        :param headers: Request headers to pass to the final request issuer
        :type headers: dict
        :param signed_url: SignedURL to authenticate request
        :type signed_url: str
        :param buffer: Raw file data
        :type buffer: FileLikeObject
        :raises requests.HTTPError: When the upload is answered with an HTTP
            error status (raised inside the retry decorator)
        """
        logger.debug("Uploading file.")
        # Rewind on every attempt so a retry sends the whole file again.
        buffer.seek(0)
        response = requests.put(signed_url, buffer.read(), headers=headers)
        # Without this check a rejected upload would be reported as success
        # and the retry decorator would never get a chance to retry it.
        response.raise_for_status()
        logger.debug("File uploaded.")

    def _get_file_location_request(self, headers: dict, file_id: str) -> str:
        """Get file location using File Service.

        :param headers: Request headers to pass to the final request issuer
        :type headers: dict
        :param file_id: String identifier of the file
        :type file_id: str
        :return: Full URI of the located file
        :rtype: str
        """
        logger.debug("Getting file location.")
        request_body = json.dumps({"FileID": file_id})
        endpoint = f"{self._file_service_host}/getFileLocation"
        response = self._send_post_request(headers, endpoint, request_body)
        logger.debug("File location got.")
        return response.json()["Location"]

    def upload_file(self, preload_file_path: str) -> str:
        """Copy file from preload_file_path location to Landing Zone in OSDU
        platform using File service. Get Content-Type of this file, refresh
        Content-Type with this value in headers while this file is being
        uploaded onto OSDU platform.

        :param preload_file_path: The URI of the preloaded file
        :type preload_file_path: str
        :return: FileSource obtained via File service
        :rtype: str
        """
        with io.BytesIO() as buffer:
            # Keep the downloaded object under its own name instead of
            # rebinding the context-managed buffer.
            file_object, content_type = self._get_file_from_preload_path(preload_file_path,
                                                                         buffer)
            upload_url_response = self._get_upload_signed_url(self.request_headers)

            # Work on a copy so the Content-Type override never reaches
            # headers used by other requests.
            headers = dict(self.request_headers)
            headers["Content-Type"] = content_type
            self._upload_file_request(headers, upload_url_response.signed_url, file_object)

        return upload_url_response.file_source

    def get_file_staging_location(self, file_source: str) -> str:
        """Retrieve location (full URI) of the file in staging area.

        :param file_source: The FileSource (relative URI) of the file of the form
            /{folder}/{file_id}
        :type file_source: str
        :return: Full URI of the location of the file in staging area
        :rtype: str
        """
        # The file id is the last path segment of the FileSource.
        file_id = file_source.split("/")[-1]
        return self._get_file_location_request(self.request_headers, file_id)

    def get_file_permanent_location(self, file_record_id: str) -> str:
        """Retrieve location (full URI) of the file in permanent area.

        :param file_record_id: The unique id of the file record (aka metadata)
        :type file_record_id: str
        :return: Full URI of the location of the file in permanent area; this
            is the unsigned url, which is None for a legacy-format response
        :rtype: str
        """
        download_url_response = self._get_download_signed_url(self.request_headers,
                                                              file_record_id)
        return download_url_response.unsigned_url

    def save_file_record(self, file_record: dict) -> str:
        """Send request to store record via file service API.

        When the record has no "Name", one is generated from the last path
        segment of its PreloadFilePath and written into ``file_record``.

        :param file_record: The file record to save
        :type file_record: dict
        :raises InvalidFileRecordData: When mandatory fields are missing or empty
        :return: OSDU system generated id of the saved record
        :rtype: str
        """
        record_data = file_record["data"]
        self._verify_file_record_data(record_data)
        # TODO fix 'name' field processing
        # Generate file entity name as workaround because file API required this field.
        if not record_data.get("Name"):
            preload_file_path = record_data['DatasetProperties']['FileSourceInfo']['PreloadFilePath']
            record_data["Name"] = f"surrogate_name_{preload_file_path.split('/')[-1]}"
            logger.info("Generated name: %s", record_data['Name'])
        logger.info("Sending file record metadata to File service")
        endpoint = f"{self._file_service_host}/v2/files/metadata"
        response = self._send_post_request(self.request_headers, endpoint,
                                           json.dumps(file_record))
        return response.json()["id"]

    def batch_save_file_records(self, file_records: List[str]) -> List[str]:
        """Perform concurrent save file record requests.

        Not implemented yet: always raises.

        :param file_records: List of file records to save
        :type file_records: List[str]
        :raises NotImplementedError: Always
        :return: List of OSDU system generated ids of the saved records
        :rtype: List[str]
        """
        raise NotImplementedError("TODO(python-team) implementation.")
osdu_api/libs/linearize_manifest.py
deleted
100644 → 0
View file @
113e1a63
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems