Commit 275d1a2a authored by fabian serin's avatar fabian serin Committed by Luc Yriarte
Browse files

Slb code push and pipeline

parent c7c19419
[bumpversion]
current_version = 0.2.4
commit = True
tag = False
[bumpversion:file:osdu_gcp/__init__.py]
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# ide
.idea/
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
.venv*
env/
venv/
venv*/
ENV/
env.bak/
venv.bak/
.virtualenv/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
default:
image: python:3.7-slim-buster
stages:
- test
- deploy
build:
stage: test
script:
- echo ---- ---- ---- SYSTEM DEPENDENCIES ---- ---- ----
- apt update
- apt install -y --no-install-recommends git
- echo ---- ---- ---- BUILD IMAGE ---- ---- ----
- pip3 install -r requirements.txt
- pip3 install -r requirements_opengroup.txt
- pip3 install -r requirements_dev.txt
- pytest tests --junitxml=report.xml
artifacts:
when: always
reports:
junit: report.xml
# This job only runs on master; it creates the lib and pushes it to the feed
deploylib:
stage: deploy
script:
- echo ---- ---- ---- SYSTEM DEPENDENCIES ---- ---- ----
- apt update
- apt install -y --no-install-recommends git
- echo ---- ---- ---- BUILD IMAGE ---- ---- ----
- pip3 install -r requirements.txt
- pip3 install -r requirements_opengroup.txt
- pip3 install twine
- python3 setup.py sdist bdist_wheel
- TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=${CI_REGISTRY_USER} python -m twine upload --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
rules:
- if: $CI_COMMIT_BRANCH == 'master'
# wellbore-gcp-lib
# osdu-core-lib-python-gcp
This is the python package for osdu google cloud implementations.
## testing
You need valid gcp credentials and a project ID to run the unit tests.
```bash
export TESTING_GCP_DATA_PROJECT_CREDENTIALS='path_to_jwt'
export TESTING_GCP_DATA_PROJECT_ID='xxxx'
pytest
```
# Copyright 2020 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = '0.2.4'
# MUST NOT BE CHANGED MANUALLY
# The patch number (the last one) should only be updated by the build/deploy script.
# To increase the minor or major version:
# $> pip install -U bumpversion
# $> bumpversion major | minor
# then $> git push --tag
# may fail on Windows; try without commit: $> bumpversion <part> --no-commit
# Copyright 2020 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import aiohttp
import json
from typing import List, Optional
import datetime
import time
import jwt
from urllib.parse import quote_plus
from urllib.parse import urlencode
from osdu.core.auth import AuthBase
class GoogleAccountAuth(AuthBase):
    """
    Google service-account OAuth2 authentication for async HTTP clients.

    Exchanges a locally-signed JWT assertion for an access token and caches
    it for reuse. Based on the gcloud-aio project:
    https://github.com/talkiq/gcloud-aio/blob/master/auth/gcloud/aio/auth/token.py
    """
    # Authorization scheme placed before the token in the header value.
    _scheme: str = 'Bearer'

    def __init__(self,
                 service_file: str,
                 session: aiohttp.ClientSession,
                 scopes: Optional[List[str]] = None):
        """
        :param service_file: path to the service account JSON key file
        :param session: aiohttp client session used for the token request
        :param scopes: OAuth2 scopes to request (joined with spaces)
        """
        super().__init__()
        with open(service_file, 'r') as f:
            self.service_data = json.load(f)
        self.session = session
        self.scopes = ' '.join(scopes or [])
        # Cached access token plus metadata needed to decide when to refresh.
        self.access_token: Optional[str] = None
        self.access_token_duration = 0
        self.access_token_acquired_at = datetime.datetime(1970, 1, 1)

    @property
    def scheme(self) -> Optional[str]:
        """ Return the authorization scheme ('Bearer'). """
        return self._scheme

    async def header_value(self) -> str:
        """ Return the full Authorization header value, e.g. 'Bearer <token>'. """
        token = await self._get_token()
        return f'{self._scheme} {token}'

    async def token(self) -> str:
        """ Return a valid access token, refreshing it if necessary. """
        return await self._get_token()

    async def __call__(self):
        """ Return tuple (scheme, credentials/token). """
        token = await self._get_token()
        return self._scheme, token

    @property
    def email(self) -> Optional[str]:
        """ Return the service-account email used in the token, if available. """
        return self.service_data['client_email']

    async def _refresh_token_for_service_account(self):
        """ Exchange a signed JWT assertion for a fresh OAuth2 access token. """
        token_uri = self.service_data.get('token_uri', 'https://oauth2.googleapis.com/token')
        now = int(time.time())
        assertion_payload = {
            'aud': token_uri,
            'exp': now + 3600,
            'iat': now,
            'iss': self.service_data['client_email'],
            'scope': self.scopes,
        }
        # N.B. algorithm='RS256' requires an extra 240MB in dependencies...
        assertion = jwt.encode(assertion_payload,
                               self.service_data['private_key'],
                               algorithm='RS256')
        payload = urlencode({
            'assertion': assertion,
            'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        }, quote_via=quote_plus)
        refresh_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        async with self.session.post(token_uri, data=payload, headers=refresh_headers) as resp:
            if resp.status != 200:
                # Include the status and response body instead of raising a
                # bare, message-less Exception (was a TODO).
                body = await resp.text()
                raise Exception(f'Token refresh failed with status {resp.status}: {body}')
            content = await resp.json()
        self.access_token = str(content['access_token'])
        self.access_token_duration = int(content['expires_in'])
        self.access_token_acquired_at = datetime.datetime.utcnow()

    async def _get_token(self) -> str:
        """ Return the cached token while younger than 3000s, else refresh it. """
        if self.access_token:
            now = datetime.datetime.utcnow()
            delta = (now - self.access_token_acquired_at).total_seconds()
            if delta < 3000:
                return self.access_token
        await self._refresh_token_for_service_account()
        return self.access_token
# Based on https://github.com/talkiq/gcloud-aio/blob/master/storage/gcloud/aio/storage/storage.py
import enum
import io
import mimetypes
import os
from typing import Any, Optional, Tuple, List
from urllib.parse import quote
from asyncio import sleep
from .http_client_gcp import *
from aiohttp import ClientResponseError
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from osdu.core.api.storage.blob import Blob
from .auth_gcp_sa import GoogleAccountAuth
try:
import ujson as json
except ImportError:
import json # type: ignore
# Google Cloud Storage JSON API endpoints (download/metadata vs upload).
API_ROOT = 'https://www.googleapis.com/storage/v1/b'
API_ROOT_UPLOAD = 'https://www.googleapis.com/upload/storage/v1/b'
# Payloads larger than this threshold use the resumable upload protocol.
MAX_CONTENT_LENGTH_SIMPLE_UPLOAD = 5 * 1024 * 1024  # 5 MB
# log = logging.getLogger(__name__)
class UploadType(enum.Enum):
    """Upload strategy selected per request (simple for small payloads,
    resumable for large ones)."""
    SIMPLE = enum.auto()     # single-request media upload
    RESUMABLE = enum.auto()  # session-based resumable upload
class GCloudAioStorage(BlobStorageBase):
    """
    Async Google Cloud Storage implementation of BlobStorageBase.

    Based on https://github.com/talkiq/gcloud-aio/blob/master/storage/gcloud/aio/storage/storage.py
    """
    _scopes = ['https://www.googleapis.com/auth/devstorage.read_write']
    # Class-level cache of GoogleAccountAuth objects keyed by f'{project}_{bucket}'.
    _access_token_dict = {}

    def __init__(self, session=None, service_account_file: Optional[str] = None):
        """
        :param session: aiohttp client session shared by all requests
        :param service_account_file: path to a service-account JSON key file;
            required unless every call forwards its own auth object
        """
        self._client = GCloudAioHttpClient(scopes=self._scopes, session=session)
        self._session = session
        self._service_account_file = service_account_file

    async def _get_access_token(self,
                                project: str, bucket: str,
                                forwarded_auth: Optional[Any] = None) -> str:
        """
        Return an access token: from the forwarded auth object when given,
        otherwise from a (cached) service-account authenticator.

        :raises ValueError: when no forwarded auth and no service-account file
        """
        if forwarded_auth is not None:
            return await forwarded_auth.token()
        # Was an `assert`, which is stripped under `python -O`; raise instead.
        if not self._service_account_file:
            raise ValueError('No credentials provided')
        token_cache = self._access_token_dict
        cache_key = f'{project}_{bucket}'
        tenant_access_token = token_cache.get(cache_key, None)
        if tenant_access_token is None:
            tenant_access_token = GoogleAccountAuth(service_file=self._service_account_file,
                                                    session=self._session,
                                                    scopes=self._scopes)
            token_cache[cache_key] = tenant_access_token
        return await tenant_access_token.token()

    async def delete(self, project: str, bucket: str, object_name: str,
                     *, auth: Optional[Any] = None, timeout: int = 10, params: dict = None, **kwargs):
        """ Delete an object from the given bucket. """
        # https://cloud.google.com/storage/docs/json_api/#encoding
        encoded_object_name = quote(object_name, safe='')
        url = f'{API_ROOT}/{bucket}/o/{encoded_object_name}'
        token = await self._get_access_token(project, bucket, auth)
        await self._client.delete(url,
                                  params=params or {},
                                  timeout=timeout,
                                  auth_token=token)

    async def download(self, project: str, bucket: str, object_name: str,
                       *, auth: Optional[Any] = None, timeout: int = 10, **kwargs) -> bytes:
        """ Download the object's content as bytes. """
        return await self._download(project, bucket, object_name, auth=auth, timeout=timeout, params={'alt': 'media'})

    async def download_metadata(self, project: str, bucket: str, object_name: str,
                                *, auth: Optional[Any] = None, timeout: int = 10, **kwargs) -> Blob:
        """ Download the object's metadata and wrap it into a Blob. """
        data = await self._download(project, bucket, object_name, auth=auth, timeout=timeout)
        metadata: dict = json.loads(data.decode())
        return Blob(identifier=metadata.get('id', None),
                    bucket=metadata.get('bucket', bucket),
                    name=metadata.get('name', object_name),
                    metadata=metadata,
                    acl=metadata.get('acl', None),
                    content_type=metadata.get('contentType', None),
                    time_created=metadata.get('timeCreated', None),
                    time_updated=metadata.get('updated', None),
                    size=metadata.get('size', -1)
                    )

    async def list_objects(self, project: str, bucket: str, *,
                           auth: Optional[Any] = None, prefix: str = '', page_token: Optional[str] = None,
                           max_result: Optional[int] = None, timeout: int = 10, **kwargs) -> List[str]:
        """ List object names in the bucket, optionally filtered by prefix. """
        url = f'{API_ROOT}/{bucket}/o'
        params = {'prefix': prefix}
        if page_token is not None:
            params['pageToken'] = page_token
        if max_result is not None:
            params['maxResults'] = max_result
        data, _ = await self._client.get(url, response_type=ResponseType.JSON, params=params or {}, timeout=timeout,
                                         auth_token=await self._get_access_token(project, bucket, auth))
        return [x['name'] for x in data.get('items', list())]

    @classmethod
    def build_URI(cls, bucket: str, object_name: str) -> str:
        """ Return the gs:// URI for the given bucket and object name. """
        return f'gs://{bucket}/{object_name}'

    # TODO: if `metadata` is set, use multipart upload:
    # https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload
    async def upload(self, project: str, bucket: str, object_name: str, file_data: Any, *,
                     auth: Optional[Any] = None, content_type: str = None, metadata: dict = None,
                     timeout: int = 30, **kwargs) -> dict:
        """
        Upload `file_data` (bytes, str or file-like) as `object_name`.

        Picks simple vs resumable upload based on content length.
        :raises TypeError: on unsupported file_data or upload type
        """
        url = f'{API_ROOT_UPLOAD}/{bucket}/o'
        stream = self._preprocess_data(file_data)
        content_length = self._get_stream_len(stream)
        # mime detection method same as in aiohttp 3.4.4
        content_type = content_type or mimetypes.guess_type(object_name)[0]
        parameters = {}
        headers = {}
        headers.update({
            'Content-Length': str(content_length),
            'Content-Type': content_type or '',
        })
        # Not currently exposed to callers; upload type is decided by size.
        force_resumable_upload: Optional[bool] = None
        upload_type = self._decide_upload_type(force_resumable_upload, content_length)
        # log.debug('using %r gcloud storage upload method', upload_type)
        if upload_type == UploadType.SIMPLE:
            # if metadata:
            #     log.warning('metadata will be ignored for upload_type=Simple')
            return await self._upload_simple(project, bucket, url, object_name, stream, parameters, headers,
                                             auth=auth, timeout=timeout)
        if upload_type == UploadType.RESUMABLE:
            return await self._upload_resumable(project, bucket, url, object_name, stream, parameters, headers,
                                                auth=auth, metadata=metadata, timeout=timeout)
        raise TypeError(f'upload type {upload_type} not supported')

    @staticmethod
    def _get_stream_len(stream: io.IOBase) -> int:
        """ Return the stream length in bytes without moving its position. """
        current = stream.tell()
        try:
            return stream.seek(0, os.SEEK_END)
        finally:
            stream.seek(current)

    @staticmethod
    def _preprocess_data(data: Any) -> io.IOBase:
        """ Normalize supported input types (None/bytes/str/stream) to a stream. """
        if data is None:
            return io.StringIO('')
        if isinstance(data, bytes):
            return io.BytesIO(data)
        if isinstance(data, str):
            return io.StringIO(data)
        if isinstance(data, io.IOBase):
            return data
        raise TypeError(f'unsupported upload type: "{type(data)}"')

    @staticmethod
    def _decide_upload_type(force_resumable_upload: Optional[bool],
                            content_length: int) -> UploadType:
        """ Choose the upload strategy: explicit override, else by size. """
        # force resumable
        if force_resumable_upload is True:
            return UploadType.RESUMABLE
        # force simple
        if force_resumable_upload is False:
            return UploadType.SIMPLE
        # decide based on Content-Length
        if content_length > MAX_CONTENT_LENGTH_SIMPLE_UPLOAD:
            return UploadType.RESUMABLE
        return UploadType.SIMPLE

    @staticmethod
    def _split_content_type(content_type: str) -> Tuple[str, str]:
        """ Split 'type/subtype;charset=enc' into (content_type, encoding). """
        content_type_and_encoding_split = content_type.split(';')
        content_type = content_type_and_encoding_split[0].lower().strip()
        encoding = None
        if len(content_type_and_encoding_split) > 1:
            encoding_str = content_type_and_encoding_split[1].lower().strip()
            encoding = encoding_str.split('=')[-1]
        return content_type, encoding

    async def _download(self, project: str, bucket: str, object_name: str,
                        *, auth: Optional[Any] = None, params: dict = None, timeout: int = 10) -> bytes:
        """ GET the object (content or metadata depending on params) as bytes. """
        # https://cloud.google.com/storage/docs/json_api/#encoding
        encoded_object_name = quote(object_name, safe='')
        url = f'{API_ROOT}/{bucket}/o/{encoded_object_name}'
        token = await self._get_access_token(project, bucket, auth)
        data, _ = await self._client.get(url, response_type=ResponseType.BYTES,
                                         params=params or {}, timeout=timeout, auth_token=token)
        # N.B. the GCS API sometimes returns 'application/octet-stream' when a
        # string was uploaded. To avoid potential weirdness, always return a
        # bytes object.
        return data

    async def _upload_simple(self, project: str, bucket: str, url: str, object_name: str,
                             stream: io.IOBase, params: dict, headers: dict,
                             *, auth: Optional[Any] = None, timeout: int = 30) -> dict:
        """ Single-request media upload. """
        # https://cloud.google.com/storage/docs/json_api/v1/how-tos/simple-upload
        params['name'] = object_name
        params['uploadType'] = 'media'
        headers.update({'Accept': 'application/json'})
        token = await self._get_access_token(project, bucket, auth)
        data, _ = await self._client.post(url, data=stream, response_type=ResponseType.JSON, headers=headers,
                                          params=params, timeout=timeout, auth_token=token)
        return data

    async def _upload_resumable(self, project: str, bucket: str, url: str, object_name: str,
                                stream: io.IOBase, params: dict,
                                headers: dict, *, metadata: dict = None,
                                auth: Optional[Any] = None, timeout: int = 30) -> dict:
        """ Two-phase resumable upload: initiate a session, then PUT the data. """
        # https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
        session_uri = await self._initiate_upload(project, bucket, url, object_name, params, headers,
                                                  auth=auth, metadata=metadata)
        data: dict = await self._do_upload(project, bucket, session_uri, stream,
                                           auth=auth, headers=headers, timeout=timeout)
        return data

    async def _initiate_upload(self, project: str, bucket: str, url: str, object_name: str, params: dict,
                               headers: dict, *, auth: Optional[Any] = None, metadata: dict = None) -> str:
        """ Start a resumable upload session and return its session URI. """
        params['uploadType'] = 'resumable'
        metadict = (metadata or {}).copy()
        metadict.update({'name': object_name})
        metadata = json.dumps(metadict)
        post_headers = headers.copy()
        post_headers.update({
            'X-Upload-Content-Type': headers['Content-Type'],
            'X-Upload-Content-Length': headers['Content-Length']
        })
        token = await self._get_access_token(project, bucket, auth)
        _, response_headers = await self._client.post(url, response_type=ResponseType.NONE, headers=post_headers,
                                                      params=params, data=metadata, timeout=10,
                                                      auth_token=token)
        session_uri: str = response_headers['Location']
        return session_uri

    async def _do_upload(self, project: str, bucket: str, session_uri: str, stream: io.IOBase,
                         headers: dict, *, auth: Optional[Any] = None, retries: int = 5, **kwargs) -> dict:
        """
        PUT the stream to the session URI with exponential backoff.

        :raises ClientResponseError: when all retries fail (the original code
            silently returned None in that case)
        """
        for tries in range(retries):
            try:
                token = await self._get_access_token(project, bucket, auth)
                data, _ = await self._client.put(session_uri, response_type=ResponseType.JSON, headers=headers,
                                                 data=stream, auth_token=token, **kwargs)
                return data
            except ClientResponseError:
                if tries == retries - 1:
                    # Out of retries: propagate instead of returning None.
                    raise
                headers.update({'Content-Range': '*/*'})
                await sleep(2. ** tries)
from typing import Optional, List, Any, Union, Tuple
from multidict import CIMultiDictProxy
import enum
try:
import ujson as json
except ImportError:
import json # type: ignore
__all__ = ['ResponseType', 'GCloudAioHttpClient']
class ResponseType(enum.Enum):
    """How the HTTP client should decode a response body."""
    NONE = enum.auto()   # ignore the body
    JSON = enum.auto()   # parse as JSON
    TEXT = enum.auto()   # decode as text
    BYTES = enum.auto()  # raw bytes
class GCloudAioHttpClient: