PyJava

This library is an ongoing effort towards bringing data-exchange capability between Java/Scala and Python. PyJava uses Apache Arrow as the exchange format, which avoids serialization/deserialization between Java/Scala and Python and makes communication considerably faster than traditional approaches.

When you invoke Python code from the Java/Scala side, PyJava automatically starts Python workers, sends them the data, and collects the results once processing finishes. The Python workers are reused by default.
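
The snippet a worker runs follows a simple contract, shown below with names taken from the examples later in this README: PyJava injects a context object, context.fetch_once_as_rows() yields the rows sent from the JVM as dicts, and context.build_result() streams the produced rows back.

def process():
    # each item is one row from the JVM, delivered as a dict
    for item in context.fetch_once_as_rows():
        item["value1"] = item["value"] + "_suffix"
        yield item

# hand the (lazy) generator back; PyJava streams it to the JVM as Arrow batches
context.build_result(process())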

The initial code in this library comes from Apache Spark.

Install

Set up a Python (>= 3.6) environment (Conda is recommended):

pip uninstall pyjava && pip install pyjava

Set up the Java environment (Maven is recommended):

For Scala 2.11/Spark 2.4.3

<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>pyjava-2.4_2.11</artifactId>
    <version>0.3.2</version>
</dependency>

For Scala 2.12/Spark 3.1.1

<dependency>
    <groupId>tech.mlsql</groupId>
    <artifactId>pyjava-3.0_2.12</artifactId>
    <version>0.3.2</version>
</dependency>

Build Manually

Install Build Tool:

pip install mlsql_plugin_tool

Build for Spark 3.1.1:

mlsql_plugin_tool spark311
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Build for Spark 2.4.3:

mlsql_plugin_tool spark243
mvn clean install -DskipTests -Pdisable-java8-doclint -Prelease-sign-artifacts

Using a Python code snippet to process data in Java/Scala

With PyJava, you can run arbitrary Python code in your Java/Scala application.

val envs = new util.HashMap[String, String]()
// prepare python environment
envs.put(str(PythonConf.PYTHON_ENV), "source activate dev && export ARROW_PRE_0_15_IPC_FORMAT=1 ")

// describe the schema of the data that will be transferred to python
val sourceSchema = StructType(Seq(StructField("value", StringType)))

val batch = new ArrowPythonRunner(
  Seq(ChainedPythonFunctions(Seq(PythonFunction(
    """
      |import pandas as pd
      |import numpy as np
      |
      |def process():
      |    for item in context.fetch_once_as_rows():
      |        item["value1"] = item["value"] + "_suffix"
      |        yield item
      |
      |context.build_result(process())
    """.stripMargin, envs, "python", "3.6")))), sourceSchema,
  "GMT", Map()
)

// prepare data
val sourceEncoder = RowEncoder.apply(sourceSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  sourceEncoder.toRow(irow).copy()
}.iterator

// run the code and collect the returned result
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, batch)
val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)

// f.copy() is required because the underlying row object is reused
columnarBatchIter.flatMap { batch =>
  batch.rowIterator.asScala
}.foreach(f => println(f.copy()))
javaContext.markComplete
javaContext.close
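
Each printed row carries the original value plus the value1 column appended by the Python snippet, e.g. a1 alongside a1_suffix.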

Using a Python code snippet to process data in Spark

val session = spark
import session.implicits._
val timezoneid = session.sessionState.conf.sessionLocalTimeZone
val df = session.createDataset[String](Seq("a1", "b1")).toDF("value")
val struct = df.schema
val abc = df.rdd.mapPartitions { iter =>
  val encoder = RowEncoder.apply(struct).resolveAndBind()
  val envs = new util.HashMap[String, String]()
  envs.put(str(PythonConf.PYTHON_ENV), "source activate streamingpro-spark-2.4.x")
  val batch = new ArrowPythonRunner(
    Seq(ChainedPythonFunctions(Seq(PythonFunction(
      """
        |import pandas as pd
        |import numpy as np
        |for item in data_manager.fetch_once():
        |    print(item)
        |df = pd.DataFrame({'AAA': [4, 5, 6, 7],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
        |data_manager.set_output([[df['AAA'],df['BBB']]])
      """.stripMargin, envs, "python", "3.6")))), struct,
    timezoneid, Map()
  )
  val newIter = iter.map { irow =>
    encoder.toRow(irow)
  }
  val commonTaskContext = new SparkContextImp(TaskContext.get(), batch)
  val columnarBatchIter = batch.compute(Iterator(newIter), TaskContext.getPartitionId(), commonTaskContext)
  columnarBatchIter.flatMap { batch =>
    batch.rowIterator.asScala.map(_.copy)
  }
}

val wow = SparkUtils.internalCreateDataFrame(session, abc, StructType(Seq(StructField("AAA", LongType), StructField("BBB", LongType))), false)
wow.show()
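
Note that the Python snippet only prints the incoming rows and then emits a fixed two-column result, which is why the schema passed to internalCreateDataFrame declares just the long columns AAA and BBB.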

Run Python Project

With PyJava, you can point the system at a Python project and its entry point, then run that project from Java/Scala.

"/tmp/data", "tempModelLocalPath" -> "/tmp/model" )) output.foreach(println) ">
import tech.mlsql.arrow.python.runner.PythonProjectRunner

val runner = new PythonProjectRunner("./pyjava/examples/pyproject1", Map())
val output = runner.run(Seq("bash", "-c", "source activate dev && python train.py"), Map(
  "tempDataLocalPath" -> "/tmp/data",
  "tempModelLocalPath" -> "/tmp/model"
))
output.foreach(println)
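
For orientation, here is a minimal sketch of what the project's train.py could look like. It assumes the key/value map passed to runner.run is made visible to the script as environment variables; that mechanism, and the script body, are illustrative assumptions rather than documented behavior.

import os

# hypothetical ./pyjava/examples/pyproject1/train.py
# assumption: the Map passed to runner.run surfaces as environment variables
data_path = os.environ.get("tempDataLocalPath", "/tmp/data")
model_path = os.environ.get("tempModelLocalPath", "/tmp/model")

print("reading training data from %s" % data_path)
print("writing the trained model to %s" % model_path)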

Example In MLSQL

Non-interactive Mode:

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python conf "schema=st(field(a,long),field(b,long))";

select 1 as a as table1;

!python on table1 '''

import pandas as pd
import numpy as np
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])

''' named mlsql_temp_table2;

select * from mlsql_temp_table2 as output; 

Interactive Mode:

!python start;

!python env "PYTHON_ENV=source activate streamingpro-spark-2.4.x";
!python env "schema=st(field(a,integer),field(b,integer))";


!python '''
import pandas as pd
import numpy as np
''';

!python '''
for item in data_manager.fetch_once():
    print(item)
df = pd.DataFrame({'AAA': [4, 5, 6, 8],'BBB': [10, 20, 30, 40],'CCC': [100, 50, -30, -50]})
data_manager.set_output([[df['AAA'],df['BBB']]])
''';
!python close;

Using PyJava as Arrow Server/Client

Java Server side:

val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")

val dataSchema = StructType(Seq(StructField("value", StringType)))
val encoder = RowEncoder.apply(dataSchema).resolveAndBind()
val newIter = Seq(Row.fromSeq(Seq("a1")), Row.fromSeq(Seq("a2"))).map { irow =>
  encoder.toRow(irow)
}.iterator
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, null)

val Array(_, host, port) = socketRunner.serveToStreamWithArrow(newIter, dataSchema, 10, commonTaskContext)
// the Python client below connects to this address
println(s"${host}:${port}")
Thread.currentThread().join()

Python Client side:

import os
import socket

from pyjava.serializers import \
    ArrowStreamPandasSerializer

# build the serializer with the same timezone the Java server uses
out_ser = ArrowStreamPandasSerializer("Asia/Harbin", False, None)

# fill in the host and port printed by the Java server above
HOST = ""
PORT = -1
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
    kk = out_ser.load_stream(infile)
    for item in kk:
        print(item)

Python Server side:

import os

import pandas as pd

os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
from pyjava.api.serve import OnceServer

ddata = pd.DataFrame(data=[[1, 2, 3, 4], [2, 3, 4, 5]])

server = OnceServer("127.0.0.1", 11111, "Asia/Harbin")
server.bind()
server.serve([{'id': 9, 'label': 1}])

Java Client side:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import tech.mlsql.arrow.python.iapp.{AppContextImpl, JavaContext}
import tech.mlsql.arrow.python.runner.SparkSocketRunner
import tech.mlsql.common.utils.network.NetUtils

val encoder = RowEncoder.apply(StructType(Seq(StructField("a", LongType), StructField("b", LongType)))).resolveAndBind()
val socketRunner = new SparkSocketRunner("wow", NetUtils.getHost, "Asia/Harbin")
val javaContext = new JavaContext
val commonTaskContext = new AppContextImpl(javaContext, null)

// connect to the Python server above and decode the Arrow stream back into rows
val iter = socketRunner.readFromStreamWithArrow("127.0.0.1", 11111, commonTaskContext)
iter.foreach(i => println(encoder.fromRow(i.copy())))
javaContext.close

How to configure Python workers to run in Docker (TODO)
