Commit 01c45110 authored by Federico Sismondi

Merge branch 'dumper_as_a_processes' into 'master'

Dumper as a processes

See merge request !6
parents 406621a3 f75f374a
import six
import os
import pika
import time
import logging
import threading
from binascii import unhexlify
from datetime import datetime
from messages import *
VERSION = '0.0.3'
@@ -29,10 +35,10 @@ def publish_message(channel, message):
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=message.routing_key,
properties=properties,
body=message.to_json(),
)
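# Minimal usage sketch for the helper above, assuming the messages library
# defines MsgTestingToolComponentReady (any Message subclass exposing
# routing_key, get_properties() and to_json() works the same way):
#
#   m = MsgTestingToolComponentReady(component='cli')
#   publish_message(channel, m)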
@@ -60,16 +66,16 @@ def amqp_request(channel, request_message, component_id):
# bind and listen to reply_to topic
channel.queue_bind(
exchange=AMQP_EXCHANGE,
queue=callback_queue,
routing_key=request_message.reply_to
)
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=request_message.routing_key,
properties=pika.BasicProperties(**request_message.get_properties()),
body=request_message.to_json(),
)
time.sleep(0.2)
@@ -91,10 +97,10 @@ def amqp_request(channel, request_message, component_id):
else:
raise Exception(
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
)
"Response timeout! rkey: %s , request type: %s" % (
request_message.routing_key,
request_message._type
)
)
# clean up
@@ -103,7 +109,43 @@ def amqp_request(channel, request_message, component_id):
return response
if __name__ == '__main__':
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.WARNING)
# m = MsgSniffingGetCapture()
# r = amqp_request(channel, m, 'someImaginaryComponent')
# print(repr(r))
try:
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
print('Imported AMQP_EXCHANGE env var: %s' % AMQP_EXCHANGE)
except KeyError as e:
AMQP_EXCHANGE = "amq.topic"
print('Cannot retrieve environment variables for AMQP EXCHANGE. Loading default: %s' % AMQP_EXCHANGE)
try:
AMQP_URL = str(os.environ['AMQP_URL'])
print('Imported AMQP_URL env var: %s' % AMQP_URL)
p = six.moves.urllib_parse.urlparse(AMQP_URL)
AMQP_USER = p.username
AMQP_SERVER = p.hostname
logging.info(
"Env variables imported for AMQP connection, User: {0} @ Server: {1} ".format(AMQP_USER, AMQP_SERVER))
except KeyError as e:
print('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
# load default values
AMQP_URL = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = connection.channel()
logging.info("AMQP connection established")
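# Illustrative parse, for a hypothetical AMQP_URL of
# 'amqp://paul:mysecret@f-interop.example.com/session05':
#   p.username -> 'paul'                    (AMQP_USER)
#   p.hostname -> 'f-interop.example.com'   (AMQP_SERVER)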
@@ -18,17 +18,72 @@ import threading
import logging
import time
import json
from datetime import timedelta, datetime
import traceback
import uuid
from collections import OrderedDict
import os
import signal
from messages import *
from examples_pcap_base64 import *
from pure_pcapy import Dumper, Pkthdr, DLT_IEEE802_15_4, DLT_RAW
# globals
message_count = 0
def print_message(method, props, body):
global message_count
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
logging.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
message_count += 1
props_dict = {
'content_type': props.content_type,
'content_encoding': props.content_encoding,
'headers': props.headers,
'delivery_mode': props.delivery_mode,
'priority': props.priority,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'expiration': props.expiration,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
'cluster_id': props.cluster_id,
}
# let's get rid of values which are empty
props_dict_only_non_empty_values = {k: v for k, v in props_dict.items() if v is not None}
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *' % message_count)
print("TIME: %s" % datetime.time(datetime.now()))
print(" - - - ")
print("ROUTING_KEY: %s" % method.routing_key)
print(" - - - ")
print("PROPS: %s" % json.dumps(props_dict_only_non_empty_values))
print(" - - - ")
print('BODY %s' % json.dumps(req_body_dict))
print(" - - - ")
# print("ERRORS: %s" % )
print('* * * * * * * * * * * * * * * * * * * * * \n')
def validate_message_format(method, props, body):
# obj hook so json.loads respects the order of the fields sent (just for visualization purposes)
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
if props.content_type != "application/json":
print('* * * * * * API VALIDATION WARNING * * * * * * * ')
print("props.content_type : " + str(props.content_type))
print("application/json was expected")
print('* * * * * * * * * * * * * * * * * * * * * \n')
if '_type' not in req_body_dict.keys():
print('* * * * * * API VALIDATION WARNING * * * * * * * ')
print("no < _type > field found")
print('* * * * * * * * * * * * * * * * * * * * * \n')
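# Sketch of a body that passes both checks above, assuming the producer also
# sets content_type='application/json' on the AMQP properties:
#
#   body = json.dumps({'_type': 'log.info.cli', 'message': 'hello'}).encode('utf-8')
#
# Note that a missing '_type' or an unexpected content type only prints a
# warning; the message is not rejected.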
class NullLogHandler(logging.Handler):
@@ -37,21 +92,28 @@ class NullLogHandler(logging.Handler):
class AmqpSniffer(threading.Thread):
COMPONENT_ID = 'amqp_sniffer'
def __init__(self, conn, topics=None):
threading.Thread.__init__(self)
# queues & default exchange declaration
self.message_count = 0
self.connection = conn
self.channel = self.connection.channel()
self.services_queu_name = 'services_queue@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.services_queu_name, auto_delete=True)
if topics:  # subscribe only to the passed topic list
for t in topics:
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.services_queu_name,
routing_key=t)
else: # subscribe to all events
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.services_queu_name,
routing_key='#')
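# Usage sketch with a hypothetical topic list: bind only to session control
# messages and error logs instead of the catch-all '#':
#
#   AmqpSniffer(connection, topics=['control.session', 'log.error.#'])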
# Hello world message
self.channel.basic_publish(
body=json.dumps({'_type': 'cli.info', 'value': 'CLI is up!'}),
@@ -72,51 +134,131 @@ class AmqpSniffer(threading.Thread):
def on_request(self, ch, method, props, body):
# obj hook so json.loads respects the order of the fields sent (just for visualization purposes)
ch.basic_ack(delivery_tag=method.delivery_tag)
print_message(method, props, body)
validate_message_format(method, props, body)
def run(self):
print("Starting thread listening on the event bus")
self.channel.start_consuming()
print('Bye byes!')
class AmqpDataPacketDumper(threading.Thread):
"""
Sniffs data.serial and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_RAW)
about pcap header:
ts_sec: the date and time when this packet was captured. This value is in seconds since January 1, 1970 00:00:00 GMT; this is also known as a UN*X time_t. You can use the ANSI C time() function from time.h to get this value, but you might use a more optimized way to get this timestamp value. If this timestamp isn't based on GMT (UTC), use thiszone from the global header for adjustments.
ts_usec: in regular pcap files, the microseconds when this packet was captured, as an offset to ts_sec. In nanosecond-resolution files, this is, instead, the nanoseconds when the packet was captured, as an offset to ts_sec /!\ Beware: this value shouldn't reach 1 second (in regular pcap files 1 000 000; in nanosecond-resolution files, 1 000 000 000); in this case ts_sec must be increased instead!
incl_len: the number of bytes of packet data actually captured and saved in the file. This value should never become larger than orig_len or the snaplen value of the global header.
orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and orig_len differ, the actually saved packet size was limited by snaplen.
"""
COMPONENT_ID = 'capture_dumper'
def __init__(self, connection, topics):
threading.Thread.__init__(self)
# queues & default exchange declaration
self.messages_dumped = 0
self.connection = connection
self.channel = self.connection.channel()
self.data_queue_name = 'data@%s' % self.COMPONENT_ID
self.channel.queue_declare(queue=self.data_queue_name, auto_delete=True)
for t in topics:
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=self.data_queue_name,
routing_key=t)
# Hello world message
self.channel.basic_publish(
body=json.dumps({'_type': '%s.info' % self.COMPONENT_ID,
'value': '%s is up!' % self.COMPONENT_ID, }
),
routing_key='control.%s.info' % self.COMPONENT_ID,
exchange=AMQP_EXCHANGE,
properties=pika.BasicProperties(
content_type='application/json',
)
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.data_queue_name)
self.pcap_15_4_dumper = Dumper(
filename='agents_serial_15_4.pcap',
snaplen=200,
network=DLT_IEEE802_15_4
)
self.pcap_raw_ip_dumper = Dumper(
filename='agents_tun_raw_ip.pcap',
snaplen=200,
network=DLT_RAW
)
def stop(self):
self.channel.queue_delete(self.data_queue_name)
self.channel.stop_consuming()
self.connection.close()
def on_request(self, ch, method, props, body):
now = datetime.now()
# obj hook so json.loads respects the order of the fields sent (just for visualization purposes)
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
self.messages_dumped += 1
props_dict = {
'content_type': props.content_type,
'content_encoding': props.content_encoding,
'headers': props.headers,
'delivery_mode': props.delivery_mode,
'priority': props.priority,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'expiration': props.expiration,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
'cluster_id': props.cluster_id,
}
# let's get rid of values which are empty
props_dict_only_non_empty_values = {k: v for k, v in props_dict.items() if v is not None}
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *' % self.messages_dumped)
print("TIME: %s" % datetime.time(datetime.now()))
print(" - - - ")
print("ROUTING_KEY: %s" % method.routing_key)
print(" - - - ")
print("PROPS: %s" % json.dumps(props_dict_only_non_empty_values))
print(" - - - ")
print('BODY %s' % json.dumps(req_body_dict))
print(" - - - ")
# print("ERRORS: %s" % )
print('* * * * * * * * * * * * * * * * * * * * * \n')
if props.content_type != "application/json":
print('* * * * * * API VALIDATION WARNING * * * * * * * ')
print("props.content_type : " + str(props.content_type))
print("application/json was expected")
print('* * * * * * * * * * * * * * * * * * * * * \n')
if '_type' not in req_body_dict.keys():
print('* * * * * * API VALIDATION WARNING * * * * * * * ')
print("no < _type > field found")
print('* * * * * * * * * * * * * * * * * * * * * \n')
try:
m = Message.from_json(body)
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
self.stop()
if isinstance(m, MsgPacketSniffedRaw):
if 'serial' in m.routing_key:
raw_packet = bytes(m.data)
packet_slip = bytes(m.data_slip)
# let's build the pcap header for this packet
# (pcap expects ts_sec as seconds since the Unix epoch, per the docstring above)
pcap_packet_header = Pkthdr(
ts_sec=int(time.mktime(now.timetuple())),
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_15_4_dumper.dump(pcap_packet_header, raw_packet)
elif 'tun' in m.routing_key:
raw_packet = bytes(m.data)
# let's build the pcap header for this packet
# (pcap expects ts_sec as seconds since the Unix epoch, per the docstring above)
pcap_packet_header = Pkthdr(
ts_sec=int(time.mktime(now.timetuple())),
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_raw_ip_dumper.dump(pcap_packet_header, raw_packet)
else:
logging.info('raw packet not dumped to pcap: ' + repr(m))
else:
logging.info('drop amqp message: ' + repr(m))
except NonCompliantMessageFormatError as e:
print('* * * * * * API VALIDATION ERROR * * * * * * * ')
print("AMQP MESSAGE LIBRARY COULD NOT PROCESS JSON MESSAGE")
print('* * * * * * * * * * * * * * * * * * * * * * * * * \n')
raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD NOT PROCESS JSON MESSAGE")
def run(self):
print("Starting thread listening on the event bus")
@@ -128,7 +270,7 @@ class Cli(threading.Thread):
"""
\brief Thread which handles CLI commands entered by the user.
"""
COMPONENT_ID = 'finterop_CLI'
CMD_LEVEL_USER = "user"
CMD_LEVEL_SYSTEM = "system"
CMD_LEVEL_ALL = [CMD_LEVEL_USER,
@@ -148,12 +290,12 @@ class Cli(threading.Thread):
self.goOn = True
# logging
self.log = logging.getLogger(self.COMPONENT_ID)
self.log.setLevel(logging.DEBUG)
self.log.addHandler(NullLogHandler())
# give this thread a name
self.name = self.COMPONENT_ID
# register system commands (user commands registered by child object)
self._registerCommand_internal(
@@ -393,7 +535,7 @@ if __name__ == '__main__':
except KeyError as e:
AMQP_EXCHANGE = "amq.topic"
print('Cannot retrieve environment variables for AMQP EXCHANGE. Loading default: %s' % AMQP_EXCHANGE)
try:
AMQP_URL = str(os.environ['AMQP_URL'])
@@ -417,11 +559,11 @@ if __name__ == '__main__':
channel = connection.channel()
logging.info("AMQP connection established")
# in case it's not declared
connection.channel().exchange_declare(exchange=AMQP_EXCHANGE,
type='topic',
durable=True,
)
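# Note: exchange_declare is idempotent when the exchange already exists with
# the same type and durability; redeclaring it with different arguments would
# raise a channel error instead of silently reconfiguring it.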
def quitCallback():
@@ -625,8 +767,29 @@ if __name__ == '__main__':
cli.start()
# # start pcap dumper
# pcap_amqp_topic_subscriptions = ['data.serial.fromAgent.coap_client_agent',
# 'data.serial.fromAgent.coap_server_agent',
# 'data.tun.fromAgent.coap_server_agent',
# 'data.tun.fromAgent.coap_client_agent',
# ]
# pcap_dumper = AmqpDataPacketDumper(
# connection=connection,
# topics=pcap_amqp_topic_subscriptions
# )
# pcap_dumper.start()
# start amqp listener thread
# sniffer_subscriptions = [
# 'control.session',
# 'control.testcoordination',
# 'log.warning.#',
# 'log.error.#',
# 'log.critical.#',
# ]
# sniffer_subscriptions = list(set(sniffer_subscriptions + pcap_amqp_topic_subscriptions))
amqp_listener = AmqpSniffer(connection, None) # if None subscribe to all messages
amqp_listener.start()
# interrupted
@@ -1342,6 +1342,7 @@ class MsgInteropTestCaseAnalyze(Message):
_msg_data_template = {
"_type": "analysis.interop.testcase.analyze",
"protocol": "coap",
"testcase_id": "TD_COAP_CORE_01",
"testcase_ref": "http://doc.f-interop.eu/tests/TD_COAP_CORE_01_v01",
"file_enc": "pcap_base64",
import six
import pika
import logging
import time
import json
import uuid
import os
from collections import OrderedDict
from multiprocessing import Process
from datetime import datetime
from messages import *
from pure_pcapy import Dumper, Pkthdr, DLT_IEEE802_15_4, DLT_RAW
class AmqpDataPacketDumper:
"""
Sniffs data.serial and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_RAW)
about pcap header:
ts_sec: the date and time when this packet was captured. This value is in seconds since January 1, 1970 00:00:00 GMT; this is also known as a UN*X time_t. You can use the ANSI C time() function from time.h to get this value, but you might use a more optimized way to get this timestamp value. If this timestamp isn't based on GMT (UTC), use thiszone from the global header for adjustments.
ts_usec: in regular pcap files, the microseconds when this packet was captured, as an offset to ts_sec. In nanosecond-resolution files, this is, instead, the nanoseconds when the packet was captured, as an offset to ts_sec /!\ Beware: this value shouldn't reach 1 second (in regular pcap files 1 000 000; in nanosecond-resolution files, 1 000 000 000); in this case ts_sec must be increased instead!
incl_len: the number of bytes of packet data actually captured and saved in the file. This value should never become larger than orig_len or the snaplen value of the global header.
orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and orig_len differ, the actually saved packet size was limited by snaplen.
"""
COMPONENT_ID = 'capture_dumper_%s' % uuid.uuid1()  # uuid in case several dumpers are listening on the bus
DEFAULT_DUMP_DIR = './dumps'
def __init__(self, amqp_url, amqp_exchange, topics, dump_dir=None):
self.url = amqp_url
self.exchange = amqp_exchange
# setup connection
self.connection = pika.BlockingConnection(pika.URLParameters(self.url))
# queues & default exchange declaration
self.messages_dumped = 0
self.channel = self.connection.channel()
self.data_queue_name = 'data@%s' % self.COMPONENT_ID
self.channel.queue_declare(
queue=self.data_queue_name,
auto_delete=True,
arguments={'x-max-length': 100}
)
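# Note: 'x-max-length': 100 caps the queue at 100 messages; with RabbitMQ's
# default drop-head overflow behaviour, the oldest frames are discarded when
# the dumper falls behind rather than letting the queue grow without bound.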
for t in topics:
self.channel.queue_bind(exchange=self.exchange,
queue=self.data_queue_name,
routing_key=t)
if dump_dir and os.path.isdir(dump_dir):
self.dump_dir = dump_dir
else:
self.dump_dir = self.DEFAULT_DUMP_DIR
if not os.path.isdir(self.dump_dir):
os.makedirs(self.dump_dir)  # the Dumper below fails to open files in a missing directory
# Hello world message
self.channel.basic_publish(
body=json.dumps({'_type': '%s.info' % self.COMPONENT_ID,
'value': '%s is up!' % self.COMPONENT_ID, }
),
routing_key='control.%s.info' % self.COMPONENT_ID,
exchange=self.exchange,
properties=pika.BasicProperties(
content_type='application/json',
)
)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.on_request, queue=self.data_queue_name)
datetime_string = time.strftime("%Y%m%d_%H%M")
network_type = "DLT_IEEE802_15_4"
self.pcap_15_4_dumper = Dumper(
filename=os.path.join(self.dump_dir, "{0}_{1}.pcap".format(datetime_string, network_type)),
snaplen=200,
network=DLT_IEEE802_15_4
)
network_type = "DLT_RAW"
self.pcap_raw_ip_dumper = Dumper(
filename=os.path.join(self.dump_dir, "{0}_{1}.pcap".format(datetime_string, network_type)),
snaplen=200,
network=DLT_RAW
)
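# Filename sketch: with dump_dir='./dumps' and a dumper started at
# 2017-03-21 14:05 local time (hypothetical), the two captures would be:
#   ./dumps/20170321_1405_DLT_IEEE802_15_4.pcap
#   ./dumps/20170321_1405_DLT_RAW.pcap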
def stop(self):
self.channel.queue_delete(self.data_queue_name)
self.channel.stop_consuming()
self.connection.close()
def on_request(self, ch, method, props, body):
now = datetime.now()
# obj hook so json.loads respects the order of the fields sent (just for visualization purposes)
req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
logging.info("Message sniffed: %s, body: %s" % (json.dumps(req_body_dict), str(body)))
self.messages_dumped += 1
try:
props_dict = {
'content_type': props.content_type,
'delivery_mode': props.delivery_mode,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
}
m = Message.from_json(body)
m.update_properties(**props_dict)
if isinstance(m, MsgTestingToolTerminate):
ch.stop_consuming()
self.stop()
if isinstance(m, MsgPacketSniffedRaw):
try:
if 'serial' in m.interface_name:
raw_packet = bytes(m.data)
packet_slip = bytes(m.data_slip)
# let's build the pcap header for this packet
# (pcap expects ts_sec as seconds since the Unix epoch, per the docstring above)
pcap_packet_header = Pkthdr(
ts_sec=int(time.mktime(now.timetuple())),
ts_usec=now.microsecond,
incl_len=len(raw_packet),
orig_len=len(raw_packet),
)
self.pcap_15_4_dumper.dump(pcap_packet_header, raw_packet)
elif 'tun' in m.interface_name:
raw_packet = bytes(m.data)
# let's build the pcap header for this packet
# (pcap expects ts_sec as seconds since the Unix epoch)
pcap_packet_header = Pkthdr(
ts_sec=int(time.mktime(now.timetuple())),