Commit ec49fa02 authored by Luc Yriarte

Merge branch 'slb_code_push-wip' into 'master'
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 2020 Open Subsurface Data Universe Software / Platform / Domain Data Mgmt Services / Wellbore / Lib / Wellbore-cloud
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# Python package for OSDU Azure cloud implementation
## Introduction
This project provides the OSDU Azure Python package that implements the interfaces defined by `osdu-core-lib-python`.
Install from package:
`pip install osdu-core-lib-python-azure`
## Prerequisites
* Python 3.7.x
* [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest)
* [Azurite npm](https://www.npmjs.com/package/azurite) or [Azurite docker image](https://hub.docker.com/_/microsoft-azure-storage-azurite) to run locally ([more information](https://dev.azure.com/slb-des-ext-collaboration/open-data-ecosystem/_git/open-data-ecosystem.wiki?version=GBwikiMaster&path=%2FOSDU-(C)%2FWellbore-DMS%2FDesigns%2FAzure-Blob-Storage-Test-Design.md&_a=preview))
* Follow the [Wellbore project set up steps](https://dev.azure.com/slb-des-ext-collaboration/open-data-ecosystem/_git/os-wellbore-ddms?path=%2FREADME.md&_a=preview&anchor=project-startup) to ensure the OSDU dependencies can be installed successfully.
## Dependencies
Install from sources:
`pip install -r requirements.txt`
## Required variables to run the service
| name | value | description | sensitive? | source |
| --- | --- | --- | --- | --- |
| KEYVAULT_URL | ex `https://foo-keyvault.vault.azure.net` | URL of the shared KeyVault that holds application and partition secrets | no | infrastructure deployment |
| SERVICE_HOST_PARTITION | ex `http://partition.osdu/api/partition` | Partition Service endpoint | no | infrastructure deployment |
| USE_PARTITION_SERVICE | `enabled`/`disabled`, defaults to `enabled` | Set to `enabled` when the Partition Service is available in the environment; set to `disabled` to fall back to a fake Partition Service | no | infrastructure deployment |
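The package itself reads these variables lazily and does not validate them at import time. A minimal fail-fast sketch of checking them at service startup (the `require_env` helper and the sample values are hypothetical, not part of the package):

```python
import os

def require_env(key: str, env=os.environ) -> str:
    # Fail fast with a clear message instead of failing later with a None URL.
    value = env.get(key)
    if not value:
        raise RuntimeError(f'Missing required environment variable: {key}')
    return value

# Example with a fake environment (values are illustrative, not real endpoints).
fake_env = {
    'KEYVAULT_URL': 'https://foo-keyvault.vault.azure.net',
    'SERVICE_HOST_PARTITION': 'http://partition.osdu/api/partition',
}
keyvault_url = require_env('KEYVAULT_URL', fake_env)
partition_api_url = require_env('SERVICE_HOST_PARTITION', fake_env)
# USE_PARTITION_SERVICE is optional and defaults to 'enabled'.
use_partition_service = fake_env.get('USE_PARTITION_SERVICE', 'enabled').lower() == 'enabled'
print(keyvault_url, use_partition_service)  # → https://foo-keyvault.vault.azure.net True
```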
## Tests
### Test Dependencies
The tests use the [pytest](https://docs.pytest.org/en/stable/) framework. Other dependencies are [pytest-asyncio](https://pypi.org/project/pytest-asyncio/) for testing asyncio code and [pytest-cov](https://pypi.org/project/pytest-cov/) to produce coverage reports.
Install the test dependencies:
`pip install -r requirements_dev.txt`
### Run unit tests
Install and run Azurite for local tests.
Option 1: using npm:
```bash
# Install Azurite
npm install -g azurite
# Create azurite directory
mkdir c:/azurite
# Launch Azurite for Windows
azurite --silent --location c:\azurite --debug c:\azurite\debug.log
```
Option 2: using Docker:
```bash
docker run -p 10000:10000 mcr.microsoft.com/azure-storage/azurite azurite-blob --blobHost 0.0.0.0
```
The tests run against an Azure storage emulator (Azurite). The Azurite storage account and container are parameterized via environment variables:
| Variable | default value |
| --- | --- |
| TESTING_AZ_STORAGE_ACCOUNT | `http://127.0.0.1:10000/devstoreaccount1` |
| TESTING_AZ_CREDENTIALS | `Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==` |
| TESTING_AZ_CONTAINER | `wdms-osdu` |
*Note: the value for TESTING_AZ_CREDENTIALS is the well-known Azurite default key, not a private key.*
### How to run locally using DefaultAzureCredential and the Azure CLI
The authentication relies on [DefaultAzureCredential](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/identity/azure-identity#defaultazurecredential). If a user has signed in via the Azure CLI `az login` command, DefaultAzureCredential authenticates as that user. In order to have read/write access to the storage account and successfully run the tests, the user must be assigned the [Storage Blob Data Contributor](https://docs.microsoft.com/en-us/azure/role-based-access-control/built-in-roles#storage-blob-data-contributor) role.
```bash
az login
# Set the subscription where the storage account is
subscription=<subscription id>
# Set your default subscription
az account set --subscription $subscription
# Validate your default subscription is set correctly
az account show
```
### Run Tests
```bash
# Run all tests
python -m pytest ./tests
# Run all tests and produce coverage reports
python -m pytest --junit-xml=unit_tests_report.xml --cov=osdu_az --cov-report=html --cov-report=xml ./tests
```
### Generate and upload a dev package
A dev package can be generated and uploaded to the feed by manually triggering a new pipeline from a dev branch (not master) and checking the pipeline parameter 'generate dev package'.
The version number is automatically set following the format:
```bash
<CURRENT_VERSION>.dev<BUILD_NUMBER> # example: 0.0.15.dev20102119
```
In order to select a dev package, the command line or the requirements file must explicitly specify the full version:
```bash
pip install osdu-core-lib-python-azure==0.0.15.dev20102116
```
or use the `--pre` option of the `pip install` command (see the [pip install documentation](https://pip.pypa.io/en/stable/reference/pip_install/#install-pre)).
Otherwise pip only finds stable versions and ignores any pre-release and development versions.
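The version format above can be sketched as a one-line helper (the function name and the `CURRENT_VERSION`/`BUILD_NUMBER` argument names are hypothetical stand-ins for the real CI variables):

```python
def dev_version(current_version: str, build_number: str) -> str:
    # PEP 440 development release: <release>.dev<N>, which pip treats as a
    # pre-release sorting before the corresponding final release.
    return f'{current_version}.dev{build_number}'

print(dev_version('0.0.15', '20102119'))  # → 0.0.15.dev20102119
```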
## License
Apache version 2.0 (see LICENSE.TXT)
## Additional Notes
* The version number must be updated manually for now. Increment by one the version patch part in file `osdu_az/__init__.py`.
__version__ = '0.1.5'
import os


def _from_env(key, default=None):
    result = os.environ.get(key, default)
    # Suppress validation of required env vars on module loading.
    # os-wellbore-dms is importing azure dependencies in gcp deployments.
    # assert result, "Failed to get {} env variable".format(key)
    return result


keyvault_url = _from_env('KEYVAULT_URL')
partition_api_url = _from_env('SERVICE_HOST_PARTITION')
class DataAccessError(Exception):
    def __init__(self, message: str, status_code: int = None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
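A short usage sketch of this exception (the class is repeated here so the sketch is self-contained; the partition id and status code are made up for illustration):

```python
class DataAccessError(Exception):
    # Mirrors the class above.
    def __init__(self, message: str, status_code: int = None):
        super().__init__(message)
        self.message = message
        self.status_code = status_code


try:
    raise DataAccessError('Failed to retrieve data partition opendes. Not Found.', status_code=404)
except DataAccessError as err:
    # A caller can map status_code back onto its own HTTP response.
    print(err.status_code, err.message)
```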
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

from osdu_az import conf


class AzureIdentity:
    default_credential = None
    default_scope = ''

    @staticmethod
    def get_default_credential():
        if not AzureIdentity.default_credential:
            AzureIdentity.default_credential = DefaultAzureCredential()
        return AzureIdentity.default_credential

    @staticmethod
    def get_access_token():
        credential = AzureIdentity.get_default_credential()
        return credential.get_token(AzureIdentity.get_scope()).token

    @staticmethod
    def get_scope():
        if not AzureIdentity.default_scope:
            AzureIdentity.default_scope = AzureIdentity.get_resource_id()
        return AzureIdentity.default_scope

    @staticmethod
    def get_resource_id() -> str:
        secret_client = SecretClient(conf.keyvault_url, AzureIdentity.get_default_credential())
        secret = secret_client.get_secret('aad-client-id')
        return secret.value
from typing import Optional

from osdu_az.partition.partition_info import PartitionInfo

fake_partition_properties = {
    'storage-account-name': {
        'sensitive': True,
        'value': 'opendes-storage'
    }
}
default_partition_name = 'opendes'


class FakePartitionClient:
    @staticmethod
    async def get_partition(data_partition_id: str) -> Optional[PartitionInfo]:
        if data_partition_id == default_partition_name:
            return PartitionInfo(fake_partition_properties)
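A sketch of how this fake client behaves, with a minimal stub standing in for `PartitionInfo` so the example runs without the package installed:

```python
import asyncio
from typing import Optional


class PartitionInfo:
    # Stub standing in for osdu_az.partition.partition_info.PartitionInfo.
    def __init__(self, partition_properties=None):
        self._partition_properties = partition_properties


fake_partition_properties = {'storage-account-name': {'sensitive': True, 'value': 'opendes-storage'}}
default_partition_name = 'opendes'


class FakePartitionClient:
    @staticmethod
    async def get_partition(data_partition_id: str) -> Optional[PartitionInfo]:
        # Only the default partition is known; anything else yields None.
        if data_partition_id == default_partition_name:
            return PartitionInfo(fake_partition_properties)


known = asyncio.run(FakePartitionClient.get_partition('opendes'))
unknown = asyncio.run(FakePartitionClient.get_partition('other'))
print(known is not None, unknown)  # → True None
```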
from typing import Optional

import aiohttp

from osdu_az import conf
from osdu_az.exceptions.data_access_error import DataAccessError
from osdu_az.identity.az_identity import AzureIdentity
from osdu_az.partition.partition_info import PartitionInfo

error_message = 'Failed to retrieve data partition'


class PartitionClient:
    @staticmethod
    def partition_endpoint() -> str:
        return conf.partition_api_url + '/v1/partitions/'

    @staticmethod
    async def get_partition(data_partition_id: str) -> Optional[PartitionInfo]:
        access_token = AzureIdentity.get_access_token()
        headers = {
            'authorization': f'Bearer {access_token}'
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(PartitionClient.partition_endpoint() + data_partition_id) as response:
                if response.status == 200:
                    partition_properties = await response.json()
                    return PartitionInfo(partition_properties)
                raise DataAccessError(message=f'{error_message} {data_partition_id}. {response.reason}.',
                                      status_code=response.status)
from typing import Optional

from azure.keyvault.secrets import SecretClient

from osdu_az import conf
from osdu_az.identity.az_identity import AzureIdentity


class PartitionInfo:
    def __init__(self, partition_properties: dict = None):
        self._partition_properties = partition_properties
        self._secret_client = None

    def get_value(self, property_name: str) -> Optional[str]:
        partition_property = self._partition_properties.get(property_name)
        if not partition_property:
            return None
        if not partition_property.get('sensitive'):
            return partition_property['value']
        secret = self._get_secret(partition_property['value'])
        return secret.value

    def _get_secret(self, key: str):
        return self._get_secret_client().get_secret(key)

    def _get_secret_client(self):
        if not self._secret_client:
            self._secret_client = SecretClient(conf.keyvault_url, AzureIdentity.get_default_credential())
        return self._secret_client
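The sensitive/non-sensitive split in `get_value` can be illustrated with a stubbed secret client in place of the Key Vault `SecretClient` (the stub class, property names, and values are invented for this sketch):

```python
from types import SimpleNamespace


class StubSecretClient:
    # Stands in for azure.keyvault.secrets.SecretClient in this sketch.
    def __init__(self, secrets):
        self._secrets = secrets

    def get_secret(self, key):
        return SimpleNamespace(value=self._secrets[key])


class PartitionInfoSketch:
    def __init__(self, partition_properties, secret_client):
        self._partition_properties = partition_properties
        self._secret_client = secret_client

    def get_value(self, property_name):
        prop = self._partition_properties.get(property_name)
        if not prop:
            return None
        if not prop.get('sensitive'):
            # Non-sensitive properties carry their value inline.
            return prop['value']
        # Sensitive properties store a Key Vault secret *name*; resolve it.
        return self._secret_client.get_secret(prop['value']).value


info = PartitionInfoSketch(
    {'compliance-ruleset': {'sensitive': False, 'value': 'shared'},
     'storage-account-key': {'sensitive': True, 'value': 'storage-key-secret-name'}},
    StubSecretClient({'storage-key-secret-name': 's3cr3t'}))
print(info.get_value('compliance-ruleset'))   # → shared
print(info.get_value('storage-account-key'))  # → s3cr3t
print(info.get_value('missing'))              # → None
```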
from os import environ

from osdu_az.opendes.fake_partition_client import FakePartitionClient
from osdu_az.opendes.partition_client import PartitionClient
from osdu_az.partition.partition_info import PartitionInfo
from osdu_az.partition.partitions_cache import PartitionsCache

# KNOWN CORE PARTITION PROPERTIES
COSMOS_ENDPOINT = 'cosmos-endpoint'
COSMOS_CONNECTION = 'cosmos-connection'
COSMOS_PRIMARY_KEY = 'cosmos-primary-key'
ELASTIC_ENDPOINT = 'elastic-endpoint'
ELASTIC_USERNAME = 'elastic-username'
ELASTIC_PASSWORD = 'elastic-password'
SB_CONNECTION = 'sb-connection'
SB_NAMESPACE = 'sb-namespace'
COMPLIANCE_RULESET = 'compliance-ruleset'
STORAGE_ACCOUNT_NAME = 'storage-account-name'
STORAGE_ACCOUNT_KEY = 'storage-account-key'
PARTITION_ID = 'id'


class PartitionService:
    partition_client = None
    scope = ''

    @staticmethod
    async def get_partition(data_partition_id: str) -> PartitionInfo:
        partition_info = PartitionsCache.get(data_partition_id)
        if not partition_info:
            partition_info = await PartitionService._partition_client().get_partition(data_partition_id)
            if partition_info:
                PartitionsCache.set(data_partition_id, partition_info)
        return partition_info

    @staticmethod
    async def get_storage_account_name(data_partition_id: str):
        partition_info = await PartitionService.get_partition(data_partition_id)
        if partition_info:
            return partition_info.get_value(STORAGE_ACCOUNT_NAME)

    @staticmethod
    def _partition_client():
        if not PartitionService.partition_client:
            PartitionService._initialize_partition_client()
        return PartitionService.partition_client

    @staticmethod
    def _initialize_partition_client():
        if environ.get('USE_PARTITION_SERVICE', default='enabled').lower() == 'enabled':
            PartitionService.partition_client = PartitionClient()
        else:
            PartitionService.partition_client = FakePartitionClient()
from cachetools import TTLCache


class PartitionsCache:
    partitions_cache = TTLCache(maxsize=100, ttl=300)

    @staticmethod
    def get(data_partition_id: str):
        return PartitionsCache.partitions_cache.get(data_partition_id)

    @staticmethod
    def set(data_partition_id: str, partition_info):
        PartitionsCache.partitions_cache[data_partition_id] = partition_info

    @staticmethod
    def clear():
        PartitionsCache.partitions_cache.clear()
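`PartitionsCache` delegates to `cachetools.TTLCache` (at most 100 entries, each expiring after 300 seconds). A stdlib-only sketch of the same expiry idea, using an injectable clock so the example does not need to sleep (the `TinyTTLCache` class is invented for illustration, not part of the package):

```python
import time


class TinyTTLCache:
    # Minimal illustration of what cachetools.TTLCache provides:
    # entries expire once ttl seconds have elapsed since they were set.
    def __init__(self, ttl: float, clock=time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._store = {}

    def set(self, key, value):
        self._store[key] = (value, self._clock() + self._ttl)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            # Expired: drop the entry and behave as a miss.
            del self._store[key]
            return None
        return value


# Fake clock so the expiry can be demonstrated instantly.
now = [0.0]
cache = TinyTTLCache(ttl=300, clock=lambda: now[0])
cache.set('opendes', 'partition-info')
print(cache.get('opendes'))  # → partition-info
now[0] = 301.0               # advance past the TTL
print(cache.get('opendes'))  # → None
```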
from typing import Optional, List, Any

from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient

from osdu.core.api.storage.blob import Blob
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.tenant import Tenant
from osdu_az.partition.partition_service import PartitionService


class AzureAioBlobStorage(BlobStorageBase):
    def _build_url(self, storage_account: str):
        return f'https://{storage_account}.blob.core.windows.net'

    def _get_credentials(self):
        return DefaultAzureCredential(exclude_shared_token_cache_credential=True,
                                      exclude_visual_studio_code_credential=True,
                                      exclude_environment_credential=True)

    async def _close_credentials(self, credential):
        await credential.close()

    async def _get_storage_account_name(self, data_partition_id: str):
        return await PartitionService.get_storage_account_name(data_partition_id)

    async def list_objects(self, tenant: Tenant,
                           *args, auth: Optional = None, prefix: str = '', page_token: Optional[str] = None,
                           max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
        """
        list all objects within a container
        :param tenant: tenant info
        :param auth: auth obj to perform the operation
        :param prefix: Filter results to objects whose names begin with this prefix
        :param page_token: A previously-returned page token representing part of the larger set of results to view.
        :param max_result: Maximum number of items to return.
        :param timeout: timeout
        :param kwargs:
        :return: list of blob names
        """
        raise NotImplementedError('Azure blob storage implementation, "list_objects" not implemented')

    async def delete(self, tenant: Tenant, object_name: str,
                     *args, auth: Optional = None, timeout: int = 10, **kwargs):
        """
        delete an object
        :param tenant: tenant info
        :param auth: auth obj to perform the operation
        :param object_name:
        :param timeout:
        :param kwargs:
        :return:
        """
        raise NotImplementedError('Azure blob storage implementation, "delete" not implemented')

    async def download(self, tenant: Tenant, object_name: str,
                       *args, auth: Optional = None, timeout: int = 10, **kwargs) -> bytes:
        """
        download blob data
        :param tenant: tenant info
        :param auth: auth obj to perform the operation
        :param object_name:
        :param timeout:
        :param kwargs:
        :return:
        """
        storage_account = await self._get_storage_account_name(tenant.data_partition_id)
        container = tenant.bucket_name
        cred = self._get_credentials()
        account_url = self._build_url(storage_account)
        try:
            blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)
            async with blob_service_client:
                container_client = blob_service_client.get_container_client(container)
                blob_client = container_client.get_blob_client(object_name)
                data = await blob_client.download_blob()
                return await data.readall()
        finally:
            await self._close_credentials(cred)

    # not for now, parquet only
    async def download_metadata(self, tenant: Tenant, object_name: str,
                                *args, auth: Optional = None, timeout: int = 10, **kwargs) -> Blob:
        """
        download blob metadata
        :param tenant: tenant info
        :param auth: auth obj to perform the operation
        :param object_name:
        :param timeout:
        :param kwargs:
        :return: blob
        """
        raise NotImplementedError('Azure blob storage implementation, "download_metadata" not implemented')

    async def upload(self, tenant: Tenant, object_name: str, file_data: Any,
                     *args, auth: Optional = None, content_type: str = None, metadata: dict = None,
                     timeout: int = 30, **kwargs):
        """
        upload blob data
        :param tenant: tenant info
        :param object_name: maps to file name
        :param file_data: data to upload
        :param auth: auth obj to perform the operation
        :param content_type: content type
        :param metadata: metadata dict
        :param timeout: timeout
        :param kwargs:
        :return: blob id
        """
        storage_account = await self._get_storage_account_name(tenant.data_partition_id)
        container = tenant.bucket_name
        cred = self._get_credentials()
        account_url = self._build_url(storage_account)
        try:
            blob_service_client = BlobServiceClient(account_url=account_url, credential=cred)
            async with blob_service_client:
                container_client = blob_service_client.get_container_client(container)
                blob_client = container_client.get_blob_client(object_name)
                data = await blob_client.upload_blob(file_data, overwrite=True, metadata=metadata)
                return data
        finally:
            await self._close_credentials(cred)
# osdu core lib main python
# osdu-core-lib-python>=0.4.0, <0.5
azure.storage.blob>=12.4
azure.identity
azure-keyvault
cachetools
pytest>=3
pytest-asyncio
pytest-cov
mock
azure.core
from os import environ

from azure.storage.blob import BlobServiceClient


class Config:
    storage_account = environ.get('TESTING_AZ_STORAGE_ACCOUNT', 'http://127.0.0.1:10000/devstoreaccount1')
    storage_account_name = environ.get('STORAGE_ACCOUNT_NAME', 'devstoreaccount1')
    credentials = environ.get('TESTING_AZ_CREDENTIALS', 'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==')
    container = environ.get('TESTING_AZ_CONTAINER', 'wdms-osdu')


environ.setdefault('KEYVAULT_URL', 'invalid-keyvault-name')
environ.setdefault('SERVICE_HOST_PARTITION', 'invalid-partition-service-name')


def pytest_configure(config):
    blob_service_client = BlobServiceClient(account_url=Config.storage_account, credential=Config.credentials)
    try:
        blob_service_client.create_container(Config.container)
    except Exception as ex:
        print(ex)
import pytest
from azure.keyvault.secrets import KeyVaultSecret
from mock import patch, Mock

from osdu_az.partition.partition_info import PartitionInfo
from osdu_az.partition.partition_service import PartitionService


@pytest.fixture
async def partition_service(monkeypatch) -> PartitionService: