Commit 58fb051e authored by Federico Sismondi's avatar Federico Sismondi
Browse files

CLI supports <download_pcaps> command and others

parent 0b9187e6
# import click
# from click_repl import repl
# from prompt_toolkit.history import FileHistory
#
# @click.group()
# def cli():
# pass
#
# @cli.command()
# def myrepl():
# prompt_kwargs = {
# 'history': FileHistory('/etc/myrepl/myrepl-history'),
# }
# repl(click.get_current_context(), prompt_kwargs=prompt_kwargs)
import os
import uuid
import sys
import json
import pika
import base64
import logging
import threading
import traceback
......@@ -41,6 +25,9 @@ COLOR_CHAT_MESSAGE_ECHO = 'green'
COLOR_ERROR_MESSAGE = 'red'
COLOR_TEST_SESSION_HELPER_MESSAGE = 'yellow'
# DIR used for network dumps and other type of tmp files
TEMP_DIR = 'tmp'
DEFAULT_TOPIC_SUBSCRIPTIONS = [
# 'control.testcoordination',
# 'control.dissection',
......@@ -52,7 +39,6 @@ MESSAGE_TYPES_NOT_ECHOED = [
MsgPacketInjectRaw,
]
TEMP_DIR = 'tmp'
session_profile = OrderedDict(
{
......@@ -215,51 +201,54 @@ def amqp_request(channel, request_message, component_id):
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
try:
callback_queue = result.method.queue
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
callback_queue = result.method.queue
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
time.sleep(0.2)
retries_left = 5
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
time.sleep(0.2)
retries_left = 5
if retries_left > 0:
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
if retries_left > 0:
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
)
)
)
# clean up
channel.queue_delete(reply_queue_name)
finally:
# clean up
channel.queue_delete(reply_queue_name)
return response
......@@ -308,6 +297,37 @@ def exit():
"""
_exit()
@cli.command()
def download_network_traces():
"""
Downloads all networks traces generated during the session
"""
global state
_handle_get_testcase_list()
ls = state['tc_list'].copy()
try:
channel = state['connection'].channel()
except Exception as e:
_echo_error(e)
for tc_item in ls:
try:
tc_id = tc_item['testcase_id']
msg = MsgSniffingGetCapture(capture_id=tc_id)
response = amqp_request(channel, msg, COMPONENT_ID)
if response.ok:
save_pcap_from_base64(response.filename, response.value, TEMP_DIR)
_echo_input("downloaded network trace %s , into dir: %s" % (response.filename, TEMP_DIR))
else:
raise Exception(response.error_message)
except Exception as e:
_echo_error('Error trying to download network traces for testcase : %s' % tc_id)
_echo_error(e)
@cli.command()
def clear():
......@@ -366,40 +386,6 @@ def _handle_get_testcase_list():
else:
_echo_error('No connection established')
def _handle_get_testsuite_status():
# requires testing tool to implement GetStatus feature, see MsgTestSuiteGetStatus
if _connection_ok():
temp_channel = state['connection'].channel()
request_message = MsgTestSuiteGetStatus()
try:
status_resp = amqp_request(temp_channel, request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
return
resp = status_resp.to_dict()
tc_states = resp['tc_list']
del resp['tc_list']
# print general states
_echo_dict_as_table(resp)
list = []
list.append(('testcase id', 'testcase ref', 'testcase status'))
for tc in tc_states:
if tc:
val1, val2, val3, _, _, _ = tc.values()
list.append((val1, val2, val3))
# print tc states
_echo_list_as_table(list, first_row_is_header=True)
else:
_echo_error('No connection established')
def _handle_action_testsuite_start():
if click.confirm('Do you want START test suite?'):
msg = MsgTestSuiteStart()
......@@ -464,7 +450,6 @@ def _handle_action_verify():
message_handles_options = {'ts_start': _handle_action_testsuite_start,
'ts_status': _handle_get_testsuite_status,
'ts_abort': _handle_action_testsuite_abort,
'tc_start': _handle_action_testcase_start,
'tc_restart': _handle_action_testcase_restart,
......@@ -481,7 +466,7 @@ message_handles_options = {'ts_start': _handle_action_testsuite_start,
@click.argument('api_call', type=click.Choice(message_handles_options.keys()))
def action(api_call):
"""
Execute test action
Execute interop test action
"""
_echo_input(api_call)
......@@ -525,6 +510,21 @@ def ignore(message_type):
_echo_dispatcher('Ignore message category %s: (%s)' % (message_type, str(item)))
@cli.command()
def enter_debug_context():
"""
Provides user with some extra debugging commands
"""
global message_handles_options
message_handles_options.update(
{
'send_skip_tc_coap_core_11': send_skip_tescase_coap_core_11,
}
)
@cli.command()
@click.argument('message', nargs=-1)
def chat(message):
......@@ -556,9 +556,46 @@ def check_connection():
_echo_dispatcher('connection is %s' % 'OK' if conn_ok else 'not OK')
return conn_ok
@cli.command()
def get_session_status():
"""
Retrieves status information from testing tool
"""
# requires testing tool to implement GetStatus feature, see MsgTestSuiteGetStatus
if _connection_ok():
temp_channel = state['connection'].channel()
request_message = MsgTestSuiteGetStatus()
try:
status_resp = amqp_request(temp_channel, request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
return
resp = status_resp.to_dict()
tc_states = resp['tc_list']
del resp['tc_list']
# print general states
_echo_dict_as_table(resp)
list = []
list.append(('testcase id', 'testcase ref', 'testcase status'))
for tc in tc_states:
if tc:
val1, val2, val3, _, _, _ = tc.values()
list.append((val1, val2, val3))
# print tc states
_echo_list_as_table(list, first_row_is_header=True)
else:
_echo_error('No connection established')
@cli.command()
def get_session_state():
def get_session_parameters():
"""
Print session state and parameters
"""
......@@ -585,7 +622,7 @@ def _echo_context():
# #('click context', click.get_current_context()),
# ('CLI states', pprint.pformat(state))
# ]:
table.append((key, str(val)))
table.append((key, list_to_str(val)))
# click.echo(click.style('%s: \n %s' % (index, val), fg='cyan'))
_echo_list_as_table(table)
......@@ -762,8 +799,13 @@ def _echo_backend_message(msg):
return
elif isinstance(msg, MsgTestCaseVerdict):
_echo_dict_as_table(msg.to_dict())
if hasattr(msg, 'partial_verdicts') and msg.partial_verdicts is not None:
verdict = msg.to_dict()
partial_verdict = verdict.pop('partial_verdicts')
_echo_dict_as_table(verdict)
click.echo()
if partial_verdict:
click.echo(click.style("Partial verdicts:", fg=COLOR_TEST_SESSION_HELPER_MESSAGE))
_echo_testcase_partial_verdicts_as_table(msg.partial_verdicts)
return
......@@ -816,7 +858,7 @@ def _echo_list_of_dicts_as_table(l):
table.append(tuple(d.keys()))
table.append(tuple(d.values()))
_echo_list_as_table(table,first_row_is_header=True)
_echo_list_as_table(table, first_row_is_header=True)
except Exception as e:
_echo_error('wrong frame format passed?')
......@@ -951,6 +993,18 @@ def _echo_log_message(msg):
click.echo(click.style("[%s] %s" % ('log', list_to_str(msg)), fg=COLOR_SESSION_LOG))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# debugging messages
def send_skip_tescase_coap_core_11():
_echo_input("Executing debug message %s" % "send_skip_tescase_coap_core_11")
msg = MsgTestCaseSkip(
testcase_id='TD_COAP_CORE_11'
)
publish_message(msg)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# some auxiliary functions
......@@ -971,19 +1025,44 @@ def list_to_str(ls):
if type(ls) is str:
return ls
for l in ls:
if l and isinstance(l, list):
for sub_l in l:
if sub_l and not isinstance(sub_l, list):
ret += str(sub_l) + ' \n '
else:
# I truncate in the second level
pass
else:
ret += str(l) + ' \n '
try:
for l in ls:
if l and isinstance(l, list):
for sub_l in l:
if sub_l and not isinstance(sub_l, list):
ret += str(sub_l) + ' \n '
else:
# I truncate in the second level
pass
else:
ret += str(l) + ' \n '
except TypeError as e:
_echo_error(e)
return str(ls)
return ret
def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
"""
Returns number of bytes saved.
:param filename:
:param pcap_file_base64:
:return:
"""
if dir:
file_path = os.path.join(dir, filename)
else:
file_path = os.path.join(os.getcwd(), filename)
with open(file_path, "wb") as pcap_file:
nb = pcap_file.write(base64.b64decode(pcap_file_base64))
return nb
if __name__ == "__main__":
try:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment