Commit ab1b4fad authored by Federico Sismondi's avatar Federico Sismondi
Browse files

added new utils like exceptions, fixed imports to allow run libs as external...

added new utils like exceptions, fixed imports to allow run libs as external libs and also from __main__
parent 4026ee7c
import os
import pika
from messages import *
VERSION = '0.0.5'
# for using it as library and as a __main__
try:
from messages import *
except:
from .messages import *
VERSION = '0.0.6'
AMQP_EXCHANGE = 'amq.topic'
def publish_message(connection, message):
""" Published which uses message object metadata
:param channel:
:param message:
:return:
"""
Publishes message into the correct topic (uses Message object metadata)
Creates temporary channel on it's own
Connection must be a pika.BlockingConnection
"""
channel = None
......@@ -33,7 +37,13 @@ def publish_message(connection, message):
def amqp_request(connection, request_message, component_id):
# NOTE: channel must be a pika channel
"""
Publishes message into the correct topic (uses Message object metadata)
Returns reply message.
Uses reply_to and corr id amqp's properties for matching the reply
Creates temporary channel, and queues on it's own
Connection must be a pika.BlockingConnection
"""
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
......
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
from coap_testing_tool.utils.event_bus_messages import *
from tests.pcap_base64_examples import *
from urllib.parse import urlparse
import logging
import unittest
import pika
import sys
import time
import os
import threading
import datetime
COMPONENT_ID = 'fake_session'
MESSAGES_WAIT_INTERVAL = 1 # in seconds
AMQP_EXCHANGE = ''
AMQP_URL = ''
message_count = 0
stop_generator_signal = False
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.INFO)
# queue which tracks all non answered services requests
services_mid_backlog = []
services_events_tracelog = []
"""
PRE-CONDITIONS:
- Export AMQP_URL in the running environment
- Have CoAP testing tool running & listening to the bus
"""
# for a typical user input, for a user (coap client) vs automated-iut ( coap server) session type:
user_sequence = [
MsgTestSuiteGetStatus(),
MsgInteropSessionConfiguration(), # from TC1 to TC3
MsgTestSuiteStart(),
MsgTestSuiteGetStatus(),
MsgTestCaseSkip(testcase_id='TD_COAP_CORE_02_v01'),
MsgTestSuiteGetStatus(),
MsgTestCaseSkip(testcase_id='TD_COAP_CORE_03_v01'),
MsgTestSuiteGetStatus(),
MsgTestCaseStart(), # execute TC1 ( w/ no IUT in the bus )
MsgTestSuiteGetStatus(),
MsgStepStimuliExecuted(),
MsgTestSuiteGetStatus(),
MsgStepVerifyExecuted(),
MsgTestSuiteGetStatus(),
MsgStepVerifyExecuted(
verify_response=False,
description='User indicates that IUT didnt behave as expected '),
MsgTestSuiteGetStatus(), # at this point we should see a TC verdict
MsgTestCaseRestart(),
MsgTestSuiteGetStatus(),
MsgTestSuiteAbort(),
MsgTestSuiteGetStatus(),
]
service_api_calls = [
# TAT calls
MsgTestSuiteGetStatus(),
MsgTestSuiteGetTestCases(),
MsgInteropTestCaseAnalyze(
testcase_id="TD_COAP_CORE_01",
testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_01_v01",
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_base64,
),
# Sniffer calls (order matters!)
MsgSniffingStart(
capture_id='TD_COAP_CORE_01',
filter_if='tun0',
filter_proto='udp port 5683'
),
MsgSniffingStop(),
MsgSniffingGetCapture(tescase_id='TD_COAP_CORE_01'),
MsgSniffingGetCaptureLast(),
# Dissector calls
MsgDissectionDissectCapture(),
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
protocol_selection='coap',
value=PCAP_TC_COAP_01_base64,
),
# complete dissection of pcap
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_base64,
),
# complete dissection of pcap with extra TCP traffic
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_mingled_with_tcp_traffic_base64,
),
# same as dis4 but filtering coap messages
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
protocol_selection='coap',
value=PCAP_TC_COAP_01_mingled_with_tcp_traffic_base64,
),
# this should generate an error
MsgTestCaseSkip(testcase_id='TD_COAP_CORE_04_v01'),
# pcap sniffed using AMQP based packet sniffer
MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_COAP_GET_OVER_TUN_INTERFACE_base64,
)
]
class ApiTests(unittest.TestCase):
def setUp(self):
global stop_generator_signal
stop_generator_signal = False
import_env_vars()
self.conn = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
self.channel = self.conn.channel()
# MESSAGE VALIDATOR BOUND TO THE CONTROL EVENTS QUEUE
control_queue_name = 'control_queue@%s' % COMPONENT_ID
# lets' first clean up the queue
self.channel.queue_delete(queue=control_queue_name)
self.channel.queue_declare(queue=control_queue_name, auto_delete=True)
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=control_queue_name, routing_key='control.#')
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(validate_message, queue=control_queue_name)
# ERROR MSG VERIFIER BOUND TO ERRORS LOGS AND OTHER ERROR EVENTS QUEUE
errors_queue_name = 'bus_errors_queue@%s' % COMPONENT_ID
# lets' first clean up the queue
self.channel.queue_delete(queue=errors_queue_name)
self.channel.queue_declare(queue=errors_queue_name, auto_delete=True)
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key='log.error.*')
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key='control.session.error')
# for getting the terminate signal
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=errors_queue_name,
routing_key=MsgTestingToolTerminate.routing_key)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(check_for_bus_error, queue=errors_queue_name)
def tearDown(self):
self.conn.close()
def test_user_emulation(self):
"""
This basically checks that the testing tool doesnt crash while user is pushing message inputs into to the bus.
We check for:
- log errors in the bus
- malformed messages in the bus
"""
# prepare the message generator
messages = [] # list of messages to send
messages += user_sequence
messages.append(MsgTestingToolTerminate()) # message that triggers stop_generator_signal
thread_msg_gen = MessageGenerator(AMQP_URL, AMQP_EXCHANGE, messages)
logger.debug("Starting Message Generator thread ")
thread_msg_gen.start()
try:
self.channel.start_consuming()
except Exception as e:
thread_msg_gen.stop()
assert False, str(e)
def test_testing_tool_internal_services(self):
"""
This checks for:
- log errors in the bus
- malformed messages in the bus
- request reply correlation (there's one response per each request)
"""
global COMPONENT_ID
# some non request/response messages types exchanged during a session
events_to_ignore = [
'testingtool.ready',
'testingtool.compoent.ready',
'agent.configured'
]
# auxiliary function
def check_for_correlated_request_reply(ch, method, props, body):
global services_mid_backlog
global services_events_tracelog
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
msg_type = body_dict['_type']
logger.info(
'[%s] Checking correlated request/response for message %s'
% (sys._getframe().f_code.co_name, props.message_id))
ch.basic_ack(delivery_tag=method.delivery_tag)
if msg_type == 'testingtool.terminate':
ch.stop_consuming()
return
if msg_type in events_to_ignore:
# forget about these.. we are checking services and services reply only
return
if '.service.reply' in method.routing_key:
if props.correlation_id in services_mid_backlog:
services_mid_backlog.remove(props.correlation_id)
services_events_tracelog.append(msg_type)
else:
assert False, 'got a reply but theres no request in the backlog'
elif '.service' in method.routing_key:
services_mid_backlog.append(props.correlation_id)
services_events_tracelog.append(msg_type)
else:
assert False, 'error! we shouldnt be here!'
logging.info("[%s] current backlog: %s . history: %s"
% (
sys._getframe().f_code.co_name,
services_mid_backlog,
services_events_tracelog
)
)
# CORRELATION VALIDATOR BOUND TO SERVICES & REPLIES QUEUE
services_queue_name = 'services_queue@%s' % COMPONENT_ID
self.channel.queue_delete(queue=services_queue_name)
self.channel.queue_declare(queue=services_queue_name, auto_delete=True)
# get all services messages and their replies
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=services_queue_name, routing_key='#.service')
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=services_queue_name, routing_key='#.service.reply')
self.channel.basic_qos(prefetch_count=1)
# for getting the terminate signal
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=services_queue_name,
routing_key=MsgTestingToolTerminate.routing_key)
# for checking that for every request we get a reply
self.channel.basic_consume(check_for_correlated_request_reply, queue=services_queue_name)
# prepare the message generator
messages = [] # list of messages to send
messages += service_api_calls
messages.append(MsgTestingToolTerminate()) # message that triggers stop_generator_signal
thread_msg_gen = MessageGenerator(AMQP_URL, AMQP_EXCHANGE, messages)
logger.debug("[%s] Starting Message Generator thread " % sys._getframe().f_code.co_name)
try:
thread_msg_gen.start()
self.channel.start_consuming()
if len(services_mid_backlog) > 0:
assert False, 'A least one of the services request was not answered. backlog: %s. History: %s' \
% (
services_mid_backlog,
services_events_tracelog
)
except Exception as e:
thread_msg_gen.stop()
assert False, str(e)
# # # # # # AUXILIARY METHODS # # # # # # #
def import_env_vars():
global AMQP_EXCHANGE
global AMQP_URL
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
except KeyError as e:
AMQP_EXCHANGE = "amq.topic"
try:
AMQP_URL = str(os.environ['AMQP_URL'])
p = urlparse(AMQP_URL)
AMQP_USER = p.username
AMQP_SERVER = p.hostname
logger.info(
"Env variables imported for AMQP connection, User: {0} @ Server: {1} ".format(AMQP_USER, AMQP_SERVER)
)
except KeyError as e:
logger.error('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
# load default values
AMQP_URL = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
connection.close()
def publish_message(channel, message):
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=message.routing_key,
properties=properties,
body=message.to_json(),
)
def stop_generator():
global stop_generator_signal
logger.debug("The test is finished!")
stop_generator_signal = True
def check_for_bus_error(ch, method, props, body):
logger.info('[%s] Checking if is error, message %s' % (sys._getframe().f_code.co_name, props.message_id))
msg = None
try:
msg = Message.from_json(body)
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
return
except:
pass
list_of_audited_components = [
'tat',
'test_coordinator',
'packer_router',
'sniffer',
'dissector'
'session',
# TODO add agent_TT messages
]
r_key = method.routing_key
logger.info('[%s] Auditing: %s' % (sys._getframe().f_code.co_name, r_key))
for c in list_of_audited_components:
if c in r_key:
err = 'audited component %s pushed an error into the bus. messsage: %s' % (c, body)
logger.error(err)
raise Exception(err)
def validate_message(ch, method, props, body):
global message_count
tab = '\t'
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
message_count += 1
logger.info('[%s] Checking valid format for message %s' % (sys._getframe().f_code.co_name, props.message_id))
print('\n' + tab + '* * * * * * MESSAGE SNIFFED by INSPECTOR (%s) * * * * * * *' % message_count)
print(tab + "TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
print(tab + "ROUTING_KEY: %s" % method.routing_key)
print(tab + "MESSAGE ID: %s" % props.message_id)
if hasattr(props, 'correlation_id'):
print(tab + "CORRELATION ID: %s" % props.correlation_id)
print(tab + 'EVENT %s' % (req_body_dict['_type']))
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * \n')
if props.content_type != "application/json":
print(tab + '* * * * * * API VALIDATION ERROR * * * * * * * ')
print(tab + "props.content_type : " + str(props.content_type))
print(tab + "application/json was expected")
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise Exception
if '_type' not in req_body_dict.keys():
print(tab + '* * * * * * API VALIDATION ERROR * * * * * * * ')
print(tab + "no < _type > field found")
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise Exception
# lets check messages against the messaging library
list_of_messages_to_check = list(message_types_dict.keys())
if req_body_dict['_type'] in list_of_messages_to_check:
m = Message.from_json(body)
try:
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
stop_generator()
else:
logger.debug(repr(m))
except NonCompliantMessageFormatError as e:
print(tab + '* * * * * * API VALIDATION ERROR * * * * * * * ')
print(tab + "AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
print(tab + '* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
class MessageGenerator(threading.Thread):
keepOnRunning = True
def __init__(self, amqp_url, amqp_exchange, messages_list):
threading.Thread.__init__(self)
self.messages = messages_list
self.connection = pika.BlockingConnection(pika.URLParameters(amqp_url))
self.channel = self.connection.channel()
logger.info("[%s] AMQP connection established" % (self.__class__.__name__))
def run(self):
global MESSAGES_WAIT_INTERVAL
logger.info("[%s] lets start 'blindly' generating the messages which take part on a coap session "
"(for a coap client)" % (self.__class__.__name__))
try:
while self.keepOnRunning:
time.sleep(MESSAGES_WAIT_INTERVAL)
m = self.messages.pop(0)
publish_message(self.channel, m)
logger.info("[%s] Publishing in the bus: %s" % (self.__class__.__name__, repr(m)))
except IndexError:
# list finished, lets wait so all messages are sent and processed
time.sleep(5)
pass
except pika.exceptions.ChannelClosed:
pass
def stop(self):
self.keepOnRunning = False
self.connection.close()
class CoordinatorError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
"""Return the exception message"""
return self.message
class TestSuiteError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
"""Return the exception message"""
return self.message
class SnifferError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
"""Return the exception message"""
return self.message
class TatError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
"""Return the exception message"""
return self.message
class AmqpMessageError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
"""Return the exception message"""
return self.message
class ApiMessageFormatError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
"""Return the exception message"""
return self.message
......@@ -5,8 +5,14 @@ import signal
import shutil
import logging
from datetime import datetime
from messages import *
from pure_pcapy import Dumper, Pkthdr, DLT_RAW, DLT_IEEE802_15_4_NOFCS
# use this as main and also lib:
try:
from messages import *
from pure_pcapy import Dumper, Pkthdr, DLT_RAW, DLT_IEEE802_15_4_NOFCS
except:
from .messages import *
from .pure_pcapy import Dumper, Pkthdr, DLT_RAW, DLT_IEEE802_15_4_NOFCS
try:
# For Python 3.0 and later
......@@ -18,7 +24,7 @@ except ImportError:
logger = logging.getLogger(__name__)
VERSION = '0.0.1'
VERSION = '0.0.2'
def launch_amqp_data_to_pcap_dumper(amqp_url=None, amqp_exchange=None, topics=None, dump_dir=None):
def signal_int_handler(self, frame):
......@@ -54,15 +60,11 @@ def launch_amqp_data_to_pcap_dumper(amqp_url=None, amqp_exchange=None, topics=No
# load default values
amqp_url = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")
if topics:
pcap_amqp_topic_subscriptions = topics
else:
pcap_amqp_topic_subscriptions = ['data.serial.fromAgent.coap_client_agent',
'data.serial.fromAgent.coap_server_agent',
'data.tun.fromAgent.coap_server_agent',
'data.tun.fromAgent.coap_client_agent',
]
pcap_amqp_topic_subscriptions = ['data.tun.fromAgent.*',
'data.serial.fromAgent.*']
# init pcap_dumper
pcap_dumper = AmqpDataPacketDumper(
......@@ -298,7 +300,6 @@ class AmqpDataPacketDumper:
else:
logger.info('drop amqp message: ' + repr(m))
except NonCompliantMessageFormatError as e:
print('* * * * * * API VALIDATION ERROR * * * * * * * ')
print("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment