Commit b1b69812 authored by Danylo Vanin (EPAM)'s avatar Danylo Vanin (EPAM) Committed by Oleksandr Kosse (EPAM)

[GONRG-5460] Added bootstrap script

parent 02c0da44
Related merge request: !477 [GONRG-5460] Added bootstrap script
Showing 736 additions and 1 deletion
@@ -51,6 +51,16 @@ First you need to set variables in **values.yaml** file using any code editor. S
cat ~/.config/gcloud/application_default_credentials.json | grep client_id
```
### Bootstrap variables
These variables can be omitted when **conf.bootstrapEnabled** is set to `false`.
| Name | Description | Type | Default |Required |
|------|-------------|------|---------|---------|
**storageHost** | Storage service host address | string | `http://storage` | yes
**defaultLegalTag** | Name of the previously created legal tag (without partition part) | string | `default-data-tag` | yes
**dataPartitionId** | Data partition id | string | - | yes
### Config variables
| Name | Description | Type | Default |Required |
@@ -58,6 +68,7 @@ cat ~/.config/gcloud/application_default_credentials.json | grep client_id
**appName** | name of the app | string | storage | yes
**configmap** | configmap name | string | storage-config | yes
**onPremEnabled** | whether on-prem is enabled | boolean | false | yes
**bootstrapEnabled** | whether to enable storage bootstrap (requires previously created legal tag) | boolean | false | yes
### Install the helm chart
......
{{- if .Values.conf.bootstrapEnabled }}
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    app: "{{ .Values.conf.appName }}-bootstrap"
  name: "{{ .Values.conf.configmap }}-bootstrap"
  namespace: "{{ .Release.Namespace }}"
data:
  STORAGE_HOST: "{{ .Values.data.storageHost }}"
  DEFAULT_LEGAL_TAG: "{{ .Values.data.defaultLegalTag }}"
  DATA_PARTITION_ID: "{{ .Values.data.dataPartitionId }}"
  ONPREM_ENABLED: "{{ .Values.conf.onPremEnabled }}"
{{- end }}
@@ -14,10 +14,15 @@ data:
  legalHost: "http://legal"
  redisGroupHost: "redis-group-master"
  redisStorageHost: "redis-storage-master"
  # bootstrap
  storageHost: "http://storage"
  defaultLegalTag: "default-data-tag"
  dataPartitionId: ""
  # gcp
  googleAudiences: ""
conf:
  appName: "storage"
  configmap: "storage-config"
  onPremEnabled: false
  bootstrapEnabled: false
@@ -38,6 +38,8 @@ First you need to set variables in **values.yaml** file using any code editor. S
**image** | service image | string | - | yes
**imagePullPolicy** | when to pull image | string | IfNotPresent | yes
**serviceAccountName** | name of your service account | string | storage | yes
**bootstrapImage** | bootstrap image | string | - | yes
**bootstrapServiceAccountName** | service account that will be used for bootstrap | string | - | yes
### Config variables
@@ -50,8 +52,10 @@ First you need to set variables in **values.yaml** file using any code editor. S
**minioSecretName** | secret for minio | string | `storage-minio-secret` | yes
**postgresSecretName** | secret for postgres | string | `storage-postgres-secret` | yes
**rabbitmqSecretName** | secret for rabbitmq | string | `rabbitmq-secret` | yes
**bootstrapSecretName** | secret for bootstrap to access openid provider | string | `datafier-secret` | yes
**replicas** | Number of replicas | integer | 3 | yes
**onPremEnabled** | whether on-prem is enabled | boolean | false | yes
**bootstrapEnabled** | whether to enable storage bootstrap (should be enabled also for config chart) | boolean | false | yes
**domain** | your domain | string | - | yes
### Install the helm chart
......
{{- if .Values.conf.bootstrapEnabled }}
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: "{{ .Values.conf.appName }}-bootstrap"
  name: "{{ .Values.conf.appName }}-bootstrap"
  namespace: "{{ .Release.Namespace }}"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: "{{ .Values.conf.appName }}-bootstrap"
  template:
    metadata:
      labels:
        app: "{{ .Values.conf.appName }}-bootstrap"
      annotations:
        rollme: {{ randAlphaNum 5 | quote }}
    spec:
      containers:
      - name: "{{ .Values.conf.appName }}-bootstrap"
        image: "{{ .Values.data.bootstrapImage }}"
        readinessProbe:
          exec:
            command:
            - cat
            - /tmp/bootstrap_ready
        imagePullPolicy: "{{ .Values.data.imagePullPolicy }}"
        envFrom:
        - configMapRef:
            name: "{{ .Values.conf.configmap }}-bootstrap"
        {{- if .Values.conf.onPremEnabled }}
        - secretRef:
            name: "{{ .Values.conf.bootstrapSecretName }}"
        {{- end }}
      serviceAccountName: "{{ .Values.data.bootstrapServiceAccountName }}"
{{- end }}
@@ -8,6 +8,8 @@ data:
  requestsMemory: "2048M"
  limitsCpu: "1"
  limitsMemory: "3G"
  bootstrapImage: ""
  bootstrapServiceAccountName: ""
  image: ""
  imagePullPolicy: "IfNotPresent"
  serviceAccountName: "storage"
@@ -18,6 +20,8 @@ conf:
  minioSecretName: "storage-minio-secret"
  postgresSecretName: "storage-postgres-secret"
  rabbitmqSecretName: "rabbitmq-secret"
  bootstrapSecretName: "datafier-secret"
  replicas: 3
  onPremEnabled: false
  bootstrapEnabled: false
  domain: ""
@@ -3,8 +3,36 @@ variables:
  OSDU_GCP_VENDOR: gcp
  NO_DATA_ACCESS_TESTER: $OSDU_GCP_NO_DATA_ACCESS_TESTER

.define_DATA_BRANCH:
  script:
    - >
      if [[ -z $CI_COMMIT_TAG ]] && [[ $CI_COMMIT_BRANCH =~ ^release\/[0-9]{1,2}\.[0-9]{1,2}$ ]];
      then
        REF_DATA_BRANCH=$CI_COMMIT_BRANCH;
      elif [[ $CI_COMMIT_TAG =~ ^v[0-9]{1,2}\.[0-9]{1,2}\.[0-9]{1,2}$ ]];
      then
        TAG=$(echo $CI_COMMIT_TAG | sed "s/^v//");
        REF_DATA_BRANCH="release/${TAG%.*}";
      else
        REF_DATA_BRANCH="master";
      fi;

osdu-gcp-anthos-test:
  variables:
    OSDU_GCP_VENDOR: anthos
    TEST_NO_ACCESS_OPENID_PROVIDER_CLIENT_ID: storage-no-access-tester
    TEST_NO_ACCESS_OPENID_PROVIDER_CLIENT_SECRET: $OSDU_ANTHOS_STORAGE_NO_ACCESS_CLIENT_SECRET

osdu-gcp-containerize-bootstrap-gitlab:
  variables:
    OSDU_GCP_ENABLE_BOOTSTRAP: "true"
    BUILD_ARGS: "--build-arg DATA_BRANCH=$REF_DATA_BRANCH"
  before_script:
    - !reference [.define_DATA_BRANCH, script]

osdu-gcp-containerize-bootstrap-gcr:
  variables:
    OSDU_GCP_ENABLE_BOOTSTRAP: "true"
    BUILD_ARGS: "--build-arg DATA_BRANCH=$REF_DATA_BRANCH"
  before_script:
    - !reference [.define_DATA_BRANCH, script]
FROM google/cloud-sdk:397.0.0-slim
ARG DATA_BRANCH
WORKDIR /opt
ENV MANIFESTS_DIR="./manifests"
ENV DATA_BRANCH=${DATA_BRANCH}
COPY ./provider/storage-gcp/bootstrap ./
RUN apt-get update \
&& apt-get install zip jq -y \
&& pip3 install --upgrade pip \
&& pip install -r requirements.txt \
&& chmod +x download-data.sh \
&& chmod +x bootstrap_storage.sh \
&& ./download-data.sh
CMD ["/bin/bash", "-c", "/opt/bootstrap_storage.sh && sleep 365d"]
# Environmental Variables
## Common variables
```bash
CLOUD_PROVIDER="anthos|gcp"
MANIFESTS_DIR="</path/to/manifests>"
STORAGE_URL="<https|http>://<storage_host>/api/storage/v2"
DATA_PARTITION_ID="<string>"
ACL_OWNERS='["<owner_group>"]'
ACL_VIEWERS='["<viewer_group>"]'
LEGALTAGS='["<valid_legaltag>"]'
```
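The list-valued variables above are JSON-encoded strings. As a hedged sketch (the function name and error handling here are illustrative, not the real script's), they can be parsed and validated like this:

```python
import json
import os

def load_json_env(name: str) -> list:
    """Parse a JSON-encoded environment variable such as ACL_OWNERS."""
    try:
        value = json.loads(os.environ[name])
    except KeyError:
        raise SystemExit(f"Required variable {name} is not set")
    except json.JSONDecodeError:
        raise SystemExit(f"{name} must be a JSON list, e.g. '[\"value\"]'")
    if not isinstance(value, list):
        raise SystemExit(f"{name} must be a JSON list")
    return value

os.environ["ACL_OWNERS"] = '["data.default.owners@osdu.group"]'  # example value only
print(load_json_env("ACL_OWNERS"))  # ['data.default.owners@osdu.group']
```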
## Optional common variables
```bash
THREAD_NUMBER=<integer> # the number of simultaneous connections to Storage; default is 3
BATCH_SIZE=<integer> # the size of Record batch; default is 250
DATA_BRANCH="<string>" # branch or tag the data is fetched from; default is 'master'
```
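`BATCH_SIZE` controls how many records go into each Storage request. A rough sketch of the batching behaviour (the real helper is `split_into_batches` from `osdu_ingestion.libs.utils` and may differ in detail):

```python
from itertools import islice
from typing import Iterable, Iterator, List

def split_into_batches(items: Iterable, batch_size: int) -> Iterator[List]:
    """Yield consecutive lists of at most batch_size items from an iterable."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

print(list(split_into_batches(range(10), 4)))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Each worker thread then pulls batches from this shared generator until it is exhausted.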
## Anthos Secrets
```bash
KEYCLOAK_AUTH_URL="<https|http>://<keycloak_host>/auth/realms/<realm>/protocol/openid-connect/token"
KEYCLOAK_CLIENT_ID="client_id"
KEYCLOAK_CLIENT_SECRET="client_secret"
```
## GCP Secrets (if there are **no** default credentials in the environment)
```bash
SA_FILE_PATH=gs://path/to/sa/file
```
# How to run the script
```bash
# after you set all the variables
python3.8 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
python bootstrap_data.py
```
# Storage bootstrap
## Description
The storage bootstrap uploads sample data to the Storage service via HTTP requests.
It requires a previously created legal tag.
To enable the storage bootstrap in the helm charts, the respective configuration parameters must be enabled in the provided values.
The bootstrap logic is implemented in the respective [Python scripts](./Python-README.md).
The Bash part is responsible for providing the required environment variables to the Python script.
For a successful bootstrap, several variables must also be provided to the Bash script.
The list of required Bash variables can be found below.
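The upload itself amounts to a PUT of record batches to the Storage records endpoint with `skipdupes=true`. A hedged sketch of the request construction using only the standard library (the URL and payload here are illustrative):

```python
import json
import urllib.request

def build_put_records_request(storage_url: str, records: list, token: str) -> urllib.request.Request:
    """Build the PUT request the bootstrap sends; skipdupes=true avoids
    re-uploading records that already exist in Storage."""
    return urllib.request.Request(
        url=f"{storage_url}/records?skipdupes=true",
        data=json.dumps(records).encode(),
        method="PUT",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )

request = build_put_records_request(
    "http://storage/api/storage/v2", [{"id": "osdu:reference-data--Example:1"}], "token")
print(request.get_method(), request.full_url)
```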
## Environmental Variables
### Common variables
| Name | Description | Example |
|------|-------------|---------|
**DATA_PARTITION_ID** | data partition id | `osdu`
**MANIFESTS_DIR** | directory containing storage records to upload | `./manifests`
**STORAGE_HOST** | Storage service host | `http://storage`
**DEFAULT_LEGAL_TAG** | Previously created legal tag without `<data_partition_id>` part | `default-legal-tag`
### Anthos Variables
| Name | Description | Example |
|------|-------------|---------|
**OPENID_PROVIDER_URL** | URL to access the openid provider | `http://keycloak/realms/osdu`
**OPENID_PROVIDER_CLIENT_ID** | client id to access the openid provider | `keycloak`
**OPENID_PROVIDER_CLIENT_SECRET** | client secret to access openid provider | `p@ssw0rd`
### Hardcoded Variables
The Bash script also sets some default environment variable values that are later used by the Python script.
| Name | Description | Value |
|------|-------------|---------|
**CLOUD_PROVIDER** | provider type | `gcp|anthos`
**ACL_OWNERS** | acl owners group name | `["data.default.owners@${DATA_PARTITION_ID}.group"]`
**ACL_VIEWERS** | acl viewers group name | `["data.default.viewers@${DATA_PARTITION_ID}.group"]`
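The derived values above mirror the exports in `bootstrap_storage.sh`. As a sketch (the helper function name is illustrative):

```python
def bootstrap_defaults(data_partition_id: str, default_legal_tag: str) -> dict:
    """Reproduce the environment values the Bash wrapper exports for the
    Python script: ACL groups and the partition-prefixed legal tag."""
    return {
        "ACL_OWNERS": f'["data.default.owners@{data_partition_id}.group"]',
        "ACL_VIEWERS": f'["data.default.viewers@{data_partition_id}.group"]',
        "LEGALTAGS": f'["{data_partition_id}-{default_legal_tag}"]',
    }

print(bootstrap_defaults("osdu", "default-legal-tag"))
```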
import json
import logging
import os
import threading
from typing import Iterable, List
import requests
import tenacity
from config_manager import ConfigManager
from prepare_records import RecordsPreparer
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.model.record import Record
from osdu_ingestion.libs.refresh_token import BaseTokenRefresher
from osdu_ingestion.libs.utils import split_into_batches
from storage_client import StorageClient
from utils import prepared_manifests_records, unique_records
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
error_report = []
# get environmental variables
try:
    ACL_OWNERS = json.loads(os.environ["ACL_OWNERS"])
    ACL_VIEWERS = json.loads(os.environ["ACL_VIEWERS"])
    LEGALTAGS = json.loads(os.environ["LEGALTAGS"])
    MANIFESTS_DIR = os.environ["MANIFESTS_DIR"]
except (KeyError, json.JSONDecodeError) as e:
    logger.error(
        "The following env variables must be set:\n"
        "\tACL_OWNERS='[\"value_1\", \"value_2\"]'\n"
        "\tACL_VIEWERS='[\"value_1\", \"value_2\"]'\n"
        "\tLEGALTAGS='[\"value_1\", \"value_2\"]'\n"
        "\tMANIFESTS_DIR='/path/to/manifests'\n"
    )
    raise e
# get optional environmental variables
try:
    THREAD_NUMBER = int(os.environ.get("THREAD_NUMBER", 3))
    BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 250))
except ValueError as e:
    logger.error(
        "Environmental variables THREAD_NUMBER and BATCH_SIZE must be integers.")
    raise e
def on_storage_error_callback(retry_state: tenacity.RetryCallState):
    failed_record_ids = [record.id for record in retry_state.args[1]]
    try:
        retry_state.outcome.result()
    except (requests.HTTPError, requests.exceptions.ConnectionError) as e:
        error_msg = str(e)
        # TODO: Think about saving dead letters
        error_report.append(
            {
                "error": error_msg,
                "record_ids": failed_record_ids
            }
        )
    return retry_state.outcome.result()


def print_error_report():
    error_report_string = ""
    for err in error_report:
        error_report_string = error_report_string \
            + f"Error: {err['error']}\n Ids: {err['record_ids']}\n"
    if error_report:
        logger.warning(
            "The following records weren't stored: \n"
            f"{error_report_string}"
        )
class SaveToStorageThread(threading.Thread):
    _lock = threading.Lock()

    def __init__(self, storage_client: RecordClient, record_batches: Iterable[List[Record]]):
        super().__init__()
        self._storage_client = storage_client
        self._record_batches = record_batches

    @tenacity.retry(
        wait=tenacity.wait_fixed(3),
        stop=tenacity.stop_after_attempt(3),
        retry_error_callback=on_storage_error_callback,
        reraise=True
    )
    def _send_storage_request(self, record_batch: List[Record]):
        logger.info(f"Sending a batch of {len(record_batch)} records.")
        response = self._storage_client.create_update_records(record_batch)
        logger.info(response)
        response.raise_for_status()

    def run(self):
        while True:
            try:
                self._lock.acquire()
                record_batch = next(self._record_batches)
                self._lock.release()
                record_batch = unique_records(record_batch)
                self._send_storage_request(record_batch)
            except StopIteration:
                logger.info("There are no records left to save.")
                self._lock.release()
                break
def main():
    config_manager = ConfigManager()
    records_preparer = RecordsPreparer(
        data_partition_id=config_manager.get(
            "environment", "data_partition_id"),
        acl_owners=ACL_OWNERS,
        acl_viewers=ACL_VIEWERS,
        legaltags=LEGALTAGS
    )
    token_refresher = BaseTokenRefresher()
    token_refresher.refresh_token()
    storage_client = StorageClient(
        config_manager=config_manager, token_refresher=token_refresher)
    manifests_records = prepared_manifests_records(
        records_preparer, MANIFESTS_DIR)
    record_batches = split_into_batches(
        manifests_records,
        BATCH_SIZE
    )
    threads = []
    for _ in range(THREAD_NUMBER):
        threads.append(SaveToStorageThread(storage_client, record_batches))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print_error_report()


if __name__ == "__main__":
    main()
#!/usr/bin/env bash
#
# Script that bootstraps the storage service using Python scripts.
# It creates a legal tag via a request to the Legal service.
# After that, records are uploaded via requests to the Storage service, using the previously created legal tag.
# Contains logic for both the on-prem and GCP versions.
set -ex
source ./validate-env.sh "DATA_PARTITION_ID"
source ./validate-env.sh "MANIFESTS_DIR"
source ./validate-env.sh "STORAGE_HOST"
export ACL_OWNERS="[\"data.default.owners@${DATA_PARTITION_ID}.group\"]"
export ACL_VIEWERS="[\"data.default.viewers@${DATA_PARTITION_ID}.group\"]"
export LEGALTAGS="[\"${DATA_PARTITION_ID}-${DEFAULT_LEGAL_TAG}\"]"
export STORAGE_URL="${STORAGE_HOST}/api/storage/v2"
if [ "${ONPREM_ENABLED}" == "true" ]; then
  # Check that all Anthos env variables are provided
  source ./validate-env.sh "OPENID_PROVIDER_URL"
  source ./validate-env.sh "OPENID_PROVIDER_CLIENT_ID"
  source ./validate-env.sh "OPENID_PROVIDER_CLIENT_SECRET"
  export KEYCLOAK_AUTH_URL="${OPENID_PROVIDER_URL}/protocol/openid-connect/token"
  export KEYCLOAK_CLIENT_ID="${OPENID_PROVIDER_CLIENT_ID}"
  export KEYCLOAK_CLIENT_SECRET="${OPENID_PROVIDER_CLIENT_SECRET}"
  export CLOUD_PROVIDER="anthos"
  python3 /opt/bootstrap_data.py
else
  # GCP credentials are taken from the environment
  export CLOUD_PROVIDER="gcp"
  python3 /opt/bootstrap_data.py
fi
touch /tmp/bootstrap_ready
import logging
import os

from osdu_api.configuration.base_config_manager import BaseConfigManager

logger = logging.getLogger()


class ConfigManager(BaseConfigManager):
    """Custom config manager for the Python SDK."""

    def __init__(self) -> None:
        try:
            self.config = {
                "environment": {
                    "storage_url": os.environ["STORAGE_URL"],
                    "data_partition_id": os.environ["DATA_PARTITION_ID"],
                    "use_service_principal": False
                },
                "provider": {
                    "name": os.environ["CLOUD_PROVIDER"]
                }
            }
        except KeyError as error:
            logger.error(
                "One or more of the following environmental variables weren't set:\n"
                "STORAGE_URL,\n"
                "DATA_PARTITION_ID,\n"
                "CLOUD_PROVIDER."
            )
            raise error

    def get(self, section: str, option: str) -> str:
        try:
            config_value = self.config[section][option]
        except KeyError:
            config_value = "stub"
        return config_value

    def getint(self, section: str, option: str) -> int:
        try:
            config_value = self.config[section][option]
        except KeyError:
            config_value = 0
        return config_value

    def getfloat(self, section: str, option: str) -> float:
        try:
            config_value = self.config[section][option]
        except KeyError:
            config_value = 0.0
        return config_value

    def getbool(self, section: str, option: str, default=False) -> bool:
        try:
            config_value = self.config[section][option]
        except KeyError:
            config_value = default
        return config_value
import json
import logging
import os
import threading
from typing import Iterable
import tenacity
from config_manager import ConfigManager
from prepare_records import RecordsPreparer
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.model.storage.record import Record
from osdu_ingestion.libs.refresh_token import BaseTokenRefresher
from utils import prepared_manifests_records
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
error_report = []
# get optional environmental variables
try:
    THREAD_NUMBER = int(os.environ.get("THREAD_NUMBER", 3))
except ValueError as e:
    logger.error(
        "Environmental variable THREAD_NUMBER must be an integer.")
    raise e

# get environmental variables
try:
    ACL_OWNERS = json.loads(os.environ["ACL_OWNERS"])
    ACL_VIEWERS = json.loads(os.environ["ACL_VIEWERS"])
    LEGALTAGS = json.loads(os.environ["LEGALTAGS"])
    MANIFESTS_DIR = os.environ["MANIFESTS_DIR"]
except (KeyError, json.JSONDecodeError) as e:
    logger.error(
        "The following env variables must be set:\n"
        "\tACL_OWNERS='[\"value_1\", \"value_2\"]'\n"
        "\tACL_VIEWERS='[\"value_1\", \"value_2\"]'\n"
        "\tLEGALTAGS='[\"value_1\", \"value_2\"]'\n"
        "\tMANIFESTS_DIR='/path/to/manifests'\n"
    )
    raise e
def on_storage_error_callback(retry_state: tenacity.RetryCallState):
    try:
        retry_state.outcome.result()
    except Exception as e:
        logger.error(e)


class DeleteFromStorageThread(threading.Thread):
    _lock = threading.Lock()

    def __init__(self, storage_client: RecordClient, records: Iterable[Record]):
        super().__init__()
        self._storage_client = storage_client
        self._records = records

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(1),
        retry_error_callback=on_storage_error_callback,
        reraise=True
    )
    def _send_storage_request(self, record: Record):
        logger.info(f"Deleting record {record.id}")
        self._storage_client.delete_record(record.id)

    def run(self):
        while True:
            try:
                self._lock.acquire()
                record = next(self._records)
                self._lock.release()
                self._send_storage_request(record)
            except StopIteration:
                logger.info("There are no records left to delete.")
                self._lock.release()
                break
def main():
    config_manager = ConfigManager()
    records_preparer = RecordsPreparer(
        data_partition_id=config_manager.get(
            "environment", "data_partition_id"),
        acl_owners=ACL_OWNERS,
        acl_viewers=ACL_VIEWERS,
        legaltags=LEGALTAGS
    )
    token_refresher = BaseTokenRefresher()
    token_refresher.refresh_token()
    storage_client = RecordClient(
        config_manager=config_manager, token_refresher=token_refresher)
    manifests_records = prepared_manifests_records(
        records_preparer, MANIFESTS_DIR)
    threads = []
    for _ in range(THREAD_NUMBER):
        threads.append(DeleteFromStorageThread(storage_client, manifests_records))
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    main()
#!/bin/sh
set -x

if [ -z "${MANIFESTS_DIR}" ]; then
  echo "Set 'MANIFESTS_DIR' variable"
  exit 1
else
  mkdir -p "${MANIFESTS_DIR}"
fi

if [ -z "${DATA_BRANCH}" ]; then
  echo "Setting 'DATA_BRANCH' variable to master"
  DATA_BRANCH="master"
fi
REFERENCE_VALUES="https://community.opengroup.org/osdu/data/data-definitions/-/archive/${DATA_BRANCH}/data-definitions-${DATA_BRANCH}.zip?path=ReferenceValues/Manifests/reference-data"
TNO_VOLVE_VALUES="https://community.opengroup.org/osdu/platform/data-flow/data-loading/open-test-data/-/archive/${DATA_BRANCH}/open-test-data-${DATA_BRANCH}.zip?path=rc--3.0.0/4-instances"
curl -o reference-data.zip "$REFERENCE_VALUES"
if unzip -t "reference-data.zip" > /dev/null; then
  echo "Successfully downloaded and verified reference-data.zip"
else
  echo "reference-data.zip has not been downloaded."
  exit 1
fi

curl -o tno-volve-data.zip "$TNO_VOLVE_VALUES"
if unzip -t "tno-volve-data.zip" > /dev/null; then
  echo "Successfully downloaded and verified tno-volve-data.zip"
else
  echo "tno-volve-data.zip has not been downloaded."
  exit 1
fi
unzip -o reference-data.zip -d "$MANIFESTS_DIR" > /dev/null
unzip -o tno-volve-data.zip -d "$MANIFESTS_DIR" > /dev/null
rm -f reference-data.zip
rm -f tno-volve-data.zip
find "$MANIFESTS_DIR" -type f -name 'IngestionSequence.json' -delete
find "$MANIFESTS_DIR" -type f -name 'ReferenceValueTypeDependencies.json' -delete
find "$MANIFESTS_DIR" -type d -name 'work-products' -exec rm -rf {} +
import json
import re
from typing import Iterator
from osdu_api.model.storage.acl import Acl
from osdu_api.model.storage.legal import Legal
from osdu_api.model.storage.record import Record
from osdu_api.model.storage.record_ancestry import RecordAncestry
class RecordsPreparer:
    """Prepares records from a Manifest to be sent via StorageClient."""

    RECORD_ID_REGEX = r"\"([\w\-\.\{\}]+)(:[\w\-\.]+\-\-[\w\-\.]+:[\w\-\.\:\%]+)\""
    RECORD_KIND_REGEX = r"\"([\w\-\.]+)(:[\w\-\.]+:[\w\-\.]+:[0-9]+\.[0-9]+\.[0-9]+)\""

    def __init__(
        self,
        data_partition_id: str,
        acl_owners: list,
        acl_viewers: list,
        legaltags: list,
        country_codes: list = ["US"]
    ) -> None:
        self.data_partition_id = data_partition_id
        self.acl = Acl(viewers=acl_viewers, owners=acl_owners)
        self.legal = Legal(
            legaltags=legaltags,
            other_relevant_data_countries=country_codes,
            status=""
        )
        # FIXME: Status is not working with Storage service. Need to investigate this problem further
        try:
            del self.legal.status
        except AttributeError:
            pass
    def _data_partition_id_repl_func(self, match: re.Match) -> str:
        value_without_data_partition = match.group(2)
        return f"\"{self.data_partition_id}{value_without_data_partition}\""

    def _replace_data_partition_id(self, manifest_content: str) -> str:
        """Replace the data-partition-id in the Manifest.

        :param manifest_content: Manifest content
        :type manifest_content: str
        :return: Manifest content with replaced data-partition-id
        :rtype: str
        """
        manifest_content = re.sub(
            self.RECORD_ID_REGEX,
            self._data_partition_id_repl_func,
            manifest_content
        )
        return manifest_content

    def _prepare_record(self, record_dict: dict) -> Record:
        record_ancestry = RecordAncestry(
            parents=record_dict.get("ancestry", {}).get("parents", [])
        )
        record = Record(
            kind=record_dict["kind"],
            acl=self.acl,
            legal=self.legal,
            id=record_dict.get("id"),
            data=record_dict["data"],
            meta=record_dict.get("meta"),
            ancestry=record_ancestry
        )
        return record

    def manifest_records(self, manifest_string: str) -> Iterator[Record]:
        """Generator that takes a Manifest file as a string,
        applies changes to it and yields osdu_api model Records one by one.

        :param manifest_string: content of the Manifest file
        :type manifest_string: str
        :yield: osdu_api model Record object
        :rtype: Iterator[Record]
        """
        manifest_string = self._replace_data_partition_id(manifest_string)
        manifest_dict = json.loads(manifest_string)
        record_list = manifest_dict.get(
            "ReferenceData", []) + manifest_dict.get("MasterData", [])
        for record_dict in record_list:
            yield self._prepare_record(record_dict)
google-api-core==1.31.5
google-auth==1.32.1
google-cloud-core==1.7.2
google-cloud-storage==1.40.0
requests==2.25.1
tenacity==6.2.0
--extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.17.0rc1
--extra-index-url https://community.opengroup.org/api/v4/projects/823/packages/pypi/simple
osdu-ingestion==0.16.0
from typing import List

from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.model.http_method import HttpMethod
from osdu_api.model.storage.record import Record


class StorageClient(RecordClient):
    def create_update_records(self, records: List[Record], bearer_token=None):
        """Overridden to add an extra query argument that skips duplicate records."""
        records_data = "[{}]".format(
            ",".join(record.to_JSON() for record in records))
        return self.make_request(
            method=HttpMethod.PUT,
            url="{}{}".format(self.storage_url, "/records?skipdupes=true"),
            data=records_data,
            bearer_token=bearer_token
        )