Commit d3f83282 authored by Federico Sismondi's avatar Federico Sismondi

some bug fixes in the rpc-like calls

parent 07e4aee7
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
import pika
import time
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE
from coap_testing_tool.utils.event_bus_messages import *
# timeout in seconds
AMQP_REPLY_TOUT = 10
def publish_message(channel, message):
""" Published which uses message object metadata
......@@ -28,8 +27,8 @@ def publish_message(channel, message):
def amqp_request(request_message: Message, component_id: str):
# check first that sender didnt forget about reply to and corr id
assert (request_message.reply_to)
assert (request_message.correlation_id)
assert request_message.reply_to
assert request_message.correlation_id
# setup blocking connection, do not reuse the conection from coord, it needs to be a new one
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
......@@ -60,11 +59,13 @@ def amqp_request(request_message: Message, component_id: str):
retries_left = 5
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
time.sleep(0.5)
if retries_left > 0:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment