Commit 34ec3f70 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

plenty of modifs

parent bc6bab5a
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
from coap_testing_tool.utils.event_bus_messages import *
from tests.pcap_base64_examples import *
from urllib.parse import urlparse
import logging
import unittest
import pika
import sys
import time
import os
import threading
import datetime
COMPONENT_ID = 'fake_session'
MESSAGES_WAIT_INTERVAL = 1 # in seconds
AMQP_EXCHANGE = ''
AMQP_URL = ''
message_count = 0
stop_generator_signal = False
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.INFO)
# queue which tracks all non answered services requests
services_mid_backlog = []
services_events_tracelog = []
"""
PRE-CONDITIONS:
- Export AMQP_URL in the running environment
- Have CoAP testing tool running & listening to the bus
"""
# for a typical user input, for a user (coap client) vs automated-iut ( coap server) session type:
user_sequence = [
MsgTestSuiteGetStatus(),
MsgInteropSessionConfiguration(), # from TC1 to TC3
MsgTestSuiteStart(),
MsgTestSuiteGetStatus(),
MsgTestCaseSkip(testcase_id='TD_COAP_CORE_02_v01'),
MsgTestSuiteGetStatus(),
MsgTestCaseSkip(testcase_id='TD_COAP_CORE_03_v01'),
MsgTestSuiteGetStatus(),
MsgTestCaseStart(), # execute TC1 ( w/ no IUT in the bus )
MsgTestSuiteGetStatus(),
MsgStepStimuliExecuted(),
MsgTestSuiteGetStatus(),
MsgStepVerifyExecuted(),
MsgTestSuiteGetStatus(),
MsgStepVerifyExecuted(
verify_response=False,
description='User indicates that IUT didnt behave as expected '),
MsgTestSuiteGetStatus(), # at this point we should see a TC verdict
MsgTestCaseRestart(),
MsgTestSuiteGetStatus(),
MsgTestSuiteAbort(),
MsgTestSuiteGetStatus(),
]
service_api_calls = [
# TAT calls
MsgTestSuiteGetStatus(),
MsgTestSuiteGetTestCases(),
MsgInteropTestCaseAnalyze(
testcase_id="TD_COAP_CORE_01",
testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_01_v01",
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_base64,
),
# Sniffer calls (order matters!)
MsgSniffingStart(
capture_id='TD_COAP_CORE_01',
filter_if='tun0',
filter_proto='udp port 5683'
),
MsgSniffingStop(),
MsgSniffingGetCapture(tescase_id='TD_COAP_CORE_01'),
MsgSniffingGetCaptureLast(),
# Dissector calls
MsgDissectionDissectCapture(),
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
protocol_selection='coap',
value=PCAP_TC_COAP_01_base64,
),
# complete dissection of pcap
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_base64,
),
# complete dissection of pcap with extra TCP traffic
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_mingled_with_tcp_traffic_base64,
),
# same as dis4 but filtering coap messages
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
protocol_selection='coap',
value=PCAP_TC_COAP_01_mingled_with_tcp_traffic_base64,
),
# this should generate an error
MsgTestCaseSkip(testcase_id='TD_COAP_CORE_04_v01'),
# pcap sniffed using AMQP based packet sniffer
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_COAP_GET_OVER_TUN_INTERFACE_base64,
)
]
class ApiTests(unittest.TestCase):
def setUp(self):
global stop_generator_signal
stop_generator_signal = False
import_env_vars()
self.conn = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
self.channel = self.conn.channel()
# MESSAGE VALIDATOR BOUND TO THE CONTROL EVENTS QUEUE
control_queue_name = 'control_queue@%s' % COMPONENT_ID
# lets' first clean up the queue
self.channel.queue_delete(queue=control_queue_name)
self.channel.queue_declare(queue=control_queue_name, auto_delete=True)
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=control_queue_name, routing_key='control.#')
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(validate_message, queue=control_queue_name)
# ERROR MSG VERIFIER BOUND TO ERRORS LOGS AND OTHER ERROR EVENTS QUEUE
errors_queue_name = 'bus_errors_queue@%s' % COMPONENT_ID
# lets' first clean up the queue
self.channel.queue_delete(queue=errors_queue_name)
self.channel.queue_declare(queue=errors_queue_name, auto_delete=True)
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key='log.error.*')
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key='control.session.error')
# for getting the terminate signal
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key=MsgTestingToolTerminate.routing_key)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(check_for_bus_error, queue=errors_queue_name)
def tearDown(self):
self.conn.close()
def test_user_emulation(self):
"""
This basically checks that the testing tool doesnt crash while user is pushing message inputs into to the bus.
We check for:
- log errors in the bus
- malformed messages in the bus
"""
# prepare the message generator
messages = [] # list of messages to send
messages += user_sequence
messages.append(MsgTestingToolTerminate()) # message that triggers stop_generator_signal
thread_msg_gen = MessageGenerator(AMQP_URL, AMQP_EXCHANGE, messages)
logger.debug("Starting Message Generator thread ")
thread_msg_gen.start()
try:
self.channel.start_consuming()
except Exception as e:
thread_msg_gen.stop()
assert False, str(e)
def test_testing_tool_internal_services(self):
"""
This checks for:
- log errors in the bus
- malformed messages in the bus
- request reply correlation (there's one response per each request)
"""
global COMPONENT_ID
# some non request/response messages types exchanged during a session
events_to_ignore = [
'testingtool.ready',
'testingtool.compoent.ready',
'agent.configured'
]
# auxiliary function
def check_for_correlated_request_reply(ch, method, props, body):
global services_mid_backlog
global services_events_tracelog
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
msg_type = body_dict['_type']
logger.info(
'[%s] Checking correlated request/response for message %s'
% (sys._getframe().f_code.co_name, props.message_id))
ch.basic_ack(delivery_tag=method.delivery_tag)
if msg_type == 'testingtool.terminate':
ch.stop_consuming()
return
if msg_type in events_to_ignore:
# forget about these.. we are checking services and services reply only
return
if '.service.reply' in method.routing_key:
if props.correlation_id in services_mid_backlog:
services_mid_backlog.remove(props.correlation_id)
services_events_tracelog.append(msg_type)
else:
assert False, 'got a reply but theres no request in the backlog'
elif '.service' in method.routing_key:
services_mid_backlog.append(props.correlation_id)
services_events_tracelog.append(msg_type)
else:
assert False, 'error! we shouldnt be here!'
logging.info("[%s] current backlog: %s . history: %s"
% (
sys._getframe().f_code.co_name,
services_mid_backlog,
services_events_tracelog
)
)
# CORRELATION VALIDATOR BOUND TO SERVICES & REPLIES QUEUE
services_queue_name = 'services_queue@%s' % COMPONENT_ID
self.channel.queue_delete(queue=services_queue_name)
self.channel.queue_declare(queue=services_queue_name, auto_delete=True)
# get all services messages and their replies
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=services_queue_name, routing_key='#.service')
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=services_queue_name, routing_key='#.service.reply')
self.channel.basic_qos(prefetch_count=1)
# for getting the terminate signal
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=services_queue_name,
routing_key=MsgTestingToolTerminate.routing_key)
# for checking that for every request we get a reply
self.channel.basic_consume(check_for_correlated_request_reply, queue=services_queue_name)
# prepare the message generator
messages = [] # list of messages to send
messages += service_api_calls
messages.append(MsgTestingToolTerminate()) # message that triggers stop_generator_signal
thread_msg_gen = MessageGenerator(AMQP_URL, AMQP_EXCHANGE, messages)
logger.debug("[%s] Starting Message Generator thread " % sys._getframe().f_code.co_name)
try:
thread_msg_gen.start()
self.channel.start_consuming()
if len(services_mid_backlog) > 0:
assert False, 'A least one of the services request was not answered. backlog: %s. History: %s' \
% (
services_mid_backlog,
services_events_tracelog
)
except Exception as e:
thread_msg_gen.stop()
assert False, str(e)
# # # # # # AUXILIARY METHODS # # # # # # #
def import_env_vars():
global AMQP_EXCHANGE
global AMQP_URL
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
except KeyError as e:
AMQP_EXCHANGE = "amq.topic"
try:
AMQP_URL = str(os.environ['AMQP_URL'])
p = urlparse(AMQP_URL)
AMQP_USER = p.username
AMQP_SERVER = p.hostname
logger.info(
"Env variables imported for AMQP connection, User: {0} @ Server: {1} ".format(AMQP_USER, AMQP_SERVER)
)
except KeyError as e:
logger.error('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
# load default values
AMQP_URL = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
connection.close()
def publish_message(channel, message):
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=message.routing_key,
properties=properties,
body=message.to_json(),
)
def stop_generator():
global stop_generator_signal
logger.debug("The test is finished!")
stop_generator_signal = True
def check_for_bus_error(ch, method, props, body):
logger.info('[%s] Checking if is error, message %s' % (sys._getframe().f_code.co_name, props.message_id))
msg = None
try:
msg = Message.from_json(body)
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
return
except:
pass
list_of_audited_components = [
'tat',
'test_coordinator',
'packer_router',
'sniffer',
'dissector'
'session',
# TODO add agent_TT messages
]
r_key = method.routing_key
logger.info('[%s] Auditing: %s' % (sys._getframe().f_code.co_name, r_key))
for c in list_of_audited_components:
if c in r_key:
err = 'audited component %s pushed an error into the bus. messsage: %s' % (c, body)
logger.error(err)
raise Exception(err)
def validate_message(ch, method, props, body):
global message_count
tab = '\t'
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
message_count += 1
logger.info('[%s] Checking valid format for message %s' % (sys._getframe().f_code.co_name, props.message_id))
print('\n' + tab + '* * * * * * MESSAGE SNIFFED by INSPECTOR (%s) * * * * * * *' % message_count)
print(tab + "TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
print(tab + "ROUTING_KEY: %s" % method.routing_key)
print(tab + "MESSAGE ID: %s" % props.message_id)
if hasattr(props, 'correlation_id'):
print(tab + "CORRELATION ID: %s" % props.correlation_id)
print(tab + 'EVENT %s' % (req_body_dict['_type']))
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * \n')
if props.content_type != "application/json":
print(tab + '* * * * * * API VALIDATION ERROR * * * * * * * ')
print(tab + "props.content_type : " + str(props.content_type))
print(tab + "application/json was expected")
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise Exception
if '_type' not in req_body_dict.keys():
print(tab + '* * * * * * API VALIDATION ERROR * * * * * * * ')
print(tab + "no < _type > field found")
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise Exception
# lets check messages against the messaging library
list_of_messages_to_check = list(message_types_dict.keys())
if req_body_dict['_type'] in list_of_messages_to_check:
m = Message.from_json(body)
try:
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
stop_generator()
else:
logger.debug(repr(m))
except NonCompliantMessageFormatError as e:
print(tab + '* * * * * * API VALIDATION ERROR * * * * * * * ')
print(tab + "AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
class MessageGenerator(threading.Thread):
keepOnRunning = True
def __init__(self, amqp_url, amqp_exchange, messages_list):
threading.Thread.__init__(self)
self.messages = messages_list
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.channel = self.connection.channel()
logger.info("[%s] AMQP connection established" % (self.__class__.__name__))
def run(self):
global MESSAGES_WAIT_INTERVAL
logger.info("[%s] lets start 'blindly' generating the messages which take part on a coap session "
"(for a coap client)" % (self.__class__.__name__))
try:
while self.keepOnRunning:
time.sleep(MESSAGES_WAIT_INTERVAL)
m = self.messages.pop(0)
publish_message(self.channel, m)
logger.info("[%s] Publishing in the bus: %s" % (self.__class__.__name__, repr(m)))
except IndexError:
# list finished, lets wait so all messages are sent and processed
time.sleep(5)
pass
except pika.exceptions.ChannelClosed:
pass
def stop(self):
self.keepOnRunning = False
self.connection.close()
......@@ -22,8 +22,6 @@ import logging
import threading
import traceback
from tabulate import tabulate
from collections import OrderedDict
from click.testing import CliRunner
from messages import *
import click
......@@ -31,6 +29,8 @@ from click_repl import register_repl, ExitReplException
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING)
COMPONENT_ID = 'cli'
# click colors:: black (might gray) , red, green, yellow (might be an orange), blue, magenta, cyan, white (might gray)
COLOR_DEFAULT = 'white'
COLOR_SESSION_LOG = 'white'
......@@ -49,7 +49,6 @@ DEFAULT_TOPIC_SUBSCRIPTIONS = [
]
MESSAGE_TYPES_NOT_ECHOED = [
MsgPacketSniffedRaw,
MsgPacketInjectRaw,
]
......@@ -79,6 +78,7 @@ profile_choices = {
'node': ['coap_client', 'coap_server', 'both']
}
# e.g. MsgTestingToolConfigured is normally followed by a test suite start (ts_start)
UI_suggested_actions = {
MsgTestingToolConfigured: 'ts_start',
MsgTestCaseReady: 'tc_start',
......@@ -87,6 +87,8 @@ UI_suggested_actions = {
}
def _init_action_suggested():
state['suggested_cmd'] = 'ts_start'
class AmqpListener(threading.Thread):
......@@ -97,7 +99,6 @@ class AmqpListener(threading.Thread):
threading.Thread.__init__(self)
if callback is None:
self.message_dipatcher = print
else:
......@@ -172,6 +173,7 @@ class AmqpListener(threading.Thread):
try:
m = Message.from_json(body)
m.update_properties(**props_dict)
m.routing_key = method.routing_key
self.message_dipatcher(m)
except NonCompliantMessageFormatError as e:
......@@ -197,6 +199,72 @@ class AmqpListener(threading.Thread):
self.message_dipatcher('Bye byes!')
def amqp_request(channel, request_message, component_id):
"""
NOTE: channel must be a pika channel
"""
amqp_exchange = session_profile['amqp_exchange']
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
if amqp_exchange is None:
amqp_exchange = 'amq.topic'
response = None
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
time.sleep(0.2)
retries_left = 5
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
)
)
# clean up
channel.queue_delete(reply_queue_name)
return response
def publish_message(message):
if not _connection_ok():
_echo_dispatcher('No connection established yet')
......@@ -251,6 +319,48 @@ def clear():
click.clear()
def _handle_testcase_select():
# requires testing tool to implement GetTestCases feature see MsgTestSuiteGetTestCases
_handle_get_testcase_list()
ls = state['tc_list'].copy()
i = 1
for tc_item in ls:
_echo_dispatcher("%s -> %s" % (i, tc_item['testcase_id']))
i += 1
resp = click.prompt('Select number of test case to execute from list', type=int)
try:
_echo_input("entered %s, corresponding to %s" % (resp, ls[resp - 1]['testcase_id']))
except Exception as e:
_echo_error("wrong input \n %s" % e)
return
msg = MsgTestCaseSelect(
testcase_id=ls[resp - 1]['testcase_id']
)
publish_message(msg)
def _handle_get_testcase_list():
# requires testing tool to implement GetTestCases feature see MsgTestSuiteGetTestCases
if _connection_ok():
temp_channel = state['connection'].channel()
request_message = MsgTestSuiteGetTestCases()
testcases_list_reponse = amqp_request(temp_channel, request_message, COMPONENT_ID)
try:
state['tc_list'] = testcases_list_reponse.tc_list