diff --git a/src/dags/libs/validate_schema.py b/src/dags/libs/validate_schema.py index bbe7428332727c550fbdf0c751b42739d6b17925..d51c557d8229436cdef508d1de5347d08d22e5ff 100644 --- a/src/dags/libs/validate_schema.py +++ b/src/dags/libs/validate_schema.py @@ -164,7 +164,7 @@ class SchemaValidator(HeadersMixin): """ if not schema: schema = self.get_schema(entity["kind"]) - data = entity["data"] + data = entity try: self._validate_against_schema(schema, data) logger.debug(f"Record successfully validated") diff --git a/src/dags/providers/azure/__init__.py b/src/dags/providers/azure/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..56d545215d47ae77b35aafd66cd9406af68e43a6 --- /dev/null +++ b/src/dags/providers/azure/__init__.py @@ -0,0 +1,13 @@ +# Copyright © Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/dags/providers/azure/azure_blob_storage_client.py b/src/dags/providers/azure/azure_blob_storage_client.py new file mode 100644 index 0000000000000000000000000000000000000000..a0653974f9b528e1d05b01bf77e5caadabba9485 --- /dev/null +++ b/src/dags/providers/azure/azure_blob_storage_client.py @@ -0,0 +1,80 @@ +# Copyright © Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
logger = logging.getLogger(__name__)

# Keyword arguments meant for tenacity's retry machinery: give up after
# 3 attempts, wait a fixed 10 (seconds, per tenacity) between attempts, and
# re-raise the last exception. Nothing in the visible code consumes this yet.
RETRY_SETTINGS = dict(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_fixed(10),
    reraise=True,
)


@ProvidersFactory.register(AZURE_CLOUD_PROVIDER)
class AzureCloudStorageClient(BlobStorageClient):
    """Blob storage client registered for the Azure cloud provider.

    Every method below is a placeholder: it performs no work and returns
    ``None``. The docstrings describe the contract implied by the
    signatures, not behaviour that exists today.
    """

    def __init__(self):
        """Create the client; no state is set up."""

    def does_file_exist(self, uri: str) -> bool:
        """Check whether a file exists at the given URI.

        Placeholder: currently does nothing and returns ``None``.

        :param uri: The URI of the file.
        :type uri: str
        :return: Intended to be ``True`` when the file exists.
        :rtype: bool
        """

    def download_to_file(self, uri: str, file: FileLikeObject) -> Tuple[FileLikeObject, str]:
        """Download the blob at the given URI into ``file``.

        Placeholder: currently does nothing and returns ``None``.

        :param uri: The URI of the file.
        :type uri: str
        :param file: The file-like object meant to receive the blob content.
        :type file: FileLikeObject
        :return: Intended to be the file and its content-type.
        :rtype: Tuple[FileLikeObject, str]
        """

    def download_file_as_bytes(self, uri: str) -> Tuple[bytes, str]:
        """Download the blob at the given URI as bytes.

        Placeholder: currently does nothing and returns ``None``.

        :param uri: The URI of the file.
        :type uri: str
        :return: Intended to be the file content and its content-type.
        :rtype: Tuple[bytes, str]
        """

    def upload_file(self, uri: str, blob_file: FileLikeObject, content_type: str):
        """Upload a file to the given URI.

        Placeholder: currently does nothing and returns ``None``.

        :param uri: The destination URI of the file.
        :type uri: str
        :param blob_file: The file-like object to upload.
        :type blob_file: FileLikeObject
        :param content_type: The content-type to associate with the upload.
        :type content_type: str
        """
logger = logging.getLogger(__name__)

# Maximum number of attempts tenacity makes in ``refresh_token``.
RETRIES = 3


@ProvidersFactory.register(AZURE_CLOUD_PROVIDER)
class AzureCredentials(BaseCredentials):
    """Azure credential provider.

    Service principal settings are read from Azure Key Vault (located via
    the ``KEYVAULT_URI`` environment variable) and exchanged for an access
    token through MSAL's client-credentials flow.
    """

    def __init__(self):
        """Initialize Azure Credentials object with nothing loaded yet."""
        self._access_token = None
        self._client_id = None
        self._client_secret = None
        self._tenant_id = None
        self._resource_id = None

    def _populate_ad_credentials(self) -> None:
        """Load the service principal settings from Azure Key Vault.

        Reads the secrets ``app-dev-sp-username``, ``app-dev-sp-password``,
        ``app-dev-sp-tenant-id`` and ``aad-client-id`` from the vault named
        by ``KEYVAULT_URI``.

        :raises ValueError: If ``KEYVAULT_URI`` is unset or empty.
        """
        uri = os.getenv("KEYVAULT_URI")
        if not uri:
            logger.error("KEYVAULT_URI environment variable is not set")
            raise ValueError("KEYVAULT_URI environment variable is not set")

        client = SecretClient(vault_url=uri, credential=DefaultAzureCredential())

        # Fetch everything before assigning anything, so a failure half-way
        # through never leaves the object with a partial set of credentials.
        client_id = client.get_secret("app-dev-sp-username").value
        client_secret = client.get_secret("app-dev-sp-password").value
        tenant_id = client.get_secret("app-dev-sp-tenant-id").value
        resource_id = client.get_secret("aad-client-id").value

        self._client_id = client_id
        self._client_secret = client_secret
        self._tenant_id = tenant_id
        self._resource_id = resource_id

    def _generate_token(self) -> str:
        """Request a new access token for the configured resource.

        :return: The access token issued for the ``<resource id>/.default``
            scope.
        :rtype: str
        :raises ValueError: If a required credential is missing, or the
            token response carries no ``access_token``.
        """
        credentials = (
            self._client_id,
            self._client_secret,
            self._tenant_id,
            self._resource_id,
        )
        # Reload when ANY value is missing (not only the client id), so a
        # previously interrupted load is repaired on the next attempt.
        if any(value is None for value in credentials):
            self._populate_ad_credentials()

        required = (
            (self._tenant_id, "TenantId is not set properly"),
            (self._resource_id, "ResourceId is not set properly"),
            (self._client_id, "Please pass client Id to generate token"),
            (self._client_secret, "Please pass client secret to generate token"),
        )
        for value, message in required:
            if value is None:
                logger.error(message)
                raise ValueError(message)

        authority_host_uri = "https://login.microsoftonline.com"
        authority_uri = f"{authority_host_uri}/{self._tenant_id}"
        scopes = [f"{self._resource_id}/.default"]

        try:
            app = msal.ConfidentialClientApplication(
                client_id=self._client_id,
                authority=authority_uri,
                client_credential=self._client_secret,
            )
            result = app.acquire_token_for_client(scopes=scopes)
        except Exception:
            # Log with traceback, then propagate the original exception.
            logger.exception("Failed to request an access token")
            raise

        token = result.get("access_token")
        if not token:
            # A response without "access_token" is a failed request; surface
            # it instead of silently storing None as the token.
            error = result.get("error")
            description = result.get("error_description")
            logger.error("Token request failed: %s: %s", error, description)
            raise ValueError(
                f"Failed to acquire access token: {error}: {description}"
            )
        return token

    @retry(stop=stop_after_attempt(RETRIES))
    def refresh_token(self) -> str:
        """Refresh token.

        Token generation is attempted up to ``RETRIES`` times by tenacity.

        :return: Refreshed token
        :rtype: str
        """
        self._access_token = self._generate_token()
        return self._access_token

    @property
    def access_token(self) -> str:
        """The access token.

        :return: Access token string; ``None`` until ``refresh_token`` has
            stored one.
        :rtype: str
        """
        return self._access_token
@@ -18,6 +19,7 @@ import os from providers.factory import ProvidersFactory # import section needed to register cloud specific clients +from providers.azure import azure_blob_storage_client from providers.gcp import gcp_blob_storage_client # pylint: disable=unused-import from providers.types import BlobStorageClient diff --git a/src/dags/providers/constants.py b/src/dags/providers/constants.py index eae8abd560c38f182682f0529437850bd5c593d4..705e6cf0f6cb553656e9660bf0ff9b84bead98b9 100644 --- a/src/dags/providers/constants.py +++ b/src/dags/providers/constants.py @@ -1,5 +1,6 @@ # Copyright 2021 Google LLC # Copyright 2021 EPAM Systems +# Copyright © Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,3 +17,4 @@ """Providers constants module.""" GOOGLE_CLOUD_PROVIDER = "gcp" +AZURE_CLOUD_PROVIDER = "azure" diff --git a/src/dags/providers/credentials.py b/src/dags/providers/credentials.py index 2ec89c64fd8e00bcafbe2055d11ed84979ffceb5..a325a575b41719c05ece1464125b64d40fd7a3ec 100644 --- a/src/dags/providers/credentials.py +++ b/src/dags/providers/credentials.py @@ -1,5 +1,6 @@ # Copyright 2021 Google LLC # Copyright 2021 EPAM Systems +# Copyright © Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,6 +19,7 @@ import os from providers.factory import ProvidersFactory # import section needed to register cloud specific clients +from providers.azure import azure_credentials from providers.gcp import gcp_credentials # pylint: disable=unused-import from providers.types import BaseCredentials