superpyrate package
Submodules
superpyrate.db_setup module
Sets up the tables in a newly created database, ready for data ingest
superpyrate.pipeline module
Runs an integrated pipeline from raw zip file to database tables.
This mega-pipeline is constructed out of three smaller pipelines and now brings together tasks which:
- Unzip individual AIS archives and output the csv files
- Validate each of the csv files, processing them with a derived version of the pyrate code and outputting validated csv files
- Using the postgres copy command, ingest the validated data directly into the database
Because the exact contents of the zipped archives are unknown until they are unzipped, tasks are spawned dynamically.
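As a rough illustration of why the tasks have to be spawned dynamically, the sketch below lists the csv members of an archive, which is only possible once the archive has been opened. It uses the standard-library zipfile module as a stand-in (the pipeline itself extracts with 7zip), and the function name csv_members is hypothetical rather than part of superpyrate.

```python
import io
import zipfile


def csv_members(archive):
    """Return the sorted names of the csv files inside a zip archive.

    `archive` may be a path or a file-like object.
    """
    with zipfile.ZipFile(archive) as zf:
        return sorted(name for name in zf.namelist()
                      if name.lower().endswith(".csv"))


# Build a small in-memory archive to stand in for an AIS zip file.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("day1.csv", "MMSI,Time\n")
    zf.writestr("day2.csv", "MMSI,Time\n")
    zf.writestr("readme.txt", "not a csv\n")
buffer.seek(0)

# One downstream task would be created per name in this list.
names = csv_members(buffer)
```

The number of downstream tasks equals the length of this list, so it cannot be fixed before the archive is read.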
Entry points
It is not necessary to run the entire pipeline, although there is little harm in doing so, as luigi manages the process so that individual tasks are idempotent. This means that a task only runs if required. Luigi only runs the tasks necessary to produce the files which are required for the specified entry point.
For example, to run the entire pipeline, producing a fully ingested and clustered database, run:
luigi --module superpyrate.pipeline ClusterAisClean \
    --workers 12 \
    --folder-of-zips /folder/of/zips/ \
    --with_db
If only the validated csv files are required, run:
luigi --module superpyrate.pipeline ProcessZipArchives \
    --workers 12 \
    --folder-of-zips /folder/of/zips/ \
    --with_db
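The idempotent behaviour described in this section comes from luigi treating a task as complete once its outputs exist. The plain-Python sketch below illustrates that idea only; it is not luigi's implementation, and run_if_missing and write_marker are hypothetical names.

```python
import os
import tempfile


def run_if_missing(output_path, action):
    """Run `action` only when `output_path` does not exist yet.

    Returns True if the action ran, False if it was skipped.
    """
    if os.path.exists(output_path):
        return False
    action(output_path)
    return True


def write_marker(path):
    # Stand-in for real work such as unzipping or validating a file.
    with open(path, "w") as handle:
        handle.write("done\n")


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "result.csv")
    first = run_if_missing(target, write_marker)   # runs: no output yet
    second = run_if_missing(target, write_marker)  # skipped: output exists
```

Re-running an entry point therefore repeats only the steps whose output files are missing.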
Working folder
The working folder LUIGIWORK must contain two subfolders - files and tmp. The files subfolder contains the unzipped and cleancsv folders, with all of the respective temporary files stored within. The tmp subfolder contains processcsv, writecsv, archives and database folders and contains files which are generated by the tasks which do not produce an actual file as output, rather spawn child-tasks.
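The layout described above could be created with a few lines of standard-library Python. This is a minimal sketch: the folder names come from the description, while the function name make_working_folders and the LAYOUT mapping are hypothetical and may not match what setup_working_folder does internally.

```python
import os
import tempfile

# Subfolder layout taken from the description above.
LAYOUT = {
    "files": ["unzipped", "cleancsv"],
    "tmp": ["processcsv", "writecsv", "archives", "database"],
}


def make_working_folders(root):
    """Create the files/ and tmp/ trees under `root`; return the paths."""
    created = []
    for parent, children in LAYOUT.items():
        for child in children:
            path = os.path.join(root, parent, child)
            os.makedirs(path, exist_ok=True)  # safe to call repeatedly
            created.append(path)
    return created


# A temporary directory stands in for the real LUIGIWORK folder.
with tempfile.TemporaryDirectory() as root:
    paths = make_working_folders(root)
    all_exist = all(os.path.isdir(p) for p in paths)
```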
Environment Variables
LUIGIWORK - the working folder for the files
DBHOSTNAME - hostname for the database e.g. localhost
DBNAME - the name of the database
DBUSER - the name of the user with access to the database
DBUSERPASS - the password of the database user
-
class
superpyrate.pipeline.
ClusterAisClean
(*args, **kwargs)[source]¶ Bases:
superpyrate.pipeline.ClusterAisClean
Clusters the ais_clean table over the disk on the mmsi index
-
requires
(_self)¶
-
task_namespace
= None¶
-
-
class
superpyrate.pipeline.
GetCsvFile
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
-
csvfile
= <luigi.parameter.Parameter object>¶
-
task_namespace
= None¶
-
-
class
superpyrate.pipeline.
GetFolderOfArchives
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
Returns the folder of zipped archives as a luigi.file.LocalTarget
-
folder_of_zips
= <luigi.parameter.Parameter object>¶
-
task_namespace
= None¶
-
-
class
superpyrate.pipeline.
GetZipArchive
(*args, **kwargs)[source]¶ Bases:
luigi.task.ExternalTask
Returns a zipped archive as a luigi.file.LocalTarget
-
task_namespace
= None¶
-
zip_file
= <luigi.parameter.Parameter object>¶
-
-
class
superpyrate.pipeline.
LoadCleanedAIS
(*args, **kwargs)[source]¶ Bases:
luigi.postgres.CopyToTable
Update ais_sources table with name of CSV file processed
After the valid csv files are successfully written to the database, this function updates the sources table with the name of the file which has been written
-
column_separator
= ','¶
-
csvfile
= <luigi.parameter.Parameter object>¶
-
database
= ''¶
-
host
= ''¶
-
null_values
= (None, '')¶
-
password
= ''¶
-
table
= 'ais_sources'¶
-
task_namespace
= None¶
-
user
= ''¶
-
-
class
superpyrate.pipeline.
MakeAllIndices
(*args, **kwargs)[source]¶ Bases:
superpyrate.pipeline.MakeAllIndices
Creates the indices required for a specified table
The list of indices is derived from the table specification in pyrate
Parameters: table (str, default='ais_clean') –
-
requires
(_self)¶
-
task_namespace
= None¶
-
-
class
superpyrate.pipeline.
ProcessCsv
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Yields: ValidMessages
-
output
()[source]¶ Dummy files are placed in a folder of the same name as the zip file
The files are placed in a subdirectory of LUIGIWORK called tmp/processcsv
-
task_namespace
= None¶
-
zip_file
= <luigi.parameter.Parameter object>¶
-
-
class
superpyrate.pipeline.
ProcessZipArchives
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Dynamically spawns WriteCsvToDb or ProcessCsv depending on whether a database is available
Parameters: with_db (bool) – Indicate whether a database is available for writing csv files
Yields:
-
folder_of_zips
= <luigi.parameter.Parameter object>¶
-
task_namespace
= None¶
-
with_db
= <luigi.parameter.BoolParameter object>¶
-
-
class
superpyrate.pipeline.
RunQueryOnTable
(*args, **kwargs)[source]¶ Bases:
luigi.postgres.PostgresQuery
Runs a query on a table in the database
Used for passing in utility type queries to the database such as creation of indices etc.
Parameters: - query (str) – A legal sql query
- table (str, default='ais_clean') – A table on which to run the query
-
database
= ''¶
-
host
= ''¶
-
password
= ''¶
-
query
= <luigi.parameter.Parameter object>¶
-
table
= <luigi.parameter.Parameter object>¶
-
task_namespace
= None¶
-
update_id
= <luigi.parameter.Parameter object>¶
-
user
= ''¶
-
class
superpyrate.pipeline.
UnzippedArchive
(*args, **kwargs)[source]¶ Bases:
luigi.contrib.external_program.ExternalProgramTask
Unzips the zipped archive into a folder of AIS csv files with the same name as the original file
Parameters: zip_file (str) – The absolute path of the zipped archive
-
output
()[source]¶ Outputs the files into a folder of the same name as the zip file
The files are placed in a subdirectory of LUIGIWORK called files/unzipped
-
program_args
()[source]¶ Runs 7zip to extract the archives of AIS files
Notes
e - Unzip all ignoring folder structure (i.e. to highest level)
-o - Output folder
-y - Answer yes to everything
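The three switches listed in the notes could be assembled into an argument list along the following lines. This is a sketch under stated assumptions: the executable name 7z, the function name seven_zip_args and the example paths are hypothetical, and the real program_args method may build its list differently.

```python
import os


def seven_zip_args(zip_file, output_root):
    """Build an argument list for extracting `zip_file` with 7-Zip."""
    # Output folder named after the archive, without its extension.
    archive_name = os.path.splitext(os.path.basename(zip_file))[0]
    out_dir = os.path.join(output_root, archive_name)
    return [
        "7z",            # assumed executable name
        "e",             # extract, ignoring folder structure
        zip_file,
        "-o" + out_dir,  # output folder (7-Zip takes no space after -o)
        "-y",            # answer yes to every prompt
    ]


args = seven_zip_args("/folder/of/zips/archive.zip", "/work/files/unzipped")
```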
-
task_namespace
= None¶
-
zip_file
= <luigi.parameter.Parameter object>¶
-
-
class
superpyrate.pipeline.
ValidMessages
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
Takes AIS messages and runs validation functions, generating valid csv files in folder called ‘cleancsv’ at the same level as unzipped_ais_path
-
csvfile
= <luigi.parameter.Parameter object>¶
-
output
()[source]¶ Validated files are named as the original csv file
The files are placed in a subdirectory of LUIGIWORK called files/cleancsv
-
task_namespace
= None¶
-
-
class
superpyrate.pipeline.
ValidMessagesToDatabase
(*args, **kwargs)[source]¶ Bases:
luigi.postgres.CopyToTable
Writes the valid csv files to the postgres database
Parameters: original_csvfile (luigi.Parameter) – The raw csvfile containing AIS data
-
cols
= ['MMSI', 'Time', 'Message_ID', 'Navigational_status', 'SOG', 'Longitude', 'Latitude', 'COG', 'Heading', 'IMO', 'Draught', 'Destination', 'Vessel_Name', 'ETA_month', 'ETA_day', 'ETA_hour', 'ETA_minute']¶
-
column_separator
= ','¶
-
columns
= ['mmsi', 'time', 'message_id', 'navigational_status', 'sog', 'longitude', 'latitude', 'cog', 'heading', 'imo', 'draught', 'destination', 'vessel_name', 'eta_month', 'eta_day', 'eta_hour', 'eta_minute']¶
-
database
= ''¶
-
host
= ''¶
-
null_values
= (None, '')¶
-
original_csvfile
= <luigi.parameter.Parameter object>¶
-
password
= ''¶
-
rows
()[source]¶ Return/yield tuples or lists corresponding to each row to be inserted.
Yields: row (iterable)
-
run
()[source]¶ Inserts data generated by rows() into target table.
If the target table doesn’t exist, self.create_table will be called to attempt to create the table.
-
table
= 'ais_clean'¶
-
task_namespace
= None¶
-
user
= ''¶
-
-
class
superpyrate.pipeline.
WriteCsvToDb
(*args, **kwargs)[source]¶ Bases:
superpyrate.pipeline.WriteCsvToDb
Dynamically spawns LoadCleanedAIS to load valid csvs into the database
-
requires
(_self)¶
-
task_namespace
= None¶
-
-
superpyrate.pipeline.
get_environment_variable
(name)[source]¶ Tries to access an environment variable, and handles the error by replacing the value with a dummy value (an empty string)
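A helper with the behaviour described here might look like the sketch below. The name env_or_empty and the example variable names are hypothetical. Falling back to an empty string is presumably why attributes such as host, database, user and password are rendered as '' elsewhere on this page, although that link is an assumption.

```python
import os


def env_or_empty(name):
    """Return the value of environment variable `name`, or '' if unset."""
    try:
        return os.environ[name]
    except KeyError:
        # Dummy value, so importing the module does not fail when unset.
        return ""


# Hypothetical variable names, used only for this illustration.
os.environ["SUPERPYRATE_EXAMPLE_SET"] = "ais_test"
os.environ.pop("SUPERPYRATE_EXAMPLE_UNSET", None)

present = env_or_empty("SUPERPYRATE_EXAMPLE_SET")
missing = env_or_empty("SUPERPYRATE_EXAMPLE_UNSET")
```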
-
superpyrate.pipeline.
get_working_folder
(folder_of_zips=None)[source]¶ Parameters: folder_of_zips (str) – The absolute path of the folder of zips e.g. /tests/fixtures/testais/
Returns: working_folder – The path of the working folder. This is either set by the environment variable LUIGIWORK, or if empty is computed from the arguments
Return type: str
-
superpyrate.pipeline.
setup_working_folder
()[source]¶ Setup the working folder structure for the entire luigi pipeline
The working folder (‘LUIGIWORK’) must contain two subfolders - files and tmp. The files subfolder contains the unzipped and cleancsv folders, with all of the respective temporary files stored within. The tmp subfolder contains processcsv, writecsv, archives and database folders and contains files which are generated by the tasks which do not produce an actual file as output, rather spawn child-tasks.
superpyrate.task_countfiles module
Holds the luigi tasks which count the number of rows in the files
Records the number of clean and dirty rows in the AIS data, writing these stats to the database and finally producing a report of the statistics
- Count the number of rows in the raw csv files (in files/unzipped/<archive>)
- Count the number of rows in the clean csv files (in files/cleancsv/)
- Write the number of clean rows into the clean column of ais_sources
- Write the number of dirty (raw - clean) rows into the dirty column of ais_sources
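The arithmetic behind the steps above can be sketched as follows. The helpers count_lines and clean_and_dirty are hypothetical names, the sample files are invented for the illustration, and the sketch counts every line (header included) in both files so that the difference is unaffected; the real tasks may count differently.

```python
import os
import tempfile


def count_lines(path):
    """Count the lines in a text file without loading it into memory."""
    with open(path) as handle:
        return sum(1 for _ in handle)


def clean_and_dirty(raw_path, clean_path):
    """Return (clean, dirty), where dirty is raw minus clean."""
    raw = count_lines(raw_path)
    clean = count_lines(clean_path)
    return clean, raw - clean


with tempfile.TemporaryDirectory() as workdir:
    raw_path = os.path.join(workdir, "raw.csv")
    clean_path = os.path.join(workdir, "clean.csv")
    with open(raw_path, "w") as handle:
        handle.write("MMSI,Time\n1,a\n2,b\nbad\n")   # one invalid row
    with open(clean_path, "w") as handle:
        handle.write("MMSI,Time\n1,a\n2,b\n")        # invalid row removed
    clean, dirty = clean_and_dirty(raw_path, clean_path)
```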
-
class
superpyrate.task_countfiles.
CountLines
(*args, **kwargs)[source]¶ Bases:
superpyrate.task_countfiles.CountLines
Counts the number of lines for all the csvfiles in a folder
Writes all the counts and filenames to a delimited file with the name of the folder
Parameters: folder_name (str) – The absolute path of the csv file
-
requires
(_self)¶
-
task_namespace
= None¶
-
-
class
superpyrate.task_countfiles.
DoIt
(*args, **kwargs)[source]¶ Bases:
luigi.task.Task
-
folder_of_zips
= <luigi.parameter.Parameter object>¶
-
task_namespace
= None¶
-
with_db
= <luigi.parameter.BoolParameter object>¶
-
-
class
superpyrate.task_countfiles.
GetCountsForAllFiles
(*args, **kwargs)[source]¶ Bases:
superpyrate.task_countfiles.GetCountsForAllFiles
Counts the rows in all clean (validated) and raw files
-
requires
(_self)¶
-
task_namespace
= None¶
-
-
class
superpyrate.task_countfiles.
ProduceStatisticsReport
(*args, **kwargs)[source]¶ Bases:
superpyrate.task_countfiles.ProduceStatisticsReport
Produces a report of the data statistics
-
requires
(_self)¶
-
task_namespace
= None¶
-
superpyrate.tasks module
Contains the code for validating AIS messages
-
superpyrate.tasks.
learn_columns
(read_cols, required_cols, csv_or_xml='csv')[source]¶ Tries to match the read columns with the list given
Parameters:
-
superpyrate.tasks.
produce_valid_csv_file
(inputf, outputf)[source]¶ Parameters: - inputf – File path to a large CSV file of AIS data
- outputf – File path for a CSV file containing validated and cleaned data
-
superpyrate.tasks.
readcsv
(fp, forced_col_map=None, columns=None)[source]¶ Yields a dictionary of the subset of columns required
Reads each line in CSV file, checks if all columns are available, and returns a dictionary of the subset of columns required (as per AIS_CSV_COLUMNS).
If row is invalid (too few columns), returns an empty dictionary.
Parameters: - fp (TextIOWrapper) – An open TextIOWrapper (returned by open())
- forced_col_map (dict) – A dictionary mapping the keys defined in columns to columns with different names
- columns (dict, default=AIS_CSV_COLUMNS) – A dictionary of columns
Yields: rowsubset (dict) – A dictionary of the subset of columns as per columns
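The reading behaviour described above can be approximated with the standard csv module. The sketch below is a simplification, not the actual readcsv code: REQUIRED is a hypothetical stand-in for AIS_CSV_COLUMNS (a tuple here rather than a dict), read_subset is a hypothetical name, forced_col_map is not modelled, and the header is assumed to contain every required column.

```python
import csv
import io

# Hypothetical stand-in for AIS_CSV_COLUMNS, reduced to four names.
REQUIRED = ("MMSI", "Time", "Longitude", "Latitude")


def read_subset(fp, columns=REQUIRED):
    """Yield one dict per data row, restricted to `columns`.

    Rows with fewer fields than the header yield an empty dict.
    """
    reader = csv.reader(fp)
    header = next(reader, None)
    if header is None:
        return  # empty file: nothing to yield
    for row in reader:
        if len(row) < len(header):
            yield {}  # too few columns: signal an invalid row
            continue
        record = dict(zip(header, row))
        yield {name: record[name] for name in columns}


sample = io.StringIO(
    "MMSI,Time,Longitude,Latitude,Extra\n"
    "123,2016-01-01,1.5,50.2,x\n"
    "456,2016-01-02\n"
)
rows = list(read_subset(sample))
```

A caller can then drop the empty dictionaries to keep only the rows that passed the column check.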