Commit 40775c2f authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'fix_unexpectedframe' into 'master'

patch for amqp lib error UnexpectedFrame

See merge request !27
parents 4161cc12 67534276
......@@ -6,6 +6,8 @@ import logging
from multiprocessing import Process
from utils.messages import Message as EventBusMessage, MsgTestingToolComponentReady as EventBusMessageComponentReady
from amqp.exceptions import UnexpectedFrame
from kombu import Connection, Queue, Exchange, Consumer
from kombu.mixins import ConsumerMixin
......@@ -60,7 +62,7 @@ class BaseConsumer(ConsumerMixin):
def subscribe_to_topics(self, topic_list):
for t in topic_list:
queue = Queue(
name="{name}.{consumer_name}::{rkey}".format(
name="consumer:{name}.{consumer_name}::RKey:{rkey}".format(
name=self.name,
consumer_name=self.consumer_name,
rkey=t
......@@ -86,7 +88,12 @@ class BaseConsumer(ConsumerMixin):
self.log.debug("DEFAULT on_message callback, got: {}".format(message.delivery_info.get('routing_key')))
msg = EventBusMessage.load(json_body, message.delivery_info.get('routing_key'))
self._on_message(msg)
try:
self._on_message(msg)
except UnexpectedFrame as uf_err:
self.log.error(
uf_err.message
)
def _on_message(self, message):
"Class to be overridden by children calss"
......@@ -96,7 +103,7 @@ class BaseConsumer(ConsumerMixin):
# control plane info
for q in self.queues:
self.log.info(
"Listening on {queue_name} bound to {rkey} ".format(queue_name=q.name,
"Queue: {queue_name} bound to: {rkey} ".format(queue_name=q.name,
rkey=q.routing_key)
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment