Commit cac4937c authored by Federico Sismondi's avatar Federico Sismondi

refact packages struct, added setup.py for iopppytest-cli

parent 2b144e68
......@@ -5,6 +5,7 @@ import base64
import logging
import threading
import traceback
from collections import OrderedDict
import click
from click_repl import ExitReplException
......@@ -17,9 +18,9 @@ try:
from event_bus_utils import AmqpListener
from tabulate import tabulate
except:
from .messages import *
from .event_bus_utils import AmqpListener
from .tabulate import tabulate
from ..messages import *
from ..event_bus_utils import AmqpListener
from ..tabulate import tabulate
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING)
......@@ -37,6 +38,7 @@ COLOR_TEST_SESSION_HELPER_MESSAGE = 'yellow'
# DIR used for network dumps and other type of tmp files
TEMP_DIR = 'tmp'
WAIT_TIME_FOR_USER_INPUT = 60
DEFAULT_TOPIC_SUBSCRIPTIONS = [
'#'
......@@ -93,107 +95,6 @@ def _init_action_suggested():
state['suggested_cmd'] = 'ts_start'
def amqp_request(request_message, component_id=COMPONENT_ID):
"""
NOTE: channel must be a pika channel
"""
state_lock.acquire()
channel = state['channel']
amqp_exchange = session_profile['amqp_exchange']
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
if amqp_exchange is None:
amqp_exchange = 'amq.topic'
response = None
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
try:
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
time.sleep(0.2)
retries_left = 5
while retries_left > 0:
time.sleep(0.5)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
type(request_message)
)
)
finally:
# clean up
channel.queue_delete(reply_queue_name)
state_lock.release()
return response
def publish_message(message):
if not _connection_ok():
_echo_dispatcher('No connection established yet')
return
_echo_dispatcher('Sending message..')
for i in range(1, 4):
try:
state_lock.acquire()
channel = state['connection'].channel()
channel.basic_publish(
exchange=session_profile['amqp_exchange'],
routing_key=message.routing_key,
properties=pika.BasicProperties(**message.get_properties()),
body=message.to_json(),
)
channel.close()
break
except pika.exceptions.ConnectionClosed as err:
_echo_error(err)
_echo_error('Unexpected connection closed, retrying %s/%s' % (i, 4))
_set_up_connection()
finally:
state_lock.acquire()
@click.group()
def cli():
pass
......@@ -227,7 +128,7 @@ def repl():
@click.option('-ll', '--lazy-listener',
is_flag=True,
default=False,
help="lazy-listener doest perform convertion to Messages class")
help="lazy-listener doest perform conversion to Messages objects")
def connect(lazy_listener):
"""
Connect to an AMQP session and start consuming messages
......@@ -258,10 +159,10 @@ def download_network_traces():
try:
tc_id = tc_item['testcase_id']
msg = MsgSniffingGetCapture(capture_id=tc_id)
response = amqp_request(msg, COMPONENT_ID)
response = _amqp_request(msg, COMPONENT_ID)
if response.ok:
save_pcap_from_base64(response.filename, response.value, TEMP_DIR)
_save_file_from_base64(response.filename, response.value, TEMP_DIR)
_echo_input("downloaded network trace %s , into dir: %s" % (response.filename, TEMP_DIR))
else:
raise Exception(response.error_message)
......@@ -271,6 +172,38 @@ def download_network_traces():
_echo_error(e)
@cli.command()
@click.argument('path-to-file', type=click.Path(exists=True))
@click.option('--text-message', default=None, help="Message to be displayed in GUI")
@click.option('--user-id', default='all', help="User ID in case there are several users in session")
def gui_request_file_upload(path_to_file, text_message, user_id):
"""
Request user to upload a file, saves it in directory (if provided) or else in .tmp
"""
global state
msg_request = MsgUiRequestUploadFile()
msg_request.fields = [
{
"name": text_message,
"type": "file",
}
]
if user_id:
msg_request.routing_key = "ui.user.{}.request".format(user_id)
msg_response = _amqp_request(msg_request, COMPONENT_ID, timeout=WAIT_TIME_FOR_USER_INPUT)
values_dict = msg_response.fields.pop() # If there's more than one then GUI fucked up
value = values_dict['value']
filename = values_dict['filename']
_save_file_from_base64(filename, value, path_to_file)
_echo_input("saved file {} in path {}".format(filename, path_to_file))
@cli.command()
def clear():
"""
......@@ -302,61 +235,58 @@ def _handle_testcase_select():
testcase_id=ls[resp - 1]['testcase_id']
)
publish_message(msg)
_publish_message(msg)
def _handle_get_testcase_list():
# requires testing tool to implement GetTestCases feature, see MsgTestSuiteGetTestCases
if _connection_ok():
request_message = MsgTestSuiteGetTestCases()
request_message = MsgTestSuiteGetTestCases()
try:
testcases_list_reponse = amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
return
try:
testcases_list_reponse = _amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
return
try:
state['tc_list'] = testcases_list_reponse.tc_list
except Exception as e:
_echo_error(e)
return
try:
state['tc_list'] = testcases_list_reponse.tc_list
except Exception as e:
_echo_error(e)
return
_echo_list_of_dicts_as_table(state['tc_list'])
else:
_echo_error('No connection established')
_echo_list_of_dicts_as_table(state['tc_list'])
def _handle_action_testsuite_start():
if click.confirm('Do you want START test suite?'):
msg = MsgTestSuiteStart()
publish_message(msg)
_publish_message(msg)
def _handle_action_testcase_start():
if click.confirm('Do you want START test case?'):
msg = MsgTestCaseStart() # TODO no testcase id input?
publish_message(msg)
_publish_message(msg)
def _handle_action_testsuite_abort():
if click.confirm('Do you want ABORT test suite?'):
msg = MsgTestSuiteAbort()
publish_message(msg)
_publish_message(msg)
def _handle_action_testcase_skip():
if click.confirm('Do you want SKIP current test case?'):
msg = MsgTestCaseSkip()
publish_message(msg)
_publish_message(msg)
def _handle_action_testcase_restart():
if click.confirm('Do you want RESTART current test case?'):
msg = MsgTestCaseRestart()
publish_message(msg)
_publish_message(msg)
def _handle_action_stimuli():
......@@ -370,7 +300,7 @@ def _handle_action_stimuli():
node=session_profile['node'],
node_execution_mode="user_assisted"
)
publish_message(msg)
_publish_message(msg)
else:
_echo_error('Please execute all pending STIMULI steps')
......@@ -389,7 +319,7 @@ def _handle_action_verify():
node_execution_mode="user_assisted"
)
publish_message(msg)
_publish_message(msg)
message_handles_options = {'ts_start': _handle_action_testsuite_start,
......@@ -414,10 +344,6 @@ def action(api_call):
_echo_input(api_call)
if not _connection_ok():
_echo_dispatcher('No connection established yet')
return
if api_call == 'suggested':
if state['suggested_cmd']:
_echo_dispatcher("Executing : %s" % state['suggested_cmd'])
......@@ -482,7 +408,7 @@ def enter_debug_context():
try:
message_t = dummy_messages_dict[message_name]()
_echo_input("trying to send message: %s" % repr(message_t))
publish_message(message_t)
_publish_message(message_t)
except Exception as e:
_echo_error("Error found: %s" % e)
......@@ -517,7 +443,7 @@ def enter_debug_context():
msg = MsgSniffingStart(capture_id=testcase_id,
filter_if='tun0',
filter_proto='udp')
publish_message(msg)
_publish_message(msg)
@cli.command()
def _sniffer_stop():
......@@ -526,7 +452,7 @@ def enter_debug_context():
"""
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
msg = MsgSniffingStop()
publish_message(msg)
_publish_message(msg)
@cli.command()
def _sniffer_get_last_capture():
......@@ -535,7 +461,7 @@ def enter_debug_context():
"""
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
msg = MsgSniffingGetCaptureLast()
publish_message(msg)
_publish_message(msg)
@cli.command()
def _configure_perf_tt():
......@@ -545,7 +471,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import PERF_TT_CONFIGURATION
message = MsgSessionConfiguration(**PERF_TT_CONFIGURATION) # builds a config for the perf TT
publish_message(message)
_publish_message(message)
@cli.command()
def _configure_comi_tt():
......@@ -555,7 +481,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import COMI_TT_CONFIGURATION
message = MsgSessionConfiguration(**COMI_TT_CONFIGURATION) # builds a config message
publish_message(message)
_publish_message(message)
@cli.command()
def _configure_6lowpan_tt():
......@@ -565,7 +491,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import SIXLOWPAN_TT_CONFIGURATION
message = MsgSessionConfiguration(**SIXLOWPAN_TT_CONFIGURATION) # builds a config message
publish_message(message)
_publish_message(message)
@cli.command()
def _test_tat_analyze_6lowpan():
......@@ -575,7 +501,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import SIXLOWPAN_TAT_ANALYZE
message = MsgInteropTestCaseAnalyze(**SIXLOWPAN_TAT_ANALYZE)
publish_message(message)
_publish_message(message)
@cli.command()
def _configure_coap_tt():
......@@ -585,7 +511,7 @@ def enter_debug_context():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
from message_examples import COAP_TT_CONFIGURATION
message = MsgSessionConfiguration(**COAP_TT_CONFIGURATION) # builds a config message
publish_message(message)
_publish_message(message)
@cli.command()
def _get_session_configuration_from_ui():
......@@ -594,7 +520,7 @@ def enter_debug_context():
"""
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
req = MsgUiRequestSessionConfiguration()
publish_message(req)
_publish_message(req)
@cli.command()
@click.argument('testcase_id')
......@@ -607,7 +533,7 @@ def enter_debug_context():
msg = MsgTestCaseSkip(
testcase_id=testcase_id
)
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -630,7 +556,7 @@ def enter_debug_context():
]
msg.fields = fields
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -650,7 +576,7 @@ def enter_debug_context():
"value": True
}]
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -666,7 +592,7 @@ def enter_debug_context():
)
_echo_input(text)
publish_message(msg)
_publish_message(msg)
@cli.command()
@click.argument('text')
......@@ -682,7 +608,7 @@ def enter_debug_context():
)
_echo_input(text)
publish_message(msg)
_publish_message(msg)
_echo_session_helper("Entering debugger context, added extra CMDs, please type --help for more info")
......@@ -694,10 +620,6 @@ def chat(message):
Send chat message, useful for user-to-user test sessions
"""
if not _connection_ok():
_echo_dispatcher('No connection established yet')
return
m = ''
for word in message:
......@@ -706,7 +628,7 @@ def chat(message):
c = MsgSessionChat(description=m,
user_name=session_profile['user_name'],
iut_node=session_profile['node'])
publish_message(c)
_publish_message(c)
@cli.command()
......@@ -730,7 +652,7 @@ def get_session_status():
request_message = MsgTestSuiteGetStatus()
try:
status_resp = amqp_request(request_message, COMPONENT_ID)
status_resp = _amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
......@@ -787,7 +709,7 @@ def _echo_context():
_echo_list_as_table(table)
def _set_up_connection(lazy_listener=False):
def _set_up_connection(create_listener=True, lazy_listener=False):
# conn for repl publisher
try:
retries_left = CONNECTION_SETUP_RETRIES
......@@ -822,6 +744,11 @@ def _set_up_connection(lazy_listener=False):
_echo_log_message('amqp listener thread doesnt want to stop, lets terminate it..')
th.terminate()
if create_listener is False:
return
# set up listener thread which will call callback each time there's a new message in bus and matches a topic
if lazy_listener:
amqp_listener_thread = AmqpListener(
amqp_url=session_profile['amqp_url'],
......@@ -928,10 +855,10 @@ def _echo_welcome_message():
_echo_session_helper(m)
m = """
*********************************************************************************
* If you experience any problems, or you have any suggestions or feedback *
* don't hesitate to drop me an email at: federico.sismondi@inria.fr *
*********************************************************************************
*************************************************************************************
* If you experience any problems, or you have any suggestions or feedback *
* don't hesitate to drop me an email at: federico[dot]sismondi[at]gmail[dot]com *
*************************************************************************************
"""
_echo_session_helper(m)
......@@ -1065,19 +992,24 @@ def _echo_report_as_table(tc_report_list):
assert type(tc_report_list) is list
#testcases = [(k, v) for k, v in report_dict.items() if k.lower().startswith('td')]
# testcases = [(k, v) for k, v in report_dict.items() if k.lower().startswith('td')]
testcases = tc_report_list
for tc_report in testcases:
table = []
table.append(("Testcase ID", 'Final verdict', 'Description'))
table.append((tc_report['testcase_id'], tc_report['verdict'], tc_report['description']))
tc_id = tc_report['testcase_id'] if 'testcase_id' in tc_report else '???'
verd = tc_report['verdict'] if 'verdict' in tc_report else '???'
desc = tc_report['description'] if 'description' in tc_report else '???'
table.append((tc_id, verd, desc))
# testcase report
click.echo()
click.echo(click.style(tabulate(table, headers="firstrow"), fg=COLOR_TEST_SESSION_HELPER_MESSAGE))
click.echo()
_echo_testcase_partial_verdicts_as_table(tc_report['partial_verdicts'])
if 'partial_verdicts' in tc_report:
click.echo()
_echo_testcase_partial_verdicts_as_table(tc_report['partial_verdicts'])
click.echo()
except Exception as e:
......@@ -1235,12 +1167,12 @@ def list_to_str(ls):
return ret
def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
def _save_file_from_base64(filename, base64_encoded_file, dir=None):
"""
Returns number of bytes saved.
:param filename:
:param pcap_file_base64:
:param base64_encoded_file:
:return:
"""
......@@ -1250,37 +1182,110 @@ def save_pcap_from_base64(filename, pcap_file_base64, dir=None):
file_path = os.path.join(os.getcwd(), filename)
with open(file_path, "wb") as pcap_file:
nb = pcap_file.write(base64.b64decode(pcap_file_base64))
nb = pcap_file.write(base64.b64decode(base64_encoded_file))
return nb
if __name__ == "__main__":
# # # AMQP connection, channel, publish and requests/replies handling # # #
try:
session_profile.update({'amqp_exchange': str(os.environ['AMQP_EXCHANGE'])})
except KeyError as e:
pass # use default
def _amqp_request(request_message, component_id=COMPONENT_ID, timeout=10):
TIME_WAIT_BETWEEN_POLLS = 0.5
state_lock.acquire()
if not _connection_ok():
_echo_dispatcher('No connection established yet, setting up one..')
_set_up_connection(create_listener=False)
channel = state['channel']
amqp_exchange = session_profile['amqp_exchange']
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
if amqp_exchange is None:
amqp_exchange = 'amq.topic'
response = None
reply_queue_name = 'amqp_rpc_%s@%s' % (str(uuid.uuid4())[:8], component_id)
try:
env_url = str(os.environ['AMQP_URL'])
if 'heartbeat_interval' not in env_url:
url = '%s?%s&%s&%s&%s&%s' % (
env_url,
"heartbeat_interval=600",
"blocked_connection_timeout=300",
"retry_delay=1",
"socket_timeout=1",
"connection_attempts=3"
)
result = channel.queue_declare(queue=reply_queue_name, auto_delete=True)
callback_queue = result.method.queue
# bind and listen to reply_to topic
channel.queue_bind(
exchange=amqp_exchange,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=amqp_exchange,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
retries_left = int(timeout / TIME_WAIT_BETWEEN_POLLS)
while retries_left > 0:
time.sleep(TIME_WAIT_BETWEEN_POLLS)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
if hasattr(props, 'correlation_id') and props.correlation_id == request_message.correlation_id:
break
retries_left -= 1
if retries_left > 0:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
response = MsgReply(request_message, **body_dict)
else:
url = env_url
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
type(request_message)
)
)
finally:
# clean up
channel.queue_delete(reply_queue_name)
state_lock.release()
return response
session_profile.update({'amqp_url': url})
except KeyError as e:
pass # use default
try:
cli()
except ExitReplException:
sys.exit(0)
print('Bye!')
def _publish_message(message):
if not _connection_ok():
_echo_dispatcher('No connection established yet, setting up one..')
_set_up_connection(create_listener=False)
_echo_dispatcher('Sending message..')
for i in range(1, 4):
try:
state_lock.acquire()
channel = state['connection'].channel()
channel.basic_publish(
exchange=session_profile['amqp_exchange'],
routing_key=message.routing_key,
properties=pika.BasicProperties(**message.get_properties()),
body=message.to_json(),
)
channel.close()
break
except pika.exceptions.ConnectionClosed as err:
_echo_error(err)
_echo_error('Unexpected connection closed, retrying %s/%s' % (i, 4))
_set_up_connection()
finally:
state_lock.acquire()
# Author:
# Federico Sismondi <federico.sismondi@gmail.com>
#
# License: see LICENSE document
import io
from setuptools import setup, find_packages
MAJOR = 0
MINOR = 1
PATCH = 2
VERSION = "{}.{}.{}".format(MAJOR, MINOR, PATCH)
name = 'ioppytest-cli'
description = "Command line interface for interacting with ioppytest testing tool (all interactions happen over AMQP even bus)."
CLASSIFIERS = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"Intended Audience :: Testers",
"Intended Audience :: Network Testers",
# "License :: OSI Approved :: BSD License",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Topic :: Networks",
"Topic :: Interoperability testing",
"Topic :: Scientific/Engineering",
# "Operating System :: Microsoft :: Windows", not there yet..
"Operating System :: POSIX",
"Operating System :: Unix",
"Operating System :: MacOS"
]
with open("version.py", "w") as f:
f.write("__version__ = '{}'\n".format(VERSION))
setup(
name=name,
author='Federico Sismondi',
author_email="federico.sismondi@gmail.com",
description=description,
version=VERSION,
license="??",
classifiers=CLASSIFIERS,
packages=find_packages(exclude=["tests"]),
long_description=io.open('README.md', 'r', encoding='utf-8').read(),
install_requires=[
'click==6.7',
'click_repl==0.1.2',
'pika==0.11.0',
'prompt_toolkit==1.0.15',
'wcwidth==0.1.7',
],
entry_points={'console_scripts': [
'ioppytest-cli=ioppytest_cli.ioppytest_cli:main',
],
},
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment