[M18] EDS ingest fails to do a bulk copy of data
I wanted to copy 660 records. I created a ConnectedSourceDataJob with `"FetchKind": "osdu:wks:master-data--Well:1.0.0"` and `"LimitRecords": 200`, using the search endpoint `query_with_cursor`.
86 records were created during the first eds_ingest run, which succeeded. Subsequent eds_ingest runs failed with the following error:
```
[2023-10-17, 16:01:33 UTC] {{src_dags_fetch_and_ingest.py:158}} ERROR - {'status': 'error', 'message': AttributeError("'NoneType' object has no attribute 'lower'")}
Traceback (most recent call last):
  File "/usr/local/airflow/dags/eds_ingest/libs/src_dags_fetch_and_ingest.py", line 118, in fetch_and_ingest
    if task_status.lower() == 'success' or current_try >= Constant.MAX_TRY:
AttributeError: 'NoneType' object has no attribute 'lower'
[2023-10-17, 16:01:33 UTC] {{taskinstance.py:1768}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/eds_ingest/src_dags_fetch_ingest_scheduler_dag.py", line 29, in _ingest
    task_instance.xcom_push(key='Exception', value=status['message'])
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 2290, in xcom_push
    XCom.set(
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 234, in set
    value = cls.serialize_value(
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'AttributeError' object has no attribute '__module__'. Did you mean: '__reduce__'?
```
Osdu_ingest itself worked, but it processed the same ids that the first run had already replicated. On the second Osdu_ingest run the record was replicated and then updated again; a piece of one affected record is shown below:
```json
"modifyUser": "serviceprincipal@testing.com",
"modifyTime": "2023-10-17T17:02:00.870Z",
"createTime": "2023-10-17T15:01:34.690Z",
"authority": "osdu",
"namespace": "osdu:wks",
"legal": {
  "legaltags": [
    "osdu-EDS-Legal-Tag-5604069"
  ],
  "otherRelevantDataCountries": [
    "US"
  ],
  "status": "compliant"
},
"createUser": "serviceprincipal@testing.com",
"id": "osdu:master-data--Well:testing-dk-634"
```
The issues I noticed here are:
- Only 86 records were replicated, although each run should pick up 200 records and eventually replicate all 660 records.
- The DAGs fail with the AttributeError shown above.
- Subsequent eds_ingest runs triggered Osdu_ingest, which picked up the same record ids and updated them instead of fetching new ones.
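For reference, the behavior I would expect from cursor-based fetching is: request pages of `LimitRecords` until the cursor is exhausted, and carry the cursor forward between requests so a run continues the scan instead of re-reading the first page (which would explain re-processing the same ids). A hedged sketch, where the request/response field names (`kind`, `limit`, `cursor`, `results`) follow the OSDU Search `query_with_cursor` API but `post_search` is a hypothetical stand-in for the real HTTP call:

```python
# Sketch of cursor-driven fetching against the OSDU Search
# `query_with_cursor` endpoint. `post_search` is a hypothetical helper
# that POSTs the body and returns the parsed JSON response.

def fetch_all(post_search, kind, limit=200):
    records, cursor = [], None
    while True:
        body = {"kind": kind, "limit": limit}
        if cursor:
            # Passing the previous cursor back is what advances the scan;
            # dropping it restarts from the first page (same ids again).
            body["cursor"] = cursor
        resp = post_search(body)
        records.extend(resp.get("results", []))
        cursor = resp.get("cursor")
        if not cursor or not resp.get("results"):
            break
    return records
```

If the DAG stops after the first page (or loses the cursor when a retry starts), that would account for both the short record count and the repeated ids.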