Skip to content
Snippets Groups Projects
Commit cca0b300 authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'feature/providers_logic_split' into 'master'

Feature/providers logic split

See merge request !10
parents 38c0c24b 1a367620
No related branches found
No related tags found
1 merge request!10Feature/providers logic split
Pipeline #24709 passed
Showing
with 831 additions and 371 deletions
......@@ -22,7 +22,7 @@ stages:
- deploy
pylint:
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
stage: linters
allow_failure: true
script:
......@@ -32,7 +32,7 @@ pylint:
- exit ${EXIT_CODE}
isort:
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
allow_failure: true
stage: linters
script:
......@@ -42,7 +42,7 @@ isort:
test_dags:
stage: test_dags
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
script:
- chmod +x tests/test_dags.sh
- tests/./test_dags.sh || EXIT_CODE=$?
......@@ -55,7 +55,7 @@ test_dags:
unit_tests:
stage: unit_tests
image: johnybear/osdu-airflow:python36-1
image: johnybear/osdu-airflow:python36-2
script:
- chmod +x tests/unit_tests.sh
- tests/./unit_tests.sh || EXIT_CODE=$?
......@@ -68,6 +68,8 @@ osdu-gcp-deploy:
- cd src
- gsutil -m rsync -x "dags/libs*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion
- gsutil -m rsync -d -R dags/libs $OSDU_GCP_DEPL_TARGET/dags/libs
- gsutil -m rsync -x "dags/providers*" -d -R dags $OSDU_GCP_DEPL_TARGET/dags/ingestion
- gsutil -m rsync -d -R dags/providers $OSDU_GCP_DEPL_TARGET/dags/providers
- gsutil -m rsync -R plugins $OSDU_GCP_DEPL_TARGET/plugins
only:
variables:
......
......@@ -17,12 +17,15 @@
import configparser
import logging
import os
from airflow.models import Variable
from libs.refresh_token import AirflowTokenRefresher, refresh_token
from libs.context import Context
from libs.refresh_token import AirflowTokenRefresher
from osdu_api.libs.auth.authorization import authorize
from osdu_api.model.acl import Acl
from osdu_api.model.legal import Legal
from osdu_api.model.legal_compliance import LegalCompliance
from osdu_api.model.legal.legal import Legal
from osdu_api.model.legal.legal_compliance import LegalCompliance
from osdu_api.model.record import Record
from osdu_api.model.record_ancestry import RecordAncestry
from osdu_api.storage.record_client import RecordClient
......@@ -40,7 +43,8 @@ DEFAULT_SOURCE = config.get("DEFAULTS", "authority")
DEFAULT_VERSION = config.get("DEFAULTS", "kind_version")
@refresh_token(AirflowTokenRefresher())
@authorize(AirflowTokenRefresher())
def create_update_record_request(headers, record_client, record):
"""Send request to create or update records via RecordClient.
......@@ -49,7 +53,7 @@ def create_update_record_request(headers, record_client, record):
:param record: The record to create or update
:return: Storage service response
"""
resp = record_client.create_update_records([record], headers.items())
resp = record_client.create_update_records([record], headers)
return resp
......@@ -91,7 +95,7 @@ def create_records(**kwargs):
"AppKey": data_conf.get("app-key", "")
}
record_client = RecordClient()
record_client = RecordClient(AirflowTokenRefresher(), Context("test", "test"))
record_client.data_partition_id = data_conf.get(
"partition-id", DEFAULT_SOURCE)
resp = create_update_record_request(headers, record_client, record)
......
......@@ -21,13 +21,6 @@ class RecordsNotSearchableError(Exception):
pass
class RefreshSATokenError(Exception):
"""Raise when token is empty after attempt to get credentials from
service account file.
"""
pass
class PipelineFailedError(Exception):
"""Raise when pipeline failed."""
pass
......@@ -53,18 +46,8 @@ class NotOSDUSchemaFormatError(Exception):
pass
class SAFilePathError(Exception):
"""Raise when sa_file path is not specified in Env Variables."""
pass
class FileSourceError(Exception):
"""Raise when file doesn't exist under given path."""
pass
class GCSObjectURIError(Exception):
"""Raise when wrong Google Storage Object was given."""
"""Raise when file doesn't exist under given URI path."""
pass
......
......@@ -20,23 +20,20 @@ import io
import json
import logging
import uuid
from abc import ABC, abstractmethod
from typing import List, Tuple, TypeVar
from urllib.parse import urlparse
import requests
import tenacity
from google.cloud import storage
from libs.context import Context
from libs.constants import RETRIES, WAIT
from libs.exceptions import FileSourceError, GCSObjectURIError, InvalidFileRecordData
from libs.context import Context
from libs.exceptions import InvalidFileRecordData
from libs.mixins import HeadersMixin
from libs.refresh_token import TokenRefresher, refresh_token
from providers import blob_storage
from providers.types import BlobStorageClient, FileLikeObject
logger = logging.getLogger()
FileLikeObject = TypeVar("FileLikeObject", io.IOBase, io.RawIOBase, io.BytesIO)
RETRY_SETTINGS = {
"stop": tenacity.stop_after_attempt(RETRIES),
"wait": tenacity.wait_fixed(WAIT),
......@@ -59,10 +56,11 @@ class FileDownloadUrlResponse:
kind: str
class FileHandler(HeadersMixin, ABC):
class FileHandler(HeadersMixin):
"""Class to perform operations using OSDU File Service."""
def __init__(self, file_service_url: str, token_refresher: TokenRefresher, context: Context):
def __init__(self, file_service_url: str, token_refresher: TokenRefresher, context: Context,
blob_storage_client: BlobStorageClient = None):
"""File handler.
:param file_service_url: Base OSDU File service url
......@@ -75,17 +73,18 @@ class FileHandler(HeadersMixin, ABC):
super().__init__(context)
self._file_service_url = file_service_url
self.token_refresher = token_refresher
self._blob_storage_client = blob_storage_client or blob_storage.get_client()
@abstractmethod
def get_file_from_preload_path(self, preload_path: str) -> Tuple[FileLikeObject, str]:
"""Get a file from a given preload_path.
def _get_file_from_preload_path(self, preload_path: str,
file: FileLikeObject) -> Tuple[FileLikeObject, str]:
"""Get file from a preloaded path.
:param preload_path: Full URI of the file to obtain
:type preload_path: str
:return: Raw file data and content-type
:rtype: Tuple[FileLikeObject, str]
"""
raise NotImplementedError("Provide an implementation according to cloud env.")
return self._blob_storage_client.download_to_file(preload_file_path, file)
@staticmethod
def _verify_file_record_data(file_record_data: dict):
......@@ -103,6 +102,27 @@ class FileHandler(HeadersMixin, ABC):
raise InvalidFileRecordData(f"Mandatory fields: Endian-{endian}"
f"FileSource-{file_source}")
@staticmethod
def _handle_download_url_response(response: dict) -> FileDownloadUrlResponse:
"""
Handle downloadURL according to file service version
:param response: The response already load from json
:type response: dict
:return: FileDownloadUrlResponse filled properly
:rtype: FileDownloadUrlResponse
"""
try:
# response got from latest version of File service
return FileDownloadUrlResponse(signed_url=response["signedUrl"],
unsigned_url=response["unsignedUrl"],
kind=response["kind"])
except KeyError:
# response got from a legacy version of File service
return FileDownloadUrlResponse(signed_url=response["SignedUrl"],
unsigned_url=None,
kind=None)
@tenacity.retry(**RETRY_SETTINGS)
@refresh_token()
def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
......@@ -151,9 +171,7 @@ class FileHandler(HeadersMixin, ABC):
endpoint = f"{self._file_service_url}/v1/files/{record_id}/downloadURL"
response = self._send_get_request(headers, endpoint).json()
logger.debug("Signed url got.")
download_url_response = FileDownloadUrlResponse(signed_url=response["signedUrl"],
unsigned_url=response["unsignedUrl"],
kind=response["kind"])
download_url_response = self._handle_download_url_response(response)
return download_url_response
@tenacity.retry(**RETRY_SETTINGS)
......@@ -189,19 +207,24 @@ class FileHandler(HeadersMixin, ABC):
logger.debug("File location got.")
return response.json()["Location"]
@tenacity.retry(**RETRY_SETTINGS)
def upload_file(self, preload_file_path: str) -> str:
"""Upload file from preload_file_path to Landing Zone (staging area)
onto OSDU platform using File service.
"""Copy file from preload_file_path location to Landing Zone in OSDU
platform using File service. Get Content-Type of this file, refresh
Content-Type with this value in headers while this file is being
uploaded onto OSDU platform.
:param preload_file_path: Full URI of the file to obtain
:param preload_file_path: The URI of the preloaded file
:type preload_file_path: str
:return: The FileSource (relative URI) of the uploaded file
:return: FileSource obtained via File service
:rtype: str
"""
buffer = self.get_file_from_preload_path(preload_file_path)
upload_url_response = self._get_upload_signed_url(self.request_headers)
self._upload_file_request(self.request_headers, upload_url_response.signed_url, buffer)
with io.BytesIO() as buffer:
buffer, content_type = self._get_file_from_preload_path(preload_file_path, buffer)
upload_url_response = self._get_upload_signed_url(self.request_headers)
headers = self.request_headers
headers["Content-Type"] = content_type
self._upload_file_request(headers, upload_url_response.signed_url, buffer)
return upload_url_response.file_source
......@@ -260,104 +283,3 @@ class FileHandler(HeadersMixin, ABC):
:rtype: List[str]
"""
raise NotImplementedError("TODO(python-team) implementation.")
class GCSFileHandler(FileHandler):
"""GCP specific implementation of FileHandler."""
def __init__(
self,
file_service_url: str,
token_refresher: TokenRefresher,
context: Context,
):
"""GCS specific FileHandler.
:param file_service_url: Base osdu file service url
:type file_service_url: str
:param token_refresher: Object to refresh tokens
:type token_refresher: TokenRefresher
:param context: The tenant context data
:type context: Context
"""
super().__init__(file_service_url, token_refresher, context)
@staticmethod
def _parse_object_uri(file_path: str) -> Tuple[str, str]:
"""Parse a URI to obtain the bucket name and the blob name.
:param file_path: URI to parse
:type file_path: str
:raises GCSObjectURIError: When URI is not compliant with GCS
:return: The bucket and blob names
:rtype: Tuple[str, str]
"""
parsed_path = urlparse(file_path)
if parsed_path.scheme == "gs":
bucket_name = parsed_path.netloc
source_blob_name = parsed_path.path[1:] # delete the first slash
if bucket_name and source_blob_name:
return bucket_name, source_blob_name
raise GCSObjectURIError
@tenacity.retry(**RETRY_SETTINGS)
def get_file_from_bucket(self,
bucket_name: str,
source_blob_name: str) -> Tuple[io.BytesIO, str]:
"""Get file and content type using GCS client.
:param bucket_name: The name of the bucket
:type bucket_name: str
:param source_blob_name: The name of the file
:type source_blob_name: str
:raises FileSourceError: When validation of file fails
:return: The raw data of the file and the content-type
:rtype: Tuple[io.BytesIO, str]
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.get_blob(source_blob_name)
does_exist = blob.exists()
if not does_exist:
raise FileSourceError("File doesn't exist in preloadPath "
f"'gs://{bucket_name}/{source_blob_name}'")
file = io.BytesIO()
blob.download_to_file(file)
logger.debug("File got from preload_path zone")
return file, blob.content_type
def get_file_from_preload_path(self, preload_file_path: str) -> Tuple[io.BytesIO, str]:
"""Get file from given preload file path.
:param preload_file_path: The full URI of the preloaded file
:type preload_file_path: str
:return: The raw data of the file and the content-type
:rtype: Tuple[io.BytesIO, str]
"""
bucket_name, blob_name = self._parse_object_uri(preload_file_path)
buffer, content_type = self.get_file_from_bucket(bucket_name, blob_name)
return buffer, content_type
def upload_file(self, preload_file_path: str) -> str:
"""Copy file from preload_file_path location to Landing Zone in OSDU
platform using File service. Get Content-Type of this file, refresh
Content-Type with this value in headers while this file is being
uploaded onto OSDU platform.
:param preload_file_path: The URI of the preloaded file
:type preload_file_path: str
:return: FileSource obtained via File service
:rtype: str
"""
buffer, content_type = self.get_file_from_preload_path(preload_file_path)
upload_url_response = self._get_upload_signed_url(self.request_headers)
headers = self.request_headers
headers["Content-Type"] = content_type
self._upload_file_request(headers, upload_url_response.signed_url, buffer)
return upload_url_response.file_source
......@@ -23,13 +23,13 @@ from typing import List, Tuple
import requests
import tenacity
from libs.context import Context
from libs.constants import RETRIES, WAIT
from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError
from libs.handle_file import FileHandler
from libs.mixins import HeadersMixin
from libs.refresh_token import TokenRefresher, refresh_token
from libs.source_file_check import SourceFileChecker
from libs.handle_file import FileHandler
RETRY_SETTINGS = {
"stop": tenacity.stop_after_attempt(RETRIES),
......@@ -235,5 +235,4 @@ class ManifestProcessor(HeadersMixin):
self.request_headers, request_data=populated_manifest_records)
record_ids.extend(save_manifests_response.json()["recordIds"])
logger.warning(record_ids)
return record_ids
......@@ -18,20 +18,19 @@
import json
import logging
import os
from typing import Callable, Union
from abc import ABC, abstractmethod
from functools import partial
from http import HTTPStatus
from urllib.parse import urlparse
from typing import Any, Callable, Union
import requests
from google.auth.transport.requests import Request
from google.cloud import storage
from google.oauth2 import service_account
from libs.exceptions import RefreshSATokenError, SAFilePathError, TokenRefresherNotPresentError
from libs.exceptions import TokenRefresherNotPresentError
from osdu_api.libs.auth.authorization import TokenRefresher as OSDUAPITokenRefresher
from providers import credentials
from providers.types import BaseCredentials
from tenacity import retry, stop_after_attempt
logger = logging.getLogger()
logger = logging.getLogger(__name__)
RETRIES = 3
......@@ -61,7 +60,7 @@ class TokenRefresher(ABC):
@property
@abstractmethod
def authorization_header(self) -> dict:
"""Authorization header. Must return authorization header for
"""Authorization header. Must return authorization header for
updating headers dict.
E.g. return {"Authorization": "Bearer <access_token>"}
......@@ -71,140 +70,30 @@ class TokenRefresher(ABC):
pass
class AirflowTokenRefresher(TokenRefresher):
DEFAULT_ACCESS_SCOPES = ['openid', 'email', 'profile']
class AirflowTokenRefresher(TokenRefresher, OSDUAPITokenRefresher):
"""Simple wrapper for credentials to be used in refresh_token decorator within Airflow."""
def __init__(self, access_scopes: list = None):
"""Init GCP Token Refresher.
:param access_scopes: the list of scopes for authorization, defaults
to None
:type access_scopes: list, optional
"""
def __init__(self, creds: BaseCredentials = None):
self._credentials = creds or credentials.get_credentials()
from airflow.models import Variable
self.airflow_variables = Variable
self._access_token = None
self._access_scopes = access_scopes
@property
def access_scopes(self) -> list:
"""Return access scopes. Use DEFAULT_ACCESS_SCOPES if user-defined
ones weren't provided.
:return: A list of scopes to authorize
:rtype: list
"""
if not self._access_scopes:
self._access_scopes = self.DEFAULT_ACCESS_SCOPES
return self._access_scopes
@staticmethod
@retry(stop=stop_after_attempt(RETRIES))
def get_sa_info_from_google_storage(bucket_name: str, source_blob_name: str) -> dict:
"""Get service account file content from Google Storage.
:param bucket_name: bucket where file is located
:type bucket_name: str
:param source_blob_name: name of the service account key file
:type source_blob_name: str
:return: A dictionary with the information loaded from file
:rtype: dict
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
logger.info("Got SA_file.")
sa_info = json.loads(blob.download_as_string())
return sa_info
@staticmethod
def get_sa_info_from_file(path: str) -> dict:
"""Load service account information from file.
:param path: file path
:type path: str
:return: A dictionary with the information loaded from file
:rtype: dict
"""
with open(path) as f:
return json.load(f)
def get_sa_info(self) -> dict:
"""Get file path from SA_FILE_PATH environment variable. This path
can be GCS object URI or local file path. Return content of sa path
as dict.
:raises SAFilePathError: When a invalid path is retrieved from env
:return: A dictionary with the information loaded from file
:rtype: dict
"""
sa_file_path = os.environ.get("SA_FILE_PATH", None)
parsed_path = urlparse(sa_file_path)
if parsed_path.scheme == "gs":
bucket_name = parsed_path.netloc
source_blob_name = parsed_path.path[1:] # delete the first slash
sa_info = self.get_sa_info_from_google_storage(bucket_name, source_blob_name)
elif not parsed_path.scheme and os.path.isfile(parsed_path.path):
sa_info = self.get_sa_info_from_file(parsed_path.path)
else:
logger.error("SA file path error.")
raise SAFilePathError(f"Got path {os.environ.get('SA_FILE_PATH', None)}")
return sa_info
@retry(stop=stop_after_attempt(RETRIES))
def _get_credentials_from_sa_info(self, sa_info: dict) -> service_account.Credentials:
"""Get credentials from service account info.
:param sa_info: The service account info as dict
:type sa_info: dict
:raises e: Generic exception
:return: Google service account credentials
:rtype: service_account.Credentials
"""
try:
credentials = service_account.Credentials.from_service_account_info(
sa_info, scopes=self.access_scopes)
except ValueError as e:
logger.error("SA file has bad format.")
raise e
return credentials
def get_access_token_using_sa_file(self) -> str:
"""Get new access token using SA info.
:raises RefreshSATokenError: When token can't be obtained after refresh
:return: The refreshed access token
:rtype: str
"""
sa_info = self.get_sa_info()
credentials = self._get_credentials_from_sa_info(sa_info)
logger.info("Refresh token.")
credentials.refresh(Request())
token = credentials.token
if credentials.token is None:
logger.error("Can't refresh token using SA-file. Token is empty.")
raise RefreshSATokenError
logger.info("Token refreshed.")
return token
@retry(stop=stop_after_attempt(RETRIES))
def refresh_token(self) -> str:
"""Refresh token. Also, it stores its value in Airflow Variable.
"""Refresh the token and cache token using airflow variables.
:return: The refreshed access token
:return: The refreshed token
:rtype: str
"""
token = self.get_access_token_using_sa_file()
self.airflow_variables.set("access_token", token)
self._access_token = token
self._credentials.refresh_token()
self.airflow_variables.set("access_token", self._credentials.access_token)
self._access_token = self._credentials.access_token
return self._access_token
@property
def access_token(self) -> str:
"""Access token.
"""The access token.
:return: The access token
:rtype: str
......@@ -218,9 +107,9 @@ class AirflowTokenRefresher(TokenRefresher):
@property
def authorization_header(self) -> dict:
"""Authorizatioh header.
"""Authorization header with bearer token.
:return: A dict with the authorization header updated
:return: Auth header as dict
:rtype: dict
"""
return {"Authorization": f"Bearer {self.access_token}"}
......@@ -236,7 +125,7 @@ def make_callable_request(obj: Union[object, None], request_function: Callable,
:type request_function: Callable
:param headers: The request headers
:type headers: dict
:return: A partial callable
:return: A partial callable
:rtype: Callable
"""
if obj: # if wrapped function is an object's method
......@@ -302,7 +191,7 @@ def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
:param token_refresher: The token refresher instance
:type token_refresher: TokenRefresher
:raises e: Re-raises any requests.HTTPError
:return: The server response
:return: The server response
:rtype: requests.Response
"""
obj = kwargs.pop("obj", None)
......
......@@ -13,92 +13,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from abc import ABC, abstractmethod
from typing import Tuple
from urllib.parse import urlparse
from libs.exceptions import FileSourceError
from providers import blob_storage
from providers.types import BlobStorageClient
import google.auth
import tenacity
from google.cloud import storage
from libs.exceptions import GCSObjectURIError, FileSourceError
class SourceFileChecker:
"""Class to perform file validation."""
RETRIES = 3
def __init__(self, blob_storage_client: BlobStorageClient = None):
"""Initiliaze SourceFileChecker with provided BlobStorageClient.
logger = logging.getLogger()
class SourceFileChecker(ABC):
"""Class to verify the existance of a file in cloud storage."""
@abstractmethod
def does_file_exist(self, file_path: str):
"""Validate if file exists under given file path. Raises error if not.
:param file_path: The full URI if the file to verify
:type file_path: str
"""
pass
class GCSSourceFileChecker(SourceFileChecker):
def __repr__(self):
return "GCS file checker"
def _parse_object_uri(self, file_path: str) -> Tuple[str, str]:
"""Parse GCS Object uri.
:param file_path: The URI to parse
:type file_path: str
:raises GCSObjectURIError: When Tthe format of the URI does not comply
:return: The bucket and the blob names
:rtype: Tuple[str, str]
"""
parsed_path = urlparse(file_path)
if parsed_path.scheme == "gs":
bucket_name = parsed_path.netloc
source_blob_name = parsed_path.path[1:] # delete the first slash
if bucket_name and source_blob_name:
return bucket_name, source_blob_name
raise GCSObjectURIError(f"Wrong format path to GCS object. Object path is '{file_path}'")
@tenacity.retry(
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
def _does_file_exist_in_bucket(self, bucket_name: str, source_blob_name: str) -> bool:
"""Do the actual verification of file via gcs client.
:param bucket_name: The name of the bucket
:type bucket_name: str
:param source_blob_name: The name of the file
:type source_blob_name: str
:return: A boolean indicating if the file exists
:rtype: bool
:param blob_storage_client: Optional storage client, if not provided,
client will be obtained according to `CLOUD_PROVIDER` env var,
defaults to None
:type blob_storage_client: BlobStorageClient, optional
"""
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
does_exist = blob.exists()
return does_exist
self._blob_storage_client = blob_storage_client or blob_storage.get_client()
def does_file_exist(self, file_path: str):
"""Verify if file exists under given file path (URI).
"""Verifies if a file exist give file path.
:param file_path: The full URI of the file to verify
:param file_path: The full URI of the file
:type file_path: str
:raises FileSourceError: When the file does not exists
"""
bucket_name, source_blob_name = self._parse_object_uri(file_path)
try:
does_exist = self._does_file_exist_in_bucket(bucket_name, source_blob_name)
except google.auth.exceptions.DefaultCredentialsError:
# TODO(python-team) Figure out a way to mock google endpoints in integration tests.
logger.error("No default credentials found in env, is this integration-env?")
else:
if not does_exist:
raise FileSourceError(f"File doesn't exist in {file_path}.")
if not self._blob_storage_client.does_file_exist(file_path):
raise FileSourceError(f"File not found in {file_path}.")
Providers folder structure.
```
providers
│ README.md
│ {blob_storage, ..., types}.py
└───gcp
│ │ gcp_blob_storage_client.py
│ │ ...
└───aws
│ ...
```
In the base folder there are the following base classes:
`types.py`: Stores all the interfaces to be implemented by all cloud providers.
`factory.py`: Provides a mechanism to register and retrieve cloud-specific implementations using the class decorator @ProvidersFactory.register. A registry per interface is required; if a new interface is implemented, a new registry should be added.
`constants.py` Reusable constants.
`exceptions.py` Provider specific exceptions should be thrown here and bubble up all the way up.
All interfaces will require a **wrapper module** that registers specific implementations and provides a factory method. Examples of wrapper modules are: `blob_storage.py` and `credentials.py`. Please pay attention to the import section as it's required that all modules that implement an interface to be imported here so they can be registered.
Adding a new implementation.
1. Create a module (e.g., gcp_blob_storage_client.py) under the cloud provider subfolder. Within the module, create a class that inherits from the given base interface and decorate it with @ProvidersFactory.register(`CLOUD_PROVIDER_NAME_CONSTANT`).
2. Add an import in the **wrapper module** of the interface (e.g., blob_storage.py -> from providers.gcp import gcp_blob_storage_client) to register the new implementation. You may optionally modify the factory method if the initialization of the newly implemented class needs some customization. Just don't bubble up customization, as the factory method should stay transparent for all clients.
Adding a new interface.
1. Add interface to types.py.
2. Add a new registry to the ProvidersFactory class in factory.py and modify the register logic to take the new type into account. Add a new get method for the new type.
3. Add the new implementation.
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Blob storage init module."""
import os
from providers.factory import ProvidersFactory
# import section needed to register cloud specific clients
from providers.gcp import gcp_blob_storage_client # pylint: disable=unused-import
from providers.types import BlobStorageClient
def get_client(cloud_env: str = None) -> BlobStorageClient:
    """Resolve a cloud-specific blob storage client via the providers factory.

    :param cloud_env: Name of the cloud environment; when omitted (or empty),
        the `CLOUD_PROVIDER` environment variable is used instead.
    :type cloud_env: str, optional
    :return: An instance of BlobStorageClient
    :rtype: BlobStorageClient
    """
    selected_env = cloud_env if cloud_env else os.environ.get("CLOUD_PROVIDER")
    return ProvidersFactory.get_blob_storage_client(selected_env)
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Providers constants module."""
GOOGLE_CLOUD_PROVIDER = "gcp"
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Credentials init module."""
import os
from providers.factory import ProvidersFactory
# import section needed to register cloud specific clients
from providers.gcp import gcp_credentials # pylint: disable=unused-import
from providers.types import BaseCredentials
def get_credentials(cloud_env: str = None) -> BaseCredentials:
    """Resolve cloud-specific credentials via the providers factory.

    :param cloud_env: Name of the cloud environment; when omitted (or empty),
        the `CLOUD_PROVIDER` environment variable is used instead.
    :type cloud_env: str, optional
    :return: An instance of BaseCredentials
    :rtype: BaseCredentials
    """
    selected_env = cloud_env if cloud_env else os.environ.get("CLOUD_PROVIDER")
    return ProvidersFactory.get_credentials(selected_env)
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Providers exceptions module."""
class RefreshSATokenError(Exception):
    """Raised when the token is empty after trying to obtain credentials from the service account file."""
class SAFilePathError(Exception):
    """Raised when the sa_file path is not specified in the environment variables."""
class GCSObjectURIError(Exception):
    """Raised when a malformed Google Storage object URI was given."""
import logging
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Providers factory module."""
from typing import Any, Callable
from providers.types import BaseCredentials, BlobStorageClient
class ProvidersFactory:
    """The factory class for creating cloud specific clients.

    Implementations register themselves through the `register` class
    decorator; the decorated class is routed to the matching registry
    based on the interface it implements.
    """

    # Registries mapping cloud provider name -> implementation class.
    blob_storage_registry = {}
    credentials_registry = {}

    @classmethod
    def register(cls, cloud_provider: str) -> Callable:
        """Class method to register an implementation class to the internal registry.

        The class is placed into the blob storage registry if it implements
        BlobStorageClient, or into the credentials registry if it implements
        BaseCredentials. Re-registering the same provider keeps the first
        registered class (setdefault semantics).

        :param cloud_provider: The name of the implementation to register
        :type cloud_provider: str
        :raises ValueError: If the type of registered class does not match any
            registry
        :return: The class decorator
        :rtype: Callable
        """
        def inner_wrapper(wrapped_class: Any) -> Callable:
            if issubclass(wrapped_class, BlobStorageClient):
                cls.blob_storage_registry.setdefault(cloud_provider, wrapped_class)
            elif issubclass(wrapped_class, BaseCredentials):
                cls.credentials_registry.setdefault(cloud_provider, wrapped_class)
            else:
                # Report the class itself: `type(wrapped_class)` would always
                # be the metaclass (e.g. <class 'type'>) and hide the offender.
                raise ValueError(f"Not recognized type for this registry: {wrapped_class}.")
            return wrapped_class
        return inner_wrapper

    @classmethod
    def get_blob_storage_client(cls, cloud_provider: str, *args, **kwargs) -> BlobStorageClient:
        """Get a BlobStorageClient instance given a cloud provider.

        :param cloud_provider: The name of the cloud provider
        :type cloud_provider: str
        :raises NotImplementedError: When a class for this provider hasn't
            been registered yet
        :return: A cloud specific instance of BlobStorageClient
        :rtype: BlobStorageClient
        """
        if cloud_provider not in cls.blob_storage_registry:
            raise NotImplementedError(
                f"BlobStorageClient for {cloud_provider} does not exist in the registry.")
        registered_class = cls.blob_storage_registry[cloud_provider]
        return registered_class(*args, **kwargs)

    @classmethod
    def get_credentials(cls, cloud_provider: str, *args, **kwargs) -> BaseCredentials:
        """Get a credentials instance given a cloud provider.

        :param cloud_provider: The name of the cloud provider
        :type cloud_provider: str
        :raises NotImplementedError: When a class for this provider hasn't
            been registered yet
        :return: A cloud specific instance of Credentials
        :rtype: BaseCredentials
        """
        if cloud_provider not in cls.credentials_registry:
            raise NotImplementedError(
                f"Credential for {cloud_provider} does not exist in the registry.")
        registered_class = cls.credentials_registry[cloud_provider]
        return registered_class(*args, **kwargs)
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Blob storage GCP client module."""
import io
import logging
from typing import Tuple
from urllib.parse import urlparse
import google.auth
import tenacity
from google.cloud import storage
from providers.constants import GOOGLE_CLOUD_PROVIDER
from providers.exceptions import GCSObjectURIError
from providers.factory import ProvidersFactory
from providers.types import BlobStorageClient, FileLikeObject
# Module-level logger for this storage client.
logger = logging.getLogger(__name__)

# Shared tenacity retry policy: up to 3 attempts, 10 seconds apart,
# re-raising the last error instead of wrapping it in RetryError.
RETRY_SETTINGS = {
    "stop": tenacity.stop_after_attempt(3),
    "wait": tenacity.wait_fixed(10),
    "reraise": True,
}
@ProvidersFactory.register(GOOGLE_CLOUD_PROVIDER)
class GoogleCloudStorageClient(BlobStorageClient):
    """Implementation of blob storage client for the Google provider."""

    def __init__(self):
        """Initialize storage client."""
        self._storage_client = storage.Client()

    @staticmethod
    def _parse_gcs_uri(gcs_uri: str) -> Tuple[str, str]:
        """Parse gcs compliant uri and return bucket_name and blob_name.

        :param gcs_uri: A GCS compliant URI.
        :type gcs_uri: str
        :raises GCSObjectURIError: When non GCS compliant URI is provided
        :return: A tuple (bucket_name, blob_name) obtained from the URI
        :rtype: Tuple[str, str]
        """
        parsed_path = urlparse(gcs_uri)
        if parsed_path.scheme == "gs":
            bucket_name = parsed_path.netloc
            source_blob_name = parsed_path.path[1:]  # delete the first slash
            if bucket_name and source_blob_name:
                return bucket_name, source_blob_name
        raise GCSObjectURIError(f"Wrong format path to GCS object. Object path is '{gcs_uri}'")

    @tenacity.retry(**RETRY_SETTINGS)
    def _get_file_from_bucket(self,
                              bucket_name: str,
                              source_blob_name: str,
                              file: FileLikeObject) -> Tuple[io.BytesIO, str]:
        """Get file from gcs bucket.

        :param bucket_name: The name of the bucket that holds the file
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :param file: The file where to download the blob content
        :type file: FileLikeObject
        :return: A tuple containing file and its content-type
        :rtype: Tuple[io.BytesIO, str]
        """
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.get_blob(source_blob_name)
        blob.download_to_file(file)
        logger.debug(f"File {source_blob_name} got from bucket {bucket_name}.")
        return file, blob.content_type

    @tenacity.retry(**RETRY_SETTINGS)
    def _get_file_as_bytes_from_bucket(self,
                                       bucket_name: str,
                                       source_blob_name: str) -> Tuple[bytes, str]:
        """Get file as bytes from gcs bucket.

        :param bucket_name: The name of the bucket that holds the file
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :return: A tuple containing file and its content-type
        :rtype: Tuple[bytes, str]
        """
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.get_blob(source_blob_name)
        file_as_bytes = blob.download_as_bytes()
        logger.debug(f"File {source_blob_name} got from bucket {bucket_name}.")
        return file_as_bytes, blob.content_type

    @tenacity.retry(**RETRY_SETTINGS)
    def _does_file_exist_in_bucket(self, bucket_name: str, source_blob_name: str) -> bool:
        """Use gcs client and verify a file exists in given bucket.

        :param bucket_name: The name of the bucket that holds the resource
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :return: A boolean indicating if the file exists
        :rtype: bool
        """
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        return blob.exists()

    def does_file_exist(self, uri: str) -> bool:
        """Verify if a file exists in the given URI.

        :param uri: The GCS URI of the file.
        :type uri: str
        :return: A boolean indicating if the file exists
        :rtype: bool
        """
        bucket_name, source_blob_name = self._parse_gcs_uri(uri)
        try:
            return self._does_file_exist_in_bucket(bucket_name, source_blob_name)
        except google.auth.exceptions.DefaultCredentialsError:
            # TODO(python-team) Figure out a way to mock google endpoints in integration tests.
            logger.error("No default credentials found in env, is this integration-env?")
            # Previously fell through to an implicit None; return False
            # explicitly to honour the declared bool return type (still falsy).
            return False

    def download_to_file(self, uri: str, file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Download file from the given URI.

        :param uri: The GCS URI of the file.
        :type uri: str
        :param file: The file where to download the blob content
        :type file: FileLikeObject
        :return: A tuple containing the file and its content-type
        :rtype: Tuple[io.BytesIO, str]
        """
        bucket_name, blob_name = self._parse_gcs_uri(uri)
        return self._get_file_from_bucket(bucket_name, blob_name, file)

    def download_file_as_bytes(self, uri: str) -> Tuple[bytes, str]:
        """Download file as bytes from the given URI.

        :param uri: The GCS URI of the file
        :type uri: str
        :return: The file as bytes and its content-type
        :rtype: Tuple[bytes, str]
        """
        bucket_name, blob_name = self._parse_gcs_uri(uri)
        return self._get_file_as_bytes_from_bucket(bucket_name, blob_name)

    def upload_file(self, uri: str, blob_file: FileLikeObject, content_type: str):
        """Upload a file to the given uri.

        :param uri: The GCS URI of the file
        :type uri: str
        :param blob_file: The file to upload
        :type blob_file: FileLikeObject
        :param content_type: The content-type of the file being uploaded
        :type content_type: str
        """
        bucket_name, blob_name = self._parse_gcs_uri(uri)
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.blob(blob_name)
        blob.upload_from_file(blob_file, content_type=content_type)
        logger.debug(f"Uploaded file to {uri}.")
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GCP Credentials module."""
import json
import logging
import os
from urllib.parse import urlparse
import requests
from google.auth.transport.requests import Request
from google.cloud import storage
from google.oauth2 import service_account
from providers.constants import GOOGLE_CLOUD_PROVIDER
from providers.exceptions import RefreshSATokenError, SAFilePathError
from providers.factory import ProvidersFactory
from providers.types import BaseCredentials
from tenacity import retry, stop_after_attempt
# Module-level logger for the credentials provider.
logger = logging.getLogger(__name__)

# Number of attempts for tenacity-retried operations in this module.
RETRIES = 3
@ProvidersFactory.register(GOOGLE_CLOUD_PROVIDER)
class GCPCredentials(BaseCredentials):
    """GCP Credentials Provider."""

    DEFAULT_ACCESS_SCOPES = ["openid", "email", "profile"]

    def __init__(self, access_scopes: list = None):
        """Initialize GCP Credentials object.

        :param access_scopes: Optional scopes, defaults to None
        :type access_scopes: list, optional
        """
        self._access_token = None
        self._access_scopes = access_scopes
        self._storage_client = storage.Client()

    @property
    def access_scopes(self) -> list:
        """
        Return access scopes.

        Use DEFAULT_ACCESS_SCOPES if user-defined ones weren't provided.
        """
        if not self._access_scopes:
            self._access_scopes = self.DEFAULT_ACCESS_SCOPES
        return self._access_scopes

    @retry(stop=stop_after_attempt(RETRIES))
    def _get_sa_info_from_google_storage(self, bucket_name: str, source_blob_name: str) -> dict:
        """Get sa_file content from Google Storage.

        :param bucket_name: The name of the bucket
        :type bucket_name: str
        :param source_blob_name: The name of the file
        :type source_blob_name: str
        :return: Service account info as dict
        :rtype: dict
        """
        bucket = self._storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        logger.info("Got SA_file.")
        sa_info = json.loads(blob.download_as_string())
        return sa_info

    @staticmethod
    def _get_sa_info_from_file(path: str) -> dict:
        """Return info loaded from a local service account file.

        :param path: The path of the file.
        :type path: str
        :return: Loaded file as dict
        :rtype: dict
        """
        with open(path) as f:
            return json.load(f)

    def _get_sa_info(self) -> dict:
        """Get file path from SA_FILE_PATH environmental variable.

        This path can be GCS object URI or local file path.
        Return content of sa path as dict.

        :raises SAFilePathError: When an error occurs with file path
        :return: Service account info
        :rtype: dict
        """
        sa_file_path = os.environ.get("SA_FILE_PATH", None)
        parsed_path = urlparse(sa_file_path)
        if parsed_path.scheme == "gs":
            bucket_name = parsed_path.netloc
            source_blob_name = parsed_path.path[1:]  # delete the first slash
            sa_info = self._get_sa_info_from_google_storage(bucket_name, source_blob_name)
        elif not parsed_path.scheme and os.path.isfile(parsed_path.path):
            sa_info = self._get_sa_info_from_file(parsed_path.path)
        else:
            logger.error("SA file path error.")
            # Reuse the value read above instead of re-reading the env var.
            raise SAFilePathError(f"Got path {sa_file_path}")
        return sa_info

    @retry(stop=stop_after_attempt(RETRIES))
    def _get_credentials_from_sa_info(self, sa_info: dict) -> service_account.Credentials:
        """Build and get a credentials object using service account info.

        :param sa_info: Json loaded service account info
        :type sa_info: dict
        :raises e: If loaded file has bad format
        :return: Google credentials object obtained from service account
        :rtype: service_account.Credentials
        """
        try:
            credentials = service_account.Credentials.from_service_account_info(
                sa_info, scopes=self.access_scopes)
        except ValueError as e:
            logger.error("SA file has bad format.")
            raise e
        return credentials

    def _get_access_token_using_sa_file(self) -> str:
        """Get new access token using SA info.

        :raises RefreshSATokenError: When underlying client can't refresh token
        :return: Refreshed token
        :rtype: str
        """
        sa_info = self._get_sa_info()
        credentials = self._get_credentials_from_sa_info(sa_info)
        logger.info("Refresh token.")
        credentials.refresh(Request())
        token = credentials.token
        if credentials.token is None:
            logger.error("Can't refresh token using SA-file. Token is empty.")
            raise RefreshSATokenError
        logger.info("Token refreshed.")
        return token

    @retry(stop=stop_after_attempt(RETRIES))
    def refresh_token(self) -> str:
        """Refresh token.

        :return: Refreshed token
        :rtype: str
        """
        token = self._get_access_token_using_sa_file()
        self._access_token = token
        return self._access_token

    @property
    def access_token(self) -> str:
        """The access token.

        :return: Access token string.
        :rtype: str
        """
        return self._access_token
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Types module."""
import abc
import io
from typing import Tuple, TypeVar
# Constrained TypeVar covering the file-like objects accepted by the
# storage interfaces below (raw, buffered, or in-memory byte streams).
FileLikeObject = TypeVar("FileLikeObject", io.IOBase, io.RawIOBase, io.BytesIO)
class BlobStorageClient(abc.ABC):
    """Abstract contract every cloud-specific blob storage client implements."""

    @abc.abstractmethod
    def download_to_file(self, uri: str, file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Fetch the blob at *uri* into the supplied file-like object.

        :param uri: The full URI of the file.
        :type uri: str
        :param file: Destination file-like object for the blob content
        :type file: FileLikeObject
        :return: The same file object together with the blob's content-type
        :rtype: Tuple[FileLikeObject, str]
        """

    @abc.abstractmethod
    def download_file_as_bytes(self, uri: str) -> Tuple[bytes, str]:
        """Fetch the blob at *uri* fully into memory.

        :param uri: The full URI of the file
        :type uri: str
        :return: The raw bytes of the file together with its content-type
        :rtype: Tuple[bytes, str]
        """

    @abc.abstractmethod
    def upload_file(self, uri: str, file: FileLikeObject, content_type: str):
        """Store the given file-like object at *uri*.

        :param uri: The full target URI of the resource to upload.
        :type uri: str
        :param file: The file to upload
        :type file: FileLikeObject
        :param content_type: The content-type of the file to upload
        :type content_type: str
        """

    @abc.abstractmethod
    def does_file_exist(self, uri: str) -> bool:
        """Check whether a resource exists at *uri*.

        :param uri: The URI of the resource to verify
        :type uri: str
        :return: True if exists, False otherwise
        :rtype: bool
        """
class BaseCredentials(abc.ABC):
    """Base interface for credentials."""

    @abc.abstractmethod
    def refresh_token(self) -> str:
        """Refresh auth token.

        :return: refreshed token
        :rtype: str
        """

    # abc.abstractproperty is deprecated since Python 3.3; stacking
    # @property over @abc.abstractmethod is the supported equivalent.
    @property
    @abc.abstractmethod
    def access_token(self) -> str:
        """Auth access token.

        :return: token string
        :rtype: str
        """
......@@ -19,8 +19,8 @@ import logging
from airflow.utils import apply_defaults
from airflow.models import BaseOperator, Variable
from libs.context import Context
from libs.source_file_check import GCSSourceFileChecker
from libs.handle_file import GCSFileHandler
from libs.source_file_check import SourceFileChecker
from libs.handle_file import FileHandler
from libs.refresh_token import AirflowTokenRefresher
from libs.process_manifest_r3 import ManifestProcessor
from libs.traverse_manifest import ManifestTraversal
......@@ -58,8 +58,9 @@ class ProcessManifestOperatorR3(BaseOperator):
"""
payload_context = Context.populate(context["dag_run"].conf)
token_refresher = AirflowTokenRefresher()
file_handler = GCSFileHandler(self.file_service_url, token_refresher, payload_context)
source_file_checker = GCSSourceFileChecker()
file_handler = FileHandler(self.file_service_url, token_refresher, payload_context)
source_file_checker = SourceFileChecker()
validator = SchemaValidator(
self.schema_service_url,
token_refresher,
......
# Copyright 2021 Google LLC
# Copyright 2021 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from providers import mock_providers
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment