Commit b1898598 authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) 🚩
Browse files

Added support of ADC ("Application Default Credentials") for GCP

parent 959891b7
Pipeline #68036 failed with stages
in 32 seconds
import aiohttp
import json
from typing import List, Optional
from typing import List, Optional, Type
import datetime
import enum
import os
import time
import jwt
from urllib.parse import quote_plus
from urllib.parse import urlencode
from osdu.core.auth import AuthBase
class Type(enum.Enum):
AUTHORIZED_USER = 'authorized_user'
SERVICE_ACCOUNT = 'service_account'
class GoogleAccountAuth(AuthBase):
_scheme: str = 'Bearer'
......@@ -24,9 +30,9 @@ class GoogleAccountAuth(AuthBase):
:param scopes: scopes
"""
super().__init__()
with open(service_file, 'r') as f:
self.service_data = json.load(f)
self.service_file = service_file
self.service_data = self.get_service_data()
self.session = session
self.scopes = ' '.join(scopes or [])
self.access_token: Optional[str] = None
......@@ -38,6 +44,29 @@ class GoogleAccountAuth(AuthBase):
""" Return scheme """
return self._scheme
@property
def token_type(self):
return self.service_data['type']
@property
def token_uri(self):
return self.service_data.get('token_uri', 'https://oauth2.googleapis.com/token')
def get_service_data(self) -> Optional[dict]:
service_data = self.service_file or os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
if not service_data:
cloudsdk_config = os.environ.get('CLOUDSDK_CONFIG')
sdkpath = (cloudsdk_config
or os.path.join(os.path.expanduser('~'), '.config', 'gcloud'))
service_data = os.path.join(sdkpath, 'application_default_credentials.json')
try:
with open(service_data, 'r') as f:
return json.load(f)
except Exception:
raise ValueError('No credentials provided')
async def header_value(self) -> str:
token = await self._get_token()
return f'{self._scheme} {token}'
......@@ -56,11 +85,9 @@ class GoogleAccountAuth(AuthBase):
return self.service_data['client_email']
async def _refresh_token_for_service_account(self):
token_uri = self.service_data.get('token_uri', 'https://oauth2.googleapis.com/token')
now = int(time.time())
assertion_payload = {
'aud': token_uri,
'aud': self.token_uri,
'exp': now + 3600,
'iat': now,
'iss': self.service_data['client_email'],
......@@ -78,7 +105,7 @@ class GoogleAccountAuth(AuthBase):
refresh_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
async with self.session.post(token_uri, data=payload, headers=refresh_headers) as resp:
async with self.session.post(self.token_uri, data=payload, headers=refresh_headers) as resp:
if resp.status != 200:
raise Exception() #TODO
content = await resp.json()
......@@ -90,6 +117,27 @@ class GoogleAccountAuth(AuthBase):
self.access_token_duration = int(content['expires_in'])
self.access_token_acquired_at = datetime.datetime.utcnow()
async def _refresh_token_for_authorized_user(self):
payload = urlencode({
'grant_type': 'refresh_token',
'client_id': self.service_data['client_id'],
'client_secret': self.service_data['client_secret'],
'refresh_token': self.service_data['refresh_token'],
})
refresh_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
async with self.session.post(url=self.token_uri, data=payload,
headers=refresh_headers) as resp:
if resp.status != 200:
raise Exception() #TODO
content = await resp.json()
self.access_token = str(content['access_token'])
self.access_token_duration = int(content['expires_in'])
self.access_token_acquired_at = datetime.datetime.utcnow()
async def _get_token(self) -> str:
if self.access_token:
now = datetime.datetime.utcnow()
......@@ -97,5 +145,9 @@ class GoogleAccountAuth(AuthBase):
if delta < 3000:
return self.access_token
if self.token_type == Type.AUTHORIZED_USER:
await self._refresh_token_for_authorized_user()
elif self.token_type == Type.SERVICE_ACCOUNT:
await self._refresh_token_for_service_account()
return self.access_token
......@@ -72,7 +72,7 @@ class GCloudAioStorage(BlobStorageBase):
if forwarded_auth is not None:
return await forwarded_auth.token()
assert self._service_account_file, 'No credentials provided'
# assert self._service_account_file, 'No credentials provided'
token_cache = self._access_token_dict
cache_key = f'{project}_{bucket}'
tenant_access_token = token_cache.get(cache_key, None)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment