Commit dd86f962 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'refact_coordinator' into 'develop'

Refact coordinator

See merge request !39
parents a5abb2af df9ff0ef
Pipeline #1384 passed with stage
in 0 seconds
......@@ -2,13 +2,15 @@
FROM ubuntu:16.04
MAINTAINER federico.sismondi@inria.fr
RUN apt-get update -y -qq && apt-get -y -qq install python3-dev
RUN apt-get -y install python3-setuptools
RUN apt-get -y install python3-pip
RUN apt-get -y install python-pip
RUN apt-get -y install supervisor
RUN apt-get -y install tcpdump
RUN apt-get -y install net-tools
RUN apt-get update -y -qq
RUN apt-get -y -qq install python3-dev
RUN apt-get -y -qq install build-essential
RUN apt-get -y -qq install python3-setuptools
RUN apt-get -y -qq install python3-pip
RUN apt-get -y -qq install python-pip
RUN apt-get -y -qq install supervisor
RUN apt-get -y -qq install tcpdump
RUN apt-get -y -qq install net-tools
ADD . /coap_testing_tool
ENV PATH="/coap_testing_tool:$PATH"
......@@ -17,16 +19,14 @@ WORKDIR /coap_testing_tool
# HACK to avoid "cannot open shared object file: Permission denied" , see https://github.com/dotcloud/docker/issues/5490
RUN mv /usr/sbin/tcpdump /usr/bin/tcpdump
#py2 requirements
RUN pip install -r coap_testing_tool/agent/requirements.txt
#py3 requirements
RUN pip3 install -r coap_testing_tool/test_coordinator/requirements.txt
RUN pip3 install -r coap_testing_tool/test_analysis_tool/requirements.txt
RUN pip3 install -r coap_testing_tool/packet_router/requirements.txt
RUN pip3 install -r coap_testing_tool/sniffer/requirements.txt
RUN pip3 install -r coap_testing_tool/webserver/requirements.txt
#installing py2 dependencies
RUN python -m pip install -r coap_testing_tool/agent/requirements.txt --upgrade
#installing py3 dependencies
RUN python3 -m pip install -r coap_testing_tool/test_coordinator/requirements.txt --upgrade
RUN python3 -m pip install -r coap_testing_tool/test_analysis_tool/requirements.txt --upgrade
RUN python3 -m pip install -r coap_testing_tool/packet_router/requirements.txt --upgrade
RUN python3 -m pip install -r coap_testing_tool/sniffer/requirements.txt --upgrade
RUN python3 -m pip install -r coap_testing_tool/webserver/requirements.txt --upgrade
#RUN groupadd -g 500 coap && useradd -u 500 -g 500 coap
#USER coap
......@@ -35,4 +35,3 @@ EXPOSE 5671 5672
# launch processes
CMD ["/usr/bin/supervisord", "--nodaemon", "--configuration", "coap_testing_tool/docker.coap_testing_tool.conf"]
......@@ -78,13 +78,13 @@ get-logs:
install-requirements:
@echo 'installing py2 dependencies'
@python -m pip install -r coap_testing_tool/agent/requirements.txt --upgrade
@python -m pip -qq install -r coap_testing_tool/agent/requirements.txt --upgrade
@echo 'installing py3 dependencies'
@python3 -m pip install -r coap_testing_tool/test_coordinator/requirements.txt --upgrade
@python3 -m pip install -r coap_testing_tool/test_analysis_tool/requirements.txt --upgrade
@python3 -m pip install -r coap_testing_tool/packet_router/requirements.txt --upgrade
@python3 -m pip install -r coap_testing_tool/sniffer/requirements.txt --upgrade
@python3 -m pip install -r coap_testing_tool/webserver/requirements.txt --upgrade
@python3 -m pip -qq install -r coap_testing_tool/test_coordinator/requirements.txt --upgrade
@python3 -m pip -qq install -r coap_testing_tool/test_analysis_tool/requirements.txt --upgrade
@python3 -m pip -qq install -r coap_testing_tool/packet_router/requirements.txt --upgrade
@python3 -m pip -qq install -r coap_testing_tool/sniffer/requirements.txt --upgrade
@python3 -m pip -qq install -r coap_testing_tool/webserver/requirements.txt --upgrade
info_message = """ \
......
CoAP Testing Tool:
------------------
This repo conaints all necessary software (and their dependencies) for running a
This repo contains all necessary software (and their dependencies) for running a
CoAP interoperability test session.
This can be run as standalone software and also integrated to f-interop
......@@ -177,5 +177,5 @@ automated-IUT into CoAP Testing Tool
**TBD**
- Docker build returns a "cannot fetch package" or a "cannot resolve .."
http://stackoverflow.com/questions/24991136/docker-build-could-not-resolve-archive-ubuntu-com-apt-get-fails-to-install-a
\ No newline at end of file
-> try using --no-cache for the docker build
-> more info http://stackoverflow.com/questions/24991136/docker-build-could-not-resolve-archive-ubuntu-com-apt-get-fails-to-install-a
\ No newline at end of file
......@@ -17,13 +17,15 @@ from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE, INTERACTIVE_SESSION, RESULTS_DIR
logger = logging.getLogger(__name__)
# timeout in seconds
STIMULI_HANDLER_TOUT = 10
COMPONENT_ID = 'automation'
logger = logging.getLogger(COMPONENT_ID)
@property
def NotImplementedField(self):
......@@ -32,10 +34,9 @@ def NotImplementedField(self):
def signal_int_handler(signal, frame):
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = connection.channel()
publish_message(
channel,
connection,
MsgTestingToolComponentShutdown(component=COMPONENT_ID)
)
......@@ -71,7 +72,7 @@ class AutomatedIUT(threading.Thread):
queue=services_queue_name,
routing_key='control.testcoordination')
# send hello message
publish_message(self.channel, MsgTestingToolComponentReady(component=self.component_id))
publish_message(self.connection, MsgTestingToolComponentReady(component=self.component_id))
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=services_queue_name)
......@@ -112,7 +113,7 @@ class AutomatedIUT(threading.Thread):
if event.testcase_id not in self.implemented_testcases_list:
time.sleep(0.1)
logging.info('IUT %s pushing test case skip message for %s' % (self.component_id, event.testcase_id))
publish_message(self.channel, MsgTestCaseSkip(testcase_id=event.testcase_id))
publish_message(self.connection, MsgTestCaseSkip(testcase_id=event.testcase_id))
else:
logging.info('IUT %s ready to execute testcase' % self.component_id)
......@@ -126,7 +127,7 @@ class AutomatedIUT(threading.Thread):
if cmd:
self._execute_stimuli(step, cmd,
addr) # this should be a blocking call until stimuli has been executed
publish_message(self.channel, MsgStepStimuliExecuted(node=self.node))
publish_message(self.connection, MsgStepStimuliExecuted(node=self.node))
else:
logging.info('Event received and ignored: %s (node: %s - step: %s)' %
(
......@@ -140,7 +141,7 @@ class AutomatedIUT(threading.Thread):
if event.node == self.node:
step = event.step_id
self._execute_verify(step)
publish_message(self.channel, MsgStepVerifyExecuted(verify_response=True,
publish_message(self.connection, MsgStepVerifyExecuted(verify_response=True,
node=self.node
))
else:
......@@ -167,13 +168,13 @@ class AutomatedIUT(threading.Thread):
event.node) # this should be a blocking call until configuration has been done
if ipaddr != '':
m = MsgConfigurationExecuted(testcase_id=event.testcase_id, node=event.node, ipv6_address=ipaddr)
publish_message(self.channel, m)
publish_message(self.connection, m)
else:
logging.info('Event received and ignored: %s' % event._type)
def _exit(self):
m = MsgTestingToolComponentShutdown(component=self.component_id)
publish_message(self.channel, m)
publish_message(self.connection, m)
time.sleep(2)
self.connection.close()
sys.exit(0)
......@@ -203,6 +204,7 @@ class UserMock(threading.Thread):
threading.Thread.__init__(self)
self.shutdown = False
self.connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
self.channel = self.connection.channel()
......@@ -225,7 +227,7 @@ class UserMock(threading.Thread):
queue=services_queue_name,
routing_key='control.session')
publish_message(self.channel, MsgTestingToolComponentReady(component=self.component_id))
publish_message(self.connection, MsgTestingToolComponentReady(component=self.component_id))
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=services_queue_name)
......@@ -253,7 +255,7 @@ class UserMock(threading.Thread):
elif isinstance(event, MsgTestingToolReady) or isinstance(event, MsgTestingToolConfigured):
m = MsgTestSuiteStart()
publish_message(self.channel, m)
publish_message(self.connection, m)
logging.info('Event received %s' % event._type)
logging.info('Event description %s' % event.description)
logging.info('Event pushed %s' % m)
......@@ -264,12 +266,12 @@ class UserMock(threading.Thread):
if event.testcase_id in self.implemented_testcases_list:
m = MsgTestCaseStart()
publish_message(self.channel, m)
publish_message(self.connection, m)
logging.info('Event pushed %s' % m)
else:
m = MsgTestCaseSkip(testcase_id=event.testcase_id)
publish_message(self.channel, m)
publish_message(self.connection, m)
logging.info('Event pushed %s' % m)
elif isinstance(event, MsgTestCaseVerdict):
......@@ -289,7 +291,7 @@ class UserMock(threading.Thread):
logging.info('Test suite finished, final report: %s' % event.to_json())
time.sleep(2)
m = MsgTestingToolTerminate()
publish_message(self.channel, m)
publish_message(self.connection, m)
time.sleep(2)
elif isinstance(event, MsgTestingToolTerminate):
......@@ -318,6 +320,7 @@ class UserMock(threading.Thread):
logging.info('Event received and ignored: %s' % event._type)
def stop(self):
self.shutdown = True
self.channel.stop_consuming()
def exit(self):
......@@ -327,5 +330,7 @@ class UserMock(threading.Thread):
self.connection.close()
def run(self):
self.channel.start_consuming()
while self.shutdown is False:
self.connection.process_data_events()
time.sleep(0.3)
self.exit()
......@@ -7,7 +7,7 @@ from coap_testing_tool import TMPDIR
from automated_IUTs import COAP_SERVER_PORT, COAP_SERVER_HOST, COAP_CLIENT_HOST
from automated_IUTs.automation import *
logger = logging.getLogger(__name__)
logger = logging.getLogger()
# timeout in seconds
STIMULI_HANDLER_TOUT = 3600
......
......@@ -4,7 +4,7 @@
from automated_IUTs.automation import *
from automated_IUTs import COAP_SERVER_HOST, COAP_SERVER_PORT, COAP_CLIENT_HOST
logger = logging.getLogger(__name__)
logger = logging.getLogger()
# timeout in seconds
STIMULI_HANDLER_TOUT = 10
......
Subproject commit f883cf371cd4f0e3e5dae969a2b397c32ca27288
Subproject commit 591ade863018eada4256898ef18b2acb52b1a876
......@@ -112,16 +112,16 @@ stderr_logfile_maxbytes=0
;stdout_logfile_backups = 5
;redirect_stderr=true
[program:webserver]
command = sh -c "sleep 5;/usr/bin/python3 -m coap_testing_tool.webserver"
stopsignal=INT
stopasgroup=true
;autorestart=false
loglevel=debug
redirect_stderr=true
stdout_logfile = /var/log/files_server-stdout.log
stdout_logfile_maxbytes = 10MB
stdout_logfile_backups = 5
;[program:webserver]
;command = sh -c "sleep 5;/usr/bin/python3 -m coap_testing_tool.webserver"
;stopsignal=INT
;stopasgroup=true
;;autorestart=false
;loglevel=debug
;redirect_stderr=true
;stdout_logfile = /var/log/files_server-stdout.log
;stdout_logfile_maxbytes = 10MB
;stdout_logfile_backups = 5
;[program:emulated-user]
;command = sh -c "sleep 3;python3 -m automated_IUTs.user_emulation"
......
from coap_testing_tool import TD_DIR, TD_COAP, TD_COAP_CFG, TD_6LOWPAN
from coap_testing_tool.test_coordinator.coordinator import import_teds
from collections import OrderedDict
import json, unittest, os
from coap_testing_tool.test_coordinator.testsuite import import_teds
import unittest
"""
python3 -m pytest coap_testing_tool/extended_test_descriptions/tests/tests.py
......
......@@ -9,6 +9,7 @@ import logging
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE, AGENT_NAMES, AGENT_TT_ID
from coap_testing_tool.utils.event_bus_messages import *
from coap_testing_tool.utils.amqp_synch_call import publish_message
COMPONENT_ID = 'packet_router'
......@@ -31,24 +32,6 @@ logger.addHandler(rabbitmq_handler)
logger.setLevel(logging.DEBUG)
def publish_message(channel, message):
""" Published which uses message object metadata
:param channel:
:param message:
:return:
"""
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=message.routing_key,
properties=properties,
body=message.to_json(),
)
class PacketRouter(threading.Thread):
DEFAULT_ROUTING = {
'data.tun.fromAgent.%s' % AGENT_1_ID: ['data.tun.toAgent.%s' % AGENT_2_ID,
......@@ -80,7 +63,7 @@ class PacketRouter(threading.Thread):
msg = MsgTestingToolComponentReady(
component='packetrouting'
)
publish_message(self.channel, msg)
publish_message(self.connection, msg)
logger.info('packet router waiting for new messages in the data plane..')
......
......@@ -22,7 +22,7 @@ logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logging.getLogger('pika').setLevel(logging.INFO)
# init logging to stnd output and log files
logger = logging.getLogger(__name__)
logger = logging.getLogger(COMPONENT_ID)
# default handler
sh = logging.StreamHandler()
......@@ -39,6 +39,7 @@ logger.setLevel(logging.DEBUG)
TIME_WAIT_FOR_TCPDUMP_ON = 5
TIME_WAIT_FOR_COMPONENTS_FINISH_EXECUTION = 2
connection = None
def on_request(ch, method, props, body):
"""
......@@ -53,6 +54,7 @@ def on_request(ch, method, props, body):
ch.basic_ack(delivery_tag=method.delivery_tag)
global last_capture_name
global connection
try:
props_dict = {
......@@ -96,7 +98,7 @@ def on_request(ch, method, props, body):
except Exception as e:
publish_message(
ch,
connection,
MsgErrorReply(request, error_message=str(e))
)
logger.error(str(e))
......@@ -118,18 +120,18 @@ def on_request(ch, method, props, body):
except Exception as e:
err_mess = str(e)
m_resp = MsgErrorReply(request, error_message=err_mess)
publish_message(ch, m_resp)
publish_message(connection, m_resp)
logger.warning(err_mess)
return
logger.info("Response ready, PCAP bytes: \n" + repr(response))
logger.info("Sending response through AMQP interface ...")
publish_message(ch, response)
publish_message(connection, response)
else:
err_mess = 'No previous capture found.'
m_resp = MsgErrorReply(request, error_message=err_mess)
publish_message(ch, m_resp)
publish_message(connection, m_resp)
logger.warning(err_mess)
return
......@@ -151,7 +153,7 @@ def on_request(ch, method, props, body):
logger.warning('Coulnt retrieve file %s from dir' % file)
logger.warning(str(fne))
publish_message(
ch,
connection,
MsgErrorReply(
request,
error_message=str(fne)
......@@ -175,7 +177,7 @@ def on_request(ch, method, props, body):
logger.info("Response ready, PCAP bytes: \n" + repr(response))
logger.info("Sending response through AMQP interface ...")
publish_message(ch, response)
publish_message(connection, response)
return
elif isinstance(request, MsgSniffingStart):
......@@ -185,7 +187,7 @@ def on_request(ch, method, props, body):
except:
err_mess = 'No capture id provided'
m_resp = MsgErrorReply(request, error_message=err_mess)
publish_message(ch, m_resp)
publish_message(connection, m_resp)
logger.error(err_mess)
return
......@@ -210,7 +212,7 @@ def on_request(ch, method, props, body):
last_capture_name = capture_id # keep track of the undergoing capture name
time.sleep(TIME_WAIT_FOR_TCPDUMP_ON) # to avoid race conditions
response = MsgReply(request) # by default sends ok = True
publish_message(ch, response)
publish_message(connection, response)
elif isinstance(request, MsgSniffingStop):
......@@ -223,7 +225,7 @@ def on_request(ch, method, props, body):
logger.error('Didnt succeed stopping the sniffer')
response = MsgReply(request) # by default sends ok = True
publish_message(ch, response)
publish_message(connection, response)
else:
logger.warning('Ignoring unrecognised service request: %s' % repr(request))
......@@ -294,7 +296,8 @@ def main():
### SETUPING UP CONNECTION ###
connection = None
global connection
try:
logger.info('Setting up AMQP connection..')
......@@ -320,7 +323,8 @@ def main():
msg = MsgTestingToolComponentReady(
component='sniffing'
)
publish_message(channel, msg)
publish_message(connection, msg)
try:
logger.info("Awaiting AMQP requests on topic: control.sniffing.service")
......
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
import os
import sys
import json
import errno
import pika
import time
import traceback
import logging
import argparse
from threading import Timer
from coap_testing_tool.test_coordinator.coordinator import *
from coap_testing_tool import AMQP_URL, AMQP_EXCHANGE
from coap_testing_tool import TD_COAP, TD_COAP_CFG, TD_6LOWPAN, TD_6LOWPAN_CFG
from coap_testing_tool import DATADIR, TMPDIR, LOGDIR, TD_DIR
from coap_testing_tool import DATADIR, TMPDIR, LOGDIR, TD_DIR, RESULTS_DIR, PCAP_DIR
from coap_testing_tool.utils.rmq_handler import RabbitMQHandler, JsonFormatter
from coap_testing_tool.utils.amqp_synch_call import publish_message
from coap_testing_tool.utils.event_bus_messages import MsgTestingToolReady, MsgTestingToolComponentReady, Message
from coap_testing_tool.test_coordinator.states_machine import Coordinator
COMPONENT_ID = 'test_coordinator'
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
# init logging to stnd output and log files
logger = logging.getLogger(__name__)
logger = logging.getLogger(COMPONENT_ID)
# default handler
sh = logging.StreamHandler()
......@@ -47,7 +57,8 @@ if __name__ == '__main__':
try:
parser = argparse.ArgumentParser()
parser.add_argument("testsuite", help="Test Suite", choices=['coap', '6lowpan'])
parser.add_argument("-ncc", "--no_component_checks", help="Do not check if other processes send ready message", action="store_true")
parser.add_argument("-ncc", "--no_component_checks", help="Do not check if other processes send ready message",
action="store_true")
args = parser.parse_args()
testsuite = args.testsuite
......@@ -86,19 +97,20 @@ if __name__ == '__main__':
channel = connection.channel()
bootstrap_q = channel.queue_declare(queue='bootstrapping', auto_delete=True)
bootstrap_q_name = 'bootstrapping'
bootstrap_q = channel.queue_declare(queue=bootstrap_q_name, auto_delete=True)
channel.queue_bind(
exchange=AMQP_EXCHANGE,
queue='bootstrapping',
routing_key='control.session',
exchange=AMQP_EXCHANGE,
queue='bootstrapping',
routing_key='control.session',
)
# starting verification of the testing tool components
msg = MsgTestingToolComponentReady(
component='testcoordination'
component='testcoordination'
)
publish_message(channel, msg)
publish_message(connection, msg)
if no_component_checks:
logger.info('Skipping component readiness checks')
......@@ -117,7 +129,7 @@ if __name__ == '__main__':
TT_check_list.remove(component)
return
elif isinstance(event, MsgTestingToolReady):
elif isinstance(event, MsgTestingToolReady): # listen to self generated event
logger.info('all signals processed')
channel.queue_delete('bootstrapping')
return
......@@ -128,9 +140,7 @@ if __name__ == '__main__':
channel.basic_consume(on_ready_signal,
no_ack=False,
queue='bootstrapping')
logger.info('Waiting components ready signal... signals not checked:' + str(TT_check_list))
# wait for all testing tool component's signal
timeout = False
......@@ -141,7 +151,7 @@ if __name__ == '__main__':
t = Timer(READY_SIGNAL_TOUT, timeout_f)
t.start()
while len(TT_check_list) != 0 and not timeout:
while len(TT_check_list) != 0 and not timeout: # blocking until timeout!
time.sleep(0.3)
connection.process_data_events()
......@@ -152,12 +162,15 @@ if __name__ == '__main__':
assert len(TT_check_list) == 0
logger.info('All components ready')
# clean up
channel.queue_delete(bootstrap_q_name)
# lets start the test coordination
try:
logger.info('Starting test-coordinator for test suite (ts) : %s' % testsuite)
coordinator = Coordinator(connection, ted_tc_file, ted_config_file)
publish_message(channel, MsgTestingToolReady())
logger.info('Starting test-coordinator for test suite: %s' % testsuite)
coordinator = Coordinator(AMQP_URL, AMQP_EXCHANGE, ted_tc_file, ted_config_file)
coordinator.bootstrap()
publish_message(connection, MsgTestingToolReady())
except Exception as e:
# cannot emit AMQP messages for the fail
......@@ -166,7 +179,7 @@ if __name__ == '__main__':
logger.debug(traceback.format_exc())
sys.exit(1)
### RUN TEST COORDINATION COMPONENT ###
# # # RUN TEST COORDINATION COMPONENT # # #
try:
logger.info('Starting coordinator..')
......@@ -190,16 +203,16 @@ if __name__ == '__main__':
# lets push the error message into the bus
coordinator.channel.basic_publish(
body=json.dumps({
'traceback': traceback.format_exc(),
'message': error_msg,
'_type': 'testcoordination.error',
}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.error',
properties=pika.BasicProperties(
content_type='application/json',
)
body=json.dumps({
'traceback': traceback.format_exc(),
'message': error_msg,
'_type': 'testcoordination.error',
}),
exchange=AMQP_EXCHANGE,
routing_key='control.session.error',
properties=pika.BasicProperties(
content_type='application/json',
)
)
# close AMQP connection
connection.close()
......
# -*- coding: utf-8 -*-
# !/usr/bin/env python3