diff --git a/devops/gcp/configmap/README.md b/devops/gcp/configmap/README.md index a3d5365d97cae32139c611fa38fa5c2aaf393fd2..de90cf6156b51f8b5ab837ad5803d8ac88b724a2 100644 --- a/devops/gcp/configmap/README.md +++ b/devops/gcp/configmap/README.md @@ -51,6 +51,16 @@ First you need to set variables in **values.yaml** file using any code editor. S cat ~/.config/gcloud/application_default_credentials.json | grep client_id ``` +### Bootstrap variables + +This variables can be omitted in case **conf.bootstrapEnabled** is set to `false`. + +| Name | Description | Type | Default |Required | +|------|-------------|------|---------|---------| +**storageHost** | Storage service host address | string | `http://storage` | yes +**defaultLegalTag** | Name of the previously created legal tag (without partition part) | string | `default-data-tag` | yes +**dataPartitionId** | Data partition id | string | `redis-storage-master` | yes + ### Config variables | Name | Description | Type | Default |Required | @@ -58,6 +68,7 @@ cat ~/.config/gcloud/application_default_credentials.json | grep client_id **appName** | name of the app | string | storage | yes **configmap** | configmap name | string | storage-config | yes **onPremEnabled** | whether on-prem is enabled | boolean | false | yes +**bootstrapEnabled** | whether to enable storage bootstrap (requires previously created legal tag) | boolean | false | yes ### Install the helm chart diff --git a/devops/gcp/configmap/templates/bootstrap-configmap.yaml b/devops/gcp/configmap/templates/bootstrap-configmap.yaml new file mode 100644 index 0000000000000000000000000000000000000000..b58659ae622712c09f63b895f4f32ab0f5f7249d --- /dev/null +++ b/devops/gcp/configmap/templates/bootstrap-configmap.yaml @@ -0,0 +1,14 @@ +{{- if .Values.conf.bootstrapEnabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + labels: + app: "{{ .Values.conf.appName }}-bootstrap" + name: "{{ .Values.conf.configmap }}-bootstrap" + namespace: "{{ .Release.Namespace }}" +data: + STORAGE_HOST: "{{ .Values.data.storageHost }}" + DEFAULT_LEGAL_TAG: "{{ .Values.data.defaultLegalTag }}" + DATA_PARTITION_ID: "{{ .Values.data.dataPartitionId }}" + ONPREM_ENABLED: "{{ .Values.conf.onPremEnabled }}" +{{- end }} diff --git a/devops/gcp/configmap/templates/variables.yaml b/devops/gcp/configmap/templates/configmap.yaml similarity index 100% rename from devops/gcp/configmap/templates/variables.yaml rename to devops/gcp/configmap/templates/configmap.yaml diff --git a/devops/gcp/configmap/values.yaml b/devops/gcp/configmap/values.yaml index 62b83709aa03b1df50f466c34af6f6b3b4684db4..532adf203a6038c180947043411b90e1138d7f54 100644 --- a/devops/gcp/configmap/values.yaml +++ b/devops/gcp/configmap/values.yaml @@ -14,10 +14,15 @@ data: legalHost: "http://legal" redisGroupHost: "redis-group-master" redisStorageHost: "redis-storage-master" - + + # bootstrap + storageHost: "http://storage" + defaultLegalTag: "default-data-tag" + dataPartitionId: "" # gcp googleAudiences: "" conf: appName: "storage" configmap: "storage-config" onPremEnabled: false + bootstrapEnabled: false diff --git a/devops/gcp/deploy/README.md b/devops/gcp/deploy/README.md index 87cca04d78aaa69d93ad2f01122d0ed8ce3ca782..763c5035c70623f5cd9a03f4c877837fe33689a2 100644 --- a/devops/gcp/deploy/README.md +++ b/devops/gcp/deploy/README.md @@ -38,6 +38,8 @@ First you need to set variables in **values.yaml** file using any code editor. S **image** | service image | string | - | yes **imagePullPolicy** | when to pull image | string | IfNotPresent | yes **serviceAccountName** | name of your service account | string | storage | yes +**bootstrapImage** | bootstrap image | string | - | yes +**bootstrapServiceAccountName** | service account that will be used for bootstrap | string | - | yes ### Config variables @@ -50,8 +52,10 @@ First you need to set variables in **values.yaml** file using any code editor. S **minioSecretName** | secret for minio | string | `storage-minio-secret` | yes **postgresSecretName** | secret for postgres | string | `storage-postgres-secret` | yes **rabbitmqSecretName** | secret for rabbitmq | string | `rabbitmq-secret` | yes +**bootstrapSecretName** | secret for bootstrap to access opendi provider | string | `datafier-secret` | yes **replicas** | Number of replicas | integer | 3 | yes **onPremEnabled** | whether on-prem is enabled | boolean | false | yes +**bootstrapEnabled** | whether to enable storage bootstrap (should be enabled also for config chart) | boolean | false | yes **domain** | your domain | string | - | yes ### Install the helm chart diff --git a/devops/gcp/deploy/templates/bootstrap-deployment.yaml b/devops/gcp/deploy/templates/bootstrap-deployment.yaml new file mode 100644 index 0000000000000000000000000000000000000000..c4ecb6ed37d0f1912655965749a64e3ddd2b2e9c --- /dev/null +++ b/devops/gcp/deploy/templates/bootstrap-deployment.yaml @@ -0,0 +1,38 @@ +{{- if .Values.conf.bootstrapEnabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: "{{ .Values.conf.appName }}-bootstrap" + name: "{{ .Values.conf.appName }}-bootstrap" + namespace: "{{ .Release.Namespace }}" +spec: + replicas: 1 + selector: + matchLabels: + app: "{{ .Values.conf.appName }}-bootstrap" + template: + metadata: + labels: + app: "{{ .Values.conf.appName }}-bootstrap" + annotations: + rollme: {{ randAlphaNum 5 | quote }} + spec: + containers: + - name: "{{ .Values.conf.appName }}-bootstrap" + image: "{{ .Values.data.bootstrapImage }}" + readinessProbe: + exec: + command: + - cat + - /tmp/bootstrap_ready + imagePullPolicy: "{{ .Values.data.imagePullPolicy }}" + envFrom: + - configMapRef: + name: "{{ .Values.conf.configmap }}-bootstrap" + {{- if .Values.conf.onPremEnabled }} + - secretRef: + name: "{{ .Values.conf.bootstrapSecretName }}" + {{- end }} + serviceAccountName: "{{ .Values.data.bootstrapServiceAccountName }}" +{{- end }} diff --git a/devops/gcp/deploy/values.yaml b/devops/gcp/deploy/values.yaml index 9071744f44e052834f088e585e50c8328c1b4fe6..69bda61f2b59e3fa2d21c9a4ba5a74aefd895adc 100644 --- a/devops/gcp/deploy/values.yaml +++ b/devops/gcp/deploy/values.yaml @@ -8,6 +8,8 @@ data: requestsMemory: "2048M" limitsCpu: "1" limitsMemory: "3G" + bootstrapImage: "" + bootstrapServiceAccountName: "" image: "" imagePullPolicy: "IfNotPresent" serviceAccountName: "storage" @@ -18,6 +20,8 @@ conf: minioSecretName: "storage-minio-secret" postgresSecretName: "storage-postgres-secret" rabbitmqSecretName: "rabbitmq-secret" + bootstrapSecretName: "datafier-secret" replicas: 3 onPremEnabled: false + bootstrapEnabled: false domain: "" diff --git a/devops/gcp/pipeline/override-stages.yml b/devops/gcp/pipeline/override-stages.yml index d277ad72bba92eeaebe80e5c4db1e3f444e7b017..cda242a0a013aba8d1eae4a8fd968a8d8a3637d4 100644 --- a/devops/gcp/pipeline/override-stages.yml +++ b/devops/gcp/pipeline/override-stages.yml @@ -3,8 +3,36 @@ variables: OSDU_GCP_VENDOR: gcp NO_DATA_ACCESS_TESTER: $OSDU_GCP_NO_DATA_ACCESS_TESTER +.define_DATA_BRANCH: + script: + - > + if [[ -z $CI_COMMIT_TAG ]] && [[ $CI_COMMIT_BRANCH =~ ^release\/[0-9]{1,2}.[0-9]{1,2}$ ]]; + then + REF_DATA_BRANCH=$CI_COMMIT_BRANCH + elif [[ $CI_COMMIT_TAG =~ ^v[0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,2}$ ]]; + then + TAG=$(echo $CI_COMMIT_TAG | sed "s/^v//"); + REF_DATA_BRANCH="release/${TAG%.*}"; + else + REF_DATA_BRANCH="master"; + fi; + osdu-gcp-anthos-test: variables: OSDU_GCP_VENDOR: anthos TEST_NO_ACCESS_OPENID_PROVIDER_CLIENT_ID: storage-no-access-tester TEST_NO_ACCESS_OPENID_PROVIDER_CLIENT_SECRET: $OSDU_ANTHOS_STORAGE_NO_ACCESS_CLIENT_SECRET + +osdu-gcp-containerize-bootstrap-gitlab: + variables: + OSDU_GCP_ENABLE_BOOTSTRAP: "true" + BUILD_ARGS: "--build-arg DATA_BRANCH=$REF_DATA_BRANCH" + before_script: + - !reference [.define_DATA_BRANCH, script] + +osdu-gcp-containerize-bootstrap-gcr: + variables: + OSDU_GCP_ENABLE_BOOTSTRAP: "true" + BUILD_ARGS: "--build-arg DATA_BRANCH=$REF_DATA_BRANCH" + before_script: + - !reference [.define_DATA_BRANCH, script] diff --git a/provider/storage-gcp/bootstrap/Dockerfile b/provider/storage-gcp/bootstrap/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..94a032e7841f5821f3352c9f569081cfeb953e5d --- /dev/null +++ b/provider/storage-gcp/bootstrap/Dockerfile @@ -0,0 +1,20 @@ +FROM google/cloud-sdk:397.0.0-slim + +ARG DATA_BRANCH + +WORKDIR /opt + +ENV MANIFESTS_DIR="./manifests" +ENV DATA_BRANCH=${DATA_BRANCH} + +COPY ./provider/storage-gcp/bootstrap ./ + +RUN apt-get update \ + && apt-get install zip jq -y \ + && pip3 install --upgrade pip \ + && pip install -r requirements.txt \ + && chmod +x download-data.sh \ + && chmod +x bootstrap_storage.sh \ + && ./download-data.sh + +CMD ["/bin/bash", "-c", "/opt/bootstrap_storage.sh && sleep 365d"] diff --git a/provider/storage-gcp/bootstrap/Python-README.md b/provider/storage-gcp/bootstrap/Python-README.md new file mode 100644 index 0000000000000000000000000000000000000000..3906d09e0764c4803b6938aee414d8ae51ebdc33 --- /dev/null +++ b/provider/storage-gcp/bootstrap/Python-README.md @@ -0,0 +1,41 @@ +# Environmental Variables + +## Common variables +```bash +CLOUD_PROVIDER="anthos|gcp" +MANIFESTS_DIR="</path/to/manifests>" +STORAGE_URL="<https|http>://<storage_host>/api/storage/v2" +DATA_PARTITION_ID="<string>" +ACL_OWNERS='["<owner_group>"]' +ACL_VIEWERS='["<viewer_group>"]' +LEGALTAGS='["<valid_legaltag>"]' +``` + +## Optional common variables +```bash +THREAD_NUMBER=<integer> # the number of simultaneous connections to Storage; default is 3 +BATCH_SIZE=<integer> # the size of Record batch; default is 250 +DATA_BRANCH="<string>" # branch or tag data is got from; default 'master' +``` + +## Anthos Secrets +```bash +KEYCLOAK_AUTH_URL="<https|http>://<keycloak_host>/auth/realms/<realm>/protocol/openid-connect/token" +KEYCLOAK_CLIENT_ID="client_id" +KEYCLOAK_CLIENT_SECRET="client_secret" +``` + +## GCP Secrets (if there are **no** default credentails in the environment) +```bash +SA_FILE_PATH=gs://path/to/sa/file +``` + +# How to run the script + +```bash +# after you set all the variables +python3.8 -m venv venv +source venv/bin/activate +pip install -r requirements.txt +python bootstrap_data.py +``` diff --git a/provider/storage-gcp/bootstrap/README.md b/provider/storage-gcp/bootstrap/README.md new file mode 100644 index 0000000000000000000000000000000000000000..500bb21c9b78d5473c481fe0b712b43c2aecc71c --- /dev/null +++ b/provider/storage-gcp/bootstrap/README.md @@ -0,0 +1,41 @@ +# Storage bootstrap + +## Description + +Bootstrap for storage service uploads sample data to storage service via requests. +It requires previously created legal tag. +To enable storage bootstrap in helm chars - respective configuration parameters should be enabled in provided values. +Logic for bootstrap is implemented in respective (Python scripts)[./Python-README.md]. +Bash part is responsible for providing required environment variables to Python script. +For successful bootstrap several variables should also be provided to Bash script. +List of required bash variables can be found below. + +## Environmental Variables + +### Common variables + +| Name | Description | Example | +|------|-------------|---------| +**DATA_PARTITION_ID** | data partition id | `osdu` +**MANIFESTS_DIR** | directory containing storage records to upload | `./manifests` +**STORAGE_HOST** | Storage service host | `http://storage` +**DEFAULT_LEGAL_TAG** | Previously created legal tag without `<data_partition_id>` part | `default-legal-tag` + +### Anthos Variables + +| Name | Description | Example | +|------|-------------|---------| +**OPENID_PROVIDER_URL** | url to access openid provider | `http://keycloak/realms/osdu` +**OPENID_PROVIDER_CLIENT_ID** | client id access openid provider | `keycloak` +**OPENID_PROVIDER_CLIENT_SECRET** | client secret to access openid provider | `p@ssw0rd` + +### Hardcoded Variables + +Bash script also fills some default environment variables values, that will be later used in Python script. + +| Name | Description | Value | +|------|-------------|---------| +**CLOUD_PROVIDER** | provider type | `gcp|anthos` +**ACL_OWNERS** | acl owners group name | `["data.default.owners@${DATA_PARTITION_ID}.group"]` +**ACL_VIEWERS** | acl owners group name | `["data.default.viewers@${DATA_PARTITION_ID}.group"]` + diff --git a/provider/storage-gcp/bootstrap/__init__.py b/provider/storage-gcp/bootstrap/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/provider/storage-gcp/bootstrap/bootstrap_data.py b/provider/storage-gcp/bootstrap/bootstrap_data.py new file mode 100644 index 0000000000000000000000000000000000000000..53cb69e162fae2e659fbf388c665bd7faea7b6bb --- /dev/null +++ b/provider/storage-gcp/bootstrap/bootstrap_data.py @@ -0,0 +1,148 @@ +import json +import logging +import os +import threading +from typing import Iterable, List + +import requests +import tenacity +from config_manager import ConfigManager +from prepare_records import RecordsPreparer +from osdu_api.clients.storage.record_client import RecordClient +from osdu_api.model.record import Record +from osdu_ingestion.libs.refresh_token import BaseTokenRefresher +from osdu_ingestion.libs.utils import split_into_batches +from storage_client import StorageClient +from utils import prepared_manifests_records, unique_records + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +error_report = [] + +# get environmental variables +try: + ACL_OWNERS = json.loads(os.environ["ACL_OWNERS"]) + ACL_VIEWERS = json.loads(os.environ["ACL_VIEWERS"]) + LEGALTAGS = json.loads(os.environ["LEGALTAGS"]) + MANIFESTS_DIR = os.environ["MANIFESTS_DIR"] +except (KeyError, json.JSONDecodeError) as e: + logger.error( + "The following env variable must be set:\n" + "\tACL_OWNERS='[\"value_1\", \"value_2\"]'\n" + "\tACL_VIEWERS='[\"value_1\", \"value_2\"]'\n" + "\tLEGALTAGS='[\"value_1\", \"value_2\"]'\n" + "\tMANIFESTS_DIR='/path/to/manifests'\n" + ) + raise e + +# get optional environmental variables +try: + THREAD_NUMBER = int(os.environ.get("THREAD_NUMBER", 3)) + BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 250)) +except ValueError as e: + logger.error( + f"Environmental variables THREAD_NUMBER and BATCH_SIZE must be type of integer.") + raise e + +def on_storage_error_callback(retry_state: tenacity.RetryCallState): + failed_record_ids = [record.id for record in retry_state.args[1]] + try: + retry_state.outcome.result() + except (requests.HTTPError, requests.exceptions.ConnectionError) as e: + error_msg = str(e) + # TODO: Think about saving dead letters + error_report.append( + { + "error": error_msg, + "record_ids": failed_record_ids + } + ) + return retry_state.outcome.result() + + +def print_error_report(): + error_report_string = "" + for err in error_report: + error_report_string = error_report_string \ + + f"Error: {err['error']}\n Ids: {err['record_ids']}\n" + if error_report: + logger.warn( + "Following records weren't stored: \n" + f"{error_report_string}" + ) + + +class SaveToStorageThread(threading.Thread): + + _lock = threading.Lock() + + def __init__(self, storage_client: RecordClient, record_batches: Iterable[List[Record]]): + super().__init__() + self._storage_client = storage_client + self._record_batches = record_batches + + @tenacity.retry( + wait=tenacity.wait_fixed(3), + stop=tenacity.stop_after_attempt(3), + retry_error_callback=on_storage_error_callback, + reraise=True + ) + def _send_storage_request(self, record_batch: List[Record]): + print(f"Send batch of {len(record_batch)} records.") + response = self._storage_client.create_update_records(record_batch) + logger.info(response) + response.raise_for_status() + + def run(self): + while True: + try: + self._lock.acquire() + record_batch = next(self._record_batches) + self._lock.release() + record_batch = unique_records(record_batch) + self._send_storage_request(record_batch) + except StopIteration: + logger.info("There are no records left to save.") + self._lock.release() + break + +def main(): + config_manager = ConfigManager() + + records_preparer = RecordsPreparer( + data_partition_id=config_manager.get( + "environment", "data_partition_id"), + acl_owners=ACL_OWNERS, + acl_viewers=ACL_VIEWERS, + legaltags=LEGALTAGS + ) + + token_refresher = BaseTokenRefresher() + token_refresher.refresh_token() + + storage_client = StorageClient( + config_manager=config_manager, token_refresher=token_refresher) + + manifests_records = prepared_manifests_records( + records_preparer, MANIFESTS_DIR) + + record_batches = split_into_batches( + manifests_records, + BATCH_SIZE + ) + + threads = [] + for _ in range(THREAD_NUMBER): + threads.append(SaveToStorageThread(storage_client, record_batches)) + + for t in threads: + t.start() + for t in threads: + t.join() + + print_error_report() + + +if __name__ == "__main__": + main() diff --git a/provider/storage-gcp/bootstrap/bootstrap_storage.sh b/provider/storage-gcp/bootstrap/bootstrap_storage.sh new file mode 100644 index 0000000000000000000000000000000000000000..ddc088fb489bb9e588d8aea344c15859738526e0 --- /dev/null +++ b/provider/storage-gcp/bootstrap/bootstrap_storage.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash +# +# Script that bootstraps storage service using Python scripts +# It creates legal tag via request to Legal service +# After that records are uploaded via requests to Storage service, using previously created Legal tag +# Contains logic for both onprem and gcp version + +set -ex + +source ./validate-env.sh "DATA_PARTITION_ID" +source ./validate-env.sh "MANIFESTS_DIR" +source ./validate-env.sh "STORAGE_HOST" + +export ACL_OWNERS="[\"data.default.owners@${DATA_PARTITION_ID}.group\"]" +export ACL_VIEWERS="[\"data.default.viewers@${DATA_PARTITION_ID}.group\"]" +export LEGALTAGS="[\"${DATA_PARTITION_ID}-${DEFAULT_LEGAL_TAG}\"]" +export STORAGE_URL="${STORAGE_HOST}/api/storage/v2" + +if [ "${ONPREM_ENABLED}" == "true" ]; then + source ./validate-env.sh "OPENID_PROVIDER_URL" + source ./validate-env.sh "OPENID_PROVIDER_CLIENT_ID" + source ./validate-env.sh "OPENID_PROVIDER_CLIENT_SECRET" + + # Check that all env ANTHOS variables are provided + export KEYCLOAK_AUTH_URL="${OPENID_PROVIDER_URL}/protocol/openid-connect/token" + export KEYCLOAK_CLIENT_ID="${OPENID_PROVIDER_CLIENT_ID}" + export KEYCLOAK_CLIENT_SECRET="${OPENID_PROVIDER_CLIENT_SECRET}" + export CLOUD_PROVIDER="anthos" + + python3 /opt/bootstrap_data.py + +else + # Check that all GCP env variables are provided + export CLOUD_PROVIDER="gcp" + + python3 /opt/bootstrap_data.py + +fi + +touch /tmp/bootstrap_ready diff --git a/provider/storage-gcp/bootstrap/config_manager.py b/provider/storage-gcp/bootstrap/config_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..2945a912e6195113df5fff4fa242f1b8825043c7 --- /dev/null +++ b/provider/storage-gcp/bootstrap/config_manager.py @@ -0,0 +1,61 @@ +from asyncio.log import logger +import logging +import os + +from osdu_api.configuration.base_config_manager import BaseConfigManager + +logger = logging.getLogger() + + +class ConfigManager(BaseConfigManager): + """Custom config manager for Python SDK + """ + + def __init__(self) -> None: + try: + self.config = { + "environment": { + "storage_url": os.environ["STORAGE_URL"], + "data_partition_id": os.environ["DATA_PARTITION_ID"], + "use_service_principal": False + }, + "provider": { + "name": os.environ["CLOUD_PROVIDER"] + } + } + except KeyError as error: + logger.error( + "One or more of the following environmental variables weren't set:\n" + "STORAGE_URL,\n" + "DATA_PARTITION_ID,\n" + "CLOUD_PROVIDER." + ) + raise error + + def get(self, section: str, option: str) -> str: + try: + config_value = self.config[section][option] + except KeyError: + config_value = "stub" + return config_value + + def getint(self, section: str, option: str) -> int: + try: + config_value = self.config[section][option] + except KeyError: + config_value = 0 + return config_value + + def getfloat(self, section: str, option: str) -> float: + try: + config_value = self.config[section][option] + except KeyError: + config_value = 0.0 + return config_value + + def getbool(self, section: str, option: str, default=False) -> bool: + try: + config_value = self.config[section][option] + except KeyError: + config_value = default + return config_value diff --git a/provider/storage-gcp/bootstrap/delete_records.py b/provider/storage-gcp/bootstrap/delete_records.py new file mode 100644 index 0000000000000000000000000000000000000000..c1d587290b0358356005f59a70a1a0f27e3a0b62 --- /dev/null +++ b/provider/storage-gcp/bootstrap/delete_records.py @@ -0,0 +1,114 @@ +import json +import logging +import os +import threading +from typing import Iterable + +import tenacity +from config_manager import ConfigManager +from prepare_records import RecordsPreparer +from osdu_api.clients.storage.record_client import RecordClient +from osdu_api.model.storage.record import Record +from osdu_ingestion.libs.refresh_token import BaseTokenRefresher +from utils import prepared_manifests_records + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +error_report = [] + +# get optional environmental variables +try: + THREAD_NUMBER = int(os.environ.get("THREAD_NUMBER", 3)) +except ValueError as e: + logger.error( + f"Environmental variable THREAD_NUMBER must be type of integer.") + raise e + +# get environmental variables +try: + ACL_OWNERS = json.loads(os.environ["ACL_OWNERS"]) + ACL_VIEWERS = json.loads(os.environ["ACL_VIEWERS"]) + LEGALTAGS = json.loads(os.environ["LEGALTAGS"]) + MANIFESTS_DIR = os.environ["MANIFESTS_DIR"] +except (KeyError, json.JSONDecodeError) as e: + logger.error( + "The following env variable must be set:\n" + "\tACL_OWNERS='[\"value_1\", \"value_2\"]'\n" + "\tACL_VIEWERS='[\"value_1\", \"value_2\"]'\n" + "\tLEGALTAGS='[\"value_1\", \"value_2\"]'\n" + "\tMANIFESTS_DIR='/path/to/manifests'\n" + ) + raise e + + +def on_storage_error_callback(retry_state: tenacity.RetryCallState): + try: + retry_state.outcome.result() + except Exception as e: + print(e) + + + +class DeleteFromStorageThread(threading.Thread): + + _lock = threading.Lock() + + def __init__(self, storage_client: RecordClient, records: Iterable[Record]): + super().__init__() + self._storage_client = storage_client + self._records = records + + @tenacity.retry( + wait=tenacity.wait_fixed(1), + stop=tenacity.stop_after_attempt(1), + retry_error_callback=on_storage_error_callback, + reraise=True + ) + def send_storage_request(self, record: Record): + print(f"Delete record {record.id}") + self._storage_client.delete_record(record.id) + + def run(self): + while True: + try: + self._lock.acquire() + record = next(self._records) + self._lock.release() + self._send_storage_request(record) + except StopIteration: + logger.info("There are no records left to save.") + self._lock.release() + break + +def main(): + config_manager = ConfigManager() + records_preparer = RecordsPreparer( + data_partition_id=config_manager.get( + "environment", "data_partition_id"), + acl_owners=ACL_OWNERS, + acl_viewers=ACL_VIEWERS, + legaltags=LEGALTAGS + ) + + token_refresher = BaseTokenRefresher() + token_refresher.refresh_token() + + storage_client = RecordClient( + config_manager=config_manager, token_refresher=token_refresher) + + manifests_records = prepared_manifests_records( + records_preparer, MANIFESTS_DIR) + + threads = [] + for _ in THREAD_NUMBER: + threads.append(DeleteFromStorageThread(storage_client, manifests_records)) + + for t in threads: + t.start() + for t in threads: + t.join() + + +if __name__ == "__main__": + main() diff --git a/provider/storage-gcp/bootstrap/download-data.sh b/provider/storage-gcp/bootstrap/download-data.sh new file mode 100644 index 0000000000000000000000000000000000000000..9e808ac79cfbaf456a79049e47379f433e540d92 --- /dev/null +++ b/provider/storage-gcp/bootstrap/download-data.sh @@ -0,0 +1,43 @@ +#! /bin/sh +set -x + +if [ -z "${MANIFESTS_DIR}" ]; then + echo "Set 'MANIFESTS_DIR' variable" + exit 1 +else + mkdir -p "${MANIFESTS_DIR}" +fi + +if [ -z "${DATA_BRANCH}" ]; then + echo "Set 'DATA_BRANCH' variable to master" + DATA_BRANCH="master" +fi + +REFERENCE_VALUES="https://community.opengroup.org/osdu/data/data-definitions/-/archive/${DATA_BRANCH}/data-definitions-${DATA_BRANCH}.zip?path=ReferenceValues/Manifests/reference-data" +TNO_VOLVE_VALUES="https://community.opengroup.org/osdu/platform/data-flow/data-loading/open-test-data/-/archive/${DATA_BRANCH}/open-test-data-${DATA_BRANCH}.zip?path=rc--3.0.0/4-instances" + +curl -o reference-data.zip "$REFERENCE_VALUES" +if unzip -t "reference-data.zip" > /dev/null; then + echo "Succesfully downloaded and verified reference-data.zip" +else + echo "reference-data.zip has not been downloaded." + exit 1 +fi + +curl -o tno-volve-data.zip "$TNO_VOLVE_VALUES" +if unzip -t "tno-volve-data.zip" > /dev/null; then + echo "Succesfully downloaded and verified tno-volve-data.zip" +else + echo "tno-volve-data.zip has not been downloaded." + exit 1 +fi + +unzip -o reference-data.zip -d "$MANIFESTS_DIR" > /dev/null +unzip -o tno-volve-data.zip -d "$MANIFESTS_DIR" > /dev/null + +rm -f reference-data.zip +rm -f tno-volve-data.zip + +find "$MANIFESTS_DIR" -type f -name 'IngestionSequence.json' -delete +find "$MANIFESTS_DIR" -type f -name 'ReferenceValueTypeDependencies.json' -delete +find "$MANIFESTS_DIR" -type d -name 'work-products' -exec rm -rf {} + diff --git a/provider/storage-gcp/bootstrap/prepare_records.py b/provider/storage-gcp/bootstrap/prepare_records.py new file mode 100644 index 0000000000000000000000000000000000000000..92c5bd361ae80f9b8e884a5032cc2bdbdbd62102 --- /dev/null +++ b/provider/storage-gcp/bootstrap/prepare_records.py @@ -0,0 +1,88 @@ +import json +import re +from typing import Iterator + +from osdu_api.model.storage.acl import Acl +from osdu_api.model.storage.legal import Legal +from osdu_api.model.storage.record import Record as Record +from osdu_api.model.storage.record_ancestry import RecordAncestry + + +class RecordsPreparer: + """ + This class is used for preparing records from the Manifest to be sent via StorageClient + """ + + RECORD_ID_REGEX = r"\"([\w\-\.\{\}]+)(:[\w\-\.]+\-\-[\w\-\.]+:[\w\-\.\:\%]+)\"" + RECORD_KIND_REGEX = r"\"([\w\-\.]+)(:[\w\-\.]+:[\w\-\.]+:[0-9]+.[0-9]+.[0-9]+)\"" + + def __init__( + self, + data_partition_id: str, + acl_owners: list, + acl_viewers: list, + legaltags: list, + country_codes: str = ["US"] + ) -> None: + self.data_partition_id = data_partition_id + self.acl = Acl(viewers=acl_viewers, owners=acl_owners) + self.legal = Legal( + legaltags=legaltags, + other_relevant_data_countries=country_codes, + status="" + ) + # FIXME: Status is not working with Storage service. Need to investigate this problem further + try: + del self.legal.status + except AttributeError: + pass + + def _data_partition_id_repl_func(self, match: re.Match) -> str: + value_without_data_partition = match.group(2) + return f"\"{self.data_partition_id}{value_without_data_partition}\"" + + def _replace_data_partition_id(self, manifest_content: str) -> str: + """Replace data-partition-id in the Manifest + + :param manifest_content: Manifest content + :type manifest_content: str + :return: Manifest content with replaced data-partition-id + :rtype: str + """ + manifest_content = re.sub( + self.RECORD_ID_REGEX, + self._data_partition_id_repl_func, + manifest_content + ) + return manifest_content + + def _prepare_record(self, record_dict: dict) -> Record: + record_ancestry = RecordAncestry( + parents=record_dict.get("ancestry", {}).get("parents", []) + ) + record = Record( + kind=record_dict["kind"], + acl=self.acl, + legal=self.legal, + id=record_dict.get("id"), + data=record_dict["data"], + meta=record_dict.get("meta"), + ancestry=record_ancestry + ) + return record + + def manifest_records(self, manifest_string: str) -> Iterator[Record]: + """Generator that gets a Manifest file as a string, + applies changes to it and yields osdu_api model Record ony by one. + + :param manifest_string: content of the Manifest file + :type manifest_string: str + :yield: osdu_api model Record object + :rtype: Iterator[Record] + """ + manifest_string = self._replace_data_partition_id(manifest_string) + manifest_dict = json.loads(manifest_string) + record_list = manifest_dict.get( + "ReferenceData", []) + manifest_dict.get("MasterData", []) + for record_dict in record_list: + yield self._prepare_record(record_dict) diff --git a/provider/storage-gcp/bootstrap/requirements.txt b/provider/storage-gcp/bootstrap/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..290d32b5b5106633c5e0c91cd6261be3ddd33540 --- /dev/null +++ b/provider/storage-gcp/bootstrap/requirements.txt @@ -0,0 +1,14 @@ +google-api-core==1.31.5 +google-auth==1.32.1 +google-cloud-core==1.7.2 +google-cloud-storage==1.40.0 + +requests==2.25.1 +tenacity==6.2.0 + +--extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple +osdu-api~=0.17.0rc1 + +--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple +osdu-ingestion==0.16.0 + diff --git a/provider/storage-gcp/bootstrap/storage_client.py b/provider/storage-gcp/bootstrap/storage_client.py new file mode 100644 index 0000000000000000000000000000000000000000..1b9d4dda55c1d252dfa59c9d0f7cb136baec6f89 --- /dev/null +++ b/provider/storage-gcp/bootstrap/storage_client.py @@ -0,0 +1,21 @@ +from typing import List + +from osdu_api.clients.base_client import BaseClient +from osdu_api.model.http_method import HttpMethod +from osdu_api.model.storage.query_records_request import QueryRecordsRequest +from osdu_api.model.storage.record import Record +from osdu_api.clients.storage.record_client import RecordClient + + +class StorageClient(RecordClient): + + def create_update_records(self, records: List[Record], bearer_token=None): + """ + Need to override this method to add extra query argument to skip duplicates. + """ + records_data = '[' + for record in records: + records_data = '{}{}{}'.format(records_data, record.to_JSON(), ',') + records_data = records_data[:-1] + records_data = '{}{}'.format(records_data, ']') + return self.make_request(method=HttpMethod.PUT, url='{}{}'.format(self.storage_url, '/records?skipdupes=true'), data=records_data, bearer_token=bearer_token) diff --git a/provider/storage-gcp/bootstrap/utils.py b/provider/storage-gcp/bootstrap/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..7dc72391632e47e297c9b044a0ba770d3c43dc6d --- /dev/null +++ b/provider/storage-gcp/bootstrap/utils.py @@ -0,0 +1,28 @@ +import logging +import os +from typing import Iterable, Iterator, List + +from osdu_api.model.storage.record import Record +from prepare_records import RecordsPreparer + +logger = logging.getLogger() + + +def manifest_paths(base_dir: str) -> Iterator[str]: + for root, _, files in os.walk(base_dir): + for file in files: + if not file.endswith(".json"): + continue + file_path = os.path.join(root, file) + yield file_path + +def unique_records(record_batch: Iterable[Record]) -> List[Record]: + record_batch = {r.id: r for r in record_batch}.values() + return list(record_batch) + +def prepared_manifests_records(records_preparer: RecordsPreparer, manifests_dir: str) -> Iterator[Record]: + for file_path in manifest_paths(manifests_dir): + with open(file_path) as f: + manifest_string = f.read() + for record in records_preparer.manifest_records(manifest_string): + yield record diff --git a/provider/storage-gcp/bootstrap/validate-env.sh b/provider/storage-gcp/bootstrap/validate-env.sh new file mode 100644 index 0000000000000000000000000000000000000000..5649203a18d6a06976bf164f8745ab848f29f949 --- /dev/null +++ b/provider/storage-gcp/bootstrap/validate-env.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Copyright 2020 Google LLC +# Copyright 2017-2019, Schlumberger +# Copyright 2022 EPAM +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +ENV_VAR_NAME=$1 + +if [ "${!ENV_VAR_NAME}" = "" ] +then + echo "Missing environment variable '$ENV_VAR_NAME'. Please provide all variables and try again" + exit 1 +fi