Commit 26d1c8af authored by Jeremie Hallal's avatar Jeremie Hallal
Browse files

fix protocol issue on gcp

parent 48bbbda5
......@@ -350,7 +350,7 @@ class DaskBulkStorage:
async def _fill_catalog_columns_info(
self, catalog: BulkCatalog, session_metas, bulk_id: str
) -> Optional[BulkCatalog]:
""" build the catalog from the session."""
"""Build the catalog from the session."""
catalog_columns = set(catalog.all_columns_dtypes)
for chunks_metas in session_meta.get_next_chunk_files(session_metas):
......@@ -435,8 +435,8 @@ class DaskBulkStorage:
"""
bulk_id = new_bulk_id()
chunk_metas = await session_meta.get_chunks_metadata(self._fs, self.base_directory, session)
if len(chunk_metas) == 0:# there is no files in this session
chunk_metas = await session_meta.get_chunks_metadata(self._fs, self.protocol, self.base_directory, session)
if len(chunk_metas) == 0: # there is no files in this session
raise BulkNotProcessable(message="No data to commit")
if from_bulk_id:
......
......@@ -34,7 +34,7 @@ from .storage_path_builder import add_protocol, record_session_path
class SessionFileMeta:
"""The class extract information about chunks."""
def __init__(self, fs, file_path: str, lazy: bool = True) -> None:
def __init__(self, fs, protocol: str, file_path: str, lazy: bool = True) -> None:
"""
Args:
fs: fsspec filesystem
......@@ -49,6 +49,7 @@ class SessionFileMeta:
self.time, self.shape, tail = tail.split('.')
self._meta = None
self.path = file_path
self.protocol = protocol
if not lazy:
self._read_meta()
......@@ -77,7 +78,7 @@ class SessionFileMeta:
@property
def path_with_protocol(self) -> str:
"""Returns chunk path with protocol"""
return add_protocol(self.path, self._fs.protocol)
return add_protocol(self.path, self.protocol)
@property
def index_hash(self) -> str:
......@@ -145,12 +146,12 @@ def build_chunk_metadata(dataframe: pd.DataFrame) -> dict:
@capture_timings('get_chunks_metadata')
@with_trace('get_chunks_metadata')
async def get_chunks_metadata(filesystem, base_directory, session: Session) -> List[SessionFileMeta]:
async def get_chunks_metadata(filesystem, protocol: str, base_directory: str, session: Session) -> List[SessionFileMeta]:
"""Return metadata objects for a given session"""
session_path = record_session_path(base_directory, session.id, session.recordId)
with suppress(FileNotFoundError):
parquet_files = [f for f in filesystem.ls(session_path) if f.endswith(".parquet")]
futures = get_client().map(lambda f: SessionFileMeta(filesystem, f, lazy=False) , parquet_files)
futures = get_client().map(lambda f: SessionFileMeta(filesystem, protocol, f, lazy=False) , parquet_files)
return await get_client().gather(futures)
return []
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment