Commit 9a227e59 authored by Federico Sismondi's avatar Federico Sismondi

created event_bus_utils, moved AmqpListener to that module

parent 9f9e91be
import os import os
import pika import pika
import logging
# for using it as library and as a __main__ # for using it as library and as a __main__
try: try:
...@@ -14,6 +15,9 @@ AMQP_EXCHANGE = 'amq.topic' ...@@ -14,6 +15,9 @@ AMQP_EXCHANGE = 'amq.topic'
class AmqpSynchCallTimeoutError(Exception): class AmqpSynchCallTimeoutError(Exception):
pass pass
# IMPORTANT deprecate usage of this in favour of event_bus_utils
logging.warning("deprecate usage of this in favour of event_bus_utils.py")
def publish_message(connection, message): def publish_message(connection, message):
""" """
......
import os
import pika
import threading
# for using it as library and as a __main__
try:
from messages import *
except:
from .messages import *
VERSION = '0.0.8'
AMQP_EXCHANGE = 'amq.topic'
class AmqpSynchCallTimeoutError(Exception):
pass
class AmqpListener(threading.Thread):
COMPONENT_ID = 'amqp_listener_%s' % uuid.uuid1()
DEFAULT_EXCHAGE = 'amq.topic'
def __init__(self, amqp_url, amqp_exchange, topics, callback):
threading.Thread.__init__(self)
if callback is None:
self.message_dipatcher = print
else:
self.message_dipatcher = callback
try:
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.channel = self.connection.channel()
except pika.exceptions.ProbableAccessDeniedError:
self.message_dipatcher('Probable access denied error. Is provided AMQP_URL correct?')
self.exit()
if amqp_exchange:
self.exchange = amqp_exchange
else:
self.exchange = self.DEFAULT_EXCHAGE
# queues & default exchange declaration
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
if topics: # subscribe only to passed list
for t in topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
else: # subscribe to all events
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key='#')
# Hello world message
m = MsgTestingToolComponentReady(
component=self.COMPONENT_ID,
description="%s is READY" % self.COMPONENT_ID
)
self.channel.basic_publish(
body=m.to_json(),
routing_key=m.routing_key,
exchange=self.exchange,
properties=pika.BasicProperties(
content_type='application/json',
)
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
def stop(self):
self.channel.queue_delete(self.services_queue_name)
self.channel.stop_consuming()
self.connection.close()
def on_request(self, ch, method, props, body):
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
m = None
try:
m = Message.from_json(body)
m.update_properties(**props_dict)
m.routing_key = method.routing_key
self.message_dipatcher(m)
except NonCompliantMessageFormatError as e:
self.message_dipatcher('%s got a non compliant message error %s' % (self.__class__.__name__, e))
except Exception as e:
pass
# self.message_dipatcher('Error : %s' % str(e))
# self.message_dipatcher(str(body))
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def run(self):
self.message_dipatcher("Starting thread listening on the event bus..")
for i in range(1, 4):
try:
self.channel.start_consuming()
except pika.exceptions.ConnectionClosed as err:
self.message_dipatcher(err)
self.message_dipatcher('Unexpected connection closed, retrying %s/%s' % (i, 4))
self.message_dipatcher('Bye byes!')
def publish_message(connection, message):
"""
Publishes message into the correct topic (uses Message object metadata)
Creates temporary channel on it's own
Connection must be a pika.BlockingConnection
"""
channel = None
try:
channel = connection.channel()
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=message.routing_key,
properties=properties,
body=message.to_json(),
)
finally:
if channel and channel.is_open:
channel.close()
def amqp_request(connection, request_message, component_id):
"""
Publishes message into the correct topic (uses Message object metadata)
Returns reply message.
Uses reply_to and corr id amqp's properties for matching the reply
Creates temporary channel, and queues on it's own
Connection must be a pika.BlockingConnection
"""
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
channel = None
try:
response = None
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
channel = connection.channel()
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# bind and listen to reply_to topic
channel.queue_bind(
exchange=AMQP_EXCHANGE,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
time.sleep(0.2)
retries_left = 10
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
# clean up
channel.queue_delete(reply_queue_name)
raise AmqpSynchCallTimeoutError(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
)
)
return response
finally:
if channel and channel.is_open:
# clean up
channel.queue_delete(reply_queue_name)
channel.close()
if __name__ == '__main__':
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
except KeyError as e:
AMQP_EXCHANGE = "amq.topic"
try:
AMQP_URL = str(os.environ['AMQP_URL'])
print('Env vars for AMQP connection succesfully imported')
except KeyError as e:
AMQP_URL = "amqp://guest:guest@localhost/"
def callback_function(message_received):
print("Callback function received: \n\t" + repr(message_received))
# amqp listener example:
amqp_listener_thread = AmqpListener(
amqp_url=AMQP_URL,
amqp_exchange=AMQP_EXCHANGE,
topics='#',
callback=callback_function)
try:
amqp_listener_thread.start()
except Exception as e:
print(e)
# publish message example
retries_left = 3
while retries_left > 0:
try:
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
m = MsgTest()
publish_message(connection, m)
break
except pika.exceptions.ConnectionClosed:
retries_left -= 1
print('retrying..')
time.sleep(0.2)
# example of a request sent into the bus
m = MsgTestSuiteGetTestCases()
try:
r = amqp_request(connection, m, 'someImaginaryComponent')
print("This is the response I got:\n\t" + repr(r))
except AmqpSynchCallTimeoutError as e:
print("Nobody answered to our request :'(")
...@@ -14,9 +14,11 @@ from prompt_toolkit.history import FileHistory ...@@ -14,9 +14,11 @@ from prompt_toolkit.history import FileHistory
# for using it as library and as a __main__ # for using it as library and as a __main__
try: try:
from messages import * from messages import *
from event_bus_utils import AmqpListener
from tabulate import tabulate from tabulate import tabulate
except: except:
from .messages import * from .messages import *
from .event_bus_utils import AmqpListener
from .tabulate import tabulate from .tabulate import tabulate
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING) logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING)
...@@ -85,112 +87,6 @@ def _init_action_suggested(): ...@@ -85,112 +87,6 @@ def _init_action_suggested():
state['suggested_cmd'] = 'ts_start' state['suggested_cmd'] = 'ts_start'
class AmqpListener(threading.Thread):
COMPONENT_ID = 'amqp_listener_%s' % uuid.uuid1()
DEFAULT_EXCHAGE = 'amq.topic'
def __init__(self, amqp_url, amqp_exchange, topics, callback):
threading.Thread.__init__(self)
if callback is None:
self.message_dipatcher = print
else:
self.message_dipatcher = callback
try:
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.channel = self.connection.channel()
except pika.exceptions.ProbableAccessDeniedError:
self.message_dipatcher('Probable access denied error. Is provided AMQP_URL correct?')
self.exit()
if amqp_exchange:
self.exchange = amqp_exchange
else:
self.exchange = self.DEFAULT_EXCHAGE
# queues & default exchange declaration
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
if topics: # subscribe only to passed list
for t in topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
else: # subscribe to all events
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key='#')
# Hello world message
m = MsgTestingToolComponentReady(
component=self.COMPONENT_ID,
description="%s is READY" % self.COMPONENT_ID
)
self.channel.basic_publish(
body=m.to_json(),
routing_key=m.routing_key,
exchange=self.exchange,
properties=pika.BasicProperties(
content_type='application/json',
)
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
def stop(self):
self.channel.queue_delete(self.services_queue_name)
self.channel.stop_consuming()
self.connection.close()
def on_request(self, ch, method, props, body):
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
m = None
try:
m = Message.from_json(body)
m.update_properties(**props_dict)
m.routing_key = method.routing_key
self.message_dipatcher(m)
except NonCompliantMessageFormatError as e:
self.message_dipatcher('%s got a non compliant message error %s' % (self.__class__.__name__, e))
except Exception as e:
pass
# self.message_dipatcher('Error : %s' % str(e))
# self.message_dipatcher(str(body))
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
def run(self):
self.message_dipatcher("Starting thread listening on the event bus..")
for i in range(1, 4):
try:
self.channel.start_consuming()
except pika.exceptions.ConnectionClosed as err:
self.message_dipatcher(err)
self.message_dipatcher('Unexpected connection closed, retrying %s/%s' % (i, 4))
self.message_dipatcher('Bye byes!')
def amqp_request(channel, request_message, component_id): def amqp_request(channel, request_message, component_id):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment