superpyrate¶
This is the documentation of superpyrate.
Contents¶
Setup¶
Configuration¶
After installation, it is necessary to conduct three separate configuration tasks for:
- logging
- luigi
- postgres
Set up and initialise a remote luigi server to host the scheduler. In the UCL shipping group, this is currently hosted on the ‘linux box’.
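As a sketch (host, port and log locations are placeholders to adapt for your machine), the central scheduler can be started on the remote server with the luigid daemon:
# Start the luigi central scheduler in the background on port 8028
luigid --background --port 8028 \
       --logdir $HOME/luigi-logs \
       --pidfile $HOME/luigi-logs/luigid.pid \
       --state-path $HOME/luigi-logs/luigi-state.pickle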
Logging¶
The log output of the superpyrate pipeline is extremely verbose when set to DEBUG level. For production, it is strongly recommended that the level is set to INFO, to avoid generating gigabyte-scale log files.
This is easily done using a Python logging configuration file, and referencing the path to that file in the luigi configuration file.
The contents of the python logging file should look something like this:
[loggers]
keys=root,luigi
[handlers]
keys=consoleHandler,fileHandler
[formatters]
keys=fileFormatter,consoleFormatter
[logger_root]
level=INFO
handlers=consoleHandler
[logger_luigi]
level=INFO
handlers=consoleHandler,fileHandler
qualname=luigi.interface
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=WARNING
formatter=consoleFormatter
args=(sys.stdout,)
[handler_fileHandler]
class=FileHandler
level=INFO
formatter=fileFormatter
args=('logfile.log',)
[formatter_fileFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=
[formatter_consoleFormatter]
format=%(levelname)s - %(message)s
datefmt=
Luigi¶
The luigi configuration file can be placed anywhere and is specified through the environment variable LUIGI_CONFIG_PATH.
More information about the configuration file can be found in the luigi documentation.
Here’s an example configuration file:
[resources]
postgres=12
matlab=120
postgres_linuxbox=100
[core]
email-type=html
error-email=user@ucl.ac.uk
default-scheduler-host=123.45.678.910
default-scheduler-port=8028
logging_conf_file=/home/username/logging.conf
[worker]
keep_alive=False
ping_interval=30
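Assuming the file above is saved as /home/username/luigi.cfg (an illustrative path), point luigi at it by exporting LUIGI_CONFIG_PATH before running any tasks:
# Tell luigi where to find its configuration file
export LUIGI_CONFIG_PATH=/home/username/luigi.cfg
# Make the setting persistent for future sessions
echo 'export LUIGI_CONFIG_PATH=/home/username/luigi.cfg' >> ~/.bash_profile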
Postgres¶
The postgres configuration file postgresql.conf resides in the data folder specified by the environment variable PGDATA.
This file contains a number of server-specific settings which dramatically affect the performance of the database, particularly for memory- and processor-intensive operations such as index generation, clustering and bulk copying of data.
Various postgres configuration files are stored in a GitHub repository.
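As an illustrative (not prescriptive) sketch, the settings below are the ones commonly raised for bulk loading and index builds; the values are placeholders and the configuration files in the repository should take precedence:
# Append example tuning settings to the active postgresql.conf
# (values are placeholders - size them to your machine's RAM)
cat >> $PGDATA/postgresql.conf <<'EOF'
shared_buffers = 4GB            # server buffer cache
work_mem = 256MB                # per-operation sort/hash memory
maintenance_work_mem = 2GB      # used by CREATE INDEX and CLUSTER
effective_cache_size = 12GB     # planner hint for available OS cache
EOF
# Restart the server so the new settings take effect
pg_ctl -D $PGDATA restart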
Setup for ingest¶
Setting up the database:
# Initialise the database server using Scratch for the data
initdb -D $HOME/Scratch/data
# Spin up the database server
pg_ctl -D $HOME/Scratch/data -l logfile start
# Create the test database
createdb test_aisdb
# Create the database user and grant access
psql -U postgres -c "CREATE USER test_ais WITH PASSWORD 'test_ais' SUPERUSER;"
psql -U postgres -c "GRANT ALL PRIVILEGES ON DATABASE test_aisdb TO test_ais;"
# Enable postgis in the test database
psql -U postgres -d test_aisdb -c "CREATE EXTENSION postgis;"
# Use the following command to access the database schema and tables
# psql --host=localhost --port=5432 --username=test_ais --dbname=test_aisdb
Setup virtual python environment using conda:
wget http://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh \
-O miniconda.sh
chmod +x miniconda.sh && ./miniconda.sh -b -p $HOME/miniconda
export PATH=$HOME/miniconda/bin:$PATH
conda update --yes conda
# Configure the conda environment and put it in the path using the
# provided versions
conda create -n testenv --yes python=$PYTHON_VERSION pip scipy pandas numpy psycopg2 sphinx pylint
source activate testenv
Before installing superpyrate, you’ll need to set up your git account. Enter the following commands:
git config --global user.name "YOUR NAME"
git config --global user.email "YOUR EMAIL ADDRESS"
To access git from Legion, you’ll need to set up a certificate and SSH access to git; follow the GitHub instructions for adding an SSH key. Then clone and install superpyrate:
cd $HOME
git clone https://github.com/UCL-ShippingGroup/superpyrate.git
cd superpyrate
pip install -r requirements.txt
python setup.py develop
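As a quick, optional sanity check (not part of the documented workflow), confirm that the package and the luigi command line are available from the new environment:
# Confirm the package can be imported
python -c "import superpyrate.pipeline; print('superpyrate imported OK')"
# Confirm luigi is available on the command line
luigi --help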
It is also a good idea to get the legion-scripts which help with running superpyrate:
cd $HOME
git clone https://github.com/UCL-ShippingGroup/legion-scripts.git
Add a configuration file for luigi:
[core]
default-scheduler-host=128.40.181.109
default-scheduler-port=8028
logging_conf_file=/home/ucftxyz/logging.conf
[worker]
keep_alive=False
ping_interval=30
Installation¶
Using Legion, loading postgres is as simple as loading the required modules.
If you have legion-scripts installed, then you can just run the load_postgres.sh script.
Otherwise, we assume installation on Linux. For macOS, Windows and other platforms, refer to the postgres packages and documentation available on the website.
Compiling from source¶
First, install postgres:
cd $HOME
wget https://ftp.postgresql.org/pub/source/v9.5.2/postgresql-9.5.2.tar.gz
gunzip postgresql-9.5.2.tar.gz
tar xf postgresql-9.5.2.tar
cd postgresql-9.5.2
# Install a local version of postgres in the user directory on the login node
./configure --prefix=$HOME/pgsql
make -s
make install
make clean
# Setup path
echo 'PATH=$HOME/pgsql/bin:$PATH' >> ~/.bash_profile
Installing postgis is painful:
# Obtain, compile and install postgis and its requirements (GEOS, PROJ4, GDAL)
cd $HOME
svn checkout http://svn.osgeo.org/geos/trunk geos-svn
cd geos-svn
./autogen.sh
./configure --prefix=$HOME/geos
make -s
make install
echo 'PATH=$HOME/geos/bin:$PATH' >> ~/.bash_profile
echo 'export LD_LIBRARY_PATH=$HOME/geos/lib:$LD_LIBRARY_PATH' >> ~/.bash_profile
cd $HOME
wget http://download.osgeo.org/proj/proj-4.9.1.tar.gz
tar xf proj-4.9.1.tar.gz
cd proj-4.9.1
./configure --prefix=$HOME/proj4
make
make install
echo 'PATH=$HOME/proj4/bin:$PATH' >> ~/.bash_profile
echo 'export LD_LIBRARY_PATH=$HOME/proj4/lib:$LD_LIBRARY_PATH' >> ~/.bash_profile
cd $HOME
wget http://download.osgeo.org/gdal/2.1.0/gdal-2.1.0.tar.gz
tar xf gdal-2.1.0.tar.gz
cd gdal-2.1.0
./configure --prefix=$HOME/gdal
make
make install
echo 'export PATH=$HOME/gdal/bin:$PATH' >> ~/.bash_profile
echo 'export LD_LIBRARY_PATH=$HOME/gdal/lib:$LD_LIBRARY_PATH' >> ~/.bash_profile
echo 'export GDAL_DATA=$HOME/gdal/share/gdal' >> ~/.bash_profile
echo 'export PATH' >> ~/.bash_profile
# Test
# gdalinfo --version
# See below for installation of Python bindings
wget http://download.osgeo.org/postgis/source/postgis-2.2.2.tar.gz
tar xf postgis-2.2.2.tar.gz
cd postgis-2.2.2
./configure --prefix=$HOME/postgis
make
make install
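Once postgis is installed and the server from the ‘Setup for ingest’ section is running, a simple check (assuming the test_aisdb database created earlier exists) is to enable the extension there and print its version:
# Enable postgis in the test database and confirm the version
psql -U postgres -d test_aisdb -c "CREATE EXTENSION IF NOT EXISTS postgis;"
psql -U postgres -d test_aisdb -c "SELECT PostGIS_Version();"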
License¶
The MIT License (MIT)
Copyright (c) 2016 Will Usher
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Developers¶
- Will Usher <w.usher@ucl.ac.uk>
superpyrate¶
superpyrate package¶
Submodules¶
superpyrate.db_setup module¶
Sets up the tables in a newly created database, ready for data ingest
superpyrate.pipeline module¶
Runs an integrated pipeline from raw zip file to database tables.
This mega-pipeline is constructed out of three smaller pipelines and brings together tasks which:
- Unzip the individual AIS archives and output the csv files
- Validate each of the csv files, processing them with a derived version of the pyrate code and outputting validated csv files
- Ingest the validated data directly into the database using the postgres copy command
Because the exact contents of the zipped archives are unknown until they are unzipped, tasks are spawned dynamically.
[Figure: luigi task dependency graph for the pipeline. ClusterAisClean depends on MakeAllIndices, which dynamically spawns RunQueryOnTable and depends on ProcessZipArchives. ProcessZipArchives depends on GetFolderOfArchives and dynamically spawns ProcessCsv and WriteCsvToDb; both depend on UnzippedArchive (which depends on GetZipArchive) and spawn ValidMessages and LoadCleanedAIS/ValidMessagesToDatabase respectively. ValidMessages depends on GetCsvFile and writes to the filesystem; the CopyToTable and query tasks write to the database.]
Entry points¶
It is not necessary to run the entire pipeline, although there is little harm in doing so, as luigi manages the process so that individual tasks are idempotent. This means that a task only runs if required. Luigi only runs the tasks necessary to produce the files which are required for the specified entry point.
For example, to run the entire pipeline, producing a full ingested and clustered database, run:
luigi --module superpyrate.pipeline ClusterAisClean \
      --workers 12 \
      --folder-of-zips /folder/of/zips/ \
      --with_db
If only the validated csv files are required, run:
luigi --module superpyrate.pipeline ProcessZipArchives \
      --workers 12 \
      --folder-of-zips /folder/of/zips/ \
      --with_db
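For quick tests without the central scheduler, luigi’s built-in --local-scheduler flag can be used; this invocation is a sketch rather than a documented superpyrate workflow:
# Run the archive-processing pipeline without a central scheduler
luigi --module superpyrate.pipeline ProcessZipArchives \
      --local-scheduler \
      --workers 2 \
      --folder-of-zips /folder/of/zips/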
Working folder¶
The working folder LUIGIWORK
must contain two subfolders - files and tmp.
The files
subfolder contains the unzipped
and cleancsv
folders,
with all of the respective temporary files stored within.
The tmp
subfolder contains processcsv
, writecsv
, archives
and database
folders and contains files which are generated by the tasks which do not produce
an actual file as output, rather spawn child-tasks.
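The helper setup_working_folder() (documented below) builds this structure; the equivalent shell commands, shown here only as a sketch, are:
# Recreate the expected working folder layout by hand
mkdir -p $LUIGIWORK/files/unzipped
mkdir -p $LUIGIWORK/files/cleancsv
mkdir -p $LUIGIWORK/tmp/processcsv
mkdir -p $LUIGIWORK/tmp/writecsv
mkdir -p $LUIGIWORK/tmp/archives
mkdir -p $LUIGIWORK/tmp/database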
Environment Variables¶
LUIGIWORK - the working folder for the files
DBHOSTNAME - hostname for the database, e.g. localhost
DBNAME - the name of the database
DBUSER - the name of the user with access to the database
DBUSERPASS - the password of the database user
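For example, with the test database from the setup section (the names, password and working-folder path below are the illustrative values used earlier), the variables could be exported as follows before launching the pipeline:
export LUIGIWORK=$HOME/Scratch/luigiwork   # illustrative path
export DBHOSTNAME=localhost
export DBNAME=test_aisdb
export DBUSER=test_ais
export DBUSERPASS=test_ais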
class superpyrate.pipeline.ClusterAisClean(*args, **kwargs)[source]¶
Bases: superpyrate.pipeline.ClusterAisClean
Clusters the ais_clean table over the disk on the mmsi index
requires(_self)¶
task_namespace = None¶
class superpyrate.pipeline.GetCsvFile(*args, **kwargs)[source]¶
Bases: luigi.task.ExternalTask
csvfile = <luigi.parameter.Parameter object>¶
task_namespace = None¶
class superpyrate.pipeline.GetFolderOfArchives(*args, **kwargs)[source]¶
Bases: luigi.task.ExternalTask
Returns the folder of zipped archives as a luigi.file.LocalTarget
folder_of_zips = <luigi.parameter.Parameter object>¶
task_namespace = None¶
class superpyrate.pipeline.GetZipArchive(*args, **kwargs)[source]¶
Bases: luigi.task.ExternalTask
Returns a zipped archive as a luigi.file.LocalTarget
task_namespace = None¶
zip_file = <luigi.parameter.Parameter object>¶
class superpyrate.pipeline.LoadCleanedAIS(*args, **kwargs)[source]¶
Bases: luigi.postgres.CopyToTable
Updates the ais_sources table with the name of the CSV file processed. After the valid csv files are successfully written to the database, this task updates the sources table with the name of the file which has been written.
column_separator = ','¶
csvfile = <luigi.parameter.Parameter object>¶
database = ''¶
host = ''¶
null_values = (None, '')¶
password = ''¶
table = 'ais_sources'¶
task_namespace = None¶
user = ''¶
class superpyrate.pipeline.MakeAllIndices(*args, **kwargs)[source]¶
Bases: superpyrate.pipeline.MakeAllIndices
Creates the indices required for a specified table. The list of indices is derived from the table specification in pyrate.
Parameters: table (str, default='ais_clean')
requires(_self)¶
task_namespace = None¶
class superpyrate.pipeline.ProcessCsv(*args, **kwargs)[source]¶
Bases: luigi.task.Task
Yields: ValidMessages
output()[source]¶
Dummy files are placed in a folder of the same name as the zip file. The files are placed in a subdirectory of LUIGIWORK called tmp/processcsv.
task_namespace = None¶
zip_file = <luigi.parameter.Parameter object>¶
class superpyrate.pipeline.ProcessZipArchives(*args, **kwargs)[source]¶
Bases: luigi.task.Task
Dynamically spawns WriteCsvToDb or ProcessCsv, depending on the database.
Parameters: with_db (bool) – Indicate whether a database is available for writing csv files
Yields:
folder_of_zips = <luigi.parameter.Parameter object>¶
task_namespace = None¶
with_db = <luigi.parameter.BoolParameter object>¶
class superpyrate.pipeline.RunQueryOnTable(*args, **kwargs)[source]¶
Bases: luigi.postgres.PostgresQuery
Runs a query on a table in the database. Used for passing utility-type queries to the database, such as the creation of indices.
Parameters:
- query (str) – A legal SQL query
- table (str, default='ais_clean') – A table on which to run the query
database = ''¶
host = ''¶
password = ''¶
query = <luigi.parameter.Parameter object>¶
table = <luigi.parameter.Parameter object>¶
task_namespace = None¶
update_id = <luigi.parameter.Parameter object>¶
user = ''¶
class superpyrate.pipeline.UnzippedArchive(*args, **kwargs)[source]¶
Bases: luigi.contrib.external_program.ExternalProgramTask
Unzips the zipped archive into a folder of AIS csv files with the same name as the original file
Parameters: zip_file (str) – The absolute path of the zipped archive
output()[source]¶
Outputs the files into a folder of the same name as the zip file. The files are placed in a subdirectory of LUIGIWORK called files/unzipped.
program_args()[source]¶
Runs 7zip to extract the archives of AIS files
Notes:
e - Unzip all, ignoring folder structure (i.e. extract to the highest level)
-o - Output folder
-y - Answer yes to everything
task_namespace = None¶
zip_file = <luigi.parameter.Parameter object>¶
class superpyrate.pipeline.ValidMessages(*args, **kwargs)[source]¶
Bases: luigi.task.Task
Takes AIS messages and runs validation functions, generating valid csv files in a folder called ‘cleancsv’ at the same level as unzipped_ais_path
csvfile = <luigi.parameter.Parameter object>¶
output()[source]¶
Validated files are named as the original csv file. The files are placed in a subdirectory of LUIGIWORK called files/cleancsv.
task_namespace = None¶
class superpyrate.pipeline.ValidMessagesToDatabase(*args, **kwargs)[source]¶
Bases: luigi.postgres.CopyToTable
Writes the valid csv files to the postgres database
Parameters: original_csvfile (luigi.Parameter) – The raw csvfile containing AIS data
cols = ['MMSI', 'Time', 'Message_ID', 'Navigational_status', 'SOG', 'Longitude', 'Latitude', 'COG', 'Heading', 'IMO', 'Draught', 'Destination', 'Vessel_Name', 'ETA_month', 'ETA_day', 'ETA_hour', 'ETA_minute']¶
column_separator = ','¶
columns = ['mmsi', 'time', 'message_id', 'navigational_status', 'sog', 'longitude', 'latitude', 'cog', 'heading', 'imo', 'draught', 'destination', 'vessel_name', 'eta_month', 'eta_day', 'eta_hour', 'eta_minute']¶
database = ''¶
host = ''¶
null_values = (None, '')¶
original_csvfile = <luigi.parameter.Parameter object>¶
password = ''¶
rows()[source]¶
Return/yield tuples or lists corresponding to each row to be inserted.
Yields: row (iterable)
run()[source]¶
Inserts data generated by rows() into the target table. If the target table doesn’t exist, self.create_table will be called to attempt to create the table.
table = 'ais_clean'¶
task_namespace = None¶
user = ''¶
class superpyrate.pipeline.WriteCsvToDb(*args, **kwargs)[source]¶
Bases: superpyrate.pipeline.WriteCsvToDb
Dynamically spawns LoadCleanedAIS to load valid csvs into the database
requires(_self)¶
task_namespace = None¶
superpyrate.pipeline.get_environment_variable(name)[source]¶
Tries to access an environment variable, and handles the error by replacing the value with a dummy value (an empty string)
superpyrate.pipeline.get_working_folder(folder_of_zips=None)[source]¶
Parameters: folder_of_zips (str) – The absolute path of the folder of zips, e.g. /tests/fixtures/testais/
Returns: working_folder (str) – The path of the working folder. This is either set by the environment variable LUIGIWORK, or if empty is computed from the arguments.
superpyrate.pipeline.setup_working_folder()[source]¶
Sets up the working folder structure for the entire luigi pipeline.
The working folder (LUIGIWORK) must contain two subfolders: files and tmp. The files subfolder contains the unzipped and cleancsv folders, with all of the respective temporary files stored within. The tmp subfolder contains the processcsv, writecsv, archives and database folders, holding the files generated by tasks which do not produce an actual file as output but instead spawn child tasks.
superpyrate.task_countfiles module¶
Holds the luigi tasks which count the number of rows in the files
Records the number of clean and dirty rows in the AIS data, writing these statistics to the database and finally producing a report:
- Count the number of rows in the raw csv files (in files/unzipped/<archive>)
- Count the number of rows in the clean csv files (in files/cleancsv/)
- Write the clean row counts into the clean column of ais_sources
- Write the dirty (raw - clean) row counts into the dirty column of ais_sources
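Assuming the DoIt task documented below is the intended entry point for these counting tasks (this invocation is a sketch, not taken verbatim from the project), it can be run in the same way as the main pipeline:
# Count raw and clean rows for every archive and record them in ais_sources
luigi --module superpyrate.task_countfiles DoIt \
      --workers 4 \
      --folder-of-zips /folder/of/zips/ \
      --with_db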
class superpyrate.task_countfiles.CountLines(*args, **kwargs)[source]¶
Bases: superpyrate.task_countfiles.CountLines
Counts the number of lines for all the csv files in a folder. Writes all the counts and filenames to a delimited file with the name of the folder.
Parameters: folder_name (str) – The absolute path of the csv file
requires(_self)¶
task_namespace = None¶
class superpyrate.task_countfiles.DoIt(*args, **kwargs)[source]¶
Bases: luigi.task.Task
folder_of_zips = <luigi.parameter.Parameter object>¶
task_namespace = None¶
with_db = <luigi.parameter.BoolParameter object>¶
class superpyrate.task_countfiles.GetCountsForAllFiles(*args, **kwargs)[source]¶
Bases: superpyrate.task_countfiles.GetCountsForAllFiles
Counts the rows in all clean (validated) and raw files
requires(_self)¶
task_namespace = None¶
class superpyrate.task_countfiles.ProduceStatisticsReport(*args, **kwargs)[source]¶
Bases: superpyrate.task_countfiles.ProduceStatisticsReport
Produces a report of the data statistics
requires(_self)¶
task_namespace = None¶
superpyrate.tasks module¶
Contains the code for validating AIS messages
superpyrate.tasks.learn_columns(read_cols, required_cols, csv_or_xml='csv')[source]¶
Tries to match the read columns with the list given
Parameters:
superpyrate.tasks.produce_valid_csv_file(inputf, outputf)[source]¶
Parameters:
- input_file – File path to a large CSV file of AIS data
- output_file – File path for a CSV file containing validated and cleaned data
superpyrate.tasks.readcsv(fp, forced_col_map=None, columns=None)[source]¶
Yields a dictionary of the subset of columns required.
Reads each line in the CSV file, checks if all columns are available, and returns a dictionary of the subset of columns required (as per AIS_CSV_COLUMNS). If a row is invalid (too few columns), returns an empty dictionary.
Parameters:
- fp (TextIOWrapper) – An open TextIOWrapper (returned by open())
- forced_col_map (dict) – A dictionary mapping the keys defined in columns to columns with different names
- columns (dict, default=AIS_CSV_COLUMNS) – A dictionary of columns
Yields: rowsubset (dict) – A dictionary of the subset of columns as per columns