From 630e79f7a230aa6244cd5b9160a314bc2e076b3d Mon Sep 17 00:00:00 2001
From: yan <yan_sushchynski@epam.com>
Date: Mon, 7 Dec 2020 17:45:50 +0300
Subject: [PATCH] GONRG-1317: Add file uploader

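In ManifestProcessor, replace the file-existence check with a
FileUploader abstraction: each manifest file is now copied from its
PreLoadFilePath in the landing zone into FileSource on the OSDU
platform via File service (getLocation -> upload to signed URL ->
getFileLocation). The checker itself is kept and renamed
(FileSourceChecker -> SourceFileChecker, GCSSourceFileChecker), and a
new 'file_service_url' Airflow variable configures the uploader.

The refresh_token decorator now takes an optional TokenRefresher and
otherwise falls back to the decorated object's 'token_refresher'
attribute, so SchemaValidator, ManifestProcessor, SearchId and
UpdateStatus receive the refresher as a constructor dependency instead
of hard-coding AirflowTokenRefresher. Verbose request/response logging
is demoted from info to debug, and several bare exceptions now carry
messages.

A rough usage sketch of the new uploader (names as in this patch; the
service URL and 'gs://bucket/blob' path are illustrative):

    from libs.context import Context
    from libs.refresh_token import AirflowTokenRefresher
    from libs.upload_file import GCSFileUploader

    context = Context(data_partition_id="opendes", app_key="")
    uploader = GCSFileUploader("http://file-service",
                               AirflowTokenRefresher(), context)
    # Downloads the blob and pushes it through File service; returns the
    # FileSource location that is written back into the manifest record.
    file_location = uploader.upload_file("gs://bucket/blob")
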
---
 src/dags/libs/exceptions.py                   |  11 ++
 src/dags/libs/process_manifest_r3.py          |  38 +++--
 src/dags/libs/refresh_token.py                |  37 ++++-
 src/dags/libs/search_record_ids.py            |  16 +-
 ...e_source_check.py => source_file_check.py} |   8 +-
 src/dags/libs/update_status.py                |   8 +-
 src/dags/libs/upload_file.py                  | 157 ++++++++++++++++++
 src/dags/libs/validate_schema.py              |  14 +-
 src/plugins/operators/process_manifest_r3.py  |  14 +-
 src/plugins/operators/search_record_id.py     |   4 +-
 src/plugins/operators/update_status.py        |   4 +-
 .../data/workProduct/SeismicTraceData.json    |   2 +-
 .../workProduct/record_SeismicTraceData.json  |   4 +-
 ...e_source_check.py => test_file_checker.py} |  16 +-
 tests/plugin-unit-tests/test_file_upload.py   |  65 ++++++++
 .../test_manifest_processor_r3.py             |  16 +-
 tests/plugin-unit-tests/test_operators_r3.py  |   8 +-
 .../test_schema_validator_r3.py               |   2 +
 .../test_search_record_id.py                  |  13 +-
 .../test_update_status_r3.py                  |   2 +
 tests/set_airflow_env.sh                      |   1 +
 21 files changed, 368 insertions(+), 72 deletions(-)
 rename src/dags/libs/{file_source_check.py => source_file_check.py} (91%)
 create mode 100644 src/dags/libs/upload_file.py
 rename tests/plugin-unit-tests/{test_file_source_check.py => test_file_checker.py} (84%)
 create mode 100644 tests/plugin-unit-tests/test_file_upload.py

diff --git a/src/dags/libs/exceptions.py b/src/dags/libs/exceptions.py
index f0755e3..839562f 100644
--- a/src/dags/libs/exceptions.py
+++ b/src/dags/libs/exceptions.py
@@ -70,3 +70,14 @@ class GCSObjectURIError(Exception):
     Raise when wrong Google Storage Object was given.
     """
     pass
+
+class UploadFileError(Exception):
+    """
+    Raise when there is an error while uploading a file into OSDU.
+    """
+
+class TokenRefresherNotPresentError(Exception):
+    """
+    Raise when a token refresher is not present in the 'refresh_token' decorator.
+    """
+    pass
diff --git a/src/dags/libs/process_manifest_r3.py b/src/dags/libs/process_manifest_r3.py
index 1f858e8..bc24dca 100644
--- a/src/dags/libs/process_manifest_r3.py
+++ b/src/dags/libs/process_manifest_r3.py
@@ -23,9 +23,10 @@ import requests
 import tenacity
 from libs.context import Context
 from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError
-from libs.file_source_check import FileSourceChecker
+from libs.source_file_check import SourceFileChecker
+from libs.upload_file import FileUploader
 from libs.mixins import HeadersMixin
-from libs.refresh_token import AirflowTokenRefresher, refresh_token
+from libs.refresh_token import AirflowTokenRefresher, refresh_token, TokenRefresher
 
 logger = logging.getLogger()
 
@@ -48,15 +49,16 @@ class ManifestProcessor(HeadersMixin):
         self,
         storage_url: str,
         dagrun_conf: dict,
-        file_source_checker: FileSourceChecker,
+        file_uploader: FileUploader,
+        token_refresher: TokenRefresher,
         context: Context
     ):
         super().__init__(context)
-        self._validate_file_source_checker_type(file_source_checker)
-        self.file_source_checker = file_source_checker
+        self.file_uploader = file_uploader
         self.storage_url = storage_url
         self.data_object = copy.deepcopy(dagrun_conf)
         self.context = context
+        self.token_refresher = token_refresher
 
     @staticmethod
     def _get_kind_name(kind: str) -> str:
@@ -66,9 +68,10 @@ class ManifestProcessor(HeadersMixin):
         kind_name = kind.split(":")[2]
         return kind_name
 
-    def check_file_resource(self, file_record: dict):
-        file_path = file_record["Data"]["PreLoadFilePath"]
-        self.file_source_checker.does_file_exist(file_path)
+    def upload_source_file(self, file_record: dict) -> dict:
+        file_path = file_record["data"]["PreLoadFilePath"]
+        file_record["data"]["FileSource"] = self.file_uploader.upload_file(file_path)
+        return file_record
 
     def generate_id(self, manifest_fragment: dict) -> str:
         """
@@ -93,9 +96,9 @@ class ManifestProcessor(HeadersMixin):
         return record
 
     @staticmethod
-    def _validate_file_source_checker_type(file_source_checker: FileSourceChecker):
-        if not isinstance(file_source_checker, FileSourceChecker):
-            raise TypeError(f"File checker must be instance of '{FileSourceChecker}' class.\n"
+    def _validate_file_source_checker_type(file_source_checker: SourceFileChecker):
+        if not isinstance(file_source_checker, SourceFileChecker):
+            raise TypeError(f"File checker must be instance of '{SourceFileChecker}' class.\n"
                             f"Got got instance of '{file_source_checker}'")
 
     @staticmethod
@@ -117,14 +120,15 @@ class ManifestProcessor(HeadersMixin):
         Send request to record storage API.
         """
         request_data = json.dumps(request_data)
-        logger.info("Send to Storage service")
-        logger.info(f"{request_data}")
+        logger.info("Sending records to Storage service")
+        logger.debug(f"{request_data}")
         response = requests.put(self.storage_url, request_data, headers=headers)
         if response.ok:
             response_dict = response.json()
             self._validate_storage_response(response_dict)
-            logger.info(f"Response: {response_dict}")
-            logger.info(",".join(map(str, response_dict["recordIds"])))
+            record_ids = ", ".join(map(str, response_dict["recordIds"]))
+            logger.debug(f"Response: {response_dict}")
+            logger.info(f"Records '{record_ids}' were saved using Storage service.")
         else:
             reason = response.text[:250]
             logger.error(f"Request error.")
@@ -156,8 +160,8 @@ class ManifestProcessor(HeadersMixin):
         """
         records = []
         for file in manifest["Files"]:
-            self.check_file_resource(file)
-            record = self.populate_manifest_storage_record(file)
+            record = self.upload_source_file(file)
+            record = self.populate_manifest_storage_record(record)
             records.append(record)
         return records
 
diff --git a/src/dags/libs/refresh_token.py b/src/dags/libs/refresh_token.py
index 6228694..dc0a2eb 100644
--- a/src/dags/libs/refresh_token.py
+++ b/src/dags/libs/refresh_token.py
@@ -26,7 +26,7 @@ import requests
 from google.auth.transport.requests import Request
 from google.cloud import storage
 from google.oauth2 import service_account
-from libs.exceptions import RefreshSATokenError, SAFilePathError
+from libs.exceptions import RefreshSATokenError, SAFilePathError, TokenRefresherNotPresentError
 from tenacity import retry, stop_after_attempt
 
 logger = logging.getLogger()
@@ -181,7 +181,7 @@ def make_callable_request(obj: Union[object, None], request_function: Callable,
 def _validate_headers_type(headers: dict):
     if not isinstance(headers, dict):
         logger.error(f"Got headers {headers}")
-        raise TypeError
+        raise TypeError("Request's headers type expected to be 'dict'")
 
 
 def _validate_response_type(response: requests.Response, request_function: Callable):
@@ -199,20 +199,38 @@ def _validate_token_refresher_type(token_refresher: TokenRefresher):
         )
 
 
+def _get_object_token_refresher(
+    token_refresher: TokenRefresher,
+    obj: object = None
+) -> TokenRefresher:
+    """
+    Return the token refresher passed into the decorator, or fall back to the
+    one set as the object's 'token_refresher' attribute.
+    """
+    if token_refresher is None and obj:
+        try:
+            obj.__getattribute__("token_refresher")
+        except AttributeError:
+            raise TokenRefresherNotPresentError("Token refresher must be passed into decorator or "
+                                                "set as object's 'refresh_token' attribute.")
+        else:
+            token_refresher = obj.token_refresher
+    return token_refresher
+
+
 def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
                                   **kwargs) -> requests.Response:
     """
     Send request with authorization token. If response status is in HTTPStatus.UNAUTHORIZED or
     HTTPStatus.FORBIDDEN, then refresh token and send request once again.
     """
-    obj = kwargs.pop("obj") if kwargs.get("obj") else None
+    obj = kwargs.pop("obj", None)
     request_function = kwargs.pop("request_function")
     headers = kwargs.pop("headers")
     _validate_headers_type(headers)
     headers.update(token_refresher.authorization_header)
 
-    send_request_with_auth = make_callable_request(obj, request_function, headers,
-                                                   *args, **kwargs)
+    send_request_with_auth = make_callable_request(obj, request_function, headers, *args, **kwargs)
     response = send_request_with_auth()
     _validate_response_type(response, request_function)
 
@@ -233,7 +251,7 @@ def send_request_with_auth_header(token_refresher: TokenRefresher, *args,
     return response
 
 
-def refresh_token(token_refresher: TokenRefresher) -> Callable:
+def refresh_token(token_refresher: TokenRefresher = None) -> Callable:
     """
     Wrap a request function and check response. If response's error status code
     is about Authorization, refresh token and invoke this function once again.
@@ -244,13 +262,13 @@ def refresh_token(token_refresher: TokenRefresher) -> Callable:
     request_method(self, header: dict, *args, **kwargs) -> requests.Response
     """
 
-    _validate_token_refresher_type(token_refresher)
-
     def refresh_token_wrapper(request_function: Callable) -> Callable:
         is_method = len(request_function.__qualname__.split(".")) > 1
         if is_method:
             def _wrapper(obj: object, headers: dict, *args, **kwargs) -> requests.Response:
-                return send_request_with_auth_header(token_refresher,
+                _token_refresher = _get_object_token_refresher(token_refresher, obj)
+                _validate_token_refresher_type(_token_refresher)
+                return send_request_with_auth_header(_token_refresher,
                                                      request_function=request_function,
                                                      obj=obj,
                                                      headers=headers,
@@ -258,6 +276,7 @@ def refresh_token(token_refresher: TokenRefresher) -> Callable:
                                                      **kwargs)
         else:
             def _wrapper(headers: dict, *args, **kwargs) -> requests.Response:
+                _validate_token_refresher_type(token_refresher)
                 return send_request_with_auth_header(token_refresher,
                                                      request_function=request_function,
                                                      headers=headers,
diff --git a/src/dags/libs/search_record_ids.py b/src/dags/libs/search_record_ids.py
index 7385930..e33a5ca 100644
--- a/src/dags/libs/search_record_ids.py
+++ b/src/dags/libs/search_record_ids.py
@@ -33,7 +33,7 @@ TIMEOUT = 10
 
 class SearchId(HeadersMixin):
 
-    def __init__(self, search_url: str, record_ids: list, context: Context):
+    def __init__(self, search_url: str, record_ids: list, token_refresher, context: Context):
         super().__init__(context)
         if not record_ids:
             logger.error("There are no record ids")
@@ -41,6 +41,7 @@ class SearchId(HeadersMixin):
         self.record_ids = record_ids
         self.search_url = search_url
         self.expected_total_count = len(record_ids)
+        self.token_refresher = token_refresher
         self._create_request_body()
 
     def _create_search_query(self) -> str:
@@ -48,7 +49,7 @@ class SearchId(HeadersMixin):
         Create search query to send to Search service using recordIds need to be found.
         """
         record_ids = " OR ".join(f"\"{id_}\"" for id_ in self.record_ids)
-        logger.info(f"Search query {record_ids}")
+        logger.debug(f"Search query {record_ids}")
         query = f"id:({record_ids})"
         return query
 
@@ -67,10 +68,10 @@ class SearchId(HeadersMixin):
         """
         Check if search service returns expected totalCount of records.
         """
-        logger.info(response.text)
+        logger.debug(response.text)
         data = response.json()
         total_count = data.get('totalCount')
-        logger.info(f"Got total count {total_count}")
+        logger.debug(f"Got total count {total_count}")
         if total_count is None:
             raise ValueError(f"Got no totalCount field in Search service response. "
                              f"Response is {data}.")
@@ -81,7 +82,7 @@ class SearchId(HeadersMixin):
         stop=tenacity.stop_after_attempt(RETRIES),
         reraise=True
     )
-    @refresh_token(AirflowTokenRefresher())
+    @refresh_token()
     def search_files(self, headers: dict) -> requests.Response:
         """
         Send request with recordIds to Search service.
@@ -92,7 +93,10 @@ class SearchId(HeadersMixin):
                 logger.error("Expected amount (%s) of records not found." %
                              self.expected_total_count,
                              )
-                raise RecordsNotSearchableError
+                raise RecordsNotSearchableError(
+                    f"Can't find records {self.request_body}. "
+                    f"Got response {response.json()} from Search service."
+                )
             return response
 
     def check_records_searchable(self):
diff --git a/src/dags/libs/file_source_check.py b/src/dags/libs/source_file_check.py
similarity index 91%
rename from src/dags/libs/file_source_check.py
rename to src/dags/libs/source_file_check.py
index 43ae0c3..6f5468f 100644
--- a/src/dags/libs/file_source_check.py
+++ b/src/dags/libs/source_file_check.py
@@ -26,7 +26,7 @@ from libs.exceptions import GCSObjectURIError, FileSourceError
 RETRIES = 3
 
 
-class FileSourceChecker(ABC):
+class SourceFileChecker(ABC):
 
     @abstractmethod
     def does_file_exist(self, file_path: str) -> bool:
@@ -36,7 +36,7 @@ class FileSourceChecker(ABC):
         pass
 
 
-class GoogleStorageFileSourceChecker(FileSourceChecker):
+class GCSSourceFileChecker(SourceFileChecker):
 
     def __init__(self):
         pass
@@ -57,13 +57,13 @@ class GoogleStorageFileSourceChecker(FileSourceChecker):
             if bucket_name and source_blob_name:
                 return bucket_name, source_blob_name
 
-        raise GCSObjectURIError
+        raise GCSObjectURIError(f"Wrong format path to GCS object. Object path is '{file_path}'")
 
     @tenacity.retry(
         stop=tenacity.stop_after_attempt(RETRIES),
         reraise=True
     )
-    def _does_file_exist_in_bucket(self, bucket_name: str, source_blob_name: str):
+    def _does_file_exist_in_bucket(self, bucket_name: str, source_blob_name: str) -> bool:
         storage_client = storage.Client()
         bucket = storage_client.bucket(bucket_name)
         blob = bucket.blob(source_blob_name)
diff --git a/src/dags/libs/update_status.py b/src/dags/libs/update_status.py
index e00ea18..5b75674 100644
--- a/src/dags/libs/update_status.py
+++ b/src/dags/libs/update_status.py
@@ -20,7 +20,7 @@ import logging
 import requests
 from libs.context import Context
 from libs.mixins import HeadersMixin
-from libs.refresh_token import AirflowTokenRefresher, refresh_token
+from libs.refresh_token import TokenRefresher, refresh_token
 
 logger = logging.getLogger()
 
@@ -32,6 +32,7 @@ class UpdateStatus(HeadersMixin):
         workflow_id: str,
         workflow_url: str,
         status: str,
+        token_refresher: TokenRefresher,
         context: Context,
     ) -> None:
         super().__init__(context)
@@ -39,15 +40,16 @@ class UpdateStatus(HeadersMixin):
         self.workflow_id = workflow_id
         self.context = context
         self.status = status
+        self.token_refresher = token_refresher
 
-    @refresh_token(AirflowTokenRefresher())
+    @refresh_token()
     def update_status_request(self, headers: dict) -> requests.Response:
         request_body = {
             "WorkflowID": self.workflow_id,
             "Status": self.status
         }
         request_body = json.dumps(request_body)
-        logger.info(f" Sending request '{request_body}'")
+        logger.debug(f" Sending request '{request_body}'")
         response = requests.post(self.workflow_url, request_body, headers=headers)
         return response
 
diff --git a/src/dags/libs/upload_file.py b/src/dags/libs/upload_file.py
new file mode 100644
index 0000000..73e260d
--- /dev/null
+++ b/src/dags/libs/upload_file.py
@@ -0,0 +1,157 @@
+#  Copyright 2020 Google LLC
+#  Copyright 2020 EPAM Systems
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+import io
+import json
+import logging
+from abc import ABC, abstractmethod
+from typing import Tuple, TypeVar
+from urllib.parse import urlparse
+
+import requests
+import tenacity
+from google.cloud import storage
+from libs.context import Context
+from libs.exceptions import GCSObjectURIError, FileSourceError
+from libs.mixins import HeadersMixin
+from libs.refresh_token import TokenRefresher, refresh_token
+
+logger = logging.getLogger()
+
+FileLikeObject = TypeVar("FileLikeObject", io.IOBase, io.RawIOBase, io.BytesIO)
+
+RETRY_SETTINGS = {
+    "stop": tenacity.stop_after_attempt(3),
+    "wait": tenacity.wait_fixed(2),
+}
+
+
+class FileUploader(HeadersMixin, ABC):
+    """
+    File uploader that copies a file from its PreLoadFilePath into FileSource on the OSDU platform.
+    """
+
+    def __init__(self, file_service: str, token_refresher: TokenRefresher, context: Context):
+        super().__init__(context)
+        self.file_service = file_service
+        self.token_refresher = token_refresher
+
+    @abstractmethod
+    def get_file_from_preload_path(self, preload_path: str) -> FileLikeObject:
+        """
+        Return file-like object containing raw content of a file
+        in preload path.
+        """
+
+    @tenacity.retry(**RETRY_SETTINGS)
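+    # With no argument, @refresh_token() resolves the token refresher from
+    # this object's 'token_refresher' attribute (see libs.refresh_token).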
+    @refresh_token()
+    def _send_post_request(self, headers: dict, url: str, request_body: str) -> requests.Response:
+        response = requests.post(url, request_body, headers=headers)
+        return response
+
+    @tenacity.retry(**RETRY_SETTINGS)
+    def _get_signed_url_request(self, headers: dict) -> Tuple[str, str]:
+        """
+        Get fileID and SignedURL using File Service.
+        """
+        logger.debug("Getting signed url.")
+        request_body = json.dumps({})  # '/getLocation' method requires empty json.
+        response = self._send_post_request(headers, f"{self.file_service}/getLocation",
+                                           request_body).json()
+        logger.debug("Signed url got.")
+        logger.debug(response)
+        return response["FileID"], response["Location"]["SignedURL"]
+
+    @tenacity.retry(**RETRY_SETTINGS)
+    def _upload_file_request(self, headers: dict, signed_url: str, buffer: FileLikeObject):
+        """
+        Upload file via File service using signed_url.
+        """
+        logger.debug("Uploading file.")
+        buffer.seek(0)
+        requests.put(signed_url, buffer.read(), headers=headers)
+        logger.debug("File uploaded.")
+
+    @tenacity.retry(**RETRY_SETTINGS)
+    def _get_file_location_request(self, headers: dict, file_id: str) -> str:
+        """
+        Get file location using File Service.
+        """
+        logger.debug("Getting file location.")
+        request_body = json.dumps({"FileID": file_id})
+        response = self._send_post_request(headers, f"{self.file_service}/getFileLocation",
+                                           request_body).json()
+        logger.debug("File location got.")
+        return response["Location"]
+
+    def upload_file(self, preload_file_path: str) -> str:
+        """
+        Copy a file from the landing zone (preload_file_path) onto the OSDU platform using File service.
+        Return file_location.
+        """
+        buffer = self.get_file_from_preload_path(preload_file_path)
+        file_id, signed_url = self._get_signed_url_request(self.request_headers)
+        self._upload_file_request(self.request_headers, signed_url, buffer)
+        file_location = self._get_file_location_request(self.request_headers, file_id)
+        return file_location
+
+
+class GCSFileUploader(FileUploader):
+
+    def __init__(
+        self,
+        file_service: str,
+        token_refresher: TokenRefresher,
+        context: Context,
+    ):
+        super().__init__(file_service, token_refresher, context)
+
+    @staticmethod
+    def _parse_object_uri(file_path: str) -> Tuple[str, str]:
+        """
+        Parse GCS Object uri.
+        Return bucket and blob names.
+        """
+        parsed_path = urlparse(file_path)
+        if parsed_path.scheme == "gs":
+            bucket_name = parsed_path.netloc
+            source_blob_name = parsed_path.path[1:]  # delete the first slash
+
+            if bucket_name and source_blob_name:
+                return bucket_name, source_blob_name
+
+        raise GCSObjectURIError(f"Wrong format path to GCS object. Object path is '{file_path}'")
+
+    @tenacity.retry(**RETRY_SETTINGS)
+    def get_file_from_bucket(self, bucket_name: str, source_blob_name: str) -> io.BytesIO:
+        storage_client = storage.Client()
+        bucket = storage_client.bucket(bucket_name)
+        blob = bucket.blob(source_blob_name)
+
+        does_exist = blob.exists()
+        if not does_exist:
+            raise FileSourceError("File doesn't exist in preloadPath "
+                                  f"'gs://{bucket_name}/{source_blob_name}'")
+
+        file = io.BytesIO()
+        blob.download_to_file(file)
+        logger.debug("File got from landing zone")
+        return file
+
+    def get_file_from_preload_path(self, preload_file_path: str) -> io.BytesIO:
+        bucket_name, blob_name = self._parse_object_uri(preload_file_path)
+        buffer = self.get_file_from_bucket(bucket_name, blob_name)
+        return buffer
diff --git a/src/dags/libs/validate_schema.py b/src/dags/libs/validate_schema.py
index 99edb68..9506e12 100644
--- a/src/dags/libs/validate_schema.py
+++ b/src/dags/libs/validate_schema.py
@@ -22,7 +22,7 @@ import tenacity
 from libs.context import Context
 from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError
 from libs.mixins import HeadersMixin
-from libs.refresh_token import AirflowTokenRefresher, refresh_token
+from libs.refresh_token import TokenRefresher, refresh_token
 
 logger = logging.getLogger()
 
@@ -55,11 +55,17 @@ class OSDURefResolver(jsonschema.RefResolver):
 class SchemaValidator(HeadersMixin):
     """Class to validate schema of Manifests."""
 
-    def __init__(self, schema_service: str, dagrun_conf: dict, context: Context):
+    def __init__(
+        self, schema_service: str,
+        dagrun_conf: dict,
+        token_refresher: TokenRefresher,
+        context: Context
+    ):
         super().__init__(context)
         self.schema_service = schema_service
         self.data_object = copy.deepcopy(dagrun_conf)
         self.context = context
+        self.token_refresher = token_refresher
         self.resolver_handlers = {
             "osdu": self.get_schema_request,
             "https": self.get_schema_request,
@@ -71,7 +77,7 @@ class SchemaValidator(HeadersMixin):
         stop=tenacity.stop_after_attempt(RETRIES),
         reraise=True
     )
-    @refresh_token(AirflowTokenRefresher())
+    @refresh_token()
     def _get_schema_from_schema_service(self, headers: dict, uri: str) -> requests.Response:
         """
         Request to Schema service to retrieve schema.
@@ -105,7 +111,7 @@ class SchemaValidator(HeadersMixin):
         """
         if not schema:
             schema = self.get_schema(manifest["kind"])
-        logger.info(f"Validating kind {manifest['kind']}")
+        logger.debug(f"Validating kind {manifest['kind']}")
         resolver = OSDURefResolver(schema_service=self.schema_service,
                                    base_uri=schema.get("$id", ""), referrer=schema,
                                    handlers=self.resolver_handlers, cache_remote=True)
diff --git a/src/plugins/operators/process_manifest_r3.py b/src/plugins/operators/process_manifest_r3.py
index 82d2e8a..f2ceb67 100644
--- a/src/plugins/operators/process_manifest_r3.py
+++ b/src/plugins/operators/process_manifest_r3.py
@@ -18,11 +18,11 @@ import logging
 from airflow.utils import apply_defaults
 from airflow.models import BaseOperator, Variable
 from libs.context import Context
-from libs.file_source_check import GoogleStorageFileSourceChecker
+from libs.upload_file import GCSFileUploader
+from libs.refresh_token import AirflowTokenRefresher
 from libs.process_manifest_r3 import ManifestProcessor
 from libs.validate_schema import SchemaValidator
 
-
 logger = logging.getLogger()
 
 RETRIES = 3
@@ -38,20 +38,28 @@ class ProcessManifestOperatorR3(BaseOperator):
         super().__init__(*args, **kwargs)
         self.schema_service_url = Variable.get('schema_service_url')
         self.storage_url = Variable.get('storage_url')
+        self.file_service_url = Variable.get('file_service_url')
 
     def execute(self, context: dict):
         payload_context = Context.populate(context["dag_run"].conf)
+        token_refresher = AirflowTokenRefresher()
+        file_uploader = GCSFileUploader(self.file_service_url, token_refresher,
+                                        payload_context)
+
         validator = SchemaValidator(
             self.schema_service_url,
             context["dag_run"].conf,
+            token_refresher,
             payload_context
         )
         manifest_processor = ManifestProcessor(
             self.storage_url,
             context["dag_run"].conf,
-            GoogleStorageFileSourceChecker(),
+            file_uploader,
+            token_refresher,
             payload_context,
         )
+
         validator.validate_manifest()
         record_ids = manifest_processor.process_manifest()
         context["ti"].xcom_push(key="record_ids", value=record_ids)
diff --git a/src/plugins/operators/search_record_id.py b/src/plugins/operators/search_record_id.py
index eb5b366..d122dc6 100644
--- a/src/plugins/operators/search_record_id.py
+++ b/src/plugins/operators/search_record_id.py
@@ -17,6 +17,7 @@ import logging
 
 from airflow.models import BaseOperator, Variable
 from libs.context import Context
+from libs.refresh_token import AirflowTokenRefresher
 from libs.search_record_ids import SearchId
 
 logger = logging.getLogger()
@@ -41,5 +42,6 @@ class SearchRecordIdOperator(BaseOperator):
         """
         payload_context = Context.populate(context["dag_run"].conf)
         record_ids = context["ti"].xcom_pull(key="record_ids", )
-        ids_searcher = SearchId(Variable.get("search_url"), record_ids, payload_context, )
+        ids_searcher = SearchId(Variable.get("search_url"), record_ids, AirflowTokenRefresher(),
+                                payload_context)
         ids_searcher.check_records_searchable()
diff --git a/src/plugins/operators/update_status.py b/src/plugins/operators/update_status.py
index 06eee9d..c2b70bc 100644
--- a/src/plugins/operators/update_status.py
+++ b/src/plugins/operators/update_status.py
@@ -20,6 +20,7 @@ import logging
 
 from airflow.models import BaseOperator, Variable
 from libs.context import Context
+from libs.refresh_token import AirflowTokenRefresher
 from libs.exceptions import PipelineFailedError
 from libs.update_status import UpdateStatus
 
@@ -62,7 +63,7 @@ class UpdateStatusOperator(BaseOperator):
            If they are then update status FINISHED else FAILED
         """
         conf = copy.deepcopy(context["dag_run"].conf)
-        logger.info(f"Got conf {conf}.")
+        logger.debug(f"Got conf {conf}.")
         if "Payload" in conf:
             payload_context = Context.populate(conf)
         else:
@@ -74,6 +75,7 @@ class UpdateStatusOperator(BaseOperator):
             workflow_url=Variable.get("update_status_url"),
             workflow_id=workflow_id,
             status=status,
+            token_refresher=AirflowTokenRefresher(),
             context=payload_context
         )
         status_updater.update_workflow_status()
diff --git a/tests/plugin-unit-tests/data/workProduct/SeismicTraceData.json b/tests/plugin-unit-tests/data/workProduct/SeismicTraceData.json
index 6946398..9c4035d 100644
--- a/tests/plugin-unit-tests/data/workProduct/SeismicTraceData.json
+++ b/tests/plugin-unit-tests/data/workProduct/SeismicTraceData.json
@@ -375,7 +375,7 @@
                     "resourceSecurityClassification": "srn:opendes:reference-data/ResourceSecurityClassification:RESTRICTED:",
                     "data": {
                         "SchemaFormatTypeID": "srn:opendes:reference-data/SchemaFormatType:SEG-Y Seismic Trace Data:",
-                        "PreLoadFilePath": "C:\\Seismic\\ST0202R08_PS_PSDM_RAW_PP_TIME.MIG_RAW.POST_STACK.3D.JS-017534.segy",
+                        "PreLoadFilePath": "test",
                         "FileSource": "",
                         "FileSize": 277427976,
                         "EncodingFormatTypeID": "srn:opendes:reference-data/EncodingFormatType:segy:",
diff --git a/tests/plugin-unit-tests/data/workProduct/record_SeismicTraceData.json b/tests/plugin-unit-tests/data/workProduct/record_SeismicTraceData.json
index 5e8b95a..0c39e27 100644
--- a/tests/plugin-unit-tests/data/workProduct/record_SeismicTraceData.json
+++ b/tests/plugin-unit-tests/data/workProduct/record_SeismicTraceData.json
@@ -24,8 +24,8 @@
             "resourceSecurityClassification": "srn:opendes:reference-data/ResourceSecurityClassification:RESTRICTED:",
             "data": {
                 "SchemaFormatTypeID": "srn:opendes:reference-data/SchemaFormatType:SEG-Y Seismic Trace Data:",
-                "PreLoadFilePath": "C:\\Seismic\\ST0202R08_PS_PSDM_RAW_PP_TIME.MIG_RAW.POST_STACK.3D.JS-017534.segy",
-                "FileSource": "",
+                "PreLoadFilePath": "test",
+                "FileSource": "test",
                 "FileSize": 277427976,
                 "EncodingFormatTypeID": "srn:opendes:reference-data/EncodingFormatType:segy:",
                 "Endian": "BIG",
diff --git a/tests/plugin-unit-tests/test_file_source_check.py b/tests/plugin-unit-tests/test_file_checker.py
similarity index 84%
rename from tests/plugin-unit-tests/test_file_source_check.py
rename to tests/plugin-unit-tests/test_file_checker.py
index 349a854..cf09ee9 100644
--- a/tests/plugin-unit-tests/test_file_source_check.py
+++ b/tests/plugin-unit-tests/test_file_checker.py
@@ -20,23 +20,23 @@ import sys
 sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
 sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
 
-from libs.exceptions import FileSourceError, GCSObjectURIError
 import pytest
-from libs.file_source_check import GoogleStorageFileSourceChecker
+from libs.exceptions import FileSourceError, GCSObjectURIError
+from libs.source_file_check import GCSSourceFileChecker
 
 
-class TestFileSourceChecker:
+class TestSourceFileChecker:
 
     @pytest.fixture()
     def file_checker(self):
-        return GoogleStorageFileSourceChecker()
+        return GCSSourceFileChecker()
 
     @pytest.fixture()
     def mock_file_exist_under_uri(self, monkeypatch, file_exists: bool):
         """
         Mock response from GCS if file exists or not.
         """
-        monkeypatch.setattr(GoogleStorageFileSourceChecker, "_does_file_exist_in_bucket",
+        monkeypatch.setattr(GCSSourceFileChecker, "_does_file_exist_in_bucket",
                             lambda *args, **kwargs: file_exists)
 
     @pytest.mark.parametrize(
@@ -50,7 +50,7 @@ class TestFileSourceChecker:
         monkeypatch,
         mock_file_exist_under_uri,
         file_exists: bool,
-        file_checker: GoogleStorageFileSourceChecker,
+        file_checker: GCSSourceFileChecker,
         file_path: str
     ):
         """
@@ -69,7 +69,7 @@ class TestFileSourceChecker:
         monkeypatch,
         mock_file_exist_under_uri,
         file_exists: bool,
-        file_checker: GoogleStorageFileSourceChecker,
+        file_checker: GCSSourceFileChecker,
         file_path: str
     ):
         """
@@ -86,6 +86,6 @@ class TestFileSourceChecker:
             pytest.param("test"),
         ]
     )
-    def test_invalid_gcs_object_uri(self, file_checker: GoogleStorageFileSourceChecker, file_path):
+    def test_invalid_gcs_object_uri(self, file_checker: GCSSourceFileChecker, file_path):
         with pytest.raises(GCSObjectURIError):
             file_checker._parse_object_uri(file_path)
diff --git a/tests/plugin-unit-tests/test_file_upload.py b/tests/plugin-unit-tests/test_file_upload.py
new file mode 100644
index 0000000..9042b30
--- /dev/null
+++ b/tests/plugin-unit-tests/test_file_upload.py
@@ -0,0 +1,65 @@
+#  Copyright 2020 Google LLC
+#  Copyright 2020 EPAM Systems
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import io
+import os
+import sys
+
+sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
+sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
+
+from libs.exceptions import GCSObjectURIError
+import pytest
+from libs.context import Context
+from libs.refresh_token import AirflowTokenRefresher
+from libs.upload_file import GCSFileUploader
+
+
+class TestGCSFileUploader:
+
+    @pytest.fixture()
+    def file_uploader(self, monkeypatch):
+        context = Context(data_partition_id="test", app_key="")
+        file_uploader = GCSFileUploader("http://test", AirflowTokenRefresher(),
+                                                         context)
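+        # Stub the HTTP helpers so upload_file() is exercised purely as
+        # orchestration logic, without real File service calls.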
+        monkeypatch.setattr(file_uploader, "_get_signed_url_request",
+                            lambda *args, **kwargs: ("test", "test"))
+        monkeypatch.setattr(file_uploader, "_upload_file_request",
+                            lambda *args, **kwargs: None)
+        monkeypatch.setattr(file_uploader, "_get_file_location_request",
+                            lambda *args, **kwargs: "test")
+        return file_uploader
+
+    def test_get_file_from_bucket(
+        self,
+        monkeypatch,
+        file_uploader: GCSFileUploader
+    ):
+        file = io.RawIOBase()
+        monkeypatch.setattr(file_uploader, "get_file_from_bucket", lambda *args, **kwargs: file)
+        file_uploader.upload_file("gs://test/test")
+
+    @pytest.mark.parametrize(
+        "file_path",
+        [
+            pytest.param("gs://test"),
+            pytest.param("://test"),
+            pytest.param("test"),
+        ]
+    )
+    def test_invalid_gcs_object_uri(self, file_uploader: GCSFileUploader,
+                                    file_path: str):
+        with pytest.raises(GCSObjectURIError):
+            file_uploader._parse_object_uri(file_path)
diff --git a/tests/plugin-unit-tests/test_manifest_processor_r3.py b/tests/plugin-unit-tests/test_manifest_processor_r3.py
index d7c69eb..9bc83af 100644
--- a/tests/plugin-unit-tests/test_manifest_processor_r3.py
+++ b/tests/plugin-unit-tests/test_manifest_processor_r3.py
@@ -23,7 +23,8 @@ sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
 sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
 
 from libs.context import Context
-from libs.file_source_check import FileSourceChecker, GoogleStorageFileSourceChecker
+from libs.upload_file import GCSFileUploader
+from libs.refresh_token import AirflowTokenRefresher
 from libs.exceptions import EmptyManifestError
 from deepdiff import DeepDiff
 import pytest
@@ -79,17 +80,19 @@ class TestManifestProcessor:
         with open(conf_path) as f:
             conf = json.load(f)
         context = Context.populate(conf)
-        file_checker = GoogleStorageFileSourceChecker()
+        token_refresher = AirflowTokenRefresher()
+        file_uploader = GCSFileUploader("test", token_refresher, context)
         manifest_processor = process_manifest_r3.ManifestProcessor(
             storage_url="",
             dagrun_conf=conf,
+            token_refresher=token_refresher,
             context=context,
-            file_source_checker=file_checker
+            file_uploader=file_uploader
         )
 
         monkeypatch.setattr(manifest_processor, "generate_id", lambda manifest: "test_id")
-        monkeypatch.setattr(file_checker, "_does_file_exist_in_bucket",
-                            lambda *args, **kwargs: True)
+        monkeypatch.setattr(file_uploader, "upload_file",
+                            lambda *args, **kwargs: "test")
         return manifest_processor
 
     @pytest.fixture()
@@ -246,8 +249,9 @@ class TestManifestProcessor:
         manifest_processor = process_manifest_r3.ManifestProcessor(
             storage_url="",
             dagrun_conf=conf,
+            token_refresher=AirflowTokenRefresher(),
             context=context,
-            file_source_checker=GoogleStorageFileSourceChecker()
+            file_uploader=GCSFileUploader("test", AirflowTokenRefresher(), context)
         )
         for manifest_part in manifest_processor.data_object["manifest"]:
             group_type = manifest_part["groupType"]
diff --git a/tests/plugin-unit-tests/test_operators_r3.py b/tests/plugin-unit-tests/test_operators_r3.py
index 219cd97..2b34ce6 100644
--- a/tests/plugin-unit-tests/test_operators_r3.py
+++ b/tests/plugin-unit-tests/test_operators_r3.py
@@ -37,8 +37,9 @@ from file_paths import (
 from operators.process_manifest_r3 import ProcessManifestOperatorR3, SchemaValidator, \
     ManifestProcessor
 from operators.search_record_id import SearchRecordIdOperator
-from libs.file_source_check import FileSourceChecker, GoogleStorageFileSourceChecker
 from operators.update_status import UpdateStatusOperator
+from libs.upload_file import GCSFileUploader
+from libs.refresh_token import AirflowTokenRefresher
 from mock_responses import MockSearchResponse, MockWorkflowResponse
 
 CustomOperator = TypeVar("CustomOperator")
@@ -72,8 +73,9 @@ class TestOperators(object):
         monkeypatch.setattr(SchemaValidator, "validate_manifest", lambda obj: None)
         monkeypatch.setattr(ManifestProcessor, "save_record",
                             lambda obj, headers, request_data: MockStorageResponse())
-        monkeypatch.setattr(GoogleStorageFileSourceChecker, "_does_file_exist_in_bucket",
-                            lambda *args, **kwargs: True)
+        monkeypatch.setattr(GCSFileUploader, "upload_file",
+                            lambda *args, **kwargs: "test")
+
         task, context = self._create_task(ProcessManifestOperatorR3)
         task.pre_execute(context)
         task.execute(context)
diff --git a/tests/plugin-unit-tests/test_schema_validator_r3.py b/tests/plugin-unit-tests/test_schema_validator_r3.py
index ee25893..bae115d 100644
--- a/tests/plugin-unit-tests/test_schema_validator_r3.py
+++ b/tests/plugin-unit-tests/test_schema_validator_r3.py
@@ -36,6 +36,7 @@ from file_paths import (
 )
 from mock_responses import MockSchemaResponse
 from libs.context import Context
+from libs.refresh_token import AirflowTokenRefresher
 from libs.exceptions import EmptyManifestError, NotOSDUShemaFormatError
 import pytest
 
@@ -55,6 +56,7 @@ class TestSchemaValidator:
         validator = SchemaValidator(
             "",
             conf,
+            AirflowTokenRefresher(),
             context
         )
         if schema_file:
diff --git a/tests/plugin-unit-tests/test_search_record_id.py b/tests/plugin-unit-tests/test_search_record_id.py
index 63db7f5..a7faccb 100644
--- a/tests/plugin-unit-tests/test_search_record_id.py
+++ b/tests/plugin-unit-tests/test_search_record_id.py
@@ -29,6 +29,7 @@ from file_paths import (
 )
 from libs.exceptions import RecordsNotSearchableError
 from libs.context import Context
+from libs.refresh_token import AirflowTokenRefresher
 from tenacity import stop_after_attempt
 from libs.search_record_ids import SearchId
 from mock_responses import MockSearchResponse
@@ -66,7 +67,8 @@ class TestManifestProcessor:
     def test_search_found_all_records(self, monkeypatch, record_ids: list,
                                       search_response_path: str):
         self.mock_storage_response(monkeypatch, search_response_path, total_count=len(record_ids))
-        id_searcher = SearchId("http://test", record_ids, Context(app_key="", data_partition_id=""))
+        id_searcher = SearchId("http://test", record_ids, AirflowTokenRefresher(),
+                               Context(app_key="", data_partition_id=""))
         id_searcher.check_records_searchable()
 
     @pytest.mark.parametrize(
@@ -87,7 +89,8 @@ class TestManifestProcessor:
         invalid_total_count = len(record_ids) - 1
         self.mock_storage_response(monkeypatch, search_response_path,
                                    total_count=invalid_total_count)
-        id_searcher = SearchId("", record_ids, Context(app_key="", data_partition_id=""))
+        id_searcher = SearchId("", record_ids, AirflowTokenRefresher(),
+                               Context(app_key="", data_partition_id=""))
         with pytest.raises(RecordsNotSearchableError):
             id_searcher.check_records_searchable()
 
@@ -107,7 +110,8 @@ class TestManifestProcessor:
     def test_search_got_wrong_response_value(self, monkeypatch, record_ids: list,
                                              search_response_path: str):
         self.mock_storage_response(monkeypatch, search_response_path)
-        id_searcher = SearchId("http://test", record_ids, Context(app_key="", data_partition_id=""))
+        id_searcher = SearchId("http://test", record_ids, AirflowTokenRefresher(),
+                               Context(app_key="", data_partition_id=""))
         with pytest.raises(ValueError):
             id_searcher.check_records_searchable()
 
@@ -124,4 +128,5 @@ class TestManifestProcessor:
                                         search_response_path: str):
         self.mock_storage_response(monkeypatch, search_response_path)
         with pytest.raises(ValueError):
-            SearchId("http://test", record_ids, Context(app_key="", data_partition_id=""))
+            SearchId("http://test", record_ids, AirflowTokenRefresher(),
+                     Context(app_key="", data_partition_id=""))
diff --git a/tests/plugin-unit-tests/test_update_status_r3.py b/tests/plugin-unit-tests/test_update_status_r3.py
index df05b36..1dfd85f 100644
--- a/tests/plugin-unit-tests/test_update_status_r3.py
+++ b/tests/plugin-unit-tests/test_update_status_r3.py
@@ -28,6 +28,7 @@ from file_paths import (
     MANIFEST_WELLBORE_VALID_PATH
 )
 from libs.context import Context
+from libs.refresh_token import AirflowTokenRefresher
 from libs.update_status import UpdateStatus
 from mock_responses import MockWorkflowResponse
 
@@ -43,6 +44,7 @@ class TestUpdateStatus:
         status_updater = UpdateStatus(
             workflow_url = "http://test",
             workflow_id=workflow_id,
+            token_refresher=AirflowTokenRefresher(),
             context=context,
             status=status
         )
diff --git a/tests/set_airflow_env.sh b/tests/set_airflow_env.sh
index 8b750c3..3bdcc1f 100755
--- a/tests/set_airflow_env.sh
+++ b/tests/set_airflow_env.sh
@@ -48,6 +48,7 @@ airflow variables -s provider gcp
 airflow variables -s record_kind "odes:osdu:file:0.2.0"
 airflow variables -s schema_version "0.2.0"
 airflow variables -s workflow_url $WORKFLOW_URL
+airflow variables -s file_service_url $LOCALHOST
 airflow variables -s update_status_url $UPDATE_STATUS_URL
 airflow variables -s search_url $SEARCH_URL
 airflow variables -s schema_service_url  $LOCALHOST
-- 
GitLab