Commit 8b2ffc7a authored by Igor Zimovets (EPAM)'s avatar Igor Zimovets (EPAM)
Browse files

Merge branch 'master' into GONRG-4118-Update-Helms-with-imagePullPolicy-value

parents 9de1b65a 62055c53
Pipeline #87552 failed with stage
in 15 seconds
......@@ -46,6 +46,9 @@ include:
- project: "osdu/platform/ci-cd-pipelines"
file: "build/python.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "scanners/fossa-python.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "scanners/gitlab-ultimate.yml"
......@@ -128,9 +131,6 @@ osdu-gcp-containerize-gitlab:
image: docker:19.03
cache: {}
tags: ["osdu-medium"]
- $OSDU_GCP == 'true'
......@@ -192,9 +192,6 @@ osdu-gcp-test-python:
stage: integration
needs: ["osdu-gcp-deploy-deployment"]
- $OSDU_GCP == 'true' && $OSDU_GCP_INT_TEST_TYPE == 'python'
- apt-get install -y python3-venv
- python3 -m venv env
......@@ -224,136 +221,10 @@ osdu-gcp-test:
- .osdu-gcp-variables
- .osdu-gcp-dev2-variables
# Allow failure on private development deployments
allow_failure: true
# --------------------------------------------------------------------------------
# Experimental FOSSA jobs. These will be promoted to the standard ci-cd-pipelines after
# they've had some testing in a real project
image: $CI_REGISTRY/divido/fossa-with-cache/incremental:latest
stage: scan
needs: ['compile-and-unit-test']
FOSSA_OUTPUT_DIR: fossa-output
- fossa-output
when: always
expire_in: 2 days
# fossa-with-cache needs a CI_COMMIT_BRANCH defined to know how to parse the FOSSA API results
# When building tags, this isn't defined by GitLab. In that case, we use the tag name instead. If that's not defined
# then things will fail and we'll have to make this smarter
- |
if [ ! -e all-requirements.txt ]; then
echo "I was expecting a file named 'all-requirements.txt' to have been generated by compile-and-unit-test"
echo "However, that file doesn't seem to exist"
echo "----------------------------------------"
echo "That file should have been the output of a 'pip freeze', so that I knew what the full list of deep"
echo "dependencies were. I can't reasonably generate that in this job, because I don't know what python image"
echo "is appropriate. If this structure has been changed in the build/python.yml, you may need to update this"
echo "logic as well (in scanners/fossa-python.yml)"
exit 1
# This variable is used by the python build environment to refer to the set of requirements that need to
# be compiled down into the single 'all-requirements.txt'. Here, we override it to supply fossa-with-cache
# with a direct answer.
- PIP_REQUIREMENTS=all-requirements.txt fossa-with-cache
image: $CI_REGISTRY/divido/fossa-with-cache/incremental:latest
stage: scan
needs: ['fossa-analyze']
tags: ['osdu-small']
when: on_failure
- fossa-output/cached-NOTICE
- fossa-output/generated-clean-NOTICE
expire_in: 2 days
# Check to see if a newer commit exists for the pipeline's branch, and if it does, use that NOTICE instead of this one's
- |
if [ "$CI_COMMIT_BRANCH" != "" ]; then
function echoCmd() {
echo -e "${colorCmd}>" "$@" "${colorReset}"
echoCmd git fetch
git fetch
echoCmd git diff --name-only HEAD origin/$CI_COMMIT_BRANCH
branchDiffs="$(git diff --name-only HEAD origin/$CI_COMMIT_BRANCH)"
echo $branchDiffs
echo "--------------------"
if [ "$branchDiffs" == "NOTICE" ]; then
echo "The branch associated with this pipeline ($CI_COMMIT_BRANCH) has been changed, but the only changes are the NOTICE file"
echo "I will use the NOTICE file from origin/$CI_COMMIT_BRANCH ($(git rev-parse --short origin/$CI_COMMIT_BRANCH)) as the basis for comparison"
echoCmd git checkout origin/$CI_COMMIT_BRANCH -- NOTICE
git checkout origin/$CI_COMMIT_BRANCH -- NOTICE
elif [ "$branchDiffs" == "" ]; then
echo "The branch associated with this pipeline ($CI_COMMIT_BRANCH) has not been changed since the commit that spawned this pipeline"
echo "I will use the NOTICE file from the pipeline's commit ($CI_COMMIT_SHORT_SHA) as the basis for comparison"
echo "The branch associated with this pipeline ($CI_COMMIT_BRANCH) has been changed, but the changes include more than just the NOTICE file"
echo "I will use the NOTICE file from the pipeline's commit ($CI_COMMIT_SHORT_SHA) as the basis for comparison"
# Use a cached NOTICE if available, otherwise use a generated one
- |
if [ -e fossa-output/cached-NOTICE ]; then
elif [ -e fossa-output/generated-clean-NOTICE ]; then
echo "Couldn't find either a cached-NOTICE or generated-clean-NOTICE in the fossa-output/ directory"
echo "At least one of these should have been generated by a previous job stage (fossa-analyze) and stored"
echo "as an artifact. Something must be wrong in the CI setup"
exit 1
echo "Comparing with $fossaGeneratedNotice"
# If the comparison finds differences, let the user know what to do next
- |
if ! fossa-compare-notices NOTICE $fossaGeneratedNotice; then
echo --------------------------------------------------------------------------------
echo "There are differences in the NOTICE file"
echo "Please review these differences, and if they look appropriate based on your"
echo "changes, update the committed NOTICE file"
echo "--------------------"
echo "If you make changes to the NOTICE file (and only the NOTICE file), you can"
echo "re-run this single stage of the pipeline alone rather than the whole pipeline"
echo "One way to achieve this:"
echo "$ wget -O NOTICE '${CI_PROJECT_URL}/-/jobs/${CI_JOB_ID}/artifacts/raw/${fossaGeneratedNotice}?inline=false'"
echo "$ git add NOTICE"
echo "$ git commit -m 'Updating NOTICE'"
echo "$ git push -o ci.skip"
echo "Then retry this job"
exit 1
......@@ -9,7 +9,7 @@ Apache-2.0
The following software have components provided under the terms of this license:
- aiobotocore (from
- aiohttp (from
- aiohttp (from
- async-timeout (from
- boto3 (from
- botocore (from
......@@ -226,9 +226,9 @@ MIT
The following software have components provided under the terms of this license:
- PyJWT (from
- PyYAML (from
- adal (from
- aiohttp (from
- aiohttp (from
- aioitertools (from
- aioredis (from
- anyio (from,
......@@ -38,6 +38,9 @@ class ChunkGroup:
paths: List[str]
dtypes: List[str]
ColumnLabel = str
ColumnDType = str
class BulkCatalog:
"""Represent a bulk catalog
......@@ -67,7 +70,7 @@ class BulkCatalog:
self.columns: List[ChunkGroup] = []
def all_columns_dtypes(self) -> Dict[str, str]:
def all_columns_dtypes(self) -> Dict[ColumnLabel, ColumnDType]:
"""Returns all columns with their dtype
Dict[str, str]: a dict { column label : column dtype }
......@@ -120,11 +123,17 @@ class BulkCatalog:
paths: List[str]
def get_paths_for_columns(self, labels: Iterable[str], base_path: str) -> List[ColumnsPaths]:
"""Returns the paths to load data of the requested columns grouped by paths"""
"""Returns the paths to load data of the requested columns grouped by paths
labels (Iterable[str]): List of desired columns. If None or empty select all columns.
base_path (str): Base path as prefix to chunks path
List[ColumnsPaths]: The requested columns grouped by paths
grouped_files = []
for col_group in self.columns:
matching_columns = col_group.labels.intersection(labels)
matching_columns = col_group.labels.intersection(labels) if labels else col_group.labels
if matching_columns:
......@@ -33,14 +33,16 @@ from app.conf import Config
from .dask_worker_plugin import DaskWorkerPlugin
from .errors import BulkRecordNotFound, BulkNotProcessable, internal_bulk_exceptions
from .traces import map_with_trace, submit_with_trace
from .utils import (by_pairs, do_merge, worker_capture_timing_handlers,
from .utils import (WDMS_INDEX_NAME, by_pairs, do_merge, worker_capture_timing_handlers,
get_num_rows, set_index, index_union)
from ..dataframe_validators import is_reserved_column_name, DataFrameValidationFunc
from .. import DataframeSerializerSync
from . import storage_path_builder as pathBuilder
from . import session_file_meta as session_meta
from ..bulk_id import new_bulk_id
from .bulk_catalog import BulkCatalog, ChunkGroup, load_bulk_catalog, save_bulk_catalog
from .bulk_catalog import (BulkCatalog, ChunkGroup,
from ..mime_types import MimeType
from .dask_data_ipc import DaskNativeDataIPC, DaskLocalFileDataIPC
from . import dask_worker_write_bulk as bulk_writer
......@@ -50,8 +52,7 @@ def read_with_dask(path: Union[str, List[str]], **kwargs) -> dd.DataFrame:
"""call dask.dataframe.read_parquet with default parameters
Dask read_parquet parameters:
chunksize='25M': if chunk are too small, we aggregate them until we reach chunksize
aggregate_files=True: because we are passing a list of path when commiting a session,
aggregate_files is needed when paths are different
aggregate_files=True: aggregate_files is needed when files are in different folders
path (Union[str, List[str]]): a file, a folder or a list of files
......@@ -74,8 +75,8 @@ def _load_index_from_meta(meta, **kwargs):
def _index_union_tuple(t):
return index_union(*t)
def _index_union_tuple(indexes: Tuple[pd.Index, Optional[pd.Index]]):
return index_union(*indexes)
class DaskBulkStorage:
......@@ -165,14 +166,17 @@ class DaskBulkStorage:
- if columns is None, we load all columns
Returns: Future<dd.dataframe>
if columns is None:
columns = catalog.all_columns_dtypes.keys()
record_path = pathBuilder.record_path(self.base_directory, catalog.record_id, self.protocol)
files_to_load = catalog.get_paths_for_columns(columns, record_path)
# read all chunk for requested columns
def read_parquet_files(f):
return read_with_dask(f.paths, columns=f.labels, storage_options=self._parameters.storage_options)
dfs = self._map_with_trace(read_parquet_files, files_to_load)
index_df = self._read_index_from_catalog_index_path(catalog)
if index_df:
if not dfs:
raise RuntimeError("cannot find requested columns")
......@@ -187,11 +191,11 @@ class DaskBulkStorage:
"""Load columns from parquet files in the bulk_path.
Returns: Future<dd.DataFrame>
bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
catalog = load_bulk_catalog(self._fs, bulk_path)
catalog = await self.get_bulk_catalog(record_id, bulk_id, generate_if_not_exists=False)
if catalog is not None:
return self._load_bulk_from_catalog(catalog, columns)
# No catalog means that we can read the folder as a parquet dataset. (legacy behavior)
bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id, self.protocol)
return self._read_parquet(bulk_path, columns=columns)
......@@ -215,7 +219,7 @@ class DaskBulkStorage:
record_id (str): the record id on which belongs the bulk.
bulk_id (str): the bulk id to load.
columns (List[str], optional): columns to load. If None all all available columns. Defaults to None.
columns (List[str], optional): columns to load. If None, all available columns. Defaults to None.
BulkRecordNotFound: If bulk data cannot be found.
......@@ -223,7 +227,10 @@ class DaskBulkStorage:
future_df = await self._load_bulk(record_id, bulk_id, columns=columns)
return await future_df
dataframe = await future_df
if columns and set(dataframe.columns) != set(columns):
raise BulkRecordNotFound(record_id, bulk_id)
return dataframe
except (OSError, RuntimeError) as exp:
raise BulkRecordNotFound(record_id, bulk_id) from exp
......@@ -237,17 +244,18 @@ class DaskBulkStorage:
engine='pyarrow', schema="infer", compression='snappy')
async def get_bulk_catalog(self, record_id: str, bulk_id: str) -> BulkCatalog:
async def get_bulk_catalog(self, record_id: str, bulk_id: str, generate_if_not_exists=True) -> BulkCatalog:
bulk_path = pathBuilder.record_bulk_path(self.base_directory, record_id, bulk_id)
catalog = load_bulk_catalog(self._fs, bulk_path)
catalog = await async_load_bulk_catalog(self._fs, bulk_path)
if catalog:
return catalog
if generate_if_not_exists:
# For legacy bulk, construct a catalog on the fly
return await self._build_catalog_from_path(bulk_path, record_id)
except FileNotFoundError as e:
raise BulkRecordNotFound(record_id, bulk_id) from e
except FileNotFoundError as error:
raise BulkRecordNotFound(record_id, bulk_id) from error
async def _build_catalog_from_path(self, path: str, record_id: str) -> BulkCatalog:
......@@ -285,13 +293,24 @@ class DaskBulkStorage:
return catalog
def _read_index_from_catalog_index_path(self, catalog: BulkCatalog) -> Optional[dd.DataFrame]:
"""Returns a Future dask dataframe or None if index path is not in the catalog"""
if catalog.index_path:
index_path = pathBuilder.full_path(self.base_directory, catalog.record_id,
catalog.index_path, self.protocol)
return self._read_parquet(index_path)
return None
async def _future_load_index(self, record_id: str, bulk_id: str) -> Awaitable[pd.Index]:
"""load the dataframe index of the specified record"""
"""Loads the dataframe index of the specified record
index should be save in a specific folder but for bulk prior to catalog creation
we read one column and retreive the index associated with it.
catalog = await self.get_bulk_catalog(record_id, bulk_id)
if catalog.index_path:
index_path = pathBuilder.full_path(self.base_directory, record_id, catalog.index_path, self.protocol)
future_df = self._read_parquet(index_path)
else: # only read one column to get the index. It doesn't seems possible to get the index directly.
future_df = self._read_index_from_catalog_index_path(catalog)
if future_df is None:
# read one column to get the index. (It doesn't seems possible to get the index directly)
first_column = next(iter(catalog.all_columns_dtypes))
future_df = await self._load_bulk(record_id, bulk_id, [first_column])
return self._submit_with_trace(lambda df: df.index.compute(), future_df)
......@@ -331,7 +350,7 @@ class DaskBulkStorage:
async def _fill_catalog_columns_info(
self, catalog: BulkCatalog, session_metas, bulk_id: str
) -> Optional[BulkCatalog]:
""" build the catalog from the session."""
"""Build the catalog from the session."""
catalog_columns = set(catalog.all_columns_dtypes)
for chunks_metas in session_meta.get_next_chunk_files(session_metas):
......@@ -392,10 +411,12 @@ class DaskBulkStorage:
index_path = pathBuilder.join(index_folder, 'index.parquet')
f_pdf = await self.client.scatter(pd.DataFrame(index=index))
dataframe = pd.DataFrame(index=index) = WDMS_INDEX_NAME
f_pdf = await self.client.scatter(dataframe)
await self._submit_with_trace(DataframeSerializerSync.to_parquet, f_pdf, index_path,
return index_path
......@@ -414,8 +435,8 @@ class DaskBulkStorage:
bulk_id = new_bulk_id()
chunk_metas = await session_meta.get_chunks_metadata(self._fs, self.base_directory, session)
if len(chunk_metas) == 0:# there is no files in this session
chunk_metas = await session_meta.get_chunks_metadata(self._fs, self.protocol, self.base_directory, session)
if len(chunk_metas) == 0: # there is no files in this session
raise BulkNotProcessable(message="No data to commit")
if from_bulk_id:
......@@ -437,7 +458,8 @@ class DaskBulkStorage:
self._fill_catalog_columns_info(catalog, chunk_metas, bulk_id)
save_bulk_catalog(self._fs, commit_path, catalog)
fcatalog = await self.client.scatter(catalog)
await async_save_bulk_catalog(self._fs, commit_path, fcatalog)
return bulk_id
......@@ -3,6 +3,7 @@ import json
import fsspec
import pandas as pd
from app.bulk_persistence.dask.utils import WDMS_INDEX_NAME
from app.model.model_chunking import DataframeBasicDescribe
......@@ -69,6 +70,9 @@ def write_bulk_without_session(data_handle,
# set the name of the index column = WDMS_INDEX_NAME
# 3- build blob filename and final full blob path
filename = session_meta.generate_chunk_filename(df)
full_file_path = path_builder.join(bulk_base_path, filename + '.parquet')
......@@ -115,8 +119,9 @@ def add_chunk_in_session(data_handle,
# sort column by names # TODO could it be avoided ? then we could keep input untouched and save serialization step?
# sort column by names and set index column name # TODO could it be avoided ? then we could keep input untouched and save serialization step?
df = df[sorted(df.columns)] = WDMS_INDEX_NAME
# 3- build blob filename and final full blob path
filename = session_meta.generate_chunk_filename(df)
......@@ -34,7 +34,7 @@ from .storage_path_builder import add_protocol, record_session_path
class SessionFileMeta:
"""The class extract information about chunks."""
def __init__(self, fs, file_path: str, lazy: bool = True) -> None:
def __init__(self, fs, protocol: str, file_path: str, lazy: bool = True) -> None:
fs: fsspec filesystem
......@@ -49,6 +49,7 @@ class SessionFileMeta:
self.time, self.shape, tail = tail.split('.')
self._meta = None
self.path = file_path
self.protocol = protocol
if not lazy:
......@@ -77,7 +78,7 @@ class SessionFileMeta:
def path_with_protocol(self) -> str:
"""Returns chunk path with protocol"""
return add_protocol(self.path, self._fs.protocol)
return add_protocol(self.path, self.protocol)
def index_hash(self) -> str:
......@@ -145,12 +146,12 @@ def build_chunk_metadata(dataframe: pd.DataFrame) -> dict:
async def get_chunks_metadata(filesystem, base_directory, session: Session) -> List[SessionFileMeta]:
async def get_chunks_metadata(filesystem, protocol: str, base_directory: str, session: Session) -> List[SessionFileMeta]:
"""Return metadata objects for a given session"""
session_path = record_session_path(base_directory,, session.recordId)
with suppress(FileNotFoundError):
parquet_files = [f for f in if f.endswith(".parquet")]
futures = get_client().map(lambda f: SessionFileMeta(filesystem, f, lazy=False) , parquet_files)
futures = get_client().map(lambda f: SessionFileMeta(filesystem, protocol, f, lazy=False) , parquet_files)
return await get_client().gather(futures)
return []
......@@ -24,6 +24,8 @@ from app.helper.logger import get_logger
from app.utils import capture_timings
WDMS_INDEX_NAME = '_wdms_index_'
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
......@@ -64,6 +66,12 @@ def join_dataframes(dfs: List[dd.DataFrame]):
return dfs[0] if dfs else None
def rename_index(dataframe: pd.DataFrame, name):
"""Rename the dataframe index""" = name
return dataframe
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1: dd.DataFrame, df2: Optional[dd.DataFrame]):
"""Combine the 2 dask dataframe. Updates df1 with df2 values if overlap."""
......@@ -72,6 +80,10 @@ def do_merge(df1: dd.DataFrame, df2: Optional[dd.DataFrame]):
df1 = set_index(df1)
df2 = set_index(df2)
df1 = df1.map_partitions(rename_index, WDMS_INDEX_NAME)
df2 = df2.map_partitions(rename_index, WDMS_INDEX_NAME)
if share_items(df1.columns, df2.columns):
return df2.combine_first(df1)
return df1.join(df2, how='outer') # join seems faster when there no columns in common
......@@ -3,6 +3,7 @@ import re
import pandas as pd
from app.bulk_persistence.dask.utils import WDMS_INDEX_NAME
from app.bulk_persistence.dask.errors import BulkNotProcessable
from app.conf import Config
......@@ -77,7 +78,9 @@ PandasReservedIndexColRegexp = re.compile(r'__index_level_\d+__')
def is_reserved_column_name(name: str) -> bool:
"""Return True if the name is a reserved column name by Pandas/Dask with PyArrow"""
return PandasReservedIndexColRegexp.match(name) or name == '__null_dask_index__'
return (PandasReservedIndexColRegexp.match(name)
or name == '__null_dask_index__'
or name == WDMS_INDEX_NAME)
def any_reserved_column_name(names: Iterable[str]) -> bool:
......@@ -15,7 +15,6 @@ import asyncio
import far.family_processor.model as farmodel
from far.family_processor.family_processor import FamilyProcessor
from azure.core.tracing import SpanKind
from fastapi import APIRouter, Depends, HTTPException, status
from typing import Optional, List
......@@ -27,6 +26,7 @@ from app.clients.storage_service_client import get_storage_record_service
from app.conf import Config
from app.routers.common_parameters import REQUIRED_ROLES_WRITE
from app.utils import Context, get_ctx
from app.helper.traces import with_trace
from app.helper.traces import TracingRoute
......@@ -125,34 +125,38 @@ class GuessResponse(BaseModel):
base_unit: Optional[str] = None # Unit to convert log
async def process_with_trace(span_label, processor: FamilyProcessor, log_info: farmodel.GuessRequest):
async def process_with_trace(processor: FamilyProcessor, log_info: farmodel.GuessRequest):
Trace guess() method from given Process
with get_ctx().tracer.span(name=span_label) as span:
span.span_kind = SpanKind.CLIENT
return await asyncio.get_event_loop().run_in_executor(
None, processor.guess, log_info
)'/family', response_model=GuessResponse,'/family',
summary="Recognize family and unit",
description="Find the most probable family and unit using family assignment rule based catalogs. "
"User defined catalog will have the priority.",
responses={status.HTTP_404_NOT_FOUND: {"description": "Family not found"}}
async def post_recognize_custom(body: GuessRequest,
ctx: Context = Depends(get_ctx)) -> GuessResponse:
processor = await family_processor_manager.get_processor(ctx, ctx.partition_id)
result = await process_with_trace("first guess", processor, farmodel.GuessRequest(**body.dict()))
result = await process_with_trace(processor, farmodel.GuessRequest(**body.dict()))
if result.error is not None:
# Try with the default catalog
default_processor = family_processor_manager.get_default_processor()
result = await process_with_trace("second guess", default_processor, farmodel.GuessRequest(**body.dict()))
if result.error is not None:
result = await process_with_trace(default_processor, farmodel.GuessRequest(**body.dict()))
if result.error:
if result.error == 'Cannot find family name':
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=result.error)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=result.error)