Commit e226146e authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'feat_lossy_packet_router' into 'develop'

Feat lossy packet router

See merge request !113
parents 1cb249eb b7263f23
......@@ -27,7 +27,6 @@ from messages import (MsgTestingToolTerminate, MsgSessionLog,
logger = logging.getLogger(__name__)
INTERACTIVE_SESSION = get_from_environment("INTERACTIVE_SESSION", True)
COAP_CLIENT_HOST = get_from_environment("COAP_CLIENT_HOST", 'bbbb::1')
COAP_SERVER_HOST = get_from_environment("COAP_SERVER_HOST", 'bbbb::2')
COAP_SERVER_PORT = get_from_environment("COAP_SERVER_PORT", '5683')
......
......@@ -318,11 +318,20 @@ def run_blocking_process(cmd: list, timeout=300):
if __name__ == '__main__':
help="""
This program drives an interop test suite by sending AMQP API messages to the event bus.
These typically are "testsuite.*" or "testingtool.*" type of messages.
It doesnt really run protocol implementations (implementations under test)
nor testing tool code (ioppytest testing tools) unless explicitly indicated by using options.
See optional arguments help for more information.
"""
MANIFEST_INTEROP_TESTS = 'automated_interop_tests.yaml'
DELIM = "*" * 70
# be careful with the order of the items as it's used along the main
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(description=help)
parser.add_argument("--all-interops",
help="Runs all automated interop tests (requires local docker daemon to be running, "
......@@ -346,6 +355,7 @@ if __name__ == '__main__':
manif = yaml.load(stream)
for test in manif:
logging.info('\n{delim} \n'
'Starting interop test session: \n'
'\tinterop test: {interop_name} \n'
......
pika==0.11.0
art
pika
PyYAML==3.12
ioppytest-utils
......@@ -24,7 +24,6 @@ __all__ = [
'AMQP_URL',
'TEST_DESCRIPTIONS_DICT',
'TEST_DESCRIPTIONS_CONFIGS_DICT',
'INTERACTIVE_SESSION',
'LOGGER_FORMAT'
]
......@@ -143,9 +142,6 @@ AUTO_DISSECTION_FILE = os.path.join(project_dir, '/data/auto_dissection.json')
# # # # # # ENV variables # # # # # # # # # #
# INTERACTIVE_SESSION: if not an interactive session then user input is emulated
INTERACTIVE_SESSION = get_from_environment("INTERACTIVE_SESSION", True)
# AMQP ENV variables (either get them all from ENV or set them all as default)
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
......
This diff is collapsed.
pika==0.11.0
pika
PyYAML==3.12
pytest==3.9.3
transitions==0.6.1
......
......@@ -9,7 +9,7 @@ import datetime
from transitions.core import MachineError
from ioppytest import AMQP_EXCHANGE, AMQP_URL, LOG_LEVEL
from ioppytest import RESULTS_DIR
from event_bus_utils import amqp_request, AmqpSynchCallTimeoutError
from event_bus_utils import amqp_request, publish_message, AmqpSynchCallTimeoutError
from event_bus_utils.rmq_handler import RabbitMQHandler, JsonFormatter
from ioppytest.exceptions import CoordinatorError
from messages import *
......@@ -166,6 +166,8 @@ class CoordinatorAmqpInterface:
Creates temporary channel on it's own
Connection must be a pika.BlockingConnection
"""
#publish_message(self.connection, message)
connection = None
channel = None
properties = pika.BasicProperties(**message.get_properties())
......@@ -486,3 +488,6 @@ class CoordinatorAmqpInterface:
return response
except AmqpSynchCallTimeoutError as e:
raise e # let caller handle it
def call_service_router_drop_packets(self, number_packets_to_drop=3):
self._publish_message(MsgRoutingStartLossyLink(number_of_packets_to_drop=number_packets_to_drop))
......@@ -19,12 +19,8 @@ from event_bus_utils import AmqpSynchCallTimeoutError
from event_bus_utils.rmq_handler import RabbitMQHandler, JsonFormatter
ANALYSIS_MODE = 'post_mortem' # either step_by_step or post_mortem # TODO test suite param?
# if left empty => packet_sniffer chooses the loopback
SNIFFER_FILTER_IF = 'tun0' # TODO test suite param?
# TODO 6lo FIX ME !
# - tun notify method -> execute only if test suite needs it (create a test suite param profiling)
SNIFFER_FILTER_IF = 'tun0' # TODO test suite param?, # if left empty packet_sniffer chooses the loopback
LOSSY_CONTEXT__NUMBER_OF_PACKETS_TO_DROP = 2 # TODO test suite param?
# component identification & bus params
COMPONENT_ID = '%s|%s' % ('test_coordinator', 'FSM')
......@@ -94,7 +90,7 @@ class Coordinator(CoordinatorAmqpInterface):
f.write(json.dumps(verdict_info))
def configure_agent_data_plane_interfaces(self, received_event):
# todo find a way of switching between different configuration requirements coming from each test suite
# ToDo find a way of switching between different configuration requirements coming from each test suite
# coap config is different from 6lowpan config
self.notify_tun_interfaces_start(received_event)
......@@ -347,12 +343,13 @@ class Coordinator(CoordinatorAmqpInterface):
gen_verdict, gen_description, report = current_tc.generate_testcases_verdict(partial_verd)
else:
error_msg += 'Response from Test Analyzer NOK: %s' % repr(tat_response)
error_msg += 'Error message: %s (err.code: %s)' % (tat_response.error_message,
tat_response.error_code)
logger.warning(error_msg)
# generate verdict and verdict description
try:
gen_description = tat_response.error_code
gen_description = error_msg
gen_verdict = 'inconclusive'
except AttributeError:
gen_description = error_msg
......@@ -415,7 +412,7 @@ class Coordinator(CoordinatorAmqpInterface):
def handle_testcase_select(self, received_event):
"""
this is more like a jump to function rather than select
this is more like a "jump to" function rather than select
"""
self.testsuite.reinit_testcase(self.testsuite.get_current_testcase_id())
......@@ -474,10 +471,18 @@ class Coordinator(CoordinatorAmqpInterface):
# sniffer calls are blocking
if self.call_service_sniffer_start(**sniff_params):
logger.debug('Sniffer succesfully started')
logger.debug('Sniffer successfully started')
else:
logger.error("Sniffer COULDN'T be started")
#check if we need to trigger some special behaviour (e.g. f links)
try:
special_mode = link['special_mode']
if special_mode == "lossy_context":
self.call_service_router_drop_packets(number_packets_to_drop = LOSSY_CONTEXT__NUMBER_OF_PACKETS_TO_DROP)
except KeyError:
pass
def _prepare_next_testcase(self, received_event):
logger.info('Preparing next testcase..')
......
......@@ -88,7 +88,6 @@ uri: http://doc.f-interop.eu/tests/COAP_CFG_02
nodes:
- coap_client
- coap_server
- gateway
addressing:
- node: coap_client
ipv6_prefix : bbbb
......@@ -96,19 +95,12 @@ addressing:
- node: coap_server
ipv6_prefix : bbbb
ipv6_host: 2
- node: gateway
ipv6_prefix : bbbb
ipv6_host: 3
topology:
- link_id : link_01
special_mode : lossy_context
capture_filter: udp
nodes:
- coap_client
- gateway
- link_id : link_02
capture_filter: udp
nodes:
- gateway
- coap_server
description:
- node: coap_client
......
......@@ -694,7 +694,7 @@ sequence:
testcase_id : TD_COAP_CORE_15
uri : http://doc.f-interop.eu/tests/TD_COAP_CORE_15
objective: Perform GET transaction (CON mode, piggybacked response) in a lossy context
configuration: COAP_CFG_01
configuration: COAP_CFG_02
references: "[COAP] 4.4.1, 5.2.1, 5.8.1"
pre_conditions:
- Gateway is introduced and configured to produce packet losses
......@@ -752,7 +752,7 @@ sequence:
testcase_id : TD_COAP_CORE_16
uri : http://doc.f-interop.eu/tests/TD_COAP_CORE_16
objective: Perform GET transaction (CON mode, delayed response) in a lossy context
configuration: COAP_CFG_01
configuration: COAP_CFG_02
references: "[COAP] 4.4.1, 5.2.2, 5.8.1"
pre_conditions:
- Gateway is introduced and configured to produce packet losses
......
......@@ -43,12 +43,12 @@ TESTING_TOOL_TOPIC_SUBSCRIPTIONS = [
'testingtool.#',
'testsuite.#',
'session.#',
#'log.#'
#'log.#',
'fromAgent.#',
'toAgent.#',
MsgRoutingStartLossyLink.routing_key
]
class UiResponseError(Exception):
pass
......
......@@ -274,6 +274,7 @@ class GenericBidirectonalTranslator(object):
MsgStepVerifyExecuted: self._get_ui_message_highlighted_description,
MsgConfigurationExecute: self._get_ui_testcase_configure,
MsgTestCaseSkip: self._get_ui_testcase_skip,
MsgRoutingStartLossyLink: self._get_ui_lossy_context,
# info
MsgTestSuiteGetTestCasesReply: self._get_ui_testcases_list,
......@@ -999,6 +1000,16 @@ class GenericBidirectonalTranslator(object):
]
return MsgUiDisplayMarkdownText(level='highlighted', fields=fields)
def _get_ui_lossy_context(self, message):
fields = [
{
'type': 'p',
'value': 'Test configured to drop the following %s packet(s)' % message.number_of_packets_to_drop
}
]
return MsgUiDisplayMarkdownText(level='highlighted', fields=fields)
def _get_ui_message_highlighted_description(self, message):
fields = [
{
......@@ -1626,7 +1637,6 @@ class CoAPSessionMessageTranslator(GenericBidirectonalTranslator):
user=u,
)
return True
# # # # # # # TT Messages # # # # # # # # # # # # # #
......@@ -2323,11 +2333,7 @@ class DummySessionMessageTranslator(GenericBidirectonalTranslator):
)
try:
ui_reply = amqp_request(connection,
ui_request,
'dummy_component',
retries=5,
time_between_retries=1)
ui_reply = amqp_request(connection,ui_request,'dummy_component',retries=5,time_between_retries=1)
except AmqpSynchCallTimeoutError:
self.basic_display("The message request: \n`%s`" % repr(ui_request),
tags={"snippet": "checkbox"})
......
......@@ -5,7 +5,7 @@ import time
import json
import pika
from messages import MsgPacketInjectRaw
from messages import MsgPacketInjectRaw, MsgRoutingStartLossyLink
from ioppytest.packet_router.__main__ import PacketRouter
from ioppytest import AMQP_URL, AMQP_EXCHANGE
......@@ -39,10 +39,10 @@ class PacketRouterTestCase(unittest.TestCase):
logging.info('using AMQP vars: %s, %s' % (AMQP_URL, AMQP_EXCHANGE,))
self.routing_table = {
'fromAgent.agent1':
['toAgent.agent2'], # routes to only one destination
'fromAgent.agent2':
['toAgent.agent1'], # routes to only one destination
'fromAgent.agent1.ip.tun.packet.raw':
['toAgent.agent2.ip.tun.packet.raw'], # routes to only one destination
'fromAgent.agent2.ip.tun.packet.raw':
['toAgent.agent1.ip.tun.packet.raw'], # routes to only one destination
}
# start packet router
......@@ -51,6 +51,9 @@ class PacketRouterTestCase(unittest.TestCase):
packet_router.start()
def test_packet_routing(self):
"""
Tests that (Agent-like) IP packet messages are routed to correct destination.
"""
assert self.channel.is_open, 'no channel opened for tests'
self._send_packet_fromAgent1()
......@@ -70,6 +73,26 @@ class PacketRouterTestCase(unittest.TestCase):
assert method_frame is not None, 'Expected to get a message, but nothing was received'
self.channel.basic_ack(method_frame.delivery_tag)
def test_packet_routing_drops(self):
"""
Tests that a packet is dropped (not routed) , when we send a drop request, and following this
we make sure that the following one is actually routed.
"""
assert self.channel.is_open, 'no channel opened for tests'
self._send_drop_message_request() # tells router to drop message
self._send_packet_fromAgent1()
time.sleep(TIME_NEEDED_FOR_EVENT_TO_BE_ROUTED)
method_frame, header_frame, body = self.channel.basic_get(self.queue_name)
assert method_frame is None, 'Expected None, meaning that the packet was dropped'
# test that the next one is actually routed
self._send_packet_fromAgent1()
time.sleep(TIME_NEEDED_FOR_EVENT_TO_BE_ROUTED)
method_frame, header_frame, body = self.channel.basic_get(self.queue_name)
print(body)
assert method_frame is not None, 'Expected to get a message, but nothing was received'
self.channel.basic_ack(method_frame.delivery_tag)
def _send_packet_fromAgent1(self):
"""
tests
......@@ -102,3 +125,15 @@ class PacketRouterTestCase(unittest.TestCase):
content_type='application/json',
)
)
def _send_drop_message_request(self):
m = MsgRoutingStartLossyLink(number_of_packets_to_drop=1)
self.channel.basic_publish(
body=m.to_json(),
routing_key=m.routing_key,
exchange=AMQP_EXCHANGE,
properties=pika.BasicProperties(
content_type='application/json',
)
)
......@@ -28,35 +28,3 @@ if __name__ == '__main__':
# finishing Session..
create_html_test_results()
open_test_results_with_browser()
# INTERACTIVE_SESSION = False
# logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
# if INTERACTIVE_SESSION:
# logging.info(' shutting down, as INTERACTIVE MODE selected')
# else:
# iut = UserEmulator("test", "test2")
# iut.start()
# iut.join()
# socketpath = "/tmp/supervisor.sock"
# server = xmlrpclib.ServerProxy('http://127.0.0.1',
# transport=supervisor.xmlrpc.SupervisorTransport(
# None, None, serverurl='unix://' + socketpath))
# print(server.supervisor.getState())
# print(server.supervisor.readLog(0, 101))
# print(server.supervisor.getProcessInfo("agent"))
# print(server.supervisor.getProcessInfo("tat"))
# print(server.supervisor.getAllProcessInfo())
# print(server.supervisor.restart())
# server.supervisor.startProcess("automated_iut-coap_server-coapthon-v0.8", True)
# server.supervisor.stopAllProcesses()
# server.supervisor.startProcessGroup("client_coapthon_vs_server_coapthon", True)
# server.supervisor.startProcessGroup("client_coapthon_vs_server_californium", True)
# server.supervisor.startProcessGroup("client_californium_vs_server_coapthon", True)
# server.supervisor.startProcessGroup("client_californium_vs_server_californium", True)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment