Commit c45a7557 authored by Yan Sushchynski (EPAM)

GONRG-2726: Move libs to SDK

parent 64cd7ad6
Pipeline #52133 passed with stages in 7 minutes and 52 seconds
@@ -20,7 +20,7 @@ stages:
- unit_tests
- test_dags
- deploy
pylint:
image: eu.gcr.io/osdu-cicd-epam/airflow-python-dags/airflow-python-dags:latest
@@ -70,8 +70,6 @@ osdu-gcp-deploy:
- cd src
- gsutil -m rsync -x "dags/libs*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion
- gsutil -m rsync -d -R dags/libs $OSDU_GCP_DEPL_TARGET/dags/libs
- gsutil -m rsync -x "dags/providers*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion
- gsutil -m rsync -d -R dags/providers $OSDU_GCP_DEPL_TARGET/dags/providers
- gsutil cp dags/.airflowignore $OSDU_GCP_DEPL_TARGET/dags/
- gsutil -m rsync -R plugins $OSDU_GCP_DEPL_TARGET/plugins
except:
@@ -85,8 +83,6 @@ osdu-gcp-deploy-pre-prod:
- cd src
- gsutil -m rsync -x "dags/libs*" -d -R dags $OSDU_GCP_DEPL_TARGET_PREPROD/dags/ingestion
- gsutil -m rsync -d -R dags/libs $OSDU_GCP_DEPL_TARGET_PREPROD/dags/libs
- gsutil -m rsync -x "dags/providers*" -d -R dags $OSDU_GCP_DEPL_TARGET_PREPROD/dags/ingestion
- gsutil -m rsync -d -R dags/providers $OSDU_GCP_DEPL_TARGET_PREPROD/dags/providers
- gsutil -m rsync -R plugins $OSDU_GCP_DEPL_TARGET_PREPROD/plugins
only:
- /^release\/*/
@@ -49,8 +49,8 @@ According to the [DAG implementation details](#dag-implementation-details) need
#### Installing Python Dependencies
Environment dependencies can be installed in several ways:
1. Setting up an environment into the Cloud Composer Console.
2. Installing local Python library. Put your dependencies into *DAG_FOLDER/libs* directory. Airflow automatically adds *DAG_FOLDER* and *PLUGINS_FOLDER* to the *PATH*.
1. Install packages via `pip` on the environment where Airflow runs (see the sketch after this list).
2. Although it is **not recommended**, it is possible to install Python libraries locally. Put your dependencies into *DAG_FOLDER/libs* directory. Airflow automatically adds *DAG_FOLDER* and *PLUGINS_FOLDER* to the *PATH*.
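A minimal sketch of the first option on Cloud Composer, assuming the extra packages are listed in a `requirements.txt`; the environment and location names below are placeholders, not values from this repository:
```shell
# requirements.txt lists the extra PyPI packages, one per line
gcloud composer environments update my-airflow-environment \
    --location europe-west1 \
    --update-pypi-packages-from-file requirements.txt
```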
### Azure
To deploy the Ingestion DAGs to Airflow, follow the steps below.
@@ -62,6 +62,13 @@ To deploy the Ingestion DAGs to Airflow, follow the steps below.
#### Installing Python Dependencies
Python dependencies can be specified as extra pip packages in the Airflow deployment [here](https://community.opengroup.org/osdu/platform/deployment-and-operations/infra-azure-provisioning/-/blob/master/charts/airflow/helm-config.yaml#L211).
Also, the DAGs require the [Python SDK](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk) to be installed.
It can be installed on the environment via `pip`:
```shell
pip install osdu-api --extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
```
#### Environment Variables & Airflow Variables
Add variables manually in the Airflow UI or through the Airflow Helm charts. [List of the required variables](#required-variables).
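For example, a single variable can be set from the Airflow CLI; the name and value below are placeholders, the real names are listed under [Required Variables](#required-variables):
```shell
# Airflow 2.x syntax; on Airflow 1.10.x the equivalent is `airflow variables --set KEY VALUE`
airflow variables set some__osdu__variable some-value
```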
@@ -74,7 +81,7 @@ More details about airflow variables can be found [here](https://airflow.apache.
OSDU DAGs are cloud platform-agnostic by design. However, there are specific implementation requirements by cloud
platforms, and the OSDU R2 Prototype provides a dedicated Python SDK to make sure that DAGs are independent from the
cloud platforms. This Python SDK is located in a separate [os-python-sdk] folder.
cloud platforms. This Python SDK must be installed from the corresponding [repository](https://community.opengroup.org/osdu/platform/system/sdks/common-python-sdk).
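For illustration, once `osdu-api` is installed the DAG code can import the SDK directly; the import below is the one used by the file handler module in this commit:
```python
from osdu_api.libs.auth.authorization import TokenRefresher, authorize
```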
## Required Variables
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Constants module."""
RETRIES = 3
TIMEOUT = 1
WAIT = 10
FIRST_STORED_RECORD_INDEX = 0
# Paths to extend schema fields with surrogate keys
DATA_TYPES_WITH_SURROGATE_KEYS = ("dataset", "work-product", "work-product-component")
SURROGATE_KEYS_PATHS = [
("definitions", "{{data-partition-id}}:wks:AbstractWPCGroupType:1.0.0", "properties", "Datasets",
"items"),
("definitions", "{{data-partition-id}}:wks:AbstractWPCGroupType:1.0.0", "properties", "Artefacts",
"items", "properties", "ResourceID"),
("properties", "data", "allOf", 1, "properties", "Components", "items"),
]
DATA_SECTION = "Data"
DATASETS_SECTION = "Datasets"
MASTER_DATA_SECTION = "MasterData"
REFERENCE_DATA_SECTION = "ReferenceData"
WORK_PRODUCT_SECTION = "WorkProduct"
WORK_PRODUCT_COMPONENTS_SECTION = "WorkProductComponents"
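# Illustration only (hypothetical helper, not part of this commit): each
# SURROGATE_KEYS_PATHS tuple is a key/index sequence that can be walked
# through a loaded schema dict to reach the node to be extended.
def _example_surrogate_key_lookup(schema: dict) -> dict:
    from functools import reduce
    # Walks ("properties", "data", "allOf", 1, "properties", "Components", "items")
    return reduce(lambda node, key: node[key], SURROGATE_KEYS_PATHS[2], schema)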
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Context module."""
import dataclasses
@dataclasses.dataclass
class Context:
"""Class to store data-partition-id and AppKey."""
data_partition_id: str
app_key: str
@classmethod
def populate(cls, ctx: dict) -> 'Context':
"""
Populates Context dataclass from dagrun.conf dict.
:return: populated Context
:rtype: Context
"""
ctx_payload = ctx.pop('Payload')
ctx_obj = cls(app_key=ctx_payload['AppKey'],
data_partition_id=ctx_payload['data-partition-id'])
return ctx_obj
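# Illustration only (hypothetical values): the dagrun.conf shape that
# Context.populate expects.
def _example_populate() -> Context:
    conf = {
        "Payload": {
            "AppKey": "test-app",
            "data-partition-id": "opendes",
        }
    }
    return Context.populate(conf)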
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions module."""
from libs.utils import create_skipped_entity_info
class RecordsNotSearchableError(Exception):
"""Raise when expected totalCount of records differs from actual one."""
pass
class PipelineFailedError(Exception):
"""Raise when pipeline failed."""
pass
class EmptyManifestError(Exception):
"""Raise when manifest field is empty."""
pass
class GetSchemaError(Exception):
"""Raise when can't find schema."""
pass
class SRNNotFound(Exception):
"""Raise when can't find SRN."""
pass
class NotOSDUSchemaFormatError(Exception):
"""Raise when schema doesn't correspond OSDU format."""
pass
class FileSourceError(Exception):
"""Raise when file doesn't exist under given URI path."""
pass
class UploadFileError(Exception):
"""Raise when there is an error while uploading a file into OSDU."""
class TokenRefresherNotPresentError(Exception):
"""Raise when token refresher is not present in "refresh_token' decorator."""
pass
class NoParentEntitySystemSRNError(Exception):
"""Raise when parent entity doesn't have system-generated SRN."""
pass
class InvalidFileRecordData(Exception):
"""Raise when file data does not contain mandatory fields."""
class GenericManifestSchemaError(Exception):
"""Raise when a generic manifest schema is invalid."""
class BaseEntityValidationError(Exception):
"""
Base Error for failed validations.
"""
def __init__(self, entity: dict, reason: str):
self.skipped_entity = create_skipped_entity_info(entity, reason)
class EntitySchemaValidationError(BaseEntityValidationError):
"""
Raise when the validation against schemas failed.
"""
class ValidationIntegrityError(BaseEntityValidationError):
"""
Raise when an entity does not pass validation integrity.
"""
class DatasetValidationError(BaseEntityValidationError):
"""
Raise when a dataset is not valid.
"""
class ProcessRecordError(BaseEntityValidationError):
"""
Raise when a record cannot be processed.
"""
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides cloud specific File Handler implementations."""
import dataclasses
import io
import json
import logging
from typing import List, Tuple
import requests
import tenacity
from libs.constants import RETRIES, WAIT
from libs.context import Context
from libs.exceptions import InvalidFileRecordData
from libs.mixins import HeadersMixin
from osdu_api.libs.auth.authorization import TokenRefresher, authorize
from providers import blob_storage
from providers.types import BlobStorageClient, FileLikeObject
logger = logging.getLogger()
RETRY_SETTINGS = {
"stop": tenacity.stop_after_attempt(RETRIES),
"wait": tenacity.wait_fixed(WAIT),
}
@dataclasses.dataclass
class FileUploadUrlResponse:
"""Simple class to store File service uploadURL response values."""
file_id: str
signed_url: str
file_source: str
@dataclasses.dataclass
class FileDownloadUrlResponse:
"""Simple class to store File service downloadURL response values."""
signed_url: str
unsigned_url: str
kind: str
class FileHandler(HeadersMixin):
"""Class to perform operations using OSDU File Service."""
def __init__(self, file_service_host: str, token_refresher: TokenRefresher, context: Context,
blob_storage_client: BlobStorageClient = None):
"""File handler.
:param file_service_host: Base OSDU File service url
:type file_service_host: str
:param token_refresher: Object to refresh tokens
:type token_refresher: TokenRefresher
:param context: The tenant context data
:type context: Context
:param blob_storage_client: Optional blob storage client; defaults to the
client returned by providers.blob_storage.get_client() when not given
:type blob_storage_client: BlobStorageClient
"""
super().__init__(context)
self._file_service_host = file_service_host
self.token_refresher = token_refresher
self._blob_storage_client = blob_storage_client or blob_storage.get_client()
def _get_file_from_preload_path(self, preload_file_path: str,
file: FileLikeObject) -> Tuple[FileLikeObject, str]:
"""Get file from a preloaded path.
:param preload_file_path: Full URI of the file to obtain
:type preload_file_path: str
:param file: A file-like object to download the file data into
:type file: FileLikeObject
:return: Raw file data and content-type
:rtype: Tuple[FileLikeObject, str]
"""
return self._blob_storage_client.download_to_file(preload_file_path, file)
@staticmethod
def _verify_file_record_data(file_record_data: dict):
"""Perform simple verification of mandatory fields according to OSDU
File Service.
:param file_record_data: Data field of file_record
:type file_record_data: dict
:raises InvalidFileRecordData: When any of the mandatory fields is
missing or empty
"""
endian = file_record_data.get("Endian")
file_source = file_record_data["DatasetProperties"]["FileSourceInfo"].get("FileSource")
if not (endian and file_source):
raise InvalidFileRecordData(f"Mandatory fields: Endian-{endian} "
                            f"FileSource-{file_source}")
@staticmethod
def _handle_download_url_response(response: dict) -> FileDownloadUrlResponse:
"""
Handle downloadURL according to file service version
:param response: The response already loaded from JSON
:type response: dict
:return: FileDownloadUrlResponse filled properly
:rtype: FileDownloadUrlResponse
"""
try:
# response got from latest version of File service
return FileDownloadUrlResponse(signed_url=response["signedUrl"],
unsigned_url=response["unsignedUrl"],
kind=response["kind"])
except KeyError:
# response got from a legacy version of File service
return FileDownloadUrlResponse(signed_url=response["SignedUrl"],
unsigned_url=None,
kind=None)
@tenacity.retry(**RETRY_SETTINGS)
@authorize()
def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
logger.debug(f"{request_body}")
response = requests.post(url, request_body, headers=headers)
logger.debug(response.content)
return response
@tenacity.retry(**RETRY_SETTINGS)
@authorize()
def _send_get_request(self, headers: dict, url: str) -> requests.Response:
response = requests.get(url, headers=headers)
logger.debug(response)
return response
def _get_upload_signed_url(self, headers: dict) -> FileUploadUrlResponse:
"""Get FileID, SignedURL and FileSource using File Service uploadURL
endpoint.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:return: FileUploadUrlResponse with data from service
:rtype: FileUploadUrlResponse
"""
logger.debug("Getting upload signed url.")
endpoint = f"{self._file_service_host}/v2/files/uploadURL"
response = self._send_get_request(headers, endpoint).json()
logger.debug("Signed url received.")
upload_url_response = FileUploadUrlResponse(file_id=response["FileID"],
signed_url=response["Location"]["SignedURL"],
file_source=response["Location"]["FileSource"])
return upload_url_response
def _get_download_signed_url(self, headers: dict, record_id: str) -> FileDownloadUrlResponse:
"""Get signedURL, unsignedURL and kind using File Service downloadURL
endpoint.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:param record_id: Unique id of the file record saved in the osdu system
:type record_id: str
:return: FileDownloadUrlResponse with signed and unsigned urls
:rtype: FileDownloadUrlResponse
"""
logger.debug("Getting download signed url.")
endpoint = f"{self._file_service_host}/v2/files/{record_id}/downloadURL"
response = self._send_get_request(headers, endpoint).json()
logger.debug("Signed url received.")
download_url_response = self._handle_download_url_response(response)
return download_url_response
@tenacity.retry(**RETRY_SETTINGS)
def _upload_file_request(self, headers: dict, signed_url: str, buffer: FileLikeObject):
"""Upload file via File service using signed_url.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:param signed_url: SignedURL to authenticate request
:type signed_url: str
:param buffer: Raw file data
:type buffer: FileLikeObject
"""
logger.debug("Uploading file.")
buffer.seek(0)
requests.put(signed_url, buffer.read(), headers=headers)
logger.debug("File uploaded.")
def _get_file_location_request(self, headers: dict, file_id: str) -> str:
"""Get file location using File Service.
:param headers: Request headers to pass to the final request issuer
:type headers: dict
:param file_id: String identifier of the file
:type file_id: str
:return: Full URI of the located file
:rtype: str
"""
logger.debug("Getting file location.")
request_body = json.dumps({"FileID": file_id})
endpoint = f"{self._file_service_host}/getFileLocation"
response = self._send_post_request(headers, endpoint, request_body)
logger.debug("File location received.")
return response.json()["Location"]
def upload_file(self, preload_file_path: str) -> str:
"""Copy file from preload_file_path location to Landing Zone in OSDU
platform using File service. Get Content-Type of this file, refresh
Content-Type with this value in headers while this file is being
uploaded onto OSDU platform.
:param preload_file_path: The URI of the preloaded file
:type preload_file_path: str
:return: FileSource obtained via File service
:rtype: str
"""
with io.BytesIO() as buffer:
buffer, content_type = self._get_file_from_preload_path(preload_file_path, buffer)
upload_url_response = self._get_upload_signed_url(self.request_headers)
headers = self.request_headers
headers["Content-Type"] = content_type
self._upload_file_request(headers, upload_url_response.signed_url, buffer)
return upload_url_response.file_source
def get_file_staging_location(self, file_source: str) -> str:
"""Retrieve location (full URI) of the file in staging area.
:param file_source: The FileSource (relative URI) of the file of the form
/{folder}/{file_id}
:type file_source: str
:return: Full URI of the location of the file in staging area
:rtype: str
"""
file_id = file_source.split("/")[-1]
file_staging_location = self._get_file_location_request(self.request_headers, file_id)
return file_staging_location
def get_file_permanent_location(self, file_record_id: str) -> str:
"""Retrieve location (full URI) of the file in permanent area.
:param file_record_id: The unique id of the file record (aka metadata record)
:type file_record_id: str
:return: Full URI of the location of the file in permanent area
:rtype: str
"""
download_url_response = self._get_download_signed_url(self.request_headers, file_record_id)
permanent_location = download_url_response.unsigned_url
return permanent_location
def save_file_record(self, file_record: dict) -> str:
"""Send request to store record via file service API.
:param file_record: The file record to save
:type file_record: dict
:return: OSDU system generated id of the saved record
:rtype: str
"""
self._verify_file_record_data(file_record["data"])
# TODO fix 'name' field processing
# Generate file entity name as a workaround because the file API requires this field.
if not file_record["data"].get("Name"):
file_record["data"]["Name"] = \
f"surrogate_name_{file_record['data']['DatasetProperties']['FileSourceInfo']['PreloadFilePath'].split('/')[-1]}"
logger.info(f"Generated name: {file_record['data']['Name']}")
logger.info("Sending file record metadata to File service")
endpoint = f"{self._file_service_host}/v2/files/metadata"
response = self._send_post_request(self.request_headers, endpoint, json.dumps(file_record))
return response.json()["id"]
def batch_save_file_records(self, file_records: List[str]) -> List[str]:
"""Perform concurrent save file record requests.
:param file_records: List of file records to save
:type file_records: List[str]
:return: List of OSDU system generated ids of the saved records
:rtype: List[str]
"""
raise NotImplementedError("TODO(python-team) implementation.")
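# Illustration only (hypothetical host, bucket and tenant values): a minimal
# FileHandler flow that copies a preloaded blob into the landing zone.
def _example_upload(token_refresher: TokenRefresher) -> str:
    context = Context(data_partition_id="opendes", app_key="test-app")
    handler = FileHandler("https://file-service-host/api/file", token_refresher, context)
    return handler.upload_file("gs://preload-bucket/folder/file.las")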
# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import re
from collections import deque
from typing import Set, Iterator, Iterable
from uuid import uuid4
import toposort
from libs.refresh_token import TokenRefresher
from libs.traverse_manifest import ManifestEntity
from libs.utils import remove_trailing_colon
logger = logging.getLogger()
class EntityNode(object):
"""
This class represents entities and their links to parent and child ones.
"""
__slots__ = ["srn", "system_srn", "entity_info", "children", "parents", "unprocessed"]
SRN_REGEX = re.compile(
r"(?<=\")surrogate-key:[\w\-\.\d]+(?=\")|(?<=\")[\w\-\.]+:[\w\-\.]+--[\w\-\.]+:[\w\-\.\:]+:[0-9]*(?=\")")
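    # Illustration (hypothetical values): SRN_REGEX extracts quoted ids such as
    # "surrogate-key:wpc-1" or "opendes:master-data--Well:well-1:1" from a
    # serialized manifest.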
def __init__(self, srn, entity_info: ManifestEntity):
self.srn = srn
self.entity_info = entity_info
self.system_srn = None
self.children = set()
self.parents = set()
self.unprocessed = False
def __repr__(self):
return f"SRN: {self.srn}"
@property
def content(self) -> dict:
return self.entity_info.entity
@content.setter
def content(self, value: dict):
    self.entity_info.entity = value