Commit 38a24d75 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

plenty of new features to REPL component, the most notorious one is the...

plenty of new features to REPL component, the most notorious one is the possibility to run it in a lazy mode meaning that messages body are just printed in STDOUT instead of being processes as Messages
parent ca2c1188
......@@ -31,6 +31,8 @@ from pure_pcapy import Dumper, Pkthdr, DLT_IEEE802_15_4, DLT_RAW
# globals
message_count = 0
print("THIS CLI IS NOT SUPPORTED ANY MORE")
logging.warning("THIS CLI IS NOT SUPPORTED ANY MORE")
def print_message(method, props, body):
global message_count
......@@ -547,29 +549,40 @@ if __name__ == '__main__':
})
service_tat = OrderedDict({
'tat0': MsgInteropTestCaseAnalyze(),
'tat0': MsgInteropTestCaseAnalyze(
protocol='coap',
),
'tat1': MsgInteropTestCaseAnalyze(
protocol='coap',
testcase_id="TD_COAP_CORE_01",
testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_01_v01",
testcase_ref="http://doc.f-interop.eu/tests/TD_COAP_CORE_01_v01",
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_empty_base64,
),
'tat2': MsgInteropTestCaseAnalyze(
protocol='coap',
testcase_id="TD_COAP_CORE_01",
testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_01_v01",
testcase_ref="http://doc.f-interop.eu/tests/TD_COAP_CORE_01_v01",
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_base64,
),
# 'tat3': MsgInteropTestCaseAnalyze(
# testcase_id="TD_COAP_CORE_04",
# testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_04_v01",
# testcase_ref="http://doc.f-interop.eu/tests/TD_COAP_CORE_04_v01",
# file_enc="pcap_base64",
# filename="TD_COAP_CORE_04.pcap",
# value=PCAP_COAP_TC4_OVER_TUN_INTERFACE_base64,
# )
'tat_onem2m': MsgInteropTestCaseAnalyze(
protocol='onem2m',
testcase_id="TD_M2M_NH_01",
testcase_ref="http://doc.f-interop.eu/tests/TD_M2M_NH_01",
file_enc="pcap_base64",
filename="TD_M2M_NH_01.pcap",
value=PCAP_ONEM2M_TD_M2M_NH_01,
),
})
service_dissection = OrderedDict({
......@@ -679,16 +692,27 @@ if __name__ == '__main__':
'conf_format_04_eut2': MsgConfigurationExecute(testcase_id='TD_6LoWPAN_FORMAT_04', node='eut2'),
'conf_format_06_eut1': MsgConfigurationExecute(testcase_id='TD_6LoWPAN_FORMAT_06', node='eut1'),
'conf_format_06_eut2': MsgConfigurationExecute(testcase_id='TD_6LoWPAN_FORMAT_06', node='eut2'),
's_hc_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_01', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), # need to update the target_Address before sending the message!!
's_hc_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_03', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_hc_05_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut2', target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"),
's_hc_05_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_hc_07_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut2', target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"),
's_hc_07_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_format_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_01', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:060d:97f5"),
's_format_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_03', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_format_04': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_04', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_format_06': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_06', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_hc_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_01', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"),
# need to update the target_Address before sending the message!!
's_hc_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_03', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_hc_05_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut2',
target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"),
's_hc_05_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_hc_07_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut2',
target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"),
's_hc_07_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_format_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_01', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:060d:97f5"),
's_format_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_03', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_format_04': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_04', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
's_format_06': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_06', node='eut1',
target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"),
# 'tc_hc_01': MsgTestCaseReady(testcase_id='TD_6LoWPAN_HC_01'), They have no effect on the iut controller
# 'tc_hc_02': MsgTestCaseReady(testcase_id='TD_6LoWPAN_HC_02'),
# 'tc_hc_03': MsgTestCaseReady(testcase_id='TD_6LoWPAN_HC_03'),
......@@ -713,7 +737,6 @@ if __name__ == '__main__':
# target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"),
})
event_type = params[0]
print(event_type)
......
......@@ -24,18 +24,19 @@ class AmqpListener(threading.Thread):
DEFAULT_EXCHAGE = 'amq.topic'
DEFAULT_AMQP_URL = 'amqp://guest:guest@locahost/'
def __init__(self, amqp_url, amqp_exchange, callback, topics=None):
def __init__(self, amqp_url, amqp_exchange, callback, topics=None, use_message_typing=True):
self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8]
self.connection = None
self.channel = None
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.use_message_typing = use_message_typing
threading.Thread.__init__(self)
if callback is None:
self.message_dispatcher = print
self.message_dispatcher = AmqpListener.default_message_handler
else:
self.message_dispatcher = callback
......@@ -56,6 +57,11 @@ class AmqpListener(threading.Thread):
self.amqp_connect()
@classmethod
def default_message_handler(cls,message_as_dict):
clean_dict = dict((k, v) for k, v in message_as_dict.items() if v)
print(json.dumps(clean_dict,indent=4))
def amqp_connect(self):
self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
self.channel = self.connection.channel()
......@@ -106,25 +112,34 @@ class AmqpListener(threading.Thread):
'app_id': props.app_id,
}
try:
m = Message.from_json(body)
if m is None:
raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict))
m.update_properties(**props_dict)
m.routing_key = method.routing_key
logging.debug('Message in bus: %s'%repr(m))
self.message_dispatcher(m)
except NonCompliantMessageFormatError as e:
logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e))
except Exception as e:
logging.error(e)
logging.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body))
raise e
finally:
if self.use_message_typing:
try:
m = Message.from_json(body)
if m is None:
raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict))
m.update_properties(**props_dict)
m.routing_key = method.routing_key
logging.debug('Message in bus: %s'%repr(m))
self.message_dispatcher(m)
except NonCompliantMessageFormatError as e:
logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e))
except Exception as e:
logging.error(e)
logging.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body))
raise e
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
text_based_message_representation = OrderedDict()
text_based_message_representation.update({'routing_key':method.routing_key})
text_based_message_representation.update(props_dict)
text_based_message_representation.update(body_dict)
self.message_dispatcher(text_based_message_representation)
def run(self):
logging.info("Starting thread listening on the event bus on topics %s" % self.topics)
......@@ -169,7 +184,7 @@ def publish_message(connection, message):
channel.close()
def amqp_request(connection, request_message, component_id):
def amqp_request(connection, request_message, component_id, retries = 10):
"""
Publishes message into the correct topic (uses Message object metadata)
Returns reply message.
......@@ -181,6 +196,9 @@ def amqp_request(connection, request_message, component_id):
# check first that sender didnt forget about reply to and corr id
assert request_message.reply_to
assert request_message.correlation_id
assert retries > 0
time_between_requests = 0.5
channel = None
......@@ -207,10 +225,10 @@ def amqp_request(connection, request_message, component_id):
)
time.sleep(0.2)
retries_left = 10
retries_left = retries
while retries_left > 0:
time.sleep(0.5)
time.sleep(time_between_requests)
method, props, body = channel.basic_get(reply_queue_name)
if method:
channel.basic_ack(method.delivery_tag)
......@@ -261,37 +279,48 @@ if __name__ == '__main__':
print("Callback function received: \n\t" + repr(message_received))
# amqp listener example:
amqp_listener_thread = AmqpListener(
amqp_url=AMQP_URL,
amqp_exchange=AMQP_EXCHANGE,
callback=callback_function,
topics='#'
)
try:
amqp_listener_thread.start()
except Exception as e:
print(e)
# publish message example
retries_left = 3
while retries_left > 0:
try:
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
m = MsgTest()
publish_message(connection, m)
break
except pika.exceptions.ConnectionClosed:
retries_left -= 1
print('retrying..')
time.sleep(0.2)
# example of a request sent into the bus
m = MsgTestSuiteGetTestCases()
try:
r = amqp_request(connection, m, 'someImaginaryComponent')
print("This is the response I got:\n\t" + repr(r))
except AmqpSynchCallTimeoutError as e:
print("Nobody answered to our request :'(")
# EXAMPLE ON AMQP LISTENER
# # amqp listener example:
# amqp_listener_thread = AmqpListener(
# amqp_url=AMQP_URL,
# amqp_exchange=AMQP_EXCHANGE,
# callback=callback_function,
# topics='#'
# )
#
# try:
# amqp_listener_thread.start()
# except Exception as e:
# print(e)
#
# # publish message example
# retries_left = 3
# while retries_left > 0:
# try:
# connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
# m = MsgTest()
# publish_message(connection, m)
# break
# except pika.exceptions.ConnectionClosed:
# retries_left -= 1
# print('retrying..')
# time.sleep(0.2)
#
# # example of a request sent into the bus
# m = MsgTestSuiteGetTestCases()
# try:
# r = amqp_request(connection, m, 'someImaginaryComponent')
# print("This is the response I got:\n\t" + repr(r))
#
# except AmqpSynchCallTimeoutError as e:
# print("Nobody answered to our request :'(")
# EXAMPLE ON REQUEST REPLY FOR UI BUTTONS:
con = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = con.channel()
ui_request = MsgUiRequestConfirmationButton()
ui_reply = amqp_request(con,ui_request,'dummy_component')
print(repr(ui_reply))
\ No newline at end of file
......@@ -47,9 +47,6 @@ DEFAULT_TOPIC_SUBSCRIPTIONS = [
MESSAGE_TYPES_NOT_ECHOED = [
MsgPacketInjectRaw,
MsgUiReply,
MsgUiDisplay,
MsgUiDisplayMarkdownText
]
CONNECTION_SETUP_RETRIES = 3
......@@ -64,6 +61,16 @@ session_profile = OrderedDict(
}
)
# TODO handle the lock automatically with an state object
# def some_dummuy_release():
# _echo_log_message('releasing!')
# def some_dummuy_acquire():
# _echo_log_message('acquiring!')
# state_lock = Message()
# state_lock.__setattr__('release', some_dummuy_release)
# state_lock.__setattr__('acquire', some_dummuy_acquire)
state_lock = threading.RLock()
state = {
'testcase_id': None,
'step_id': None,
......@@ -90,10 +97,12 @@ def _init_action_suggested():
state['suggested_cmd'] = 'ts_start'
def amqp_request(channel, request_message, component_id):
def amqp_request(request_message, component_id=COMPONENT_ID):
"""
NOTE: channel must be a pika channel
"""
state_lock.acquire()
channel = state['channel']
amqp_exchange = session_profile['amqp_exchange']
# check first that sender didnt forget about reply to and corr id
......@@ -155,6 +164,7 @@ def amqp_request(channel, request_message, component_id):
finally:
# clean up
channel.queue_delete(reply_queue_name)
state_lock.release()
return response
......@@ -168,6 +178,7 @@ def publish_message(message):
for i in range(1, 4):
try:
state_lock.acquire()
state['channel'].basic_publish(
exchange=session_profile['amqp_exchange'],
routing_key=message.routing_key,
......@@ -181,6 +192,9 @@ def publish_message(message):
_echo_error('Unexpected connection closed, retrying %s/%s' % (i, 4))
_set_up_connection()
finally:
state_lock.acquire()
@click.group()
def cli():
......@@ -212,11 +226,15 @@ def repl():
@cli.command()
def connect():
@click.option('-ll', '--lazy-listener',
is_flag=True,
default=False,
help="lazy-listener doest perform convertion to Messages class")
def connect(lazy_listener):
"""
Connect to an AMQP session and start consuming messages
"""
_set_up_connection()
_set_up_connection(lazy_listener=lazy_listener)
@cli.command()
......@@ -238,16 +256,11 @@ def download_network_traces():
_handle_get_testcase_list()
ls = state['tc_list'].copy()
try:
channel = state['connection'].channel()
except Exception as e:
_echo_error(e)
for tc_item in ls:
try:
tc_id = tc_item['testcase_id']
msg = MsgSniffingGetCapture(capture_id=tc_id)
response = amqp_request(channel, msg, COMPONENT_ID)
response = amqp_request(msg, COMPONENT_ID)
if response.ok:
save_pcap_from_base64(response.filename, response.value, TEMP_DIR)
......@@ -298,11 +311,10 @@ def _handle_get_testcase_list():
# requires testing tool to implement GetTestCases feature, see MsgTestSuiteGetTestCases
if _connection_ok():
temp_channel = state['connection'].channel()
request_message = MsgTestSuiteGetTestCases()
try:
testcases_list_reponse = amqp_request(temp_channel, request_message, COMPONENT_ID)
testcases_list_reponse = amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
......@@ -494,6 +506,13 @@ def enter_debug_context():
"""
_send_configuration_default_message_for_coap_testsuite()
@cli.command()
def _get_session_configuration_from_ui():
"""
Get session config from UI
"""
_get_session_configuration()
@cli.command()
@click.argument('testcase_id')
def _testcase_skip(testcase_id):
......@@ -573,11 +592,10 @@ def get_session_status():
# requires testing tool to implement GetStatus feature, see MsgTestSuiteGetStatus
if _connection_ok():
temp_channel = state['connection'].channel()
request_message = MsgTestSuiteGetStatus()
try:
status_resp = amqp_request(temp_channel, request_message, COMPONENT_ID)
status_resp = amqp_request(request_message, COMPONENT_ID)
except Exception as e:
_echo_error('Is testing tool up?')
_echo_error(e)
......@@ -634,14 +652,14 @@ def _echo_context():
_echo_list_as_table(table)
def _set_up_connection():
global state
def _set_up_connection(lazy_listener=False):
# conn for repl publisher
try:
retries_left = CONNECTION_SETUP_RETRIES
state_lock.acquire()
while retries_left > 0:
try:
state['connection'] = pika.BlockingConnection(pika.URLParameters(session_profile['amqp_url']))
state['channel'] = state['connection'].channel()
break
......@@ -656,6 +674,9 @@ def _set_up_connection():
state['channel'] = None
return
finally:
state_lock.release()
# note we have a separate conn for amqp listener (each pika threads needs a different connection)
if 'amqp_listener_thread' in state and state['amqp_listener_thread'] is not None:
_echo_log_message('stopping amqp listener thread')
......@@ -666,12 +687,22 @@ def _set_up_connection():
_echo_log_message('amqp listener thread doesnt want to stop, lets terminate it..')
th.terminate()
amqp_listener_thread = AmqpListener(
amqp_url=session_profile['amqp_url'],
amqp_exchange=session_profile['amqp_exchange'],
callback=_message_handler,
topics=DEFAULT_TOPIC_SUBSCRIPTIONS,
)
if lazy_listener:
amqp_listener_thread = AmqpListener(
amqp_url=session_profile['amqp_url'],
amqp_exchange=session_profile['amqp_exchange'],
callback=None,
topics=DEFAULT_TOPIC_SUBSCRIPTIONS,
use_message_typing=False,
)
else:
amqp_listener_thread = AmqpListener(
amqp_url=session_profile['amqp_url'],
amqp_exchange=session_profile['amqp_exchange'],
callback=_message_handler,
topics=DEFAULT_TOPIC_SUBSCRIPTIONS,
use_message_typing=True,
)
amqp_listener_thread.start()
state['amqp_listener_thread'] = amqp_listener_thread
......@@ -1020,7 +1051,7 @@ def _echo_gui_message(msg):
str(msg.tags),
str(msg.fields)[:70],
msg.routing_key,
msg.correlation_id if hasattr(msg,'correlation_id') else ''
msg.correlation_id if hasattr(msg, 'correlation_id') else ''
),
fg=COLOR_SESSION_LOG))
......@@ -1088,6 +1119,12 @@ def _send_configuration_default_message_for_coap_testsuite():
publish_message(message)
def _get_session_configuration():
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
req = MsgUiRequestSessionConfiguration()
publish_message(req)
def _ui_send_confirmation_button(text=None):
_echo_input("Executing debug message %s" % sys._getframe().f_code.co_name)
......
......@@ -510,6 +510,7 @@ class MsgUiReply(Message):
]
}
class MsgUiRequestTextInput(Message):
"""
Requirements: ...
......@@ -551,7 +552,7 @@ class MsgUiRequestConfirmationButton(Message):
"tags": [],
"fields": [
{
"name": "Please confirm that Uruguay es el mejor pais",
"name": "test_button",
"type": "button",
"value": True
},
......@@ -559,6 +560,23 @@ class MsgUiRequestConfirmationButton(Message):
}
class MsgUiRequestSessionConfiguration(Message):
"""
Requirements: ...
Type: Event
Pub/Sub: TT -> UI
Description: Message for requesting configuration message to UI
"""
routing_key = "ui.core.session.configuration.get.request"
_msg_data_template = {
"_type": "ui.core.session.configuration.get.request",
}
class MsgUiDisplay(Message):
"""
Requirements: ...
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment