Commit 604a1c23 authored by Federico Sismondi's avatar Federico Sismondi

more efficient way of using channels

parent 20c3b14d
......@@ -25,23 +25,22 @@ def publish_message(channel, message):
)
def amqp_request(request_message: Message, component_id: str):
def amqp_request(channel, request_message: Message, component_id: str):
# NOTE: channel must be a pika channel
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
# setup blocking connection, do not reuse the conection from coord, it needs to be a new one
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
response = None
channel = connection.channel()
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# by convention routing key of answer is routing_key + .reply
# bind and listen to reply_to topic
channel.queue_bind(
exchange=AMQP_EXCHANGE,
queue=callback_queue,
......@@ -80,13 +79,14 @@ def amqp_request(request_message: Message, component_id: str):
)
)
# cleaning up
# clean up
channel.queue_delete(reply_queue_name)
connection.close()
return response
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
m = MsgSniffingGetCapture()
r = amqp_request(m, 'someImaginaryComponent')
r = amqp_request(connection.channel(), m, 'someImaginaryComponent')
print(repr(r))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment