ETL flow framework based on Yaml configs in Python

Overview

logo

ETL framework based on Yaml configs in Python

Supported Python Versions License Code style: black

A light framework for creating data streams. Setting up streams through configuration in the Yaml file. There is a schedule, task pools, concurrency limitation. Works quickly, does not require a lot of resources. Runs on Windows and Linux. Flow run in parallel via threading library. Internally SQLite Database. Native data transformation. There is a web interface.

At the moment there are connectors to sources

  • CSV file
  • SQLite
  • Postgres
  • MySQL
  • Yandex Metrika Management API
  • Yandex Metrika Stats API
  • Yandex Metrika Logs API
  • Yandex Direct API
  • Yandex Direct Report API
  • Criteo
  • Google Sheets

Storages

  • Save to csv file
  • Clickhouse

Documentation

Requirements

  • python >=3.9
  • virtual environment

Settings

It is highly recommended to install in a virtual environment.

Flowmaster needs a home, '{HOME}/FlowMaster' is the default,
but you can lay foundation somewhere else if you prefer
(optional)

For Windows

setx FLOWMASTER_HOME "{YOUR_PATH}"

For Linux

export FLOWMASTER_HOME={YOUR_PATH}

Installing

pip install flowmaster==0.7.1

# For install web UI.
pip install flowmaster[webui]==0.7.1

# Optional libraries.
pip install flowmaster[clickhouse,postgres,mysql,yandexdirect,yandexmetrika,criteo,googlesheets]==0.7.1

Run

flowmaster run --help
flowmaster run

WEB UI

http://localhost:8822

CHANGELOG

Support

Telegram support chat

Author

Pavel Maksimov

My contacts Telegram, Facebook

Удачи тебе, друг! Поставь звездочку ;)

You might also like...
signac-flow - manage workflows with signac
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.
Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.

Data lineage made simple, reliable, and automated. Effortlessly track the flow of data, understand dependencies and analyze impact. Features Visualiza

Randomisation-based inference in Python based on data resampling and permutation.

Randomisation-based inference in Python based on data resampling and permutation.

Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)
Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)

Karate Club is an unsupervised machine learning extension library for NetworkX. Please look at the Documentation, relevant Paper, Promo Video, and Ext

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather than invoking the Python interpreter, Tuplex generates optimized LLVM bytecode for the given pipeline and input data set.

BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems
BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems

Mathematical modeling is a powerful method for the analysis of complex biological systems. Although there are many researches devoted on produ

 PyChemia, Python Framework for Materials Discovery and Design
PyChemia, Python Framework for Materials Discovery and Design

PyChemia, Python Framework for Materials Discovery and Design PyChemia is an open-source Python Library for materials structural search. The purpose o

wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information
wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information

Python based Wikidata framework for easy dataframe extraction wikirepo is a Python package that provides a framework to easily source and leverage sta

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams Motivation When dataset freshness is critical, the annotating of high speed

Comments
  •  No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    Привет, очень хороший проект, однако столкнулся со следующей проблемой при устанвоке библиотеки

    1. с ванильным python pip такого пакета вообще не видно
    2. при установке через conda установка проходит замечательно, однако при запуске получаю
    (base) [email protected]:~/FlowMaster$ flowmaster run
    Traceback (most recent call last):
      File "/home/ubuntu/miniforge3/bin/flowmaster", line 5, in <module>
        from flowmaster.__main__ import app
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/__main__.py", line 9, in <module>
        import flowmaster.cli.notebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/cli/notebook.py", line 5, in <module>
        from flowmaster.service import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/service.py", line 11, in <module>
        from flowmaster.operators.etl.policy import ETLNotebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/__init__.py", line 3, in <module>
        from flowmaster.operators.etl.providers.abstract import ProviderAbstract, ExportAbstract
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/__init__.py", line 4, in <module>
        from flowmaster.operators.etl.providers.criteo import CriteoProvider
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/__init__.py", line 2, in <module>
        from flowmaster.operators.etl.providers.criteo.export import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/export.py", line 8, in <module>
        from flowmaster.executors import SleepIteration
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/executors/__init__.py", line 16, in <module>
        from flowmaster.pool import pools
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/pool.py", line 106, in <module>
        pools_dict = YamlHelper.parse_file(str(Settings.POOL_CONFIG_FILEPATH))
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/utils/yaml_helper.py", line 14, in parse_file
        with open(path, "rb") as f:
    FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'
    

    Что я делаю не так?(

    opened by micweeks 1
Releases(0.7.1)
  • 0.7.1(Aug 29, 2021)

    • prevented planned of tasks from one instance of the operator class
    • fixed error GeneratorExit
    • fixed transform array type for Clickhouse loader
    Source code(tar.gz)
    Source code(zip)
  • 0.6.1(Jun 22, 2021)

    Redesigned executor

    New

    • add politics 'time_limit_seconds_from_worktime', 'soft_time_limit_seconds'.
    • add provider 'flowmaster'

    Fixing

    • fix schedule (interval seconds mode)
    • add logging 'loguru'
    • fix clear_statuses_of_lost_items
    • fix allow_execute_flow
    • change command 'db reset'

    There are backward incompatible changes

    • new field 'expires_utc' in FlowItem
    • rename command 'run' to 'run_local' and rename command 'run_thread' to 'run'
    • add new class ExecutorIterationTask.
    • change, moving and rename class ThreadExecutor to ThreadAsyncExecutor.
    • change and rename class SleepTask to SleepIteration.
    • change and rename class TaskPool to NextIterationInPools.
    • ETLOperator return ExecutorIterationTask.
    • rename func order_flow to ordering_flow_tasks.
    • rename func start_executor to sync_executor.
    • rename field FlowItem.config_hash to FlowItem.notebook_hash
    • change FLOW_CONFIGS_DIR and rename FLOW_CONFIGS_DIR to NOTEBOOKS_DIR
    • rename objects config to notebook
    • add class Settings
    Source code(tar.gz)
    Source code(zip)
  • 0.3.1(May 15, 2021)

  • 0.2.2(May 13, 2021)

Owner
Павел Максимов
Python Data Engineer, Python Developer, ETL, Разработчик рекомендательных систем
Павел Максимов
💬 Python scripts to parse Messenger, Hangouts, WhatsApp and Telegram chat logs into DataFrames.

Chatistics Python 3 scripts to convert chat logs from various messaging platforms into Pandas DataFrames. Can also generate histograms and word clouds

Florian 893 Jan 02, 2023
Python dataset creator to construct datasets composed of OpenFace extracted features and Shimmer3 GSR+ Sensor datas

Python dataset creator to construct datasets composed of OpenFace extracted features and Shimmer3 GSR+ Sensor datas

Gabriele 3 Jul 05, 2022
Monitor the stability of a pandas or spark dataframe ⚙︎

Population Shift Monitoring popmon is a package that allows one to check the stability of a dataset. popmon works with both pandas and spark datasets.

ING Bank 403 Dec 07, 2022
Synthetic Data Generation for tabular, relational and time series data.

An Open Source Project from the Data to AI Lab, at MIT Website: https://sdv.dev Documentation: https://sdv.dev/SDV User Guides Developer Guides Github

The Synthetic Data Vault Project 1.2k Jan 07, 2023
BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems

Mathematical modeling is a powerful method for the analysis of complex biological systems. Although there are many researches devoted on produ

BioMASS 22 Dec 27, 2022
Scraping and analysis of leetcode-compensations page.

Leetcode compensations report Scraping and analysis of leetcode-compensations page.

utsav 96 Jan 01, 2023
Maximum Covariance Analysis in Python

xMCA | Maximum Covariance Analysis in Python The aim of this package is to provide a flexible tool for the climate science community to perform Maximu

Niclas Rieger 39 Jan 03, 2023
Python Project on Pro Data Analysis Track

Udacity-BikeShare-Project: Python Project on Pro Data Analysis Track Basic Data Exploration with pandas on Bikeshare Data Basic Udacity project using

Belal Mohammed 0 Nov 10, 2021
Accurately separate the TLD from the registered domain and subdomains of a URL, using the Public Suffix List.

tldextract Python Module tldextract accurately separates the gTLD or ccTLD (generic or country code top-level domain) from the registered domain and s

John Kurkowski 1.6k Jan 03, 2023
This python script allows you to manipulate the audience data from Sl.ido surveys

Slido-Automated-VoteBot This python script allows you to manipulate the audience data from Sl.ido surveys Since Slido blocks interference from automat

Pranav Menon 1 Jan 24, 2022
A forecasting system dedicated to smart city data

smart-city-predictions System prognostyczny dedykowany dla danych inteligentnych miast Praca inżynierska realizowana przez Michała Stawikowskiego and

Kevin Lai 1 Nov 08, 2021
LynxKite: a complete graph data science platform for very large graphs and other datasets.

LynxKite is a complete graph data science platform for very large graphs and other datasets. It seamlessly combines the benefits of a friendly graphical interface and a powerful Python API.

124 Dec 14, 2022
Desafio proposto pela IGTI em seu bootcamp de Cloud Data Engineer

Desafio Modulo 4 - Cloud Data Engineer Bootcamp - IGTI Objetivos Criar infraestrutura como código Utuilizando um cluster Kubernetes na Azure Ingestão

Otacilio Filho 4 Jan 23, 2022
Unsub is a collection analysis tool that assists libraries in analyzing their journal subscriptions.

About Unsub is a collection analysis tool that assists libraries in analyzing their journal subscriptions. The tool provides rich data and a summary g

9 Nov 16, 2022
Udacity-api-reporting-pipeline - Udacity api reporting pipeline

udacity-api-reporting-pipeline In this exercise, you'll use portions of each of

Fabio Barbazza 1 Feb 15, 2022
A set of tools to analyse the output from TraDIS analyses

QuaTradis (Quadram TraDis) A set of tools to analyse the output from TraDIS analyses Contents Introduction Installation Required dependencies Bioconda

Quadram Institute Bioscience 2 Feb 16, 2022
Snakemake workflow for converting FASTQ files to self-contained CRAM files with maximum lossless compression.

Snakemake workflow: name A Snakemake workflow for description Usage The usage of this workflow is described in the Snakemake Workflow Catalog. If

Algorithms for reproducible bioinformatics (Koesterlab) 1 Dec 16, 2021
A Python Tools to imaging the shallow seismic structure

ShallowSeismicImaging Tools to imaging the shallow seismic structure, above 10 km, based on the ZH ratio measured from the ambient seismic noise, and

Xiao Xiao 9 Aug 09, 2022
Feature Detection Based Template Matching

Feature Detection Based Template Matching The classification of the photos was made using the OpenCv template Matching method. Installation Use the pa

Muhammet Erem 2 Nov 18, 2021
The Spark Challenge Student Check-In/Out Tracking Script

The Spark Challenge Student Check-In/Out Tracking Script This Python Script uses the Student ID Database to match the entries with the ID Card Swipe a

1 Dec 09, 2021