Commit 495e8497 authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'develop' into 'master'

Develop

See merge request !12
parents c91831c6 a0395cfc
......@@ -78,6 +78,8 @@ if(env.JOB_NAME =~ 'coap_testing_tool/'){
sleep 15
pwd
python3 -m pytest tests/test_api.py -vv
sleep 5
sudo -E supervisorctl -c supervisor.conf stop all
'''
}
}
......
Subproject commit 2c2d9f740e11eb53804768ac7ea568b2c346ec43
Subproject commit bf473606fa79fcea1b2a107fa9a88ae119b1ca04
......@@ -10,7 +10,7 @@ python3 -m pytest coap_testing_tool/extended_test_descriptions/tests/tests.py
class PacketRouterTestCase(unittest.TestCase):
def test_yaml_testcase_sintax(self):
def test_yaml_testcase_syntax(self):
imported_tcs = import_teds(TD_COAP)
for tc in imported_tcs:
......@@ -29,7 +29,7 @@ class PacketRouterTestCase(unittest.TestCase):
assert step.type
assert step.description
def test_yaml_testcase_configuration_sintax(self):
def test_yaml_testcase_configuration_syntax(self):
imported_configs = import_teds(TD_COAP_CFG)
for tc_config in imported_configs:
print(tc_config)
......@@ -43,4 +43,4 @@ class PacketRouterTestCase(unittest.TestCase):
if __name__ == '__main__':
c = PacketRouterTestCase()
c.test_yaml_testcase_sintax()
\ No newline at end of file
c.test_yaml_testcase_syntax()
\ No newline at end of file
......@@ -10,6 +10,8 @@ import sys
import logging
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE, AGENT_NAMES, AGENT_TT_ID
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool.utils.event_bus_messages import *
COMPONENT_ID = 'packet_router'
# init logging to stnd output and log files
......@@ -26,15 +28,16 @@ rabbitmq_handler.setFormatter(json_formatter)
logger.addHandler(rabbitmq_handler)
logger.setLevel(logging.DEBUG)
class PacketRouter(threading.Thread):
AGENT_1_ID = AGENT_NAMES[0]
AGENT_2_ID = AGENT_NAMES[1]
AGENT_TT_ID = AGENT_TT_ID
def __init__(self, conn, routing_table = None):
def __init__(self, conn, routing_table=None):
threading.Thread.__init__(self)
logger.info("Imported agent names of the test session: %s" %str(AGENT_NAMES))
logger.info("Imported agent names of the test session: %s" % str(AGENT_NAMES))
if routing_table:
self.routing_table = routing_table
......@@ -43,21 +46,21 @@ class PacketRouter(threading.Thread):
# agent_TT is the agent instantiated by the testing tool
self.routing_table = {
# first two entries is for a user to user setup
'data.tun.fromAgent.%s'%PacketRouter.AGENT_1_ID:
'data.tun.fromAgent.%s' % PacketRouter.AGENT_1_ID:
[
'data.tun.toAgent.%s'%PacketRouter.AGENT_2_ID,
'data.tun.toAgent.%s'%PacketRouter.AGENT_TT_ID
'data.tun.toAgent.%s' % PacketRouter.AGENT_2_ID,
'data.tun.toAgent.%s' % PacketRouter.AGENT_TT_ID
],
'data.tun.fromAgent.%s'%PacketRouter.AGENT_2_ID:
'data.tun.fromAgent.%s' % PacketRouter.AGENT_2_ID:
[
'data.tun.toAgent.%s'%PacketRouter.AGENT_1_ID,
'data.tun.toAgent.%s'%PacketRouter.AGENT_TT_ID
'data.tun.toAgent.%s' % PacketRouter.AGENT_1_ID,
'data.tun.toAgent.%s' % PacketRouter.AGENT_TT_ID
],
# entry for a user to automated iut setup (doesnt create any conflict with the previous ones)
'data.tun.fromAgent.%s'%PacketRouter.AGENT_TT_ID:
'data.tun.fromAgent.%s' % PacketRouter.AGENT_TT_ID:
[
'data.tun.toAgent.%s'%PacketRouter.AGENT_1_ID
'data.tun.toAgent.%s' % PacketRouter.AGENT_1_ID
],
}
......@@ -71,20 +74,16 @@ class PacketRouter(threading.Thread):
self.channel = self.connection.channel()
queue_name = 'data_packets_queue@%s' % COMPONENT_ID
self.channel.queue_declare(queue=queue_name, auto_delete = True )
self.channel.queue_declare(queue=queue_name, auto_delete=True)
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=queue_name,
routing_key='data.tun.fromAgent.#')
queue=queue_name,
routing_key='data.tun.fromAgent.#')
self.channel.basic_publish(
body=json.dumps({'message': '%s is up!' % COMPONENT_ID, "_type": 'packetrouting.ready'}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.bootstrap',
properties=pika.BasicProperties(
content_type='application/json',
)
msg = MsgTestingToolComponentReady(
component='packetrouting'
)
publish_message(self.channel, msg)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=queue_name)
......@@ -94,40 +93,40 @@ class PacketRouter(threading.Thread):
def on_request(self, ch, method, props, body):
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
body_dict = json.loads(body.decode('utf-8'),object_pairs_hook=OrderedDict)
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.debug("Message sniffed: %s" %(body_dict['_type']))
logger.debug("Message sniffed: %s" % (body_dict['_type']))
self.message_count += 1
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *'%self.message_count)
print("TIME: %s"%datetime.datetime.time(datetime.datetime.now()))
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *' % self.message_count)
print("TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
print(" - - - ")
print("ROUTING_KEY: %s" % method.routing_key)
print(" - - - ")
print("HEADERS: %s" % props.headers)
print(" - - - ")
print("PROPS: %s" %json.dumps(
{
'content_type' : props.content_type,
'content_encoding' : props.content_encoding,
'headers' : props.headers,
'delivery_mode' : props.delivery_mode,
'priority' : props.priority,
'correlation_id' : props.correlation_id,
'reply_to' : props.reply_to,
'expiration' : props.expiration,
'message_id' : props.message_id,
'timestamp' : props.timestamp,
'user_id' : props.user_id,
'app_id' : props.app_id,
'cluster_id' : props.cluster_id,
}
)
print("PROPS: %s" % json.dumps(
{
'content_type': props.content_type,
'content_encoding': props.content_encoding,
'headers': props.headers,
'delivery_mode': props.delivery_mode,
'priority': props.priority,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'expiration': props.expiration,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
'cluster_id': props.cluster_id,
}
)
)
print(" - - - ")
print('BODY %s' % json.dumps(body_dict))
print(" - - - ")
#print("ERRORS: %s" % )
# print("ERRORS: %s" % )
print('* * * * * * * * * * * * * * * * * * * * * \n')
# let's route the message to the right agent
......@@ -152,27 +151,25 @@ class PacketRouter(threading.Thread):
)
)
logger.info("Routing packet (%d) from topic: %s to topic: %s"%(self.message_count,src_rkey,dst_rkey))
logger.info(
"Routing packet (%d) from topic: %s to topic: %s" % (self.message_count, src_rkey, dst_rkey))
else:
logger.error('No known route for r_key source: {r_key}'.format(r_key=src_rkey))
return
def run(self):
self.channel.start_consuming()
logger.info('Bye byes!')
###############################################################################
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = connection.channel()
def signal_int_handler(channel):
# FINISHING... let's send a goodby message
msg = {
......@@ -197,14 +194,10 @@ if __name__ == '__main__':
# in case its not declared
connection.channel().exchange_declare(exchange=AMQP_EXCHANGE,
type='topic',
durable=True,
)
type='topic',
durable=True,
)
# start amqp router thread
r = PacketRouter(connection,None)
r = PacketRouter(connection, None)
r.start()
......@@ -8,6 +8,7 @@ import sys
import base64
import traceback
import pika
import logging
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool import TMPDIR, DATADIR, LOGDIR, AMQP_EXCHANGE, AMQP_URL
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
......@@ -278,14 +279,11 @@ if __name__ == '__main__':
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='services_queue@%s' % COMPONENT_ID)
channel.basic_publish(
body=json.dumps({'message': '%s is up!' % COMPONENT_ID, "_type": 'sniffing.ready'}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.bootstrap',
properties=pika.BasicProperties(
content_type='application/json',
)
msg = MsgTestingToolComponentReady(
component = 'sniffing'
)
publish_message(channel, msg )
try:
logger.info("Awaiting AMQP requests on topic: control.sniffing.service")
......
test_analysis_tool @ d11fb36e
Subproject commit 6bd88d5cbe6834184ff132252b49f882b684099f
Subproject commit d11fb36eab976509780d4d33b17b763863369e6a
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
# !/usr/bin/env python3
import logging
from threading import Timer
from coap_testing_tool.test_coordinator.coordinator import *
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE
from coap_testing_tool import DATADIR,TMPDIR,LOGDIR,TD_DIR
from coap_testing_tool import DATADIR, TMPDIR, LOGDIR, TD_DIR
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
COMPONENT_ID = 'test_coordinator'
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
# init logging to stnd output and log files
......@@ -58,12 +57,12 @@ if __name__ == '__main__':
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
except pika.exceptions.ConnectionClosed as cc:
logger.error(' AMQP cannot be established, is message broker up? \n More: %s' %traceback.format_exc())
logger.error(' AMQP cannot be established, is message broker up? \n More: %s' % traceback.format_exc())
sys.exit(1)
channel = connection.channel()
#in case exchange not declared
# in case exchange not declared
channel.exchange_declare(
exchange=AMQP_EXCHANGE,
type='topic',
......@@ -75,44 +74,36 @@ if __name__ == '__main__':
channel.queue_bind(
exchange=AMQP_EXCHANGE,
queue='bootstrapping',
routing_key='control.session.bootstrap',
routing_key='control.session',
)
# starting verification of the testing tool components
channel.basic_publish(
body=json.dumps({'message': '%s is up!' % COMPONENT_ID, "_type": 'testcoordination.ready'}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.bootstrap',
properties=pika.BasicProperties(
content_type='application/json',
)
msg = MsgTestingToolComponentReady(
component='testcoordination'
)
publish_message(channel, msg)
def on_ready_signal( ch, method, props, body):
def on_ready_signal(ch, method, props, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
# we should only get messages with: ROUTING_KEY: control.session.bootstrap
# assert this, else an exception will be risen after
assert method.routing_key == 'control.session.bootstrap'
event = Message.from_json(body)
event = json.loads(body.decode('utf-8'),object_pairs_hook=OrderedDict)
signal = event['_type']
if isinstance(event, MsgTestingToolComponentReady):
component = event.component
logger.info('ready signals received %s' % component)
if component in TT_check_list:
TT_check_list.remove(component)
return
# final signal generated by coordinator
if signal == "testingtool.ready":
elif isinstance(event, MsgTestingToolReady):
logger.info('all signals processed')
channel.queue_delete('bootstrapping')
return
else:
pass
for s in TT_check_list:
if s in signal:
TT_check_list.remove(s)
return
logger.info('ready signals still not received %s , from %s'%(len(TT_check_list),TT_check_list))
logger.warning('not processed signal %s'%signal)
# bind callback funtion to signal queue
# bind callback function to signal queue
channel.basic_consume(on_ready_signal,
no_ack=False,
queue='bootstrapping')
......@@ -121,6 +112,7 @@ if __name__ == '__main__':
# wait for all testing tool component's signal
timeout = False
def timeout_f():
global timeout
timeout = True
......@@ -128,40 +120,28 @@ if __name__ == '__main__':
t = Timer(READY_SIGNAL_TOUT, timeout_f)
t.start()
while len(TT_check_list)!=0 and not timeout:
while len(TT_check_list) != 0 and not timeout:
time.sleep(0.3)
connection.process_data_events()
if timeout:
logger.error("Some components havent sent READY signal: %s"%str(TT_check_list))
logger.error("Some components havent sent READY signal: %s" % str(TT_check_list))
sys.exit(1)
logger.info('All components ready')
assert len(TT_check_list)==0
channel.basic_publish(
routing_key='control.session.bootstrap',
exchange=AMQP_EXCHANGE,
body=json.dumps(
{
'message': 'All testing tool components are ready!',
"_type": 'testingtool.ready'
}
),
properties=pika.BasicProperties(
content_type='application/json',
)
)
assert len(TT_check_list) == 0
publish_message(channel, MsgTestingToolReady())
# lets start the test suite coordination phase
try:
logger.info('Instantiating coordinator..')
coordinator = Coordinator(connection, TD_COAP, TD_COAP_CFG)
except Exception as e:
# at this level i cannot emit AMQP messages if sth fails
# cannot emit AMQP messages for the fail
error_msg = str(e)
logger.error(' Critical exception found: %s , traceback: %s' %(error_msg,traceback.format_exc()))
logger.error(' Critical exception found: %s , traceback: %s' % (error_msg, traceback.format_exc()))
logger.debug(traceback.format_exc())
sys.exit(1)
......@@ -178,29 +158,29 @@ if __name__ == '__main__':
sys.exit(1)
except KeyboardInterrupt as KI:
#close AMQP connection
# close AMQP connection
connection.close()
sys.exit(1)
except Exception as e:
error_msg = str(e)
logger.error(' Critical exception found: %s, traceback: %s' %(error_msg,traceback.format_exc()))
logger.error(' Critical exception found: %s, traceback: %s' % (error_msg, traceback.format_exc()))
logger.debug(traceback.format_exc())
#lets push the error message into the bus
# lets push the error message into the bus
coordinator.channel.basic_publish(
body = json.dumps({
'traceback':traceback.format_exc(),
'message': error_msg,
'_type': 'testcoordination.error',
}),
exchange = AMQP_EXCHANGE,
routing_key ='session.error',
properties=pika.BasicProperties(
content_type='application/json',
)
body=json.dumps({
'traceback': traceback.format_exc(),
'message': error_msg,
'_type': 'testcoordination.error',
}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.error',
properties=pika.BasicProperties(
content_type='application/json',
)
)
#close AMQP connection
# close AMQP connection
connection.close()
sys.exit(1)
\ No newline at end of file
sys.exit(1)
......@@ -385,10 +385,11 @@ class TestCase:
d = OrderedDict()
d['testcase_id'] = self.id
d['testcase_ref'] = self.uri
if verbose:
d['testcase_ref'] = self.uri
d['objective'] = self.objective
d['pre_conditions'] = self.pre_conditions
d['state'] = self.state
return d
......@@ -604,24 +605,27 @@ class Coordinator:
def notify_step_to_execute(self):
step_info_dict = self.current_tc.current_step.to_dict(verbose=True)
tc_info_dict = self.current_tc.to_dict(verbose=False)
event = MsgStepExecute(
message='Next test step to be executed is %s' % self.current_tc.current_step.id,
**step_info_dict
**step_info_dict,
**tc_info_dict,
)
publish_message(self.channel, event)
def notify_testcase_finished(self):
tc_info_dict = self.current_tc.to_dict(verbose=True)
tc_info_dict = self.current_tc.to_dict(verbose=False)
event = MsgTestCaseFinished(
testcase_id = self.current_tc.id,
message='Testcase %s finished' % self.current_tc.id,
message='Testcase %s finished' % tc_info_dict['testcase_id'],
**tc_info_dict
)
publish_message(self.channel, event)
def notify_testcase_verdict(self):
event = MsgTestCaseVerdict(
**self.current_tc.report,
**self.current_tc.to_dict(verbose=True)
**self.current_tc.to_dict(verbose=False)
)
publish_message(self.channel, event)
......@@ -669,14 +673,22 @@ class Coordinator:
with open(json_file, 'w') as f:
f.write(event.to_json())
def notify_current_configuration(self, config_id, node, message):
# TODO get configuration id , node and message from self, and not as param
event = MsgTestCaseConfiguration(
configuration_id=config_id,
node=node,
message=message
)
publish_message(self.channel, event)
def notify_current_configuration(self):
tc_info_dict = self.current_tc.to_dict(verbose=False)
config_id = self.current_tc.configuration_id
config = self.tc_configs[config_id] # Configuration object
for desc in config.description:
message = desc['message']
node = desc['node']
event = MsgTestCaseConfiguration(
configuration_id=config_id,
node=node,
message=message,
**tc_info_dict,
)
publish_message(self.channel, event)
def call_service_sniffer_start(self, **kwargs):
......@@ -1033,13 +1045,11 @@ class Coordinator:
def get_testcases_list(self):
return list(self.teds.keys())
# def select_testcases(self, tc_id):
def select_testcase(self, params):
"""
this is more like a jump to function rather than select
:param params: test case id
:return: dict repr of the selected testcase if found
:return: current testcase object
:raises: CoordinatorError when test case not found
"""
tc_id = params
......@@ -1048,7 +1058,7 @@ class Coordinator:
# in case is was already executed once
self.current_tc.reinit()
logger.debug("Test case selected to be executed: %s" % self.current_tc.id)
return self.current_tc.to_dict(verbose=True)
return self.current_tc
else:
logger.error("%s not found in : %s " % (tc_id, self.teds))
raise CoordinatorError('Testcase not found')
......@@ -1062,7 +1072,6 @@ class Coordinator:
# resets all previously executed TC
for tc in self.teds.values():
tc.reinit()
# init testcase if None
if self.current_tc is None:
# so that we start back from the first
......@@ -1093,19 +1102,12 @@ class Coordinator:
self.current_tc.change_state('executing')
# # # CONFIGURATION PHASE # # #
config_id = self.current_tc.configuration_id
config = self.tc_configs[config_id]
# notify each IUT/user about the current config
# TODO do we need a confirmation for this?
for desc in config.description:
message = desc['message']
node = desc['node']
self.notify_current_configuration(config_id, node, message)
# send the configuration events to each node
self.notify_current_configuration()
# start sniffing each link
# TODO this is still not handled by sniffer, for the time being sniffer only supports sniffing the tun interface
config = self.tc_configs[self.current_tc.configuration_id]
for link in config.topology:
filter_proto = link['capture_filter']
link_id = link['link_id']
......
......@@ -2,7 +2,7 @@
"_type": "testsuite.manifest",
"testing_tool":"CoAP Testing Tool",
"version":"0.0.5",
"tests_type": ["interoperability","conformance"],
"test_types": ["interoperability","conformance"],
"protocols_under_test": ["CoAP", "CoAP_CORE", "CoAP_OBS"],
"protocols_info": [
{
......
......@@ -4,6 +4,7 @@
from coap_testing_tool.utils.event_bus_messages import *
from tests.database_pcap_base64 import *
from urllib.parse import urlparse
import logging
import unittest
import pika
......@@ -155,12 +156,12 @@ class ApiTests(unittest.TestCase):
routing_key='log.error.*')
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key='session.error')
routing_key='control.session.error')
# for getting the terminate signal
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key=MsgSessionTerminate.routing_key)
routing_key=MsgTestingToolTerminate.routing_key)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(check_for_bus_error, queue=errors_queue_name)
......@@ -179,7 +180,7 @@ class ApiTests(unittest.TestCase):
# prepare the message generator
messages = [] # list of messages to send
messages += user_sequence
messages.append(MsgSessionTerminate()) # message that triggers stop_generator_signal
messages.append(MsgTestingToolTerminate()) # message that triggers stop_generator_signal
thread_msg_gen = MessageGenerator(AMQP_URL, AMQP_EXCHANGE, messages)
logger.debug("Starting Message Generator thread ")
......@@ -215,7 +216,7 @@ class ApiTests(unittest.TestCase):
ch.basic_ack(delivery_tag=method.delivery_tag)
if msg_type == 'session.terminate':
if msg_type == 'testingtool.terminate':
ch.stop_consuming()
return
......@@ -246,7 +247,7 @@ class ApiTests(unittest.TestCase):
# for getting the terminate signal
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=services_queue_name,
routing_key=MsgSessionTerminate.routing_key)
routing_key=MsgTestingToolTerminate.routing_key)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(check_for_correlated_request_reply, queue=services_queue_name)
......@@ -254,7 +255,7 @@ class ApiTests(unittest.TestCase):
# prepare the message generator
messages = [] # list of messages to send
messages += service_api_calls
messages.append(MsgSessionTerminate()) # message that triggers stop_generator_signal
messages.append(MsgTestingToolTerminate()) # message that triggers stop_generator_signal
thread_msg_gen = MessageGenerator(AMQP_URL, AMQP_EXCHANGE, messages)
logger.debug("[%s] Starting Message Generator thread " % sys._getframe().f_code.co_name)
......@@ -326,7 +327,7 @@ def check_for_bus_error(ch, method, props, body):
try:
m = Message.from_json(body)
if isinstance(m, MsgSessionTerminate):
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
return
except:
......@@ -387,7 +388,7 @@ def validate_message(ch, method, props, body):
if req_body_dict['_type'] in list_of_messages_to_check:
m = Message.from_json(body)
try:
if isinstance(m, MsgSessionTerminate):
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
stop_generator()
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment