From 38a24d75499d520e4dbfb50f5e4b0460cde3b7b1 Mon Sep 17 00:00:00 2001 From: Federico Sismondi Date: Fri, 15 Dec 2017 13:06:55 +0100 Subject: [PATCH] plenty of new features to REPL component, the most notorious one is the possibility to run it in a lazy mode meaning that messages body are just printed in STDOUT instead of being processes as Messages --- cli.py | 55 ++++++++++++----- event_bus_utils.py | 143 +++++++++++++++++++++++++++------------------ interop_cli.py | 89 +++++++++++++++++++--------- messages.py | 20 ++++++- 4 files changed, 207 insertions(+), 100 deletions(-) diff --git a/cli.py b/cli.py index 4d91841..e30bdbb 100644 --- a/cli.py +++ b/cli.py @@ -31,6 +31,8 @@ from pure_pcapy import Dumper, Pkthdr, DLT_IEEE802_15_4, DLT_RAW # globals message_count = 0 +print("THIS CLI IS NOT SUPPORTED ANY MORE") +logging.warning("THIS CLI IS NOT SUPPORTED ANY MORE") def print_message(method, props, body): global message_count @@ -547,29 +549,40 @@ if __name__ == '__main__': }) service_tat = OrderedDict({ - 'tat0': MsgInteropTestCaseAnalyze(), + 'tat0': MsgInteropTestCaseAnalyze( + protocol='coap', + ), 'tat1': MsgInteropTestCaseAnalyze( + protocol='coap', testcase_id="TD_COAP_CORE_01", - testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_01_v01", + testcase_ref="http://doc.f-interop.eu/tests/TD_COAP_CORE_01_v01", file_enc="pcap_base64", filename="TD_COAP_CORE_01.pcap", value=PCAP_empty_base64, ), 'tat2': MsgInteropTestCaseAnalyze( + protocol='coap', testcase_id="TD_COAP_CORE_01", - testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_01_v01", + testcase_ref="http://doc.f-interop.eu/tests/TD_COAP_CORE_01_v01", file_enc="pcap_base64", filename="TD_COAP_CORE_01.pcap", value=PCAP_TC_COAP_01_base64, ), # 'tat3': MsgInteropTestCaseAnalyze( # testcase_id="TD_COAP_CORE_04", - # testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_04_v01", + # testcase_ref="http://doc.f-interop.eu/tests/TD_COAP_CORE_04_v01", # file_enc="pcap_base64", # filename="TD_COAP_CORE_04.pcap", # value=PCAP_COAP_TC4_OVER_TUN_INTERFACE_base64, # ) - + 'tat_onem2m': MsgInteropTestCaseAnalyze( + protocol='onem2m', + testcase_id="TD_M2M_NH_01", + testcase_ref="http://doc.f-interop.eu/tests/TD_M2M_NH_01", + file_enc="pcap_base64", + filename="TD_M2M_NH_01.pcap", + value=PCAP_ONEM2M_TD_M2M_NH_01, + ), }) service_dissection = OrderedDict({ @@ -679,16 +692,27 @@ if __name__ == '__main__': 'conf_format_04_eut2': MsgConfigurationExecute(testcase_id='TD_6LoWPAN_FORMAT_04', node='eut2'), 'conf_format_06_eut1': MsgConfigurationExecute(testcase_id='TD_6LoWPAN_FORMAT_06', node='eut1'), 'conf_format_06_eut2': MsgConfigurationExecute(testcase_id='TD_6LoWPAN_FORMAT_06', node='eut2'), - 's_hc_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_01', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), # need to update the target_Address before sending the message!! - 's_hc_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_03', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), - 's_hc_05_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut2', target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), - 's_hc_05_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), - 's_hc_07_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut2', target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), - 's_hc_07_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut1', target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), - 's_format_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_01', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:060d:97f5"), - 's_format_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_03', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), - 's_format_04': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_04', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), - 's_format_06': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_06', node='eut1',target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), + 's_hc_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_01', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), + # need to update the target_Address before sending the message!! + 's_hc_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_03', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), + 's_hc_05_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut2', + target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), + 's_hc_05_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_05', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), + 's_hc_07_step_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut2', + target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), + 's_hc_07_step_02': MsgStepStimuliExecute(step_id='TD_6LoWPAN_HC_07', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), + 's_format_01': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_01', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:060d:97f5"), + 's_format_03': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_03', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), + 's_format_04': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_04', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), + 's_format_06': MsgStepStimuliExecute(step_id='TD_6LoWPAN_FORMAT_06', node='eut1', + target_address="fe80:0000:0000:0000:0212:4b00:0433:ed9c"), # 'tc_hc_01': MsgTestCaseReady(testcase_id='TD_6LoWPAN_HC_01'), They have no effect on the iut controller # 'tc_hc_02': MsgTestCaseReady(testcase_id='TD_6LoWPAN_HC_02'), # 'tc_hc_03': MsgTestCaseReady(testcase_id='TD_6LoWPAN_HC_03'), @@ -713,7 +737,6 @@ if __name__ == '__main__': # target_address="fe80:0000:0000:0000:0212:4b00:0615:a500"), }) - event_type = params[0] print(event_type) diff --git a/event_bus_utils.py b/event_bus_utils.py index ed7547a..a8613fb 100644 --- a/event_bus_utils.py +++ b/event_bus_utils.py @@ -24,18 +24,19 @@ class AmqpListener(threading.Thread): DEFAULT_EXCHAGE = 'amq.topic' DEFAULT_AMQP_URL = 'amqp://guest:guest@locahost/' - def __init__(self, amqp_url, amqp_exchange, callback, topics=None): + def __init__(self, amqp_url, amqp_exchange, callback, topics=None, use_message_typing=True): self.COMPONENT_ID = 'amqp_listener_%s' % str(uuid.uuid4())[:8] self.connection = None self.channel = None self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID + self.use_message_typing = use_message_typing threading.Thread.__init__(self) if callback is None: - self.message_dispatcher = print + self.message_dispatcher = AmqpListener.default_message_handler else: self.message_dispatcher = callback @@ -56,6 +57,11 @@ class AmqpListener(threading.Thread): self.amqp_connect() + @classmethod + def default_message_handler(cls,message_as_dict): + clean_dict = dict((k, v) for k, v in message_as_dict.items() if v) + print(json.dumps(clean_dict,indent=4)) + def amqp_connect(self): self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url)) self.channel = self.connection.channel() @@ -106,25 +112,34 @@ class AmqpListener(threading.Thread): 'app_id': props.app_id, } - try: - m = Message.from_json(body) - if m is None: - raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict)) - m.update_properties(**props_dict) - m.routing_key = method.routing_key - logging.debug('Message in bus: %s'%repr(m)) - self.message_dispatcher(m) - - except NonCompliantMessageFormatError as e: - logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e)) - - except Exception as e: - logging.error(e) - logging.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body)) - raise e - - finally: + if self.use_message_typing: + try: + m = Message.from_json(body) + if m is None: + raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict)) + m.update_properties(**props_dict) + m.routing_key = method.routing_key + logging.debug('Message in bus: %s'%repr(m)) + self.message_dispatcher(m) + + except NonCompliantMessageFormatError as e: + logging.error('%s got a non compliant message error %s' % (self.__class__.__name__, e)) + + except Exception as e: + logging.error(e) + logging.error('message received:\n\tr_key: %s\n\t%s' % (method.routing_key, body)) + raise e + + finally: + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict) ch.basic_ack(delivery_tag=method.delivery_tag) + text_based_message_representation = OrderedDict() + text_based_message_representation.update({'routing_key':method.routing_key}) + text_based_message_representation.update(props_dict) + text_based_message_representation.update(body_dict) + self.message_dispatcher(text_based_message_representation) def run(self): logging.info("Starting thread listening on the event bus on topics %s" % self.topics) @@ -169,7 +184,7 @@ def publish_message(connection, message): channel.close() -def amqp_request(connection, request_message, component_id): +def amqp_request(connection, request_message, component_id, retries = 10): """ Publishes message into the correct topic (uses Message object metadata) Returns reply message. @@ -181,6 +196,9 @@ def amqp_request(connection, request_message, component_id): # check first that sender didnt forget about reply to and corr id assert request_message.reply_to assert request_message.correlation_id + assert retries > 0 + + time_between_requests = 0.5 channel = None @@ -207,10 +225,10 @@ def amqp_request(connection, request_message, component_id): ) time.sleep(0.2) - retries_left = 10 + retries_left = retries while retries_left > 0: - time.sleep(0.5) + time.sleep(time_between_requests) method, props, body = channel.basic_get(reply_queue_name) if method: channel.basic_ack(method.delivery_tag) @@ -261,37 +279,48 @@ if __name__ == '__main__': print("Callback function received: \n\t" + repr(message_received)) - # amqp listener example: - amqp_listener_thread = AmqpListener( - amqp_url=AMQP_URL, - amqp_exchange=AMQP_EXCHANGE, - callback=callback_function, - topics='#' - ) - - try: - amqp_listener_thread.start() - except Exception as e: - print(e) - - # publish message example - retries_left = 3 - while retries_left > 0: - try: - connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL)) - m = MsgTest() - publish_message(connection, m) - break - except pika.exceptions.ConnectionClosed: - retries_left -= 1 - print('retrying..') - time.sleep(0.2) - - # example of a request sent into the bus - m = MsgTestSuiteGetTestCases() - try: - r = amqp_request(connection, m, 'someImaginaryComponent') - print("This is the response I got:\n\t" + repr(r)) - - except AmqpSynchCallTimeoutError as e: - print("Nobody answered to our request :'(") + # EXAMPLE ON AMQP LISTENER + + # # amqp listener example: + # amqp_listener_thread = AmqpListener( + # amqp_url=AMQP_URL, + # amqp_exchange=AMQP_EXCHANGE, + # callback=callback_function, + # topics='#' + # ) + # + # try: + # amqp_listener_thread.start() + # except Exception as e: + # print(e) + # + # # publish message example + # retries_left = 3 + # while retries_left > 0: + # try: + # connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL)) + # m = MsgTest() + # publish_message(connection, m) + # break + # except pika.exceptions.ConnectionClosed: + # retries_left -= 1 + # print('retrying..') + # time.sleep(0.2) + # + # # example of a request sent into the bus + # m = MsgTestSuiteGetTestCases() + # try: + # r = amqp_request(connection, m, 'someImaginaryComponent') + # print("This is the response I got:\n\t" + repr(r)) + # + # except AmqpSynchCallTimeoutError as e: + # print("Nobody answered to our request :'(") + + + # EXAMPLE ON REQUEST REPLY FOR UI BUTTONS: + con = pika.BlockingConnection(pika.URLParameters(AMQP_URL)) + channel = con.channel() + + ui_request = MsgUiRequestConfirmationButton() + ui_reply = amqp_request(con,ui_request,'dummy_component') + print(repr(ui_reply)) \ No newline at end of file diff --git a/interop_cli.py b/interop_cli.py index 064212a..31fa1cb 100644 --- a/interop_cli.py +++ b/interop_cli.py @@ -47,9 +47,6 @@ DEFAULT_TOPIC_SUBSCRIPTIONS = [ MESSAGE_TYPES_NOT_ECHOED = [ MsgPacketInjectRaw, - MsgUiReply, - MsgUiDisplay, - MsgUiDisplayMarkdownText ] CONNECTION_SETUP_RETRIES = 3 @@ -64,6 +61,16 @@ session_profile = OrderedDict( } ) +# TODO handle the lock automatically with an state object +# def some_dummuy_release(): +# _echo_log_message('releasing!') +# def some_dummuy_acquire(): +# _echo_log_message('acquiring!') +# state_lock = Message() +# state_lock.__setattr__('release', some_dummuy_release) +# state_lock.__setattr__('acquire', some_dummuy_acquire) + +state_lock = threading.RLock() state = { 'testcase_id': None, 'step_id': None, @@ -90,10 +97,12 @@ def _init_action_suggested(): state['suggested_cmd'] = 'ts_start' -def amqp_request(channel, request_message, component_id): +def amqp_request(request_message, component_id=COMPONENT_ID): """ NOTE: channel must be a pika channel """ + state_lock.acquire() + channel = state['channel'] amqp_exchange = session_profile['amqp_exchange'] # check first that sender didnt forget about reply to and corr id @@ -155,6 +164,7 @@ def amqp_request(channel, request_message, component_id): finally: # clean up channel.queue_delete(reply_queue_name) + state_lock.release() return response @@ -168,6 +178,7 @@ def publish_message(message): for i in range(1, 4): try: + state_lock.acquire() state['channel'].basic_publish( exchange=session_profile['amqp_exchange'], routing_key=message.routing_key, @@ -181,6 +192,9 @@ def publish_message(message): _echo_error('Unexpected connection closed, retrying %s/%s' % (i, 4)) _set_up_connection() + finally: + state_lock.acquire() + @click.group() def cli(): @@ -212,11 +226,15 @@ def repl(): @cli.command() -def connect(): +@click.option('-ll', '--lazy-listener', + is_flag=True, + default=False, + help="lazy-listener doest perform convertion to Messages class") +def connect(lazy_listener): """ Connect to an AMQP session and start consuming messages """ - _set_up_connection() + _set_up_connection(lazy_listener=lazy_listener) @cli.command() @@ -238,16 +256,11 @@ def download_network_traces(): _handle_get_testcase_list() ls = state['tc_list'].copy() - try: - channel = state['connection'].channel() - except Exception as e: - _echo_error(e) - for tc_item in ls: try: tc_id = tc_item['testcase_id'] msg = MsgSniffingGetCapture(capture_id=tc_id) - response = amqp_request(channel, msg, COMPONENT_ID) + response = amqp_request(msg, COMPONENT_ID) if response.ok: save_pcap_from_base64(response.filename, response.value, TEMP_DIR) @@ -298,11 +311,10 @@ def _handle_get_testcase_list(): # requires testing tool to implement GetTestCases feature, see MsgTestSuiteGetTestCases if _connection_ok(): - temp_channel = state['connection'].channel() request_message = MsgTestSuiteGetTestCases() try: - testcases_list_reponse = amqp_request(temp_channel, request_message, COMPONENT_ID) + testcases_list_reponse = amqp_request(request_message, COMPONENT_ID) except Exception as e: _echo_error('Is testing tool up?') _echo_error(e) @@ -494,6 +506,13 @@ def enter_debug_context(): """ _send_configuration_default_message_for_coap_testsuite() + @cli.command() + def _get_session_configuration_from_ui(): + """ + Get session config from UI + """ + _get_session_configuration() + @cli.command() @click.argument('testcase_id') def _testcase_skip(testcase_id): @@ -573,11 +592,10 @@ def get_session_status(): # requires testing tool to implement GetStatus feature, see MsgTestSuiteGetStatus if _connection_ok(): - temp_channel = state['connection'].channel() request_message = MsgTestSuiteGetStatus() try: - status_resp = amqp_request(temp_channel, request_message, COMPONENT_ID) + status_resp = amqp_request(request_message, COMPONENT_ID) except Exception as e: _echo_error('Is testing tool up?') _echo_error(e) @@ -634,14 +652,14 @@ def _echo_context(): _echo_list_as_table(table) -def _set_up_connection(): - global state - +def _set_up_connection(lazy_listener=False): # conn for repl publisher try: retries_left = CONNECTION_SETUP_RETRIES + state_lock.acquire() while retries_left > 0: try: + state['connection'] = pika.BlockingConnection(pika.URLParameters(session_profile['amqp_url'])) state['channel'] = state['connection'].channel() break @@ -656,6 +674,9 @@ def _set_up_connection(): state['channel'] = None return + finally: + state_lock.release() + # note we have a separate conn for amqp listener (each pika threads needs a different connection) if 'amqp_listener_thread' in state and state['amqp_listener_thread'] is not None: _echo_log_message('stopping amqp listener thread') @@ -666,12 +687,22 @@ def _set_up_connection(): _echo_log_message('amqp listener thread doesnt want to stop, lets terminate it..') th.terminate() - amqp_listener_thread = AmqpListener( - amqp_url=session_profile['amqp_url'], - amqp_exchange=session_profile['amqp_exchange'], - callback=_message_handler, - topics=DEFAULT_TOPIC_SUBSCRIPTIONS, - ) + if lazy_listener: + amqp_listener_thread = AmqpListener( + amqp_url=session_profile['amqp_url'], + amqp_exchange=session_profile['amqp_exchange'], + callback=None, + topics=DEFAULT_TOPIC_SUBSCRIPTIONS, + use_message_typing=False, + ) + else: + amqp_listener_thread = AmqpListener( + amqp_url=session_profile['amqp_url'], + amqp_exchange=session_profile['amqp_exchange'], + callback=_message_handler, + topics=DEFAULT_TOPIC_SUBSCRIPTIONS, + use_message_typing=True, + ) amqp_listener_thread.start() state['amqp_listener_thread'] = amqp_listener_thread @@ -1020,7 +1051,7 @@ def _echo_gui_message(msg): str(msg.tags), str(msg.fields)[:70], msg.routing_key, - msg.correlation_id if hasattr(msg,'correlation_id') else '' + msg.correlation_id if hasattr(msg, 'correlation_id') else '' ), fg=COLOR_SESSION_LOG)) @@ -1088,6 +1119,12 @@ def _send_configuration_default_message_for_coap_testsuite(): publish_message(message) +def _get_session_configuration(): + _echo_input("Executing debug message %s" % sys._getframe().f_code.co_name) + req = MsgUiRequestSessionConfiguration() + publish_message(req) + + def _ui_send_confirmation_button(text=None): _echo_input("Executing debug message %s" % sys._getframe().f_code.co_name) diff --git a/messages.py b/messages.py index 8f8cc4a..3572084 100644 --- a/messages.py +++ b/messages.py @@ -510,6 +510,7 @@ class MsgUiReply(Message): ] } + class MsgUiRequestTextInput(Message): """ Requirements: ... @@ -551,7 +552,7 @@ class MsgUiRequestConfirmationButton(Message): "tags": [], "fields": [ { - "name": "Please confirm that Uruguay es el mejor pais", + "name": "test_button", "type": "button", "value": True }, @@ -559,6 +560,23 @@ class MsgUiRequestConfirmationButton(Message): } +class MsgUiRequestSessionConfiguration(Message): + """ + Requirements: ... + + Type: Event + + Pub/Sub: TT -> UI + + Description: Message for requesting configuration message to UI + """ + routing_key = "ui.core.session.configuration.get.request" + + _msg_data_template = { + "_type": "ui.core.session.configuration.get.request", + } + + class MsgUiDisplay(Message): """ Requirements: ... -- 2.24.1