A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Overview

Introduction

A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Requirements

  • Python >=3.7.3
  • Pika ==1.2.0
  • Aio-pika ==6.8.0
  • Requests >=2.25.1

Installation

pip install rctiplus-rabbitmq-python-sdk

Getting latest version

pip install rctiplus-rabbitmq-python-sdk --upgrade

Usage

To start using this SDK, you may follow given instructions bellow in order.

Payload handler

First, you need to create a payload class handler that implement MessagePayload. For example, we want to make a class to handle JSON payload:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname'])">
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload

class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

MessagePayload class from the SDK's core has this functions that require to implemented:

'MessagePayload': """Generate data from specified string payload message format Raises: NotImplementedError: Raise an error if not implemented """ raise NotImplementedError() def __str__(self) -> str: """Convert specified data format to string payload message Raises: NotImplementedError: Raise an error if not implemented Returns: str: String payload message """ raise NotImplementedError()">
class MessagePayload:
    """Python RabbitMQ message payload
    """
    
    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format

        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message

        Raises:
            NotImplementedError: Raise an error if not implemented

        Returns:
            str: String payload message
        """
        raise NotImplementedError()

Connect to RabbitMQ

Making connection to RabbitMQ server can be done by doing this simple way:

from rctiplus_rabbitmq_python_sdk import RabbitMQ

conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

Sending message

After you have payload class handler & connected to the RabbitMQ server, now you can try to send a messsage to queue channel. For example, we will send JSON payload message to test queue:

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Receiving message

Great. Now, in our consumer app, we want to listen & receive that message, and then doing some stuff:

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)

For callback function, according to Pikas standart library, you need to pass 4 arguments ch, method, properties and body to catch all needed values from incomming message.

Putting it all together

Here is the complete example from the code above:

Complete example of sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) conn.send('test', payload)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Complete example of consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message def callback(ch, method, properties, body): print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel conn.receive('test', callback)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Create a callback to be executed immadiately after recieved a message
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Receive & listen messages from queue channel
conn.receive('test', callback)

Asynchronous

This SDK also support asynchronous process. To use this feature, use AIORabbitMQ instead of RabbitMQ. All methods provided in AIORabbitMQ are treated as async function. So, when you calling the methods, you need to await them.

Async connect to RabbitMQ

from rctiplus_rabbitmq_python_sdk import AIORabbitMQ

conn = AIORabbitMQ(loop)
await conn.connect(host='localhost', port=5672, username='guest', password='guest')

loop is an asynchronous event loop, example: asyncio.get_event_loop()

Async sending message

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
await conn.send('test', payload)

Async receiving message

async def callback(message):
    body = message.body
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

await conn.receive('test', callback)

In asynchronous process, you just need pass 1 argument on callback function. This argument is a representation of aio_pika.IncomingMessage to catch all needed values from incomming message.

Complete example of asynchronous process

Here is the complete example of asynchronous process above:

Complete example of asynchronous sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') async with conn.connection: # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) await conn.send('test', payload) # Event loop loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    async with conn.connection:
        # Send payload to queue
        payload = JSONPayload('John', 'Doe')
        print('payload:', payload)
        await conn.send('test', payload)


# Event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Complete example of asynchronous consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message async def callback(message): body = message.body print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel await conn.receive('test', callback) return conn # Event loop loop = asyncio.get_event_loop() connection = loop.run_until_complete(main(loop)) try: loop.run_forever() finally: loop.run_until_complete(connection.disconnect())">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    # Create a callback to be executed immadiately after recieved a message
    async def callback(message):
        body = message.body
        print("[x] Received %r" % body)
        
        # Generate data from string payload message
        data = JSONPayload.from_str(body)
        print(f'data: firstname={data.firstname}, lastname={data.lastname}')

    # Receive & listen messages from queue channel
    await conn.receive('test', callback)

    return conn


# Event loop
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
    loop.run_forever()
finally:
    loop.run_until_complete(connection.disconnect())

License

GNU General Public License v3

Owner
Dali Kewara
An unexpected journey and gonna make it simple but Spectacular!
Dali Kewara
The producer-consumer problem implemented with threads in Python

This was developed using a Python virtual environment, I would strongly recommend to do the same if you want to clone this repository. How to run this

Omar Beltran 1 Oct 30, 2021
Automatically Generate Rulesets for IIS for Intelligent HTTP/S C2 Redirection

Automatically Generate Rulesets for IIS for Intelligent HTTP/S C2 Redirection This project converts a Cobalt Strike profile to a functional web.config

Jesse 99 Dec 13, 2022
Modeling Category-Selective Cortical Regions with Topographic Variational Autoencoders

Modeling Category-Selective Cortical Regions with Topographic Variational Autoencoders Getting Started Install requirements with Anaconda: conda env c

T. Andy Keller 4 Aug 22, 2022
Simple collection of GTPS Flood in Python.

GTPS Flood Simple collection of GTPS Flood in Python. NOTE Give me credit if you use this source, don't trade/sell this tool, And USE AT YOUR OWN RISK

PhynX 6 Dec 07, 2021
A time table app to notify the user about their class timings

kivyTimeTable A time table app to notify the user about their class timings Features This project incorporates some features i wanted to see in a time

2 Dec 15, 2021
async parser for JET

This project is mainly aims to provide an async parsing option for NTDS.dit database file for obtaining user secrets.

15 Mar 08, 2022
An okayish python script to generate a random Euler circuit with given number of vertices and edges.

Euler-Circuit-Test-Case-Generator An okayish python script to generate a random Euler circuit with given number of vertices and edges. Executing the S

Alen Antony 1 Nov 13, 2021
Manage your exceptions in Python like a PRO

A linter to manage all your python exceptions and try/except blocks (limited only for those who like dinosaurs).

Guilherme Latrova 353 Dec 31, 2022
This script allows you to retrieve all functions / variables names of a Python code, and the variables values.

Memory Extractor This script allows you to retrieve all functions / variables names of a Python code, and the variables values. How to use it ? The si

Venax 2 Dec 26, 2021
kawadi is a versatile tool that used as a form of weapon and is used to cut, shape and split wood.

kawadi kawadi (કવાડિ in Gujarati) (Axe in English) is a versatile tool that used as a form of weapon and is used to cut, shape and split wood. kawadi

Jay Vala 2 Jan 10, 2022
Simple RGB to HEX game made in python

Simple RGB to HEX game made in python

5 Aug 26, 2022
Trying to replicate (albeit unsuccessfully) the phenomenon of boids using Ursina in a naive manner.

Boids_Boi Trying to replicate (albeit unsuccessfully) the phenomenon of boids using Ursina in a naive manner. Please install the Ursina module before

2 Oct 19, 2021
A python module to validate input.

A python module to validate input.

Matthias 6 Sep 13, 2022
.bvh to .mcfunction file converter.

bvh-to-mcf .bvh file to .mcfunction converter

Hanmin Kim 28 Nov 21, 2022
A Python package for floating-point binary fractions. Do math in base 2!

An implementation of a floating-point binary fractions class and module in Python. Work with binary fractions and binary floats with ease!

10 Oct 29, 2022
Rabbito is a mini tool to find serialized objects in input values

Rabbito-ObjectFinder Rabbito is a mini tool to find serialized objects in input values What does Rabbito do Rabbito has the main object finding Serial

7 Dec 13, 2021
🦩 A Python tool to create comment-free Jupyter notebooks.

Pelikan Pelikan lets you convert notebooks to comment-free notebooks. In other words, It removes Python block and inline comments from source cells in

Hakan Özler 7 Nov 20, 2021
Shut is an opinionated tool to simplify publishing pure Python packages.

Welcome to Shut Shut is an opinionated tool to simplify publishing pure Python packages. What can Shut do for you? Generate setup files (setup.py, MAN

Niklas Rosenstein 6 Nov 18, 2022
Use generator for range function

Use the generator for the range function! installation method: pip install yrange How to use: First import yrange in your application. You can then wo

1 Oct 28, 2021
A Python class for checking the status of an enabled Minecraft server

mcstatus provides an easy way to query Minecraft servers for any information they can expose. It provides three modes of access (query, status and ping), the differences of which are listed below in

Nathan Adams 1.1k Jan 06, 2023