diff --git a/.gitignore b/.gitignore
index 07f42eea1e2ebbb204b07951462c6e06024219f8..99dae20149e066f1f15960c95e4ed4dc0fb6d03b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,3 +31,6 @@
 **/*.egg
 **/MANIFEST
 
+
+# will remove it latter
+**/schema_registration/
diff --git a/.pylintrc b/.pylintrc
index 8bf223eb6b0df51c0e0f4063c99dcde58d332ad0..d3520f3b279fa04585d3d5752afa7a43c04065b6 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,5 +1,8 @@
 [MASTER]
 
+# Add operators and dags to sys-path
+init-hook='import sys; sys.path.append("src/plugins"); sys.path.append("src/dags")'
+
 # Specify a configuration file.
 #rcfile=.pylintrc
 
diff --git a/src/dags/default-ingest.py b/src/dags/default-ingest.py
index 0d0758d1b72c89182abb07a4b6228baf36101f42..18ff8e01401b451b2f230308279b8d21ab73b6c1 100644
--- a/src/dags/default-ingest.py
+++ b/src/dags/default-ingest.py
@@ -14,16 +14,16 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-
+"""DAG for opaque ingestion"""
 
 from datetime import timedelta
 
 import airflow.utils.dates
 from airflow import DAG
-from airflow.operators import UpdateStatusOperator
 from airflow.operators.python_operator import PythonOperator
+from operators.update_status_op import UpdateStatusOperator
 
-from libs.create_records import create_records #isort:skip
+from libs.create_records import create_records  # isort:skip
 
 """
 A workflow creating a record
@@ -69,5 +69,4 @@ create_records_op = PythonOperator(
     dag=dag
 )
 
-
 update_status_running_op >> create_records_op >> update_status_finished_op
diff --git a/src/plugins/libs/exceptions.py b/src/dags/libs/exceptions.py
similarity index 83%
rename from src/plugins/libs/exceptions.py
rename to src/dags/libs/exceptions.py
index 446ec6ea1f6662f1e182443d1264ed81be6d55f0..43978146e43982a2361ffa99fe3381ff7b698e2a 100644
--- a/src/plugins/libs/exceptions.py
+++ b/src/dags/libs/exceptions.py
@@ -33,3 +33,16 @@ class PipelineFailedError(Exception):
     Raise when pipeline failed.
     """
     pass
+
+
+class EmptyManifestError(Exception):
+    """
+    Raise when manifest field is empty.
+    """
+    pass
+
+class GetSchemaError(Exception):
+    """
+    Raise when can't find schema.
+    """
+    pass
diff --git a/src/plugins/libs/refresh_token.py b/src/dags/libs/refresh_token.py
similarity index 82%
rename from src/plugins/libs/refresh_token.py
rename to src/dags/libs/refresh_token.py
index 384e475730c090a4e52ec990a11600b324343685..9341466b08518d8a2373ccf1b00126e33a93def5 100644
--- a/src/plugins/libs/refresh_token.py
+++ b/src/dags/libs/refresh_token.py
@@ -15,6 +15,8 @@
 
 import logging
 import sys
+import time
+from functools import partial
 from http import HTTPStatus
 
 import requests
@@ -83,6 +85,18 @@ def _check_token():
         set_access_token(SA_FILE_PATH, ACCESS_SCOPES)
 
 
+def make_callable_request(obj, request_function, headers, *args, **kwargs):
+    """
+    Create send_request_with_auth function.
+    """
+    headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
+    if obj:  # if wrapped function is an object's method
+        callable_request = lambda: request_function(obj, headers, *args, **kwargs)
+    else:
+        callable_request = lambda: request_function(headers, *args, **kwargs)
+    return callable_request
+
+
 def _wrapper(*args, **kwargs):
     """
     Generic decorator wrapper for checking token and refreshing it.
@@ -94,22 +108,22 @@ def _wrapper(*args, **kwargs):
     if not isinstance(headers, dict):
         logger.error("Got headers %s" % headers)
         raise TypeError
-    headers["Authorization"] = f"Bearer {ACCESS_TOKEN}"
-    if obj:  # if wrapped function is an object's method
-        send_request_with_auth = lambda: request_function(obj, headers, *args, **kwargs)
-    else:
-        send_request_with_auth = lambda: request_function(headers, *args, **kwargs)
+    send_request_with_auth = make_callable_request(obj, request_function, headers,
+                                                   *args, **kwargs)
     response = send_request_with_auth()
     if not isinstance(response, requests.Response):
         logger.error("Function %s must return values of type requests.Response. "
-                     "Got %s instead" % (kwargs["rqst_func"], type(response)))
+                     "Got %s instead" % (request_function, type(response)))
         raise TypeError
     if not response.ok:
         if response.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
             set_access_token(SA_FILE_PATH, ACCESS_SCOPES)
+            send_request_with_auth =  make_callable_request(obj,
+                                                            request_function,
+                                                            headers,
+                                                            *args, **kwargs)
             response = send_request_with_auth()
-        else:
-            response.raise_for_status()
+        response.raise_for_status()
     return response
 
 
diff --git a/src/dags/osdu-ingest.py b/src/dags/osdu-ingest-r2.py
similarity index 85%
rename from src/dags/osdu-ingest.py
rename to src/dags/osdu-ingest-r2.py
index f43fb22358f21742d45da7d21a5d0afb1addd017..03fb1d94d09d7f12dd935a67b0ac357b0bf421e0 100644
--- a/src/dags/osdu-ingest.py
+++ b/src/dags/osdu-ingest-r2.py
@@ -13,22 +13,25 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+"""DAG for R2 ingestion"""
+
 from datetime import timedelta
 
 import airflow
 from airflow import DAG
-from airflow.operators import ProcessManifestOperator, SearchRecordIdOperator, UpdateStatusOperator
+from operators.process_manifest_r2_op import ProcessManifestOperatorR2
+from operators.search_record_id_op import SearchRecordIdOperator
+from operators.update_status_op import UpdateStatusOperator
 
 default_args = {
     "start_date": airflow.utils.dates.days_ago(0),
     "retries": 0,
     "retry_delay": timedelta(minutes=50),
     "trigger_rule": "none_failed",
-
 }
 
 dag = DAG(
-    "Osdu_ingest",
+    "Osdu_ingest_r2",
     default_args=default_args,
     description="liveness monitoring dag",
     schedule_interval=None,
@@ -48,13 +51,12 @@ update_status_finished_op = UpdateStatusOperator(
     trigger_rule="all_done",
 )
 
-process_manifest_op = ProcessManifestOperator(
+process_manifest_op = ProcessManifestOperatorR2(
     task_id="proccess_manifest_task",
     provide_context=True,
     dag=dag
 )
 
-
 search_record_ids_op = SearchRecordIdOperator(
     task_id="search_record_ids_task",
     provide_context=True,
diff --git a/src/dags/osdu-ingest-r3.py b/src/dags/osdu-ingest-r3.py
new file mode 100644
index 0000000000000000000000000000000000000000..e0292cae08e789332057c5030375ea05cf1b8d5c
--- /dev/null
+++ b/src/dags/osdu-ingest-r3.py
@@ -0,0 +1,66 @@
+#  Copyright 2020 Google LLC
+#  Copyright 2020 EPAM Systems
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""DAG for R3 ingestion"""
+
+from datetime import timedelta
+
+import airflow
+from airflow import DAG
+from operators.process_manifest_r3 import ProcessManifestOperatorR3
+from operators.search_record_id_op import SearchRecordIdOperator
+from operators.update_status_op import UpdateStatusOperator
+
+default_args = {
+    "start_date": airflow.utils.dates.days_ago(0),
+    "retries": 0,
+    "retry_delay": timedelta(seconds=30),
+    "trigger_rule": "none_failed",
+}
+
+dag = DAG(
+    "Osdu_ingest",
+    default_args=default_args,
+    description="liveness monitoring dag",
+    schedule_interval=None,
+    dagrun_timeout=timedelta(minutes=60)
+)
+
+update_status_running_op = UpdateStatusOperator(
+    task_id="update_status_running_task",
+    dag=dag
+)
+
+update_status_finished_op = UpdateStatusOperator(
+    task_id="update_status_finished_task",
+    dag=dag,
+    trigger_rule="all_done",
+)
+
+process_manifest_op = ProcessManifestOperatorR3(
+    task_id="proccess_manifest_task",
+    provide_context=True,
+    dag=dag
+)
+
+search_record_ids_op = SearchRecordIdOperator(
+    task_id="search_record_ids_task",
+    provide_context=True,
+    dag=dag,
+    retries=4
+)
+
+update_status_running_op >> process_manifest_op >> \
+search_record_ids_op >> update_status_finished_op # pylint: disable=pointless-statement
diff --git a/src/dags/other-log-ingest.py b/src/dags/other-log-ingest.py
index 3be207a1a708ce32a4fc5e29f4469709060b7f88..101a0dcf2e055a707a43b8f4844ed43b073622ef 100644
--- a/src/dags/other-log-ingest.py
+++ b/src/dags/other-log-ingest.py
@@ -12,6 +12,8 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+"""Dummy DAG"""
+
 from datetime import timedelta
 
 import airflow
@@ -31,7 +33,6 @@ dag = DAG(
     schedule_interval=None,
     dagrun_timeout=timedelta(minutes=60))
 
-
 t1 = BashOperator(
     task_id="echo",
     bash_command="echo test",
diff --git a/src/plugins/__init__.py b/src/plugins/__init__.py
deleted file mode 100644
index d6cb30bc70e4adcb525269d15429e6249c5f0daa..0000000000000000000000000000000000000000
--- a/src/plugins/__init__.py
+++ /dev/null
@@ -1,45 +0,0 @@
-#  Copyright 2020 Google LLC
-#  Copyright 2020 EPAM Systems
-#  Copyright 2020 Amazon
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-
-"""isort:skip_file"""
-from airflow.plugins_manager import AirflowPlugin
-
-from operators.search_record_id_op import SearchRecordIdOperator  # pylint: disable=import-error
-
-from .operators import ProcessManifestOperator, UpdateStatusOperator
-
-
-# Defining the plugin class
-class OSDUPlugin(AirflowPlugin):
-    name = "osdu_plugin"
-    operators = [
-        UpdateStatusOperator,
-        ProcessManifestOperator,
-        SearchRecordIdOperator
-    ]
-    hooks = []
-    # A list of class(es) derived from BaseExecutor
-    executors = []
-    # A list of references to inject into the macros namespace
-    macros = []
-    # A list of objects created from a class derived
-    # from flask_admin.BaseView
-    admin_views = []
-    # A list of Blueprint object created from flask.Blueprint
-    flask_blueprints = []
-    # A list of menu links (flask_admin.base.MenuLink)
-    menu_links = []
diff --git a/src/plugins/operators/__init__.py b/src/plugins/operators/__init__.py
index c0da87d0584cba75f258461d4ed576e7a8ca0c3e..5511adb864013540d7eda3007c4ffb6186bc68cb 100644
--- a/src/plugins/operators/__init__.py
+++ b/src/plugins/operators/__init__.py
@@ -12,13 +12,3 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-
-from .process_manifest_op import ProcessManifestOperator
-from .search_record_id_op import SearchRecordIdOperator
-from .update_status_op import UpdateStatusOperator
-
-__all__ = [
-    'UpdateStatusOperator',
-    'ProcessManifestOperator',
-    'SearchRecordIdOperator',
-]
diff --git a/src/plugins/operators/process_manifest_op.py b/src/plugins/operators/process_manifest_r2_op.py
similarity index 99%
rename from src/plugins/operators/process_manifest_op.py
rename to src/plugins/operators/process_manifest_r2_op.py
index 6e63848521401d0c789e534c2772861cdaf5cf0e..89ddc08976fccef1d0b44ef4cd8e03a23698b287 100644
--- a/src/plugins/operators/process_manifest_op.py
+++ b/src/plugins/operators/process_manifest_r2_op.py
@@ -338,7 +338,7 @@ def process_manifest(**kwargs):
     kwargs["ti"].xcom_push(key="record_ids", value=record_ids)
 
 
-class ProcessManifestOperator(BaseOperator):
+class ProcessManifestOperatorR2(BaseOperator):
     ui_color = '#dad5ff'
     ui_fgcolor = '#000000'
 
diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py
new file mode 100644
index 0000000000000000000000000000000000000000..b0323cd7b78e987363267aafc38b78359064bcdd
--- /dev/null
+++ b/src/plugins/operators/process_manifest_r3.py
@@ -0,0 +1,234 @@
+#  Copyright 2020 Google LLC
+#  Copyright 2020 EPAM Systems
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import copy
+import dataclasses
+import json
+import logging
+import sys
+import uuid
+from datetime import datetime
+
+import jsonschema
+import requests
+import tenacity
+from airflow.models import BaseOperator, Variable
+from libs.exceptions import EmptyManifestError, GetSchemaError
+from libs.refresh_token import refresh_token
+
+# Set up base logger
+handler = logging.StreamHandler(sys.stdout)
+handler.setFormatter(
+    logging.Formatter("%(asctime)s [%(name)-14.14s] [%(levelname)-7.7s]  %(message)s"))
+logger = logging.getLogger("Dataload")
+logger.setLevel(logging.INFO)
+logger.addHandler(handler)
+
+timestamp = datetime.now().isoformat()
+
+# Set up file logger
+handler = logging.FileHandler(f"execution_{timestamp}.log")
+handler.setFormatter(logging.Formatter("%(message)s"))
+file_logger = logging.getLogger("Execution")
+file_logger.setLevel(logging.INFO)
+file_logger.addHandler(handler)
+
+RETRIES = 3
+TIMEOUT = 1
+
+
+@dataclasses.dataclass
+class Context(object):
+    data_partition_id: str
+    app_key: str
+
+    @classmethod
+    def populate(cls, ctx: dict) -> 'Context':
+        ctx_payload = ctx.pop('Payload')
+        app_key = ctx_payload['AppKey']
+        data_partition_id = ctx_payload['data-partition-id']
+        ctx_obj = cls(app_key=app_key, data_partition_id=data_partition_id)
+        return ctx_obj
+
+
+class OSDURefResolver(jsonschema.RefResolver):
+
+    def __init__(self, schema_service, *args, **kwargs):
+        super(OSDURefResolver, self).__init__(*args, **kwargs)
+        self.schema_service = schema_service
+
+    def resolve_fragment(self, document, fragment):
+        """
+        Extend base resolve_fragment method. If a nested schema has 'definitions' field and there
+        is a schema under this 'definitions', jsonschema attempts to use the id field of this
+        double-nested schema as URI to get this schema later. So it has sense to replace this id
+        with a correct one.
+        """
+        document = super().resolve_fragment(document, fragment)
+        fragment_parts = fragment.split("/")  # /definitions/<OsduID> -> [..., <OsduID>]
+        if len(fragment_parts) > 1:
+            osdu_id = fragment_parts[-1]
+            url = f"{self.schema_service}/{osdu_id}"
+            document["$id"] = url
+        return document
+
+
+class SchemaValidator(object):
+    """Class to validate schema of Manifests."""
+
+    def __init__(self, schema_service, dagrun_conf, context):
+        self.schema_service = schema_service
+        self.data_object = copy.deepcopy(dagrun_conf)
+        self.context = context
+        self.resolver_handlers = {"osdu": self.get_schema_request,
+                                  "https": self.get_schema_request,
+                                  self.context.data_partition_id: self.get_schema_request}
+        self.create_request_headers()
+
+    def create_request_headers(self):
+        self.request_headers = {
+            'Content-type': 'application/json',
+            'data-partition-id': self.context.data_partition_id,
+            'AppKey': self.context.app_key,
+        }
+
+    @refresh_token
+    def _get_schema_request(self, headers, uri):
+        response = requests.get(uri, headers=headers, timeout=60)
+        return response
+
+    def get_schema_request(self, uri):
+        if uri.startswith("osdu") or uri.startswith(self.context.data_partition_id):
+            uri = f"{self.schema_service}/{uri}"
+        response = self._get_schema_request(self.request_headers, uri).json()
+        response["$id"] = uri
+        return response
+
+    def get_schema(self, kind):
+        manifest_schema_uri = f"{self.schema_service}/{kind}"
+        response = self.get_schema_request(manifest_schema_uri)
+        return response
+
+    def _validate_schema(self, schema, manifest):
+        resolver = OSDURefResolver(schema_service=self.schema_service,
+                                   base_uri=schema.get("$id", ""), referrer=schema,
+                                   handlers=self.resolver_handlers, cache_remote=True)
+        validator = jsonschema.Draft7Validator(schema=schema, resolver=resolver)
+        validator.validate(manifest)
+
+    def validate_manifest(self):
+        for m in self.data_object["manifest"]:
+            manifest_schema = self.get_schema(m["kind"])
+            self._validate_schema(manifest_schema, m)
+
+
+class ManifestProcessor(object):
+    """Class to process WP, Master and Reference data"""
+    RECORD_TEMPLATE: dict = {
+        "legal": {},
+        "acl": {},
+        "kind": "",
+        "id": "",
+        "data": {
+        }
+    }
+
+    def __init__(self, storage_url, dagrun_conf, context):
+        self.storage_url = storage_url
+        self.data_object = copy.deepcopy(dagrun_conf)
+        self.context = context
+
+    def generate_id(self):
+        return f"{self.context.data_partition_id}:doc:{str(uuid.uuid4())}"
+
+    @property
+    def request_headers(self):
+        headers = {
+            'Content-type': 'application/json',
+            'data-partition-id': self.context.data_partition_id,
+            'AppKey': self.context.app_key
+        }
+        return headers
+
+    def populate_manifest_storage_record(self, manifest):
+        """
+        Create a record from Master-manifest to store it in Storage service
+        """
+        record = copy.deepcopy(self.RECORD_TEMPLATE)
+        record["id"] = self.generate_id()
+        record["kind"] = manifest.pop("kind")
+        record["legal"] = manifest.pop("legal")
+        record["acl"] = manifest.pop("acl")
+        record["data"] = manifest
+        return record
+
+    @tenacity.retry(tenacity.wait_fixed(TIMEOUT),
+                    tenacity.stop_after_attempt(RETRIES))
+    @refresh_token
+    def save_record(self, headers, request_data):
+        """
+        Send request to record storage API
+        """
+        response = requests.put(self.storage_url, json.dumps(request_data), headers=headers)
+        if response.ok:
+            file_logger.info(",".join(map(str, response.json()["recordIds"])))
+        else:
+            reason = response.text[:250]
+            logger.error(f"Request error.")
+            logger.error(f"Response status: {response.status_code}. "
+                         f"Response content: {reason}.")
+        return response
+
+    def create_manifest_records(self):
+        manifest_records = []
+        for manifest in self.data_object["manifest"]:
+            record = self.populate_manifest_storage_record(manifest)
+            manifest_records.append(record)
+        return manifest_records
+
+    def process_manifest(self):
+        if "manifest" in self.data_object:
+            manifest_records = self.create_manifest_records()
+        else:
+            raise EmptyManifestError
+        response = self.save_record(self.request_headers,
+                                                  request_data=manifest_records)
+        record_ids = response.json()["recordIds"]
+        return record_ids
+
+
+class ProcessManifestOperatorR3(BaseOperator):
+    ui_color = '#dad5ff'
+    ui_fgcolor = '#000000'
+
+    def pre_execute(self, context):
+        self.schema_service_url = Variable.get('schema_service_url')
+        self.storage_url = Variable.get('storage_url')
+
+    def execute(self, context):
+        payload_context = Context.populate(context["dag_run"].conf)
+        validator = SchemaValidator(
+            self.schema_service_url,
+            context["dag_run"].conf,
+            payload_context
+        )
+        manifest_processor = ManifestProcessor(
+            self.storage_url,
+            context["dag_run"].conf,
+            payload_context
+        )
+        validator.validate_manifest()
+        record_ids = manifest_processor.process_manifest()
+        context["ti"].xcom_push(key="record_ids", value=record_ids)
diff --git a/src/plugins/operators/search_record_id_op.py b/src/plugins/operators/search_record_id_op.py
index 90d58d21cdde505950596c3b306d9a17151c3358..700afc70b1034fe0900cd4773b92a34ced48d804 100644
--- a/src/plugins/operators/search_record_id_op.py
+++ b/src/plugins/operators/search_record_id_op.py
@@ -95,10 +95,10 @@ class SearchRecordIdOperator(BaseOperator):
         """
         Check if search service returns expected totalCount of records.
         """
+        logger.info(resp.text)
         data = resp.json()
         return data.get("totalCount") == self.expected_total_count
 
-    @tenacity.retry(tenacity.wait_exponential(multiplier=5), tenacity.stop_after_attempt(5))
     @refresh_token
     def search_files(self, headers, **kwargs):
         if self.request_body:
diff --git a/src/plugins/operators/update_status_op.py b/src/plugins/operators/update_status_op.py
index 1acdc6d01991bfaf85308ac33cd986ac258f895c..b02f6786a808ea2a02d7cbe5456d301297a78ab3 100644
--- a/src/plugins/operators/update_status_op.py
+++ b/src/plugins/operators/update_status_op.py
@@ -23,7 +23,7 @@ from functools import partial
 import tenacity
 from airflow.models import BaseOperator, Variable
 from airflow.utils.decorators import apply_defaults
-from hooks import search_http_hook, workflow_hook
+from hooks.http_hooks import search_http_hook, workflow_hook
 from libs.exceptions import PipelineFailedError
 from libs.refresh_token import refresh_token
 
diff --git a/tests/end-to-end-tests/mock-data/osdu-ingest-invalid.json b/tests/end-to-end-tests/mock-data/osdu-ingest-r2-invalid.json
similarity index 100%
rename from tests/end-to-end-tests/mock-data/osdu-ingest-invalid.json
rename to tests/end-to-end-tests/mock-data/osdu-ingest-r2-invalid.json
diff --git a/tests/end-to-end-tests/mock-data/osdu-ingest-valid.json b/tests/end-to-end-tests/mock-data/osdu-ingest-r2-valid.json
similarity index 100%
rename from tests/end-to-end-tests/mock-data/osdu-ingest-valid.json
rename to tests/end-to-end-tests/mock-data/osdu-ingest-r2-valid.json
diff --git a/tests/end-to-end-tests/test-osdu-ingest-fail.sh b/tests/end-to-end-tests/test-osdu-ingest-r2-fail.sh
similarity index 83%
rename from tests/end-to-end-tests/test-osdu-ingest-fail.sh
rename to tests/end-to-end-tests/test-osdu-ingest-r2-fail.sh
index e8a24a7dfed017d26d7b24c9f4e0edbf8bfff589..8ba0d80f6b39ff14a239a8816c85052cac584a78 100755
--- a/tests/end-to-end-tests/test-osdu-ingest-fail.sh
+++ b/tests/end-to-end-tests/test-osdu-ingest-r2-fail.sh
@@ -14,5 +14,5 @@
 #  limitations under the License.
 
 
-json=$(cat `dirname $0`/mock-data/osdu-ingest-invalid.json | tail -n +15)
-airflow trigger_dag -c "$json" Osdu_ingest
+json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-invalid.json | tail -n +15)
+airflow trigger_dag -c "$json" Osdu_ingest_r2
diff --git a/tests/end-to-end-tests/test-osdu-ingest-success.sh b/tests/end-to-end-tests/test-osdu-ingest-r2-success.sh
similarity index 83%
rename from tests/end-to-end-tests/test-osdu-ingest-success.sh
rename to tests/end-to-end-tests/test-osdu-ingest-r2-success.sh
index fef8a0f8c4e1fe21157da2d212a668d35c558869..eeb9080051ea6ec6b9c327cbd61d1fb84a778c90 100755
--- a/tests/end-to-end-tests/test-osdu-ingest-success.sh
+++ b/tests/end-to-end-tests/test-osdu-ingest-r2-success.sh
@@ -14,5 +14,5 @@
 #  limitations under the License.
 
 
-json=$(cat `dirname $0`/mock-data/osdu-ingest-valid.json | tail -n +15)
-airflow trigger_dag -c "$json" Osdu_ingest
+json=$(cat `dirname $0`/mock-data/osdu-ingest-r2-valid.json | tail -n +15)
+airflow trigger_dag -c "$json" Osdu_ingest_r2
diff --git a/tests/plugin-unit-tests/data/__init__.py b/tests/plugin-unit-tests/data/__init__.py
index e9d8ac876ee1c3ce060bf9b178727657bf6f2758..7be2c556ed3b00241e3a46b707419523a803dcbd 100644
--- a/tests/plugin-unit-tests/data/__init__.py
+++ b/tests/plugin-unit-tests/data/__init__.py
@@ -13,4 +13,4 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from .process_manifest_op import *
+from .process_manifest_r2_op import *
diff --git a/tests/plugin-unit-tests/data/process_manifest_op.py b/tests/plugin-unit-tests/data/process_manifest_r2_op.py
similarity index 100%
rename from tests/plugin-unit-tests/data/process_manifest_op.py
rename to tests/plugin-unit-tests/data/process_manifest_r2_op.py
diff --git a/tests/plugin-unit-tests/data/process_manifest_r3.py b/tests/plugin-unit-tests/data/process_manifest_r3.py
new file mode 100644
index 0000000000000000000000000000000000000000..c1d88c84111ee7a986b5b6796196bf7f5db4e77c
--- /dev/null
+++ b/tests/plugin-unit-tests/data/process_manifest_r3.py
@@ -0,0 +1,570 @@
+#  Copyright 2020 Google LLC
+#  Copyright 2020 EPAM Systems
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+ACL_DICT = {'viewers': ['data.default.viewers@odes.osdu.test.net'],
+            'owners': ['data.default.owners@odes.osdu.test.net']}
+LEGAL_DICT = {'legaltags': ['odes-demo-legaltag'], 'otherRelevantDataCountries': ['FR', 'US', 'CA'],
+              'status': 'compliant'}
+
+CONF = {
+    "Payload": {
+        "authorization": "Bearer test",
+        "data-partition-id": "opendes",
+        "AppKey": "",
+        "kind_version": "3.0.0"
+    },
+    "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
+    "$filename": "load_Wellbore.1.0.0_350112350400.json",
+    "manifest": [
+{
+            "id": "srn:<namespace>:master-data/Wellbore:350112350400",
+            "kind": "opendes:osdu:Wellbore:0.3.0",
+            "groupType": "master-data",
+            "version": 1,
+            "acl": {
+                "owners": [
+"data.default.viewers@opendes.osdu-gcp.go3-nrg.projects.epam.com"
+                ],
+                "viewers": [
+"data.default.owners@opendes.osdu-gcp.go3-nrg.projects.epam.com"
+                ]
+            },
+            "legal": {
+                "legaltags": [
+                    "opendes-demo-legaltag"
+                ],
+                "otherRelevantDataCountries": [
+                    "srn:<namespace>:master-data/GeoPoliticalEntity:USA:"
+                ],
+                "status": "srn:<namespace>:reference-data/LegalStatus:public:1111"
+            },
+            "resourceHostRegionIDs": [
+                "srn:<namespace>:reference-data/OSDURegion:US-EAST:"
+            ],
+            "resourceObjectCreationDateTime": "2020-10-16T11:14:45-05:00",
+            "resourceVersionCreationDateTime": "2020-10-16T11:14:45-05:00",
+            "resourceSecurityClassification": "srn:<namespace>:reference-data/ResourceSecurityClassification:public:",
+            "source": "srn:<namespace>:master-data/Organisation:Oklahoma Corporation Commission:",
+            "existenceKind": "srn:<namespace>:reference-data/ExistenceKind:Active:",
+            "licenseState": "srn:<namespace>:reference-data/LicenseState:Unlicensed:",
+            "data": {
+                "FacilityTypeID": "srn:<namespace>:reference-data/FacilityType:Wellbore:",
+                "FacilityOperator": [
+                    {
+                        "FacilityOperatorOrganisationID": "srn:<namespace>:master-data/Organisation:CONTINENTAL RESOURCES INC:"
+                    }
+                ],
+                "DataSourceOrganisationID": "srn:<namespace>:master-data/Organisation:Oklahoma Corporation Commission:",
+                "SpatialLocation": [
+                    {
+                        "Coordinates": [
+                            {
+                                "x": -98.580887,
+                                "y": 35.6381829999999
+                            }
+                        ],
+                        "SpatialGeometryTypeID": "srn:<namespace>:reference-data/SpatialGeometryType:Point:",
+                        "VerticalCRSID": "srn:<namespace>:reference-data/VerticalCRS:MSL:",
+                        "HorizontalCRSID": "srn:<namespace>:reference-data/HorizontalCRS:NAD27:",
+                        "HeightAboveGroundLevelUOMID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:"
+                    }
+                ],
+                "OperatingEnvironmentID": "srn:<namespace>:reference-data/OperatingEnvironment:onshore:",
+                "FacilityName": "IRETA 1-4-9XH",
+                "FacilityNameAlias": [
+                    {
+                        "AliasName": " IRETA 1-4-9XH",
+                        "AliasNameTypeID": "srn:<namespace>:reference-data/AliasNameType:Name:"
+                    },
+                    {
+                        "AliasName": "350112350400",
+                        "AliasNameTypeID": "srn:<namespace>:reference-data/AliasNameType:UWBI:"
+                    }
+                ],
+                "FacilityEvent": [
+                    {
+                        "FacilityEventTypeID": "srn:<namespace>:reference-data/FacilityEventType:SPUD:",
+                        "EffectiveDateTime": "2015-03-11T00:00:00-05:00"
+                    },
+                    {
+                        "FacilityEventTypeID": "srn:<namespace>:reference-data/FacilityEventType:DRILLING FINISH:",
+                        "EffectiveDateTime": "2015-05-18T00:00:00-06:00"
+                    }
+                ],
+                "WellID": "srn:<namespace>:master-data/Well:3501123504:",
+                "SequenceNumber": 1,
+                "VerticalMeasurements": [
+                    {
+                        "VerticalMeasurementID": "TD_1",
+                        "VerticalMeasurement": 0,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Total Depth:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:Measured Depth:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalReferenceID": "Drill Floor"
+                    },
+                    {
+                        "VerticalMeasurementID": "TD_2",
+                        "VerticalMeasurement": 0,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Total Depth:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:True Vertical Depth:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalReferenceID": "Drill Floor"
+                    },
+                    {
+                        "VerticalMeasurementID": "Elev_1",
+                        "VerticalMeasurement": 1636,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Drill Floor:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:Elevation:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalCRSID": "srn:<namespace>:reference-data/VerticalCRS:MSL:"
+                    },
+                    {
+                        "VerticalMeasurementID": "Elev_2",
+                        "VerticalMeasurement": 1606,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Ground Level:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:Elevation:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalCRSID": "srn:<namespace>:reference-data/VerticalCRS:MSL:"
+                    }
+                ],
+                "TrajectoryTypeID": "srn:<namespace>:reference-data/WellboreTrajectoryType:Horizontal:",
+                "DefaultVerticalMeasurementID": "",
+                "GeographicBottomHoleLocation": {
+                    "Coordinates": [
+                        {
+                            "x": -98.580887,
+                            "y": 35.6381829999999
+                        }
+                    ]
+                }
+            }
+        }
+
+    ],
+    "WorkflowID": "foo"
+}
+
+CONF2 = {
+    "WorkflowID": "{{workflow_id}}",
+    "Payload": {
+        "AppKey": "",
+        "data-partition-id": "opendes"
+    },
+    "manifest": [
+        {
+            "id": "srn:<namespace>:master-data/Wellbore:350112350400",
+            "kind": "osdu:osdu:Wellbore:0.3.0",
+            "groupType": "master-data",
+            "version": 1,
+            "acl": {
+                "Viewers": [
+                    "data.default.viewers@opendes.osdu-gcp.go3-nrg.projects.epam.com"
+                ],
+                "Owners": [
+                    "data.default.owners@opendes.osdu-gcp.go3-nrg.projects.epam.com"
+                ]
+            },
+            "legal": {
+                "LegalTags": [
+                    "legaltag1"
+                ],
+                "OtherRelevantDataCountries": [
+                    "srn:<namespace>:master-data/GeoPoliticalEntity:USA:"
+                ],
+                "Status": "srn:<namespace>:reference-data/LegalStatus:public:"
+            },
+            "resourceHostRegionIDs": [
+                "srn:<namespace>:reference-data/OSDURegion:US-EAST:"
+            ],
+            "resourceObjectCreationDateTime": "2020-10-16T11:14:45-05:00",
+            "resourceVersionCreationDateTime": "2020-10-16T11:14:45-05:00",
+            "resourceSecurityClassification": "srn:<namespace>:reference-data/ResourceSecurityClassification:public:",
+            "source": "srn:<namespace>:master-data/Organisation:Oklahoma Corporation Commission:",
+            "existenceKind": "srn:<namespace>:reference-data/ExistenceKind:Active:",
+            "licenseState": "srn:<namespace>:reference-data/LicenseState:Unlicensed:",
+            "data": {
+                "FacilityTypeID": "srn:<namespace>:reference-data/FacilityType:Wellbore:",
+                "FacilityOperator": [
+                    {
+                        "FacilityOperatorOrganisationID": "srn:<namespace>:master-data/Organisation:CONTINENTAL RESOURCES INC:"
+                    }
+                ],
+                "DataSourceOrganisationID": "srn:<namespace>:master-data/Organisation:Oklahoma Corporation Commission:",
+                "SpatialLocation": [
+                    {
+                        "Coordinates": [
+                            {
+                                "x": -98.580887,
+                                "y": 35.6381829999999
+                            }
+                        ],
+                        "SpatialGeometryTypeID": "srn:<namespace>:reference-data/SpatialGeometryType:Point:",
+                        "VerticalCRSID": "srn:<namespace>:reference-data/VerticalCRS:MSL:",
+                        "HorizontalCRSID": "srn:<namespace>:reference-data/HorizontalCRS:NAD27:",
+                        "HeightAboveGroundLevelUOMID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:"
+                    }
+                ],
+                "OperatingEnvironmentID": "srn:<namespace>:reference-data/OperatingEnvironment:onshore:",
+                "FacilityName": "IRETA 1-4-9XH",
+                "FacilityNameAlias": [
+                    {
+                        "AliasName": " IRETA 1-4-9XH",
+                        "AliasNameTypeID": "srn:<namespace>:reference-data/AliasNameType:Name:"
+                    },
+                    {
+                        "AliasName": "350112350400",
+                        "AliasNameTypeID": "srn:<namespace>:reference-data/AliasNameType:UWBI:"
+                    }
+                ],
+                "FacilityEvent": [
+                    {
+                        "FacilityEventTypeID": "srn:<namespace>:reference-data/FacilityEventType:SPUD:",
+                        "EffectiveDateTime": "2015-03-11T00:00:00-05:00"
+                    },
+                    {
+                        "FacilityEventTypeID": "srn:<namespace>:reference-data/FacilityEventType:DRILLING FINISH:",
+                        "EffectiveDateTime": "2015-05-18T00:00:00-06:00"
+                    }
+                ],
+                "WellID": "srn:<namespace>:master-data/Well:3501123504:",
+                "SequenceNumber": 1,
+                "VerticalMeasurements": [
+                    {
+                        "VerticalMeasurementID": "TD_1",
+                        "VerticalMeasurement": 0,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Total Depth:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:Measured Depth:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalReferenceID": "Drill Floor"
+                    },
+                    {
+                        "VerticalMeasurementID": "TD_2",
+                        "VerticalMeasurement": 0,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Total Depth:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:True Vertical Depth:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalReferenceID": "Drill Floor"
+                    },
+                    {
+                        "VerticalMeasurementID": "Elev_1",
+                        "VerticalMeasurement": 1636,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Drill Floor:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:Elevation:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalCRSID": "srn:<namespace>:reference-data/VerticalCRS:MSL:"
+                    },
+                    {
+                        "VerticalMeasurementID": "Elev_2",
+                        "VerticalMeasurement": 1606,
+                        "VerticalMeasurementTypeID": "srn:<namespace>:reference-data/VerticalMeasurementType:Ground Level:",
+                        "VerticalMeasurementPathID": "srn:<namespace>:reference-data/VerticalMeasurementPath:Elevation:",
+                        "VerticalMeasurementUnitOfMeasureID": "srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:",
+                        "VerticalCRSID": "srn:<namespace>:reference-data/VerticalCRS:MSL:"
+                    }
+                ],
+                "TrajectoryTypeID": "srn:<namespace>:reference-data/WellboreTrajectoryType:Horizontal:",
+                "DefaultVerticalMeasurementID": "",
+                "GeographicBottomHoleLocation": {
+                    "Coordinates": [
+                        {
+                            "x": -98.580887,
+                            "y": 35.6381829999999
+                        }
+                    ]
+                }
+            }
+        }
+    ]
+}
+
+TEST_SCHEMA = {
+    "x-osdu-license": "Copyright 2020, The Open Group \\nLicensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 . Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.",
+    "$id": "https://schema.osdu.opengroup.org/json/reference-data/ContractorType.1.0.0.json",
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "title": "Test",
+    "description": "Test.",
+    "type": "object",
+    "properties": {
+        "id": {
+            "description": "The SRN which identifies this OSDU resource object without version.",
+            "title": "Entity ID",
+            "type": "string",
+            "pattern": "^srn:<namespace>:master-data\\/Wellbore:[^:]+$",
+            "example": "srn:<namespace>:master-data/Wellbore:2adac27b-5d84-5bcd-89f2-93ee709c06d9"
+        },
+        "kind": {
+            "description": "The schema identification for the OSDU resource object following the pattern <Namespace>:<Source>:<Type>:<VersionMajor>.<VersionMinor>.<VersionPatch>. The versioning scheme follows the semantic versioning, https://semver.org/.",
+            "title": "Entity Kind",
+            "type": "string",
+            "pattern": "^[A-Za-z0-9-_]+:[A-Za-z0-9-_]+:[A-Za-z0-9-_]+:[0-9]+.[0-9]+.[0-9]+$",
+            "example": "namespace:osdu:Wellbore:2.7.112"
+        },
+        "groupType": {
+            "description": "The OSDU GroupType assigned to this resource object.",
+            "title": "Group Type",
+            "const": "master-data"
+        },
+        "version": {
+            "description": "The version number of this OSDU resource; set by the framework.",
+            "title": "Version Number",
+            "type": "integer",
+            "format": "int64",
+            "example": 1831253916104085
+        },
+        "acl": {
+            "description": "The access control tags associated with this entity.",
+            "title": "Access Control List",
+            "type": "object"
+        },
+        "legal": {
+            "description": "The entity's legal tags and compliance status.",
+            "title": "Legal Tags",
+            "type": "object"
+        },
+        "resourceHomeRegionID": {
+            "description": "The name of the home [cloud environment] region for this OSDU resource object.",
+            "title": "Resource Home Region ID",
+            "type": "string",
+            "pattern": "^srn:<namespace>:reference-data\\/OSDURegion:[^:]+:[0-9]*$"
+        },
+        "resourceHostRegionIDs": {
+            "description": "The name of the host [cloud environment] region(s) for this OSDU resource object.",
+            "title": "Resource Host Region ID",
+            "type": "array",
+            "items": {
+                "type": "string",
+                "pattern": "^srn:<namespace>:reference-data\\/OSDURegion:[^:]+:[0-9]*$"
+            }
+        },
+        "resourceObjectCreationDateTime": {
+            "description": "Timestamp of the time at which Version 1 of this OSDU resource object was originated.",
+            "title": "Resource Object Creation DateTime",
+            "type": "string",
+            "format": "date-time"
+        },
+        "resourceVersionCreationDateTime": {
+            "description": "Timestamp of the time when the current version of this resource entered the OSDU.",
+            "title": "Resource Version Creation DateTime",
+            "type": "string",
+            "format": "date-time"
+        },
+        "resourceCurationStatus": {
+            "description": "Describes the current Curation status.",
+            "title": "Resource Curation Status",
+            "type": "string",
+            "pattern": "^srn:<namespace>:reference-data\\/ResourceCurationStatus:[^:]+:[0-9]*$"
+        },
+        "resourceLifecycleStatus": {
+            "description": "Describes the current Resource Lifecycle status.",
+            "title": "Resource Lifecycle Status",
+            "type": "string",
+            "pattern": "^srn:<namespace>:reference-data\\/ResourceLifecycleStatus:[^:]+:[0-9]*$"
+        },
+        "resourceSecurityClassification": {
+            "description": "Classifies the security level of the resource.",
+            "title": "Resource Security Classification",
+            "type": "string",
+            "pattern": "^srn:<namespace>:reference-data\\/ResourceSecurityClassification:[^:]+:[0-9]*$"
+        },
+        "ancestry": {
+            "description": "The links to data, which constitute the inputs.",
+            "title": "Ancestry",
+            "$ref": "osdu:osdu:AbstractLegalParentList:1.0.0"
+        },
+        "source": {
+            "description": "Where did the data resource originate? This could be many kinds of entities, such as company, agency, team or individual.",
+            "title": "Data Source",
+            "type": "string",
+            "pattern": "^srn:<namespace>:master-data\\/Organisation:[^:]+:[0-9]*$"
+        },
+        "existenceKind": {
+            "description": "Where does this data resource sit in the cradle-to-grave span of its existence?",
+            "title": "Existence Kind",
+            "type": "string",
+            "pattern": "^srn:<namespace>:reference-data\\/ExistenceKind:[^:]+:[0-9]*$"
+        },
+        "licenseState": {
+            "description": "Indicates what kind of ownership Company has over data.",
+            "title": "License State",
+            "type": "string",
+            "pattern": "^srn:<namespace>:reference-data\\/LicenseState:[^:]+:[0-9]*$"
+        },
+        "data": {
+            "allOf": [
+                {
+                    "type": "object",
+                    "properties": {
+                        "WellID": {
+                            "type": "string"
+                        }
+                    }
+                }
+            ]
+        },
+        "required": [
+            "kind",
+            "acl",
+            "groupType",
+            "legal"
+        ],
+        "additionalProperties": False
+    }
+}
+
+EMPTY_MANIFEST = {
+    "Payload": {
+        "authorization": "Bearer test",
+        "data-partition-id": "osdu",
+        "AppKey": "",
+        "kind_version": "3.0.0"
+    },
+    "$schema": "https://schema.osdu.opengroup.org/json/master-data/Wellbore.1.0.0.json",
+    "$filename": "load_Wellbore.1.0.0_350112350400.json",
+    "manifest": [],
+    "WorkflowID": "foo"
+}
+
+EXPECTED_RECORD = [{'legal': {'LegalTags': ['legaltag1'], 'OtherRelevantDataCountries': [
+    'srn:<namespace>:master-data/GeoPoliticalEntity:USA:'],
+                              'Status': 'srn:<namespace>:reference-data/LegalStatus:public:'},
+                    'acl': {'Owners': ['users@odes.osdu.joonix.net'],
+                            'Viewers': ['users@odes.osdu.joonix.net']},
+                    'kind': 'osdu:osdu:Wellbore:0.3.0', 'id': '',
+                    'data': {'id': 'srn:<namespace>:master-data/Wellbore:350112350400',
+                             'groupType': 'master-data', 'version': 1, 'resourceHostRegionIDs': [
+                            'srn:<namespace>:reference-data/OSDURegion:US-EAST:'],
+                             'resourceObjectCreationDateTime': '2020-10-16T11:14:45-05:00',
+                             'resourceVersionCreationDateTime': '2020-10-16T11:14:45-05:00',
+                             'resourceSecurityClassification': 'srn:<namespace>:reference-data/ResourceSecurityClassification:public:',
+                             'source': 'srn:<namespace>:master-data/Organisation:Oklahoma Corporation Commission:',
+                             'existenceKind': 'srn:<namespace>:reference-data/ExistenceKind:Active:',
+                             'licenseState': 'srn:<namespace>:reference-data/LicenseState:Unlicensed:',
+                             'data': {
+                                 'FacilityTypeID': 'srn:<namespace>:reference-data/FacilityType:Wellbore:',
+                                 'FacilityOperator': [{
+                                                          'FacilityOperatorOrganisationID': 'srn:<namespace>:master-data/Organisation:CONTINENTAL RESOURCES INC:'}],
+                                 'DataSourceOrganisationID': 'srn:<namespace>:master-data/Organisation:Oklahoma Corporation Commission:',
+                                 'SpatialLocation': [
+                                     {'Coordinates': [{'x': -98.580887, 'y': 35.6381829999999}],
+                                      'SpatialGeometryTypeID': 'srn:<namespace>:reference-data/SpatialGeometryType:Point:',
+                                      'VerticalCRSID': 'srn:<namespace>:reference-data/VerticalCRS:MSL:',
+                                      'HorizontalCRSID': 'srn:<namespace>:reference-data/HorizontalCRS:NAD27:',
+                                      'HeightAboveGroundLevelUOMID': 'srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:'}],
+                                 'OperatingEnvironmentID': 'srn:<namespace>:reference-data/OperatingEnvironment:onshore:',
+                                 'FacilityName': 'IRETA 1-4-9XH', 'FacilityNameAlias': [
+                                     {'AliasName': ' IRETA 1-4-9XH',
+                                      'AliasNameTypeID': 'srn:<namespace>:reference-data/AliasNameType:Name:'},
+                                     {'AliasName': '350112350400',
+                                      'AliasNameTypeID': 'srn:<namespace>:reference-data/AliasNameType:UWBI:'}],
+                                 'FacilityEvent': [{
+                                                       'FacilityEventTypeID': 'srn:<namespace>:reference-data/FacilityEventType:SPUD:',
+                                                       'EffectiveDateTime': '2015-03-11T00:00:00-05:00'},
+                                                   {
+                                                       'FacilityEventTypeID': 'srn:<namespace>:reference-data/FacilityEventType:DRILLING FINISH:',
+                                                       'EffectiveDateTime': '2015-05-18T00:00:00-06:00'}],
+                                 'WellID': 'srn:<namespace>:master-data/Well:3501123504:',
+                                 'SequenceNumber': 1, 'VerticalMeasurements': [
+                                     {'VerticalMeasurementID': 'TD_1', 'VerticalMeasurement': 0,
+                                      'VerticalMeasurementTypeID': 'srn:<namespace>:reference-data/VerticalMeasurementType:Total Depth:',
+                                      'VerticalMeasurementPathID': 'srn:<namespace>:reference-data/VerticalMeasurementPath:Measured Depth:',
+                                      'VerticalMeasurementUnitOfMeasureID': 'srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:',
+                                      'VerticalReferenceID': 'Drill Floor'},
+                                     {'VerticalMeasurementID': 'TD_2', 'VerticalMeasurement': 0,
+                                      'VerticalMeasurementTypeID': 'srn:<namespace>:reference-data/VerticalMeasurementType:Total Depth:',
+                                      'VerticalMeasurementPathID': 'srn:<namespace>:reference-data/VerticalMeasurementPath:True Vertical Depth:',
+                                      'VerticalMeasurementUnitOfMeasureID': 'srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:',
+                                      'VerticalReferenceID': 'Drill Floor'},
+                                     {'VerticalMeasurementID': 'Elev_1',
+                                      'VerticalMeasurement': 1636,
+                                      'VerticalMeasurementTypeID': 'srn:<namespace>:reference-data/VerticalMeasurementType:Drill Floor:',
+                                      'VerticalMeasurementPathID': 'srn:<namespace>:reference-data/VerticalMeasurementPath:Elevation:',
+                                      'VerticalMeasurementUnitOfMeasureID': 'srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:',
+                                      'VerticalCRSID': 'srn:<namespace>:reference-data/VerticalCRS:MSL:'},
+                                     {'VerticalMeasurementID': 'Elev_2',
+                                      'VerticalMeasurement': 1606,
+                                      'VerticalMeasurementTypeID': 'srn:<namespace>:reference-data/VerticalMeasurementType:Ground Level:',
+                                      'VerticalMeasurementPathID': 'srn:<namespace>:reference-data/VerticalMeasurementPath:Elevation:',
+                                      'VerticalMeasurementUnitOfMeasureID': 'srn:<namespace>:reference-data/UnitOfMeasure:ft[US]:',
+                                      'VerticalCRSID': 'srn:<namespace>:reference-data/VerticalCRS:MSL:'}],
+                                 'TrajectoryTypeID': 'srn:<namespace>:reference-data/WellboreTrajectoryType:Horizontal:',
+                                 'DefaultVerticalMeasurementID': '',
+                                 'GeographicBottomHoleLocation': {
+                                     'Coordinates': [{'x': -98.580887, 'y': 35.6381829999999}]}}}},
+                   {'legal': {'legaltags': ['$$LEGAL_TAG$$'],
+                              'otherRelevantDataCountries': ['$$ISO_3166_ALPHA_2_CODE$$']},
+                    'acl': {'owners': ['$$DATA_OWNERS_GROUP$$'],
+                            'viewers': ['$$DATA_VIEWERS_GROUP$$']},
+                    'kind': 'osdu:osdu:TestReference:1.0.0', 'id': '',
+                    'data': {'id': '$$srn:NAMESPACE$$:type/Type:TestReference', 'groupType': 'type',
+                             'data': {'Name': 'TestReference',
+                                      'Description': 'A meaningful description of this TestReference.',
+                                      'Schema': 'http://json-schema.org/draft-07/schema#',
+                                      'SchemaID': 'https://schema.osdu.opengroup.org/json/reference-data/TestReference.1.0.0.json',
+                                      'SchemaKind': 'osdu:osdu:TestReference:1.0.0',
+                                      'GroupType': 'reference-data', 'IsReferenceValueType': True,
+                                      'GovernanceAuthorities': [
+                                          '$$srn:NAMESPACE$$:reference-data/OrganisationType:osdu'],
+                                      'NaturalKeys': ['data.Code', 'data.Name'],
+                                      'GovernanceModel': 'LOCAL'}}}, {
+                       'legal': {'legalTags': ['$$LEGAL_TAG$$'],
+                                 'otherRelevantDataCountries': ['$$ISO_3166_ALPHA_2_CODE$$']},
+                       'acl': {'owners': ['$$DATA_OWNERS_GROUP$$'],
+                               'viewers': ['$$DATA_VIEWERS_GROUP$$']},
+                       'kind': 'osdu:osdu:UnitQuantity:1.0.0', 'id': '',
+                       'data': {'id': '$$srn:NAMESPACE$$:reference-data/UnitQuantity:1',
+                                'groupType': 'reference-data',
+                                'resourceObjectCreationDateTime': '2020-10-08T12:16:15Z',
+                                'resourceVersionCreationDateTime': '2020-10-08T12:16:15Z',
+                                'source': 'Workbook Authoring/UnitQuantity.1.0.0.xlsx; commit SHA 3159b9b1.',
+                                'Name': 'dimensionless', 'ID': '1', 'InactiveIndicator': False,
+                                'Code': '1', 'AttributionAuthority': 'Energistics',
+                                'AttributionPublication': 'Energistics Unit of Measure Dictionary V1.0',
+                                'AttributionRevision': '1.0', 'BaseForConversion': 'Euc',
+                                'ParentUnitQuantity': '1',
+                                'PersistableReference': '{"ancestry":"1","type":"UM"}',
+                                'UnitDimension': '1'}}]
+
+PROCESS_FILE_ITEMS_RESULT = (
+    [
+        (
+            {
+                'kind': 'test:osdu:file:3.0.0',
+                'legal': {'legaltags': ['odes-demo-legaltag'], 'otherRelevantDataCountries': ['US'],
+                          'status': 'compliant'},
+                'acl': {'viewers': ['data.default.viewers@odes.osdu.test.net'],
+                        'owners': ['data.default.owners@odes.osdu.test.net']},
+                'data': {
+                    'ResourceTypeID': 'srn:type:file/las2:',
+                    'ResourceSecurityClassification': 'srn:reference-data/ResourceSecurityClassification:RESTRICTED:',
+                    'Data': {'GroupTypeProperties': {'FileSource': '', 'PreLoadFilePath': 'foo'},
+                             'IndividualTypeProperties': {}, 'ExtensionProperties': {}},
+                    'AssociativeID': 'f-1',
+                    'ResourceID': ""
+                }
+            },
+            'File'
+        )
+    ],
+    ['srn:file/las2:434064998475386:']
+)
+
+LOADED_CONF = {
+    "acl": ACL_DICT,
+    "legal_tag": LEGAL_DICT,
+    "data_object": CONF
+}
+
+CONF_PAYLOAD = CONF["Payload"]
diff --git a/tests/plugin-unit-tests/test_process_manifest_op.py b/tests/plugin-unit-tests/test_process_manifest_r2_op.py
similarity index 66%
rename from tests/plugin-unit-tests/test_process_manifest_op.py
rename to tests/plugin-unit-tests/test_process_manifest_r2_op.py
index 3c516f4a801ed3c3851cfff00676d854b8ba1a89..e0932e1e6d2d47b38fd8ead49bd31bbcb2fd6fd3 100644
--- a/tests/plugin-unit-tests/test_process_manifest_op.py
+++ b/tests/plugin-unit-tests/test_process_manifest_r2_op.py
@@ -22,8 +22,8 @@ import pytest
 
 sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
 
-from data import process_manifest_op as test_data
-from operators import process_manifest_op
+from data import process_manifest_r2_op as test_data
+from operators import process_manifest_r2_op
 
 
 @pytest.mark.parametrize(
@@ -34,8 +34,8 @@ from operators import process_manifest_op
     ]
 )
 def test_determine_data_type(test_input, expected):
-    data_type = process_manifest_op.determine_data_type(test_input)
-    assert data_type == expected
+   data_type = process_manifest_r2_op.determine_data_type(test_input)
+   assert data_type == expected
 
 
 @pytest.mark.parametrize(
@@ -48,13 +48,13 @@ def test_determine_data_type(test_input, expected):
     ]
 )
 def test_process_file_items(data_type, loaded_conf, conf_payload, expected_file_result):
-    file_id_regex = re.compile(r"srn\:file/" + data_type + r"\:\d+\:")
-    expected_file_list = expected_file_result[0]
-    file_list, file_ids = process_manifest_op.process_file_items(loaded_conf, conf_payload)
-    for i in file_ids:
-        assert file_id_regex.match(i)
-
-    for i in file_list:
-        assert file_id_regex.match(i[0]["data"]["ResourceID"])
-        i[0]["data"]["ResourceID"] = ""
-    assert file_list == expected_file_list
+   file_id_regex = re.compile(r"srn\:file/" + data_type + r"\:\d+\:")
+   expected_file_list = expected_file_result[0]
+   file_list, file_ids = process_manifest_r2_op.process_file_items(loaded_conf, conf_payload)
+   for i in file_ids:
+      assert file_id_regex.match(i)
+
+   for i in file_list:
+      assert file_id_regex.match(i[0]["data"]["ResourceID"])
+      i[0]["data"]["ResourceID"] = ""
+   assert file_list == expected_file_list
diff --git a/tests/plugin-unit-tests/test_process_manifest_r3.py b/tests/plugin-unit-tests/test_process_manifest_r3.py
new file mode 100644
index 0000000000000000000000000000000000000000..e7a8b8123ee8cb55d37f7bb87b540bfbe99b88f8
--- /dev/null
+++ b/tests/plugin-unit-tests/test_process_manifest_r3.py
@@ -0,0 +1,64 @@
+#  Copyright 2020 Google LLC
+#  Copyright 2020 EPAM Systems
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+import copy
+import os
+import sys
+
+import libs.exceptions
+import pytest
+
+sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
+
+from data import process_manifest_r3 as test_data
+from operators import process_manifest_r3
+
+
+@pytest.mark.parametrize(
+    "input",
+    [
+        pytest.param(test_data.CONF, id="Valid manifest"),
+        # pytest.param(test_data.EMPTY_MANIFEST, marks=pytest.mark.xfail, id="Empty manifest")
+    ]
+)
+def test_create_manifest_records(input):
+    conf = copy.deepcopy(input)
+    context = process_manifest_r3.Context.populate(conf)
+    manifest_processor = process_manifest_r3.ManifestProcessor(
+        storage_url="https://storage-jvmvia5dea-uc.a.run.app/api/storage/v2/records",
+        dagrun_conf=test_data.CONF,
+        context=context
+    )
+    manifest_processor.create_manifest_records()
+
+
+@pytest.mark.parametrize(
+    "schema,manifest",
+    [
+        pytest.param(test_data.TEST_SCHEMA, test_data.CONF, id="Valid manifest"),
+    ]
+)
+def test_schema_validator(schema, manifest):
+    conf = copy.deepcopy(manifest)
+    context = process_manifest_r3.Context.populate(manifest)
+    SCHEMA_SERVICE = "https://os-schema-jvmvia5dea-uc.a.run.app/api/schema-service/v1/schema"
+    context.data_partition_id = "opendes"
+    validator = process_manifest_r3.SchemaValidator(
+        SCHEMA_SERVICE,
+        conf,
+        context
+    )
+    validator._validate_schema(schema, manifest)
diff --git a/tests/set_airflow_env.sh b/tests/set_airflow_env.sh
index 21be656adb59f203d1351dc5b68c60b0d3047755..399e1a1c271c44dc5b41521b7c657820a753a21d 100755
--- a/tests/set_airflow_env.sh
+++ b/tests/set_airflow_env.sh
@@ -14,6 +14,8 @@
 #  limitations under the License.
 
 pip install --upgrade google-api-python-client
+pip install dataclasses
+pip install jsonschema
 export ACL='{"viewers": ["foo"],"owners": ["foo"]}'
 export LEGAL='{"legaltags": ["foo"], "otherRelevantDataCountries": ["FR", "US", "CA"],"status": "compliant"}'
 export WORKFLOW_URL="http://127.0.0.1:5000/wf"
@@ -23,15 +25,8 @@ export SEARCH_CONN_ID="http://127.0.0.1:5000"
 export WORKFLOW_CONN_ID="http://127.0.0.1:5000"
 export DATALOAD_CONFIG_PATH="/usr/local/airflow/dags/configs/dataload.ini"
 
-
-mkdir -p /usr/local/airflow/dags/
-cp -rf src/* /usr/local/airflow/
-cp -r tests/end-to-end-tests/mock-external-apis /mock-server
-cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
-cp  tests/end-to-end-tests/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
-cp tests/*.py /mock-server/
-
 airflow initdb > /dev/null 2>&1
+
 # exclude testing DAGS
 sed -i 's/load_examples = True/load_examples = False/'  /usr/local/airflow/airflow.cfg
 # turn on all dags
@@ -54,8 +49,15 @@ airflow variables -s search_query_ep sr/qr
 airflow variables -s access_token test
 airflow variables -s "sa-file-osdu" "test"
 
-airflow connections -a  --conn_id search --conn_uri $SEARCH_CONN_ID
-airflow connections -a  --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
-airflow connections -a  --conn_id google_cloud_storage --conn_uri $WORKFLOW_CONN_ID
+airflow connections -a --conn_id search --conn_uri $SEARCH_CONN_ID
+airflow connections -a --conn_id workflow --conn_uri $WORKFLOW_CONN_ID
+airflow connections -a --conn_id google_cloud_storage --conn_uri $WORKFLOW_CONN_ID
+
+mkdir -p /usr/local/airflow/dags/
+cp -rf src/* /usr/local/airflow/
+cp -r tests/end-to-end-tests/mock-external-apis /mock-server
+cp -r tests/end-to-end-tests/mock-data /mock-server/mock-data
+cp tests/end-to-end-tests/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh} /mock-server/
+cp tests/*.py /mock-server/
 
-chmod +x /mock-server/{test-osdu-ingest-success.sh,test-osdu-ingest-fail.sh,test-default-ingest-{success,fail}.sh}
+chmod +x /mock-server/{test-osdu-ingest-r2-success.sh,test-osdu-ingest-r2-fail.sh,test-default-ingest-{success,fail}.sh}
diff --git a/tests/test_dags.py b/tests/test_dags.py
index ffd6a056f05130d54d1b9327ebc19b341e622d8c..43a99ac78e51caab1cc6e8a94f78630ca44c4ef0 100644
--- a/tests/test_dags.py
+++ b/tests/test_dags.py
@@ -19,26 +19,27 @@ import time
 
 
 class DagStatus(enum.Enum):
-    RUNNING = "running"
-    FAILED = "failed"
-    FINISHED = "finished"
+    RUNNING = enum.auto()
+    FAILED = enum.auto()
+    FINISHED = enum.auto()
 
-
-OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-success.sh"
-OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-fail.sh"
+OSDU_INGEST_SUCCESS_SH = "/mock-server/./test-osdu-ingest-r2-success.sh"
+OSDU_INGEST_FAIL_SH = "/mock-server/./test-osdu-ingest-r2-fail.sh"
 DEFAULT_INGEST_SUCCESS_SH = "/mock-server/./test-default-ingest-success.sh"
 DEFAULT_INGEST_FAIL_SH = "/mock-server/./test-default-ingest-fail.sh"
 
-subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
+with open("/tmp/osdu_ingest_result", "w") as f:
+    f.close()
 
+subprocess.run(f"/bin/bash -c 'airflow scheduler > /dev/null 2>&1 &'", shell=True)
 
-def check_dag_status(dag_name: str) -> DagStatus:
+def check_dag_status(dag_name):
     time.sleep(5)
     output = subprocess.getoutput(f'airflow list_dag_runs {dag_name}')
     if "failed" in output:
         print(dag_name)
         print(output)
-        return DagStatus.FAILED
+        return  DagStatus.FAILED
     if "running" in output:
         return DagStatus.RUNNING
     print(dag_name)
@@ -46,17 +47,32 @@ def check_dag_status(dag_name: str) -> DagStatus:
     return DagStatus.FINISHED
 
 
-def test_dag_execution_result(dag_name: str, script: str, expected_status: DagStatus):
+def test_dag_success(dag_name, script):
+    print(f"Test {dag_name} success")
+    subprocess.run(f"{script}", shell=True)
+    while True:
+        dag_status = check_dag_status(dag_name)
+        if dag_status is DagStatus.RUNNING:
+            continue
+        elif dag_status is DagStatus.FINISHED:
+            return
+        else:
+            raise Exception(f"Error {dag_name} supposed to be finished")
+
+def test_dag_fail(dag_name, script):
     subprocess.run(f"{script}", shell=True)
-    print(f"Expecting {dag_name} to be {expected_status.value}")
+    print(f"Expecting {dag_name} fail")
     while True:
         dag_status = check_dag_status(dag_name)
-        if dag_status is not DagStatus.RUNNING:
-            break
-    assert dag_status is expected_status, f"Error {dag_name} supposed to be {expected_status.value}"
+        if dag_status is DagStatus.RUNNING:
+            continue
+        elif dag_status is DagStatus.FAILED:
+            return
+        else:
+            raise Exception(f"Error {dag_name} supposed to be failed")
 
 
-test_dag_execution_result("Osdu_ingest", OSDU_INGEST_SUCCESS_SH, DagStatus.FINISHED)
-test_dag_execution_result("Osdu_ingest", OSDU_INGEST_FAIL_SH, DagStatus.FAILED)
-test_dag_execution_result("Default_ingest", DEFAULT_INGEST_SUCCESS_SH, DagStatus.FINISHED)
-test_dag_execution_result("Default_ingest", DEFAULT_INGEST_FAIL_SH, DagStatus.FAILED)
+test_dag_success("Osdu_ingest_r2", OSDU_INGEST_SUCCESS_SH)
+test_dag_fail("Osdu_ingest_r2", OSDU_INGEST_FAIL_SH)
+test_dag_success("Default_ingest", DEFAULT_INGEST_SUCCESS_SH)
+test_dag_fail("Default_ingest", DEFAULT_INGEST_FAIL_SH)