Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Data Ingestion
Ingestion DAGs
Commits
cb859e02
Commit
cb859e02
authored
Feb 12, 2021
by
David Diederich
Browse files
Merge branch 'cherry-pick-
bd4f28a0
' into 'release/0.5'
Cherry-pick 'ibm-changes' to release/0.5 See merge request
!17
parents
d4020723
be004ef3
Pipeline
#26587
passed with stages
in 10 minutes and 46 seconds
Changes
7
Pipelines
2
Hide whitespace changes
Inline
Side-by-side
src/dags/providers/blob_storage.py
View file @
cb859e02
...
...
@@ -20,6 +20,7 @@ import os
from
providers.factory
import
ProvidersFactory
# import section needed to register cloud specific clients
from
providers.azure
import
azure_blob_storage_client
from
providers.ibm
import
ibm_blob_storage_client
from
providers.gcp
import
gcp_blob_storage_client
# pylint: disable=unused-import
from
providers.types
import
BlobStorageClient
...
...
src/dags/providers/constants.py
View file @
cb859e02
...
...
@@ -18,3 +18,4 @@
GOOGLE_CLOUD_PROVIDER
=
"gcp"
AZURE_CLOUD_PROVIDER
=
"azure"
IBM_CLOUD_PROVIDER
=
"ibm"
src/dags/providers/credentials.py
View file @
cb859e02
...
...
@@ -20,6 +20,7 @@ import os
from
providers.factory
import
ProvidersFactory
# import section needed to register cloud specific clients
from
providers.azure
import
azure_credentials
from
providers.ibm
import
ibm_credentials
from
providers.gcp
import
gcp_credentials
# pylint: disable=unused-import
from
providers.types
import
BaseCredentials
...
...
src/dags/providers/ibm/__init__.py
0 → 100644
View file @
cb859e02
# Copyright 2021 IBM LLC
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
src/dags/providers/ibm/ibm_blob_storage_client.py
0 → 100644
View file @
cb859e02
# Licensed Materials - Property of IBM
# (c) Copyright IBM Corp. 2020. All Rights Reserved.
"""Blob storage IBM client module."""
import
tenacity
from
providers.constants
import
IBM_CLOUD_PROVIDER
import
logging
from
providers.factory
import
ProvidersFactory
from
providers.types
import
BlobStorageClient
,
FileLikeObject
from
typing
import
Tuple
logger
=
logging
.
getLogger
(
__name__
)
RETRY_SETTINGS
=
{
"stop"
:
tenacity
.
stop_after_attempt
(
3
),
"wait"
:
tenacity
.
wait_fixed
(
10
),
"reraise"
:
True
,
}
@
ProvidersFactory
.
register
(
IBM_CLOUD_PROVIDER
)
class
IBMCloudStorageClient
(
BlobStorageClient
):
"""Implementation of blob storage client for the IBM provider."""
def
__init__
(
self
):
"""Initialize storage client."""
pass
def
does_file_exist
(
self
,
uri
:
str
)
->
bool
:
"""Verify if a file exists in the given URI.
:param uri: The GCS URI of the file.
:type uri: str
:return: A boolean indicating if the file exists
:rtype: bool
"""
pass
def
download_to_file
(
self
,
uri
:
str
,
file
:
FileLikeObject
)
->
Tuple
[
FileLikeObject
,
str
]:
"""Download file from the given URI.
:param uri: The GCS URI of the file.
:type uri: str
:param file: The file where to download the blob content
:type file: FileLikeObject
:return: A tuple containing the file and its content-type
:rtype: Tuple[io.BytesIO, str]
"""
pass
def
download_file_as_bytes
(
self
,
uri
:
str
)
->
Tuple
[
bytes
,
str
]:
"""Download file as bytes from the given URI.
:param uri: The GCS URI of the file
:type uri: str
:return: The file as bytes and its content-type
:rtype: Tuple[bytes, str]
"""
pass
def
upload_file
(
self
,
uri
:
str
,
blob_file
:
FileLikeObject
,
content_type
:
str
):
"""Upload a file to the given uri.
:param uri: The GCS URI of the file
:type uri: str
:param blob: The file
:type blob: FileLikeObject
:param content_type: [description]
:type content_type: str
"""
pass
src/dags/providers/ibm/ibm_credentials.py
0 → 100644
View file @
cb859e02
# Copyright © IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""IBM Credential Module."""
import
json
import
logging
import
os
from
keycloak
import
KeycloakOpenID
from
providers.constants
import
IBM_CLOUD_PROVIDER
from
providers.exceptions
import
RefreshSATokenError
,
SAFilePathError
from
providers.factory
import
ProvidersFactory
from
providers.types
import
BaseCredentials
from
tenacity
import
retry
,
stop_after_attempt
logger
=
logging
.
getLogger
(
__name__
)
RETRIES
=
3
@
ProvidersFactory
.
register
(
IBM_CLOUD_PROVIDER
)
class
IBMCredentials
(
BaseCredentials
):
"""IBM Credential Provider"""
def
__init__
(
self
):
"""Initialize IBM Credentials object"""
self
.
_access_token
=
None
self
.
_client_id
=
None
self
.
_client_secret
=
None
self
.
_username
=
None
self
.
_password
=
None
self
.
_scope
=
None
self
.
_tenant_id
=
None
def
_populate_ad_credentials
(
self
)
->
None
:
uri
=
os
.
getenv
(
"KEYCLOACK_URI"
)
realm
=
os
.
getenv
(
"REALM_NAME"
)
self
.
_client_id
=
os
.
getenv
(
"client_id"
)
self
.
_client_secret
=
os
.
getenv
(
"client_secret"
)
self
.
_username
=
os
.
getenv
(
"username"
)
self
.
_password
=
os
.
getenv
(
"password"
)
self
.
_scope
=
os
.
getenv
(
"scope"
)
def
_generate_token
(
self
)
->
str
:
keycloak_openid
=
KeycloakOpenID
(
server_url
=
uri
,
client_id
=
self
.
_client_id
,
realm_name
=
realm
,
client_secret_key
=
self
.
_client_secret
)
token
=
keycloak_openid
.
token
(
self
.
_username
,
self
.
_password
)
refresh_token
=
keycloak_openid
.
refresh_token
(
token
[
'refresh_token'
])
access_token
=
refresh_token
[
'access_token'
]
@
retry
(
stop
=
stop_after_attempt
(
RETRIES
))
def
refresh_token
(
self
)
->
str
:
token
=
self
.
_generate_token
()
self
.
_access_token
=
token
return
self
.
_access_token
@
property
def
access_token
(
self
)
->
str
:
return
self
.
_access_token
tests/set_airflow_env.sh
View file @
cb859e02
...
...
@@ -22,6 +22,7 @@ pip install deepdiff
pip
install
azure-identity
pip
install
azure-keyvault-secrets
pip
install
msal
pip
install
python-keycloak
export
ACL
=
'{"viewers": ["foo"],"owners": ["foo"]}'
export
LEGAL
=
'{"legaltags": ["foo"], "otherRelevantDataCountries": ["FR", "US", "CA"],"status": "compliant"}'
export
WORKFLOW_URL
=
"http://127.0.0.1:5000"
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment