Задачка из курса Распределённых Объектных технологий.
- master генерирует пары чисел
- worker складывает числа
- master —async (pub/sub)–> worker
- worker —async (pub/sub) –> master
Запуск
- python master.py
- python worker.py
Поведение
Мастер генерирует пар чисел (REPEATS = 100), выводит на консоль, отправляет их worker’у и слушает ответы.
Worker считает и отправляет ответ по другому каналу.
Master выводит на консоль ответ (выражение = ответ).
В Master.run() есть закомментированная строка возведения в степень, которая более наглядно показывает асинхронность происходящего.
master.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
from IN import INT64_MAX, INT64_MIN
__author__ = 'Messiah'
import zmq
import random
REPEATS = 100
class Master():
def __init__(self, port1, port2):
self.context = zmq.Context()
self.output = self.context.socket(zmq.PUB)
self.output.bind("tcp://127.0.0.1:{}".format(port1))
self.input = self.context.socket(zmq.SUB)
self.input.connect("tcp://127.0.0.1:{}".format(port2))
self.input.setsockopt(zmq.SUBSCRIBE, b"")
def run(self):
self.output.send(b"hello")
for i in range(REPEATS):
#msg = "{}**{}".format(random.randint(0, INT64_MAX),
# random.randint(10000, 20000))
msg = "{}+{}".format(random.randint(INT64_MIN, INT64_MAX),
random.randint(INT64_MIN, INT64_MAX))
self.output.send(msg.encode())
print(msg)
for i in range(100000):
try:
msg = self.input.recv(zmq.NOBLOCK).decode()
except zmq.core.error.ZMQError:
pass
else:
print(msg)
self.output.send(b"exit")
print("exit")
while True:
try:
msg = self.input.recv(zmq.NOBLOCK).decode()
except zmq.core.error.ZMQError:
pass
else:
if msg == "exit":
return
print(msg)
if __name__ == "__main__":
M = Master(5000, 6000)
M.run()
worker.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
__author__ = 'Messiah'
import zmq
class Worker():
def __init__(self, port1, port2):
self.context = zmq.Context()
self.input = self.context.socket(zmq.SUB)
self.input.connect("tcp://127.0.0.1:{}".format(port1))
self.input.setsockopt(zmq.SUBSCRIBE, b"")
self.output = self.context.socket(zmq.PUB)
self.output.bind("tcp://127.0.0.1:{}".format(port2))
self.queue = []
def run(self):
while True:
try:
self.queue.append(self.input.recv(zmq.NOBLOCK).decode())
except zmq.core.error.ZMQError:
pass
if self.queue:
for msg in self.queue:
if msg == "exit":
self.output.send(b"exit")
else:
self.output.send("{} = {}".format(msg,
eval(msg)).encode())
self.queue.remove(msg)
if __name__ == "__main__":
W = Worker(5000, 6000)
W.run()