From 6e66e290d10fcc9c0ecafdd671f33dc91b42e53b Mon Sep 17 00:00:00 2001 From: Federico Sismondi Date: Thu, 30 Nov 2017 16:45:58 +0100 Subject: [PATCH] event bus listener component enhacements --- amqp_synch_call.py | 2 -- event_bus_utils.py | 53 +++++++++++++++++++++++----------------------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/amqp_synch_call.py b/amqp_synch_call.py index 047da61..c52d6fb 100644 --- a/amqp_synch_call.py +++ b/amqp_synch_call.py @@ -170,5 +170,3 @@ if __name__ == '__main__': # r = amqp_request(connection, m, 'someImaginaryComponent') # print(repr(r)) - - diff --git a/event_bus_utils.py b/event_bus_utils.py index 7a900bd..0ffefee 100644 --- a/event_bus_utils.py +++ b/event_bus_utils.py @@ -1,5 +1,6 @@ import os import pika +import logging import threading # for using it as library and as a __main__ @@ -15,25 +16,23 @@ AMQP_EXCHANGE = 'amq.topic' class AmqpSynchCallTimeoutError(Exception): pass + class AmqpListener(threading.Thread): COMPONENT_ID = 'amqp_listener_%s' % uuid.uuid1() + DEFAULT_TOPIC_SUSBCRIPTIONS = ['#'] DEFAULT_EXCHAGE = 'amq.topic' - def __init__(self, amqp_url, amqp_exchange, topics, callback): + def __init__(self, amqp_url, amqp_exchange, callback, topics=None): threading.Thread.__init__(self) if callback is None: - self.message_dipatcher = print + self.message_dispatcher = print else: - self.message_dipatcher = callback + self.message_dispatcher = callback - try: - self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url)) - self.channel = self.connection.channel() - except pika.exceptions.ProbableAccessDeniedError: - self.message_dipatcher('Probable access denied error. Is provided AMQP_URL correct?') - self.exit() + self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url)) + self.channel = self.connection.channel() if amqp_exchange: self.exchange = amqp_exchange @@ -47,15 +46,15 @@ class AmqpListener(threading.Thread): arguments={'x-max-length': 200}) if topics: # subscribe only to passed list - for t in topics: - self.channel.queue_bind(exchange=self.exchange, - queue=self.services_queue_name, - routing_key=t) + self.topics = topics + else: + self.topics = self.DEFAULT_EXCHAGE - else: # subscribe to all events + for t in self.topics: self.channel.queue_bind(exchange=self.exchange, queue=self.services_queue_name, - routing_key='#') + routing_key=t) + # Hello world message m = MsgTestingToolComponentReady( component=self.COMPONENT_ID, @@ -98,29 +97,28 @@ class AmqpListener(threading.Thread): m = Message.from_json(body) m.update_properties(**props_dict) m.routing_key = method.routing_key - self.message_dipatcher(m) + self.message_dispatcher(m) except NonCompliantMessageFormatError as e: - self.message_dipatcher('%s got a non compliant message error %s' % (self.__class__.__name__, e)) + logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e)) except Exception as e: - pass - # self.message_dipatcher('Error : %s' % str(e)) - # self.message_dipatcher(str(body)) + logging.error(e) finally: ch.basic_ack(delivery_tag=method.delivery_tag) def run(self): - self.message_dipatcher("Starting thread listening on the event bus..") + logging.info("Starting thread listening on the event bus on topics %s" % self.topics) for i in range(1, 4): try: self.channel.start_consuming() except pika.exceptions.ConnectionClosed as err: - self.message_dipatcher(err) - self.message_dipatcher('Unexpected connection closed, retrying %s/%s' % (i, 4)) + logging.error('Unexpected connection closed, retrying %s/%s' % (i, 4)) + logging.error(err) + + logging.info('Bye byes!') - self.message_dipatcher('Bye byes!') def publish_message(connection, message): """ @@ -238,12 +236,14 @@ if __name__ == '__main__': def callback_function(message_received): print("Callback function received: \n\t" + repr(message_received)) + # amqp listener example: amqp_listener_thread = AmqpListener( amqp_url=AMQP_URL, amqp_exchange=AMQP_EXCHANGE, - topics='#', - callback=callback_function) + callback=callback_function, + topics='#' + ) try: amqp_listener_thread.start() @@ -271,4 +271,3 @@ if __name__ == '__main__': except AmqpSynchCallTimeoutError as e: print("Nobody answered to our request :'(") - -- 2.24.1