Commit bbb4973d authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'generic_session_config_message' into 'develop'

Generic session config message

See merge request !41
parents dd86f962 775c51fd
Pipeline #1406 passed with stage
in 0 seconds
......@@ -13,7 +13,7 @@ import signal
import logging
import threading
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.messages import *
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE, INTERACTIVE_SESSION, RESULTS_DIR
......
Subproject commit 591ade863018eada4256898ef18b2acb52b1a876
Subproject commit accc772ff99cc70117cd8beff56b972a4b9b05f7
......@@ -100,7 +100,7 @@
"mandatory": false
},
{
"field_name": "testsuite.reference_iut",
"field_name": "testsuite.additional_session_resource",
"description": "Reference implementation (mandatory)",
"type": "selection",
"value": [
......
......@@ -99,7 +99,7 @@
"mandatory": false
},
{
"field_name": "testsuite.automated_iut",
"field_name": "testsuite.additional_session_resource",
"description": "Automated-IUT selection (optional)",
"type": "selection",
"value": [
......
......@@ -8,7 +8,7 @@ import sys
import logging
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE, AGENT_NAMES, AGENT_TT_ID
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.messages import *
from coap_testing_tool.utils.amqp_synch_call import publish_message
COMPONENT_ID = 'packet_router'
......
......@@ -12,7 +12,7 @@ import logging
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool import TMPDIR, DATADIR, LOGDIR, AMQP_EXCHANGE, AMQP_URL
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.messages import *
COMPONENT_ID = 'packet_sniffer'
last_capture_name = None
......
test_analysis_tool @ 7b3dd52e
Subproject commit c2d2bf936d3be196b670f9e4be6a521551cf3452
Subproject commit 7b3dd52e6a2df0fa7eedcdfea25d6205ab46a929
......@@ -17,7 +17,7 @@ from coap_testing_tool import TD_COAP, TD_COAP_CFG, TD_6LOWPAN, TD_6LOWPAN_CFG
from coap_testing_tool import DATADIR, TMPDIR, LOGDIR, TD_DIR, RESULTS_DIR, PCAP_DIR
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool.utils.event_bus_messages import MsgTestingToolReady, MsgTestingToolComponentReady, Message
from coap_testing_tool.utils.messages import MsgTestingToolReady, MsgTestingToolComponentReady, Message
from coap_testing_tool.test_coordinator.states_machine import Coordinator
COMPONENT_ID = 'test_coordinator'
......
......@@ -13,7 +13,7 @@ from coap_testing_tool import TMPDIR, TD_DIR, PCAP_DIR, RESULTS_DIR, AGENT_NAMES
from coap_testing_tool.utils.amqp_synch_call import publish_message, amqp_request
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool.utils.exceptions import CoordinatorError
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.messages import *
# TODO these VARs need to come from the session orchestrator + test configuratio files
# TODO get filter from config of the TEDs
......@@ -101,14 +101,16 @@ class CoordinatorAmqpInterface(object):
# callbacks to state_machine transitions (see transitions table)
self.control_events_triggers = {
MsgSessionConfiguration: 'configure_testsuite',
MsgInteropSessionConfiguration: 'configure_testsuite',
# TODO deprecate this, use generic MsgSessionConfiguration
MsgConfigurationExecuted: 'iut_configuration_executed',
MsgTestCaseStart: 'start_testcase',
MsgStepStimuliExecuted: 'step_executed',
MsgStepVerifyExecuted: 'step_executed',
MsgStepCheckExecuted: 'step_executed',
MsgTestCaseSelect: 'select_testcase',
MsgTestSuiteStart: 'start_testsuite',
MsgStepCheckExecuted: 'step_executed',
MsgTestCaseSkip: 'skip_testcase',
}
......@@ -212,10 +214,8 @@ class CoordinatorAmqpInterface(object):
# # # FSM coordination publish/notify functions # # #
def notify_testsuite_configured(self, received_event):
configs = dict()
configs.update({'tc_list': self.testsuite.get_testsuite_configuration()})
event = MsgTestingToolConfigured(
**configs
**self.testsuite.get_testsuite_configuration()
)
publish_message(self.connection, event)
......@@ -293,7 +293,8 @@ class CoordinatorAmqpInterface(object):
)
#
elif step_info_dict['step_type'] == "check" or step_info_dict['step_type'] == "feature":
raise NotImplementedError()
logger.warning('CMD Step check or CMD step very not yet implemented')
return # not implemented
publish_message(self.connection, event)
......@@ -317,7 +318,7 @@ class CoordinatorAmqpInterface(object):
for agent, assigned_ip, ipv6_no_fw in agents_config:
bootstrap_agent.bootstrap(AMQP_URL, AMQP_EXCHANGE, agent, assigned_ip, "bbbb", ipv6_no_fw)
def notify_testsuite_ready(self,received_event):
def notify_testsuite_ready(self, received_event):
pass
def notify_testsuite_started(self, received_event):
......
......@@ -8,12 +8,12 @@ from time import sleep
from urllib.parse import urlparse
from transitions import Machine
from transitions.extensions.states import add_state_features, Tags, Timeout
from transitions.core import MachineError
from transitions.extensions.states import add_state_features, Tags, Timeout
from coap_testing_tool import TMPDIR, TD_DIR, PCAP_DIR, RESULTS_DIR, AGENT_NAMES, AGENT_TT_ID
from coap_testing_tool import TMPDIR, TD_DIR, PCAP_DIR, RESULTS_DIR, AGENT_NAMES, AGENT_TT_ID, AMQP_URL, AMQP_EXCHANGE
from coap_testing_tool.utils.amqp_synch_call import *
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.messages import *
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool.utils.exceptions import CoordinatorError
from coap_testing_tool.test_coordinator.amqp_connector import CoordinatorAmqpInterface
......@@ -119,31 +119,61 @@ class Coordinator(CoordinatorAmqpInterface):
self.notify_tun_interfaces_start(received_event)
def handle_testsuite_start(self, received_event):
self.testsuite.start(self.tc_list_requested)
self.testsuite.reinit()
def handle_testsuite_config(self, received_event):
tc_list_requested = []
session_config = received_event.to_dict()
session_tc_list = []
session_id = None
session_users = None
session_config = None
logging.info(" Interop session configuration received : %s" % session_config)
logging.info(" Interop session configuration received : %s" % received_event)
try:
for test in received_event.tests:
test_url = urlparse(test['testcase_ref'])
tc_id = str(test_url.path).lstrip("/tests/")
tc_list_requested.append(tc_id)
if isinstance(received_event, MsgInteropSessionConfiguration):
# TODO deprecate this in favour of MsgSessionConfiguration
try:
for test in received_event.tests:
test_url = urlparse(test['testcase_ref'])
tc_id = str(test_url.path).lstrip("/tests/")
session_tc_list.append(tc_id)
except Exception as e:
error_msg = "Wrong message format sent for session configuration."
raise CoordinatorError(message=error_msg)
except Exception as e:
error_msg = "Wrong message format sent for session configuration."
raise CoordinatorError(message=error_msg)
try:
session_id = received_event.session_id
session_users = received_event.users
session_config = received_event.configuration
except:
logger.warning("Missing fields in message configuration: %s" % received_event)
self.testsuite.configure_test_suite(tc_list_requested)
self.tc_list_requested = tc_list_requested
self.session_config = session_config
self.testsuite.configure_testsuite(session_tc_list, session_id, session_users, session_config)
elif isinstance(received_event, MsgSessionConfiguration):
try:
event_tc_list = received_event.configuration['testsuite.testcases']
assert type(event_tc_list) is list, 'Testcases list expected'
for t in event_tc_list:
test_url = urlparse(t)
session_tc_list.append(str(test_url.path).lstrip("/tests/"))
except Exception as e:
error_msg = "Wrong message format sent for session configuration."
raise CoordinatorError(message=error_msg)
try:
session_id = received_event.session_id
session_users = received_event.users
session_config = received_event.configuration
except:
logger.warning("Missing fields in message configuration: %s" % received_event)
self.testsuite.configure_testsuite(session_tc_list, session_id, session_users, session_config)
def handle_step_executed(self, received_event):
logger.info("Handling step executed %s" % type(received_event))
if isinstance(received_event, MsgStepStimuliExecuted):
self.testsuite.finish_stimuli_step()
elif isinstance(received_event, MsgStepCheckExecuted):
......@@ -182,7 +212,7 @@ class Coordinator(CoordinatorAmqpInterface):
def get_states_summary(self):
states = self.testsuite.states_summary()
states.update({'tc_list': self.testsuite.get_testsuite_configuration()})
states.update(self.testsuite.get_testsuite_configuration())
return states
def finish_testcase(self):
......@@ -387,6 +417,7 @@ class Coordinator(CoordinatorAmqpInterface):
CoordinatorError('Sniffer couldnt be started')
def _prepare_next_testcase(self, received_event):
logger.info('Preparing next testcase..')
testcase_to_execute = None
......@@ -412,6 +443,7 @@ class Coordinator(CoordinatorAmqpInterface):
self.trigger('_finish_testsuite', None)
def _prepare_next_step(self, received_event):
logger.info('Preparing next step..')
if self.testsuite.next_step():
self.testsuite.set_current_testcase_state('executing')
......@@ -720,6 +752,14 @@ if __name__ == '__main__':
"""
select testcases, then skip all
"""
default_configuration = {
"testsuite.testcases": [
"http://doc.f-interop.eu/tests/TD_COAP_CORE_01",
"http://doc.f-interop.eu/tests/TD_COAP_CORE_02",
"http://doc.f-interop.eu/tests/TD_COAP_CORE_03"
]
}
logger.setLevel(logging.DEBUG)
from coap_testing_tool import TD_COAP_CFG, TD_COAP
......@@ -735,7 +775,7 @@ if __name__ == '__main__':
test_coordinator.bootstrap()
assert test_coordinator.state == 'waiting_for_testsuite_config'
test_coordinator.configure_testsuite(MsgInteropSessionConfiguration())
test_coordinator.configure_testsuite(MsgSessionConfiguration(configuration=default_configuration))
assert test_coordinator.state != 'waiting_for_testcase_start'
test_coordinator.start_testsuite(MsgTestSuiteStart())
......
import unittest, logging, os, pika, json
from time import sleep
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.messages import *
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE, TD_COAP_CFG, TD_COAP
from coap_testing_tool.test_coordinator.testsuite import import_teds
from coap_testing_tool.test_coordinator.states_machine import Coordinator
......@@ -9,11 +9,20 @@ COMPONENT_ID = '%s|%s' % ('test_coordinator', 'unitesting')
# init logging to stnd output and log files
logger = logging.getLogger(COMPONENT_ID)
default_configuration = {
"testsuite.testcases": [
"http://doc.f-interop.eu/tests/TD_COAP_CORE_01",
"http://doc.f-interop.eu/tests/TD_COAP_CORE_02",
"http://doc.f-interop.eu/tests/TD_COAP_CORE_03"
]
}
class CoordinatorStateMachineTests(unittest.TestCase):
"""
python3 -m unittest coap_testing_tool.test_coordinator.tests.tests.CoordinatorStateMachineTests
"""
def setUp(self):
logger.setLevel(logging.DEBUG)
from coap_testing_tool import TD_COAP_CFG, TD_COAP
......@@ -27,7 +36,7 @@ class CoordinatorStateMachineTests(unittest.TestCase):
assert self.test_coordinator.state == 'waiting_for_testsuite_config'
self.test_coordinator.configure_testsuite(MsgInteropSessionConfiguration())
self.test_coordinator.configure_testsuite(MsgSessionConfiguration(configuration=default_configuration))
assert self.test_coordinator.state != 'waiting_for_testcase_start'
self.test_coordinator.start_testsuite(MsgTestSuiteStart())
......@@ -93,7 +102,7 @@ class CoordinatorStateMachineTests(unittest.TestCase):
assert self.test_coordinator.state == 'waiting_for_testsuite_config'
self.test_coordinator.configure_testsuite(MsgInteropSessionConfiguration())
self.test_coordinator.configure_testsuite(MsgSessionConfiguration(configuration=default_configuration))
assert self.test_coordinator.state != 'waiting_for_testcase_start'
self.test_coordinator.start_testsuite(MsgTestSuiteStart())
......@@ -110,7 +119,7 @@ class CoordinatorStateMachineTests(unittest.TestCase):
"""
assert self.test_coordinator.state == 'waiting_for_testsuite_config'
self.test_coordinator.configure_testsuite(MsgInteropSessionConfiguration())
self.test_coordinator.configure_testsuite(MsgSessionConfiguration(configuration=default_configuration))
assert self.test_coordinator.state != 'waiting_for_testcase_start'
self.test_coordinator.start_testsuite(MsgTestSuiteStart())
......@@ -131,7 +140,8 @@ class CoordinatorStateMachineTests(unittest.TestCase):
"""
assert self.test_coordinator.state == 'waiting_for_testsuite_config'
self.test_coordinator.configure_testsuite(MsgInteropSessionConfiguration()) # config 3 TCs
self.test_coordinator.configure_testsuite(
MsgSessionConfiguration(configuration=default_configuration)) # config 3 TCs
assert self.test_coordinator.state != 'waiting_for_testcase_start'
self.test_coordinator.start_testsuite(MsgTestSuiteStart())
......
......@@ -142,6 +142,12 @@ class TestSuite:
self._ted_it = cycle(self.teds.values())
self.current_tc = None
# session info (published in bus after testing tool is spawned):
self.session_id = None
self.session_users = None
self.session_configuration = None
self.session_selected_tc_list = None
# final testsuite report
self.report = None
......@@ -253,14 +259,16 @@ class TestSuite:
def get_report(self):
return self.report
def start(self, tc_list_requested):
def reinit(self, tc_list_selection=None):
# resets all previously executed TC
for tc in self.teds.values():
tc.reinit()
# reconfigure test suite
if tc_list_requested:
self.configure_test_suite(tc_list_requested)
# reconfigure test cases selection
if tc_list_selection:
self.configure_testsuite(tc_list_selection)
elif self.session_selected_tc_list:
self.configure_testsuite(self.session_selected_tc_list)
def abort_current_testcase(self):
self.current_tc.abort()
......@@ -305,9 +313,16 @@ class TestSuite:
logger.debug("Testsuite finished. No more test cases to execute.")
return True
def configure_test_suite(self, tc_list_requested):
def configure_testsuite(self, tc_list_requested, session_id=None, users=None, configuration=None):
assert tc_list_requested is not None
# this info is not used in the testing tool
self.session_id = session_id
self.session_users = users
self.session_configuration = configuration
self.session_selected_tc_list = tc_list_requested
# get all TCs
tc_list_available = self.get_testcases_list()
......@@ -328,7 +343,13 @@ class TestSuite:
self.skip_testcase(item)
def get_testsuite_configuration(self):
return self.get_testcases_basic(verbose=True)
resp = {}
resp.update({'session_id': self.session_id})
resp.update({'users': self.session_users})
resp.update({'configuration': self.session_configuration})
resp.update({'tc_list': self.get_testcases_basic(verbose=True)})
return resp
def skip_testcase(self, testcase_id=None):
"""
......
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
import os
import pika
import logging
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE
from coap_testing_tool.utils.event_bus_messages import *
# for using it as library and as a __main__
try:
from messages import *
except:
from .messages import *
VERSION = '0.0.6'
AMQP_EXCHANGE = 'amq.topic'
def publish_message(connection, message):
""" Published which uses message object metadata
:param channel:
:param message:
:return:
def publish_message(connection, message):
"""
Publishes message into the correct topic (uses Message object metadata)
Creates temporary channel on it's own
Connection must be a pika.BlockingConnection
"""
channel = None
......@@ -33,8 +36,14 @@ def publish_message(connection, message):
channel.close()
def amqp_request(connection, request_message: Message, component_id: str):
# NOTE: channel must be a pika channel
def amqp_request(connection, request_message, component_id):
"""
Publishes message into the correct topic (uses Message object metadata)
Returns reply message.
Uses reply_to and corr id amqp's properties for matching the reply
Creates temporary channel, and queues on it's own
Connection must be a pika.BlockingConnection
"""
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
......@@ -103,7 +112,45 @@ def amqp_request(connection, request_message: Message, component_id: str):
if __name__ == '__main__':
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
except KeyError as e:
AMQP_EXCHANGE = "amq.topic"
try:
from urllib.parse import urlparse
AMQP_URL = str(os.environ['AMQP_URL'])
p = urlparse(AMQP_URL)
AMQP_USER = p.username
AMQP_PASS = p.password
AMQP_SERVER = p.hostname
AMQP_VHOST = p.path.strip('/')
print('Env vars for AMQP connection succesfully imported')
except KeyError as e:
print('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
# load default values
AMQP_SERVER = "localhost"
AMQP_USER = "guest"
AMQP_PASS = "guest"
AMQP_VHOST = "/"
AMQP_URL = "amqp://{0}:{1}@{2}/{3}".format(AMQP_USER, AMQP_PASS, AMQP_SERVER, AMQP_VHOST)
print(json.dumps(
{
'server': AMQP_SERVER,
'session': AMQP_VHOST,
'user': AMQP_USER,
'pass': '#' * len(AMQP_PASS),
'exchange': AMQP_EXCHANGE
}
))
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
m = MsgSniffingGetCapture()
r = amqp_request(connection.channel(), m, 'someImaginaryComponent')
r = amqp_request(connection, m, 'someImaginaryComponent')
print(repr(r))
......@@ -27,7 +27,7 @@ Usage:
>>> from messages import * # doctest: +SKIP
>>> m = MsgTestCaseSkip(testcase_id = 'some_testcase_id')
>>> m
MsgTestCaseSkip(_api_version = 0.1.47, _type = testcoordination.testcase.skip, description = Skip testcase, node = someNode, testcase_id = some_testcase_id, )
MsgTestCaseSkip(_api_version = 0.1.48, _type = testcoordination.testcase.skip, description = Skip testcase, node = someNode, testcase_id = some_testcase_id, )
>>> m.routing_key
'control.testcoordination'
>>> m.message_id # doctest: +SKIP
......@@ -38,18 +38,18 @@ MsgTestCaseSkip(_api_version = 0.1.47, _type = testcoordination.testcase.skip, d
# also we can modify some of the fields (rewrite the default ones)
>>> m = MsgTestCaseSkip(testcase_id = 'TD_COAP_CORE_03')
>>> m
MsgTestCaseSkip(_api_version = 0.1.47, _type = testcoordination.testcase.skip, description = Skip testcase, node = someNode, testcase_id = TD_COAP_CORE_03, )
MsgTestCaseSkip(_api_version = 0.1.48, _type = testcoordination.testcase.skip, description = Skip testcase, node = someNode, testcase_id = TD_COAP_CORE_03, )
>>> m.testcase_id
'TD_COAP_CORE_03'
# and even export the message in json format (for example for sending the message though the amqp event bus)
>>> m.to_json()
'{"_api_version": "0.1.47", "_type": "testcoordination.testcase.skip", "description": "Skip testcase", "node": "someNode", "testcase_id": "TD_COAP_CORE_03"}'
'{"_api_version": "0.1.48", "_type": "testcoordination.testcase.skip", "description": "Skip testcase", "node": "someNode", "testcase_id": "TD_COAP_CORE_03"}'
# We can use the Message class to import json into Message objects:
>>> m=MsgTestSuiteStart()
>>> m.to_json()
'{"_api_version": "0.1.47", "_type": "testcoordination.testsuite.start", "description": "Event test suite START"}'
'{"_api_version": "0.1.48", "_type": "testcoordination.testsuite.start", "description": "Event test suite START"}'
>>> json_message = m.to_json()
>>> obj=Message.from_json(json_message)
>>> type(obj)
......@@ -62,7 +62,7 @@ MsgTestCaseSkip(_api_version = 0.1.47, _type = testcoordination.testcase.skip, d
# the error reply (note that we pass the message of the request to build the reply):
>>> err = MsgErrorReply(m)
>>> err
MsgErrorReply(_api_version = 0.1.47, _type = sniffing.start, error_code = Some error code TBD, error_message = Some error message TBD, ok = False, )
MsgErrorReply(_api_version = 0.1.48, _type = sniffing.start, error_code = Some error code TBD, error_message = Some error message TBD, ok = False, )
>>> m.reply_to
'control.sniffing.service.reply'
>>> err.routing_key
......@@ -80,7 +80,7 @@ import time
import json
import uuid
API_VERSION = '0.1.47'
API_VERSION = '0.1.48'
# TODO use metaclasses instead?
......@@ -464,8 +464,38 @@ class MsgSessionLog(Message):
}
# TODO delete "Interop" to generalize
class MsgSessionConfiguration(Message):
"""
Requirements: Testing Tool MUST listen to event
Type: Event
Pub/Sub: Orchestrator -> Testing Tool
Description: Testing tool MUST listen to this message and configure the testsuite correspondingly
"""
routing_key = "control.session"
# the information in the configuration field is defined by the testing tool in the index.json
_msg_data_template = {
"_type": "session.configuration",
"session_id": "666",
"configuration": {
'testsuite.testcases': [
'someTestCaseId1',
'someTestCaseId2'
],
},
"testing_tools": "f-interop/someTestToolId",
"users": [
"u1",
"f-interop"
],
}
# TODO deprecate this in favor of the generic MsgSessionConfiguration
class MsgInteropSessionConfiguration(Message):
"""
Requirements: Testing Tool MUST listen to event
......@@ -481,29 +511,19 @@ class MsgInteropSessionConfiguration(Message):
_msg_data_template = {
"_type": "session.interop.configuration",
"session_id": "TBD",
"configuration":
{
'testsuite.testcases': [
'http://doc.f-interop.eu/tests/TD_COAP_CORE_01',
'http://doc.f-interop.eu/tests/TD_COAP'
]
},
"testing_tools": "f-interop/interoperability-coap",
"users": [
"u1",
"f-interop"
],
"iuts": [
{
"id": "someImplementationFromAUser",
"role": "coap_server",
"execution_mode": "user-assisted",
"location": "user-facilities",