Commit 67534276 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

patch for amqp lib error UnexpectedFrame

parent 1fb1c9a8
...@@ -6,6 +6,8 @@ import logging ...@@ -6,6 +6,8 @@ import logging
from multiprocessing import Process from multiprocessing import Process
from utils.messages import Message as EventBusMessage, MsgTestingToolComponentReady as EventBusMessageComponentReady from utils.messages import Message as EventBusMessage, MsgTestingToolComponentReady as EventBusMessageComponentReady
from amqp.exceptions import UnexpectedFrame
from kombu import Connection, Queue, Exchange, Consumer from kombu import Connection, Queue, Exchange, Consumer
from kombu.mixins import ConsumerMixin from kombu.mixins import ConsumerMixin
...@@ -60,7 +62,7 @@ class BaseConsumer(ConsumerMixin): ...@@ -60,7 +62,7 @@ class BaseConsumer(ConsumerMixin):
def subscribe_to_topics(self, topic_list): def subscribe_to_topics(self, topic_list):
for t in topic_list: for t in topic_list:
queue = Queue( queue = Queue(
name="{name}.{consumer_name}::{rkey}".format( name="consumer:{name}.{consumer_name}::RKey:{rkey}".format(
name=self.name, name=self.name,
consumer_name=self.consumer_name, consumer_name=self.consumer_name,
rkey=t rkey=t
...@@ -86,7 +88,12 @@ class BaseConsumer(ConsumerMixin): ...@@ -86,7 +88,12 @@ class BaseConsumer(ConsumerMixin):
self.log.debug("DEFAULT on_message callback, got: {}".format(message.delivery_info.get('routing_key'))) self.log.debug("DEFAULT on_message callback, got: {}".format(message.delivery_info.get('routing_key')))
msg = EventBusMessage.load(json_body, message.delivery_info.get('routing_key')) msg = EventBusMessage.load(json_body, message.delivery_info.get('routing_key'))
self._on_message(msg) try:
self._on_message(msg)
except UnexpectedFrame as uf_err:
self.log.error(
uf_err.message
)
def _on_message(self, message): def _on_message(self, message):
"Class to be overridden by children calss" "Class to be overridden by children calss"
...@@ -96,7 +103,7 @@ class BaseConsumer(ConsumerMixin): ...@@ -96,7 +103,7 @@ class BaseConsumer(ConsumerMixin):
# control plane info # control plane info
for q in self.queues: for q in self.queues:
self.log.info( self.log.info(
"Listening on {queue_name} bound to {rkey} ".format(queue_name=q.name, "Queue: {queue_name} bound to: {rkey} ".format(queue_name=q.name,
rkey=q.routing_key) rkey=q.routing_key)
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment