Commit 2b2d0398 authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'fulltest' into 'master'

Add message typing support to amqp_request

See merge request !18
parents 2dfa8c51 188b7a75
...@@ -14,6 +14,7 @@ VERSION = '0.0.10' ...@@ -14,6 +14,7 @@ VERSION = '0.0.10'
AMQP_EXCHANGE = 'amq.topic' AMQP_EXCHANGE = 'amq.topic'
MAX_LOG_LINE_LENGTH = 120 MAX_LOG_LINE_LENGTH = 120
logger = logging.getLogger(__name__)
class AmqpSynchCallTimeoutError(Exception): class AmqpSynchCallTimeoutError(Exception):
pass pass
...@@ -28,6 +29,7 @@ class AmqpListener(threading.Thread): ...@@ -28,6 +29,7 @@ class AmqpListener(threading.Thread):
self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8] self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8]
self.stopping = False
self.connection = None self.connection = None
self.channel = None self.channel = None
if pre_declared_queue: if pre_declared_queue:
...@@ -105,6 +107,7 @@ class AmqpListener(threading.Thread): ...@@ -105,6 +107,7 @@ class AmqpListener(threading.Thread):
self.channel.basic_consume(self.on_request, queue=self.services_queue_name) self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
def stop(self): def stop(self):
self.stopping = True
self.channel.queue_delete(self.services_queue_name) self.channel.queue_delete(self.services_queue_name)
self.channel.stop_consuming() self.channel.stop_consuming()
self.connection.close() self.connection.close()
...@@ -117,15 +120,15 @@ class AmqpListener(threading.Thread): ...@@ -117,15 +120,15 @@ class AmqpListener(threading.Thread):
if m is None: if m is None:
raise Exception("Couldnt build message from json %s, rkey: %s " % (body, method.routing_key)) raise Exception("Couldnt build message from json %s, rkey: %s " % (body, method.routing_key))
m.routing_key = method.routing_key m.routing_key = method.routing_key
logging.debug('Message in bus: %s' % repr(m)[:MAX_LOG_LINE_LENGTH]) logger.debug('Message in bus: %s' % repr(m)[:MAX_LOG_LINE_LENGTH])
self.message_dispatcher(m) self.message_dispatcher(m)
except NonCompliantMessageFormatError as e: except NonCompliantMessageFormatError as e:
logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e)) logger.error('%s got a non compliant message error %s' % (self.__class__.__name__, e))
except Exception as e: except Exception as e:
logging.error(e) logger.error(e)
logging.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body)) logger.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body))
raise e raise e
finally: finally:
...@@ -152,7 +155,7 @@ class AmqpListener(threading.Thread): ...@@ -152,7 +155,7 @@ class AmqpListener(threading.Thread):
self.message_dispatcher(text_based_message_representation) self.message_dispatcher(text_based_message_representation)
def run(self): def run(self):
logging.info("Starting thread listening on the event bus on topics %s" % self.topics) logger.info("Starting thread listening on the event bus on topics %s" % self.topics)
for i in range(1, 4): for i in range(1, 4):
try: try:
...@@ -161,12 +164,12 @@ class AmqpListener(threading.Thread): ...@@ -161,12 +164,12 @@ class AmqpListener(threading.Thread):
except (pika.exceptions.ConnectionClosed, except (pika.exceptions.ConnectionClosed,
pika.exceptions.ChannelError, pika.exceptions.ChannelError,
pika.exceptions.ChannelClosed) as err: pika.exceptions.ChannelClosed) as err:
if not self.stopping:
logger.error('[AmqpListener] Unexpected connection closed, reconnecting %s/%s' % (i, 4))
logger.error(traceback.format_exc())
self.amqp_connect()
logging.error('[AmqpListener] Unexpected connection closed, reconnecting %s/%s' % (i, 4)) logger.info('%s says Bye byes!' % self.COMPONENT_ID)
logging.error(traceback.format_exc())
self.amqp_connect()
logging.info('%s says Bye byes!' % self.COMPONENT_ID)
def publish_message(connection, message): def publish_message(connection, message):
...@@ -189,7 +192,7 @@ def publish_message(connection, message): ...@@ -189,7 +192,7 @@ def publish_message(connection, message):
channel.close() channel.close()
def amqp_request(connection, request_message, component_id, retries=10): def amqp_request(connection, request_message, component_id, retries=10, time_between_retries=0.5, use_message_typing=False):
""" """
Publishes message into the correct topic (uses Message object metadata) Publishes message into the correct topic (uses Message object metadata)
Returns reply message. Returns reply message.
...@@ -203,8 +206,6 @@ def amqp_request(connection, request_message, component_id, retries=10): ...@@ -203,8 +206,6 @@ def amqp_request(connection, request_message, component_id, retries=10):
assert request_message.correlation_id assert request_message.correlation_id
assert retries > 0 assert retries > 0
time_between_requests = 0.5
channel = None channel = None
response = None response = None
...@@ -232,7 +233,7 @@ def amqp_request(connection, request_message, component_id, retries=10): ...@@ -232,7 +233,7 @@ def amqp_request(connection, request_message, component_id, retries=10):
retries_left = retries retries_left = retries
while retries_left > 0: while retries_left > 0:
time.sleep(time_between_requests) time.sleep(time_between_retries)
method, props, body = channel.basic_get(reply_queue_name) method, props, body = channel.basic_get(reply_queue_name)
if method: if method:
channel.basic_ack(method.delivery_tag) channel.basic_ack(method.delivery_tag)
...@@ -241,8 +242,25 @@ def amqp_request(connection, request_message, component_id, retries=10): ...@@ -241,8 +242,25 @@ def amqp_request(connection, request_message, component_id, retries=10):
retries_left -= 1 retries_left -= 1
if retries_left > 0: if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict) if use_message_typing:
try:
response = Message.load_from_pika(method, props, body)
if response is None:
raise Exception("Couldnt build message from json %s, rkey: %s " % (body, method.routing_key))
except NonCompliantMessageFormatError as e:
logger.error('amqp_request got a non compliant message error %s' % e)
except Exception as e:
logger.error(e)
logger.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body))
raise e
else:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else: else:
# clean up # clean up
channel.queue_delete(reply_queue_name) channel.queue_delete(reply_queue_name)
...@@ -261,7 +279,6 @@ def amqp_request(connection, request_message, component_id, retries=10): ...@@ -261,7 +279,6 @@ def amqp_request(connection, request_message, component_id, retries=10):
return response return response
if __name__ == '__main__': if __name__ == '__main__':
try: try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment