Очередь ZMQ

[ ROT ]

13 Sep 2013

Задачка из курса Распределённых Объектных технологий.

  • master генерирует пары чисел
  • worker складывает числа
  • master —async (pub/sub)–> worker
  • worker —async (pub/sub) –> master

Запуск

  • python master.py
  • python worker.py

Поведение

Мастер генерирует пар чисел (REPEATS = 100), выводит на консоль, отправляет их worker’у и слушает ответы.

Worker считает и отправляет ответ по другому каналу.

Master выводит на консоль ответ (выражение = ответ).

В Master.run() есть закомментированная строка возведения в степень, которая более наглядно показывает асинхронность происходящего.

master.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
from IN import INT64_MAX, INT64_MIN

__author__ = 'Messiah'

import zmq
import random
REPEATS = 100

class Master():
    def __init__(self, port1, port2):
        self.context = zmq.Context()
        self.output = self.context.socket(zmq.PUB)
        self.output.bind("tcp://127.0.0.1:{}".format(port1))
        self.input = self.context.socket(zmq.SUB)
        self.input.connect("tcp://127.0.0.1:{}".format(port2))
        self.input.setsockopt(zmq.SUBSCRIBE, b"")

    def run(self):
        self.output.send(b"hello")
        for i in range(REPEATS):
            #msg = "{}**{}".format(random.randint(0, INT64_MAX),
            #                     random.randint(10000, 20000))
            msg = "{}+{}".format(random.randint(INT64_MIN, INT64_MAX),
                                 random.randint(INT64_MIN, INT64_MAX))
            self.output.send(msg.encode())
            print(msg)
            for i in range(100000):
                try:
                    msg = self.input.recv(zmq.NOBLOCK).decode()
                except zmq.core.error.ZMQError:
                    pass
                else:
                    print(msg)

        self.output.send(b"exit")
        print("exit")
        while True:
            try:
                msg = self.input.recv(zmq.NOBLOCK).decode()
            except zmq.core.error.ZMQError:
                pass
            else:
                if msg == "exit":
                    return
                print(msg)

if __name__ == "__main__":
    M = Master(5000, 6000)
    M.run()

worker.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-

__author__ = 'Messiah'

import zmq

class Worker():
    def __init__(self, port1, port2):
        self.context = zmq.Context()
        self.input = self.context.socket(zmq.SUB)
        self.input.connect("tcp://127.0.0.1:{}".format(port1))
        self.input.setsockopt(zmq.SUBSCRIBE, b"")
        self.output = self.context.socket(zmq.PUB)
        self.output.bind("tcp://127.0.0.1:{}".format(port2))
        self.queue = []

    def run(self):
        while True:
            try:
                self.queue.append(self.input.recv(zmq.NOBLOCK).decode())
            except zmq.core.error.ZMQError:
                pass
            if self.queue:
                for msg in self.queue:
                    if msg == "exit":
                        self.output.send(b"exit")
                    else:
                        self.output.send("{} = {}".format(msg,
                                                          eval(msg)).encode())
                    self.queue.remove(msg)

if __name__ == "__main__":
    W = Worker(5000, 6000)
    W.run()

GitHub