# Copyright 2020 Google LLC
# Copyright 2020 EPAM Systems
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides SchemaValidator."""
import copy
import logging
from typing import Union, Any, List

import jsonschema
import requests
import tenacity
from jsonschema import exceptions

from libs.context import Context
from libs.exceptions import EmptyManifestError, NotOSDUSchemaFormatError
from libs.mixins import HeadersMixin
from libs.refresh_token import TokenRefresher, refresh_token
# Module logger. getLogger() without a name returns the root logger, so
# records go to whatever handlers the hosting process configured on it.
logger = logging.getLogger()

# Retry policy for requests to the Schema service.
RETRIES: int = 3  # maximum number of attempts (tenacity.stop_after_attempt)
TIMEOUT: int = 1  # fixed wait between attempts, in seconds (tenacity.wait_fixed)
class OSDURefResolver(jsonschema.RefResolver):
    """Extends the base jsonschema resolver for OSDU.

    Keeps the Schema service base URL so that sub-schemas resolved from a
    JSON-pointer fragment can be given an absolute "$id" on that service.
    """

    def __init__(self, schema_service: str, *args, **kwargs):
        """Implement the schema resolver.

        :param schema_service: The base url for schema service
        :type schema_service: str
        :param args: Positional arguments forwarded to jsonschema.RefResolver
        :param kwargs: Keyword arguments forwarded to jsonschema.RefResolver
        """
        super().__init__(*args, **kwargs)
        self.schema_service = schema_service

    def resolve_fragment(self, document: dict, fragment: str) -> Any:
        """Extend the base resolve_fragment method.

        If a nested schema has a 'definitions' field and there is a schema under
        this 'definitions', jsonschema attempts to use the id field of this
        double-nested schema as a URI to fetch that schema later. So the id is
        replaced with "<schema_service>/<last fragment segment>".

        Note: the object returned by the base resolver is modified in place,
        not copied.

        :param document: The schema document
        :type document: dict
        :param fragment: schema fragment
        :type fragment: str
        :return: The resolved sub-schema; "$id" is rewritten only when it is a dict
        """
        document = super().resolve_fragment(document, fragment)
        # "/definitions/<OsduID>" -> ["", "definitions", "<OsduID>"]
        fragment_parts = fragment.split("/")
        # Draft 7 allows boolean schemas (and a pointer may land on a list);
        # those cannot carry an "$id", so only mapping schemas are rewritten.
        if len(fragment_parts) > 1 and isinstance(document, dict):
            osdu_id = fragment_parts[-1]
            document["$id"] = f"{self.schema_service}/{osdu_id}"
        return document
class SchemaValidator(HeadersMixin):
    """Class to validate schema of Manifests.

    Schemas are fetched from the OSDU Schema service; remote "$ref"s inside them
    are resolved through the same service (see resolver_handlers).
    """

    def __init__(
        self, schema_service: str,
        token_refresher: TokenRefresher,
        context: Context
    ):
        """Init SchemaValidator.

        :param schema_service: The base OSDU Schema service url
        :type schema_service: str
        :param token_refresher: An instance of token refresher
        :type token_refresher: TokenRefresher
        :param context: The tenant context
        :type context: Context
        """
        super().__init__(context)
        self.schema_service = schema_service
        self.context = context
        self.token_refresher = token_refresher
        # Handlers keyed by URI scheme for remote "$ref"s: "osdu:..." and
        # "<partition>:..." kinds, as well as https URLs, all go through the
        # Schema service.
        self.resolver_handlers = {
            "osdu": self.get_schema_request,
            "https": self.get_schema_request,
            self.context.data_partition_id: self.get_schema_request
        }

    @tenacity.retry(
        wait=tenacity.wait_fixed(TIMEOUT),
        stop=tenacity.stop_after_attempt(RETRIES),
        reraise=True
    )
    def _get_schema_from_schema_service(self, headers: dict, uri: str) -> requests.Response:
        """Send request to Schema service to retrieve schema.

        If the request raises, it is attempted again (at most RETRIES attempts,
        TIMEOUT seconds apart) and the last exception is re-raised. The HTTP
        status of the response is not checked here.

        :param headers: The request headers
        :type headers: dict
        :param uri: Absolute URL of the schema
        :type uri: str
        :return: The raw Schema service response
        :rtype: requests.Response
        """
        # NOTE(review): `refresh_token` is imported and `self.token_refresher` is
        # stored, but neither is used, so an expired token is never refreshed.
        # Confirm whether this method should be wrapped by the project's
        # refresh_token helper.
        return requests.get(uri, headers=headers, timeout=60)

    def __delete_refs(self, schema_part: Union[dict, list]):
        """Recursively clear every schema object that contains "$ref".

        Used by the generic manifest validation: an emptied object is the empty
        schema, which accepts anything, so references to concrete entity schemas
        stop being enforced. Modifies ``schema_part`` in place.

        :param schema_part: A (sub-)schema dict or list to process
        """
        if isinstance(schema_part, dict):
            if "$ref" in schema_part:
                schema_part.clear()
            else:
                for value in schema_part.values():
                    self.__delete_refs(value)
        elif isinstance(schema_part, list):
            for item in schema_part:
                self.__delete_refs(item)

    def get_schema_request(self, uri: str) -> dict:
        """Get schema from Schema service. Change $id field to url.

        Also registered as the jsonschema handler for remote references.

        :param uri: An absolute URL, or a bare kind starting with "osdu" or the
            data partition id (such kinds are prefixed with the Schema service URL)
        :type uri: str
        :raises requests.HTTPError: If the Schema service answers with an error status
        :return: The schema, with "$id" set to the URL it was fetched from
        :rtype: dict
        """
        if uri.startswith(("osdu", self.context.data_partition_id)):
            uri = f"{self.schema_service}/{uri}"
        response = self._get_schema_from_schema_service(self.request_headers, uri)
        # Without this check an error body (e.g. for an unknown kind) would be
        # used as if it were the schema.
        response.raise_for_status()
        schema = response.json()
        schema["$id"] = uri
        return schema

    def get_schema(self, kind: str) -> dict:
        """Fetch schema from Schema service.

        :param kind: The kind of the schema to fetch
        :type kind: str
        :raises Exception: Any error from the request; it is logged and re-raised
        :return: Schema server response
        :rtype: dict
        """
        manifest_schema_uri = f"{self.schema_service}/{kind}"
        try:
            return self.get_schema_request(manifest_schema_uri)
        except Exception:
            logger.error(f"Error on getting schema of kind '{kind}'")
            raise

    def _validate_entity(self, entity: dict, schema: dict = None) -> bool:
        """Validate an entity against the schema of its kind.

        NOTE(review): earlier wording spoke of validating only the 'data' field.
        The whole entity is validated here because the schema is looked up by the
        entity's own kind. Confirm against the Schema service schemas.

        :param entity: The entity to validate; must contain "kind"
        :param schema: A pre-fetched schema to use; fetched by the entity's kind
            when omitted
        :return: True if the entity is valid, False otherwise (the error is logged)
        """
        if schema is None:
            schema = self.get_schema(entity["kind"])
        try:
            self._validate_against_schema(schema, entity)
            logger.debug("Record successfully validated")
            return True
        except exceptions.ValidationError as exc:
            logger.error("Schema validation error. Data field.")
            logger.error(f"Manifest kind: {entity['kind']}")
            logger.error(f"Error: {exc}")
            return False

    def _validate_work_product(self, work_product: dict) -> bool:
        """Validate a Work Product section entity by entity.

        The "WorkProduct" entry is a single entity; every other entry is a list
        of entities, each validated on its own. All entities are validated (no
        short-circuit), so every error gets logged.

        :param work_product: The Work Product section of a manifest
        :return: True if every entity is valid, False otherwise
        """
        results = []
        for key, value in work_product.items():
            if key == "WorkProduct":
                results.append(self._validate_entity(value))
            else:
                results.extend(self._validate_entity(component) for component in value)
        return all(results)

    def _validate_against_schema(self, schema: dict, data: Any):
        """Validate any data against a schema with a Draft 7 validator.

        Remote "$ref"s are resolved by OSDURefResolver through the Schema service,
        using the schema's "$id" (if any) as the base URI.

        :param schema: The schema to validate against
        :param data: The data to validate
        :raises jsonschema.exceptions.ValidationError: If ``data`` is not valid
        """
        resolver = OSDURefResolver(
            schema_service=self.schema_service,
            base_uri=schema.get("$id", ""),
            referrer=schema,
            handlers=self.resolver_handlers,
            cache_remote=True
        )
        validator = jsonschema.Draft7Validator(schema=schema, resolver=resolver)
        validator.validate(data)

    def _validate_data_group(self, entities: list) -> bool:
        """Validate each entity from a list of entities.

        :param entities: Entities to validate; anything that is not a list is ignored
        :return: True if every entity is valid (or there is nothing to validate)
        """
        if not isinstance(entities, list):
            return True
        results = [self._validate_entity(entity) for entity in entities]
        return all(results)

    def _validate_whole_manifest(self, manifest: dict):
        """Validate any manifest in general.

        Because a Manifest schema references the generic schemas of MasterData,
        ReferenceData, WorkProduct, WorkProductComponents and Files, this step
        also checks those entities against their generic schemas.

        :param manifest: The manifest; must contain "kind"
        :raises jsonschema.exceptions.ValidationError: If the manifest is not valid
        """
        schema = self.get_schema(manifest["kind"])
        logger.debug(f"Validating kind {manifest['kind']}")
        self._validate_against_schema(schema, manifest)

    def validate_common_schema(self, manifest: dict) -> dict:
        """Preliminary, top-level validation of a manifest.

        Checks that the manifest matches its OSDU schema at the highest level.
        Concrete entities are not checked, because references to their schemas
        are removed from a copy of the schema before validating.

        :param manifest: A dict holding the manifest under the "manifest" key
        :raises EmptyManifestError: If there is no "manifest" key
        :raises jsonschema.exceptions.ValidationError: If the manifest is not valid
        :return: Manifest schema (the original, with references intact)
        """
        if "manifest" not in manifest:
            raise EmptyManifestError
        manifest_body = manifest["manifest"]
        schema = self.get_schema(manifest_body["kind"])
        schema_without_refs = copy.deepcopy(schema)
        if schema_without_refs.get("properties"):
            self.__delete_refs(schema_without_refs["properties"])
        else:
            self.__delete_refs(schema_without_refs)
        logger.debug("Schema without refs")
        logger.debug(f"{schema_without_refs}")
        # Validate the manifest the schema was fetched for, against the stripped
        # copy; the full schema would enforce every concrete entity schema.
        self._validate_against_schema(schema_without_refs, manifest_body)
        return schema

    def _validate_against_generic_schema(self, schema: dict, entity: Any) -> bool:
        """Validate an entity against a generic schema.

        :param schema: The generic schema
        :param entity: The entity to validate; must contain "kind"
        :return: True if valid, False otherwise (the error is logged)
        """
        try:
            self._validate_against_schema(schema, entity)
            logger.debug("Record successfully validated against generic schema.")
            return True
        except exceptions.ValidationError as exc:
            logger.error("Schema validation error.")
            logger.error(f"Manifest kind: {entity['kind']}")
            logger.error(f"Manifest: {entity}")
            logger.error(f"Error: {exc}")
            return False

    def validate_manifest(self, manifest_records: List[dict]) -> List[dict]:
        """Validate manifest records and return the valid ones.

        Each record holds its entity under "entity" and the kind of the entity's
        generic schema under "schema". A record is kept when its entity passes
        the generic schema and then the schema of its own kind. Invalid records
        are logged and dropped.

        :param manifest_records: The records to validate
        :raises EmptyManifestError: If there are no records
        :raises NotOSDUSchemaFormatError: If a record's entity is not a dict with a "kind"
        :return: The records that passed validation
        """
        if not manifest_records:
            raise EmptyManifestError
        validated_records = []
        for manifest_record in manifest_records:
            manifest = manifest_record.get("entity")
            if not (isinstance(manifest, dict) and manifest.get("kind")):
                raise NotOSDUSchemaFormatError(f"Not valid schema {manifest}")
            generic_schema = self.get_schema(manifest_record.get("schema"))
            # `and` short-circuits: the kind schema is only checked after the
            # generic one passed.
            if self._validate_against_generic_schema(generic_schema, manifest) \
                    and self._validate_entity(manifest):
                validated_records.append(manifest_record)
        return validated_records