Commit 06137f47 authored by Federico Sismondi's avatar Federico Sismondi

bugfix + pep8

parent f4d8c90b
......@@ -28,15 +28,16 @@ rabbitmq_handler.setFormatter(json_formatter)
logger.addHandler(rabbitmq_handler)
logger.setLevel(logging.DEBUG)
class PacketRouter(threading.Thread):
AGENT_1_ID = AGENT_NAMES[0]
AGENT_2_ID = AGENT_NAMES[1]
AGENT_TT_ID = AGENT_TT_ID
def __init__(self, conn, routing_table = None):
def __init__(self, conn, routing_table=None):
threading.Thread.__init__(self)
logger.info("Imported agent names of the test session: %s" %str(AGENT_NAMES))
logger.info("Imported agent names of the test session: %s" % str(AGENT_NAMES))
if routing_table:
self.routing_table = routing_table
......@@ -45,21 +46,21 @@ class PacketRouter(threading.Thread):
# agent_TT is the agent instantiated by the testing tool
self.routing_table = {
# first two entries is for a user to user setup
'data.tun.fromAgent.%s'%PacketRouter.AGENT_1_ID:
'data.tun.fromAgent.%s' % PacketRouter.AGENT_1_ID:
[
'data.tun.toAgent.%s'%PacketRouter.AGENT_2_ID,
'data.tun.toAgent.%s'%PacketRouter.AGENT_TT_ID
'data.tun.toAgent.%s' % PacketRouter.AGENT_2_ID,
'data.tun.toAgent.%s' % PacketRouter.AGENT_TT_ID
],
'data.tun.fromAgent.%s'%PacketRouter.AGENT_2_ID:
'data.tun.fromAgent.%s' % PacketRouter.AGENT_2_ID:
[
'data.tun.toAgent.%s'%PacketRouter.AGENT_1_ID,
'data.tun.toAgent.%s'%PacketRouter.AGENT_TT_ID
'data.tun.toAgent.%s' % PacketRouter.AGENT_1_ID,
'data.tun.toAgent.%s' % PacketRouter.AGENT_TT_ID
],
# entry for a user to automated iut setup (doesnt create any conflict with the previous ones)
'data.tun.fromAgent.%s'%PacketRouter.AGENT_TT_ID:
'data.tun.fromAgent.%s' % PacketRouter.AGENT_TT_ID:
[
'data.tun.toAgent.%s'%PacketRouter.AGENT_1_ID
'data.tun.toAgent.%s' % PacketRouter.AGENT_1_ID
],
}
......@@ -73,18 +74,16 @@ class PacketRouter(threading.Thread):
self.channel = self.connection.channel()
queue_name = 'data_packets_queue@%s' % COMPONENT_ID
self.channel.queue_declare(queue=queue_name, auto_delete = True )
self.channel.queue_declare(queue=queue_name, auto_delete=True)
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=queue_name,
routing_key='data.tun.fromAgent.#')
queue=queue_name,
routing_key='data.tun.fromAgent.#')
msg = MsgTestingToolComponentReady(
component='packetrouting'
)
publish_message(channel, msg)
publish_message(self.channel, msg)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=queue_name)
......@@ -94,40 +93,40 @@ class PacketRouter(threading.Thread):
def on_request(self, ch, method, props, body):
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
body_dict = json.loads(body.decode('utf-8'),object_pairs_hook=OrderedDict)
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.debug("Message sniffed: %s" %(body_dict['_type']))
logger.debug("Message sniffed: %s" % (body_dict['_type']))
self.message_count += 1
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *'%self.message_count)
print("TIME: %s"%datetime.datetime.time(datetime.datetime.now()))
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *' % self.message_count)
print("TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
print(" - - - ")
print("ROUTING_KEY: %s" % method.routing_key)
print(" - - - ")
print("HEADERS: %s" % props.headers)
print(" - - - ")
print("PROPS: %s" %json.dumps(
{
'content_type' : props.content_type,
'content_encoding' : props.content_encoding,
'headers' : props.headers,
'delivery_mode' : props.delivery_mode,
'priority' : props.priority,
'correlation_id' : props.correlation_id,
'reply_to' : props.reply_to,
'expiration' : props.expiration,
'message_id' : props.message_id,
'timestamp' : props.timestamp,
'user_id' : props.user_id,
'app_id' : props.app_id,
'cluster_id' : props.cluster_id,
}
)
print("PROPS: %s" % json.dumps(
{
'content_type': props.content_type,
'content_encoding': props.content_encoding,
'headers': props.headers,
'delivery_mode': props.delivery_mode,
'priority': props.priority,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'expiration': props.expiration,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
'cluster_id': props.cluster_id,
}
)
)
print(" - - - ")
print('BODY %s' % json.dumps(body_dict))
print(" - - - ")
#print("ERRORS: %s" % )
# print("ERRORS: %s" % )
print('* * * * * * * * * * * * * * * * * * * * * \n')
# let's route the message to the right agent
......@@ -152,27 +151,25 @@ class PacketRouter(threading.Thread):
)
)
logger.info("Routing packet (%d) from topic: %s to topic: %s"%(self.message_count,src_rkey,dst_rkey))
logger.info(
"Routing packet (%d) from topic: %s to topic: %s" % (self.message_count, src_rkey, dst_rkey))
else:
logger.error('No known route for r_key source: {r_key}'.format(r_key=src_rkey))
return
def run(self):
self.channel.start_consuming()
logger.info('Bye byes!')
###############################################################################
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = connection.channel()
def signal_int_handler(channel):
# FINISHING... let's send a goodby message
msg = {
......@@ -197,14 +194,10 @@ if __name__ == '__main__':
# in case its not declared
connection.channel().exchange_declare(exchange=AMQP_EXCHANGE,
type='topic',
durable=True,
)
type='topic',
durable=True,
)
# start amqp router thread
r = PacketRouter(connection,None)
r = PacketRouter(connection, None)
r.start()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment