ETL flow framework based on Yaml configs in Python

Overview

logo

ETL framework based on Yaml configs in Python

Supported Python Versions License Code style: black

A light framework for creating data streams. Setting up streams through configuration in the Yaml file. There is a schedule, task pools, concurrency limitation. Works quickly, does not require a lot of resources. Runs on Windows and Linux. Flow run in parallel via threading library. Internally SQLite Database. Native data transformation. There is a web interface.

At the moment there are connectors to sources

  • CSV file
  • SQLite
  • Postgres
  • MySQL
  • Yandex Metrika Management API
  • Yandex Metrika Stats API
  • Yandex Metrika Logs API
  • Yandex Direct API
  • Yandex Direct Report API
  • Criteo
  • Google Sheets

Storages

  • Save to csv file
  • Clickhouse

Documentation

Requirements

  • python >=3.9
  • virtual environment

Settings

It is highly recommended to install in a virtual environment.

Flowmaster needs a home, '{HOME}/FlowMaster' is the default,
but you can lay foundation somewhere else if you prefer
(optional)

For Windows

setx FLOWMASTER_HOME "{YOUR_PATH}"

For Linux

export FLOWMASTER_HOME={YOUR_PATH}

Installing

pip install flowmaster==0.7.1

# For install web UI.
pip install flowmaster[webui]==0.7.1

# Optional libraries.
pip install flowmaster[clickhouse,postgres,mysql,yandexdirect,yandexmetrika,criteo,googlesheets]==0.7.1

Run

flowmaster run --help
flowmaster run

WEB UI

http://localhost:8822

CHANGELOG

Support

Telegram support chat

Author

Pavel Maksimov

My contacts Telegram, Facebook

Удачи тебе, друг! Поставь звездочку ;)

You might also like...
signac-flow - manage workflows with signac
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.
Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.

Data lineage made simple, reliable, and automated. Effortlessly track the flow of data, understand dependencies and analyze impact. Features Visualiza

Randomisation-based inference in Python based on data resampling and permutation.

Randomisation-based inference in Python based on data resampling and permutation.

Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)
Karate Club: An API Oriented Open-source Python Framework for Unsupervised Learning on Graphs (CIKM 2020)

Karate Club is an unsupervised machine learning extension library for NetworkX. Please look at the Documentation, relevant Paper, Promo Video, and Ext

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather than invoking the Python interpreter, Tuplex generates optimized LLVM bytecode for the given pipeline and input data set.

BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems
BioMASS - A Python Framework for Modeling and Analysis of Signaling Systems

Mathematical modeling is a powerful method for the analysis of complex biological systems. Although there are many researches devoted on produ

 PyChemia, Python Framework for Materials Discovery and Design
PyChemia, Python Framework for Materials Discovery and Design

PyChemia, Python Framework for Materials Discovery and Design PyChemia is an open-source Python Library for materials structural search. The purpose o

wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information
wikirepo is a Python package that provides a framework to easily source and leverage standardized Wikidata information

Python based Wikidata framework for easy dataframe extraction wikirepo is a Python package that provides a framework to easily source and leverage sta

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams Motivation When dataset freshness is critical, the annotating of high speed

Comments
  •  No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'

    Привет, очень хороший проект, однако столкнулся со следующей проблемой при устанвоке библиотеки

    1. с ванильным python pip такого пакета вообще не видно
    2. при установке через conda установка проходит замечательно, однако при запуске получаю
    (base) [email protected]:~/FlowMaster$ flowmaster run
    Traceback (most recent call last):
      File "/home/ubuntu/miniforge3/bin/flowmaster", line 5, in <module>
        from flowmaster.__main__ import app
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/__main__.py", line 9, in <module>
        import flowmaster.cli.notebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/cli/notebook.py", line 5, in <module>
        from flowmaster.service import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/service.py", line 11, in <module>
        from flowmaster.operators.etl.policy import ETLNotebook
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/__init__.py", line 3, in <module>
        from flowmaster.operators.etl.providers.abstract import ProviderAbstract, ExportAbstract
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/__init__.py", line 4, in <module>
        from flowmaster.operators.etl.providers.criteo import CriteoProvider
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/__init__.py", line 2, in <module>
        from flowmaster.operators.etl.providers.criteo.export import (
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/operators/etl/providers/criteo/export.py", line 8, in <module>
        from flowmaster.executors import SleepIteration
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/executors/__init__.py", line 16, in <module>
        from flowmaster.pool import pools
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/pool.py", line 106, in <module>
        pools_dict = YamlHelper.parse_file(str(Settings.POOL_CONFIG_FILEPATH))
      File "/home/ubuntu/miniforge3/lib/python3.9/site-packages/flowmaster/utils/yaml_helper.py", line 14, in parse_file
        with open(path, "rb") as f:
    FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/FlowMaster/pools.yaml'
    

    Что я делаю не так?(

    opened by micweeks 1
Releases(0.7.1)
  • 0.7.1(Aug 29, 2021)

    • prevented planned of tasks from one instance of the operator class
    • fixed error GeneratorExit
    • fixed transform array type for Clickhouse loader
    Source code(tar.gz)
    Source code(zip)
  • 0.6.1(Jun 22, 2021)

    Redesigned executor

    New

    • add politics 'time_limit_seconds_from_worktime', 'soft_time_limit_seconds'.
    • add provider 'flowmaster'

    Fixing

    • fix schedule (interval seconds mode)
    • add logging 'loguru'
    • fix clear_statuses_of_lost_items
    • fix allow_execute_flow
    • change command 'db reset'

    There are backward incompatible changes

    • new field 'expires_utc' in FlowItem
    • rename command 'run' to 'run_local' and rename command 'run_thread' to 'run'
    • add new class ExecutorIterationTask.
    • change, moving and rename class ThreadExecutor to ThreadAsyncExecutor.
    • change and rename class SleepTask to SleepIteration.
    • change and rename class TaskPool to NextIterationInPools.
    • ETLOperator return ExecutorIterationTask.
    • rename func order_flow to ordering_flow_tasks.
    • rename func start_executor to sync_executor.
    • rename field FlowItem.config_hash to FlowItem.notebook_hash
    • change FLOW_CONFIGS_DIR and rename FLOW_CONFIGS_DIR to NOTEBOOKS_DIR
    • rename objects config to notebook
    • add class Settings
    Source code(tar.gz)
    Source code(zip)
  • 0.3.1(May 15, 2021)

  • 0.2.2(May 13, 2021)

Owner
Павел Максимов
Python Data Engineer, Python Developer, ETL, Разработчик рекомендательных систем
Павел Максимов
Probabilistic reasoning and statistical analysis in TensorFlow

TensorFlow Probability TensorFlow Probability is a library for probabilistic reasoning and statistical analysis in TensorFlow. As part of the TensorFl

3.8k Jan 05, 2023
Creating a statistical model to predict 10 year treasury yields

Predicting 10-Year Treasury Yields Intitially, I wanted to see if the volatility in the stock market, represented by the VIX index (data source), had

10 Oct 27, 2021
Helper tools to construct probability distributions built from expert elicited data for use in monte carlo simulations.

Elicited Helper tools to construct probability distributions built from expert elicited data for use in monte carlo simulations. Credit to Brett Hoove

Ryan McGeehan 3 Nov 04, 2022
Projeto para realizar o RPA Challenge . Utilizando Python e as bibliotecas Selenium e Pandas.

RPA Challenge in Python Projeto para realizar o RPA Challenge (www.rpachallenge.com), utilizando Python. O objetivo deste desafio é criar um fluxo de

Henrique A. Lourenço 1 Apr 12, 2022
This tool parses log data and allows to define analysis pipelines for anomaly detection.

logdata-anomaly-miner This tool parses log data and allows to define analysis pipelines for anomaly detection. It was designed to run the analysis wit

AECID 32 Nov 27, 2022
A powerful data analysis package based on mathematical step functions. Strongly aligned with pandas.

The leading use-case for the staircase package is for the creation and analysis of step functions. Pretty exciting huh. But don't hit the close button

48 Dec 21, 2022
Python utility to extract differences between two pandas dataframes.

Python utility to extract differences between two pandas dataframes.

Jaime Valero 8 Jan 07, 2023
A Streamlit web-app for a data-science project that aims to evaluate if the answer to a question is helpful.

How useful is the aswer? A Streamlit web-app for a data-science project that aims to evaluate if the answer to a question is helpful. If you want to l

1 Dec 17, 2021
Fit models to your data in Python with Sherpa.

Table of Contents Sherpa License How To Install Sherpa Using Anaconda Using pip Building from source History Release History Sherpa Sherpa is a modeli

134 Jan 07, 2023
Hidden Markov Models in Python, with scikit-learn like API

hmmlearn hmmlearn is a set of algorithms for unsupervised learning and inference of Hidden Markov Models. For supervised learning learning of HMMs and

2.7k Jan 03, 2023
Pyspark Spotify ETL

This is my first Data Engineering project, it extracts data from the user's recently played tracks using Spotify's API, transforms data and then loads it into Postgresql using SQLAlchemy engine. Data

16 Jun 09, 2022
Validated, scalable, community developed variant calling, RNA-seq and small RNA analysis

Validated, scalable, community developed variant calling, RNA-seq and small RNA analysis. You write a high level configuration file specifying your in

Blue Collar Bioinformatics 917 Jan 03, 2023
A tax calculator for stocks and dividends activities.

Revolut Stocks calculator for Bulgarian National Revenue Agency Information Processing and calculating the required information about stock possession

Doino Gretchenliev 200 Oct 25, 2022
t-SNE and hierarchical clustering are popular methods of exploratory data analysis, particularly in biology.

tree-SNE t-SNE and hierarchical clustering are popular methods of exploratory data analysis, particularly in biology. Building on recent advances in s

Isaac Robinson 61 Nov 21, 2022
Additional tools for particle accelerator data analysis and machine information

PyLHC Tools This package is a collection of useful scripts and tools for the Optics Measurements and Corrections group (OMC) at CERN. Documentation Au

PyLHC 3 Apr 13, 2022
statDistros is a Python library for dealing with various statistical distributions

StatisticalDistributions statDistros statDistros is a Python library for dealing with various statistical distributions. Now it provides various stati

1 Oct 03, 2021
PrimaryBid - Transform application Lifecycle Data and Design and ETL pipeline architecture for ingesting data from multiple sources to redshift

Transform application Lifecycle Data and Design and ETL pipeline architecture for ingesting data from multiple sources to redshift This project is composed of two parts: Part1 and Part2

Emmanuel Boateng Sifah 1 Jan 19, 2022
Hg002-qc-snakemake - HG002 QC Snakemake

HG002 QC Snakemake To Run Resources and data specified within snakefile (hg002QC

Juniper A. Lake 2 Feb 16, 2022
Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.

Data lineage made simple, reliable, and automated. Effortlessly track the flow of data, understand dependencies and analyze impact. Features Visualiza

898 Jan 09, 2023
A pipeline that creates consensus sequences from a Nanopore reads. I

A pipeline that creates consensus sequences from a Nanopore reads. It clusters reads that are similar to each other and creates a consensus that is then identified using BLAST.

Ada Madejska 2 May 15, 2022