Commit fc487a69 authored by Federico Sismondi's avatar Federico Sismondi

deprecate use of .from_json() everywhere

parent d39d1f77
Pipeline #1770 passed with stage
in 0 seconds
...@@ -60,6 +60,14 @@ if(env.JOB_NAME =~ 'utils/'){ ...@@ -60,6 +60,14 @@ if(env.JOB_NAME =~ 'utils/'){
''' '''
} }
} }
stage("other tests"){
gitlabCommitStatus("other tests"){
sh '''
python3 -m messages_doc
'''
}
}
} }
} }
...@@ -104,7 +104,7 @@ def amqp_request(connection, request_message, component_id): ...@@ -104,7 +104,7 @@ def amqp_request(connection, request_message, component_id):
raise AmqpSynchCallTimeoutError( raise AmqpSynchCallTimeoutError(
"Response timeout! rkey: %s , request type: %s" % ( "Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key, request_message.routing_key,
request_message._type type(request_message)
) )
) )
......
...@@ -103,23 +103,11 @@ class AmqpListener(threading.Thread): ...@@ -103,23 +103,11 @@ class AmqpListener(threading.Thread):
def on_request(self, ch, method, props, body): def on_request(self, ch, method, props, body):
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
if self.use_message_typing: if self.use_message_typing:
try: try:
m = Message.from_json(body) m = Message.load_from_pika(method, props, body)
if m is None: if m is None:
raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict)) raise Exception("Couldnt build message from json %s, rkey: %s " % (body, method.routing_key))
m.update_properties(**props_dict)
m.routing_key = method.routing_key m.routing_key = method.routing_key
logging.debug('Message in bus: %s' % repr(m)) logging.debug('Message in bus: %s' % repr(m))
self.message_dispatcher(m) self.message_dispatcher(m)
...@@ -135,6 +123,18 @@ class AmqpListener(threading.Thread): ...@@ -135,6 +123,18 @@ class AmqpListener(threading.Thread):
finally: finally:
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
else: else:
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict) body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
text_based_message_representation = OrderedDict() text_based_message_representation = OrderedDict()
...@@ -249,7 +249,7 @@ def amqp_request(connection, request_message, component_id, retries=10): ...@@ -249,7 +249,7 @@ def amqp_request(connection, request_message, component_id, retries=10):
raise AmqpSynchCallTimeoutError( raise AmqpSynchCallTimeoutError(
"Response timeout! rkey: %s , request type: %s" % ( "Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key, request_message.routing_key,
request_message._type type(request_message)
) )
) )
......
...@@ -156,7 +156,7 @@ def amqp_request(request_message, component_id=COMPONENT_ID): ...@@ -156,7 +156,7 @@ def amqp_request(request_message, component_id=COMPONENT_ID):
raise Exception( raise Exception(
"Response timeout! rkey: %s , request type: %s" % ( "Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key, request_message.routing_key,
request_message._type type(request_message)
) )
) )
...@@ -969,7 +969,7 @@ def _echo_backend_message(msg): ...@@ -969,7 +969,7 @@ def _echo_backend_message(msg):
assert isinstance(msg, Message) assert isinstance(msg, Message)
try: try:
m = "\n[Session message] [%s] " % msg._type m = "\n[Session message] [%s] " % type(msg)
if hasattr(m, 'description'): if hasattr(m, 'description'):
m += m.description m += m.description
......
...@@ -109,19 +109,19 @@ TBD listens and consumes the following messages from the bus: ...@@ -109,19 +109,19 @@ TBD listens and consumes the following messages from the bus:
print(head_1) print(head_1)
for s in services: for s in services:
print(table_row(s._type)) print(table_row(s.routing_key))
print() print()
print() print()
print(head_2) print(head_2)
for e in events: for e in events:
print(table_row(e._type)) print(table_row(e.routing_key))
print() print()
print() print()
print(head_3) print(head_3)
for e in events: for e in events:
print(table_row(e._type)) print(table_row(e.routing_key))
def generate_doc_section_into_file(file_to_write, section, list_of_message_classes): def generate_doc_section_into_file(file_to_write, section, list_of_message_classes):
...@@ -130,7 +130,7 @@ def generate_doc_section_into_file(file_to_write, section, list_of_message_class ...@@ -130,7 +130,7 @@ def generate_doc_section_into_file(file_to_write, section, list_of_message_class
for msg_class in list_of_message_classes: for msg_class in list_of_message_classes:
msg_type = msg_class()._type msg_type = msg_class().routing_key
print('generating doc for %s,%s' % (msg_type, msg_class)) print('generating doc for %s,%s' % (msg_type, msg_class))
# Message Type # Message Type
......
...@@ -265,30 +265,15 @@ class AmqpDataPacketDumper: ...@@ -265,30 +265,15 @@ class AmqpDataPacketDumper:
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
try: try:
m = Message.load_from_pika(method, props, body)
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
m = Message.from_json(body)
m.update_properties(**props_dict)
logger.info('got event: %s' % type(m)) logger.info('got event: %s' % type(m))
if isinstance(m, MsgTestingToolTerminate): if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming() ch.stop_consuming()
self.stop() self.stop()
if isinstance(m, MsgPacketSniffedRaw): elif isinstance(m, MsgPacketSniffedRaw):
self.dump_packet(m) self.dump_packet(m)
try: # rotate files each X messages dumped try: # rotate files each X messages dumped
if self.messages_dumped != 0 and self.messages_dumped % self.QUANTITY_MESSAGES_PER_PCAP == 0: if self.messages_dumped != 0 and self.messages_dumped % self.QUANTITY_MESSAGES_PER_PCAP == 0:
self.dumps_rotate() self.dumps_rotate()
...@@ -298,7 +283,6 @@ class AmqpDataPacketDumper: ...@@ -298,7 +283,6 @@ class AmqpDataPacketDumper:
logger.error(e) logger.error(e)
else: else:
#logger.info('drop amqp message: ' + repr(m))
pass pass
except NonCompliantMessageFormatError as e: except NonCompliantMessageFormatError as e:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment