Skip to content
Snippets Groups Projects
test_file_upload.py 2.38 KiB
Newer Older
  • Learn to ignore specific revisions
  • #  Copyright 2020 Google LLC
    #  Copyright 2020 EPAM Systems
    #
    #  Licensed under the Apache License, Version 2.0 (the "License");
    #  you may not use this file except in compliance with the License.
    #  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    import io
    import os
    import sys
    
    sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/plugins")
    sys.path.append(f"{os.getenv('AIRFLOW_SRC_DIR')}/dags")
    
    from libs.exceptions import GCSObjectURIError
    import pytest
    from libs.context import Context
    from libs.refresh_token import AirflowTokenRefresher
    from libs.upload_file import GCSFileUploader
    
    
    class TestSourceFileChecker:
    
        @pytest.fixture()
        def file_uploader(self, monkeypatch):
            context = Context(data_partition_id="test", app_key="")
            file_uploader = GCSFileUploader("http://test", AirflowTokenRefresher(),
                                                             context)
            monkeypatch.setattr(file_uploader, "_get_signed_url_request",
                                lambda *args, **kwargs: ("test", "test"))
            monkeypatch.setattr(file_uploader, "_upload_file_request",
                                lambda *args, **kwargs: None)
            monkeypatch.setattr(file_uploader, "_get_file_location_request",
                                lambda *args, **kwargs: "test")
            return file_uploader
    
        def test_get_file_from_bucket(
            self,
            monkeypatch,
            file_uploader: GCSFileUploader
        ):
            file = io.RawIOBase()
    
            monkeypatch.setattr(file_uploader, "get_file_from_bucket",
                                lambda *args, **kwargs: (file, "test"))
    
            file_uploader.upload_file("gs://test/test")
    
        @pytest.mark.parametrize(
            "file_path",
            [
                pytest.param("gs://test"),
                pytest.param("://test"),
                pytest.param("test"),
            ]
        )
        def test_invalid_gcs_object_uri(self, file_uploader: GCSFileUploader,
                                        file_path: str):
            with pytest.raises(GCSObjectURIError):
                file_uploader._parse_object_uri(file_path)