Source code for superpyrate.tasks

"""Contains the code for validating AIS messages
"""
import csv
import sys
from pyrate.algorithms.aisparser import parse_raw_row, \
                                        AIS_CSV_COLUMNS, \
                                        validate_row
# from exactVerify.ais_import.algorithms.exact_verifyparser import readcsv
import logging
from fuzzywuzzy import process as fuzz_proc

LOGGER = logging.getLogger('luigi-interface')
LOGGER.setLevel(logging.INFO)

FORCED_COL_MAP = {'MMSI': 'MMSI',
                  'Time': 'Time',
                  'Message_ID': 'Message_ID',
                  'Navigational_status': 'Navigational_status',
                  'SOG': 'SOG',
                  'Longitude': 'Longitude',
                  'Latitude': 'Latitude',
                  'COG': 'COG',
                  'Heading': 'Heading',
                  'IMO': 'IMO',
                  'Draught': 'Draught',
                  'Destination': 'Destination',
                  'Vessel_Name': 'Vessel_Name',
                  'ETA_month': 'ETA_month',
                  'ETA_day': 'ETA_day',
                  'ETA_hour': 'ETA_hour',
                  'ETA_minute': 'ETA_minute'}
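# A hypothetical example (names invented for illustration): the keys of a
# forced column map are the required AIS column names and the values are the
# header names actually present in the source CSV, so a file with renamed
# headers could be mapped with, e.g.:
#
#     ALTERNATIVE_COL_MAP = {'MMSI': 'mmsi_number',
#                            'Time': 'timestamp_utc'}
#
# The identity map above is used when the source headers already match.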


def learn_columns(read_cols, required_cols, csv_or_xml='csv'):
    """Tries to match the read columns with the list given

    Arguments
    =========
    read_cols : list
        The column headers read from the csv file
    required_cols : list
        A list of the columns required from the csv file
    csv_or_xml : str, default='csv'

    Returns
    =======
    matched_cols : dict
        Maps each required column to a ``(matched_header, score)`` tuple
    """
    if csv_or_xml == 'csv':
        matched_cols = {}
        for col in required_cols:
            # fuzzywuzzy returns the best-matching header and its match score
            matched_cols[col] = fuzz_proc.extractOne(col, read_cols)
        return matched_cols
    else:
        return read_cols
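
# A minimal usage sketch for learn_columns (the header names below are
# hypothetical): each required column maps to a (matched_header, score) tuple
# from fuzzywuzzy, and readcsv only trusts matches scoring 95 or more.
#
#     >>> matched = learn_columns(['mmsi', 'Time stamp', 'Message_ID'],
#     ...                         ['MMSI', 'Time', 'Message_ID'])
#     >>> matched['Message_ID'][0]
#     'Message_ID'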
def produce_valid_csv_file(inputf, outputf):
    """Validates a CSV file of AIS data and writes the clean rows to a new file

    Arguments
    ---------
    inputf : str
        File path to a large CSV file of AIS data
    outputf : str
        File path for a CSV file containing validated and cleaned data
    """
    LOGGER.info("Processing {}".format(inputf))
    columns = AIS_CSV_COLUMNS
    with open(inputf, 'r') as input_file:
        # Do validation and write a new file of valid messages
        with open(outputf, 'w') as output_file:
            writer = csv.DictWriter(output_file, dialect="excel",
                                    fieldnames=columns)
            writer.writeheader()
            # Parse and iterate over lines from the current file
            LOGGER.debug("Building the reader")
            reader = unfussy_reader(readcsv(input_file,
                                            forced_col_map=FORCED_COL_MAP,
                                            columns=columns))
            LOGGER.debug("Iterating over the reader")
            for row in reader:
                if len(row) > 0:
                    converted_row = {}
                    try:
                        converted_row = parse_raw_row(row)
                    except ValueError as e:
                        # Invalid data in row, so log it and skip the row
                        LOGGER.error("Invalid data in row: {}".format(e))
                        continue
                    except KeyError as e:
                        LOGGER.error("Missing data in row: {}".format(e))
                        continue
                    else:
                        # Validate the parsed row before writing it out
                        try:
                            validated_row = validate_row(converted_row)
                        except ValueError as e:
                            LOGGER.error("Error validating the converted row: {}".format(e))
                        else:
                            try:
                                writer.writerow(validated_row)
                            except ValueError as ve:
                                LOGGER.error("Error writing validated row to csv file: {}".format(ve))
                                continue
                else:
                    LOGGER.info("Illegal row, so not writing to file.")
def unfussy_reader(csv_reader):
    """Wraps a csv reader, yielding an empty dict instead of raising on a bad row
    """
    while True:
        try:
            yield next(csv_reader)
        # Stop cleanly once the underlying reader is exhausted
        except StopIteration:
            return
        # Catch csv field size limit exceeded errors
        except csv.Error as ce:
            LOGGER.error('CSV Error: {} on line {}'.format(ce, csv_reader.line_num))
            yield {}
            continue
        # Catch 'ascii' decode errors
        except UnicodeDecodeError as ude:
            LOGGER.error('CSV Error: {} on line {}'.format(ude, csv_reader.line_num))
            yield {}
            continue
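
# A minimal usage sketch for unfussy_reader (in-memory data, purely
# illustrative): rows that the csv module cannot parse are logged and yielded
# as empty dicts instead of raising, so callers can simply skip them.
#
#     >>> import io
#     >>> data = io.StringIO('1,2,3\n4,5,6\n')
#     >>> list(unfussy_reader(csv.reader(data)))
#     [['1', '2', '3'], ['4', '5', '6']]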
def readcsv(fp, forced_col_map=None, columns=None):
    """Yields a dictionary of the subset of columns required

    Reads each line in the CSV file, checks whether all columns are available,
    and yields a dictionary of the subset of columns required (as per
    AIS_CSV_COLUMNS). If a row is invalid (too few columns), yields an empty
    dictionary.

    Arguments
    ---------
    fp : TextIOWrapper
        An open TextIOWrapper (returned by `open()`)
    forced_col_map : dict
        A dictionary mapping the keys defined in `columns` to columns with
        different names
    columns : list, default=AIS_CSV_COLUMNS
        The list of required column names

    Yields
    ------
    rowsubset : dict
        A dictionary of the subset of columns as per `columns`
    """
    # Fall back to the documented defaults when arguments are not supplied
    if forced_col_map is None:
        forced_col_map = {}
    if columns is None:
        columns = AIS_CSV_COLUMNS

    # Decrease the max_int value by a factor of 10 for as long as an
    # OverflowError occurs when raising the csv field size limit
    max_int = sys.maxsize
    decrement = True
    while decrement:
        decrement = False
        try:
            csv.field_size_limit(max_int)
        except OverflowError:
            max_int = int(max_int / 10)
            decrement = True

    # The first line holds the column headers. Use it to extract the indices
    # of the columns we are extracting
    cols = fp.readline().strip('\r\n').split(',')
    LOGGER.info("There are {} columns in {}".format(len(cols), fp.name))
    indices = {}
    auto_col_map = learn_columns(cols, columns)
    used_map = {}
    for col in columns:
        if len(forced_col_map) > 0 and col in forced_col_map:
            try:
                indices[col] = cols.index(forced_col_map[col])
                used_map.update({col: forced_col_map[col]})
            except Exception as e0:
                LOGGER.warning("{}, {}".format(forced_col_map, col))
                LOGGER.warning(cols)
                LOGGER.warning("Error mapping columns: {}".format(repr(e0)))
                # Fall back to the fuzzy-matched column if the match is strong
                if auto_col_map[col][1] >= 95:
                    indices[col] = cols.index(auto_col_map[col][0])
                    used_map.update({col: auto_col_map[col][0]})
                else:
                    raise RuntimeError("Something wrong with the provided column map. "
                                       "Double check key-value pairs. "
                                       "Note that names are case sensitive. "
                                       "{0}.".format(e0))
        else:
            try:
                indices[col] = cols.index(col)
                used_map.update({col: col})
            except Exception as e1:
                LOGGER.warning("Error mapping columns: {}".format(repr(e1)))
                # Fall back to the fuzzy-matched column if the match is strong
                if auto_col_map[col][1] >= 95:
                    indices[col] = cols.index(auto_col_map[col][0])
                    used_map.update({col: auto_col_map[col][0]})
                else:
                    auto_col_map_msg = '{' + ', '.join(
                        "'{!s}':{!r}".format(key, val[0])
                        for (key, val) in auto_col_map.items()) + '}'
                    msg = ("Possibly ambiguous column mapping or column missing "
                           "in CSV header. Try specifying an exact map as a "
                           "dictionary to exact_verify's 'run' command. "
                           "Suggested column mapping (required:read): {}".format(
                               auto_col_map_msg))
                    raise RuntimeError(
                        "Missing columns in file header: {0}. {1}.".format(e1, msg))

    # Show the column map used for the current file
    LOGGER.debug("Using the following column mapping (required:read): "
                 "{}".format(used_map))

    unfussy = unfussy_reader(csv.reader(fp, delimiter=',', quotechar='"'))
    for row in unfussy:
        rowsubset = {}
        # Only try to process the row if all columns are available
        # (changed from >= to ==)
        if len(row) == len(cols):
            for col in columns:
                rowsubset[col] = row[indices[col]]  # raw column data
            yield rowsubset
        else:
            LOGGER.debug("Expected column length doesn't match row in file: "
                         "{}. Row is {}, column is {}".format(fp.name,
                                                              len(row),
                                                              len(cols)))
            yield rowsubset
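
# A minimal usage sketch for readcsv ('ais_201601.csv' is a hypothetical input
# file): each yielded dict holds only the required columns keyed by their
# canonical names, and empty dicts mark rows with the wrong number of columns.
#
#     with open('ais_201601.csv') as fp:
#         for row in readcsv(fp, forced_col_map=FORCED_COL_MAP,
#                            columns=AIS_CSV_COLUMNS):
#             if row:
#                 print(row['MMSI'], row['Time'])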
if __name__ == "__main__":
    # sys.argv[0] is the script name; the file paths follow it
    an_input_file = sys.argv[1]
    an_output_file = sys.argv[2]
    produce_valid_csv_file(an_input_file, an_output_file)