Commit bf634127 authored by Yan Sushchynski (EPAM)

GONRG-3119_Python_SDK_Usage

parent 2c4a39e8
@@ -37,6 +37,7 @@ test-libs:
stage: test
script:
- pip install -r requirements-dev.txt
- export CLOUD_PROVIDER=provider_test && export OSDU_API_CONFIG_INI=./osdu_ingestion/tests/libs-unit-tests/osdu_api.ini
- python -m pytest ./osdu_ingestion/tests/libs-unit-tests
......
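For local runs, the same environment can be prepared before the osdu_api clients are imported; a minimal sketch mirroring the CI script above (paths as in this repo):

    # test bootstrap (sketch) -- mirror the CI environment for the libs unit tests
    import os

    os.environ["CLOUD_PROVIDER"] = "provider_test"
    os.environ["OSDU_API_CONFIG_INI"] = "./osdu_ingestion/tests/libs-unit-tests/osdu_api.ini"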
@@ -22,6 +22,8 @@ from typing import List
import requests
import tenacity
from osdu_api.auth.authorization import TokenRefresher, authorize
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.model.storage.record import Record
from osdu_ingestion.libs.constants import RETRIES, WAIT
from osdu_ingestion.libs.context import Context
@@ -48,7 +50,6 @@ class ManifestProcessor(HeadersMixin):
def __init__(
self,
storage_url: str,
file_handler: FileHandler,
source_file_checker: SourceFileChecker,
token_refresher: TokenRefresher,
@@ -60,8 +61,6 @@ class ManifestProcessor(HeadersMixin):
:type file_handler: FileHandler
:param source_file_checker: An instance of file checker
:type source_file_checker: SourceFileChecker
:param storage_url: The OSDU Storage base url
:type storage_url: str
:param context: The tenant context
:type context: Context
:param token_refresher: An instance of token refresher
@@ -70,9 +69,7 @@
super().__init__(context)
self.file_handler = file_handler
self.source_file_checker = source_file_checker
self.storage_url = storage_url
self.context = context
self.token_refresher = token_refresher
self.record_client = RecordClient(token_refresher=token_refresher, data_partition_id=context.data_partition_id)
@staticmethod
def _get_kind_name(kind: str) -> str:
@@ -111,15 +108,14 @@ class ManifestProcessor(HeadersMixin):
raise ValueError(f"Invalid answer from Storage server: {response_dict}")
@tenacity.retry(**RETRY_SETTINGS)
@authorize()
def save_record_to_storage(self, headers: dict, request_data: List[dict]) -> requests.Response:
def save_record_to_storage(self, request_data: List[dict]) -> requests.Response:
"""
Send request to record storage API.
"""
request_data = json.dumps(request_data)
logger.info("Sending records to Storage service")
logger.debug(f"{request_data}")
response = requests.put(self.storage_url, request_data, headers=headers)
request_data = [Record.from_dict(data) for data in request_data]
response = self.record_client.create_update_records(request_data)
if response.ok:
response_dict = response.json()
self._validate_storage_response(response_dict)
@@ -180,8 +176,7 @@ class ManifestProcessor(HeadersMixin):
for manifest_record in manifest_records:
populated_manifest_records.append(
self._delete_surrogate_key(manifest_record.entity_data))
save_manifests_response = self.save_record_to_storage(
self.request_headers, request_data=populated_manifest_records)
save_manifests_response = self.save_record_to_storage(populated_manifest_records)
record_ids.extend(save_manifests_response.json()["recordIds"])
return record_ids
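For context, a minimal standalone sketch of the new storage path: RecordClient, Record.from_dict and create_update_records come from this change, while the token refresher and the record payload are placeholders:

    from osdu_api.clients.storage.record_client import RecordClient
    from osdu_api.model.storage.record import Record

    record_client = RecordClient(
        token_refresher=my_token_refresher,  # placeholder: any TokenRefresher implementation
        data_partition_id="opendes",
    )
    # record_dicts: a list of storage record dicts, e.g. from a linearized manifest
    records = [Record.from_dict(r) for r in record_dicts]
    response = record_client.create_update_records(records)
    if response.ok:
        print(response.json()["recordIds"])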
@@ -19,7 +19,7 @@ R3 Process Single Manifest helper.
"""
import logging
from typing import Iterator, List, Set, Tuple
from typing import List, Set, Tuple
from osdu_ingestion.libs.constants import (FIRST_STORED_RECORD_INDEX,
SAVE_RECORDS_BATCH_SIZE)
......
@@ -17,16 +17,18 @@
import json
import logging
from typing import Generator, Set
import requests
import tenacity
from osdu_api.auth.authorization import TokenRefresher, authorize
from requests import Response
from osdu_api.clients.search.search_client import SearchClient
from osdu_api.model.search.query_request import \
QueryRequest as SearchQueryRequest
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import RecordsNotSearchableError
from osdu_ingestion.libs.mixins import HeadersMixin
logger = logging.getLogger()
@@ -37,15 +39,12 @@ TIMEOUT = 10
DEFAULT_LIMIT = 200
class SearchId(HeadersMixin):
class SearchId:
"""Class for stored records search validation."""
def __init__(self, search_url: str, record_ids: list, token_refresher: TokenRefresher,
context: Context, limit: int = DEFAULT_LIMIT):
def __init__(self, record_ids: list, token_refresher: TokenRefresher, context: Context, limit: int = DEFAULT_LIMIT):
"""Init search id validator.
:param search_url: Base OSDU Search service url
:type search_url: str
:param record_ids: The list of records id to be searched
:type record_ids: list
:param token_refresher: An instance of token refresher
@@ -56,16 +55,14 @@ class SearchId(HeadersMixin):
:type limit: int
:raises ValueError: When a mismatch of record counts occurs
"""
super().__init__(context)
if not record_ids:
logger.error("There are no record ids")
raise ValueError("There are no record id")
self.record_ids = record_ids
self.search_url = search_url
self.expected_total_count = len(record_ids)
self.token_refresher = token_refresher
self.search_client = SearchClient(token_refresher=token_refresher, data_partition_id=context.data_partition_id)
self.limit = limit
self._create_request_body()
def _create_search_query(self) -> str:
"""Create search query to send to Search service using recordIds need
@@ -79,16 +76,6 @@
query = f"id:({record_ids})"
return query
def _create_request_body(self):
"""Create request body to send to Search service."""
query = self._create_search_query()
request_body = {
"kind": "*:*:*:*",
"returnedFields": ["id", "version", "acl", "kind", "legal"],
"query": query
}
self.request_body = json.dumps(request_body)
def _is_record_searchable(self, response: requests.Response) -> bool:
"""Check if search service returns expected totalCount of records.
@@ -112,55 +99,39 @@
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@authorize()
def search_files(self, headers: dict) -> requests.Response:
def search_files(self) -> requests.Response:
"""Send request with recordIds to Search service.
:param headers: The request headers
:type headers: dict
:raises RecordsNotSearchableError: When any of the records is not
searchable
:return: The server response
:rtype: requests.Response
"""
if self.request_body:
response = requests.post(
self.search_url, self.request_body, headers=headers)
if not self._is_record_searchable(response):
logger.error("Expected amount (%s) of records not found." %
self.expected_total_count,
)
raise RecordsNotSearchableError(
f"Can't find records {self.request_body}. "
f"Got response {response.json()} from Search service."
)
return response
search_query_request = SearchQueryRequest(
kind="*:*:*:*",
query=self._create_search_query(),
returned_fields=["id", "version", "acl", "kind", "legal"]
)
response = self.search_client.query_records(query_request=search_query_request)
if not self._is_record_searchable(response):
logger.error("Expected amount (%s) of records not found." %
self.expected_total_count,
)
raise RecordsNotSearchableError(
f"Can't find records {self.record_ids}. "
f"Got response {response.json()} from Search service."
)
return response
def check_records_searchable(self):
"""Check if every record in self.record_ids is searchable."""
headers = self.request_headers
self.search_files(headers)
self.search_files()
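A standalone sketch of the SDK-based search call that now backs search_files(); the query string is illustrative and assumes the id-list format produced by _create_search_query():

    from osdu_api.clients.search.search_client import SearchClient
    from osdu_api.model.search.query_request import QueryRequest

    search_client = SearchClient(
        token_refresher=my_token_refresher,  # placeholder
        data_partition_id="opendes",
    )
    query_request = QueryRequest(
        kind="*:*:*:*",
        query='id:("opendes:work-product--WorkProduct:123")',  # illustrative id query
        returned_fields=["id", "version", "acl", "kind", "legal"],
    )
    response = search_client.query_records(query_request=query_request)
    print(response.json()["totalCount"])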
class ExtendedSearchId(SearchId):
def __init__(self, search_url: str, record_ids: list, token_refresher,
context: Context, limit: int = DEFAULT_LIMIT):
super().__init__(search_url, record_ids, token_refresher, context, limit=limit)
def _create_request_body(self):
"""
Create request body to send to Search service.
"""
query = self._create_search_query()
request_body = {
"kind": "*:*:*:*",
"query": query,
"returnedFields": ["id", "version", "acl", "kind", "legal"],
"limit": self.limit
}
self._request_body = request_body
self.request_body = json.dumps(request_body)
def __init__(self, record_ids: list, token_refresher, context: Context, limit: int = DEFAULT_LIMIT):
super().__init__(record_ids, token_refresher, context, limit=limit)
def _extract_id_from_response(self, response: dict):
results = response.get("results")
@@ -170,40 +141,38 @@ class ExtendedSearchId(SearchId):
logger.debug(f"response ids: {record_ids}")
return record_ids
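For reference, the Search service response shape this method handles; the extraction logic itself is elided above, so the comprehension below is an assumption:

    # Assumed response shape, per the OSDU Search API
    sample_response = {
        "totalCount": 1,
        "results": [
            {"id": "opendes:work-product--WorkProduct:123", "kind": "...", "version": 1},
        ],
    }
    record_ids = [item["id"] for item in sample_response.get("results", [])]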
@authorize()
def _make_post_request(self, headers: dict, request_body: dict) -> Response:
return requests.post(self.search_url, request_body, headers=headers)
def search_records(self) -> Set[str]:
"""
Send request with recordIds to Search service.
"""
if self.request_body:
    response = self._make_post_request(
        self.request_headers, self.request_body)
    logger.debug(response.text)
    data = response.json()
    total_count = data.get("totalCount")
    logger.debug(f"Got total count {total_count}")
    if total_count is None:
        raise ValueError(f"Got no totalCount field in Search service response. "
                         f"Response is {data}.")
    response_records_ids = set(self._extract_id_from_response(data))
    cursor = total_count > self.limit
    if cursor:
        logger.debug("Start cursor iteration")
        offset = self.limit
        for ids in self._iterate_cursor(offset, total_count):
            response_records_ids.update(ids)
    return response_records_ids
return set()
search_query_request = SearchQueryRequest(
    kind="*:*:*:*",
    query=self._create_search_query(),
    returned_fields=["id", "version", "acl", "kind", "legal"]
)
response = self.search_client.query_records(query_request=search_query_request)
logger.debug(response.text)
data = response.json()
total_count = data.get("totalCount")
logger.debug(f"Got total count {total_count}")
if total_count is None:
    raise ValueError(f"Got no totalCount field in Search service response. "
                     f"Response is {data}.")
response_records_ids = set(self._extract_id_from_response(data))
cursor = total_count > self.limit
if cursor:
    logger.debug("Start cursor iteration")
    offset = self.limit
    for ids in self._iterate_cursor(offset, total_count):
        response_records_ids.update(ids)
return response_records_ids
def _iterate_cursor(self, offset: int, total_count: int) -> Generator:
"""
@@ -211,8 +180,13 @@ class ExtendedSearchId(SearchId):
Fetch all records using offset.
"""
while offset <= total_count - self.limit:
response = self._make_post_request(
self.request_headers, json.dumps({"offset": offset, **self._request_body}))
search_query_request = SearchQueryRequest(
kind="*:*:*:*",
query=self._create_search_query(),
offset=offset,
returned_fields=["id", "version", "acl", "kind", "legal"]
)
response = self.search_client.query_records(query_request=search_query_request)
data = response.json()
ids = set(self._extract_id_from_response(data))
......
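A follow-up page differs from the initial request only in its offset field; for example, with the default limit of 200, the second page might be requested as:

    # Paged follow-up query, as issued inside _iterate_cursor (sketch)
    paged_request = SearchQueryRequest(
        kind="*:*:*:*",
        query=self._create_search_query(),
        offset=200,  # e.g. the second page when the limit is 200
        returned_fields=["id", "version", "acl", "kind", "legal"],
    )
    response = self.search_client.query_records(query_request=paged_request)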
@@ -20,6 +20,11 @@ import logging
import requests
from osdu_api.auth.authorization import TokenRefresher, authorize
from osdu_api.clients.data_workflow.data_workflow_client import \
DataWorkflowClient
from osdu_api.configuration.config_manager import DefaultConfigManager
from osdu_api.model.data_workflow.update_status_request import \
UpdateStatusRequest
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.mixins import HeadersMixin
@@ -30,6 +35,9 @@ logger = logging.getLogger()
class UpdateStatus(HeadersMixin):
"""Class to perform update status of the workflow."""
FAILED_STATUS = "failed"
FINISHED_STATUS = "finished"
def __init__(
self,
workflow_name: str,
@@ -59,15 +67,20 @@
"""
super().__init__(context)
self.workflow_name = workflow_name
self.workflow_url = workflow_url
self.workflow_id = workflow_id
self.workflow_url = workflow_url
self.run_id = run_id
self.context = context
self.status = status
self.token_refresher = token_refresher
config_manager = DefaultConfigManager()
self.workflow_client = DataWorkflowClient(
config_manager=config_manager,
token_refresher=token_refresher,
data_partition_id=context.data_partition_id
)
@authorize()
def update_status_request(self, headers: dict) -> requests.Response:
def update_status_request(self) -> requests.Response:
"""Send request to update status.
@@ -75,14 +88,9 @@
:return: The Workflow server response
:rtype: requests.Response
"""
request_body = {
"status": self.status
}
request_body = json.dumps(request_body)
logger.debug(f"Sending request '{request_body}'")
update_status_url = f"{self.workflow_url}/v1/workflow/{self.workflow_name}/workflowRun/{self.run_id}"
logger.debug(f"Workflow URL: {update_status_url}")
response = requests.put(update_status_url, request_body, headers=headers)
update_status_request = UpdateStatusRequest(self.status)
logger.debug(f"Sending request '{update_status_request}'")
response = self.workflow_client.update_status(update_status_request, self.workflow_name, self.run_id)
return response
@authorize()
@@ -107,6 +115,6 @@
"""Updates workflow status."""
headers = self.request_headers
if self.workflow_name:
self.update_status_request(headers)
self.update_status_request()
else:
self.update_status_request_old(headers)
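A minimal sketch of the new status-update path; the service URL now comes from the osdu_api configuration (the ini's data_workflow_url) rather than a workflow_url argument. The workflow name and run id are placeholders:

    from osdu_api.clients.data_workflow.data_workflow_client import DataWorkflowClient
    from osdu_api.configuration.config_manager import DefaultConfigManager
    from osdu_api.model.data_workflow.update_status_request import UpdateStatusRequest

    workflow_client = DataWorkflowClient(
        config_manager=DefaultConfigManager(),
        token_refresher=my_token_refresher,  # placeholder
        data_partition_id="opendes",
    )
    response = workflow_client.update_status(
        UpdateStatusRequest("finished"),  # i.e. FINISHED_STATUS
        "my_workflow_name",               # placeholder workflow name
        "my_run_id",                      # placeholder run id
    )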
@@ -20,8 +20,7 @@ from osdu_ingestion.libs.constants import (DATA_SECTION, DATASETS_SECTION,
SEARCH_ID_BATCH_SIZE,
WORK_PRODUCT_COMPONENTS_SECTION)
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import (EmptyManifestError,
ValidationIntegrityError)
from osdu_ingestion.libs.exceptions import ValidationIntegrityError
from osdu_ingestion.libs.linearize_manifest import ManifestLinearizer
from osdu_ingestion.libs.manifest_analyzer import ManifestAnalyzer
from osdu_ingestion.libs.search_record_ids import ExtendedSearchId
@@ -41,7 +40,6 @@ class ManifestIntegrity:
def __init__(
self,
search_url: str,
token_refresher,
file_source_validator: FileSourceValidator,
context: Context,
@@ -49,7 +47,6 @@
search_id_batch_size: int = SEARCH_ID_BATCH_SIZE,
):
self.search_url = search_url
self.token_refresher = token_refresher
self.file_source_validator = file_source_validator
self.context = context
@@ -94,8 +91,11 @@
# TODO: Move ExtendedSearchId() to the class attribute.
# Refactor ExtendedSearchId().search_records() to take records to search
search_handler = ExtendedSearchId(self.search_url, external_references_without_version,
self.token_refresher, self.context)
search_handler = ExtendedSearchId(
external_references_without_version,
self.token_refresher,
self.context
)
found_ids = search_handler.search_records()
missing_ids.update(self._filter_not_found_ids(ids_batch, found_ids))
......
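A sketch of how the batch check above could resolve missing references; _filter_not_found_ids is elided here, so the set difference is an assumption:

    ids_batch = {"opendes:master-data--Wellbore:1:", "opendes:master-data--Wellbore:2:"}
    search_handler = ExtendedSearchId(
        list(ids_batch),
        token_refresher,  # a TokenRefresher instance
        context,          # the tenant Context
    )
    found_ids = search_handler.search_records()
    missing_ids = ids_batch - found_ids  # assumption: simple set difference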
@@ -27,6 +27,7 @@ import tenacity
from dateutil import parser as date_parser
from jsonschema import FormatChecker, exceptions
from osdu_api.auth.authorization import TokenRefresher, authorize
from osdu_api.clients.schema.schema_client import SchemaClient
from osdu_ingestion.libs.constants import (DATA_SECTION, DATASETS_SECTION,
MASTER_DATA_SECTION,
@@ -79,14 +80,10 @@ def validate_date_time(value: str) -> bool:
class OSDURefResolver(jsonschema.RefResolver):
"""Extends base jsonschema resolver for OSDU."""
def __init__(self, schema_service: str, *args, **kwargs):
def __init__(self, *args, **kwargs):
"""Implements the schema validatoe
:param schema_service: The base url for schema service
:type schema_service: str
"""
super(OSDURefResolver, self).__init__(*args, **kwargs)
self.schema_service = schema_service
def resolve_fragment(self, document: dict, fragment: str) -> dict:
"""
@@ -115,7 +112,6 @@ class SchemaValidator(HeadersMixin):
def __init__(
self,
schema_service: str,
token_refresher: TokenRefresher,
context: Context,
surrogate_key_fields_paths: SurrogateKeysPaths = None,
@@ -123,33 +119,15 @@
):
"""Init SchemaValidator.
:param schema_service: The base OSDU Schema service url
:param token_refresher: An instance of token refresher
:param context: The tenant context
"""
super().__init__(context)
self.schema_service = schema_service
self.context = context
self.token_refresher = token_refresher
self.resolver_handlers = {
"osdu": self.get_schema_request,
"https": self.get_schema_request,
self.context.data_partition_id: self.get_schema_request
}
self.schema_client = SchemaClient(token_refresher=token_refresher, data_partition_id=context.data_partition_id)
self.surrogate_key_fields_paths = surrogate_key_fields_paths or []
self.data_types_with_surrogate_ids = data_types_with_surrogate_ids or []
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@authorize()
def _get_schema_from_schema_service(self, headers: dict, uri: str) -> requests.Response:
"""Send request to Schema service to retrieve schema."""
response = requests.get(uri, headers=headers, timeout=60)
return response
def _clear_data_fields(self, schema_part: Union[dict, list]):
"""
Clear a schema's ReferenceData, Data and MasterData fields.
@@ -164,20 +142,11 @@
if schema_part.get("MasterData"):
schema_part["MasterData"] = {}
def get_schema_request(self, uri: str) -> dict:
"""Get schema from Schema service. Change $id field to url.
:param uri: The URI to fetch the schema
:type uri: str
:return: The Schema service response
:rtype: dict
"""
if uri.startswith("osdu") or uri.startswith(self.context.data_partition_id):
uri = f"{self.schema_service}/{uri}"
response = self._get_schema_from_schema_service(self.request_headers, uri).json()
response["$id"] = uri
return response
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@lru_cache()
def get_schema(self, kind: str) -> Union[dict, None]:
"""Fetch schema from Schema service.
@@ -193,14 +162,13 @@
:return: Schema server response
:rtype: dict
"""
manifest_schema_uri = f"{self.schema_service}/{kind}"
try:
response = self.get_schema_request(manifest_schema_uri)
schema = self.schema_client.get_schema_by_id(kind).json()
except requests.HTTPError as e:
logger.error(f"Error on getting schema of kind '{kind}'")
logger.error(e)
return None
return response
return schema
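A standalone sketch of the schema fetch via the SDK; the kind string is illustrative:

    from osdu_api.clients.schema.schema_client import SchemaClient

    schema_client = SchemaClient(
        token_refresher=my_token_refresher,  # placeholder
        data_partition_id="opendes",
    )
    response = schema_client.get_schema_by_id("osdu:wks:master-data--Wellbore:1.0.0")
    schema = response.json()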
@staticmethod
def _extend_pattern_with_surrogate_key(pattern: str) -> str:
@@ -328,10 +296,8 @@
:return:
"""
resolver = OSDURefResolver(
schema_service=self.schema_service,
base_uri=schema.get("$id", ""),
referrer=schema,
handlers=self.resolver_handlers,
cache_remote=True
)
jsonschema.validate(
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[environment]
data_partition_id=opendes
storage_url=blah/api/storage/v2
partition_url=blah/api/storage/v2
search_url=blah/api/search/v2
legal_url=blah/api/legal/v1
data_workflow_url=blah/api/data-workflow/v1
entitlements_url=blah/api/entitlements/v1
file_dms_url=blah/api/filedms/v2
dataset_url=blah/api/dataset-registry/v1
schema_url=blah/api/schema-service/v1
ingestion_workflow_url=stub
use_service_principal=no
[provider]
name=provider_test
service_principal_module_name=service_principal_util
token_url_ssm_path=/osdu/blah/oauth-token-uri
aws_oauth_custom_scope_ssm_path=/osdu/blah/oauth-custom-scope
client_id_ssm_path=/osdu/blah/client-credentials-client-id
client_secret_name=/osdu/blah/client_credentials_secret
client_secret_dict_key=client_credentials_client_secret
region_name=blah
\ No newline at end of file
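This is the file the CI job points OSDU_API_CONFIG_INI at; the clients resolve their service URLs from its [environment] section. A sketch of wiring it in, assuming the other clients accept a config_manager the way DataWorkflowClient does above:

    from osdu_api.clients.storage.record_client import RecordClient
    from osdu_api.configuration.config_manager import DefaultConfigManager

    # assumption: OSDU_API_CONFIG_INI already points at this ini file
    record_client = RecordClient(
        config_manager=DefaultConfigManager(),
        token_refresher=my_token_refresher,  # placeholder
        data_partition_id="opendes",
    )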
@@ -47,7 +47,7 @@ class TestIntegrityProvider:
@pytest.fixture
def manifest_integrity(self) -> ManifestIntegrity:
context = Context(app_key="", data_partition_id="test")
manifest_integrity = ManifestIntegrity("", BaseTokenRefresher(get_test_credentials()),
manifest_integrity = ManifestIntegrity(BaseTokenRefresher(get_test_credentials()),
FileSourceValidator(),
context)
return manifest_integrity
......
@@ -16,10 +16,6 @@
import http
import json
import os
import sys
from osdu_ingestion.libs.context import Context