Commit 0ee50062 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

thread handles amqp connection independenlty now

parent 4d471ae5
...@@ -94,18 +94,18 @@ class NullLogHandler(logging.Handler): ...@@ -94,18 +94,18 @@ class NullLogHandler(logging.Handler):
class AmqpSniffer(threading.Thread): class AmqpSniffer(threading.Thread):
COMPONENT_ID = 'amqp_sniffer_%s' % uuid.uuid1() COMPONENT_ID = 'amqp_sniffer_%s' % uuid.uuid1()
DEFAULT_EXCHAGE = 'amq.topic' DEFAULT_EXCHAGE = 'amq.topic'
DEFAULT_URL = 'amqp://guest:guest@localhost'
def __init__(self, conn, exchange=None, topics=None): def __init__(self, url=None, exchange=None, topics=None):
threading.Thread.__init__(self) threading.Thread.__init__(self)
if exchange: self.exchange = exchange if exchange else self.DEFAULT_EXCHAGE
self.exchange = exchange
else: self.url = url if url else self.DEFAULT_URL
self.exchange = self.DEFAULT_EXCHAGE
# queues & default exchange declaration # queues & default exchange declaration
self.connection = conn self.connection = pika.BlockingConnection(pika.URLParameters(self.url))
self.channel = self.connection.channel() self.channel = self.connection.channel()
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queue_name, self.channel.queue_declare(queue=self.services_queue_name,
...@@ -756,7 +756,7 @@ if __name__ == '__main__': ...@@ -756,7 +756,7 @@ if __name__ == '__main__':
cli.start() cli.start()
amqp_listener = AmqpSniffer(connection, AMQP_EXCHANGE, None) # if None subscribe to all messages amqp_listener = AmqpSniffer(AMQP_URL, AMQP_EXCHANGE, ['#']) # if None subscribe to all messages
amqp_listener.start() amqp_listener.start()
# interrumpted # interrumpted
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment