AMQP калькулятор

[ ROT ]

13 Sep 2013

Python 2 Требует работающего AMQP (например RabbitMQ) и библиотеки для работы с AMQP (pika)

master.py

Отправляет в RabbitMQ (порт 5672) выражение - сумму двух чисел.

Запуск:

python2 master.py [a [b]]

Если какие-то аргументы не указаны - генерируется случайное число от MinINT64 до MaxINT64.

#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys

__author__ = 'Messiah'

from pika import BlockingConnection, ConnectionParameters, BasicProperties
import random
from IN import INT64_MAX, INT64_MIN
import logging

logging.basicConfig()

def main(a, b):
    connection = BlockingConnection(ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='calc', durable=True)

    message = "{}+{}".format(a, b)

    channel.basic_publish(exchange='',
                          routing_key='calc',
                          body=message,
                          properties=BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))

    print " [x] Sent {}".format(message)

if __name__ == '__main__':
    if len(sys.argv) > 2:
        main(sys.argv[1], sys.argv[2])
    else:
        main(random.randint(INT64_MIN, INT64_MAX),
             random.randint(INT64_MIN, INT64_MAX))

slave.py

Слушает RabbitMQ очередь.

Если получает сообщение с темой ‘calc’ - пытается его посчитать.

Если получилось - выводит результат.

#!/usr/bin/python
# -*- coding: utf-8 -*-
__author__ = 'Messiah'
from pika import BlockingConnection, ConnectionParameters

def main():
    connection = BlockingConnection(ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='calc', durable=True)
    print ' [*] Waiting for messages. To exit press CTRL+C'

    def callback(ch, method, properties, body):
        print " [x] Persistence {}".format(properties.delivery_mode == 2)
        print " [x] Received: {}".format(body)
        try:
            print " [.] Caclulated: {} = {}".format(body, eval(body))
        except SyntaxError:
            print " [.] Invalid syntax"
        print " [x] Done"
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue='calc')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        print "[*] Exit"
        return

if __name__ == '__main__':
    main()

GitHub