Commit df9b9096 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

default ex is amq.topic, added some more messages examples

parent ac78a6d6
......@@ -93,42 +93,50 @@ class NullLogHandler(logging.Handler):
class AmqpSniffer(threading.Thread):
COMPONENT_ID = 'amqp_sniffer'
DEFAULT_EXCHAGE = 'amq.topic'
def __init__(self, conn, exchange=None, topics=None):
def __init__(self, conn, topics=None):
threading.Thread.__init__(self)
if exchange:
self.exchange = exchange
else:
self.exchange = self.DEFAULT_EXCHAGE
# queues & default exchange declaration
self.connection = conn
self.channel = self.connection.channel()
self.services_queue_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queue_name,
auto_delete=True,
arguments={'x-max-length': 200})
self.services_queu_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queu_name, auto_delete=True)
if topics: # susbscribe only to passed list
if topics: # subscribe only to passed list
for t in topics:
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.services_queu_name,
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key=t)
else: # subscribe to all events
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.services_queu_name,
self.channel.queue_bind(exchange=self.exchange,
queue=self.services_queue_name,
routing_key='#')
# Hello world message
self.channel.basic_publish(
body=json.dumps({'_type': 'cli.info', 'value': 'CLI is up!'}),
routing_key='control.cli.info',
exchange=AMQP_EXCHANGE,
exchange=self.exchange,
properties=pika.BasicProperties(
content_type='application/json',
)
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.services_queu_name)
self.channel.basic_consume(self.on_request, queue=self.services_queue_name)
def stop(self):
self.channel.queue_delete(self.services_queu_name)
self.channel.queue_delete(self.services_queue_name)
self.channel.stop_consuming()
self.connection.close()
......@@ -146,126 +154,6 @@ class AmqpSniffer(threading.Thread):
print('Bye byes!')
class AmqpDataPacketDumper(threading.Thread):
"""
Sniffs data.serial and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
about pcap header:
ts_sec: the date and time when this packet was captured. This value is in seconds since January 1, 1970 00:00:00 GMT; this is also known as a UN*X time_t. You can use the ANSI C time() function from time.h to get this value, but you might use a more optimized way to get this timestamp value. If this timestamp isn't based on GMT (UTC), use thiszone from the global header for adjustments.
ts_usec: in regular pcap files, the microseconds when this packet was captured, as an offset to ts_sec. In nanosecond-resolution files, this is, instead, the nanoseconds when the packet was captured, as an offset to ts_sec /!\ Beware: this value shouldn't reach 1 second (in regular pcap files 1 000 000; in nanosecond-resolution files, 1 000 000 000); in this case ts_sec must be increased instead!
incl_len: the number of bytes of packet data actually captured and saved in the file. This value should never become larger than orig_len or the snaplen value of the global header.
orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and orig_len differ, the actually saved packet size was limited by snaplen.
"""
COMPONENT_ID = 'capture_dumper'
def __init__(self, connection, topics):
threading.Thread.__init__(self)
# queues & default exchange declaration
self.messages_dumped = 0
self.connection = connection
self.channel = self.connection.channel()
self.data_queue_name = 'data@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.data_queue_name, auto_delete=True)
for t in topics:
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.data_queue_name,
routing_key=t)
# Hello world message
self.channel.basic_publish(
body=json.dumps({'_type': '%s.info' % self.COMPONENT_ID,
'value': '%s is up!' % self.COMPONENT_ID, }
),
routing_key='control.%s.info' % self.COMPONENT_ID,
exchange=AMQP_EXCHANGE,
properties=pika.BasicProperties(
content_type='application/json',
)
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.data_queue_name)
self.pcap_15_4_dumper = Dumper(
filename='agents_serial_15_4.pcap',
snaplen=200,
network=DLT_IEEE802_15_4
)
self.pcap_raw_ip_dumper = Dumper(
filename='agents_tun_raw_ip.pcap',
snaplen=200,
network=DLT_RAW
)
def stop(self):
self.channel.queue_delete(self.data_queue_name)
self.channel.stop_consuming()
self.connection.close()
def on_request(self, ch, method, props, body):
now = datetime.now()
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
self.messages_dumped += 1
try:
m = Message.from_json(body)
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
self.stop()
if isinstance(m, MsgPacketSniffedRaw):
if 'serial' in m.routing_key:
raw_packet = bytes(m.data)
packet_slip = bytes(m.data_slip)
# lets build pcap header for packet
pcap_packet_header = Pkthdr(
ts_sec=now.second,
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_15_4_dumper.dump(pcap_packet_header, raw_packet)
elif 'tun' in m.routing_key:
raw_packet = bytes(m.data)
# lets build pcap header for packet
pcap_packet_header = Pkthdr(
ts_sec=now.second,
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_raw_ip_dumper.dump(pcap_packet_header, raw_packet)
else:
logging.info('raw packet not dumped to pcap: ' + repr(m))
else:
logging.info('drop amqp message: ' + repr(m))
except NonCompliantMessageFormatError as e:
print('* * * * * * API VALIDATION ERROR * * * * * * * ')
print("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
print('* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
def run(self):
print("Starting thread listening on the event bus")
self.channel.start_consuming()
print('Bye byes!')
class Cli(threading.Thread):
"""
\brief Thread which handles CLI commands entered by the user.
......@@ -615,6 +503,9 @@ if __name__ == '__main__':
'6.e': MsgTestCaseSkip(testcase_id='TD_COAP_CORE_05_v01'),
'7': MsgTestCaseSelect(testcase_id='TD_COAP_CORE_02_v01'),
'8': MsgTestSuiteAbort(),
'100': MsgStepStimuliExecute(),
'101': MsgStepVerifyExecute(),
'102': MsgStepCheckExecute(),
})
events_orchestrator = OrderedDict({
......@@ -674,7 +565,16 @@ if __name__ == '__main__':
file_enc="pcap_base64",
filename="TD_COAP_CORE_01.pcap",
value=PCAP_TC_COAP_01_base64,
),
'tat3': MsgInteropTestCaseAnalyze(
testcase_id="TD_COAP_CORE_04",
testcase_ref="http://f-interop.paris.inria.fr/tests/TD_COAP_CORE_04_v01",
file_enc="pcap_base64",
filename="TD_COAP_CORE_04.pcap",
value=PCAP_COAP_TC4_OVER_TUN_INTERFACE_base64,
)
})
service_dissection = OrderedDict({
......@@ -712,8 +612,15 @@ if __name__ == '__main__':
filename="TD_COAP_CORE_01.pcap",
value=PCAP_COAP_GET_OVER_TUN_INTERFACE_base64,
),
# pcap sniffed using AMQP based packet sniffer
'dis7': MsgDissectionDissectCapture(
file_enc="pcap_base64",
filename="TD_COAP_CORE_04.pcap",
value=PCAP_COAP_TC4_OVER_TUN_INTERFACE_base64,
),
})
testing_tool_emulation = OrderedDict({
# testing tool is ready to start session
'tt1': MsgTestingToolReady(),
......@@ -767,29 +674,7 @@ if __name__ == '__main__':
cli.start()
# # start pcap dumper
# pcap_amqp_topic_subscriptions = ['data.serial.fromAgent.coap_client_agent',
# 'data.serial.fromAgent.coap_server_agent',
# 'data.tun.fromAgent.coap_server_agent',
# 'data.tun.fromAgent.coap_client_agent',
# ]
# pcap_dumper = AmqpDataPacketDumper(
# connection=connection,
# topics=pcap_amqp_topic_subscriptions
# )
# pcap_dumper.start()
# start amqp listener thread
# sniffer_subscriptions = [
# 'control.session',
# 'control.testcoordination',
# 'log.warning.#',
# 'log.error.#',
# 'log.critical.#',
# ]
#sniffer_subscriptions = list(set(sniffer_subscriptions + pcap_amqp_topic_subscriptions))
amqp_listener = AmqpSniffer(connection, None) # if None subscribe to all messages
amqp_listener = AmqpSniffer(connection, AMQP_EXCHANGE, None) # if None subscribe to all messages
amqp_listener.start()
# interrumpted
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment