Commit 4f403ad5 authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'rss-integration' into 'master'

Rs/Rss integration

See merge request !16
parents bbdcd088 e69abb41
Pipeline #2365 passed with stage
in 0 seconds
......@@ -25,7 +25,7 @@ def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
def get_from_environment(variable, default):
if variable in os.environ:
v = os.environ.get(variable)
logging.info("Using environment variable %s=%s" % (variable, default))
logging.info("Using environment variable %s=%s" % (variable, v))
else:
v = default
logging.warning("Using default variable %s=%s" % (variable, default))
......
......@@ -24,13 +24,18 @@ class AmqpListener(threading.Thread):
DEFAULT_EXCHAGE = 'amq.topic'
DEFAULT_AMQP_URL = 'amqp://guest:guest@locahost/'
def __init__(self, amqp_url, amqp_exchange, callback, topics=None, use_message_typing=True):
def __init__(self, amqp_url, amqp_exchange, callback, topics=None, use_message_typing=True, pre_declared_queue=None):
self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8]
self.connection = None
self.channel = None
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
if pre_declared_queue:
self.services_queue_name = pre_declared_queue
self.queue_is_declared = True
else:
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.queue_is_declared = False
self.use_message_typing = use_message_typing
threading.Thread.__init__(self)
......@@ -69,15 +74,15 @@ class AmqpListener(threading.Thread):
self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
self.channel = self.connection.channel()
# queues & default exchange declaration
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
for t in self.topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
# Declare queue if necessary
if not self.queue_is_declared:
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
for t in self.topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
# Hello world message
m = MsgTestingToolComponentReady(
component=self.COMPONENT_ID,
......@@ -85,6 +90,7 @@ class AmqpListener(threading.Thread):
)
# Send hello world message
self.channel.basic_publish(
body=m.to_json(),
routing_key=m.routing_key,
......@@ -94,6 +100,7 @@ class AmqpListener(threading.Thread):
)
)
# Start consuming
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
......
......@@ -2686,6 +2686,78 @@ class MsgVizInitReply(MsgReply):
}
# # # # # # RESULTS STORE SERVICE MESSAGES # # # # # #
class MsgReportSaveRequest(Message):
routing_key = "results_store.session.report.save.request"
_msg_data_template = {
"type": "final",
"data": {}
}
class MsgReportSaveReply(MsgReply):
routing_key = "results_store.session.report.save.reply"
_msg_data_template = {
"ok": True
}
# # # # # # RESULTS STORE MESSAGES # # # # # #
class MsgInsertResultRequest(Message):
routing_key = "results_store.insert_result.request"
_msg_data_template = {
"resources": [],
"owners": [],
"session_id": "",
"testing_tool_id": "",
"timestamp": 0,
"type": "",
"data": {}
}
class MsgInsertResultReply(MsgReply):
routing_key = "results_store.insert_result.reply"
_msg_data_template = {
"ok": True
}
class MsgGetResultRequest(Message):
routing_key = "results_store.get_result.request"
_msg_data_template = {}
class MsgGetResultReply(MsgReply):
routing_key = "results_store.get_result.reply"
_msg_data_template = {
"ok": True,
"results": []
}
class MsgDeleteResultRequest(Message):
routing_key = "results_store.delete_result.request"
_msg_data_template = {}
class MsgDeleteResultReply(MsgReply):
routing_key = "results_store.delete_result.reply"
_msg_data_template = {
"ok": True
}
# attention
rk_pattern_to_message_type_map = RoutingKeyToMessageMap(
{
......@@ -2810,6 +2882,17 @@ rk_pattern_to_message_type_map = RoutingKeyToMessageMap(
"viztool-grafana.set_dashboard.reply": MsgVizDashboardReply,
"viztool-grafana.write_data": MsgVizWrite,
# results-store-service API
"results_store.session.report.save.request": MsgReportSaveRequest, # TestingTool -> RSS
"results_store.session.report.save.reply": MsgReportSaveReply, # RSS -> TestingTool (reply)
# results-store API
"results_store.insert_result.request": MsgInsertResultRequest, # any on / vhost -> RS
"results_store.insert_result.reply": MsgInsertResultReply, # RS -> any on / vhost (reply)
"results_store.get_result.request": MsgGetResultRequest, # any on / vhost -> RS
"results_store.get_result.reply": MsgGetResultReply, # RS -> any on / vhost (reply)
"results_store.delete_result.request": MsgDeleteResultRequest, # any on / vhost -> RS
"results_store.delete_result.reply": MsgDeleteResultReply, # RS -> any on / vhost (reply)
}
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment