Commit 0c746b5c authored by Federico Sismondi's avatar Federico Sismondi

Merge branch 'deprecate_type_field_from_API' into 'master'

Deprecate type field from api

See merge request !12
parents 4ac53700 ef995729
Pipeline #1847 passed with stage
in 0 seconds
properties([[$class: 'GitLabConnectionProperty', gitLabConnection: 'figitlab']])
if(env.JOB_NAME =~ 'utils/'){
node('sudo'){
env.AMQP_URL="amqp://guest:guest@localhost/"
env.AMQP_EXCHANGE="amq.topic"
stage("Clone repo and submodules"){
checkout scm
sh '''
git submodule update --init
tree .
'''
}
stage ("Environment dependencies"){
withEnv(["DEBIAN_FRONTEND=noninteractive"]){
sh '''
sudo apt-get clean
sudo apt-get update
sudo apt-get upgrade -y -qq
sudo apt-get install --fix-missing -y -qq python-dev python-pip python-setuptools
sudo apt-get install --fix-missing -y -qq python3-dev python3-pip python3-setuptools
sudo apt-get install --fix-missing -y -qq build-essential
sudo apt-get install --fix-missing -y -qq libyaml-dev
sudo apt-get install --fix-missing -y -qq libssl-dev openssl
sudo apt-get install --fix-missing -y -qq libffi-dev
sudo apt-get install --fix-missing -y -qq curl tree netcat
sudo apt-get install --fix-missing -y -qq rabbitmq-server
sudo apt-get install --fix-missing -y -qq supervisor
sudo apt-get install --fix-missing -y -qq make
echo restarting rmq server and app
sudo rabbitmq-server -detached || true
sudo rabbitmqctl stop_app || true
sudo rabbitmqctl start_app || true
'''
}
}
stage("install dependencies"){
gitlabCommitStatus("install dependencies"){
withEnv(["DEBIAN_FRONTEND=noninteractive"]){
sh '''
echo installing python dependencies...
sudo -H python3 -m pip -qq install -r requirements.txt
sudo -H python3 -m pip -qq install pytest
'''
}
}
}
stage("unittesting modules"){
gitlabCommitStatus("unittesting modules"){
sh '''
echo $AMQP_URL
pwd
python3 -m pytest -p no:cacheprovider tests
'''
}
}
stage("other tests"){
gitlabCommitStatus("other tests"){
sh '''
python3 -m messages_doc
'''
}
}
}
}
......@@ -37,4 +37,5 @@ if __name__ == '__main__':
#from examples_pcap_base64 import PCAP_COAP_TC4_OVER_TUN_INTERFACE_base64,PCAP_ONEM2M_TD_M2M_NH_06
#base_64_pcap_value = "1MOyoQIABAAAAAAAAAAAAMgAAABlAAAAuDomWkVbCQCLAAAAiwAAAGABrlcAYxFAu7sAAAAAAAAAAAAAAAAAAbu7AAAAAAAAAAAAAAAAAAKOoRYzAGM/fEQC7BP7dUmTsX4Gc2VydmVyBnNlcnZlchEyUTLR4kMWMTAwNDg2oQL/eyJtMm06YWUiOnsicnIiOmZhbHNlLCJhcGkiOjEyMzQ1LCJybiI6ImFlVGVzdEMifX24OiZavKEKAMgAAAAyAgAAYAjkQQIKEUC7uwAAAAAAAAAAAAAAAAACu7sAAAAAAAAAAAAAAAAAARYzjqECCqBwZEHsE/t1SZONES9zZXJ2ZXIvQ0FFMTMxMjU0MDI5MDMwNzk4NjM0M0Ey1ugxMDA0ODaCB9H/ewogICAibTJtOmFlIiA6IHsKICAgICAgInJuIiA6ICJhZVRlc3RDIiwKICAgICAgInR5IiA6IDIsCiAgICAgICJyaSIgOiAiL3NlcnZlci9DQUUxMzEyNTQwMjkwMzA3OTg="
base_64_pcap_value = "1MOyoQIABAAAAAAAAAAAAMgAAABlAAAAk9MmWiSMDgCLAAAAiwAAAGABhdIAYxFAu7sAAAAAAAAAAAAAAAAAAru7AAAAAAAAAAAAAAAAAAGeoBYzAGMRfUQCGw2HLKvhsX4Gc2VydmVyBnNlcnZlchEyUTLR4kMWMTAwMzk5oQL/eyJtMm06YWUiOnsicnIiOmZhbHNlLCJhcGkiOjEyMzQ1LCJybiI6ImFlVGVzdEMifX2U0yZaDxsAAMgAAAAzAgAAYAQl6AILEUC7uwAAAAAAAAAAAAAAAAABu7sAAAAAAAAAAAAAAAAAAhYznqACC63hZEEbDYcsq+GNES9zZXJ2ZXIvQ0FFMzEzNTI5MDgyMzA2NDI1ODY2OEEy1ugxMDAzOTmCB9H/ewogICAibTJtOmFlIiA6IHsKICAgICAgInJuIiA6ICJhZVRlc3RDIiwKICAgICAgInR5IiA6IDIsCiAgICAgICJyaSIgOiAiL3NlcnZlci9DQUUzMTM1MjkwODIzMDY0MjU="
save_pcap_from_base64('test2.pcap', base_64_pcap_value)
b="1MOyoQIABAAAAAAAAAAAANAHAABlAAAAUH5eWpoNAABcAAAAXAAAAGAAH+YANBFAu7sAAAAAAAAAAAAAAAAAAru7AAAAAAAAAAAAAAAAAAEWM6EmADQyqGBFAUTAIR7/VHlwZTogMCAoQ09OKQpDb2RlOiAxIChHRVQpCk1JRDogMzI0"
save_pcap_from_base64('test2.pcap', b)
......@@ -104,7 +104,7 @@ def amqp_request(connection, request_message, component_id):
raise AmqpSynchCallTimeoutError(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
type(request_message)
)
)
......
......@@ -10,8 +10,9 @@ try:
except:
from .messages import *
VERSION = '0.0.8'
VERSION = '0.0.9'
AMQP_EXCHANGE = 'amq.topic'
MAX_LOG_LINE_LENGTH = 120
class AmqpSynchCallTimeoutError(Exception):
......@@ -59,7 +60,10 @@ class AmqpListener(threading.Thread):
@classmethod
def default_message_handler(cls, message_as_dict):
clean_dict = dict((k, v) for k, v in message_as_dict.items() if v)
print(json.dumps(clean_dict, indent=4))
print('-' * 120)
print('%s : %s' % ('routing_key', clean_dict.pop('routing_key')))
print('-' * 120)
print(json.dumps(clean_dict, indent=4, sort_keys=True))
def amqp_connect(self):
self.connection = pika.BlockingConnection(pika.URLParameters(self.amqp_url))
......@@ -100,25 +104,13 @@ class AmqpListener(threading.Thread):
def on_request(self, ch, method, props, body):
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
if self.use_message_typing:
try:
m = Message.from_json(body)
m = Message.load_from_pika(method, props, body)
if m is None:
raise Exception("Couldnt build message from json %s, amqp props: %s " % (body, props_dict))
m.update_properties(**props_dict)
raise Exception("Couldnt build message from json %s, rkey: %s " % (body, method.routing_key))
m.routing_key = method.routing_key
logging.debug('Message in bus: %s' % repr(m))
logging.debug('Message in bus: %s' % repr(m)[:MAX_LOG_LINE_LENGTH])
self.message_dispatcher(m)
except NonCompliantMessageFormatError as e:
......@@ -132,6 +124,18 @@ class AmqpListener(threading.Thread):
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
text_based_message_representation = OrderedDict()
......@@ -155,7 +159,7 @@ class AmqpListener(threading.Thread):
logging.error(traceback.format_exc())
self.amqp_connect()
logging.info('Bye byes!')
logging.info('%s says Bye byes!' % self.COMPONENT_ID)
def publish_message(connection, message):
......@@ -246,7 +250,7 @@ def amqp_request(connection, request_message, component_id, retries=10):
raise AmqpSynchCallTimeoutError(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
type(request_message)
)
)
......
......@@ -39,9 +39,6 @@ COLOR_TEST_SESSION_HELPER_MESSAGE = 'yellow'
TEMP_DIR = 'tmp'
DEFAULT_TOPIC_SUBSCRIPTIONS = [
# 'control.testcoordination',
# 'control.dissection',
# 'control.session',
'#'
]
......@@ -156,7 +153,7 @@ def amqp_request(request_message, component_id=COMPONENT_ID):
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
type(request_message)
)
)
......@@ -969,7 +966,7 @@ def _echo_backend_message(msg):
assert isinstance(msg, Message)
try:
m = "\n[Session message] [%s] " % msg._type
m = "\n[Session message] [%s] " % type(msg)
if hasattr(m, 'description'):
m += m.description
......
This diff is collapsed.
......@@ -109,19 +109,19 @@ TBD listens and consumes the following messages from the bus:
print(head_1)
for s in services:
print(table_row(s._type))
print(table_row(s.routing_key))
print()
print()
print(head_2)
for e in events:
print(table_row(e._type))
print(table_row(e.routing_key))
print()
print()
print(head_3)
for e in events:
print(table_row(e._type))
print(table_row(e.routing_key))
def generate_doc_section_into_file(file_to_write, section, list_of_message_classes):
......@@ -130,7 +130,7 @@ def generate_doc_section_into_file(file_to_write, section, list_of_message_class
for msg_class in list_of_message_classes:
msg_type = msg_class()._type
msg_type = msg_class().routing_key
print('generating doc for %s,%s' % (msg_type, msg_class))
# Message Type
......
This diff is collapsed.
......@@ -45,7 +45,7 @@ DLT_SLIP = 8
DLT_PPP = 9
DLT_FDDI = 10
DLT_ATM_RFC1483 = 11
DLT_RAW = 12
#DLT_RAW = 12
DLT_PPP_SERIAL = 50
DLT_PPP_ETHER = 51
DLT_RAW = 101
......
......@@ -171,7 +171,6 @@ class JsonFormatter(logging.Formatter):
try:
log_record = OrderedDict()
log_record['_type'] = 'log'
log_record['component'] = record.name
except NameError:
log_record = {}
......
import doctest
import unittest
import messages
# python3 -m unittest tests/test_doctests.py -vvv
suite = unittest.TestSuite()
suite.addTest(doctest.DocTestSuite(messages))
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
import unittest
import pika
import os
from messages import *
from event_bus_utils import publish_message
import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.WARNING)
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
AMQP_URL = str(os.environ['AMQP_URL'])
class MessagesLibraryTests(unittest.TestCase):
"""
python3 -m unittest tests/test_messages.py -vvv
"""
def setUp(self):
# let's collect all the messages classes
self.set_of_collected_messages = {v for (k, v) in globals().items() if 'Msg' in k}
self.non_serializable_messages_types = {MsgReply, MsgErrorReply}
self.serializable_messages_types = self.set_of_collected_messages - self.non_serializable_messages_types
# amqp stuff
self.conn = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
self.channel = self.conn.channel()
# message validation queue
self.queue_for_post_validation = '%s::queue_for_post_validation' % self.__class__.__name__
# lets' first clean up the queue
self.channel.queue_delete(queue=self.queue_for_post_validation)
self.channel.queue_declare(queue=self.queue_for_post_validation, auto_delete=True)
# subscribe to all messages in the bus
self.channel.queue_bind(exchange=AMQP_EXCHANGE, queue=self.queue_for_post_validation, routing_key='#')
self.channel.basic_qos(prefetch_count=1)
def tearDown(self):
self.conn.close()
def test_serialize_and_deserialize_messages_on_bus(self):
self.serialize_and_publish_all_messages()
self.consume_and_deserialize_messages()
def serialize_and_publish_all_messages(self):
for i in self.serializable_messages_types:
try:
m = i()
except Exception as e:
self.fail('%s found, while serializing %s' % (e, str(i)))
logging.info("publishing message type %s" % str(type(m)))
publish_message(
self.conn,
m
)
def consume_and_deserialize_messages(self):
time.sleep(1)
message_count = 0
method, props, body = self.channel.basic_get(self.queue_for_post_validation)
while method:
message_count += 1
logging.info('parsing message %s, number: %s' % (method.routing_key,message_count))
self.channel.basic_ack(method.delivery_tag)
# api call 1 - load w/o properties
message = Message.load(
json_body=body.decode('utf-8'),
routing_key=method.routing_key,
properties=None,
)
assert message, 'load w/o properties didnt work for %s' % (body.decode('utf-8'), method.routing_key)
# api call 2 - load w/ properties
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
message = Message.load(
json_body=body.decode('utf-8'),
routing_key=method.routing_key,
properties=props_dict,
)
assert message, 'load WITH properties didnt work for %s' % (body.decode('utf-8'), method.routing_key)
# api call 3 - load using pika dependent api call
message = Message.load_from_pika(
method,
props,
body,
)
#print(repr(message))
assert message, 'load pika properties didnt work for %s' % (body.decode('utf-8'), method.routing_key)
method, props, body = self.channel.basic_get(self.queue_for_post_validation)
if len(self.serializable_messages_types) != message_count:
self.fail('expecting %s messages, but got %s' % (len(self.serializable_messages_types), message_count))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment