Commit 0b7b8d4c authored by Yan Sushchynski (EPAM)
Browse files

GONRG-2921: Batch processing records

parent f0e6627f
Pipeline #66253 passed with stages in 2 minutes and 49 seconds
......@@ -52,6 +52,9 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
ui_color = '#dad5ff'
ui_fgcolor = '#000000'
# TODO: Use the corresponding constant from osdu_api later.
SAVE_RECORDS_BATCH_SIZE = 500
@apply_defaults
def __init__(self, previous_task_id: str = None, batch_number=3, *args, **kwargs):
"""Init base operator and obtain base urls from Airflow Variables."""
......@@ -63,6 +66,8 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
self.storage_url = Variable.get('core__service__storage__url')
self.file_service_host = Variable.get('core__service__file__host')
self.batch_count = int(Variable.get("core__ingestion__batch_count", "3"))
self.batch_save_enabled = Variable.get("core__ingestion__batch_save_enabled", default_var=False, deserialize_json=True)
self.batch_save_size = int(Variable.get("core__ingestion__batch_save_size", default_var=self.SAVE_RECORDS_BATCH_SIZE))
self._show_skipped_ids = Variable.get('core__config__show_skipped_ids', default_var=False)
def _get_manifest_files_range(self, manifests: List[dict]) -> Tuple[int, int]:
......@@ -162,6 +167,8 @@ class ProcessManifestOperatorR3(BaseOperator, ReceivingContextMixin):
manifest_processor=manifest_processor,
schema_validator=validator,
token_refresher=token_refresher,
batch_save_enabled=self.batch_save_enabled,
save_records_batch_size=self.batch_save_size
)
manifest_data = self._get_manifest_data(context, execution_context)
......
......@@ -24,7 +24,7 @@ pip install azure-identity
pip install azure-keyvault-secrets
pip install msal
pip install python-keycloak
pip install osdu-api --extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
pip install osdu-api==0.10.1.dev0+92014f64 --extra-index-url https://community.opengroup.org/api/v4/projects/148/packages/pypi/simple
export WORKFLOW_URL="http://127.0.0.1:5000"
export UPDATE_STATUS_URL="http://127.0.0.1:5000/wf/us"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment