-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreceiver.py
57 lines (46 loc) · 1.58 KB
/
receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import pika
from threading import Thread
class ReceiverRabbitMqConfigure:
def __init__(
self,
host="localhost",
queue="",
exchange="",
exchange_type="fanout",
exclusive=True,
):
"""Server initialization"""
self.host = host
self.queue = queue
self.exchange = exchange
self.exchange_type = exchange_type
self.exclusive = exclusive
class Receiver:
def __init__(self, config, user=None):
self.user = user
self.config = config
def connect(self):
self.exchange = self.config.exchange
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.config.host)
)
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange=self.config.exchange, exchange_type=self.config.exchange_type
)
result = self.channel.queue_declare(
queue=self.config.queue, exclusive=self.config.exclusive
)
self.queue_name = result.method.queue
self.channel.queue_bind(exchange=self.config.exchange, queue=self.queue_name)
def listen_channel(self, cb):
self.channel.basic_consume(
queue=self.queue_name, on_message_callback=cb, auto_ack=True
)
self.channel.basic_qos(prefetch_count=0)
self.channel.start_consuming()
def async_consumer(self, cb):
worker = Thread(target=self.listen_channel, args=[cb])
worker.start()
def discard_channel(self):
self.channel.stop_consuming()