An ETL framework + Monitoring UI/API (experimental project for learning purposes)

Related tags

Data Analysisfastlane
Overview

Fastlane

An ETL framework for building pipelines, and Flask based web API/UI for monitoring pipelines.

Project structure

fastlane
|- fastlane: (ETL framework)
|- fastlane_web: (web API/UI for monitoring pipelines)
   |- migrations: (database migrations)
   |- web_api: Flask backend API
   |- web_ui: TBD

Install

  1. Clone the repository
  2. pip install -e .

Example

fastlane --source=mysql --target=s3 --config=examples/mysql_to_athena_example.json

--source: The pipeline's source type (mysql, bigquery, mongodb are only implemented sources so far)

--target: The pipeline's target type (s3, influxdb, mysql, firehose are only implemented targets so far)

--transform: The pipeline's tranform type (default is the only implemented transform so far)

--config: The path to JSON configuration file for the pipeline

--logs_to_slack: Send error logs to slack

--logs_to_cloudwatch: Send logs to cloudwatch

--logs_to_file: Send logs to a file

Extending the framework

The ETL framework has 4 concepts:

Source

The base class fastlane.source.Source provides basic functionality, and defines a standard interface for extracting data from a particular source type. An instance of Source is responsible only for extracting data from source and returning as a python list of dictionaries.

Implementations of the Source base class must fulfill the following functions at minimum:

str: """Return a string describing type of source this is, for example mysql or bigquery""" @classmethod def configuration_schema(cls) -> SourceConfigSchema: """Return a marshmallow schema inherited from SourceConfigSchema base schema. This schema is used to validate the sources configuration, so all possible fields should be covered in schema returned here."""">
from fastlane.source import Source, SourceConfigSchema
import fastlane.utils as utils


class SourceImpl(Source):
    ...

    def extract(self) -> List[dict]:
        """This function should retrieve data from the source and return it as a list of dictionaries.
            The Source class is an iterator, and this function is called on each iteration. 
            The iterator stops (and source worker exits) when this function returns an empty list. 
            So when there are no more records to fetch, this function should return [].
        """

    @utils.classproperty
    def source_type(self) -> str:
        """Return a string describing type of source this is, for example mysql or bigquery"""

    @classmethod
    def configuration_schema(cls) -> SourceConfigSchema:
        """Return a marshmallow schema inherited from SourceConfigSchema base schema.
            This schema is used to validate the sources configuration, so all possible fields should be covered in
            schema returned here."""

Example implementation of Source interface is in fastlane.sources.impl.source_mysql

Implementation Coverage

  • MySQL
  • BigQuery
  • MongoDB

Transform

The base class fastlane.transform.Transform provides basic functionality, and defines a standard interface for transforming data to be ready for target. An instance of Transform is responsible only for transforming data from source into a format compatible with target.

Implementations of the Transform base class must fulfill the following functions at minimum:

str: """Return a string describing type of transform this is.""" @classmethod def configuration_schema(cls) -> TransformConfigSchema: """Return a marshmallow schema inherited from TransformConfigSchema base schema. This schema is used to validate the transforms configuration, so all possible fields should be covered in schema returned here."""">
from fastlane.transform import Transform, TransformConfigSchema
import fastlane.utils as utils


class TransformImpl(Transform):
    ...

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """This function should run any transformation on the dataframe and return the transformed dataframe.
            Ideally the same dataframe should by transformed on in place, but if a new dataframe needs to be created, 
            Make sure to remove the old dataframes from memory.
            This function is called by the transform worker every time a new batch of source data has been received.
        """

    @utils.classproperty
    def transform_type(self) -> str:
        """Return a string describing type of transform this is."""

    @classmethod
    def configuration_schema(cls) -> TransformConfigSchema:
        """Return a marshmallow schema inherited from TransformConfigSchema base schema.
            This schema is used to validate the transforms configuration, so all possible fields should be covered in
            schema returned here."""

Example implementation of Transform interface is in fastlane.transform.impl.transform_default

Target

The base class fastlane.target.Target provides basic functionality, and defines a standard interface for loading data into a destination. An instance of Target is responsible only for storing data which has been transformed into a destination.

Implementations of the Target base class must fulfill the following functions at minimum:

str: """Return a string describing type of target this is.""" @classmethod def target_id(cls, configuration: dict) -> str: """Return a unique identifier from this specific targets configuration. The id should be unique across the whole target destination. For example the target_id for mysql target is built from table and database"""">
from fastlane.target import Target, TargetConfigSchema
import fastlane.utils as utils


class TargetImpl(Target):
    ...

    def load(self, df: pd.DataFrame):
        """This function is called by the target worker every time a new batch of transformed data has been received.
            This function should store the dataframe in whatever destination it implements.
        """

    def get_offset(self):
        """Get the largest key which has been stored in the target. Used from incrementally loaded tables."""

    @utils.classproperty
    def target_type(self) -> str:
        """Return a string describing type of target this is."""

    @classmethod
    def target_id(cls, configuration: dict) -> str:
        """Return a unique identifier from this specific targets configuration. 
            The id should be unique across the whole target destination. 
            For example the target_id for mysql target is built from table and database"""

Example implementation of Target interface is in fastlane.target.impl.target_athena

Implementation Coverage

  • S3
  • InfluxDB
  • MySQL
  • Firehose

Pipeline

The fastlane.pipeline.Pipeline class is what drives the ETL process. It manages the source, transform and target processes, and runs monitoring processes which give insight into the performance/state of the running pipeline.

The Pipeline class works by spawning a number of worker threads for each stage of the ETL process (source, transform, target). Each stage passes work to the next via Queues:

        _________________        Queue       ____________________         Queue        ________________    load
extract | source_worker | -->  [|.|.|.|] -->| transform_worker_1 | -->  [|.|.|.|] --> | target_worker_1 | ------>
------> |_______________|                    --------------------                      ----------------    load
                                         -->| transform_worker_2 |                --> | target_worker_2 | ------>
                                             --------------------                      ----------------    load
                                         -->| transform_worker_3 |                --> | target_worker_3 | ------>
                                             --------------------                      ----------------    load
                                                                                  --> | target_worker_4 | ------>
                                                                                       ----------------

Throughout the ETL process, few small monitoring processes are collecting metrics at periodic intervals such as memory usage, records loaded per second, total records loaded, queue sizes. See fastlane.monitoring.pipeline_monitor for more details on how thats done.

Pipelines Web API

This project includes a Pipeline web API built w Flask which is used as a backend for collecting and storing the metrics from running Pipelines, as well as to serve the Pipeline monitoring web UI.

Resources

CRUD on pipelines

Methods

/api/pipeline

POST
GET
DELETE

/api/pipelines

list pipelines

Methods
GET

/api/pipeline/run

invocation of a particular pipeline

Methods
POST
PUT
GET
DELETE

/api/pipeline/run/latest

latest invocation of a particular pipeline.

Methods
GET

/api/pipeline/run/rps

records per second metrics for a particuar pipeline run.

Methods
GET
POST

/api/pipeline/run/memory_usage

memory usage metrics for a particular pipeline run.

Methods
GET
POST

/api/pipeline/run/logs

logs (from cloudwatch) for a particular pipeline run.

Methods
GET
POST

Pipeline Web UI

Will pvoide a user interface to moniter currently running pipelines, as well as debug and analyze previously invoked pipelines.

Owner
Dan Katz
Seasoned software engineer working in prototyping, architecting, developing and testing full stack applications
Dan Katz
A Python Tools to imaging the shallow seismic structure

ShallowSeismicImaging Tools to imaging the shallow seismic structure, above 10 km, based on the ZH ratio measured from the ambient seismic noise, and

Xiao Xiao 9 Aug 09, 2022
Fit models to your data in Python with Sherpa.

Table of Contents Sherpa License How To Install Sherpa Using Anaconda Using pip Building from source History Release History Sherpa Sherpa is a modeli

134 Jan 07, 2023
Python package for analyzing sensor-collected human motion data

Python package for analyzing sensor-collected human motion data

Simon Ho 71 Nov 05, 2022
Bearsql allows you to query pandas dataframe with sql syntax.

Bearsql adds sql syntax on pandas dataframe. It uses duckdb to speedup the pandas processing and as the sql engine

14 Jun 22, 2022
Making the DAEN information accessible.

The purpose of this repository is to make the information on Australian COVID-19 adverse events accessible. The Therapeutics Goods Administration (TGA) keeps a database of adverse reactions to medica

10 May 10, 2022
AWS Glue ETL Code Samples

AWS Glue ETL Code Samples This repository has samples that demonstrate various aspects of the new AWS Glue service, as well as various AWS Glue utilit

AWS Samples 1.2k Jan 03, 2023
PyChemia, Python Framework for Materials Discovery and Design

PyChemia, Python Framework for Materials Discovery and Design PyChemia is an open-source Python Library for materials structural search. The purpose o

Materials Discovery Group 61 Oct 02, 2022
A real-time financial data streaming pipeline and visualization platform using Apache Kafka, Cassandra, and Bokeh.

Realtime Financial Market Data Visualization and Analysis Introduction This repo shows my project about real-time stock data pipeline. All the code is

6 Sep 07, 2022
This is a python script to navigate and extract the FSD50K dataset

FSD50K navigator This is a script I use to navigate the sound dataset from FSK50K.

sweemeng 2 Nov 23, 2021
Common bioinformatics database construction

biodb Common bioinformatics database construction 1.taxonomy (Substance classification database) Download the database wget -c https://ftp.ncbi.nlm.ni

sy520 2 Jan 04, 2022
Clean and reusable data-sciency notebooks.

KPACUBO KPACUBO is a set Jupyter notebooks focused on the best practices in both software development and data science, namely, code reuse, explicit d

Matvey Morozov 1 Jan 28, 2022
Uses MIT/MEDSL, New York Times, and US Census datasources to analyze per-county COVID-19 deaths.

Covid County Executive summary Setup Install miniconda, then in the command line, run conda create -n covid-county conda activate covid-county conda i

Ahmed Fasih 1 Dec 22, 2021
yt is an open-source, permissively-licensed Python library for analyzing and visualizing volumetric data.

The yt Project yt is an open-source, permissively-licensed Python library for analyzing and visualizing volumetric data. yt supports structured, varia

The yt project 367 Dec 25, 2022
First steps with Python in Life Sciences

First steps with Python in Life Sciences This course material is part of the "First Steps with Python in Life Science" three-day course of SIB-trainin

SIB Swiss Institute of Bioinformatics 22 Jan 08, 2023
International Space Station data with Python research 🌎

International Space Station data with Python research 🌎 Plotting ISS trajectory, calculating the velocity over the earth and more. Plotting trajector

Facundo Pedaccio 41 Jun 16, 2022
Python utility to extract differences between two pandas dataframes.

Python utility to extract differences between two pandas dataframes.

Jaime Valero 8 Jan 07, 2023
Datashredder is a simple data corruption engine written in python. You can corrupt anything text, images and video.

Datashredder is a simple data corruption engine written in python. You can corrupt anything text, images and video. You can chose the cha

2 Jul 22, 2022
A meta plugin for processing timelapse data timepoint by timepoint in napari

napari-time-slicer A meta plugin for processing timelapse data timepoint by timepoint. It enables a list of napari plugins to process 2D+t or 3D+t dat

Robert Haase 2 Oct 13, 2022
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Glotzer Group 44 Oct 14, 2022
Galvanalyser is a system for automatically storing data generated by battery cycling machines in a database

Galvanalyser is a system for automatically storing data generated by battery cycling machines in a database, using a set of "harvesters", whose job it

Battery Intelligence Lab 20 Sep 28, 2022