Commit f2d6b5a6 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'pcap_dump_option' into 'master'

Pcap dump option

See merge request !24
parents 33f9a46f bdb44526
......@@ -29,6 +29,7 @@ by default in the final version.
import logging
import click
import uuid
import multiprocessing
from connectors.tun import TunConnector
from connectors.core import CoreConnector
......@@ -38,6 +39,7 @@ from connectors.zeromq import ZMQConnector
from connectors.serialconn import SerialConnector
from utils import arrow_down, arrow_up, finterop_banner
from utils.packet_dumper import launch_amqp_data_to_pcap_dumper
try:
from urllib.parse import urlparse
......@@ -49,7 +51,7 @@ __version__ = (0, 0, 1)
DEFAULT_PLATFORM = 'f-interop.paris.inria.fr'
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
logging.getLogger('amqp').setLevel(logging.INFO)
......@@ -99,6 +101,20 @@ For more information, visit: http://doc.f-interop.eu
required=False,
help="Agent identity (default: random generated)")
self.dump_option = click.Option(
param_decls=["--dump"],
default=False,
required=False,
help="[NOT YET SUPPORTED] Dump automatically data packets from event bus into pcap files.",
is_flag=True)
self.serial_option = click.Option(
param_decls=["--serial"],
default=False,
required=False,
help="Run agent bound to serial injector/forwarder of 802.15.4 frames .",
is_flag=True)
# Commands
self.connect_command = click.Command(
......@@ -108,15 +124,17 @@ For more information, visit: http://doc.f-interop.eu
self.session_url,
self.session_amqp_exchange,
self.name_option,
self.dump_option,
self.serial_option
],
short_help="Authenticate user"
short_help="Connect with authentication AMQP_URL, and some other basic agent configurations."
)
self.cli.add_command(self.connect_command)
self.plugins = {}
def handle_connect(self, url, exchange, name):
def handle_connect(self, url, exchange, name, dump, serial):
"""
Authenticate USER and create agent connection to f-interop.
......@@ -132,23 +150,33 @@ For more information, visit: http://doc.f-interop.eu
}
if exchange:
data.update({'exchange':exchange})
data.update({'exchange': exchange})
if p.port:
data.update({"server": "{}:{}".format(p.hostname, p.port)})
log.info("Try to connect with %s" % data)
self.plugins["core"] = CoreConnector(**data)
self.plugins["tun"] = TunConnector(**data)
self.plugins["zmq"] = ZMQConnector(**data)
self.plugins["ping"] = PingConnector(**data)
self.plugins["http"] = HTTPConnector(**data)
self.plugins["serial"] = SerialConnector(**data)
if serial:
self.plugins["serial"] = SerialConnector(**data)
else:
self.plugins["core"] = CoreConnector(**data)
self.plugins["tun"] = TunConnector(**data)
self.plugins["zmq"] = ZMQConnector(**data)
self.plugins["ping"] = PingConnector(**data)
self.plugins["http"] = HTTPConnector(**data)
for p in self.plugins.values():
p.start()
# TODO re-implement with kombu and BaseController/CoreConsumer
# TODO fix pcap_dumper support for py2, python3 -m utils.packet_dumper works fine tho
# if dump:
# dump_p = multiprocessing.Process(target=launch_amqp_data_to_pcap_dumper, args=())
# dump_p.start()
def run(self):
self.cli()
......
......@@ -22,9 +22,6 @@ class SerialConsumer(BaseConsumer):
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
"data.serial.to_forward": self.handle_data,
}
self.bootstrap()
self.message_count = 0
self.output = ''
......
This diff is collapsed.
import os
import six
import pika
import shutil
import signal
import logging
from datetime import datetime
from utils.messages import *
from utils.pure_pcapy import Dumper, Pkthdr, DLT_RAW, DLT_IEEE802_15_4_NOFCS
# Module-level logger; pika is very chatty at DEBUG, so cap its logger at INFO.
logger = logging.getLogger(__name__)
logging.getLogger('pika').setLevel(logging.INFO)

# Version of this dumper module (independent of the agent package version).
VERSION = '0.0.1'
def launch_amqp_data_to_pcap_dumper(amqp_url=None, amqp_exchange=None, topics=None, dump_dir=None):
    """
    Create an AmqpDataPacketDumper bound to the given broker and run it
    (blocks consuming from the event bus until terminated or SIGINT).

    :param amqp_url: AMQP broker URL; when not given together with
        amqp_exchange, read from the AMQP_URL env var
        (hard default: amqp://guest:guest@localhost/)
    :param amqp_exchange: exchange to bind to; when not given together with
        amqp_url, read from the AMQP_EXCHANGE env var (hard default: amq.topic)
    :param topics: routing keys to subscribe to (defaults to the coap
        client/server agents' data.serial and data.tun topics)
    :param dump_dir: directory where pcap files are written (dumper's own
        default when None)
    """
    # BUGFIX: pre-bind the closure variable so the SIGINT handler cannot hit
    # a NameError when the signal arrives before the dumper is created (the
    # original referenced `pcap_dumper` before any assignment existed).
    pcap_dumper = None

    def signal_int_handler(self, frame):
        logger.info('got SIGINT, stopping dumper..')
        if pcap_dumper:
            pcap_dumper.stop()

    signal.signal(signal.SIGINT, signal_int_handler)

    # Fall back to environment variables (then hard defaults) unless BOTH
    # url and exchange were provided explicitly — matching the original
    # behavior, a partially-provided pair is overwritten from the env.
    if not (amqp_url and amqp_exchange):
        try:
            amqp_exchange = str(os.environ['AMQP_EXCHANGE'])
            print('Imported AMQP_EXCHANGE env var: %s' % amqp_exchange)
        except KeyError:
            amqp_exchange = "amq.topic"
            print('Cannot retrieve environment variables for AMQP EXCHANGE. Loading default: %s' % amqp_exchange)

        try:
            amqp_url = str(os.environ['AMQP_URL'])
            print('Imported AMQP_URL env var: %s' % amqp_url)
            p = six.moves.urllib_parse.urlparse(amqp_url)
            user = p.username
            server = p.hostname
            logger.info(
                "Env variables imported for AMQP connection, User: {0} @ Server: {1} ".format(user, server))
        except KeyError:
            print('Cannot retrieve environment variables for AMQP connection. Loading defaults..')
            # load default values
            amqp_url = "amqp://{0}:{1}@{2}/{3}".format("guest", "guest", "localhost", "/")

    if topics:
        pcap_amqp_topic_subscriptions = topics
    else:
        pcap_amqp_topic_subscriptions = ['data.serial.fromAgent.coap_client_agent',
                                         'data.serial.fromAgent.coap_server_agent',
                                         'data.tun.fromAgent.coap_server_agent',
                                         'data.tun.fromAgent.coap_client_agent',
                                         ]

    # init pcap_dumper
    pcap_dumper = AmqpDataPacketDumper(
        amqp_url=amqp_url,
        amqp_exchange=amqp_exchange,
        topics=pcap_amqp_topic_subscriptions,
        dump_dir=dump_dir,
    )

    # start pcap_dumper (blocking consume loop)
    pcap_dumper.run()
class AmqpDataPacketDumper:
    """
    Sniffs data.serial and dumps into pcap file (assumes that frames are DLT_IEEE802_15_4)
    Sniffs data.tun and dumps into pcap file (assumes that frames are DLT_RAW)

    about pcap header:
        ts_sec: the date and time when this packet was captured. This value is in seconds since January 1,
            1970 00:00:00 GMT; this is also known as a UN*X time_t. You can use the ANSI C time() function
            from time.h to get this value, but you might use a more optimized way to get this timestamp value.
            If this timestamp isn't based on GMT (UTC), use thiszone from the global header for adjustments.

        ts_usec: in regular pcap files, the microseconds when this packet was captured, as an offset to ts_sec.
            In nanosecond-resolution files, this is, instead, the nanoseconds when the packet was captured, as
            an offset to ts_sec
            /!\ Beware: this value shouldn't reach 1 second (in regular pcap files 1 000 000;
            in nanosecond-resolution files, 1 000 000 000); in this case ts_sec must be increased instead!

        incl_len: the number of bytes of packet data actually captured and saved in the file. This value should
            never become larger than orig_len or the snaplen value of the global header.

        orig_len: the length of the packet as it appeared on the network when it was captured. If incl_len and
            orig_len differ, the actually saved packet size was limited by snaplen.
    """

    # NOTE(review): `uuid` (and, below, `time`, `json`, `Message`, the Msg*
    # classes and `OrderedDict`) are not imported explicitly in this file;
    # presumably they come in via `from utils.messages import *` — confirm.
    COMPONENT_ID = 'capture_dumper_%s' % uuid.uuid1()  # uuid in case several dumpers listening to bus

    DEFAULT_DUMP_DIR = 'tmp'

    # final, reader-visible pcap files
    DEFAULT_RAWIP_DUMP_FILENAME = "DLT_RAW.pcap"
    DEFAULT_802154_DUMP_FILENAME = "DLT_IEEE802_15_4_NO_FCS.pcap"
    NETWORK_DUMPS = [DEFAULT_802154_DUMP_FILENAME, DEFAULT_RAWIP_DUMP_FILENAME]

    # working copies ("~" suffix) that the Dumper actually writes; they are
    # copied over the final files after each dumped packet (see dump_packet)
    DEFAULT_RAWIP_DUMP_FILENAME_WR = "DLT_RAW.pcap~"
    DEFAULT_802154_DUMP_FILENAME_WR = "DLT_IEEE802_15_4_NO_FCS.pcap~"
    NETWORK_DUMPS_TEMP = [DEFAULT_RAWIP_DUMP_FILENAME_WR, DEFAULT_802154_DUMP_FILENAME_WR]

    # rotate dump files every this many dumped messages (see on_request)
    QUANTITY_MESSAGES_PER_PCAP = 100

    def __init__(self, amqp_url, amqp_exchange, topics, dump_dir=None):
        """
        Connect to the AMQP broker, declare a bounded data queue bound to each
        routing key in `topics` (plus 'control.session'), announce this
        component on the bus, and register on_request as consumer callback.

        :param amqp_url: broker URL passed to pika.URLParameters
        :param amqp_exchange: exchange the data queue is bound to
        :param topics: iterable of routing keys to subscribe to
        :param dump_dir: output directory (DEFAULT_DUMP_DIR when None);
            created if missing
        """
        self.messages_dumped = 0
        self.url = amqp_url
        self.exchange = amqp_exchange
        if dump_dir:
            self.dump_dir = dump_dir
        else:
            self.dump_dir = self.DEFAULT_DUMP_DIR
        if not os.path.exists(self.dump_dir):
            os.makedirs(self.dump_dir)

        # pcap dumpers
        self.pcap_15_4_dumper = None
        self.pcap_raw_ip_dumper = None
        self.dumpers_init()

        # AMQP stuff
        self.connection = pika.BlockingConnection(pika.URLParameters(self.url))  # queues & default exchange declaration
        self.channel = self.connection.channel()

        self.data_queue_name = 'data@%s' % self.COMPONENT_ID
        # bounded queue so a slow dumper cannot grow broker memory unboundedly
        self.channel.queue_declare(queue=self.data_queue_name,
                                   auto_delete=True,
                                   arguments={'x-max-length': 1000}
                                   )

        # subscribe to data plane channels
        for t in topics:
            self.channel.queue_bind(exchange=self.exchange,
                                    queue=self.data_queue_name,
                                    routing_key=t)

        # subscribe to channel where the terminate session message is published
        self.channel.queue_bind(exchange=self.exchange,
                                queue=self.data_queue_name,
                                routing_key='control.session')

        # publish Hello message in bus
        self.channel.basic_publish(
            body=json.dumps({'_type': '%s.info' % self.COMPONENT_ID,
                             'value': '%s is up!' % self.COMPONENT_ID, }
                            ),
            routing_key='control.%s.info' % self.COMPONENT_ID,
            exchange=self.exchange,
            properties=pika.BasicProperties(
                content_type='application/json',
            )
        )

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.on_request, queue=self.data_queue_name)

    def dumpers_init(self):
        """Delete stale working (~) files and (re)open fresh pcap Dumpers."""
        for net_dump_filename in self.NETWORK_DUMPS_TEMP:
            full_path = os.path.join(self.dump_dir, net_dump_filename)
            if os.path.isfile(full_path):
                os.remove(full_path)

        self.pcap_15_4_dumper = Dumper(
            filename=os.path.join(self.dump_dir, self.DEFAULT_802154_DUMP_FILENAME_WR),
            snaplen=200,
            network=DLT_IEEE802_15_4_NOFCS
        )

        self.pcap_raw_ip_dumper = Dumper(
            filename=os.path.join(self.dump_dir, self.DEFAULT_RAWIP_DUMP_FILENAME_WR),
            snaplen=200,
            network=DLT_RAW
        )

    def dump_packet(self, message):
        """
        Append one sniffed packet to the pcap matching its interface name
        ('serial' -> 802.15.4 dump, 'tun' -> raw-IP dump), then refresh the
        reader-visible copy of that file. Any error is logged, never raised.
        """
        try:
            t = time.time()
            t_s = int(t)
            t_u_delta = int((t - t_s) * 1000000)  # fractional seconds -> microseconds
            if 'serial' in message.interface_name:
                raw_packet = bytes(message.data)
                packet_slip = bytes(message.data_slip)  # NOTE(review): unused here — confirm

                # lets build pcap header for packet
                pcap_packet_header = Pkthdr(
                    ts_sec=t_s,
                    ts_usec=t_u_delta,
                    incl_len=len(raw_packet),
                    orig_len=len(raw_packet),
                )

                self.pcap_15_4_dumper.dump(pcap_packet_header, raw_packet)

                self.messages_dumped += 1

                # publish a consistent snapshot of the dump for readers
                shutil.copyfile(
                    os.path.join(self.dump_dir, self.DEFAULT_802154_DUMP_FILENAME_WR),
                    os.path.join(self.dump_dir, self.DEFAULT_802154_DUMP_FILENAME)
                )

            elif 'tun' in message.interface_name:
                raw_packet = bytes(message.data)

                # lets build pcap header for packet
                pcap_packet_header = Pkthdr(
                    ts_sec=t_s,
                    ts_usec=t_u_delta,
                    incl_len=len(raw_packet),
                    orig_len=len(raw_packet),
                )

                self.pcap_raw_ip_dumper.dump(pcap_packet_header, raw_packet)

                self.messages_dumped += 1

                # publish a consistent snapshot of the dump for readers
                shutil.copyfile(
                    os.path.join(self.dump_dir, self.DEFAULT_RAWIP_DUMP_FILENAME_WR),
                    os.path.join(self.dump_dir, self.DEFAULT_RAWIP_DUMP_FILENAME)
                )

            else:
                logger.info('Raw packet not dumped to pcap: ' + repr(message))
                return

        except Exception as e:
            logger.error(e)

        print('Messages dumped : ' + str(self.messages_dumped))

    def dumps_rotate(self):
        """Copy each final pcap file to a timestamped filename (snapshot rotation)."""
        for net_dump_filename in self.NETWORK_DUMPS:
            full_path = os.path.join(self.dump_dir, net_dump_filename)
            if os.path.isfile(full_path):
                logger.info('rotating file dump: %s' % full_path)
                shutil.copyfile(
                    full_path,
                    os.path.join(self.dump_dir, datetime.now().strftime('%Y%m%d_%H%M%S_') + net_dump_filename),
                )

    def stop(self):
        """Delete the data queue, stop consuming and close the AMQP connection."""
        logger.info("Stopping packet dumper..")
        self.channel.queue_delete(self.data_queue_name)
        self.channel.stop_consuming()
        self.connection.close()

    def on_request(self, ch, method, props, body):
        """
        Consumer callback: ack, parse the AMQP message, then either shut down
        (MsgTestingToolTerminate), dump the packet (MsgPacketSniffedRaw,
        rotating files every QUANTITY_MESSAGES_PER_PCAP messages), or drop it.
        """
        # ack BEFORE processing: failed messages are not redelivered (best effort)
        ch.basic_ack(delivery_tag=method.delivery_tag)

        try:
            props_dict = {
                'content_type': props.content_type,
                'delivery_mode': props.delivery_mode,
                'correlation_id': props.correlation_id,
                'reply_to': props.reply_to,
                'message_id': props.message_id,
                'timestamp': props.timestamp,
                'user_id': props.user_id,
                'app_id': props.app_id,
            }

            m = Message.from_json(body)
            m.update_properties(**props_dict)
            logger.info('got event: %s' % type(m))

            if isinstance(m, MsgTestingToolTerminate):
                ch.stop_consuming()
                self.stop()

            if isinstance(m, MsgPacketSniffedRaw):
                self.dump_packet(m)

                try:  # rotate files each X messages dumped
                    if self.messages_dumped != 0 and self.messages_dumped % self.QUANTITY_MESSAGES_PER_PCAP == 0:
                        self.dumps_rotate()
                        self.dumpers_init()

                except Exception as e:
                    logger.error(e)

            else:
                logger.info('drop amqp message: ' + repr(m))

        except NonCompliantMessageFormatError as e:
            print('* * * * * * API VALIDATION ERROR * * * * * * * ')
            print("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")
            print('* * * * * * * * * * * * * * * * * * * * * * * * * \n')
            # raise NonCompliantMessageFormatError("AMQP MESSAGE LIBRARY COULD PROCESS JSON MESSAGE")

        except Exception as e:
            logger.error(e)
            req_body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
            logger.error("Message: %s, body: %s" % (json.dumps(req_body_dict), str(body)))

    def run(self):
        """Start the blocking consume loop (returns after stop_consuming)."""
        print("Starting thread listening on the event bus")
        self.channel.start_consuming()
        print('Bye byes!')
if __name__ == '__main__':
    import multiprocessing
    import time

    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

    # BUGFIX: pass the function object as target. The original wrote
    # `target=launch_amqp_data_to_pcap_dumper()`, which CALLED the dumper in
    # this process (blocking forever) before Process() was even constructed.
    p = multiprocessing.Process(target=launch_amqp_data_to_pcap_dumper, args=())
    p.start()

    # keep the parent alive (and visibly ticking) while the dumper runs
    for i in range(1, 1000):
        time.sleep(1)
        print(i)

    p.join()
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
pcapy clone in pure python
This module aims to support exactly the same interface as pcapy,
so that it can be used as a drop-in replacement. This means the module
does not and probably will never support live captures. Offline file
support should match the original though.
"""
# Copyright 2010 Stanisław Pitucha. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of
# conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice, this list
# of conditions and the following disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY Stanisław Pitucha ``AS IS'' AND ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are those of the
# authors and should not be interpreted as representing official policies, either expressed
# or implied, of Stanisław Pitucha.
import struct
import logging
# Link-layer header type (DLT_*) codes, mirroring libpcap's values. The
# `network` field of a pcap global header holds one of these.
DLT_NULL = 0
DLT_EN10MB = 1
DLT_IEEE802 = 6
DLT_ARCNET = 7
DLT_SLIP = 8
DLT_PPP = 9
DLT_FDDI = 10
DLT_ATM_RFC1483 = 11
DLT_RAW = 12
DLT_PPP_SERIAL = 50
DLT_PPP_ETHER = 51
# NOTE(review): DLT_RAW is bound twice (12 above, 101 here); the later
# assignment wins, so the module effectively uses 101 everywhere (the
# LINKTYPE_RAW value used in capture files). Confirm the DLT_RAW = 12 line
# is intentionally dead.
DLT_RAW = 101
DLT_C_HDLC = 104
DLT_IEEE802_11 = 105
DLT_LOOP = 108
DLT_LINUX_SLL = 113
DLT_LTALK = 114

# For IEEE 802.15.4 DLT codes
DLT_IEEE802_15_4 = 195
DLT_IEEE802_15_4_NONASK_PHY = 215
DLT_IEEE802_15_4_NOFCS = 230

# NOTE(review): getLogger() with no name returns the ROOT logger — confirm
# this is intended rather than getLogger(__name__).
logger = logging.getLogger()
class PcapError(Exception):
    """Exception raised for any pcap-level failure in this module."""
def fixup_identical_short(short):
    """Identity "fixup": the capture's byte order already matches the host."""
    return short


def fixup_identical_long(long_int):
    """Identity "fixup": the capture's byte order already matches the host."""
    return long_int
def fixup_swapped_short(short):
    """Reverse the two bytes of a 16-bit value (endianness fixup)."""
    low_byte = short & 0xff
    high_byte = (short >> 8) & 0xff
    return (low_byte << 8) | high_byte
def fixup_swapped_long(long_int):
    """Reverse the byte order of a 32-bit value (endianness fixup).

    Equivalent to swapping the bytes inside each 16-bit half and then
    exchanging the halves.
    """
    lo_half = long_int & 0xffff
    hi_half = (long_int >> 16) & 0xffff
    lo_swapped = ((lo_half & 0xff) << 8) | ((lo_half & 0xff00) >> 8)
    hi_swapped = ((hi_half & 0xff) << 8) | ((hi_half & 0xff00) >> 8)
    return ((lo_swapped << 16) & 0xffff0000) | hi_swapped
# Maps the first four bytes of a capture file (the pcap magic number
# 0xa1b2c3d4, as it appears on disk) to the (short, long) fixup functions
# applied to header fields after unpacking.
# NOTE(review): this pairing assumes the fields are unpacked with NATIVE byte
# order on a little-endian host (see Reader.__init__'s struct.unpack); on a
# big-endian host the mapping would be inverted — confirm.
fixup_sets = {
    b"\xd4\xc3\xb2\xa1": (fixup_identical_short, fixup_identical_long),
    b"\xa1\xb2\xc3\xd4": (fixup_swapped_short, fixup_swapped_long),
}
def open_offline(filename):
    """Open the pcap file named `filename` ("-" means stdin) and return a Reader."""
    if filename == "-":
        import sys
        return Reader(sys.stdin)
    try:
        source = open(filename, "rb")
    except IOError as error:
        code, message = error.args[0], error.args[1]
        if code == 21:  # EISDIR
            raise PcapError("error reading dump file: %s" % (message))
        raise PcapError("%s: %s" % (filename, message))
    return Reader(source)
def open_live(_device, _snaplen, _promisc, _to_ms):
    """Unsupported: live capture exists only in the real pcapy."""
    raise NotImplementedError("This function is only available in pcapy")


def lookupdev():
    """Unsupported: device lookup exists only in the real pcapy."""
    raise NotImplementedError("This function is only available in pcapy")


def findalldevs():
    """Unsupported: device enumeration exists only in the real pcapy."""
    raise NotImplementedError("This function is only available in pcapy")


def compile(_linktype, _snaplen, _filter, _optimize, _netmask):
    """Unsupported: BPF filter compilation is not implemented."""
    raise NotImplementedError("not implemented yet")
class Reader(object):
"""
An interface for reading an open pcap file.
This object can either read a single packet via `next()` or a series
via `loop()` or `dispatch()`.
"""
__GLOBAL_HEADER_LEN = 24
__PACKET_HEADER_LEN = 16
def __init__(self, source):
""" creates a Reader instance from an open file object """
self.__source = source
header = self.__source.read(self.__GLOBAL_HEADER_LEN)
if len(header) < self.__GLOBAL_HEADER_LEN:
raise PcapError(
"truncated dump file; tried to read %i file header bytes, only got %i" %
(self.__GLOBAL_HEADER_LEN, len(header)))
hdr_values = struct.unpack("IHHIIII", header)
if header[:4] in fixup_sets:
self.fixup_short, self.fixup_long = fixup_sets[header[:4]]
logger.debug(header[:4])
logger.debug(hdr_values)
else:
raise PcapError("bad dump file format")
self.version_major, self.version_minor = [self.fixup_short(x)
for x in hdr_values[1:3]]
self.thiszone, self.sigfigs, self.snaplen, self.network = [self.fixup_long(x)
for x in hdr_values[3:]]
self.last_good_position = self.__GLOBAL_HEADER_LEN
logger.debug('pcap header: network: %s sigfigs: %s snaplen: %s network %s'%(self.thiszone, self.sigfigs, self.snaplen, self.network))
def __loop_and_count(self, maxcant, callback):
"""
reads up to `maxcant` packets and runs callback for each of them
returns the number of packets processed
"""
i = 0
while True:
if i >= maxcant > -1:
break