diff --git a/Jenkinsfile b/Jenkinsfile index 5c9f8b53191f69d1613a02a96af9bb7196ae3d91..83f402780e53b0060494e193d710742f23dce7a0 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -60,6 +60,14 @@ if(env.JOB_NAME =~ 'utils/'){ ''' } } + + stage("other tests"){ + gitlabCommitStatus("other tests"){ + sh ''' + python3 -m messages_doc + ''' + } + } } } diff --git a/amqp_synch_call.py b/amqp_synch_call.py index c52d6fb8fd91026c262c4d428461d39cd50b7243..2dce0c22b1fe4d6bd007bd658c485b7f95ab33dc 100644 --- a/amqp_synch_call.py +++ b/amqp_synch_call.py @@ -104,7 +104,7 @@ def amqp_request(connection, request_message, component_id): raise AmqpSynchCallTimeoutError( "Response timeout! rkey: %s , request type: %s" % ( request_message.routing_key, - request_message._type + type(request_message) ) ) diff --git a/event_bus_utils.py b/event_bus_utils.py index 74928ce9c15a436bdc9ec110e3b5958b739c3646..56afb84249886e79fd559053dd93ffd2704d3491 100644 --- a/event_bus_utils.py +++ b/event_bus_utils.py @@ -103,23 +103,11 @@ class AmqpListener(threading.Thread): def on_request(self, ch, method, props, body): - props_dict = { - 'content_type': props.content_type, - 'delivery_mode': props.delivery_mode, - 'correlation_id': props.correlation_id, - 'reply_to': props.reply_to, - 'message_id': props.message_id, - 'timestamp': props.timestamp, - 'user_id': props.user_id, - 'app_id': props.app_id, - } - if self.use_message_typing: try: - m = Message.from_json(body) + m = Message.load_from_pika(method, props, body) if m is None: - raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict)) - m.update_properties(**props_dict) + raise Exception("Couldnt build message from json %s, rkey: %s " % (body, method.routing_key)) m.routing_key = method.routing_key logging.debug('Message in bus: %s' % repr(m)) self.message_dispatcher(m) @@ -135,6 +123,18 @@ class AmqpListener(threading.Thread): finally: ch.basic_ack(delivery_tag=method.delivery_tag) else: + + props_dict = { + 'content_type': props.content_type, + 'delivery_mode': props.delivery_mode, + 'correlation_id': props.correlation_id, + 'reply_to': props.reply_to, + 'message_id': props.message_id, + 'timestamp': props.timestamp, + 'user_id': props.user_id, + 'app_id': props.app_id, + } + body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict) ch.basic_ack(delivery_tag=method.delivery_tag) text_based_message_representation = OrderedDict() @@ -249,7 +249,7 @@ def amqp_request(connection, request_message, component_id, retries=10): raise AmqpSynchCallTimeoutError( "Response timeout! rkey: %s , request type: %s" % ( request_message.routing_key, - request_message._type + type(request_message) ) ) diff --git a/interop_cli.py b/interop_cli.py index ac8d901ae0b15bb868ac301e26ee9ce08d2a3c5e..d9f01be38ef8148573a47f80a8f36cbf1b19d3e1 100644 --- a/interop_cli.py +++ b/interop_cli.py @@ -156,7 +156,7 @@ def amqp_request(request_message, component_id=COMPONENT_ID): raise Exception( "Response timeout! rkey: %s , request type: %s" % ( request_message.routing_key, - request_message._type + type(request_message) ) ) @@ -969,7 +969,7 @@ def _echo_backend_message(msg): assert isinstance(msg, Message) try: - m = "\n[Session message] [%s] " % msg._type + m = "\n[Session message] [%s] " % type(msg) if hasattr(m, 'description'): m += m.description diff --git a/messages_doc.py b/messages_doc.py index 5ab238d6d12f32d97fe01f8564fc1675740ee879..0a75460b3f5fd6f2f7edf71aa078fc936219800f 100644 --- a/messages_doc.py +++ b/messages_doc.py @@ -109,19 +109,19 @@ TBD listens and consumes the following messages from the bus: print(head_1) for s in services: - print(table_row(s._type)) + print(table_row(s.routing_key)) print() print() print(head_2) for e in events: - print(table_row(e._type)) + print(table_row(e.routing_key)) print() print() print(head_3) for e in events: - print(table_row(e._type)) + print(table_row(e.routing_key)) def generate_doc_section_into_file(file_to_write, section, list_of_message_classes): @@ -130,7 +130,7 @@ def generate_doc_section_into_file(file_to_write, section, list_of_message_class for msg_class in list_of_message_classes: - msg_type = msg_class()._type + msg_type = msg_class().routing_key print('generating doc for %s,%s' % (msg_type, msg_class)) # Message Type diff --git a/packet_dumper.py b/packet_dumper.py index f908a47ee41f64c06d94d6f29635061da8d32ecc..ed2cdf730dedb44874a5af63887dd88b755d3452 100644 --- a/packet_dumper.py +++ b/packet_dumper.py @@ -265,30 +265,15 @@ class AmqpDataPacketDumper: ch.basic_ack(delivery_tag=method.delivery_tag) try: - - props_dict = { - 'content_type': props.content_type, - 'delivery_mode': props.delivery_mode, - 'correlation_id': props.correlation_id, - 'reply_to': props.reply_to, - 'message_id': props.message_id, - 'timestamp': props.timestamp, - 'user_id': props.user_id, - 'app_id': props.app_id, - } - - m = Message.from_json(body) - m.update_properties(**props_dict) + m = Message.load_from_pika(method, props, body) logger.info('got event: %s' % type(m)) if isinstance(m, MsgTestingToolTerminate): ch.stop_consuming() self.stop() - if isinstance(m, MsgPacketSniffedRaw): - + elif isinstance(m, MsgPacketSniffedRaw): self.dump_packet(m) - try: # rotate files each X messages dumped if self.messages_dumped != 0 and self.messages_dumped % self.QUANTITY_MESSAGES_PER_PCAP == 0: self.dumps_rotate() @@ -298,7 +283,6 @@ class AmqpDataPacketDumper: logger.error(e) else: - #logger.info('drop amqp message: ' + repr(m)) pass except NonCompliantMessageFormatError as e: