superpyrate package

Submodules

superpyrate.db_setup module

Sets up the tables in a newly created database, ready for data ingest

superpyrate.db_setup.main()[source]
superpyrate.db_setup.make_options()[source]

superpyrate.pipeline module

Runs an integrated pipeline from raw zip file to database tables.

This mega-pipeline is constructed out of three smaller pipelines and now brings together tasks which:

  1. Unzip individual AIS archives and output the csv files
  2. Validate each of the csv files, processing using a derived version of the pyrate code, outputting vaidated csv files
  3. Using the postgres copy command, ingest the validated data directly into the database

Because the exact contents of the zipped archives are unknown until they are unzipped, tasks are spawned dynamically.

digraph pipeline {
    GetZipArchive [label="GetZipArchive", href="superpyrate.html#superpyrate.pipeline.GetZipArchive", target="_top", shape=box];
    GetFolderOfArchives [label="GetFolderOfArchives", href="superpyrate.html#superpyrate.pipeline.GetFolderOfArchives", target="_top", shape=box];
    UnzippedArchive [label="UnzippedArchive", href="superpyrate.html#superpyrate.pipeline.UnzippedArchive", target="_top", shape=diamond];
    UnzippedArchive -> GetZipArchive;
    ProcessCsv [label="ProcessCsv", href="superpyrate.html#superpyrate.pipeline.ProcessCsv", target="_top", shape=diamond, colorscheme=dark26, color=4, style=filled];
    ProcessCsv -> UnzippedArchive;
    ProcessCsv -> ValidMessages [arrowhead=dot,arrowtail=dot]
    GetCsvFile [label="GetCsvFile", href="superpyrate.html#superpyrate.pipeline.GetCsvFile", target="_top", shape=box];
    ValidMessages [label="ValidMessages", href="", target="_top", shape=diamond];
    ValidMessages -> GetCsvFile;
    ValidMessages -> fs [arrowhead=odot];
    ValidMessagesToDatabase [label="ValidMessagesToDatabase", href="superpyrate.html#superpyrate.pipeline.ValidMessagesToDatabase", target="_top", shape=diamond];
    ValidMessagesToDatabase -> ValidMessages;
    ValidMessagesToDatabase -> db [arrowhead=odot];
    LoadCleanedAIS [label="LoadCleanedAIS", href="superpyrate.html#superpyrate.pipeline.LoadCleanedAIS", target="_top", shape=diamond];
    LoadCleanedAIS -> ValidMessagesToDatabase;
    LoadCleanedAIS -> db [arrowhead=odot];
    WriteCsvToDb [label="WriteCsvToDb", href="superpyrate.html#superpyrate.pipeline.WriteCsvToDb", target="_top", shape=diamond, colorscheme=dark26, color=4, style=filled];
    WriteCsvToDb -> UnzippedArchive;
    WriteCsvToDb -> LoadCleanedAIS [arrowhead=dot,arrowtail=dot];
    ProcessZipArchives [label="ProcessZipArchives", href="superpyrate.html#superpyrate.pipeline.ProcessZipArchives", target="_top", shape=diamond, colorscheme=dark26, color=3, style=filled];
    ProcessZipArchives -> GetFolderOfArchives;
    ProcessZipArchives -> ProcessCsv [arrowhead=dot, arrowtail=dot];
    ProcessZipArchives -> WriteCsvToDb [arrowhead=dot, arrowtail=dot];
    RunQueryOnTable [label="RunQueryOnTable", href="superpyrate.html#superpyrate.pipeline.RunQueryOnTable", target="_top", shape=diamond];
    RunQueryOnTable -> db [arrowhead=odot];
    MakeAllIndices [label="MakeAllIndices", href="superpyrate.html#superpyrate.pipeline.MakeAllIndices", target="_top", shape=diamond, colorscheme=dark26, color=2, style=filled];
    MakeAllIndices -> RunQueryOnTable [arrowhead=dot, arrowtail=dot];
    MakeAllIndices -> ProcessZipArchives;
    ClusterAisClean [label="ClusterAisClean", href="superpyrate.html#superpyrate.pipeline.ClusterAisClean", target="_top", shape=diamond, colorscheme=dark26, color=1, style=filled];
    ClusterAisClean -> MakeAllIndices;
    ClusterAisClean -> db [arrowhead=odot];

    db [label="database", shape=cylinder];
    fs [label="filesystem", shape=folder];
}

Entry points

It is not necessary to run the entire pipeline, although there is little harm in doing so, as luigi manages the process so that individual tasks are idempotent. This means that a task only runs if required. Luigi only runs the tasks necessary to produce the files which are required for the specified entry point.

For example, to run the entire pipeline, producing a full ingested and clustered database, run:

luigi --module superpyrate.pipeline ClusterAisClean
      --workers 12
      --folder-of-zips /folder/of/zips/
      --with_db

If only the validated csv files are required, run:

luigi --module superpyrate.pipeline ProcessZipArchives
      --workers 12
      --folder-of-zips /folder/of/zips/
      --with_db

Working folder

The working folder LUIGIWORK must contain two subfolders - files and tmp. The files subfolder contains the unzipped and cleancsv folders, with all of the respective temporary files stored within. The tmp subfolder contains processcsv, writecsv, archives and database folders and contains files which are generated by the tasks which do not produce an actual file as output, rather spawn child-tasks.

Environment Variables

LUIGIWORK
the working folder for the files
DBHOSTNAME
hostname for the database e.g. localhost
DBNAME
the name of the database
DBUSER
the name of the user with access to the database
DBUSERPASS
the password of the database user
class superpyrate.pipeline.ClusterAisClean(*args, **kwargs)[source]

Bases: superpyrate.pipeline.ClusterAisClean

Clusters the ais_clean table over the disk on the mmsi index

requires(_self)
task_namespace = None
class superpyrate.pipeline.GetCsvFile(*args, **kwargs)[source]

Bases: luigi.task.ExternalTask

csvfile = <luigi.parameter.Parameter object>
output()[source]
task_namespace = None
class superpyrate.pipeline.GetFolderOfArchives(*args, **kwargs)[source]

Bases: luigi.task.ExternalTask

Returns the folder of zipped archives as a luigi.file.LocalTarget

folder_of_zips = <luigi.parameter.Parameter object>
output()[source]
run()[source]
task_namespace = None
class superpyrate.pipeline.GetZipArchive(*args, **kwargs)[source]

Bases: luigi.task.ExternalTask

Returns a zipped archive as a luigi.file.LocalTarget

output()[source]
task_namespace = None
zip_file = <luigi.parameter.Parameter object>
class superpyrate.pipeline.LoadCleanedAIS(*args, **kwargs)[source]

Bases: luigi.postgres.CopyToTable

Update ais_sources table with name of CSV file processed

After the valid csv files are successfully written to the database, this function updates the sources table with the name of the file which has been written

column_separator = ','
csvfile = <luigi.parameter.Parameter object>
database = ''
host = ''
null_values = (None, '')
password = ''
requires()[source]
run()[source]
table = 'ais_sources'
task_namespace = None
user = ''
class superpyrate.pipeline.MakeAllIndices(*args, **kwargs)[source]

Bases: superpyrate.pipeline.MakeAllIndices

Creates the indices required for a specified table

The list of indices are derived from the table specification in pyrate

Parameters:table (str, default='ais_clean') –
requires(_self)
task_namespace = None
class superpyrate.pipeline.ProcessCsv(*args, **kwargs)[source]

Bases: luigi.task.Task

Yields:ValidMessages
output()[source]

Dummy files are placed in a folder of the same name as the zip file

The files are placed in a subdirectory of LUIGIWORK called tmp/processcsv

requires()[source]
run()[source]
task_namespace = None
zip_file = <luigi.parameter.Parameter object>
class superpyrate.pipeline.ProcessZipArchives(*args, **kwargs)[source]

Bases: luigi.task.Task

Dynamically spawns WriteCsvToDb or ProcessCsv depending on database

Parameters:

with_db (bool) – Indicate whether a database is available for writing csv files

Yields:
folder_of_zips = <luigi.parameter.Parameter object>
output()[source]
requires()[source]
run()[source]
task_namespace = None
with_db = <luigi.parameter.BoolParameter object>
class superpyrate.pipeline.RunQueryOnTable(*args, **kwargs)[source]

Bases: luigi.postgres.PostgresQuery

Runs a query on a table in the database

Used for passing in utility type queries to the database such as creation of indices etc.

Parameters:
  • query (str) – A legal sql query
  • table (str, default='ais_clean') – A table on which to run the query
database = ''
host = ''
password = ''
query = <luigi.parameter.Parameter object>
table = <luigi.parameter.Parameter object>
task_namespace = None
update_id = <luigi.parameter.Parameter object>
user = ''
class superpyrate.pipeline.UnzippedArchive(*args, **kwargs)[source]

Bases: luigi.contrib.external_program.ExternalProgramTask

Unzips the zipped archive into a folder of AIS csv format files the same name as the original file

Parameters:zip_file (str) – The absolute path of the zipped archives
output()[source]

Outputs the files into a folder of the same name as the zip file

The files are placed in a subdirectory of LUIGIWORK called files/unzipped

program_args()[source]

Runs 7zip to extract the archives of AIS files

Notes

e
Unzip all ignoring folder structure (i.e. to highest level)
-o
Output folder
-y
Answer yes to everything
requires()[source]
task_namespace = None
zip_file = <luigi.parameter.Parameter object>
class superpyrate.pipeline.ValidMessages(*args, **kwargs)[source]

Bases: luigi.task.Task

Takes AIS messages and runs validation functions, generating valid csv files in folder called ‘cleancsv’ at the same level as unzipped_ais_path

csvfile = <luigi.parameter.Parameter object>
output()[source]

Validated files are named as the original csv file

The files are placed in a subdirectory of LUIGIWORK called files/cleancsv

requires()[source]
run()[source]
task_namespace = None
class superpyrate.pipeline.ValidMessagesToDatabase(*args, **kwargs)[source]

Bases: luigi.postgres.CopyToTable

Writes the valid csv files to the postgres database

Parameters:original_csvfile (luigi.Parameter) – The raw csvfile containing AIS data
cols = ['MMSI', 'Time', 'Message_ID', 'Navigational_status', 'SOG', 'Longitude', 'Latitude', 'COG', 'Heading', 'IMO', 'Draught', 'Destination', 'Vessel_Name', 'ETA_month', 'ETA_day', 'ETA_hour', 'ETA_minute']
column_separator = ','
columns = ['mmsi', 'time', 'message_id', 'navigational_status', 'sog', 'longitude', 'latitude', 'cog', 'heading', 'imo', 'draught', 'destination', 'vessel_name', 'eta_month', 'eta_day', 'eta_hour', 'eta_minute']
copy(cursor, clean_file)[source]
database = ''
host = ''
null_values = (None, '')
original_csvfile = <luigi.parameter.Parameter object>
password = ''
requires()[source]
rows()[source]

Return/yield tuples or lists corresponding to each row to be inserted.

Yields:row (iterable)
run()[source]

Inserts data generated by rows() into target table.

If the target table doesn’t exist, self.create_table will be called to attempt to create the table.

table = 'ais_clean'
task_namespace = None
user = ''
class superpyrate.pipeline.WriteCsvToDb(*args, **kwargs)[source]

Bases: superpyrate.pipeline.WriteCsvToDb

Dynamically spawns LoadCleanedAIS to load valid csvs into the database

requires(_self)
task_namespace = None
superpyrate.pipeline.get_environment_variable(name)[source]

Tries to access an environment variable, and handles the error by replacing the value with a dummy value (an empty string)

superpyrate.pipeline.get_working_folder(folder_of_zips=None)[source]
Parameters:folder_of_zips (str) – The absolute path of the folder of zips e.g. /tests/fixtures/testais/
Returns:working_folder – The path of the working folder. This is either set by the environment variable LUIGIWORK, or if empty is computed from the arguments
Return type:str
superpyrate.pipeline.setup_working_folder()[source]

Setup the working folder structure for the entire luigi pipeline

The working folder (‘LUIGIWORK’) must contain two subfolders - files and tmp. The files subfolder contains the unzipped and cleancsv folders, with all of the respective temporary files stored within. The tmp subfolder contains processcsv, writecsv, archives and database folders and contains files which are generated by the tasks which do not produce an actual file as output, rather spawn child-tasks.

superpyrate.task_countfiles module

Holds the luigi tasks which count the number of rows in the files

Records the number of clean and dirty rows in the AIS data, writing these stats to the database and finally producing a report of the statistics

  1. Count the number of rows in the raw csv files (in files/unzipped/<archive>)
  2. Count the number of rows int the clean csv files (in files/cleancsv/)
  3. Write the clean rows in the clean column of ais_sources
  4. Write the dirty (raw - clean) rows into the dirty column of ais_sources
class superpyrate.task_countfiles.CountLines(*args, **kwargs)[source]

Bases: superpyrate.task_countfiles.CountLines

Counts the number of lines for all the csvfiles in a folder

Writes all the counts and filenames to a delimited file with the name of the folder

Parameters:folder_name (str) – The absolute path of the csv file
requires(_self)
task_namespace = None
class superpyrate.task_countfiles.DoIt(*args, **kwargs)[source]

Bases: luigi.task.Task

folder_of_zips = <luigi.parameter.Parameter object>
output()[source]
requires()[source]
run()[source]
task_namespace = None
with_db = <luigi.parameter.BoolParameter object>
class superpyrate.task_countfiles.GetCountsForAllFiles(*args, **kwargs)[source]

Bases: superpyrate.task_countfiles.GetCountsForAllFiles

Counts the rows in all clean (validated) and raw files

requires(_self)
task_namespace = None
class superpyrate.task_countfiles.ProduceStatisticsReport(*args, **kwargs)[source]

Bases: superpyrate.task_countfiles.ProduceStatisticsReport

Produces a report of the data statistics

requires(_self)
task_namespace = None

superpyrate.tasks module

Contains the code for validating AIS messages

superpyrate.tasks.learn_columns(read_cols, required_cols, csv_or_xml='csv')[source]

Tries to match the read columns with the list given

Parameters:
  • read_cols (dict) – The columns read from the csv file
  • required_cols (list) – A list of the columns required from the csv file
  • csv_or_xml (str, default='csv') –
superpyrate.tasks.produce_valid_csv_file(inputf, outputf)[source]
Parameters:
  • input_file – File path to a large CSV file of AIS data
  • output_file – File path for a CSV file containing validated and cleaned data
superpyrate.tasks.readcsv(fp, forced_col_map=None, columns=None)[source]

Yields a dctionary of the subset of columns required

Reads each line in CSV file, checks if all columns are available, and returns a dictionary of the subset of columns required (as per AIS_CSV_COLUMNS).

If row is invalid (too few columns), returns an empty dictionary.

Parameters:
  • fp (TextIOWrapper) – An open TextIOWrapper (returned by open())
  • forced_col_map (dict) – A dictionary mapping the keys defined in columns to columns with different names
  • columns (dict, default=AIS_CSV_COLUMNS) – A dictionary of columns
Yields:

rowsubset (dict) – A dictionary of the subset of columns as per columns

superpyrate.tasks.unfussy_reader(csv_reader)[source]

Module contents