Commit e192645a authored by Matteo Filipponi's avatar Matteo Filipponi

Add RSS messages + Add possibility to use an already declared queue in event_bus_utils.AmqpListener

parent 439a3125
......@@ -25,7 +25,7 @@ def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
def get_from_environment(variable, default):
if variable in os.environ:
v = os.environ.get(variable)
logging.info("Using environment variable %s=%s" % (variable, default))
logging.info("Using environment variable %s=%s" % (variable, v))
else:
v = default
logging.warning("Using default variable %s=%s" % (variable, default))
......
......@@ -24,13 +24,17 @@ class AmqpListener(threading.Thread):
DEFAULT_EXCHAGE = 'amq.topic'
DEFAULT_AMQP_URL = 'amqp://guest:guest@locahost/'
def __init__(self, amqp_url, amqp_exchange, callback, topics=None, use_message_typing=True):
def __init__(self, amqp_url, amqp_exchange, callback, topics=None, use_message_typing=True, queue_name=None):
self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8]
self.connection = None
self.channel = None
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.queue_is_declared = False
if queue_name:
self.services_queue_name = queue_name
self.queue_is_declared = True
self.use_message_typing = use_message_typing
threading.Thread.__init__(self)
......@@ -69,15 +73,15 @@ class AmqpListener(threading.Thread):
self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
self.channel = self.connection.channel()
# queues & default exchange declaration
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
for t in self.topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
# Declare queue if necessary
if not self.queue_is_declared:
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
for t in self.topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
# Hello world message
m = MsgTestingToolComponentReady(
component=self.COMPONENT_ID,
......@@ -85,6 +89,7 @@ class AmqpListener(threading.Thread):
)
# Send hello world message
self.channel.basic_publish(
body=m.to_json(),
routing_key=m.routing_key,
......@@ -94,6 +99,7 @@ class AmqpListener(threading.Thread):
)
)
# Start consuming
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
......
......@@ -2664,6 +2664,25 @@ class MsgVizInitReply(MsgReply):
}
# # # # # # RESULTS STORE SERVICE MESSAGES # # # # # #
class MsgReportSaveRequest(Message):
routing_key = "results_store.session.report.save.request"
_msg_data_template = {
"type": "final",
"data": {}
}
class MsgReportSaveReply(MsgReply):
routing_key = "results_store.session.report.save.reply"
_msg_data_template = {
"ok": True
}
# attention
rk_pattern_to_message_type_map = RoutingKeyToMessageMap(
{
......@@ -2788,6 +2807,9 @@ rk_pattern_to_message_type_map = RoutingKeyToMessageMap(
"viztool-grafana.set_dashboard.reply": MsgVizDashboardReply,
"viztool-grafana.write_data": MsgVizWrite,
# results-store-service API
"results_store.session.report.save.request": MsgReportSaveRequest,
"results_store.session.report.save.reply": MsgReportSaveReply
}
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment