simple way to build the declarative and destributed data pipelines with python

Overview

unipipeline

simple way to build the declarative and distributed data pipelines.

Why you should use it

  • Declarative strict config
  • Scaffolding
  • Fully typed
  • Python support 3.6+
  • Brokers support
    • kafka
    • rabbitmq
    • inmemory simple pubsub
  • Interruption handling = safe user code transactions
  • CLI

How to Install

$ pip3 install unipipeline

Example

# dag.yml
---

service:
  name: "example"
  echo_colors: true
  echo_level: error


external:
  service_name: {}


brokers:
  default_broker:
    import_template: "unipipeline.brokers.uni_memory_broker:UniMemoryBroker"

  ender_broker:
    import_template: "example.brokers.uni_log_broker:LogBroker"


messages:
  __default__:
    import_template: "example.messages.{{name}}:{{name|camel}}"

  input_message: {}

  inetermediate_message: {}

  ender_message: {}


cron:
  my_super_task:
    worker: my_super_cron_worker
    when: 0/1 * * * *

  my_mega_task:
    worker: my_super_cron_worker
    when: 0/2 * * * *

  my_puper_task:
    worker: my_super_cron_worker
    when: 0/3 * * * *


waitings:
  __default__:
    import_template: example.waitings.{{name}}_wating:{{name|camel}}Waiting

  common_db: {}


workers:
  __default__:
    import_template: "example.workers.{{name}}:{{name|camel}}"

  my_super_cron_worker:
    input_message: uni_cron_message

  input_worker:
    input_message: input_message
    waiting_for:
      - common_db

  intermediate_first_worker:
    input_message: inetermediate_message
    output_workers:
      - ender_second_worker
    waiting_for:
      - common_db

  intermediate_second_worker:
    input_message: inetermediate_message
    external: service_name
    output_workers:
      - ender_frist_worker

  ender_frist_worker:
    input_message: ender_message

  ender_second_worker:
    input_message: ender_message
    broker: ender_broker
    waiting_for:
      - common_db

Get Started

  1. create ./unipipeline.yml such as example above

  2. run cli command

unipipeline -f ./unipipeline.yml scaffold

It should create all structure of your workers, brokers and so on

  1. remove error raising from workers

  2. correct message structure for make more usefull

  3. correct broker connection (if need)

  4. run cli command to run your consumer

unipipeline -f ./unipipeline.yml consume input_worker

or with python

from unipipeline import Uni
u = Uni(f'./unipipeline.yml')
u.init_consumer_worker(f'input_worker')
u.initialize()
u.start_consuming()
  1. produce some message to the message broker by your self or with tools
unipipeline -f ./unipipeline.yml produce --worker input_worker --data='{"some": "prop"}'

or with python

# main.py
from unipipeline import Uni

u = Uni(f'./unipipeline.yml')
u.init_producer_worker(f'input_worker')
u.initialize()
u.send_to(f'input_worker', dict(some='prop'))

Definition

Service

service:
  name: some_name       # need for health-check file name
  echo_level: warning   # level of uni console logs (debug, info, warning, error)
  echo_colors: true     # show colors in console

External

external:
  some_name_of_external_service: {}
  • no props

  • it needs for declarative grouping the external workers with service

Worker

workers:
  __default__:                                        # each worker get this default props if defined
    retry_max_count: 10
    
  some_worker_name:
    retry_max_count: 3                                # just counter. message move to /dev/null if limit has reached 
    retry_delay_s: 1                                  # delay before retry
    topic: "{{name}}"                                 # template string
    error_payload_topic: "{{topic}}__error__payload"  # template string
    error_topic: "{{topic}}__error"                   # template string
    broker: "default_broker"                          # broker name. reference to message transport 
    external: null                                    # name of external service. reference in this config file 
    ack_after_success: true                           # automatic ack after process message
    waiting_for:                                      # list of references
      - some_waiting_name                             # name of block. this worker must wait for connection of this external service if need
    output_workers:                                   # list of references
      - some_other_worker_name                        # allow worker sending messages to this worker
    
    inport_template: "some.module.hierarchy.to.worker.{{name}}:{{name|camel}}OfClass"   # required module and classname for import

    input_message: "name_of_message"                  # required reference of input message type 

Waiting

waitings:
  some_blocked_service_name:
    retry_max_count: 3                         # the same semantic as worker.retry_max_count
    retry_delay_s: 10                          # the same semantic as worker.retry_delay_s
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Broker

brokers:
  some_name_of_broker:
    retry_max_count: 3                         # the same semantic as worker.retry_max_count
    retry_delay_s: 10                          # the same semantic as worker.retry_delay_s
    content_type: application/json             # content type
    compression: null                          # compression (null, application/x-gzip, application/x-bz2, application/x-lzma)
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

Message

messages:
  name_of_message:
    import_template: "some.module:SomeClass"   # required. the same semantic as worker.import_template

build in messages:

messages:
  uni_cron_message:
    import_template: unipipeline.messages.uni_cron_message:UniCronMessage

CLI

unipipeline

usage: unipipeline --help

UNIPIPELINE: simple way to build the declarative and distributed data pipelines. this is cli tool for unipipeline

positional arguments:
  {check,scaffold,init,consume,cron,produce}
                        sub-commands
    check               check loading of all modules
    scaffold            create all modules and classes if it is absent. no args
    init                initialize broker topics for workers
    consume             start consuming workers. connect to brokers and waiting for messages
    cron                start cron jobs, That defined in config file
    produce             publish message to broker. send it to worker

optional arguments:
  -h, --help            show this help message and exit
  --config-file CONFIG_FILE, -f CONFIG_FILE
                        path to unipipeline config file (default: ./unipipeline.yml)
  --verbose [VERBOSE]   verbose output (default: false)

unipipeline check

usage: 
    unipipeline -f ./unipipeline.yml check
    unipipeline -f ./unipipeline.yml --verbose=yes check

check loading of all modules

optional arguments:
  -h, --help  show this help message and exit

unipipeline init

usage: 
    unipipeline -f ./unipipeline.yml init
    unipipeline -f ./unipipeline.yml --verbose=yes init
    unipipeline -f ./unipipeline.yml --verbose=yes init --workers some_worker_name_01 some_worker_name_02

initialize broker topics for workers

optional arguments:
  -h, --help            show this help message and exit
  --workers INIT_WORKERS [INIT_WORKERS ...], -w INIT_WORKERS [INIT_WORKERS ...]
                        workers list for initialization (default: [])

unipipeline scaffold

usage: 
    unipipeline -f ./unipipeline.yml scaffold
    unipipeline -f ./unipipeline.yml --verbose=yes scaffold

create all modules and classes if it is absent. no args

optional arguments:
  -h, --help  show this help message and exit

unipipeline consume

usage: 
    unipipeline -f ./unipipeline.yml consume
    unipipeline -f ./unipipeline.yml --verbose=yes consume
    unipipeline -f ./unipipeline.yml consume --workers some_worker_name_01 some_worker_name_02
    unipipeline -f ./unipipeline.yml --verbose=yes consume --workers some_worker_name_01 some_worker_name_02

start consuming workers. connect to brokers and waiting for messages

optional arguments:
  -h, --help            show this help message and exit
  --workers CONSUME_WORKERS [CONSUME_WORKERS ...], -w CONSUME_WORKERS [CONSUME_WORKERS ...]
                        worker list for consuming

unipipeline produce

usage: 
    unipipeline -f ./unipipeline.yml produce --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml --verbose=yes produce --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml produce --alone --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}
    unipipeline -f ./unipipeline.yml --verbose=yes produce --alone --worker some_worker_name_01 --data {"some": "json", "value": "for worker"}

publish message to broker. send it to worker

optional arguments:
  -h, --help            show this help message and exit
  --alone [PRODUCE_ALONE], -a [PRODUCE_ALONE]
                        message will be sent only if topic is empty
  --worker PRODUCE_WORKER, -w PRODUCE_WORKER
                        worker recipient
  --data PRODUCE_DATA, -d PRODUCE_DATA
                        data for sending

unipipeline cron

usage: 
    unipipeline -f ./unipipeline.yml cron
    unipipeline -f ./unipipeline.yml --verbose=yes cron

start cron jobs, That defined in config file

optional arguments:
  -h, --help  show this help message and exit

Contributing

TODO LIST

  1. RPC Gateways: http, tcp, udp
  2. Close/Exit uni by call method
  3. Async producer
  4. Common Error Handling
  5. Async get_answer
  6. Server of Message layout
  7. Prometheus api
  8. req/res Sdk
  9. request tasks result registry
  10. Async consumer
  11. Async by default
  12. Multi-threading start with run-groups
Owner
aliaksandr-master
aliaksandr-master
A project consists in a set of assignements corresponding to a BI process: data integration, construction of an OLAP cube, qurying of a OPLAP cube and reporting.

TennisBusinessIntelligenceProject - A project consists in a set of assignements corresponding to a BI process: data integration, construction of an OLAP cube, qurying of a OPLAP cube and reporting.

carlo paladino 1 Jan 02, 2022
Python script for transferring data between three drives in two separate stages

Waterlock Waterlock is a Python script meant for incrementally transferring data between three folder locations in two separate stages. It performs ha

David Swanlund 13 Nov 10, 2021
Utilize data analytics skills to solve real-world business problems using Humana’s big data

Humana-Mays-2021-HealthCare-Analytics-Case-Competition- The goal of the project is to utilize data analytics skills to solve real-world business probl

Yongxian (Caroline) Lun 1 Dec 27, 2021
Using Python to derive insights on particular Pokemon, Types, Generations, and Stats

Pokémon Analysis Andreas Nikolaidis February 2022 Introduction Exploratory Analysis Correlations & Descriptive Statistics Principal Component Analysis

Andreas 1 Feb 18, 2022
This repo is dedicated to the data extraction and manipulation of the World Bank's database called STEP.

Overview Welcome to the Step-X repository. This repo is dedicated to the data extraction and manipulation of the World Bank's database called STEP. Be

Keanu Pang 0 Jan 20, 2022
A collection of robust and fast processing tools for parsing and analyzing web archive data.

ChatNoir Resiliparse A collection of robust and fast processing tools for parsing and analyzing web archive data. Resiliparse is part of the ChatNoir

ChatNoir 24 Nov 29, 2022
Statistical package in Python based on Pandas

Pingouin is an open-source statistical package written in Python 3 and based mostly on Pandas and NumPy. Some of its main features are listed below. F

Raphael Vallat 1.2k Dec 31, 2022
An easy-to-use feature store

A feature store is a data storage system for data science and machine-learning. It can store raw data and also transformed features, which can be fed straight into an ML model or training script.

ByteHub AI 48 Dec 09, 2022
Snakemake workflow for converting FASTQ files to self-contained CRAM files with maximum lossless compression.

Snakemake workflow: name A Snakemake workflow for description Usage The usage of this workflow is described in the Snakemake Workflow Catalog. If

Algorithms for reproducible bioinformatics (Koesterlab) 1 Dec 16, 2021
A Big Data ETL project in PySpark on the historical NYC Taxi Rides data

Processing NYC Taxi Data using PySpark ETL pipeline Description This is an project to extract, transform, and load large amount of data from NYC Taxi

Unnikrishnan 2 Dec 12, 2021
Scraping and analysis of leetcode-compensations page.

Leetcode compensations report Scraping and analysis of leetcode-compensations page.

utsav 96 Jan 01, 2023
PyPDC is a Python package for calculating asymptotic Partial Directed Coherence estimations for brain connectivity analysis.

Python asymptotic Partial Directed Coherence and Directed Coherence estimation package for brain connectivity analysis. Free software: MIT license Doc

Heitor Baldo 3 Nov 26, 2022
Python scripts aim to use a Random Forest machine learning algorithm to predict the water affinity of Metal-Organic Frameworks

The following Python scripts aim to use a Random Forest machine learning algorithm to predict the water affinity of Metal-Organic Frameworks (MOFs). The training set is extracted from the Cambridge S

1 Jan 09, 2022
Automated Exploration Data Analysis on a financial dataset

Automated EDA on financial dataset Just a simple way to get automated Exploration Data Analysis from financial dataset (OHLCV) using Streamlit and ta.

Darío López Padial 28 Nov 27, 2022
This repo contains a simple but effective tool made using python which can be used for quality control in statistical approach.

This repo contains a powerful tool made using python which is used to visualize, analyse and finally assess the quality of the product depending upon the given observations

SasiVatsal 8 Oct 18, 2022
DaDRA (day-druh) is a Python library for Data-Driven Reachability Analysis.

DaDRA (day-druh) is a Python library for Data-Driven Reachability Analysis. The main goal of the package is to accelerate the process of computing estimates of forward reachable sets for nonlinear dy

2 Nov 08, 2021
CSV database for chihuahua (HUAHUA) blockchain transactions

super-fiesta Shamelessly ripped components from https://github.com/hodgerpodger/staketaxcsv - Thanks for doing all the hard work. This code does only

Arlene Macciaveli 1 Jan 07, 2022
Exploring the Top ML and DL GitHub Repositories

This repository contains my work related to my project where I scraped data on the most popular machine learning and deep learning GitHub repositories in order to further visualize and analyze it.

Nico Van den Hooff 17 Aug 21, 2022
Manage large and heterogeneous data spaces on the file system.

signac - simple data management The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, and reproduc

Glotzer Group 109 Dec 14, 2022
ped-crash-techvol: Texas Ped Crash Tech Volume Pack

ped-crash-techvol: Texas Ped Crash Tech Volume Pack In conjunction with the Final Report "Identifying Risk Factors that Lead to Increase in Fatal Pede

Network Modeling Center; Center for Transportation Research; The University of Texas at Austin 2 Sep 28, 2022