#!/usr/bin/env python3
# -*- coding: utf-8 -*-
3

4
5
import base64
import errno
6
7
import json
import os
Federico Sismondi's avatar
Federico Sismondi committed
8
import traceback
9
import sys
10
11
import yaml
import pika
12
import time
13

14
15
from itertools import cycle
from collections import OrderedDict
16
from coap_testing_tool import AMQP_VHOST, AMQP_PASS,AMQP_SERVER,AMQP_USER, AMQP_EXCHANGE
17
from coap_testing_tool import DATADIR,TMPDIR,LOGDIR,TD_DIR, PCAP_DIR, RESULTS_DIR
18
from coap_testing_tool.utils.amqp_synch_call import amqp_reply, AmqpSynchronousCallClient
19
from coap_testing_tool.utils.exceptions import SnifferError,CoordinatorError, AmqpMessageError
20
from coap_testing_tool.utils.logger import initialize_logger
21

Federico Sismondi's avatar
Federico Sismondi committed
22
# TODO: these variables need to come from the session orchestrator and the
#       test configuration files
# TODO: get the sniffer filter from the configuration of the TEDs

# Execution mode of each implementation under test.
# NOTE(review): Iut (defined in this file) only accepts 'user_assisted'
# (underscore) or 'automated' as mode; the hyphenated value below would be
# rejected by its assertion if passed as `mode` -- confirm the intended spelling.
COAP_CLIENT_IUT_MODE =  'user-assisted'
COAP_SERVER_IUT_MODE = 'automated'

ANALYSIS_MODE = 'post_mortem' # either step_by_step or post_mortem

# if left empty => packet_sniffer chooses the loopback
# TODO: send a flag to the sniffer telling it to look for a tun interface instead
SNIFFER_FILTER_IF = 'tun0'

# component identification & bus params
COMPONENT_ID = 'test_coordinator'

# set temporarily as default
# TODO: get this from the finterop session context
TD_COAP = os.path.join(TD_DIR,"TD_COAP_CORE.yaml")
TD_COAP_CFG = os.path.join(TD_DIR,"TD_COAP_CFG.yaml")

# init logging to standard output and log files
logger = initialize_logger(LOGDIR, COMPONENT_ID)
43
44


45
### AUX functions ###
46
47
48
49
50

def list_to_str(ls):
    """
    Flatten a list nested up to two levels deep into one string.

    Each top-level item, and each item of a first-level sub-list, is appended
    followed by a space-newline-space separator. Lists found inside a
    sub-list (second nesting level) are dropped.

    :param ls: list whose items are strings or lists of strings
    :return: single string with all the kept items
    """
    separator = ' \n '
    pieces = []
    for entry in ls:
        if not isinstance(entry, list):
            pieces.append(entry + separator)
            continue
        # truncation happens at the second level: nested lists are ignored
        pieces.extend(
            inner + separator for inner in entry if not isinstance(inner, list)
        )
    return ''.join(pieces)

68
### YAML parser aux classes and methods ###
69
70
71
72
73
74
75
def testcase_constructor(loader, node):
    """
    YAML constructor for test case nodes, written as a generator.

    The bare (not yet initialised) TestCase object is yielded first; once the
    generator is resumed, the node is read as a mapping and its items are
    passed to TestCase.__init__ as keyword arguments.

    :param loader: the YAML loader, used to build the node's mapping
    :param node: the YAML node describing the test case
    """
    testcase = TestCase.__new__(TestCase)
    yield testcase
    fields = loader.construct_mapping(node, deep=True)
    testcase.__init__(**fields)

76
77
78
79
80
81
82
83
84
85

def test_config_constructor(loader, node):
    """
    YAML constructor for test configuration nodes, written as a generator.

    The bare (not yet initialised) TestConfig object is yielded first; once the
    generator is resumed, the node is read as a mapping and its items are
    passed to TestConfig.__init__ as keyword arguments.

    :param loader: the YAML loader, used to build the node's mapping
    :param node: the YAML node describing the test configuration
    """
    config = TestConfig.__new__(TestConfig)
    yield config
    fields = loader.construct_mapping(node, deep=True)
    config.__init__(**fields)

# Bind the custom YAML tags used in the test description files to their constructors.
yaml.add_constructor(tag=u'!configuration', constructor=test_config_constructor)
yaml.add_constructor(tag=u'!testcase', constructor=testcase_constructor)

88

89
90
91
92
93
94
95
96
97
98
99
100
101
# def yaml_include(loader, node):
#     # Get the path out of the yaml file
#     file_name = os.path.join(os.path.dirname(loader.name), node.value)
#
#     with open(file_name) as inputfile:
#         return yaml.load(inputfile)
#
# yaml.add_constructor("!include", yaml_include)
# yaml.add_constructor(u'!configuration', testcase_constructor)

def import_teds(yamlfile):
    """
    Import the test cases and test configurations described in a YAML file.

    Every YAML document that was built into a TestCase or TestConfig object is
    kept; any other document is logged as an error and skipped.

    :param yamlfile: path of the YAML file to parse (read as UTF-8)
    :return: list of imported testCase(s) and testConfig(s) object(s), in file order
    """
    td_list = []
    with open(yamlfile, "r", encoding="utf-8") as stream:
        # The loader is named explicitly because recent PyYAML releases refuse
        # load_all() without one. yaml.Loader is where the module-level
        # yaml.add_constructor() calls register the custom tags.
        # NOTE: yaml.Loader can build arbitrary Python objects; only feed it
        # trusted test description files.
        for yaml_doc in yaml.load_all(stream, Loader=yaml.Loader):
            if isinstance(yaml_doc, TestCase):
                logger.debug(' Parsed test case: %s from yaml file: %s :'%(yaml_doc.id,yamlfile) )
                td_list.append(yaml_doc)
            elif isinstance(yaml_doc, TestConfig):
                logger.debug(' Parsed test case config: %s from yaml file: %s :'%(yaml_doc.id,yamlfile) )
                td_list.append(yaml_doc)
            else:
                logger.error('Couldnt processes import: %s from %s'%(str(yaml_doc),yamlfile))

    return td_list

119

120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Verdict:
    """
    Verdict of a test step or test case, together with an explanatory message.

    Known verdict values are:
     - 'none': No verdict set yet
     - 'pass': The NUT fulfilled the test purpose
     - 'inconclusive': The NUT did not fulfill the test purpose but did not display
                 bad behaviour
     - 'fail': The NUT did not fulfill the test purpose and displayed a bad
               behaviour
     - 'aborted': The test execution was aborted by the user
     - 'error': A runtime error occured during the test

    At initialisation time, the verdict is set to 'none'. Then it can be updated
    one or multiple times, either explicitly calling update() or
    implicitly if an unhandled exception is caught by the control module
    (error verdict) or if the user interrupts the test manually (aborted
    verdict).

    Each value listed above has precedence over the previous ones. This means
    that when a verdict is updated, the resulting verdict never gets better
    than the previous one: an update with a lower-ranked value is ignored.
    An update with the *same* value is accepted and replaces the message.
    """

    # ordered from the best to the worst verdict; the index is the rank
    __values = ('none', 'pass', 'inconclusive', 'fail', 'aborted', 'error')

    def __init__(self, initial_value: str = None):
        """
        Initialize the verdict value to 'none' or to the given value

        :param initial_value: The initial value to put the verdict on
        :type initial_value: optional(str)
        """
        self.__value = 0
        self.__message = ''
        if initial_value is not None:
            self.update(initial_value)

    def update(self, new_verdict: str, message: str = ''):
        """
        Update the verdict

        The value and the message are replaced only when the new verdict ranks
        at least as high (i.e. is as bad or worse) as the current one.

        :param new_verdict: The name of the new verdict value
        :param message: The message associated to it
        :type new_verdict: str
        :type message: str
        :raises AssertionError: if new_verdict is not a known verdict value
        """
        assert new_verdict in self.__values, 'unknown verdict: %r' % (new_verdict,)

        new_value = self.__values.index(new_verdict)
        # '>=' on purpose: an equal-rank update refreshes the message
        if new_value >= self.__value:
            self.__value = new_value
            self.__message = message

    @classmethod
    def values(cls):
        """
        List the known verdict values

        :return: The known verdict values
        :rtype: (str)
        """
        return cls.__values

    def get_value(self) -> str:
        """
        Get the value of the verdict

        :return: The value of the verdict as a string
        :rtype: str
        """
        return self.__values[self.__value]

    def get_message(self) -> str:
        """
        Get the last message update of this verdict

        :return: The last message update
        :rtype: str
        """
        return self.__message

    def __str__(self) -> str:
        """
        Get the value of the verdict as string for printing it

        :return: The value of the verdict as a string
        :rtype: str
        """
        return self.__values[self.__value]

class Iut:
    """
    Implementation under test: a node name plus its execution mode.
    """

    def __init__(self, node = None, mode="user_assisted"):
        """
        :param node: name of the node the IUT runs on
        :param mode: "user_assisted" or "automated"; a falsy value skips the check
        :raises AssertionError: if a non-empty mode is not one of the two known modes
        """
        # TODO get IUT mode from session config!!!
        self.node = node
        if mode:
            assert mode in ("user_assisted", "automated")
        self.mode = mode

    def to_dict(self):
        """
        :return: OrderedDict with the keys 'node' and 'node_execution_mode', in that order
        """
        ret = OrderedDict()
        ret['node'] = self.node
        ret['node_execution_mode'] = self.mode
        return ret

    # TODO implement this
    def configure(self):
        pass

    def __repr__(self):
        # the mode is only shown when one is defined
        if self.mode:
            return "%s(node=%s, mode=%s)" % (self.__class__.__name__, self.node, self.mode)
        return "%s(node=%s)" % (self.__class__.__name__, self.node)

235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
class TestConfig:
    """
    Test configuration as imported from the YAML test description.
    """

    def __init__(self, configuration_id, uri, nodes, topology, description):
        """
        Store the fields of the configuration; configuration_id is kept as ``id``.
        """
        self.id, self.uri = configuration_id, uri
        self.nodes, self.topology = nodes, topology
        self.description = description

    def to_dict(self,verbose=None):
        """
        Export the configuration as a plain dict.

        :param verbose: when truthy, add reference, nodes, topology and description
        :return: dict that always holds 'configuration_id'
        """
        summary = OrderedDict([('configuration_id', self.id)])
        if not verbose:
            return dict(summary)

        summary.update([
            ('configuration_ref', self.uri),
            ('nodes', self.nodes),
            ('topology', self.topology),
            ('description', self.description),
        ])
        return dict(summary)
255
256
257
258

class Step():
    """
    One step of a test case sequence.

    A step is of type 'stimuli', 'check' or 'verify'. Stimuli and verify steps
    are bound to an Iut; check and verify steps can record a result in their
    partial verdict.

    Step states: None, 'executing', 'finished' or 'postponed'.
    """

    # TODO check step id uniqueness
    def __init__(self, step_id, type, description, node=None):
        """
        :param step_id: identifier of the step
        :param type: 'stimuli', 'check' or 'verify'
        :param description: description of the step
        :param node: node of the IUT; mandatory for stimuli and verify steps
        :raises AssertionError: on unknown type, or missing node for stimuli/verify
        """
        self.id = step_id
        assert type in ("stimuli","check","verify")
        # TODO sth else might need to be defined for conformance testing TBD (inject? drop packet?)..
        self.type = type
        self.description = description

        # stimuli and verify step MUST have a iut field in the YAML file
        if type in ('stimuli', 'verify'):
            assert node is not None
            self.iut = Iut(node)
        else:
            self.iut = None

        # Every step gets a partial verdict, so the attribute exists even when
        # set_result() is called on a check step before any reinit().
        self.partial_verdict = Verdict()

        self.state = None

    def __repr__(self):
        node = ''
        mode = ''
        if self.iut is not None:
            node = self.iut.node
            mode =  self.iut.mode
        return "%s(step_id=%s, type=%s, description=%s, iut node=%s, iut execution mode =%s)" \
               %(self.__class__.__name__, self.id, self.type, self.description, node , mode)

    def reinit(self):
        """
        Bring the step back to its initial state so it can be re-executed.

        Check and verify steps get a fresh partial verdict. In post_mortem
        analysis mode check steps go straight to 'postponed'; every other step
        goes back to state None.
        """
        if self.type in ('check','verify'):
            self.partial_verdict = Verdict()

            # when using post_mortem analysis mode all checks are postponed , and analysis is done at the end of the TC
            logger.debug('Processing step init, step_id: %s, step_type: %s, ANALYSIS_MODE is %s' % (
            self.id, self.type, ANALYSIS_MODE))
            if self.type == 'check' and ANALYSIS_MODE == 'post_mortem':
                self.change_state('postponed')
            else:
                self.change_state(None)
        else: #its a stimuli
            self.change_state(None)

    def to_dict(self, verbose = None):
        """
        :param verbose: when truthy, add type, description, state and (for
                        stimuli/verify steps) the IUT info
        :return: OrderedDict that always holds 'step_id'
        """
        step_dict = OrderedDict()
        step_dict['step_id'] = self.id
        if verbose:
            step_dict['step_type'] = self.type
            step_dict['step_info'] = self.description
            step_dict['step_state'] = self.state
            # it the step is a stimuli then lets add the IUT info(note that checks dont have that info)
            if self.type == 'stimuli' or self.type == 'verify':
                step_dict.update(self.iut.to_dict())
        return step_dict

    def change_state(self,state):
        """
        Set the state of the step and log the change.

        :param state: None, 'executing', 'finished' or 'postponed'
        """
        # postponed state used when checks are postponed for the end of the TC execution
        assert state in (None,'executing','finished','postponed')
        self.state = state
        logger.info('Step %s state changed to: %s'%(self.id,self.state))

    def set_result(self,result,result_info):
        """
        Record a result in the partial verdict of the step.

        :param result: one of Verdict.values()
        :param result_info: message associated to the result
        """
        # Only check and verify steps can have a result
        assert self.type in ('check','verify')
        assert result in Verdict.values()
        self.partial_verdict.update(result,result_info)

327
class TestCase:
    """
    A test case: its description fields plus the ordered sequence of Step objects.

    FSM:
    (None,'skipped', 'executing','ready_for_analysis','analyzing','finished')
    None -> Rest state
    Skipped -> Nothing to do here, just skip this TC
    Executing -> Executing intermediary states (executing steps, partial analysis for check steps -if active testing-, executing verify steps)
    Analyzing -> Either TAT analysing PCAP -post_mortem type- or processing all partial verdicts from check steps -step_by_step-
    Finished -> all steps finished, all checks analyzed, and verdict has been emitted

    ready_for_analysis -> intermediate state between executing and analyzing for waiting for user call to analyse TC
    """

    def __init__(self, testcase_id , uri, objective, configuration, references, pre_conditions, sequence ):
        """
        :param sequence: iterable of dicts, each one holding the keyword
                         arguments of a Step (step_id, type, description and,
                         for stimuli steps, node)
        :raises AssertionError: if a step dict lacks a mandatory key
        """
        self.id = testcase_id
        self.state = None
        self.uri = uri
        self.objective = objective
        self.configuration_id = configuration
        self.references = references
        self.pre_conditions = pre_conditions
        self.sequence=[]
        for s in sequence:
            # some sanity checks of imported steps
            try:
                # every mandatory key must be present (not only the last one)
                assert all(key in s for key in ("step_id", "description", "type"))
                if s['type']=='stimuli':
                    assert "node" in s
                self.sequence.append(Step(**s))
            except Exception:
                logger.error("Error found while trying to parse: %s" %str(s))
                raise
        self._step_it = iter(self.sequence)
        self.current_step = None
        self.report = None

        # TODO if ANALYSIS is post mortem change all check step states to postponed at init!

    def reinit(self):
        """
        - prepare test case to be re-executed
        - brings to state zero variables that might have changed during a previous execution
        :return:
        """
        self.state = None
        self.current_step = None
        self._step_it = iter(self.sequence)

        for s in self.sequence:
            s.reinit()

    def __repr__(self):
        return "%s(testcase_id=%s, uri=%s, objective=%s, configuration=%s, test_sequence=%s)" % (self.__class__.__name__, self.id ,
         self.uri, self.objective, self.configuration_id,self.sequence)

    def to_dict(self,verbose=None):
        """
        :param verbose: when truthy, add reference, objective and state
        :return: OrderedDict that always holds 'testcase_id'
        """
        d = OrderedDict()
        d['testcase_id'] = self.id

        if verbose:
            d['testcase_ref'] = self.uri
            d['objective'] = self.objective
            d['state'] = self.state

        return d

    def seq_to_dict(self):
        """
        :return: list with the (non verbose) dict of every step of the sequence
        """
        return [step.to_dict() for step in self.sequence]

    def change_state(self,state):
        """
        Set the state of the test case and log the change.

        :param state: one of the FSM states listed in the class docstring
        """
        assert state in (None,'skipped', 'executing','ready_for_analysis','analyzing','finished')
        self.state = state
        logger.info('Testcase %s changed state to %s'%(self.id, state))

    def check_all_steps_finished (self):
        """
        :return: False as soon as a step is in state None or 'executing',
                 True otherwise (also for an empty sequence)
        """
        for step in self.sequence:
            # check that there's no steps in state = None or executing
            if step.state is None or step.state == 'executing':
                logger.debug("[TESTCASE] - there are still steps to execute or under execution")
                return False
        logger.debug("[TESTCASE] - all steps in TC are either finished or pending -> ready for analysis)")
        return True

    def generate_final_verdict(self,tat_post_mortem_analysis_report=None):
        """
        Generates the final verdict of TC and report taking into account the CHECKs and VERIFYs of the testcase

        :param tat_post_mortem_analysis_report: optional list of report items
                 produced a posteriori; each item is appended to the report and
                 item[1] / item[2] are used as verdict value / message. When it
                 is missing or empty the final verdict is updated with 'error'.
        :return: tuple: (final_verdict, verdict_description, tc_report) ,
                 where final_verdict in ("None", "error", "inconclusive","pass","fail")
                 where description is String type
                 where tc report is a list :
                                [(step, step_partial_verdict, step_verdict_info, associated_frame_id (can be null))]
        :raises CoordinatorError: if a check/verify step is neither postponed nor finished
        """
        # TODO handle frame id associated to the step , used for GUI purposes
        assert self.check_all_steps_finished()

        final_verdict = Verdict()
        tc_report = []
        logger.debug("[VERDICT GENERATION] starting the verdict generation")
        for step in self.sequence:
            # for the verdict we use the info in the checks and verify steps
            if step.type in ("check","verify"):

                logger.debug("[VERDICT GENERATION] Processing step %s" %step.id)

                if step.state == "postponed":
                    tc_report.append((step.id, None, "%s postponed" %step.type.upper(), ""))
                elif step.state == "finished":
                    tc_report.append((step.id, step.partial_verdict.get_value(), step.partial_verdict.get_message(),""))
                    # update global verdict
                    final_verdict.update(step.partial_verdict.get_value(),step.partial_verdict.get_message())
                else:
                    msg="step %s not ready for analysis"%(step.id)
                    logger.error("[VERDICT GENERATION] " + msg)
                    raise CoordinatorError(msg)

        # append at the end of the report the analysis done a posteriori (if any)
        if tat_post_mortem_analysis_report:
            logger.debug('[VERDICT GENERATION] post mortem analysis report: ' + str(tat_post_mortem_analysis_report))
            for item in tat_post_mortem_analysis_report:
                # TODO process the items correctly
                tc_report.append(item)
                logger.debug('[VERDICT GENERATION] post mortem report item: ' + str(item))
                final_verdict.update(item[1], item[2])
        else:
            # we cannot emit a final verdict if the report from TAT is empty (no CHECKS-> error verdict)
            logger.warning('[VERDICT GENERATION] Empty list of report passed from TAT')
            final_verdict.update('error', 'Test Analysis Tool returned an empty analysis report')

        # hack to overwrite the final verdict MESSAGE in case of pass
        # (works because Verdict.update accepts an equal-rank update)
        if final_verdict.get_value() == 'pass':
            final_verdict.update('pass','No interoperability error was detected,')
            logger.debug("[VERDICT GENERATION] Test case executed correctly, a PASS was issued.")
        else:
            logger.debug("[VERDICT GENERATION] Test case executed correctly, but FAIL was issued as verdict.")
            logger.debug("[VERDICT GENERATION] info: %s' "%final_verdict.get_value() )

        return final_verdict.get_value(), final_verdict.get_message(), tc_report

477
class Coordinator:
478
    """
    see F-Interop API for the coordination events and services
    http://doc.f-interop.eu/#services-provided
    """

484
    def __init__(self, amqp_connection, ted_file, tc_configs_files):
        """
        Import the test descriptions and set up the AMQP queues and consumers.

        :param amqp_connection: AMQP connection; a channel is opened on it
        :param ted_file: YAML file with the test cases
        :param tc_configs_files: YAML file with the test case configurations
        :raises AssertionError: if a test case refers to a configuration that
                                was not imported
        """
        # first let's import the TC configurations
        imported_configs = import_teds(tc_configs_files)
        self.tc_configs = OrderedDict()
        for tc_config in imported_configs:
            self.tc_configs[tc_config.id]=tc_config

        logger.info('Imports: %s TC_CONFIG imported'%len(self.tc_configs))

        # lets import TCs and make sure there's a tc config for each one of them
        imported_teds = import_teds(ted_file)
        self.teds=OrderedDict()
        for ted in imported_teds:
            self.teds[ted.id]=ted
            if ted.configuration_id not in self.tc_configs:
                logger.error('Missing configuration:%s for test case:%s '%(ted.configuration_id,ted.id))
            assert ted.configuration_id in self.tc_configs

        logger.info('Imports: %s TC execution scripts imported' % len(self.teds))

        # test cases iterator (over the TC objects, not the keys)
        self._ted_it = cycle(self.teds.values())
        self.current_tc = None

        # AMQP queues and callbacks config
        self.connection = amqp_connection
        self.channel = self.connection.channel()

        self.services_q_name = 'services@%s' %COMPONENT_ID
        self.events_q_name = 'events@%s' %COMPONENT_ID

        self.channel.queue_declare(queue=self.services_q_name,auto_delete = True)
        self.channel.queue_declare(queue=self.events_q_name,auto_delete = True)

        # service requests and coordination events arrive on separate queues
        self.channel.queue_bind(exchange = AMQP_EXCHANGE,
                           queue = self.services_q_name,
                           routing_key = 'control.testcoordination.service')

        self.channel.queue_bind(exchange = AMQP_EXCHANGE,
                           queue = self.events_q_name,
                           routing_key = 'control.testcoordination')

        # NOTE(review): callback-first basic_consume() with no_ack looks like
        # the pre-1.0 pika API -- confirm against the pinned pika version.
        self.channel.basic_consume(self.handle_service,
                              queue = self.services_q_name,
                              no_ack = False)

        self.channel.basic_consume(self.handle_control,
                              queue = self.events_q_name,
                              no_ack = False)
Federico Sismondi's avatar
Federico Sismondi committed
538

539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
    def check_testsuite_finished (self):
        """
        Tell whether every test case of the suite is done.

        A test case is still pending while its state is None, 'executing',
        'ready_for_analysis' or 'analyzing'; any other state ('skipped' or
        'finished') counts as done.

        :return: False if at least one test case is pending, True otherwise
                 (also for an empty test suite)
        """
        pending_states = (None, 'executing', 'ready_for_analysis', 'analyzing')

        # one pass over all the test cases is enough, whatever TC the user started with
        for tc in self.teds.values():
            if tc.state in pending_states:
                logger.debug("[TESTSUITE] - there is still unfinished & non-skipped test cases")
                return False

        logger.debug("[TESTSUITE] - Testsuite finished. No more test cases to execute.")
        return True
558

559
560
561
    def run(self):
        """
        Log the start-up and start consuming messages on the coordinator's channel.
        """
        logger.info('start consuming..')
        channel = self.channel
        channel.start_consuming()
Federico Sismondi's avatar
Federico Sismondi committed
562

563
564
    ### AUXILIARY AMQP MESSAGING FUNCTIONS ###

565
566
567
    def notify_tun_interfaces_start(self):
        """
        Starts tun interface in agent1, agent2 and agent TT

        One 'tun.start' message is published per agent on the routing key
        'control.tun.toAgent.<agent>', each one with its own ipv6_host.
        """
        # TODO check which queues exist, get those names from somewhere and not just asumme agent1 agent2 agentTT
        logger.debug("Let's start the bootstrap the agents")

        for agent, assigned_ip in (('agent1',':1'),('agent2',':2'),('agent_TT',':3')):
            # a fresh message per agent instead of mutating a shared dict
            d = {
                "_type": "tun.start",
                "ipv6_host": assigned_ip,
                "ipv6_prefix": "bbbb"
            }
            self.channel.basic_publish(
                    exchange=AMQP_EXCHANGE,
                    routing_key='control.tun.toAgent.%s'%agent,
                    mandatory=True,
                    body=json.dumps(d),
                    properties=pika.BasicProperties(
                        content_type='application/json',
                    )
            )

593
    def notify_current_testcase(self):
        """
        Publish a 'testcoordination.testcase.next' notification.

        The message carries the verbose description of the current test case,
        or only an informative message when no test case is selected.
        """
        _type = 'testcoordination.testcase.next'
        r_key = 'control.testcoordination'

        coordinator_notif = OrderedDict()
        coordinator_notif['_type'] = _type

        if self.current_tc:
            coordinator_notif['message'] = 'Next test case to be executed is %s' % self.current_tc.id
            coordinator_notif.update(self.current_tc.to_dict(verbose = True))
        else:
            coordinator_notif['message'] = 'No test case selected, or no more available'

        self.channel.basic_publish(
                body=json.dumps(coordinator_notif, ensure_ascii=False),
                routing_key=r_key,
                exchange=AMQP_EXCHANGE,
                properties=pika.BasicProperties(
                        content_type='application/json',
                )
        )
Federico Sismondi's avatar
Federico Sismondi committed
614

615
    def notify_current_step_execute(self):
        """
        Publish a 'testcoordination.step.execute' notification carrying the
        verbose description of the current step of the current test case.
        """
        _type = 'testcoordination.step.execute'
        r_key = 'control.testcoordination'

        current_step = self.current_tc.current_step

        coordinator_notif = OrderedDict()
        coordinator_notif['_type'] = _type
        coordinator_notif['message'] = 'Next test step to be executed is %s' % current_step.id
        coordinator_notif.update(current_step.to_dict(verbose = True))

        self.channel.basic_publish(
            body=json.dumps(coordinator_notif, ensure_ascii=False),
            routing_key=r_key,
            exchange=AMQP_EXCHANGE,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )
Federico Sismondi's avatar
Federico Sismondi committed
633

634
    def notify_testcase_finished(self):
        """
        Publish a 'testcoordination.testcase.finished' notification carrying
        the verbose description of the current test case.
        """
        _type = 'testcoordination.testcase.finished'
        r_key = 'control.testcoordination'

        # testcoordination notification
        coordinator_notif = OrderedDict()
        coordinator_notif['_type'] = _type
        coordinator_notif['message'] = 'Testcase %s finished' % self.current_tc.id
        coordinator_notif.update(self.current_tc.to_dict(verbose=True))

        self.channel.basic_publish(
            body=json.dumps(coordinator_notif, ensure_ascii=False),
            routing_key=r_key,
            exchange=AMQP_EXCHANGE,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )
652
653

    def notify_testcase_verdict(self):
        """
        Publish the verdict of the current test case and save it to a file.

        The 'testcoordination.testcase.verdict' notification holds the report
        of the current test case (when one is set) plus its verbose
        description. The same content is written as JSON to
        RESULTS_DIR/<testcase id>_verdict.json.
        """
        _type = 'testcoordination.testcase.verdict'
        r_key = 'control.testcoordination'

        coordinator_notif = OrderedDict()
        coordinator_notif['_type'] = _type
        # lets add the report info of the TC into the answer
        # (the report is None until one is generated, update(None) would raise)
        if self.current_tc.report:
            coordinator_notif.update(self.current_tc.report)
        # lets add basic info about the TC
        coordinator_notif.update(self.current_tc.to_dict(verbose=True))

        self.channel.basic_publish(
            body=json.dumps(coordinator_notif, ensure_ascii=False),
            routing_key=r_key,
            exchange=AMQP_EXCHANGE,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )

        # Overwrite final verdict file with final details
        json_file = os.path.join(
                RESULTS_DIR,
                self.current_tc.id + '_verdict.json'
        )
        with open(json_file, 'w') as f:
            # dump the verdict itself, not the path of the file
            json.dump(coordinator_notif, f)

681
    def notify_coordination_error(self, message, error_code):
        """
        Publish a 'testcoordination.error' notification.

        :param message: description of the error
        :param error_code: code of the error
        """
        _type = 'testcoordination.error'
        r_key =  'control.testcoordination.error'

        # testcoordination.error notification
        # TODO error codes?
        coordinator_notif = OrderedDict()
        coordinator_notif['_type'] = _type
        coordinator_notif['message'] = message
        coordinator_notif['error_code'] = error_code
        coordinator_notif['testsuite_status'] = self.states_summary()

        self.channel.basic_publish(
            body=json.dumps(coordinator_notif, ensure_ascii=False),
            routing_key=r_key,
            exchange=AMQP_EXCHANGE,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )
Federico Sismondi's avatar
Federico Sismondi committed
701

702
    def notify_testsuite_finished(self):
        """
        Publish a 'testcoordination.testsuite.finished' notification carrying
        the content returned by testsuite_report().
        """
        _type = 'testcoordination.testsuite.finished'
        r_key =  'control.testcoordination'

        coordinator_notif = OrderedDict()
        coordinator_notif['_type'] = _type
        coordinator_notif.update(self.testsuite_report())

        self.channel.basic_publish(
            body=json.dumps(coordinator_notif, ensure_ascii=False),
            routing_key=r_key,
            exchange=AMQP_EXCHANGE,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )

719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
    def notify_current_configuration(self,config_id,node,message):
        """
        Publish a 'testcoordination.testcase.configuration' notification.

        :param config_id: identifier of the configuration
        :param node: node the message is about
        :param message: text of the notification
        """
        coordinator_notif = OrderedDict([
            ('_type', 'testcoordination.testcase.configuration'),
            ('configuration_id', config_id),
            ('node', node),
            ('message', message),
        ])
        payload = json.dumps(coordinator_notif, ensure_ascii=False)
        json_properties = pika.BasicProperties(
            content_type='application/json',
        )

        self.channel.basic_publish(
            body=payload,
            routing_key='control.testcoordination',
            exchange=AMQP_EXCHANGE,
            properties=json_properties,
        )

    def call_service_sniffer_start(self, capture_id, filter_if, filter_proto, link_id):
        """
        Sends a 'sniffing.start' request to the sniffer through an
        AmqpSynchronousCallClient and waits for the reply.

        :param capture_id: value sent in the 'capture_id' field
        :param filter_if: value sent in the 'filter_if' field
        :param filter_proto: value sent in the 'filter_proto' field
        :param link_id: value sent in the 'link_id' field
        :return: the 'ok' field of the sniffer's reply, or None if the call
                 raised AmqpMessageError
        """
        _type = 'sniffing.start'
        r_key = 'control.sniffing.service'

        body = OrderedDict()
        body.update({'_type': _type})
        body.update({'capture_id': capture_id})
        body.update({'filter_if': filter_if})
        body.update({'filter_proto': filter_proto})
        body.update({'link_id': link_id})

        # only the RPC itself is guarded, so the handler never touches a reply that does not exist
        try:
            amqp_rpc_client = AmqpSynchronousCallClient(component_id=COMPONENT_ID)
            ret = amqp_rpc_client.call(routing_key=r_key, body=body)
        except AmqpMessageError as e:
            # report the routing key that did not answer (there is no reply to print at this point)
            logger.error("Sniffer API doesn't respond on %s, maybe it isn't up yet \n Exception info%s"
                         % (r_key, str(e)))
            return None

        logger.info("Received answer from sniffer: %s, answer: %s" % (_type, json.dumps(ret)))
        return ret['ok']

    def call_service_sniffer_stop(self):
        """
        Sends a 'sniffing.stop' request to the sniffer through an
        AmqpSynchronousCallClient and waits for the reply.

        :return: the 'ok' field of the sniffer's reply, or None if the call
                 raised AmqpMessageError
        """
        _type = 'sniffing.stop'
        r_key = 'control.sniffing.service'

        body = OrderedDict()
        body.update({'_type': _type})

        # only the RPC itself is guarded, so the handler never touches a reply that does not exist
        try:
            amqp_rpc_client = AmqpSynchronousCallClient(component_id=COMPONENT_ID)
            ret = amqp_rpc_client.call(routing_key=r_key, body=body)
        except AmqpMessageError as e:
            # report the routing key that did not answer (there is no reply to print at this point)
            logger.error("Sniffer API doesn't respond on %s, maybe it isn't up yet \n Exception info%s"
                         % (r_key, str(e)))
            return None

        logger.info("Received answer from sniffer: %s, answer: %s" % (_type, json.dumps(ret)))
        return ret['ok']
773

774
775
776
777
778
779
780
781
782
783
784
    def call_service_sniffer_get_capture(self, capture_id):
        """
        Sends a 'sniffing.getcapture' request to the sniffer through an
        AmqpSynchronousCallClient and waits for the reply.

        :param capture_id: value sent in the 'capture_id' field
        :return: the sniffer's reply as returned by the client, or None if the
                 call raised AmqpMessageError
        """
        _type = 'sniffing.getcapture'
        r_key = 'control.sniffing.service'

        body = OrderedDict()
        body.update({'_type': _type})
        body.update({'capture_id': capture_id})

        # only the RPC itself is guarded, so the handler never touches a reply that does not exist
        try:
            amqp_rpc_client = AmqpSynchronousCallClient(component_id=COMPONENT_ID)
            ret = amqp_rpc_client.call(routing_key=r_key, body=body)
        except AmqpMessageError as e:
            # report the routing key that did not answer (there is no reply to print at this point)
            logger.error("Sniffer API doesn't respond on %s, maybe it isn't up yet \n Exception info%s"
                         % (r_key, str(e)))
            return None

        logger.info("Received answer from sniffer: %s, answer: %s" % (_type, json.dumps(ret)))
        return ret

792
    def call_service_testcase_analysis(self, testcase_id, testcase_ref, file_enc, filename, value):
        """
        Sends an 'analysis.testcase.analyze' request to the analysis service
        through an AmqpSynchronousCallClient and waits for the reply.

        :param testcase_id: value sent in the 'testcase_id' field
        :param testcase_ref: value sent in the 'testcase_ref' field
        :param file_enc: value sent in the 'file_enc' field
        :param filename: value sent in the 'filename' field
        :param value: value sent in the 'value' field
        :return: the service's reply as returned by the client, or None if the
                 call raised AmqpMessageError
        """
        _type = 'analysis.testcase.analyze'
        r_key = 'control.analysis.service'

        body = OrderedDict()
        body.update({'_type': _type})
        body.update({'testcase_id': testcase_id})
        body.update({'testcase_ref': testcase_ref})
        body.update({'file_enc': file_enc})
        body.update({'filename': filename})
        body.update({'value': value})

        # only the RPC itself is guarded, so the handler never touches a reply that does not exist
        try:
            amqp_rpc_client = AmqpSynchronousCallClient(component_id=COMPONENT_ID)
            ret = amqp_rpc_client.call(routing_key=r_key, body=body)
        except AmqpMessageError as e:
            # report the routing key that did not answer (there is no reply to print at this point)
            logger.error("TAT API doesn't respond on %s, maybe it isn't up yet \n Exception info%s"
                         % (r_key, str(e)))
            return None

        logger.info("Received answer from TAT: %s, answer: %s" % (_type, json.dumps(ret)))
        return ret


815
816
    ### API ENDPOINTS ###

817
    def handle_service(self, ch, method, properties, body):
        """
        Callback for requests received on the services queue.

        The message is acknowledged first, then decoded and dispatched on its
        '_type' field. Supported requests are answered directly with amqp_reply:

            - 'testcoordination.testsuite.gettestcases'
            - 'testcoordination.testsuite.getstatus'

        Malformed messages (not UTF-8, not JSON, not a JSON object, or without
        a '_type' field) and unknown request types are logged and dropped.

        :param ch: channel the message was delivered on (used for the ack)
        :param method: delivery info, provides delivery_tag and routing_key
        :param properties: properties of the request, handed to amqp_reply
        :param body: raw message body (bytes holding a UTF-8 encoded JSON object)
        """
        ch.basic_ack(delivery_tag=method.delivery_tag)

        # decode only once; OrderedDict keeps the order of fields of the received json object
        try:
            event = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
        except ValueError as e:
            # UnicodeDecodeError and json's decode error are both ValueError subclasses
            logger.error('Cannot decode service request: \nrouting_key %s \nerror %s'
                         % (method.routing_key, str(e)))
            return

        logger.debug('[services queue callback] service request received on the queue: %s || %s'
                     % (method.routing_key, json.dumps(event)))

        if not isinstance(event, dict) or '_type' not in event:
            logger.error('Malformed service request, no _type field: \nrouting_key %s'
                         % method.routing_key)
            return

        event_type = event['_type']

        # prepare response
        response = OrderedDict()

        if event_type == "testcoordination.testsuite.gettestcases":
            # this is a request so I answer directly on the message
            testcases = self.get_testcases_basic(verbose=True)
            response.update({'_type': event_type})
            response.update({'ok': True})
            response.update(testcases)
            amqp_reply(self.channel, properties, response)

        elif event_type == "testcoordination.testsuite.getstatus":
            status = self.states_summary()
            # this is a request so I answer directly on the message
            response.update({'_type': event_type})
            response.update({'ok': True})
            response.update({'status': status})
            amqp_reply(self.channel, properties, response)

        else:
            logger.warning('Cannot dispatch event: \nrouting_key %s \nevent_type %s' % (method.routing_key,event_type))
            return

        logger.info('Service request handled, response sent through the bus: %s'%(json.dumps(response)))


    def handle_control(self, ch, method, properties, body):

859
        ch.basic_ack(delivery_tag=method.delivery_tag)
Federico Sismondi's avatar
Federico Sismondi committed
860
        # horribly long composition of methods,but  needed for keeping the order of fields of the received json object
861
        logger.debug('[event queue callback] service request received on the queue: %s || %s'
Federico Sismondi's avatar
Federico Sismondi committed
862
863
                     % (
                     method.routing_key, json.dumps(json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict))))
864
865

        # TODO check malformed messages first
866
        event = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
867
868
869
870
        event_type = event['_type']

        #prepare response
        response = OrderedDict()
871

872
        if event_type == "testcoordination.testcase.skip":
Federico Sismondi's avatar
Federico Sismondi committed
873
874
875
876
877
878
879
880
881
882
883
884

            # if no testcase_id was sent then I skip  the current one
            try:
                testcase_skip = event['testcase_id']
            except KeyError:
                testcase_skip = self.current_tc.id

            # change tc state to 'skipped'
            testcase_t = self.get_testcase(testcase_skip)
            testcase_t.change_state("skipped")

            # if skipped tc is current test case then next tc
885
886
            if self.current_tc is not None and (testcase_skip == self.current_tc.id):
                self.next_testcase()
887
                self.notify_current_testcase()
Federico Sismondi's avatar
Federico Sismondi committed
888

889
890
891
            if self.current_tc is None and self.check_testsuite_finished():
                self.finish_testsuite()
                self.notify_testsuite_finished()
Federico Sismondi's avatar
Federico Sismondi committed
892
893

        elif event_type == "testcoordination.testsuite.start":
894
895
            # TODO in here maybe launch the next configuration of IUT
            # TODO reboot automated IUTs
896
897
898

            # lets open tun interfaces
            # TODO do it before the testsuite start signal, after opened send TESTING TOOL ready signal (for GUI)
899
900
            self.notify_tun_interfaces_start()
            time.sleep(2)
901
902

            self.start_test_suite()
Federico Sismondi's avatar
Federico Sismondi committed
903
904

            # send general notif
905
            self.notify_current_testcase()
906

Federico Sismondi's avatar
Federico Sismondi committed
907
908
909
910
911
        elif event_type == "testcoordination.testcase.select":

            # assert and get testcase_id from message
            try:
                # jump to selected tc
912
                self.select_testcase(event['testcase_id'])
913

Federico Sismondi's avatar
Federico Sismondi committed
914
915
            except KeyError:
                error_msg = "Incorrect or empty testcase_id"
916

Federico Sismondi's avatar
Federico Sismondi committed
917
                # send general notif
918
919
                self.notify_coordination_error(message=error_msg,error_code=None)

Federico Sismondi's avatar
Federico Sismondi committed
920
921
922
            except CoordinatorError as e:
                error_msg = e.message
                # send general notif
923
                self.notify_coordination_error(message=error_msg, error_code=None)
Federico Sismondi's avatar
Federico Sismondi committed
924
925

            # send general notif
926
            self.notify_current_testcase()
Federico Sismondi's avatar
Federico Sismondi committed
927
928
929
930
931
932


        elif event_type == "testcoordination.testcase.start":

            if self.current_tc is None:
                error_msg = "No testcase selected"
933

Federico Sismondi's avatar
Federico Sismondi committed
934
                # notify all
935
                self.notify_coordination_error(message =error_msg, error_code=None)
Federico Sismondi's avatar
Federico Sismondi committed
936
                return
937
938

            # TODO handle configuration phase before execution!
Federico Sismondi's avatar
Federico Sismondi committed
939

940
941
942
            if self.check_testsuite_finished():
                self.notify_testsuite_finished()
            else:
943
                self.start_testcase()
Federico Sismondi's avatar
Federico Sismondi committed
944

945
946
                # send general notif
                self.notify_current_step_execute()
Federico Sismondi's avatar
Federico Sismondi committed
947
948


949
        elif event_type == "testcoordination.step.stimuli.executed":
Federico Sismondi's avatar
Federico Sismondi committed
950

951
952
953
954
955
956
957
958
959
960
961
962
            if self.current_tc is None:
                error_msg = "No testcase selected"
                # notify all
                self.notify_coordination_error(message =error_msg, error_code=None)
                return

            if self.current_tc.state is None:
                error_msg = "Test case not yet started"
                # notify all
                self.notify_coordination_error(message =error_msg, error_code=None)
                return

963
964
965
966
967
968
969
            # process event only if I current step is a STIMULI
            if self.current_tc.current_step.type != 'stimuli':
                message = 'Coordination was expecting message for step type: %s , but got type: STIMULI' \
                          % (self.current_tc.current_step.type.upper())
                logger.error(message)
                self.notify_coordination_error(message, None)
                return
Federico Sismondi's avatar
Federico Sismondi committed
970

971
            self.handle_stimuli_step_executed()
972

973
            # go to next step
Federico Sismondi's avatar
Federico Sismondi committed
974
            if self.next_step():
975
                self.notify_current_step_execute()
976
            else:
977
                # im at the end of the TC:
978
                self.finish_testcase()
979
980
                self.notify_testcase_finished()
                self.notify_testcase_verdict()
981
982
983
984
985
986
987
988
989
990

                # there is at least a TC left
                if not self.check_testsuite_finished():
                    self.next_testcase()
                    self.notify_current_testcase()

                # im at the end of the TC and also of the TS
                else:
                    self.finish_testsuite()
                    self.notify_testsuite_finished()
Federico Sismondi's avatar
Federico Sismondi committed
991
992
993

        elif event_type == "testcoordination.step.verify.response":

994
995
996
997
998
999
1000
1001
            # process event only if I current step is a verify
            if self.current_tc.current_step.type != 'verify':
                message = 'Coordination was expecting message for step type: %s , but got type: VERIFY' \
                          % (self.current_tc.current_step.type.upper())
                logger.error(message)
                self.notify_coordination_error(message, None)
                return

Federico Sismondi's avatar
Federico Sismondi committed
1002
1003
1004
1005
1006
            # assert and get testcase_id from message
            try:
                verify_response = event['verify_response']
            except KeyError:
                error_msg = "Verify_response field needs to be provided"
1007

Federico Sismondi's avatar
Federico Sismondi committed
1008
                # send general notif
1009
                self.notify_coordination_error(message=error_msg, error_code=None)
Federico Sismondi's avatar
Federico Sismondi committed
1010
1011


1012
            self.handle_verify_step_response(verify_response)
Federico Sismondi's avatar
Federico Sismondi committed
1013

1014
            # go to next step
Federico Sismondi's avatar
Federico Sismondi committed
1015
            if self.next_step():
1016
                self.notify_current_step_execute()
1017
            else:
1018
                # im at the end of the TC:
1019
                self.finish_testcase()
1020
1021
                self.notify_testcase_finished()
                self.notify_testcase_verdict()
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031

                # there is at least a TC left
                if not self.check_testsuite_finished():
                    self.next_testcase()
                    self.notify_current_testcase()

                # im at the end of the TC and also of the TS
                else:
                    self.finish_testsuite()
                    self.notify_testsuite_finished()
1032
1033


Federico Sismondi's avatar
Federico Sismondi committed
1034
1035
        elif event_type == "testcoordination.step.check.response":
            # This is call is just used when we have step_by_step analysis mode
1036
1037
1038
1039
1040
1041
1042
1043
1044
            #assert ANALYSIS_MODE == 'step_by_step'

            # process event only if I current step is a check
            if self.current_tc.current_step.type != 'check':
                message = 'Coordination was expecting message for step type: %s , but got type: CHECK' \
                          % (self.current_tc.current_step.type.upper())
                logger.error(message)
                self.notify_coordination_error(message, None)
                return
1045

Federico Sismondi's avatar
Federico Sismondi committed
1046
1047
1048
1049
            try:
                verdict = event['partial_verdict']
                description = event['description']
            except KeyError:
1050
                self.notify_coordination_error(message='Malformed CHECK response', error_code=None)
1051

1052
            self.handle_check_step_response(verdict,description)
1053

1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
            # # go to next step
            # if self.next_step():
            #     self.notify_current_step_execute()
            # elif not self.check_testsuite_finished():
            #     # im at the end of the TC:
            #     self.finish_testcase()
            #     self.notify_testcase_finished()
            #     self.notify_testcase_verdict()
            # else:
            #     self.finish_testsuite()
            #     self.notify_testsuite_finished()

1066
            # go to next step
Federico Sismondi's avatar
Federico Sismondi committed
1067
            if self.next_step():
Federico Sismondi's avatar