Commit e06d228f authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'refact_package' into 'master'

Refact package

See merge request !15
parents 2b144e68 5e149417
Pipeline #2208 passed with stage
in 0 seconds
from .ioppytest_cli import *
from . import *
if __name__ == '__main__':
main()
import os
from os.path import expanduser
import sys
import pika
import errno
import base64
import logging
import threading
import traceback
from collections import OrderedDict
import click
from click_repl import ExitReplException
from click_repl import repl as repl_base
......@@ -17,13 +21,15 @@ try:
from event_bus_utils import AmqpListener
from tabulate import tabulate
except:
from .messages import *
from .event_bus_utils import AmqpListener
from .tabulate import tabulate
from ..messages import *
from ..event_bus_utils import AmqpListener
from ..tabulate import tabulate
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING)
COMPONENT_ID = 'cli'
COMPONENT_ID = 'ioppytest-cli'
HOME = expanduser("~")
# click colors:: black (might gray) , red, green, yellow (might be an orange), blue, magenta, cyan, white (might gray)
COLOR_DEFAULT = 'white'
......@@ -37,6 +43,7 @@ COLOR_TEST_SESSION_HELPER_MESSAGE = 'yellow'
# DIR used for network dumps and other type of tmp files
TEMP_DIR = 'tmp'
WAIT_TIME_FOR_USER_INPUT = 60
DEFAULT_TOPIC_SUBSCRIPTIONS = [
'#'
......@@ -57,15 +64,6 @@ session_profile = OrderedDict(
}
)
# TODO handle the lock automatically with an state object
# def some_dummuy_release():
# _echo_log_message('releasing!')
# def some_dummuy_acquire():
# _echo_log_message('acquiring!')
# state_lock = Message()
# state_lock.__setattr__('release', some_dummuy_release)
# state_lock.__setattr__('acquire', some_dummuy_acquire)
state_lock = threading.RLock()
state = {
'testcase_id': None,
......@@ -93,107 +91,6 @@ def _init_action_suggested():
state['suggested_cmd'] = 'ts_start'
def amqp_request(request_message, component_id=COMPONENT_ID):
"""
NOTE: channel must be a pika channel
"""
state_lock.acquire()
channel = state['channel']
amqp_exchange = session_profile['amqp_exchange']
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
if amqp_exchange is None:
amqp_exchange = 'amq.topic'
response = None
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
try:
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
time.sleep(0.2)
retries_left = 5
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
type(request_message)
)
)
finally:
# clean up
channel.queue_delete(reply_queue_name)
state_lock.release()
return response
def publish_message(message):
if not _connection_ok():
_echo_dispatcher('No connection established yet')
return
_echo_dispatcher('Sending message..')
for i in range(1, 4):
try:
state_lock.acquire()
channel = state['connection'].channel()
channel.basic_publish(
exchange=session_profile['amqp_exchange'],
routing_key=message.routing_key,
properties=pika.BasicProperties(**message.get_properties()),
body=message.to_json(),
)
channel.close()
break
except pika.exceptions.ConnectionClosed as err:
_echo_error(err)
_echo_error('Unexpected connection closed, retrying %s/%s' % (i, 4))
_set_up_connection()
finally:
state_lock.acquire()
@click.group()
def cli():
pass
......@@ -202,11 +99,21 @@ def cli():
@cli.command()
def repl():
"""
Interactive shell
Interactive shell, allows user to interact with the ioppytest testing tool
"""
history_path_file = "{dir}{sep}{comp_name}-history".format(dir=HOME, sep=os.path.sep, comp_name=COMPONENT_ID)
if not os.path.exists(os.path.dirname(history_path_file)):
try:
os.makedirs(os.path.dirname(history_path_file))
except OSError as exc: # Guard against race condition
if exc.errno != errno.EEXIST:
raise
prompt_kwargs = {
'history': FileHistory('tmp/myrepl-history'),
'history': FileHistory(history_path_file),
}
_echo_log_message('saving CMD history at: {}'.format(history_path_file))
_echo_welcome_message()
_pre_configuration()
......@@ -227,7 +134,7 @@ def repl():
@click.option('-ll', '--lazy-listener',
is_flag=True,
default=False,
help="lazy-listener doest perform convertion to Messages class")
help="lazy-listener doest perform conversion to Messages objects")
def connect(lazy_listener):
"""
Connect to an AMQP session and start consuming messages
......@@ -238,7 +145,7 @@ def connect(lazy_listener):
@cli.command()
def exit():
"""
Exit test CLI
Exits REPL
"""
_exit()
......@@ -258,10 +165,10 @@ def download_network_traces():
try:
tc_id = tc_item['testcase_id']
msg = MsgSniffingGetCapture(capture_id=tc_id)
response = amqp_request(msg, COMPONENT_ID)
response = _amqp_request(msg, COMPONENT_ID)
if response.ok:
save_pcap_from_base64(response.filename, response.value, TEMP_DIR)
_save_file_from_base64(response.filename, response.value, TEMP_DIR)
_echo_input("downloaded network trace %s , into dir: %s" % (response.filename, TEMP_DIR))
else:
raise Exception(response.error_message)
......@@ -271,6 +178,64 @@ def download_network_traces():
_echo_error(e)
@cli.command()
@click.argument('path-to-file', type=click.Path(exists=True))
@click.option('--text-message', default=None, help="Message to be displayed in GUI")
@click.option('--user-id', default='all', help="User ID in case there are several users in session")
def gui_request_file_upload(path_to_file, text_message, user_id):
"""
Request user to upload a file, saves it in directory (if provided) or else in .tmp
"""
global state
msg_request = MsgUiRequestUploadFile()
msg_request.fields = [
{
"name": text_message,
"type": "file",
}
]
if user_id:
msg_request.routing_key = "ui.user.{}.request".format(user_id)
_echo_input("sending request to {}".format(msg_request.routing_key))
msg_response = _amqp_request(msg_request, COMPONENT_ID, timeout=WAIT_TIME_FOR_USER_INPUT)
values_dict = msg_response.fields.pop() # If there's more than one then GUI fucked up
value = values_dict['value']
filename = values_dict['filename']
_save_file_from_base64(filename, value, path_to_file)
_echo_input("saved file {} in path {}".format(filename, path_to_file))
@cli.command()
@click.argument('text-message')
@click.option('--user-id', default='all', help="User ID in case there are several users in session")
def gui_display_message(text_message, user_id):
"""
Sends message to GUI
"""
global state
msg_display = MsgUiDisplay()
msg_display.fields = [
{
"name": text_message,
"type": "p",
}
]
if user_id:
msg_display.routing_key = "ui.user.{}.display".format(user_id)
_publish_message(msg_display)
_echo_input("message display sent to {}".format(msg_display.routing_key))
@cli.command()
def clear():
"""
......@@ -302,61 +267,58 @@ def _handle_testcase_select():
testcase_id=ls[resp - 1]['testcase_id']
)
publish_message(msg)
_publish_message(msg)
def _handle_get_testcase_list():
# requires testing tool to implement GetTestCases feature, see MsgTestSuiteGetTestCases
if _connection_ok():
request_message = MsgTestSuiteGetTestCases()
request_message = MsgTestSuiteGetTestCases()
try:
testcases_list_reponse = amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
return
try:
testcases_list_reponse = _amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
return
try:
state['tc_list'] = testcases_list_reponse.tc_list
except Exception as e:
_echo_error(e)
return
try:
state['tc_list'] = testcases_list_reponse.tc_list
except Exception as e:
_echo_error(e)
return
_echo_list_of_dicts_as_table(state['tc_list'])
else:
_echo_error('No connection established')
_echo_list_of_dicts_as_table(state['tc_list'])
def _handle_action_testsuite_start():
if click.confirm('Do you want START test suite?'):
msg = MsgTestSuiteStart()
publish_message(msg)
_publish_message(msg)
def _handle_action_testcase_start():
if click.confirm('Do you want START test case?'):
msg = MsgTestCaseStart() # TODO no testcase id input?
publish_message(msg)
_publish_message(msg)
def _handle_action_testsuite_abort():
if click.confirm('Do you want ABORT test suite?'):
msg = MsgTestSuiteAbort()
publish_message(msg)
_publish_message(msg)
def _handle_action_testcase_skip():
if click.confirm('Do you want SKIP current test case?'):
msg = MsgTestCaseSkip()
publish_message(msg)
_publish_message(msg)
def _handle_action_testcase_restart():
if click.confirm('Do you want RESTART current test case?'):
msg = MsgTestCaseRestart()
publish_message(msg)
_publish_message(msg)
def _handle_action_stimuli():
......@@ -370,7 +332,7 @@ def _handle_action_stimuli():
node=session_profile['node'],
node_execution_mode="user_assisted"
)
publish_message(msg)
_publish_message(msg)
else:
_echo_error('Please execute all pending STIMULI steps')
......@@ -389,7 +351,7 @@ def _handle_action_verify():
node_execution_mode="user_assisted"
)
publish_message(msg)
_publish_message(msg)
message_handles_options = {'ts_start': _handle_action_testsuite_start,
......@@ -414,10 +376,6 @@ def action(api_call):
_echo_input(api_call)
if not _connection_ok():
_echo_dispatcher('No connection established yet')
return
if api_call == 'suggested':
if state['suggested_cmd']:
_echo_dispatcher("Executing : %s" % state['suggested_cmd'])
......@@ -451,7 +409,7 @@ ignorable_message_types = {
@click.argument('message_type', type=click.Choice(list(ignorable_message_types.keys())))
def ignore(message_type):
"""
Do not notify any more on message type
(REPL only) Do not notify any more on message type
"""
try:
for item in ignorable_message_types[message_type]:
......@@ -465,7 +423,7 @@ def ignore(message_type):
@cli.command()
def enter_debug_context():
"""
Provides user with some extra debugging commands
(REPL only) Provides user with some extra debugging commands
"""
global message_handles_options
......@@ -482,7 +440,7 @@ def enter_debug_context():
try:
message_t = dummy_messages_dict[message_name]()
_echo_input("trying to send message: %s" % repr(message_t))
publish_message(message_t)
_publish_message(message_t)
except Exception as e:
_echo_error("Error found: %s" % e)
......@@ -517,7 +475,7 @@ def enter_debug_context():
msg = MsgSniffingStart(capture_id=testcase_id,
filter_if='tun0',
filter_proto='udp')
publish_message(msg)
_publish_message(msg)
@cli.command()
def _sniffer_stop():
......@@ -526,7 +484,7 @@ def enter_debug_context():
"""
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
msg = MsgSniffingStop()
publish_message(msg)
_publish_message(msg)
@cli.command()
def _sniffer_get_last_capture():
......@@ -535,7 +493,7 @@ def enter_debug_context():
"""
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
msg = MsgSniffingGetCaptureLast()
publish_message(msg)
_publish_message(msg)
@cli.command()
def _configure_perf_tt():
......@@ -545,7 +503,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import PERF_TT_CONFIGURATION
message = MsgSessionConfiguration(**PERF_TT_CONFIGURATION) # builds a config for the perf TT
publish_message(message)
_publish_message(message)
@cli.command()
def _configure_comi_tt():
......@@ -555,7 +513,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import COMI_TT_CONFIGURATION
message = MsgSessionConfiguration(**COMI_TT_CONFIGURATION) # builds a config message
publish_message(message)
_publish_message(message)
@cli.command()
def _configure_6lowpan_tt():
......@@ -565,7 +523,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import SIXLOWPAN_TT_CONFIGURATION
message = MsgSessionConfiguration(**SIXLOWPAN_TT_CONFIGURATION) # builds a config message
publish_message(message)
_publish_message(message)
@cli.command()
def _test_tat_analyze_6lowpan():
......@@ -575,7 +533,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import SIXLOWPAN_TAT_ANALYZE
message = MsgInteropTestCaseAnalyze(**SIXLOWPAN_TAT_ANALYZE)
publish_message(message)
_publish_message(message)
@cli.command()
def _configure_coap_tt():
......@@ -585,7 +543,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import COAP_TT_CONFIGURATION
message = MsgSessionConfiguration(**COAP_TT_CONFIGURATION) # builds a config message
publish_message(message)
_publish_message(message)
@cli.command()
def _get_session_configuration_from_ui():
......@@ -594,7 +552,7 @@ def enter_debug_context():
"""
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
req = MsgUiRequestSessionConfiguration()
publish_message(req)
_publish_message(req)
@cli.command()
@click.argument('testcase_id')
......@@ -607,7 +565,7 @@ def enter_debug_context():
msg = MsgTestCaseSkip(
testcase_id=testcase_id
)
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -630,7 +588,7 @@ def enter_debug_context():
]
msg.fields = fields
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -650,7 +608,7 @@ def enter_debug_context():
"value": True
}]
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -666,7 +624,7 @@ def enter_debug_context():
)
_echo_input(text)
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -682,7 +640,7 @@ def enter_debug_context():
)
_echo_input(text)
publish_message(msg)
_publish_message(msg)
_echo_session_helper("Entering debugger context, added extra CMDs, please type --help for more info")
......@@ -694,10 +652,6 @@ def chat(message):
Send chat message, useful for user-to-user test sessions
"""
if not _connection_ok():
_echo_dispatcher('No connection established yet')
return
m = ''
for word in message:
......@@ -706,13 +660,13 @@ def chat(message):
c = MsgSessionChat(description=m,
user_name=session_profile['user_name'],
iut_node=session_profile['node'])
publish_message(c)
_publish_message(c)
@cli.command()
def check_connection():
"""
Check if AMQP connection is active
(REPL only) Check if AMQP connection is active
"""
conn_ok = _connection_ok()
_echo_dispatcher('connection is %s' % 'OK' if conn_ok else 'not OK')
......@@ -730,7 +684,7 @@ def get_session_status():
request_message = MsgTestSuiteGetStatus()
try:
status_resp = amqp_request(request_message, COMPONENT_ID)
status_resp = _amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
......@@ -787,7 +741,7 @@ def _echo_context():
_echo_list_as_table(table)
def _set_up_connection(lazy_listener=False):
def _set_up_connection(create_listener=True, lazy_listener=False):
# conn for repl publisher
try:
retries_left = CONNECTION_SETUP_RETRIES
......@@ -822,6 +776,11 @@ def _set_up_connection(lazy_listener=False):
_echo_log_message('amqp listener thread doesnt want to stop, lets terminate it..')
th.terminate()
if create_listener is False:
return
# set up listener thread which will call callback each time there's a new message in bus and matches a topic
if lazy_listener:
amqp_listener_thread = AmqpListener(
amqp_url=session_profile['amqp_url'],
......@@ -928,10 +887,10 @@ def _echo_welcome_message():
_echo_session_helper(m)
m = """
*********************************************************************************
* If you experience any problems, or you have any suggestions or feedback *
* don't hesitate to drop me an email at: federico.sismondi@inria.fr *
*********************************************************************************
*************************************************************************************
* If you experience any problems, or you have any suggestions or feedback *
* don't hesitate to drop me an email at: federico[dot]sismondi[at]gmail[dot]com *
*************************************************************************************
"""
_echo_session_helper(m)
......@@ -974,7 +933,7 @@ def _echo_backend_message(msg):
assert isinstance(msg, Message)
try:
m = "\n[Session message] [%s] " % type(msg)
m = "\n[Event bus message] [%s] " % type(msg)
if hasattr(m, 'description'):
m += m.description
......@@ -1065,19 +1024,24 @@ def _echo_report_as_table(tc_report_list):
assert type(tc_report_list) is list
#testcases = [(k, v) for k, v in report_dict.items() if k.lower().startswith('td')]
# testcases = [(k, v) for k, v in report_dict.items() if k.lower().startswith('td')]
testcases = tc_report_list
for tc_report in testcases:
table = []
table.append(("Testcase ID", 'Final verdict', 'Description'))
table.append((tc_report['testcase_id'], tc_report['verdict'], tc_report['description']))
tc_id = tc_report['testcase_id'] if 'testcase_id' in tc_report else '???'
verd = tc_report['verdict'] if 'verdict' in tc_report else '???'
desc = tc_report['description'] if 'description' in tc_report else '???'
table.append((tc_id, verd, desc))
# testcase report
click.echo()
click.echo(click.style(tabulate(table, headers="firstrow"), fg=COLOR_TEST_SESSION_HELPER_MESSAGE))
click.echo()
_echo_testcase_partial_verdicts_as_table(tc_report['partial_verdicts'])
if 'partial_verdicts' in tc_report:
click.echo()
_echo_testcase_partial_verdicts_as_table(tc_report['partial_verdicts'])
click.echo()
except Exception as e:
......@@ -1235,12 +1199,12 @@ def list_to_str(ls):
return ret
def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
def _save_file_from_base64(filename, base64_encoded_file, dir=None):
"""
Returns number of bytes saved.
:param filename:
:param pcap_file_base64:
:param base64_encoded_file:
:return:
"""
......@@ -1250,12 +1214,116 @@ def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
file_path = os.path.join(os.getcwd(), filename)
with open(file_path, "wb") as pcap_file:
nb = pcap_file.write(base64.b64decode(pcap_file_base64))
nb = pcap_file.write(base64.b64decode(base64_encoded_file))
return nb
if __name__ == "__main__":
# # # AMQP connection, channel, publish and requests/replies handling # # #
def _amqp_request(request_message, component_id=COMPONENT_ID, timeout=10):
TIME_WAIT_BETWEEN_POLLS = 0.5
state_lock.acquire()
if not _connection_ok():
_echo_dispatcher('No connection established yet, setting up one..')
_set_up_connection(create_listener=False)
channel = state['channel']
amqp_exchange = session_profile['amqp_exchange']
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
if amqp_exchange is None:
amqp_exchange = 'amq.topic'
response = None
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
try:
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
retries_left = int(timeout / TIME_WAIT_BETWEEN_POLLS)
while retries_left > 0:
time.sleep(TIME_WAIT_BETWEEN_POLLS)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
type(request_message)
)
)
finally:
# clean up
channel.queue_delete(reply_queue_name)
state_lock.release()
return response