Commit dbd67f91 authored by Federico Sismondi's avatar Federico Sismondi

udpated AmqpListener for logging more info on errors

parent c65cd6ac
...@@ -2,6 +2,7 @@ import os ...@@ -2,6 +2,7 @@ import os
import pika import pika
import logging import logging
import threading import threading
import traceback
# for using it as library and as a __main__ # for using it as library and as a __main__
try: try:
...@@ -18,12 +19,19 @@ class AmqpSynchCallTimeoutError(Exception): ...@@ -18,12 +19,19 @@ class AmqpSynchCallTimeoutError(Exception):
class AmqpListener(threading.Thread): class AmqpListener(threading.Thread):
COMPONENT_ID = 'amqp_listener_%s' % uuid.uuid1()
DEFAULT_TOPIC_SUSBCRIPTIONS = ['#'] DEFAULT_TOPIC_SUSBCRIPTIONS = ['#']
DEFAULT_EXCHAGE = 'amq.topic' DEFAULT_EXCHAGE = 'amq.topic'
DEFAULT_AMQP_URL = 'amqp://guest:guest@locahost/'
def __init__(self, amqp_url, amqp_exchange, callback, topics=None): def __init__(self, amqp_url, amqp_exchange, callback, topics=None):
self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8]
self.connection = None
self.channel = None
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
threading.Thread.__init__(self) threading.Thread.__init__(self)
if callback is None: if callback is None:
...@@ -31,30 +39,36 @@ class AmqpListener(threading.Thread): ...@@ -31,30 +39,36 @@ class AmqpListener(threading.Thread):
else: else:
self.message_dispatcher = callback self.message_dispatcher = callback
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url)) if topics: # subscribe only to passed list
self.channel = self.connection.channel() self.topics = topics
else:
self.topics = self.DEFAULT_TOPIC_SUSBCRIPTIONS
if amqp_exchange: if amqp_exchange:
self.exchange = amqp_exchange self.exchange = amqp_exchange
else: else:
self.exchange = self.DEFAULT_EXCHAGE self.exchange = self.DEFAULT_EXCHAGE
if amqp_url:
self.amqp_url = amqp_url
else:
self.amqp_url = self.DEFAULT_AMQP_URL
self.amqp_connect()
def amqp_connect(self):
self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
self.channel = self.connection.channel()
# queues & default exchange declaration # queues & default exchange declaration
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queue_name, self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True, auto_delete=True,
arguments={'x-max-length': 200}) arguments={'x-max-length': 200})
if topics: # subscribe only to passed list
self.topics = topics
else:
self.topics = self.DEFAULT_EXCHAGE
for t in self.topics: for t in self.topics:
self.channel.queue_bind(exchange=self.exchange, self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name, queue=self.services_queue_name,
routing_key=t) routing_key=t)
# Hello world message # Hello world message
m = MsgTestingToolComponentReady( m = MsgTestingToolComponentReady(
component=self.COMPONENT_ID, component=self.COMPONENT_ID,
...@@ -92,11 +106,13 @@ class AmqpListener(threading.Thread): ...@@ -92,11 +106,13 @@ class AmqpListener(threading.Thread):
'app_id': props.app_id, 'app_id': props.app_id,
} }
m = None
try: try:
m = Message.from_json(body) m = Message.from_json(body)
if m is None:
raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict))
m.update_properties(**props_dict) m.update_properties(**props_dict)
m.routing_key = method.routing_key m.routing_key = method.routing_key
logging.debug('Message in bus: %s'%repr(m))
self.message_dispatcher(m) self.message_dispatcher(m)
except NonCompliantMessageFormatError as e: except NonCompliantMessageFormatError as e:
...@@ -104,19 +120,26 @@ class AmqpListener(threading.Thread): ...@@ -104,19 +120,26 @@ class AmqpListener(threading.Thread):
except Exception as e: except Exception as e:
logging.error(e) logging.error(e)
logging.error('message received:\n\tr_key: %s\n\t%s'%(method.routing_key,body)) logging.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body))
raise e
finally: finally:
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
def run(self): def run(self):
logging.info("Starting thread listening on the event bus on topics %s" % self.topics) logging.info("Starting thread listening on the event bus on topics %s" % self.topics)
for i in range(1, 4): for i in range(1, 4):
try: try:
self.channel.start_consuming() self.channel.start_consuming()
except pika.exceptions.ConnectionClosed as err:
logging.error('Unexpected connection closed, retrying %s/%s' % (i, 4)) except (pika.exceptions.ConnectionClosed,
logging.error(err) pika.exceptions.ChannelError,
pika.exceptions.ChannelClosed) as err:
logging.error('[AmqpListener] Unexpected connection closed, reconnecting %s/%s' % (i, 4))
logging.error(traceback.format_exc())
self.amqp_connect()
logging.info('Bye byes!') logging.info('Bye byes!')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment