Commit 5a0d2745 authored by Federico Sismondi's avatar Federico Sismondi

extend messaging library usage

parent d958dd54
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# !/usr/bin/env python3
import logging
from threading import Timer
from coap_testing_tool.test_coordinator.coordinator import *
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE
from coap_testing_tool import DATADIR,TMPDIR,LOGDIR,TD_DIR
from coap_testing_tool import DATADIR, TMPDIR, LOGDIR, TD_DIR
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
COMPONENT_ID = 'test_coordinator'
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
# init logging to stnd output and log files
......@@ -58,12 +56,12 @@ if __name__ == '__main__':
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
except pika.exceptions.ConnectionClosed as cc:
logger.error(' AMQP cannot be established, is message broker up? \n More: %s' %traceback.format_exc())
logger.error(' AMQP cannot be established, is message broker up? \n More: %s' % traceback.format_exc())
sys.exit(1)
channel = connection.channel()
#in case exchange not declared
# in case exchange not declared
channel.exchange_declare(
exchange=AMQP_EXCHANGE,
type='topic',
......@@ -75,44 +73,36 @@ if __name__ == '__main__':
channel.queue_bind(
exchange=AMQP_EXCHANGE,
queue='bootstrapping',
routing_key='control.session.bootstrap',
routing_key='control.session',
)
# starting verification of the testing tool components
channel.basic_publish(
body=json.dumps({'message': '%s is up!' % COMPONENT_ID, "_type": 'testcoordination.ready'}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.bootstrap',
properties=pika.BasicProperties(
content_type='application/json',
)
msg = MsgTestingToolComponentReady(
component='testcoordination'
)
publish_message(channel, msg)
def on_ready_signal( ch, method, props, body):
def on_ready_signal(ch, method, props, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
# we should only get messages with: ROUTING_KEY: control.session.bootstrap
# assert this, else an exception will be risen after
assert method.routing_key == 'control.session.bootstrap'
event = Message.from_json(body)
event = json.loads(body.decode('utf-8'),object_pairs_hook=OrderedDict)
signal = event['_type']
if isinstance(event, MsgTestingToolComponentReady):
component = event.component
logger.info('ready signals received %s' % component)
if component in TT_check_list:
TT_check_list.remove(component)
return
# final signal generated by coordinator
if signal == "testingtool.ready":
elif isinstance(event, MsgTestingToolReady):
logger.info('all signals processed')
channel.queue_delete('bootstrapping')
return
else:
pass
for s in TT_check_list:
if s in signal:
TT_check_list.remove(s)
return
logger.info('ready signals still not received %s , from %s'%(len(TT_check_list),TT_check_list))
logger.warning('not processed signal %s'%signal)
# bind callback funtion to signal queue
# bind callback function to signal queue
channel.basic_consume(on_ready_signal,
no_ack=False,
queue='bootstrapping')
......@@ -121,6 +111,7 @@ if __name__ == '__main__':
# wait for all testing tool component's signal
timeout = False
def timeout_f():
global timeout
timeout = True
......@@ -128,40 +119,28 @@ if __name__ == '__main__':
t = Timer(READY_SIGNAL_TOUT, timeout_f)
t.start()
while len(TT_check_list)!=0 and not timeout:
while len(TT_check_list) != 0 and not timeout:
time.sleep(0.3)
connection.process_data_events()
if timeout:
logger.error("Some components havent sent READY signal: %s"%str(TT_check_list))
logger.error("Some components havent sent READY signal: %s" % str(TT_check_list))
sys.exit(1)
logger.info('All components ready')
assert len(TT_check_list)==0
channel.basic_publish(
routing_key='control.session.bootstrap',
exchange=AMQP_EXCHANGE,
body=json.dumps(
{
'message': 'All testing tool components are ready!',
"_type": 'testingtool.ready'
}
),
properties=pika.BasicProperties(
content_type='application/json',
)
)
assert len(TT_check_list) == 0
publish_message(channel, MsgTestingToolReady())
# lets start the test suite coordination phase
try:
logger.info('Instantiating coordinator..')
coordinator = Coordinator(connection, TD_COAP, TD_COAP_CFG)
except Exception as e:
# at this level i cannot emit AMQP messages if sth fails
# cannot emit AMQP messages for the fail
error_msg = str(e)
logger.error(' Critical exception found: %s , traceback: %s' %(error_msg,traceback.format_exc()))
logger.error(' Critical exception found: %s , traceback: %s' % (error_msg, traceback.format_exc()))
logger.debug(traceback.format_exc())
sys.exit(1)
......@@ -178,29 +157,29 @@ if __name__ == '__main__':
sys.exit(1)
except KeyboardInterrupt as KI:
#close AMQP connection
# close AMQP connection
connection.close()
sys.exit(1)
except Exception as e:
error_msg = str(e)
logger.error(' Critical exception found: %s, traceback: %s' %(error_msg,traceback.format_exc()))
logger.error(' Critical exception found: %s, traceback: %s' % (error_msg, traceback.format_exc()))
logger.debug(traceback.format_exc())
#lets push the error message into the bus
# lets push the error message into the bus
coordinator.channel.basic_publish(
body = json.dumps({
'traceback':traceback.format_exc(),
'message': error_msg,
'_type': 'testcoordination.error',
}),
exchange = AMQP_EXCHANGE,
routing_key ='control.session.error',
properties=pika.BasicProperties(
content_type='application/json',
)
body=json.dumps({
'traceback': traceback.format_exc(),
'message': error_msg,
'_type': 'testcoordination.error',
}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.error',
properties=pika.BasicProperties(
content_type='application/json',
)
)
#close AMQP connection
# close AMQP connection
connection.close()
sys.exit(1)
\ No newline at end of file
sys.exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment