Commit a8f5df39 authored by Federico Sismondi's avatar Federico Sismondi

6lowpan auto iut implemented with contiki and serial mote controller

parent 10a98120
Pipeline #1353 failed with stage
in 0 seconds
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
import argparse
import time
import threading
import automated_IUTs.sixlowpan_controller.eut_controller as tcc
from automated_IUTs import OPENMOTES_BAUDRATE
from automated_IUTs.automation import *
from .eut_controller import support_hc_tests, support_format_tests
OPEN_PORT_FAILED = 0x01
OPEN_PORT_SUCCEEDED = 0x02
EUT_READY = 0x03
SENT_PING = 0x04
CONF_SUCCESS = 0x05
CONF_FAILED = 0x06
STIMULI_TIME_OUT=35
CONFIGUATION_TIME_OUT=60
lock = threading.RLock()
global in_handler, iut
timeout=False
def timeout_f():
global timeout
timeout = True
class SixLoMoteController(AutomatedIUT):
iut_cmd = [
'',
]
allowed_platforms = [
'openmote-RevE',
'openmote-RevA1'
]
def __init__(self, node_id):
super().__init__(node_id)
self.test_done=False
self.config_done=False
self.ipaddr=''
# mapping message's stimuli id -> CoAPthon (coap client) commands
stimuli_cmd_dict = {
'TD_6LoWPAN_HC_01_step_01': ['hc_01'],
'TD_6LoWPAN_HC_03_step_01': ['hc_03'],
'TD_6LoWPAN_HC_05_step_01': ['hc_05'],
'TD_6LoWPAN_HC_07_step_01': ['hc_07'],
'TD_6LoWPAN_FORMAT_01_step_01': ['format_01'],
'TD_6LoWPAN_FORMAT_03_step_01': ['format_03'],
'TD_6LoWPAN_FORMAT_04_step_01': ['format_04'],
'TD_6LoWPAN_FORMAT_06_step_01': ['format_06'],
}
implemented_testcases_list = [
'TD_6LoWPAN_HC_01',
'TD_6LoWPAN_HC_03',
'TD_6LoWPAN_HC_05',
'TD_6LoWPAN_HC_07',
'TD_6LoWPAN_FORMAT_01',
'TD_6LoWPAN_FORMAT_03',
'TD_6LoWPAN_FORMAT_04',
'TD_6LoWPAN_FORMAT_06',
]
def _execute_verify(self, verify_step_id, ):
logging.warning('Ignoring: %s. No auto-iut mechanism for verify step implemented.' % verify_step_id)
def _execute_stimuli(self, stimuli_step_id, cmd, addr):
global in_handler,timeout
if cmd[0] in support_hc_tests or cmd[0] in support_format_tests:
self.test_done = False
logger.info(cmd[0])
lock.acquire()
in_handler.remote_addr = addr
logging.info("using address: %s" % addr)
in_handler.run_tc()
lock.release()
timeout = False
t = threading.Timer(STIMULI_TIME_OUT,timeout_f)
t.start()
while not (self.test_done or timeout):
time.sleep(0.1)
if timeout:
logger.error("Timeout error")
else:
logger.error(
"Stimuli %s couldnt be executed for node %s" % (stimuli_step_id, self.node))
return
def _execute_configuration(self, testcase_id, node):
global in_handler,timeout
testcase = None
try:
# let's get the stimuli command using the stim cmd dict (note that tescase_id are substrings of step_id)
for step_id in self.stimuli_cmd_dict:
if testcase_id.lower() in step_id.lower():
testcase = self.stimuli_cmd_dict[step_id]
break
if not testcase:
logger.error(
"Testcase %s couldnt be configured for node %s, testcase_id not in list of known testcases"
% (testcase_id, self.node))
return
except KeyError as e:
logger.error("Testcase %s couldnt be configured for node %s" % (testcase_id, self.node))
logger.error(e)
return
logger.info("%s" % testcase)
self.config_done=False
self.ipaddr=''
in_handler.current_tc = testcase[0]
lock.acquire()
in_handler.reset() # resets mote to zero state and also re-flashes if necessary
lock.release()
timeout = False
t = threading.Timer(CONFIGUATION_TIME_OUT,timeout_f)
t.start()
while not (self.config_done or timeout):
time.sleep(0.1)
if timeout:
logger.error("Timeout error")
return
else:
if self.ipaddr != '':
return self.ipaddr
else:
logger.error('empty ipaddr')
return
def loop_reading():
global iut
while 1:
lock.acquire()
out = in_handler.read_frame()
if out == CONF_SUCCESS:
logger.info("flash done! Configure ip address")
in_handler.setup_addr()
elif out == EUT_READY:
logger.info("%s, %s is ready for the testcase" % (iut.node, in_handler.addr))
iut.ipaddr=in_handler.addr
iut.config_done=True
elif out == SENT_PING:
logger.info("testcase was executed")
iut.test_done = True
lock.release()
time.sleep(0.1)
if __name__ == '__main__':
global in_handler, iut
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--node")
parser.add_argument("-p", "--port")
parser.add_argument("-c", "--channel")
parser.add_argument("-pf", "--platform")
args = parser.parse_args()
in_handler = tcc.SerialInputHandler(port=args.port,
baudrate=OPENMOTES_BAUDRATE,
rts_cts=False)
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
t = threading.Thread(target=loop_reading, args=())
t.start()
iut = SixLoMoteController(args.node)
print('%s' % iut.component_id)
if args.platform in iut.allowed_platforms:
print ('platform is supported')
else:
print ('platform is not supported! Exit')
sys.exit()
if iut.node == 'eut1':
lock.acquire()
in_handler.channel = int(args.channel)
in_handler.node_id = 1
in_handler.platform = args.platform
lock.release()
elif iut.node == 'eut2':
lock.acquire()
in_handler.channel = int(args.channel)
in_handler.node_id = 2
in_handler.platform = args.platform
lock.release()
else:
print('Unknown node type %s for 6lowpan controller' % args.node)
iut.start()
iut.join()
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment