The following section explains the flow and the responsibility of each service:
- Ingestion Service: This REST API is the entry point to the ingestion-enrichment pipeline. It provides two methods: submit and status. The details are captured on the following page: API Specs. The file is stored in its original format in the tenant's landing zone. The submit method returns a unique id, referred to as jobId, which is used to update the status of the job across the various stages of the pipeline.
- File Extractor Service: If the input is a zip file, this service extracts its files and copies the extracted files to the tenant's persistence zone. The original file is then deleted from the landing zone.
- File Metadata Service: This service creates a metadata record, including the location of the file in cloud storage, and inserts the record into Datalake.
- Ingestion Delegator: This service calls the appropriate ingestor. The submit method takes ingestorRoutines as an input parameter, which determines the ingestor that is called (e.g. "ingestorRoutines" : {["JSONIngestor"]}).
- Domain Ingestor: XLSRiskIngestor is an example of a domain-specific ingestor. Based on the ingestorRoutines input parameter (e.g. "ingestorRoutines" : {["xlsRiskIngestor"]}), Ingestion Delegator sends a notification to the xlsRiskIngestor topic. XLSRiskIngestor subscribes to notifications on the xlsRiskIngestor topic and starts the ingestion process when a notification is received. Once ingestion is complete, it sends a completion notification to Post_Ingestion_Topic.
- Post Ingestion Service: This service marks the end of the ingestion stage and updates the status of the job accordingly. It also calls the pre-enrichment routine or the enrichment routine, depending on the inputs to the submit method.
- If the input parameter enrichmentRoutines contains a pre-enrichment transform, an event is sent to that transform. For example, "enrichmentRoutines" : {["DefaultEnrichment"={"preEnrichmentTransform": "jsonFlattener"}] } publishes an event to the json_Flattener_Transform topic. The JSON Flattener Transform converts the JSON from the time-series hierarchical format to the format accepted by Enrichment. Upon completion of the transform, a notification is published to the Pre-enrichment topic.
- If the input parameter enrichmentRoutines does not contain a pre-enrichment transform (e.g. "enrichmentRoutines" : {["DefaultEnrichment"]}), the Pre-enrichment service is invoked directly via the Pre-enrichment topic. In other words, the pre-enrichment transform is optional, and in this case none is called.
- Domains can plug in their own pre-enrichment transforms using this mechanism.
- Pre-enrichment service: This service receives notifications from the Pre-enrichment topic and updates the status of the pre-enrichment stage. It also sends a notification to the appropriate enrichment service based on enrichmentRoutines (e.g. "enrichmentRoutines" : {["DefaultEnrichment"]} or "enrichmentRoutines" : {["AvocetEnrichment"]}). Domains can also plug in their own enrichment routines using this mechanism. Currently there is only one enrichment service, so enrichmentRoutines contains DefaultEnrichment as its value (e.g. "enrichmentRoutines" : {["DefaultEnrichment"]}).
- Post-Enrichment service: This service marks the end of the enrichment stage and of the main job. It updates the status of both the enrichment stage and the overall job.
- Error topic: This topic is the mechanism for tracking errors posted by any stage. Currently, all errors are posted to this topic.
- Retry of failed ingestion job:
- Error topic: Any error captured during pipeline operations is logged to the Error topic.
- Domain Publisher service: This service listens to the Error topic and sends status messages (errors and important completion statuses) to domains using the Notification and Task Management service.
- Records that are not ingested are available at /output/failedrecords/original/originalrecords.json, which contains the list of complete records whose ingestion failed. This JSON file can be used for retry.
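
The retry step above can be sketched as follows. This is a minimal illustration, not the pipeline's actual retry mechanism: it assumes originalrecords.json holds a top-level JSON array of records, and the helper names `load_failed_records` and `write_retry_file`, as well as the `job_root` argument, are hypothetical. The root that the /output path is relative to is not specified in this page.

```python
import json
from pathlib import Path

# Path fragment taken from the text above; the root it is relative to is an assumption.
FAILED_RECORDS_RELATIVE_PATH = "output/failedrecords/original/originalrecords.json"


def load_failed_records(job_root: Path) -> list:
    """Return the failed records for a job, or an empty list if no file exists.

    Assumes the file holds a top-level JSON array of complete records.
    """
    path = job_root / FAILED_RECORDS_RELATIVE_PATH
    if not path.exists():
        return []
    with path.open(encoding="utf-8") as handle:
        records = json.load(handle)
    if not isinstance(records, list):
        raise ValueError(f"expected a JSON array in {path}")
    return records


def write_retry_file(records: list, destination: Path) -> Path:
    """Write the failed records to a new file that could be submitted again."""
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("w", encoding="utf-8") as handle:
        json.dump(records, handle, indent=2)
    return destination
```

The file written by `write_retry_file` would then be passed to the submit method as a new job; how that call is made depends on the API Specs and is not shown here.
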
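
The routing rules described for the Ingestion Delegator and the Post Ingestion Service can be sketched as a pair of small functions. This is an illustration under stated assumptions, not the services' real code: the parameters are modelled as plain Python structures (a list for ingestorRoutines, a dict for enrichmentRoutines) rather than in the notation used in the examples, the topic name `Pre_Enrichment_Topic` is hypothetical, and the function names are invented for this sketch. Only the xlsRiskIngestor and json_Flattener_Transform topic names come from the text.

```python
# Hypothetical topic name; the text only calls it the "Pre-enrichment topic".
PRE_ENRICHMENT_TOPIC = "Pre_Enrichment_Topic"

# Transform name -> topic. The single entry comes from the example in the text.
TRANSFORM_TOPICS = {"jsonFlattener": "json_Flattener_Transform"}


def ingestor_topics(ingestor_routines):
    """Ingestion Delegator: one topic per requested ingestor routine.

    Follows the xlsRiskIngestor example, where the topic carries the routine's name.
    """
    return list(ingestor_routines)


def post_ingestion_topics(enrichment_routines):
    """Post Ingestion Service: pick the next topic for each enrichment routine."""
    topics = []
    for routine, options in enrichment_routines.items():
        transform = (options or {}).get("preEnrichmentTransform")
        if transform is None:
            # No transform requested: go straight to the Pre-enrichment topic.
            topics.append(PRE_ENRICHMENT_TOPIC)
        elif transform in TRANSFORM_TOPICS:
            # Transform requested: publish to that transform's topic first.
            topics.append(TRANSFORM_TOPICS[transform])
        else:
            raise ValueError(f"unknown pre-enrichment transform: {transform!r}")
    return topics
```

With these assumptions, a job submitted with a jsonFlattener transform is routed to json_Flattener_Transform, while a job with DefaultEnrichment alone goes directly to the Pre-enrichment topic. A domain would plug in its own transform by adding an entry to the lookup table.
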
## Example
[Ingestion Examples](/OpenDES-\(Deprecated\)-/Design-Details-and-Implementation/Ingestion-and-enrichment/Ingestion-Examples)
## Pipeline
![Simple version ingestion enrichment pipeline V3.jpg](uploads/sion%20ingestion%20enrichment%20pipeline%20V3-69aebfd6-51b9-461f-8a62-a18d0e87361e.jpg)