simple way to build the declarative and destributed data pipelines with python

Overview

unipipeline

simple way to build the declarative and distributed data pipelines.

Why you should use it

  • Declarative strict config
  • Scaffolding
  • Fully typed
  • Python support 3.6+
  • Brokers support
    • kafka
    • rabbitmq
    • inmemory simple pubsub
  • Interruption handling = safe user code transactions
  • CLI

How to Install

$ pip3 install unipipeline

Example

# dag.yml
---

service:
  name: "example"
  echo_colors: true
  echo_level: error


external:
  service_name: {}


brokers:
  default_broker:
    import_template: "unipipeline.brokers.uni_memory_broker:UniMemoryBroker"

  ender_broker:
    import_template: "example.brokers.uni_log_broker:LogBroker"


messages:
  __default__:
    import_template: "example.messages.{{name}}:{{name|camel}}"

  input_message: {}

  inetermediate_message: {}

  ender_message: {}


cron:
  my_super_task:
    worker: my_super_cron_worker
    when: 0/1 * * * *

  my_mega_task:
    worker: my_super_cron_worker
    when: 0/2 * * * *

  my_puper_task:
    worker: my_super_cron_worker
    when: 0/3 * * * *


waitings:
  __default__:
    import_template: example.waitings.{{name}}_wating:{{name|camel}}Waiting

  common_db: {}


workers:
  __default__:
    import_template: "example.workers.{{name}}:{{name|camel}}"

  my_super_cron_worker:
    input_message: uni_cron_message

  input_worker:
    input_message: input_message
    waiting_for:
      - common_db

  intermediate_first_worker:
    input_message: inetermediate_message
    output_workers:
      - ender_second_worker
    waiting_for:
      - common_db

  intermediate_second_worker:
    input_message: inetermediate_message
    external: service_name
    output_workers:
      - ender_frist_worker

  ender_frist_worker:
    input_message: ender_message

  ender_second_worker:
    input_message: ender_message
    broker: ender_broker
    waiting_for:
      - common_db

Get Started

  1. create ./unipipeline.yml such as example above

  2. run cli command

unipipeline -f ./unipipeline.yml scaffold

It should create all structure of your workers, brokers and so on

  1. remove error raising from workers

  2. correct message structure for make more usefull

  3. correct broker connection (if need)

  4. run cli command to run your consumer

unipipeline -f ./unipipeline.yml consume input_worker

or with python

from unipipeline import Uni
u = Uni(f'./unipipeline.yml')
u.init_consumer_worker(f'input_worker')
u.initialize()
u.start_consuming()
  1. produce some message to the message broker by your self or with tools
unipipeline -f ./unipipeline.yml produce --worker input_worker --data='{"some": "prop"}'

or with python

# main.py
from unipipeline import Uni

u = Uni(f'./unipipeline.yml')
u.init_producer_worker(f'input_worker')
u.initialize()
u.send_to(f'input_worker', dict(some='prop'))

Definition

Service

service:
  name: some_name       # need for health-check file name
  echo_level: warning   # level of uni console logs (debug, info, warning, error)
  echo_colors: true     # show colors in console

External

external:
  some_name_of_external_service: {}
  • no props

  • it needs for declarative grouping the external workers with service

Worker

workers:
  __default__:                                        # each worker get this default props if defined
    retry_max_count: 10
    
  some_worker_name:
    retry_max_count: 3                                # just counter. message move to /dev/null if limit has reached 
    retry_delay_s: 1                                  # delay before retry
    topic: "{{name}}"                                 # template string
    error_payload_topic: "{{topic}}__error__payload"  # template string
    error_topic: "{{topic}}__error"                   # template string
    broker: "default_broker"                          # broker name. reference to message transport 
    external: null                                    # name of external service. reference in this config file 
    ack_after_success: true                           # automatic ack after process message
    waiting_for:                                      # list of references
      - some_waiting_name                             # name of block. this worker must wait for connection of this external service if need
    output_workers:                                   # list of references
      - some_other_worker_name                        # allow worker sending messages to this worker
    
    inport_template: "some.module.hierarchy.to.worker.{{name}}:{{name|camel}}OfClass"   # required module and classname for import

    input_message: "name_of_message"                  # required reference of input message type 

Waiting

waitings:
  some_blocked_service_name:
    retry_max_count: 3                         # the same semantic as worker.retry_max_count
    retry_delay_s: 10                          # the same semantic as worker.retry_delay_s
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Broker

brokers:
  some_name_of_broker:
    retry_max_count: 3                         # the same semantic as worker.retry_max_count
    retry_delay_s: 10                          # the same semantic as worker.retry_delay_s
    content_type: application/json             # content type
    compression: null                          # compression (null, application/x-gzip, application/x-bz2, application/x-lzma)
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Message

messages:
  name_of_message:
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

build in messages:

messages:
  uni_cron_message:
    import_template: unipipeline.messages.uni_cron_message:UniCronMessage

CLI

unipipeline

usage: unipipeline --help

UNIPIPELINE: simple way to build the declarative and distributed data pipelines. this is cli tool for unipipeline

positional arguments:
  {check,scaffold,init,consume,cron,produce}
                        sub-commands
    check               check loading of all modules
    scaffold            create all modules and classes if it is absent. no args
    init                initialize broker topics for workers
    consume             start consuming workers. connect to brokers and waiting for messages
    cron                start cron jobs, That defined in config file
    produce             publish message to broker. send it to worker

optional arguments:
  -h, --help            show this help message and exit
  --config-file CONFIG_FILE, -f CONFIG_FILE
                        path to unipipeline config file (default: ./unipipeline.yml)
  --verbose [VERBOSE]   verbose output (default: false)

unipipeline check

usage: 
    unipipeline -f ./unipipeline.yml check
    unipipeline -f ./unipipeline.yml --verbose=yes check

check loading of all modules

optional arguments:
  -h, --help  show this help message and exit

unipipeline init

usage: 
    unipipeline -f ./unipipeline.yml init
    unipipeline -f ./unipipeline.yml --verbose=yes init
    unipipeline -f ./unipipeline.yml --verbose=yes init --workers some_worker_name_01 some_worker_name_02

initialize broker topics for workers

optional arguments:
  -h, --help            show this help message and exit
  --workers INIT_WORKERS [INIT_WORKERS ...], -w INIT_WORKERS [INIT_WORKERS ...]
                        workers list for initialization (default: [])

unipipeline scaffold

usage: 
    unipipeline -f ./unipipeline.yml scaffold
    unipipeline -f ./unipipeline.yml --verbose=yes scaffold

create all modules and classes if it is absent. no args

optional arguments:
  -h, --help  show this help message and exit

unipipeline consume

usage: 
    unipipeline -f ./unipipeline.yml consume
    unipipeline -f ./unipipeline.yml --verbose=yes consume
    unipipeline -f ./unipipeline.yml consume --workers some_worker_name_01 some_worker_name_02
    unipipeline -f ./unipipeline.yml --verbose=yes consume --workers some_worker_name_01 some_worker_name_02

start consuming workers. connect to brokers and waiting for messages

optional arguments:
  -h, --help            show this help message and exit
  --workers CONSUME_WORKERS [CONSUME_WORKERS ...], -w CONSUME_WORKERS [CONSUME_WORKERS ...]
                        worker list for consuming

unipipeline produce

usage: 
    unipipeline -f ./unipipeline.yml produce --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml --verbose=yes produce --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml produce --alone --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml --verbose=yes produce --alone --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}

publish message to broker. send it to worker

optional arguments:
  -h, --help            show this help message and exit
  --alone [PRODUCE_ALONE], -a [PRODUCE_ALONE]
                        message will be sent only if topic is empty
  --worker PRODUCE_WORKER, -w PRODUCE_WORKER
                        worker recipient
  --data PRODUCE_DATA, -d PRODUCE_DATA
                        data for sending

unipipeline cron

usage: 
    unipipeline -f ./unipipeline.yml cron
    unipipeline -f ./unipipeline.yml --verbose=yes cron

start cron jobs, That defined in config file

optional arguments:
  -h, --help  show this help message and exit

Contributing

TODO LIST

  1. RPC Gateways: http, tcp, udp
  2. Close/Exit uni by call method
  3. Async producer
  4. Common Error Handling
  5. Async get_answer
  6. Server of Message layout
  7. Prometheus api
  8. req/res Sdk
  9. request tasks result registry
  10. Async consumer
  11. Async by default
  12. Multi-threading start with run-groups
Owner
aliaksandr-master
aliaksandr-master
MS in Data Science capstone project. Studying attacks on autonomous vehicles.

Surveying Attack Models for CAVs Guide to Installing CARLA and Collecting Data Our project focuses on surveying attack models for Connveced Autonomous

Isabela Caetano 1 Dec 09, 2021
follow-analyzer helps GitHub users analyze their following and followers relationship

follow-analyzer follow-analyzer helps GitHub users analyze their following and followers relationship by providing a report in html format which conta

Yin-Chiuan Chen 2 May 02, 2022
This repo contains a simple but effective tool made using python which can be used for quality control in statistical approach.

📈 Statistical Quality Control 📉 This repo contains a simple but effective tool made using python which can be used for quality control in statistica

SasiVatsal 8 Oct 18, 2022
Investigating EV charging data

Investigating EV charging data Introduction: Got an opportunity to work with a home monitoring technology company over the last 6 months whose goal wa

Yash 2 Apr 07, 2022
InDels analysis of CRISPR lines by NGS amplicon sequencing technology for a multicopy gene family.

CRISPRanalysis InDels analysis of CRISPR lines by NGS amplicon sequencing technology for a multicopy gene family. In this work, we present a workflow

2 Jan 31, 2022
Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation

Data Intelligence Applications - Online Product Advertising and Pricing with Context Generation Overview Consider the scenario in which advertisement

Manuel Bressan 2 Nov 18, 2021
Multiple Pairwise Comparisons (Post Hoc) Tests in Python

scikit-posthocs is a Python package that provides post hoc tests for pairwise multiple comparisons that are usually performed in statistical data anal

Maksim Terpilowski 264 Dec 30, 2022
Business Intelligence (BI) in Python, OLAP

Open Mining Business Intelligence (BI) Application Server written in Python Requirements Python 2.7 (Backend) Lua 5.2 or LuaJIT 5.1 (OML backend) Mong

Open Mining 1.2k Dec 27, 2022
Data and code accompanying the paper Politics and Virality in the Time of Twitter

Politics and Virality in the Time of Twitter Data and code accompanying the paper Politics and Virality in the Time of Twitter. In specific: the code

Cardiff NLP 3 Jul 02, 2022
Sensitivity Analysis Library in Python (Numpy). Contains Sobol, Morris, Fractional Factorial and FAST methods.

Sensitivity Analysis Library (SALib) Python implementations of commonly used sensitivity analysis methods. Useful in systems modeling to calculate the

SALib 663 Jan 05, 2023
University Challenge 2021 With Python

University Challenge 2021 This repository contains: The TeX file of the technical write-up describing the University / HYPER Challenge 2021 under late

2 Nov 27, 2021
A pipeline that creates consensus sequences from a Nanopore reads. I

A pipeline that creates consensus sequences from a Nanopore reads. It clusters reads that are similar to each other and creates a consensus that is then identified using BLAST.

Ada Madejska 2 May 15, 2022
ETL pipeline on movie data using Python and postgreSQL

Movies-ETL ETL pipeline on movie data using Python and postgreSQL Overview This project consisted on a automated Extraction, Transformation and Load p

Juan Nicolas Serrano 0 Jul 07, 2021
Data Analytics: Modeling and Studying data relating to climate change and adoption of electric vehicles

Correlation-Study-Climate-Change-EV-Adoption Data Analytics: Modeling and Studying data relating to climate change and adoption of electric vehicles I

Jonathan Feng 1 Jan 03, 2022
Extract data from a wide range of Internet sources into a pandas DataFrame.

pandas-datareader Up to date remote data access for pandas, works for multiple versions of pandas. Installation Install using pip pip install pandas-d

Python for Data 2.5k Jan 09, 2023
A Python package for Bayesian forecasting with object-oriented design and probabilistic models under the hood.

Disclaimer This project is stable and being incubated for long-term support. It may contain new experimental code, for which APIs are subject to chang

Uber Open Source 1.6k Dec 29, 2022
Pipeline and Dataset helpers for complex algorithm evaluation.

tpcp - Tiny Pipelines for Complex Problems A generic way to build object-oriented datasets and algorithm pipelines and tools to evaluate them pip inst

Machine Learning and Data Analytics Lab FAU 3 Dec 07, 2022
📊 Python Flask game that consolidates data from Nasdaq, allowing the user to practice buying and selling stocks.

Web Trader Web Trader is a trading website that consolidates data from Nasdaq, allowing the user to search up the ticker symbol and price of any stock

Paulina Khew 21 Aug 30, 2022
A probabilistic programming language in TensorFlow. Deep generative models, variational inference.

Edward is a Python library for probabilistic modeling, inference, and criticism. It is a testbed for fast experimentation and research with probabilis

Blei Lab 4.7k Jan 09, 2023
ToeholdTools is a Python package and desktop app designed to facilitate analyzing and designing toehold switches, created as part of the 2021 iGEM competition.

ToeholdTools Category Status Repository Package Build Quality A library for the analysis of toehold switch riboregulators created by the iGEM team Cit

0 Dec 01, 2021