Commit 6e66e290 authored by Federico Sismondi's avatar Federico Sismondi

event bus listener component enhacements

parent 2830d227
......@@ -170,5 +170,3 @@ if __name__ == '__main__':
# r = amqp_request(connection, m, 'someImaginaryComponent')
# print(repr(r))
import os
import pika
import logging
import threading
# for using it as library and as a __main__
......@@ -15,25 +16,23 @@ AMQP_EXCHANGE = 'amq.topic'
class AmqpSynchCallTimeoutError(Exception):
pass
class AmqpListener(threading.Thread):
COMPONENT_ID = 'amqp_listener_%s' % uuid.uuid1()
DEFAULT_TOPIC_SUSBCRIPTIONS = ['#']
DEFAULT_EXCHAGE = 'amq.topic'
def __init__(self, amqp_url, amqp_exchange, topics, callback):
def __init__(self, amqp_url, amqp_exchange, callback, topics=None):
threading.Thread.__init__(self)
if callback is None:
self.message_dipatcher = print
self.message_dispatcher = print
else:
self.message_dipatcher = callback
self.message_dispatcher = callback
try:
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.channel = self.connection.channel()
except pika.exceptions.ProbableAccessDeniedError:
self.message_dipatcher('Probable access denied error. Is provided AMQP_URL correct?')
self.exit()
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.channel = self.connection.channel()
if amqp_exchange:
self.exchange = amqp_exchange
......@@ -47,15 +46,15 @@ class AmqpListener(threading.Thread):
arguments={'x-max-length': 200})
if topics: # subscribe only to passed list
for t in topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
self.topics = topics
else:
self.topics = self.DEFAULT_EXCHAGE
else: # subscribe to all events
for t in self.topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key='#')
routing_key=t)
# Hello world message
m = MsgTestingToolComponentReady(
component=self.COMPONENT_ID,
......@@ -98,29 +97,28 @@ class AmqpListener(threading.Thread):
m = Message.from_json(body)
m.update_properties(**props_dict)
m.routing_key = method.routing_key
self.message_dipatcher(m)
self.message_dispatcher(m)
except NonCompliantMessageFormatError as e:
self.message_dipatcher('%s got a non compliant message error %s' % (self.__class__.__name__, e))
logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e))
except Exception as e:
pass
# self.message_dipatcher('Error : %s' % str(e))
# self.message_dipatcher(str(body))
logging.error(e)
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def run(self):
self.message_dipatcher("Starting thread listening on the event bus..")
logging.info("Starting thread listening on the event bus on topics %s" % self.topics)
for i in range(1, 4):
try:
self.channel.start_consuming()
except pika.exceptions.ConnectionClosed as err:
self.message_dipatcher(err)
self.message_dipatcher('Unexpected connection closed, retrying %s/%s' % (i, 4))
logging.error('Unexpected connection closed, retrying %s/%s' % (i, 4))
logging.error(err)
logging.info('Bye byes!')
self.message_dipatcher('Bye byes!')
def publish_message(connection, message):
"""
......@@ -238,12 +236,14 @@ if __name__ == '__main__':
def callback_function(message_received):
print("Callback function received: \n\t" + repr(message_received))
# amqp listener example:
amqp_listener_thread = AmqpListener(
amqp_url=AMQP_URL,
amqp_exchange=AMQP_EXCHANGE,
topics='#',
callback=callback_function)
callback=callback_function,
topics='#'
)
try:
amqp_listener_thread.start()
......@@ -271,4 +271,3 @@ if __name__ == '__main__':
except AmqpSynchCallTimeoutError as e:
print("Nobody answered to our request :'(")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment