Commit a02d7b88 authored by ethiraj krishnamanaidu's avatar ethiraj krishnamanaidu

Initial import

parent 4a870a5a
# .coveragerc to control coverage.py
[run]
branch = True
include = */ingestor_dataflow_utilities/*
[report]
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
pragma: no cover
# Don't complain about missing debug-only code:
def __repr__
if self\.debug
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
# Don't complain if non-runnable code isn't run:
if 0:
if __name__ == .__main__.:
ignore_errors = True
[html]
directory = coverage_html_report
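
The settings above restrict measurement to the ingestor_dataflow_utilities package, enable branch coverage, and send HTML output to coverage_html_report. A minimal sketch of driving coverage.py programmatically with this config; the tests/ directory and unittest-style layout are assumptions, not part of this commit:

```python
# Sketch only: 'tests' is a hypothetical test directory, not part of this commit.
import unittest

import coverage

cov = coverage.Coverage(config_file='.coveragerc')  # picks up branch=True and the include filter
cov.start()

suite = unittest.defaultTestLoader.discover('tests')
unittest.TextTestRunner(verbosity=2).run(suite)

cov.stop()
cov.save()
cov.report()       # honours the exclude_lines regexes in [report]
cov.html_report()  # written to coverage_html_report per the [html] section
```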
Legal Notice and Attribution for Open Source Components
The project/software/distribution with which this Notice and Attribution is associated may include one or more of the following open source components.
This Notice and Attribution hereby satisfies any legal obligation regarding the use of these components.
No claim of ownership is made to any such component, nor is any component offered with any warranty by Schlumberger. All components are offered "AS-IS".
google-cloud-dataflow under Apache 2.0 license
https://pypi.org/project/google-cloud-dataflow/
pytz under MIT license
https://pypi.org/project/pytz/
The license text files for the components are present in $\ingestor_utilities\licenses\
# Introduction
TODO: Give a short introduction to your project. Use this section to explain the objectives or the motivation behind the project.
# Getting Started
TODO: Guide users through getting your code up and running on their own system. In this section you can talk about:
1. Installation process
2. Software dependencies
3. Latest releases
4. API references
# Build and Test
TODO: Describe and show how to build your code and run the tests.
# Contribute
TODO: Explain how other users and developers can contribute to make your code better.
If you want to learn more about creating good readme files, refer to the following [guidelines](https://www.visualstudio.com/en-us/docs/git/create-a-readme). You can also seek inspiration from the readme files below:
- [ASP.NET Core](https://github.com/aspnet/Home)
- [Visual Studio Code](https://github.com/Microsoft/vscode)
- [Chakra Core](https://github.com/Microsoft/ChakraCore)
#!/bin/bash
set -e
set -o pipefail
export PYTHONPATH=${PYTHONPATH}:${WORKSPACE}/lib:${PWD}/lib:lib
echo
echo
echo "[START] Create Templates on Google Bucket: $2"
echo
echo
echo "Dataflow Zone: $4"
region="$(cut -d- -f1-2 <<<$4)"
echo "Dataflow Region: $region"
# Run the command inside the if so that a non-zero exit is reported here
# instead of aborting immediately via 'set -e'.
if python -m csv_ingestor_template \
    --runner=DataflowRunner \
    --project="$1" \
    --staging_location="$2/staging" \
    --temp_location="$2/temp" \
    --template_location="$2/templates/csv-parser" \
    --requirements_file=requirements.txt \
    --setup_file="${WORKSPACE}/setup.py" \
    --disk_size_gb=50 \
    --max_num_workers=5 \
    --region="$region"
then
    echo "Deployment of csv-parser Pipeline Template was Successful"
else
    echo
    echo "Deployment of csv-parser Pipeline Template Failed"
    exit 1
fi
### Add your other templates here ###
echo
echo
echo "[END] Create Templates on Google Bucket: $2"
echo
echo
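
Once the script above has staged the csv-parser template under $2/templates/csv-parser, a job can be started from it through the Dataflow templates API. The sketch below uses google-api-python-client (not part of this commit); the project, bucket, region, and parameter values are hypothetical placeholders, and only one of the template's options is shown:

```python
# Sketch only: launch the staged template via the Dataflow v1b3 API using
# application-default credentials. All identifiers below are hypothetical.
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().locations().templates().launch(
    projectId='my-project',                         # corresponds to $1 in the script above
    location='us-central1',                         # region derived from $4
    gcsPath='gs://my-bucket/templates/csv-parser',  # the --template_location used above
    body={
        'jobName': 'csv-parser-run',
        'parameters': {
            'input_file_path': 'gs://my-bucket/incoming/input.csv',
        },
    })
response = request.execute()
print(response['job']['id'])
```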
DATA_TYPE_CONVERSION_WARNINGS = "DATA_TYPE_CONVERSION_WARNINGS"
ADD_GEO_LOCATION_RECORDS = "ADD_GEO_LOCATION_RECORDS"
NO_GEO_TAGGING_REQUIRED = "NO_GEO_TAGGING_REQUIRED"
DEFAULT_HEADERS = {'Content-Type': 'application/json', 'Accept': 'application/json'}
AUTHORIZATION_HEADER = 'Authorization'
BAD_CSV_RECORDS = 'bad_csv_records'
import codecs
import csv

from apache_beam.io.filebasedsource import FileBasedSource
from apache_beam.metrics import Metrics

from ingestor_dataflow_utilities.utf8_dict_reader import UTF8DictReader
class CsvFileBasedSource(FileBasedSource):
def __init__(self, file_pattern, encoding, job_id):
super(CsvFileBasedSource, self).__init__(file_pattern)
self.total_records_counter = Metrics.counter(self.__class__, 'total_records')
self._encoding = encoding
self._job_id = job_id
def read_records(self, file_name, offset_range_tracker):
"""Returns a generator of records created by reading file 'file_name'.
Uses UTF8Encoder to read the line in given encoding and convert it to utf-8 format
before further processing
Args:
file_name: a ``string`` that gives the name of the file to be read. Method
``FileBasedSource.open_file()`` must be used to open the file
and create a seekable file object.
offset_range_tracker: a object of type ``OffsetRangeTracker``. This
defines the byte range of the file that should be
read. See documentation in
``iobase.BoundedSource.read()`` for more information
on reading records while complying to the range
defined by a given ``RangeTracker``.
Returns:
an iterator that gives the records read from the given file.
"""
header = self._get_header(file_name)
with self.open_file(file_name) as f:
self._set_start_position(f, offset_range_tracker.start_position())
reader = UTF8DictReader(f, encoding=self._encoding.get(), fieldnames=header, restval='')
while offset_range_tracker.try_claim(reader.current_position):
record = reader.next()
if any(record.values()):
self.total_records_counter.inc()
actual_row = self._get_csv_line(record, header)
record = self._add_job_id(record)
                yield record, actual_row
@staticmethod
def _set_start_position(f, start):
""" Seek and return the current position in the file to read
Args:
f (file): file handler
start (int): start position
Returns:
int: current position in the file
"""
if start > 0:
# Any line that starts after 'start' does not belong to the current
# bundle. Seeking to (start - 1) and skipping a line moves the current
# position to the starting position of the first line that belongs to
# the current bundle.
f.seek(start - 1)
else:
f.seek(0)
f.readline()
def _get_header(self, file_name):
""" Determines if the file has header information and if so, returns it.
Args:
file_name (str): CSV file name
Returns:
str: header information
Raises:
ValueError: If header information is missing.
"""
with self.open_file(file_name) as f:
self._skip_bom_utf8_if_present(f)
return csv.DictReader(f).fieldnames
@staticmethod
def _skip_bom_utf8_if_present(f):
"""Check if the file has a Byte Offset Mark (BOM) and if present skip it"""
f.seek(0)
first_three_bytes = f.read(3)
if first_three_bytes != codecs.BOM_UTF8:
f.seek(0)
    def _get_csv_line(self, record_dict, header):
        """Rebuild the original CSV line from the parsed record, preserving the header's column order."""
        return ','.join(record_dict[field] for field in header)
def _add_job_id(self, record):
"""Add ingestion pipeline job id to data"""
record['jobId'] = self._job_id.get()
return record
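
For reference, a minimal sketch of reading through CsvFileBasedSource outside the full template; the GCS path, encoding, and job id below are hypothetical. Each emitted element is a (record_dict, original_csv_line) tuple, as yielded by read_records:

```python
# Sketch only: standalone read through CsvFileBasedSource; the path and values are hypothetical.
import apache_beam as beam
from apache_beam.options.value_provider import StaticValueProvider

from ingestor_dataflow_utilities.csv_file_based_source import CsvFileBasedSource

source = CsvFileBasedSource(
    StaticValueProvider(str, 'gs://my-bucket/incoming/input.csv'),  # file pattern
    StaticValueProvider(str, 'latin-1'),                            # encoding, read via .get() per bundle
    StaticValueProvider(str, 'job-001'))                            # job id stamped onto every record

with beam.Pipeline() as p:
    (p
     | 'Read CSV' >> beam.io.Read(source)
     | 'Keep parsed dict' >> beam.Map(lambda pair: pair[0]))
```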
import logging

from apache_beam.coders import coders
from apache_beam.io import Write
from apache_beam.io import filebasedsink
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
class _TextSinkWithHeaders(filebasedsink.FileBasedSink):
"""A sink to a GCS or local text file or files."""
def __init__(self,
file_path_prefix,
file_name_suffix='',
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=CompressionTypes.AUTO,
header=None):
"""
Returns:
A _TextSinkWithHeaders object usable for writing.
"""
super(_TextSinkWithHeaders, self).__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
mime_type='text/plain',
compression_type=compression_type)
self._append_trailing_newlines = append_trailing_newlines
self._header = header
def open(self, temp_path):
file_handle = super(_TextSinkWithHeaders, self).open(temp_path)
if self._header is not None:
header_str = self._header.get()
logging.debug("CSV File Header : {}".format(header_str))
file_handle.write(header_str)
if self._append_trailing_newlines:
file_handle.write('\n')
return file_handle
def display_data(self):
dd_parent = super(_TextSinkWithHeaders, self).display_data()
dd_parent['append_newline'] = DisplayDataItem(
self._append_trailing_newlines,
label='Append Trailing New Lines')
return dd_parent
def write_encoded_record(self, file_handle, encoded_value):
"""Writes a single encoded record."""
file_handle.write(encoded_value)
if self._append_trailing_newlines:
file_handle.write('\n')
class WriteToTextWithHeaders(PTransform):
"""Initialize a :class:`WriteToText` transform."""
def __init__(self,
file_path_prefix,
file_name_suffix='',
append_trailing_newlines=True,
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=CompressionTypes.AUTO,
header=None):
self._sink = _TextSinkWithHeaders(file_path_prefix, file_name_suffix,
append_trailing_newlines, num_shards,
shard_name_template, coder, compression_type, header)
def expand(self, pcoll):
return pcoll | Write(self._sink)
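
A minimal sketch of the transform in isolation; the output prefix and header value are hypothetical. Because the sink calls header.get() in open(), the header is passed as a value provider:

```python
# Sketch only: write two CSV rows with a header prepended to the single output shard.
import apache_beam as beam
from apache_beam.options.value_provider import StaticValueProvider

from ingestor_dataflow_utilities.custom_file_sink import WriteToTextWithHeaders

with beam.Pipeline() as p:
    (p
     | 'Create rows' >> beam.Create(['1,alpha', '2,beta'])
     | 'Write with header' >> WriteToTextWithHeaders(
         './failed_records',                           # hypothetical output prefix
         file_name_suffix='.csv',
         shard_name_template='',
         header=StaticValueProvider(str, 'id,name')))  # .get() is called in open()
```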
import csv
from ingestor_dataflow_utilities.utf8_recoder import UTF8Recoder
class UTF8DictReader:
"""
A CSV reader which will iterate over lines in the CSV file "f",
which is encoded in the given encoding.
"""
def __init__(self, f, encoding='utf-8', fieldnames=None, restkey=None, restval=None,
dialect="excel", *args, **kwds):
self._recoder = UTF8Recoder(f, encoding)
self.reader = csv.DictReader(self._recoder, fieldnames=fieldnames, restkey=restkey, restval=restval,
dialect=dialect, *args,
**kwds)
def next(self):
return self.reader.next()
@property
def current_position(self):
return self._recoder.current_position
def __iter__(self):
return self
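
A small sketch of the reader outside Beam, assuming a hypothetical Latin-1 encoded CSV file with an ASCII header row; the byte position it exposes is what CsvFileBasedSource hands to the range tracker:

```python
# Sketch only: the file name is hypothetical and the header row is assumed to be ASCII.
from ingestor_dataflow_utilities.utf8_dict_reader import UTF8DictReader

with open('input_latin1.csv', 'rb') as f:
    fieldnames = f.readline().rstrip('\r\n').split(',')  # header row, read before wrapping
    reader = UTF8DictReader(f, encoding='latin-1', fieldnames=fieldnames, restval='')
    row = reader.next()                                  # dict keyed by the header fields
    print(row)
    print(reader.current_position)                       # bytes consumed so far
```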
import codecs
class UTF8Recoder:
"""
Iterator that reads an encoded stream and reencodes the input to UTF-8
"""
def __init__(self, f, encoding):
self.reader = codecs.getreader(encoding)(f)
self._position = f.tell()
def __iter__(self):
return self
def next(self):
row = self.reader.next().encode("utf-8")
self._position = self._position + len(row)
return row
@property
def current_position(self):
return self._position
import json
import logging
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from dataflow_utilities.add_geolocation_pardo import AddGeoLocationPardo
from dataflow_utilities.cleanup_pardo import CleanUpPardo
from dataflow_utilities.constants import BAD_CSV_RECORDS, BAD_RECORDS, ERROR_LOGS, GOOD_RECORDS
from dataflow_utilities.custom_file_based_source import CustomFileBasedSource
from dataflow_utilities.datalake_writer_pardo import DatalakeWriterDoFn
from dataflow_utilities.enrich_records_pardo import EnrichRecordsPardo
from dataflow_utilities.operation import Operation
from dataflow_utilities.parse_or_test_run_pardo import ParseOrTestRunPardo
from ingestor_dataflow_utilities.constants import ADD_GEO_LOCATION_RECORDS, DATA_TYPE_CONVERSION_WARNINGS, \
    NO_GEO_TAGGING_REQUIRED
from ingestor_dataflow_utilities.csv_file_based_source import CsvFileBasedSource
from ingestor_dataflow_utilities.custom_file_sink import WriteToTextWithHeaders
class CSVTemplateOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input_file_path',
type=str,
dest="input_file_path",
help='Path of the file to read from')
parser.add_value_provider_argument(
'--parsed_output_file_path',
type=str,
dest="parsed_output_file_path",
help='Path of the file to write good records')
parser.add_value_provider_argument(
'--failed_original_file_path',
type=str,
dest="failed_original_file_path",
help='Path of the file to write original failed records')
parser.add_value_provider_argument(
'--failed_warnings_file_path',
type=str,
dest="failed_warnings_file_path",
help='Path of the file to write warning messages')
parser.add_value_provider_argument(
'--failed_errors_file_path',
type=str,
dest="failed_errors_file_path",
help='Path of file to store the errors and recordIds on failure of a batch')
parser.add_value_provider_argument(
'--success_response_file_path',
type=str,
dest="success_response_file_path",
help='Path of file to store the recordsIds on successful ingestion')
parser.add_value_provider_argument(
'--kind',
type=str,
dest="kind",
help='Datalake kind')
parser.add_value_provider_argument(
'--acl',
type=str,
dest="acl",
help='Access Control List')
parser.add_value_provider_argument(
'--legal',
type=str,
dest="legal",
help='Legal information')
parser.add_value_provider_argument(
'--dataset_descriptor_id',
type=str,
dest="dataset_descriptor_id",
help='Dataset descriptor id')
parser.add_value_provider_argument(
'--record_id',
type=str,
dest="record_id",
help='CSV record field that should be used for an ID')
parser.add_value_provider_argument(
'--attribute_datatype_map',
type=str,
dest="attribute_datatype_map",
help='Dictionary of attribute data types')
parser.add_value_provider_argument(
'--common_attributes',
type=str,
dest="common_attributes",
help='Common attributes that would be added to all data types')
parser.add_value_provider_argument(
'--relationships',
type=str,
dest="relationships",
help='Relationship information to other data types')
parser.add_value_provider_argument(
'--meta',
type=str,
dest="meta",
help='Meta information'
)
parser.add_value_provider_argument(
'--crs_info',
type=str,
dest="crs_info",
help='Coordinate Reference System (CRS) information')
parser.add_value_provider_argument(
'--encoding',
type=str,
dest="encoding",
help='CSV file encoding',
default='utf-8')
parser.add_value_provider_argument(
'--operation',
type=Operation,
choices=list(Operation),
default=Operation.INGEST,
dest="operation",
help='PARSE, INGEST, TEST_RUN or PARSE_AND_INGEST')
parser.add_value_provider_argument(
'--datalake_storage_base_url',
type=str,
dest="datalake_storage_base_url",
help='Datalake storage url')
parser.add_value_provider_argument(
'--account_id',
type=str,
dest="account_id",
help='Schlumberger account id')
parser.add_value_provider_argument(
'--target_audience',
type=str,
dest="target_audience",
help='Target audience')
parser.add_value_provider_argument(
'--file_header',
type=str,
dest="file_header",
help='File Header')
parser.add_argument(
'--file_type',
type=str,
default="csv",
help='Type of File')
parser.add_argument(
'--correlation_id',
type=str,
dest="correlation_id",
default='NONE',
help='Correlation ID')
parser.add_value_provider_argument(
'--storage_record_ids',
type=str,
default='[]',
help='List of storage record ids of the corresponding parent record ids')
parser.add_value_provider_argument(
'--credentials_file_path',
type=str,
dest="credentials_file_path",
            help='Path of the credentials file')
parser.add_value_provider_argument(
'--data_partition_id',
type=str,
dest="data_partition_id",
help='Schlumberger data partition id')
parser.add_value_provider_argument(
'--job_id',
type=str,
dest="job_id",
help='Ingestion Pipeline Job Id')
def create_pipeline():
logging.getLogger().setLevel(logging.INFO)
pipeline_options = PipelineOptions()
known_args = pipeline_options.view_as(CSVTemplateOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
enriched_records = (p | 'Read CSV' >> beam.io.Read(CsvFileBasedSource(known_args.input_file_path, known_args.encoding, known_args.job_id))
| 'Enrich records' >> beam.ParDo(EnrichRecordsPardo(known_args)).with_outputs(
DATA_TYPE_CONVERSION_WARNINGS, NO_GEO_TAGGING_REQUIRED, main=ADD_GEO_LOCATION_RECORDS))
enriched_records[DATA_TYPE_CONVERSION_WARNINGS] | "Write data conversion warnings" >> beam.io.WriteToText(
known_args.failed_warnings_file_path, file_name_suffix='.json', shard_name_template='')
credentials = p | 'Get Credentials' >> beam.io.Read(CustomFileBasedSource(known_args.credentials_file_path))
credentials | "Clean Up" >> beam.ParDo(CleanUpPardo(known_args.credentials_file_path))
        geo_location_added = (enriched_records[ADD_GEO_LOCATION_RECORDS]
                              | 'Batch records' >> beam.BatchElements(min_batch_size=10000, max_batch_size=10000)
                              | "Add geo-location" >> beam.ParDo(AddGeoLocationPardo(known_args),
                                                                 credentials=pvalue.AsSingleton(credentials))
                              | 'Flatten geo-location records' >> beam.FlatMap(lambda x: x))
flattened_records = (geo_location_added, enriched_records[NO_GEO_TAGGING_REQUIRED]) | beam.Flatten()
        (flattened_records
         | 'Parse or test run' >> beam.ParDo(ParseOrTestRunPardo(known_args))
         | 'Create JSON' >> beam.Map(lambda x: json.dumps(x[0]))
         | "Write JSON" >> beam.io.WriteToText(known_args.parsed_output_file_path,
                                               file_name_suffix='.json', shard_name_template=''))
        datalake_response = (flattened_records
                             | 'Batch datalake records' >> beam.BatchElements(min_batch_size=100,
                                                                              max_batch_size=100)
                             | beam.ParDo(DatalakeWriterDoFn(user_options=known_args),
                                          credentials=pvalue.AsSingleton(credentials)).with_outputs(
                                 BAD_RECORDS, ERROR_LOGS, BAD_CSV_RECORDS, main=GOOD_RECORDS))
datalake_response[GOOD_RECORDS] | 'Write datalake response to output file' >> WriteToText(
known_args.success_response_file_path, file_name_suffix='.json', shard_name_template='')
datalake_response[BAD_CSV_RECORDS] | 'Write bad datalake records in original CSV format to failed file' >> WriteToTextWithHeaders(
known_args.failed_original_file_path, file_name_suffix='.csv', shard_name_template='', header=known_args.file_header)
datalake_response[BAD_RECORDS] | 'Write bad datalake JSON records to failed file' >> WriteToText(
known_args.failed_original_file_path, file_name_suffix='.json', shard_name_template='')
datalake_response[ERROR_LOGS] | 'Write failure errors and recordIds' >> WriteToText(
known_args.failed_errors_file_path, file_name_suffix='.json', shard_name_template='')
if __name__ == '__main__':
create_pipeline()
from setuptools import setup
setup(
    name="ingestor_dataflow_utilities",
    version="1.0.0",
packages=['ingestor_dataflow_utilities'],
)
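
This setup.py is what --setup_file in the deployment script points at; Dataflow builds and ships the ingestor_dataflow_utilities package from it to the workers. A quick local sanity check (an assumption, not part of the commit) is to build the source distribution:

```python
# Sketch only: verify setup.py is well formed by building an sdist locally.
import subprocess
import sys

subprocess.check_call([sys.executable, 'setup.py', 'sdist'])
```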
apache-beam[test]==2.6.0
requests-mock==1.5.0