Commit 8b5716f5 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'serial_connector' into 'master'

Serial connector

See merge request !23
parents f589a89c 16f6ba45
# Created by https://www.gitignore.io/api/python
.idea
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
......
......@@ -37,6 +37,8 @@ from connectors.ping import PingConnector
from connectors.zeromq import ZMQConnector
from connectors.serialconn import SerialConnector
from utils import arrow_down, arrow_up, finterop_banner
try:
from urllib.parse import urlparse
except ImportError:
......@@ -72,6 +74,8 @@ For more information, visit: http://doc.f-interop.eu
def __init__(self):
print(finterop_banner)
self.cli = click.Group(
add_help_option=Agent.header,
short_help=Agent.header
......
"""
# -*- coding: utf-8 -*-
"""
import os
import json
import sys
import logging
import serial
import os
import collections
from subprocess import Popen, PIPE
from .base import BaseController, BaseConsumer
from threading import Thread
import time
import logging
import threading
from connectors.base import BaseController, BaseConsumer
from utils.serial_listener import SerialListener
from utils import arrow_down, arrow_up, finterop_banner
__version__ = (0, 0, 1)
......@@ -27,32 +25,39 @@ class SerialConsumer(BaseConsumer):
self.dispatcher = {
"data.serial.to_forward": self.handle_data,
}
# thread = Thread(target=self.bootstrap(), args=())
# thread.daemon = True
# thread.start()
self.bootstrap()
self.message_count = 0
# print ("it ok")
self.output = ''
self.serial_listener = None
def bootstrap(self):
self.serial_port = None
try:
self.serial_port = str(os.environ['FINTEROP_CONNECTOR_SERIAL_PORT'])
self.baudrate = str(os.environ['FINTEROP_CONNECTOR_BAUDRATE'])
log.info('FINTEROP_CONNECTOR_SERIAL_PORT env var imported: %s' % self.serial_port)
log.info('FINTEROP_CONNECTOR_BAUDRATE env var imported: %s' % self.baudrate)
# open a subprocess to listen the serialport
path = os.path.dirname(os.path.abspath(__file__))
path += "/ReadCOM.py"
print(path)
p = Popen(['python', path, str(self.serial_port), "115200", str(self.name), str(self.server),
str(self.session), str(self.user), str(self.password)], stdin=PIPE, stdout=PIPE, stderr=PIPE)
params = {
'agent_name': self.name,
'rmq_connection': self.connection,
'rmq_exchange': "amq.topic",
'serial_port': self.serial_port,
'serial_boudrate': self.baudrate,
}
serial_listener = SerialListener(**params)
serial_listener_th = threading.Thread(target=serial_listener.run, args=())
serial_listener_th.daemon = True
serial_listener_th.start()
except KeyError as e:
logging.warning(
'Cannot retrieve environment variables for serial connection: '
'FINTEROP_CONNECTOR_SERIAL_PORT'
'if no sniffer/injector needed for test ignore this warning')
'Cannot retrieve environment variables for serial connection: '
'FINTEROP_CONNECTOR_SERIAL_PORT/FINTEROP_CONNECTOR_BAUDRATE '
'If no sniffer/injector needed for test ignore this warning ')
def handle_data(self, body, message):
"""
......@@ -68,40 +73,45 @@ class SerialConsumer(BaseConsumer):
"""
if self.serial_port is None:
print('error: no serialport initiated')
log.error('No serial port initiated')
return
# WRITE RECIEVED DATA INTO serial connector -> to SNIFFER-FWD motes -> Wireless link
path = os.path.dirname(os.path.abspath(__file__))
# path += "/SendCOM.py"
decoder = json.JSONDecoder(object_pairs_hook=collections.OrderedDict)
bodydict = decoder.decode(body)
# p=Popen(['python', path, str(self.serial_port), "115200", str(bodydict['data'])], stdin=PIPE, stdout=PIPE, stderr=PIPE)
# print (path)
usleep = lambda x: time.sleep(x / 1000000.0)
ser = serial.Serial(
port=self.serial_port,
baudrate=115200,
timeout=0.0)
# inputstr=sys.argv[3]
# ser.write(inputstr.decode('hex'))
port=self.serial_port,
baudrate=self.baudrate,
timeout=0.0)
try:
ser.write(bodydict['data'].decode('hex'))
self.output = 'c0'
for c in body['data']:
if format(c, '02x') == 'c0':
# endslip
self.output += 'db'
self.output += 'dc'
elif format(c, '02x') == 'db':
# esc
self.output += 'db'
self.output += 'dd'
else:
self.output += format(c, '02x')
self.output += 'c0'
ser.write(self.output.decode('hex'))
ser.flushOutput()
except:
print('ERROR TRYING TO WRITE IN SERIAL INTERFACE')
usleep(300000)
# p.wait()
print("***************** MESSAGE INJECTED : BACKEND -> WIRELESS LINK *******************")
log.error('Error while tring to write serial interface')
print(arrow_down)
log.info('\n # # # # # # # # # # # # SERIAL INTERFACE # # # # # # # # # # # # ' +
'\n data packet EventBus -> Serial' +
'\n' + json.dumps(body) +
'\n # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # '
)
message.ack()
def handle_control(self, body, message):
msg = None
try:
msg = json.loads(body)
log.debug(message)
......@@ -110,17 +120,17 @@ def handle_control(self, body, message):
log.error(e)
log.error("Incorrect message: {0}".format(body))
return
if msg["_type"] in self.dispatcher.keys():
self.dispatcher[msg["_type"]](msg)
else:
log.debug("Not supported action")
log.warning("Not supported action")
class SerialConnector(BaseController):
"""
"""
NAME = "serial"
def __init__(self, **kwargs):
......@@ -135,9 +145,6 @@ class SerialConnector(BaseController):
self.consumer = SerialConsumer(**kwargs)
self.consumer.log = logging.getLogger(__name__)
self.consumer.log.setLevel(logging.DEBUG)
# p=Popen(['python', './ReadCOM.py', self.serial_port, "115200", str(self.name), str(self.server), str(self.session), str(self.user),str(self.password)], stdin=PIPE, stdout=PIPE, stderr=PIPE)
def run(self):
self.consumer.run()
# p=Popen(['python', './ReadCOM.py', self.serial_port, "115200", str(self.name), str(self.server), str(self.session), str(self.user),str(self.password)], stdin=PIPE, stdout=PIPE, stderr=PIPE)
......@@ -7,11 +7,9 @@ import json
import logging
import sys
import datetime
from kombu import Connection, Producer
from kombu import Queue
from kombu import Producer
from connectors.base import BaseController, BaseConsumer
from utils import arrow_down, arrow_up, finterop_banner
from utils.opentun import OpenTunLinux, OpenTunMACOS
__version__ = (0, 0, 1)
......@@ -111,11 +109,10 @@ class TunConsumer(BaseConsumer):
self.log.info("Tun started. Publishing msg: %s" % json.dumps(msg))
producer = Producer(self.connection, serializer='json')
producer.publish(
msg,
exchange=self.exchange,
routing_key='control.tun.fromAgent.%s' % self.name
)
producer.publish(msg,
exchange=self.exchange,
routing_key='control.tun.fromAgent.%s' % self.name
)
def handle_data(self, body, message):
"""
......@@ -132,7 +129,7 @@ class TunConsumer(BaseConsumer):
return
self.packet_count += 1
print(arrow_down)
self.log.debug('\n* * * * * * HANDLE DATA (%s) * * * * * * *' % self.packet_count)
self.log.debug("TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
self.log.debug(" - - - ")
......
......@@ -10,3 +10,58 @@ def get_from_environment(variable, default):
logging.warning("Using default variable %s=%s" % (variable, default))
return v
arrow_up = """
_
/ \\
/ \\
/ \\
/ \\
/__ __\\
| | _ _ _
| | | (_) | |
| | _ _ _ __ | |_ _ __ | | __
| | | | | | '_ \| | | '_ \\| |/ /
| | | |_| | |_) | | | | | | <
| | \__,_| .__/|_|_|_| |_|_|\_\\
| | | |
| | |_|
!___!
\\ O /
\\/|\/
|
/ \\
_/ \\ _
"""
arrow_down = """
___
| |
| | _ _ _ _
| | | | | (_) | |
| | __| | _____ ___ __ | |_ _ __ | | __
| | / _` |/ _ \\ \\ /\\ / / '_ \\| | | '_ '\\| |/ /
| | | (_| | (_) \\ V V /| | | | | | | | | <
| | \\__,_|\\___/ \\_/\\_/ |_| |_|_|_|_| |_|_|\_\\
| |
__! !__,
\\ / \O
\\ / \/|
\\ / |
\\ / / \\
Y _/ _\\
"""
finterop_banner = \
"""
______ _____ _
| ____| |_ _| | |
| |__ ______| | _ __ | |_ ___ _ __ ___ _ __
| __|______| | | '_ \\| __/ _ \\ '__/ _ \\| '_ \\
| | _| |_| | | | || __/ | | (_) | |_) |
|_| |_____|_| |_|\\__\\___|_| \\___/| .__/
| |
|_|
"""
\ No newline at end of file
# -*- coding: utf-8 -*-
# !/usr/bin/env python3
import pika
import threading
import json
from collections import OrderedDict
import datetime
import signal
import sys
import os
import logging
AMQP_URL = str(os.environ['AMQP_URL'])
AMQP_EXCHANGE = str(os.environ['AMQP_EXCHANGE'])
COMPONENT_ID = 'packet_router_snippet'
AGENT_1_ID = 'coap_client_agent'
AGENT_2_ID = 'coap_server_agent'
AGENT_TT_ID = 'agent_TT'
# init logging to stnd output and log files
logger = logging.getLogger(__name__)
# default handler
sh = logging.StreamHandler()
logger.addHandler(sh)
logger.setLevel(logging.DEBUG)
def publish_message(channel, message):
""" Published which uses message object metadata
:param channel:
:param message:
:return:
"""
properties = pika.BasicProperties(**message.get_properties())
channel.basic_publish(
exchange=AMQP_EXCHANGE,
routing_key=message.routing_key,
properties=properties,
body=message.to_json(),
)
class PacketRouter(threading.Thread):
AGENT_1_ID = 'coap_client_agent'
AGENT_2_ID = 'coap_server_agent'
AGENT_TT_ID = 'agent_TT'
DEFAULT_ROUTING = {'data.tun.fromAgent.%s' % AGENT_1_ID: ['data.tun.toAgent.%s' % AGENT_2_ID,
'data.tun.toAgent.%s' % AGENT_TT_ID
],
'data.tun.fromAgent.%s' % AGENT_2_ID: ['data.tun.toAgent.%s' % AGENT_1_ID,
'data.tun.toAgent.%s' % AGENT_TT_ID
],
}
def __init__(self, conn, routing_table=None):
threading.Thread.__init__(self)
if routing_table:
self.routing_table = routing_table
else:
self.routing_table = PacketRouter.DEFAULT_ROUTING
logger.info('routing table (rkey_src:[rkey_dst]) : {table}'.format(table=json.dumps(self.routing_table)))
# queues & default exchange declaration
self.message_count = 0
self.connection = conn
self.channel = self.connection.channel()
self.queues_init()
logger.info('packet router waiting for new messages in the data plane..')
def queues_init(self):
for src_rkey, dst_rkey_list in self.routing_table.items():
assert type(src_rkey) is str
assert type(dst_rkey_list) is list
queue_packets_from_agents = '%s@%s' % (src_rkey, COMPONENT_ID)
self.channel.queue_declare(queue=queue_packets_from_agents, auto_delete=False)
self.channel.queue_bind(exchange=AMQP_EXCHANGE,
queue=queue_packets_from_agents,
routing_key=src_rkey)
# bind all src queues to on_request callback
self.channel.basic_consume(self.on_request, queue=queue_packets_from_agents)
# for dst_rkey in dst_rkey_list:
# # start with clean queues
# dst_queue = '%s@%s_raw_packet_logs' % (dst_rkey, COMPONENT_ID)
# self.channel.queue_delete(dst_queue)
# self.channel.queue_declare(queue=dst_queue, auto_delete=False, arguments={'x-max-length': 10})
# self.channel.queue_bind(exchange=AMQP_EXCHANGE,
# queue=queue_packets_from_agents,
# routing_key=dst_rkey)
def stop(self):
self.channel.stop_consuming()
def on_request(self, ch, method, props, body):
# TODO implement forced message drop mechanism
# obj hook so json.loads respects the order of the fields sent -just for visualization purposeses-
body_dict = json.loads(body.decode('utf-8'), object_pairs_hook=OrderedDict)
ch.basic_ack(delivery_tag=method.delivery_tag)
try:
logger.debug("Message sniffed: %s" % (body_dict['_type']))
except KeyError:
logger.warning("Incorrect formatted message received in event bus")
self.message_count += 1
print('\n* * * * * * MESSAGE SNIFFED (%s) * * * * * * *' % self.message_count)
print("TIME: %s" % datetime.datetime.time(datetime.datetime.now()))
print(" - - - ")
print("ROUTING_KEY: %s" % method.routing_key)
print(" - - - ")
print("HEADERS: %s" % props.headers)
print(" - - - ")
print("PROPS: %s" % json.dumps(
{
'content_type': props.content_type,
'content_encoding': props.content_encoding,
'headers': props.headers,
'delivery_mode': props.delivery_mode,
'priority': props.priority,
'correlation_id': props.correlation_id,
'reply_to': props.reply_to,
'expiration': props.expiration,
'message_id': props.message_id,
'timestamp': props.timestamp,
'user_id': props.user_id,
'app_id': props.app_id,
'cluster_id': props.cluster_id,
}
)
)
print(" - - - ")
print('BODY %s' % json.dumps(body_dict))
print(" - - - ")
# print("ERRORS: %s" % )
print('* * * * * * * * * * * * * * * * * * * * * \n')
# let's route the message to the right agent
try:
data = body_dict['data']
data_slip = body_dict['data_slip']
except:
logger.error('wrong message format, no data field found in : {msg}'.format(msg=json.dumps(body_dict)))
return
src_rkey = method.routing_key
if src_rkey in self.routing_table.keys():
list_dst_rkey = self.routing_table[src_rkey]
for dst_rkey in list_dst_rkey:
# resend to dst_rkey
self.channel.basic_publish(
body=json.dumps({'_type': 'packet.sniffed.raw', 'data': data, 'data_slip': data_slip}),
routing_key=dst_rkey,
exchange=AMQP_EXCHANGE,
properties=pika.BasicProperties(
content_type='application/json',
)
)
logger.info(
"Routing packet (%d) from topic: %s to topic: %s" % (self.message_count, src_rkey, dst_rkey))
else:
logger.warning('No known route for r_key source: {r_key}'.format(r_key=src_rkey))
return
def run(self):
self.channel.start_consuming()
logger.info('Bye byes!')
###############################################################################
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.URLParameters(AMQP_URL))
channel = connection.channel()
def signal_int_handler(channel):
# FINISHING... let's send a goodby message
msg = {
'message': '{component} is out! Bye bye..'.format(component=COMPONENT_ID),
"_type": '{component}.shutdown'.format(component=COMPONENT_ID)
}
channel.basic_publish(
body=json.dumps(msg),
routing_key='control.session.info',
exchange=AMQP_EXCHANGE,
properties=pika.BasicProperties(
content_type='application/json',
)
)
logger.info('got SIGINT. Bye bye!')
sys.exit(1)
signal.signal(signal.SIGINT, signal_int_handler)
# routing tables for between agents' TUNs interfaces and also between agents' serial interfaces
iut_routing_table_serial = {
'data.serial.fromAgent.%s' % AGENT_1_ID: ['data.serial.toAgent.%s' % AGENT_2_ID,
],
'data.serial.fromAgent.%s' % AGENT_2_ID: ['data.serial.toAgent.%s' % AGENT_1_ID,
],
}
iut_routing_table_tun = {
'data.tun.fromAgent.%s' % AGENT_1_ID: ['data.tun.toAgent.%s' % AGENT_2_ID,
'data.tun.toAgent.%s' % AGENT_TT_ID
],
'data.tun.fromAgent.%s' % AGENT_2_ID: ['data.tun.toAgent.%s' % AGENT_1_ID,
'data.tun.toAgent.%s' % AGENT_TT_ID
],
}
routing_table = dict()
routing_table.update(iut_routing_table_serial)
routing_table.update(iut_routing_table_tun)
# in case its not declared
connection.channel().exchange_declare(exchange=AMQP_EXCHANGE,
type='topic',
durable=True,
)
# start amqp router thread
r = PacketRouter(connection, routing_table)
r.start()
......@@ -7,12 +7,13 @@ import struct
import threading
import time
import traceback
import uuid
from fcntl import ioctl
import sys
from kombu import Exchange
from utils import arrow_down, arrow_up, finterop_banner
DEFAULT_IPV6_PREFIX = 'bbbb'
logging.basicConfig(level=logging.DEBUG)
......@@ -325,7 +326,7 @@ class OpenTunLinux(object):
Class which interfaces between a TUN virtual interface and an EventBus.
"""
def __init__(self, name, rmq_connection, rmq_exchange,
def __init__(self, name, rmq_connection, exchange="amq.topic",
ipv6_prefix=None, ipv6_host=None, ipv6_no_forwarding=None,
ipv4_host=None, ipv4_network=None, ipv4_netmask=None):
......@@ -506,7 +507,7 @@ class OpenTunLinux(object):
"timestamp": str(time.time()),
"data": data
}
print(arrow_up)
log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' +
'\n data packet TUN -> EventBus' +
'\n' + json.dumps(msg) +
......@@ -549,7 +550,7 @@ class OpenTunMACOS(object):
Class which interfaces between a TUN virtual interface and an EventBus.
'''
def __init__(self, name, rmq_connection, rmq_exchange,
def __init__(self, name, rmq_connection, exchange="amq.topic",
ipv6_prefix=None, ipv6_host=None, ipv6_no_forwarding=None,
ipv4_host=None, ipv4_network=None, ipv4_netmask=None):
......@@ -717,7 +718,7 @@ class OpenTunMACOS(object):
"timestamp": str(time.time()),
"data": data
}
print(arrow_up)
log.info('\n # # # # # # # # # # # # OPEN TUN # # # # # # # # # # # # ' +
'\n data packet TUN -> EventBus' +
'\n' + json.dumps(msg) +
......
# -*- coding: utf-8 -*-
import json
import serial
import logging
import threading
import time
import signal
import sys
from kombu import Exchange
from collections import OrderedDict
from utils import arrow_down, arrow_up, finterop_banner
STATE_OK = 0
STATE_ESC = 1
STATE_RUBBISH = 2
SLIP_END = 'c0'
SLIP_ESC = 'db'
SLIP_ESC_END = 'dc'