Skip to content
Snippets Groups Projects
Commit 83077971 authored by Siarhei Khaletski (EPAM)'s avatar Siarhei Khaletski (EPAM) :triangular_flag_on_post:
Browse files

Merge branch 'GONRG-3119_Python_SDK_Usage' into 'master'

GONRG-3119: Python SDK Usage

See merge request !2
parents 2c4a39e8 bf634127
No related branches found
No related tags found
1 merge request!2GONRG-3119: Python SDK Usage
Pipeline #76880 passed
Showing
with 165 additions and 209 deletions
......@@ -37,6 +37,7 @@ test-libs:
stage: test
script:
- pip install -r requirements-dev.txt
- export CLOUD_PROVIDER=provider_test && export OSDU_API_CONFIG_INI=./osdu_ingestion/tests/libs-unit-tests/osdu_api.ini
- python -m pytest ./osdu_ingestion/tests/libs-unit-tests
......
......@@ -22,6 +22,8 @@ from typing import List
import requests
import tenacity
from osdu_api.auth.authorization import TokenRefresher, authorize
from osdu_api.clients.storage.record_client import RecordClient
from osdu_api.model.storage.record import Record
from osdu_ingestion.libs.constants import RETRIES, WAIT
from osdu_ingestion.libs.context import Context
......@@ -48,7 +50,6 @@ class ManifestProcessor(HeadersMixin):
def __init__(
self,
storage_url: str,
file_handler: FileHandler,
source_file_checker: SourceFileChecker,
token_refresher: TokenRefresher,
......@@ -60,8 +61,6 @@ class ManifestProcessor(HeadersMixin):
:type file_handler: FileHandler
:param source_file_checker: An instance of file checker
:type source_file_checker: SourceFileChecker
:param storage_url: The OSDU Storage base url
:type storage_url: str
:param context: The tenant context
:type context: Context
:param token_refresher: An instance of token refresher
......@@ -70,9 +69,7 @@ class ManifestProcessor(HeadersMixin):
super().__init__(context)
self.file_handler = file_handler
self.source_file_checker = source_file_checker
self.storage_url = storage_url
self.context = context
self.token_refresher = token_refresher
self.record_client = RecordClient(token_refresher=token_refresher, data_partition_id=context.data_partition_id)
@staticmethod
def _get_kind_name(kind: str) -> str:
......@@ -111,15 +108,14 @@ class ManifestProcessor(HeadersMixin):
raise ValueError(f"Invalid answer from Storage server: {response_dict}")
@tenacity.retry(**RETRY_SETTINGS)
@authorize()
def save_record_to_storage(self, headers: dict, request_data: List[dict]) -> requests.Response:
def save_record_to_storage(self, request_data: List[dict]) -> requests.Response:
"""
Send request to record storage API.
"""
request_data = json.dumps(request_data)
logger.info("Sending records to Storage service")
logger.debug(f"{request_data}")
response = requests.put(self.storage_url, request_data, headers=headers)
request_data = [Record.from_dict(data) for data in request_data]
response = self.record_client.create_update_records(request_data)
if response.ok:
response_dict = response.json()
self._validate_storage_response(response_dict)
......@@ -180,8 +176,7 @@ class ManifestProcessor(HeadersMixin):
for manifest_record in manifest_records:
populated_manifest_records.append(
self._delete_surrogate_key(manifest_record.entity_data))
save_manifests_response = self.save_record_to_storage(
self.request_headers, request_data=populated_manifest_records)
save_manifests_response = self.save_record_to_storage(populated_manifest_records)
record_ids.extend(save_manifests_response.json()["recordIds"])
return record_ids
......@@ -19,7 +19,7 @@ R3 Process Single Manifest helper.
"""
import logging
from typing import Iterator, List, Set, Tuple
from typing import List, Set, Tuple
from osdu_ingestion.libs.constants import (FIRST_STORED_RECORD_INDEX,
SAVE_RECORDS_BATCH_SIZE)
......
......@@ -17,16 +17,18 @@
import json
import logging
from os.path import defpath
from typing import Generator, Set
import requests
import tenacity
from osdu_api.auth.authorization import TokenRefresher, authorize
from requests import Response
from osdu_api.clients.search.search_client import SearchClient
from osdu_api.model.search.query_request import \
QueryRequest as SearchQueryRequest
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import RecordsNotSearchableError
from osdu_ingestion.libs.mixins import HeadersMixin
logger = logging.getLogger()
......@@ -37,15 +39,12 @@ TIMEOUT = 10
DEFAULT_LIMIT = 200
class SearchId(HeadersMixin):
class SearchId:
"""Class for stored records search validation."""
def __init__(self, search_url: str, record_ids: list, token_refresher: TokenRefresher,
context: Context, limit: int = DEFAULT_LIMIT):
def __init__(self, record_ids: list, token_refresher: TokenRefresher, context: Context, limit: int = DEFAULT_LIMIT):
"""Init search id validator.
:param search_url: Base OSDU Search service url
:type search_url: str
:param record_ids: The list of records id to be searched
:type record_ids: list
:param token_refresher: An instance of token refresher
......@@ -56,16 +55,14 @@ class SearchId(HeadersMixin):
:type limit: int
:raises ValueError: When a mismatch of record counts occur
"""
super().__init__(context)
if not record_ids:
logger.error("There are no record ids")
raise ValueError("There are no record id")
self.record_ids = record_ids
self.search_url = search_url
self.expected_total_count = len(record_ids)
self.token_refresher = token_refresher
self.search_client = SearchClient(token_refresher=token_refresher, data_partition_id=context.data_partition_id)
self.limit = limit
self._create_request_body()
def _create_search_query(self) -> str:
"""Create search query to send to Search service using recordIds need
......@@ -79,16 +76,6 @@ class SearchId(HeadersMixin):
query = f"id:({record_ids})"
return query
def _create_request_body(self):
"""Create request body to send to Search service."""
query = self._create_search_query()
request_body = {
"kind": "*:*:*:*",
"returnedFields": ["id", "version", "acl", "kind", "legal"],
"query": query
}
self.request_body = json.dumps(request_body)
def _is_record_searchable(self, response: requests.Response) -> bool:
"""Check if search service returns expected totalCount of records.
......@@ -112,55 +99,39 @@ class SearchId(HeadersMixin):
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@authorize()
def search_files(self, headers: dict) -> requests.Response:
def search_files(self) -> requests.Response:
"""Send request with recordIds to Search service.
:param headers: The request headers
:type headers: dict
:raises RecordsNotSearchableError: When any of the records is not
searchable
:return: The server response
:rtype: requests.Response
"""
if self.request_body:
response = requests.post(
self.search_url, self.request_body, headers=headers)
if not self._is_record_searchable(response):
logger.error("Expected amount (%s) of records not found." %
self.expected_total_count,
)
raise RecordsNotSearchableError(
f"Can't find records {self.request_body}. "
f"Got response {response.json()} from Search service."
)
return response
search_query_request = SearchQueryRequest(
kind="*:*:*:*",
query=self._create_search_query(),
returned_fields=["id", "version", "acl", "kind", "legal"]
)
response = self.search_client.query_records(query_request=search_query_request)
if not self._is_record_searchable(response):
logger.error("Expected amount (%s) of records not found." %
self.expected_total_count,
)
raise RecordsNotSearchableError(
f"Can't find records {self.record_ids}. "
f"Got response {response.json()} from Search service."
)
return response
def check_records_searchable(self):
"""Check if every record in self.record_ids is searchable."""
headers = self.request_headers
self.search_files(headers)
self.search_files()
class ExtendedSearchId(SearchId):
def __init__(self, search_url: str, record_ids: list, token_refresher,
context: Context, limit: int = DEFAULT_LIMIT):
super().__init__(search_url, record_ids, token_refresher, context, limit=limit)
def _create_request_body(self):
"""
Create request body to send to Search service.
"""
query = self._create_search_query()
request_body = {
"kind": "*:*:*:*",
"query": query,
"returnedFields": ["id", "version", "acl", "kind", "legal"],
"limit": self.limit
}
self._request_body = request_body
self.request_body = json.dumps(request_body)
def __init__(self, record_ids: list, token_refresher, context: Context, limit: int = DEFAULT_LIMIT):
super().__init__(record_ids, token_refresher, context, limit=limit)
def _extract_id_from_response(self, response: dict):
results = response.get("results")
......@@ -170,40 +141,38 @@ class ExtendedSearchId(SearchId):
logger.debug(f"response ids: {record_ids}")
return record_ids
@authorize()
def _make_post_request(self, headers: dict, request_body: dict) -> Response:
return requests.post(self.search_url, request_body, headers=headers)
def search_records(self) -> Set[str]:
"""
Send request with recordIds to Search service.
"""
if self.request_body:
response = self._make_post_request(
self.request_headers, self.request_body)
logger.debug(response.text)
search_query_request = SearchQueryRequest(
kind="*:*:*:*",
query=self._create_search_query(),
returned_fields=["id", "version", "acl", "kind", "legal"]
)
response = self.search_client.query_records(query_request=search_query_request)
logger.debug(response.text)
data = response.json()
total_count = data.get("totalCount")
data = response.json()
total_count = data.get("totalCount")
logger.debug(f"Got total count {total_count}")
logger.debug(f"Got total count {total_count}")
if total_count is None:
raise ValueError(f"Got no totalCount field in Search service response. "
f"Response is {data}.")
if total_count is None:
raise ValueError(f"Got no totalCount field in Search service response. "
f"Response is {data}.")
response_records_ids = set(self._extract_id_from_response(data))
response_records_ids = set(self._extract_id_from_response(data))
cursor = total_count > self.limit
if cursor:
logger.debug("Start cursor iteration")
cursor = total_count > self.limit
if cursor:
logger.debug("Start cursor iteration")
offset = self.limit
for ids in self._iterate_cursor(offset, total_count):
response_records_ids.update(ids)
offset = self.limit
for ids in self._iterate_cursor(offset, total_count):
response_records_ids.update(ids)
return response_records_ids
return set()
return response_records_ids
def _iterate_cursor(self, offset: int, total_count: int) -> Generator:
"""
......@@ -211,8 +180,13 @@ class ExtendedSearchId(SearchId):
Fetch all records using offest.
"""
while offset <= total_count - self.limit:
response = self._make_post_request(
self.request_headers, json.dumps({"offset": offset, **self._request_body}))
search_query_request = SearchQueryRequest(
kind="*:*:*:*",
query=self._create_search_query(),
offset=offset,
returned_fields=["id", "version", "acl", "kind", "legal"]
)
response = self.search_client.query_records(query_request=search_query_request)
data = response.json()
ids = set(self._extract_id_from_response(data))
......
......@@ -20,6 +20,11 @@ import logging
import requests
from osdu_api.auth.authorization import TokenRefresher, authorize
from osdu_api.clients.data_workflow.data_workflow_client import \
DataWorkflowClient
from osdu_api.configuration.config_manager import DefaultConfigManager
from osdu_api.model.data_workflow.update_status_request import \
UpdateStatusRequest
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.mixins import HeadersMixin
......@@ -30,6 +35,9 @@ logger = logging.getLogger()
class UpdateStatus(HeadersMixin):
"""Class to perform update status of the workflow."""
FAILED_STATUS = "failed"
FINISHED_STATUS = "finished"
def __init__(
self,
workflow_name: str,
......@@ -59,15 +67,20 @@ class UpdateStatus(HeadersMixin):
"""
super().__init__(context)
self.workflow_name = workflow_name
self.workflow_url = workflow_url
self.workflow_id = workflow_id
self.workflow_url = workflow_url
self.run_id = run_id
self.context = context
self.status = status
self.token_refresher = token_refresher
config_manager=DefaultConfigManager()
self.workflow_client = DataWorkflowClient(
config_manager=config_manager,
token_refresher=token_refresher,
data_partition_id=context.data_partition_id
)
@authorize()
def update_status_request(self, headers: dict) -> requests.Response:
def update_status_request(self) -> requests.Response:
"""Send request to update status.
:param headers: The request headers
......@@ -75,14 +88,9 @@ class UpdateStatus(HeadersMixin):
:return: The Workflow server response
:rtype: requests.Response
"""
request_body = {
"status": self.status
}
request_body = json.dumps(request_body)
logger.debug(f"Sending request '{request_body}'")
update_status_url = f"{self.workflow_url}/v1/workflow/{self.workflow_name}/workflowRun/{self.run_id}"
logger.debug(f"Workflow URL: {update_status_url}")
response = requests.put(update_status_url, request_body, headers=headers)
update_status_request = UpdateStatusRequest(self.status)
logger.debug(f"Sending request '{update_status_request}'")
response = self.workflow_client.update_status(update_status_request, self.workflow_name, self.run_id)
return response
@authorize()
......@@ -107,6 +115,6 @@ class UpdateStatus(HeadersMixin):
"""Updates workflow status."""
headers = self.request_headers
if self.workflow_name:
self.update_status_request(headers)
self.update_status_request()
else:
self.update_status_request_old(headers)
......@@ -20,8 +20,7 @@ from osdu_ingestion.libs.constants import (DATA_SECTION, DATASETS_SECTION,
SEARCH_ID_BATCH_SIZE,
WORK_PRODUCT_COMPONENTS_SECTION)
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.exceptions import (EmptyManifestError,
ValidationIntegrityError)
from osdu_ingestion.libs.exceptions import ValidationIntegrityError
from osdu_ingestion.libs.linearize_manifest import ManifestLinearizer
from osdu_ingestion.libs.manifest_analyzer import ManifestAnalyzer
from osdu_ingestion.libs.search_record_ids import ExtendedSearchId
......@@ -41,7 +40,6 @@ class ManifestIntegrity:
def __init__(
self,
search_url: str,
token_refresher,
file_source_validator: FileSourceValidator,
context: Context,
......@@ -49,7 +47,6 @@ class ManifestIntegrity:
search_id_batch_size: int = SEARCH_ID_BATCH_SIZE,
):
self.search_url = search_url
self.token_refresher = token_refresher
self.file_source_validator = file_source_validator
self.context = context
......@@ -94,8 +91,11 @@ class ManifestIntegrity:
# TODO: Move ExtendedSearchId() to the class attribute.
# Refactor ExtendedSearchId().search_records() to take records to search
search_handler = ExtendedSearchId(self.search_url, external_references_without_version,
self.token_refresher, self.context)
search_handler = ExtendedSearchId(
external_references_without_version,
self.token_refresher,
self.context
)
found_ids = search_handler.search_records()
missing_ids.update(self._filter_not_found_ids(ids_batch, found_ids))
......
......@@ -27,6 +27,7 @@ import tenacity
from dateutil import parser as date_parser
from jsonschema import FormatChecker, exceptions
from osdu_api.auth.authorization import TokenRefresher, authorize
from osdu_api.clients.schema.schema_client import SchemaClient
from osdu_ingestion.libs.constants import (DATA_SECTION, DATASETS_SECTION,
MASTER_DATA_SECTION,
......@@ -79,14 +80,10 @@ def validate_date_time(value: str) -> bool:
class OSDURefResolver(jsonschema.RefResolver):
"""Extends base jsonschema resolver for OSDU."""
def __init__(self, schema_service: str, *args, **kwargs):
def __init__(self, *args, **kwargs):
"""Implements the schema validatoe
:param schema_service: The base url for schema service
:type schema_service: str
"""
super(OSDURefResolver, self).__init__(*args, **kwargs)
self.schema_service = schema_service
def resolve_fragment(self, document: dict, fragment: str) -> dict:
"""
......@@ -115,7 +112,6 @@ class SchemaValidator(HeadersMixin):
def __init__(
self,
schema_service: str,
token_refresher: TokenRefresher,
context: Context,
surrogate_key_fields_paths: SurrogateKeysPaths = None,
......@@ -123,33 +119,15 @@ class SchemaValidator(HeadersMixin):
):
"""Init SchemaValidator.
:param schema_service: The base OSDU Schema service url
:param token_refresher: An instance of token refresher
:param context: The tenant context
"""
super().__init__(context)
self.schema_service = schema_service
self.context = context
self.token_refresher = token_refresher
self.resolver_handlers = {
"osdu": self.get_schema_request,
"https": self.get_schema_request,
self.context.data_partition_id: self.get_schema_request
}
self.schema_client = SchemaClient(token_refresher=token_refresher, data_partition_id=context.data_partition_id)
self.surrogate_key_fields_paths = surrogate_key_fields_paths or []
self.data_types_with_surrogate_ids = data_types_with_surrogate_ids or []
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@authorize()
def _get_schema_from_schema_service(self, headers: dict, uri: str) -> requests.Response:
"""Send request to Schema service to retrieve schema."""
response = requests.get(uri, headers=headers, timeout=60)
return response
def _clear_data_fields(self, schema_part: Union[dict, list]):
"""
Clear a schema's ReferenceData, Data and MasterData fields".
......@@ -164,20 +142,11 @@ class SchemaValidator(HeadersMixin):
if schema_part.get("MasterData"):
schema_part["MasterData"] = {}
def get_schema_request(self, uri: str) -> dict:
"""Get schema from Schema service. Change $id field to url.
:param uri: The URI to fetch the schema
:type uri: str
:return: The Schema service response
:rtype: dict
"""
if uri.startswith("osdu") or uri.startswith(self.context.data_partition_id):
uri = f"{self.schema_service}/{uri}"
response = self._get_schema_from_schema_service(self.request_headers, uri).json()
response["$id"] = uri
return response
@tenacity.retry(
wait=tenacity.wait_fixed(TIMEOUT),
stop=tenacity.stop_after_attempt(RETRIES),
reraise=True
)
@lru_cache()
def get_schema(self, kind: str) -> Union[dict, None]:
"""Fetch schema from Schema service.
......@@ -193,14 +162,13 @@ class SchemaValidator(HeadersMixin):
:return: Schema server response
:rtype: dict
"""
manifest_schema_uri = f"{self.schema_service}/{kind}"
try:
response = self.get_schema_request(manifest_schema_uri)
schema = self.schema_client.get_schema_by_id(kind).json()
except requests.HTTPError as e:
logger.error(f"Error on getting schema of kind '{kind}'")
logger.error(e)
return None
return response
return schema
@staticmethod
def _extend_pattern_with_surrogate_key(pattern: str) -> str:
......@@ -328,10 +296,8 @@ class SchemaValidator(HeadersMixin):
:return:
"""
resolver = OSDURefResolver(
schema_service=self.schema_service,
base_uri=schema.get("$id", ""),
referrer=schema,
handlers=self.resolver_handlers,
cache_remote=True
)
jsonschema.validate(
......
# Copyright © 2020 Amazon Web Services
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[environment]
data_partition_id=opendes
storage_url=blah/api/storage/v2
partition_url=blah/api/storage/v2
search_url=blah/api/search/v2
legal_url=blah/api/legal/v1
data_workflow_url=blah/api/data-workflow/v1
entitlements_url=blah/api/entitlements/v1
file_dms_url=blah/api/filedms/v2
dataset_url=blah/api/dataset-registry/v1
schema_url=blah/api/schema-service/v1
ingestion_workflow_url=stub
use_service_principal=no
[provider]
name=provider_test
service_principal_module_name=service_principal_util
token_url_ssm_path=/osdu/blah/oauth-token-uri
aws_oauth_custom_scope_ssm_path=/osdu/blah/oauth-custom-scope
client_id_ssm_path=/osdu/blah/client-credentials-client-id
client_secret_name=/osdu/blah/client_credentials_secret
client_secret_dict_key=client_credentials_client_secret
region_name=blah
\ No newline at end of file
......@@ -47,7 +47,7 @@ class TestIntegrityProvider:
@pytest.fixture
def manifest_integrity(self) -> ManifestIntegrity:
context = Context(app_key="", data_partition_id="test")
manifest_integrity = ManifestIntegrity("", BaseTokenRefresher(get_test_credentials()),
manifest_integrity = ManifestIntegrity(BaseTokenRefresher(get_test_credentials()),
FileSourceValidator(),
context)
return manifest_integrity
......
......@@ -16,10 +16,6 @@
import http
import json
import os
import sys
from osdu_ingestion.libs.context import Context
from osdu_ingestion.libs.handle_file import FileHandler
......@@ -94,7 +90,6 @@ class TestManifestProcessor:
file_handler = FileHandler("test", token_refresher, context)
source_file_checker = SourceFileChecker()
manifest_processor = process_manifest_r3.ManifestProcessor(
storage_url="",
token_refresher=token_refresher,
context=context,
file_handler=file_handler,
......@@ -138,12 +133,13 @@ class TestManifestProcessor:
manifest_processor: process_manifest_r3.ManifestProcessor,
manifest_records,
mock_records_list: list,
traversal_manifest_file: str,
conf_path: str,
records_file_path: str
):
self.monkeypatch_storage_response(monkeypatch)
manifest_processor.save_record_to_storage({}, records_file_path)
with open(records_file_path) as f:
records = json.load(f)
manifest_processor.save_record_to_storage(records)
@pytest.mark.parametrize(
"conf_path,traversal_manifest_file",
......@@ -157,12 +153,14 @@ class TestManifestProcessor:
monkeypatch,
manifest_records,
manifest_processor: process_manifest_r3.ManifestProcessor,
traversal_manifest_file: str,
conf_path: str
):
self.monkeypatch_storage_response(monkeypatch, b"{}")
with open(conf_path) as f:
manifest = json.load(f)
record = manifest["execution_context"]["manifest"]["Data"]["WorkProduct"]
with pytest.raises(ValueError):
manifest_processor.save_record_to_storage({}, [{}])
manifest_processor.save_record_to_storage([record])
@pytest.mark.parametrize(
"conf_path,traversal_manifest_file",
......@@ -181,8 +179,11 @@ class TestManifestProcessor:
conf_path: str
):
self.monkeypatch_storage_response_error(monkeypatch, http.HTTPStatus.INTERNAL_SERVER_ERROR)
with open(conf_path) as f:
manifest = json.load(f)
record = manifest["execution_context"]["manifest"]["Data"]["WorkProduct"]
with pytest.raises(requests.HTTPError):
manifest_processor.save_record_to_storage({}, conf_path)
manifest_processor.save_record_to_storage([record])
@pytest.mark.parametrize(
"conf_path,traversal_manifest_file",
......
......@@ -66,7 +66,6 @@ class TestSchemaValidator:
):
context = Context(app_key="", data_partition_id="")
validator = SchemaValidator(
"",
BaseTokenRefresher(get_test_credentials()),
context
)
......@@ -153,25 +152,6 @@ class TestSchemaValidator:
assert not valid_entities
assert len(skipped_entities)
@pytest.mark.parametrize(
"manifest_file,traversal_manifest_file,schema_file,kind",
[
pytest.param(
MANIFEST_WELLBORE_VALID_PATH,
TRAVERSAL_WELLBORE_VALID_PATH,
SCHEMA_WELLBORE_VALID_PATH,
"opendes:osdu:Wellbore:0.3.0",
id="Valid manifest Wellore"),
]
)
def test_get_schema_request(self,
schema_validator: SchemaValidator,
manifest_file: str,
traversal_manifest_file: str,
schema_file: str,
kind: str):
schema_validator.get_schema_request(kind)
@pytest.mark.parametrize(
"manifest_file,traversal_manifest_file,schema_file,kind",
[
......@@ -301,7 +281,6 @@ class TestSchemaValidator:
def test_delete_refs(self):
context = Context(app_key="", data_partition_id="")
validator = SchemaValidator(
"",
BaseTokenRefresher(get_test_credentials()),
context
)
......@@ -368,7 +347,6 @@ class TestSchemaValidator:
context = Context(app_key="", data_partition_id="osdu")
validator = SchemaValidator(
"",
BaseTokenRefresher(get_test_credentials()),
context,
surrogate_key_fields_paths=surrogate_fields_paths,
......@@ -394,7 +372,6 @@ class TestSchemaValidator:
data = json.load(f)
context = Context(app_key="", data_partition_id="")
schema_validator = SchemaValidator(
"",
BaseTokenRefresher(get_test_credentials()),
context
)
......@@ -449,7 +426,6 @@ class TestSchemaValidator:
date_time_data = {"date-time-data": datetime_value}
context = Context(app_key="", data_partition_id="")
schema_validator = SchemaValidator(
"",
BaseTokenRefresher(get_test_credentials()),
context
)
......@@ -475,7 +451,6 @@ class TestSchemaValidator:
date_time_data = {"date-time-data": datetime_value}
context = Context(app_key="", data_partition_id="")
schema_validator = SchemaValidator(
"",
BaseTokenRefresher(get_test_credentials()),
context
)
......
......@@ -71,7 +71,7 @@ class TestManifestProcessor:
try:
return next(responses)
except StopIteration:
return MockSearchResponseForRecords([], status_code=status_code, total_count=total_count, cursor=cursor)
return MockSearchResponseForRecords([], status_code=status_code, total_count=total_count)
monkeypatch.setattr(requests, "post", mock_response)
......@@ -94,7 +94,7 @@ class TestManifestProcessor:
def test_search_found_all_records(self, monkeypatch, record_ids: list,
search_response_path: str):
self.mock_storage_response(monkeypatch, search_response_path, total_count=len(record_ids))
id_searcher = SearchId("http://test", record_ids,
id_searcher = SearchId(record_ids,
BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""))
id_searcher.check_records_searchable()
......@@ -117,8 +117,7 @@ class TestManifestProcessor:
invalid_total_count = len(record_ids) - 1
self.mock_storage_response(monkeypatch, search_response_path,
total_count=invalid_total_count)
id_searcher = SearchId("", record_ids, BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""))
id_searcher = SearchId(record_ids, BaseTokenRefresher(get_test_credentials()), Context(app_key="", data_partition_id=""))
with pytest.raises(RecordsNotSearchableError):
id_searcher.check_records_searchable()
......@@ -138,9 +137,7 @@ class TestManifestProcessor:
def test_search_got_wrong_response_value(self, monkeypatch, record_ids: list,
search_response_path: str):
self.mock_storage_response(monkeypatch, search_response_path)
id_searcher = SearchId("http://test", record_ids,
BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""))
id_searcher = SearchId(record_ids, BaseTokenRefresher(get_test_credentials()), Context(app_key="", data_partition_id=""))
with pytest.raises(ValueError):
id_searcher.check_records_searchable()
......@@ -157,8 +154,7 @@ class TestManifestProcessor:
search_response_path: str):
self.mock_storage_response(monkeypatch, search_response_path)
with pytest.raises(ValueError):
SearchId("http://test", record_ids, BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""))
SearchId(record_ids, BaseTokenRefresher(get_test_credentials()), Context(app_key="", data_partition_id=""))
@pytest.mark.parametrize(
"record_ids,search_response_path,extracted_ids_path",
......@@ -176,9 +172,11 @@ class TestManifestProcessor:
self.mock_storage_response(monkeypatch, search_response_path, total_count=len(record_ids))
with open(search_response_path) as f:
response = json.load(f)
id_searcher = ExtendedSearchId("http://test", record_ids,
BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""))
id_searcher = ExtendedSearchId(
record_ids,
BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id="")
)
record_ids = id_searcher._extract_id_from_response(response)
with open(extracted_ids_path) as f:
extracted_ids = json.load(f)
......@@ -200,10 +198,12 @@ class TestManifestProcessor:
expected_length = 80
self.mock_storage_response_with_chunks(monkeypatch, search_response_path)
id_searcher = ExtendedSearchId("http://test", record_ids,
BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""),
limit=20)
id_searcher = ExtendedSearchId(
record_ids,
BaseTokenRefresher(get_test_credentials()),
Context(app_key="", data_partition_id=""),
limit=20
)
record_ids = id_searcher.search_records()
assert len(record_ids) == expected_length
......@@ -44,8 +44,8 @@ class TestUpdateStatus:
run_id = conf["run_id"]
status_updater = UpdateStatus(
workflow_name=workflow_name,
workflow_url="http://test",
workflow_id="",
workflow_url="https://stub",
run_id=run_id,
token_refresher=BaseTokenRefresher(get_test_credentials()),
context=context,
......
......@@ -7,4 +7,4 @@ responses==0.12.1
twine==3.2.0
--extra-index-url=https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
osdu-api~=0.12.0
osdu-api~=0.13.0.dev241
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment