Commit a38eec7e authored by Federico Sismondi's avatar Federico Sismondi

moving @Remy s 6tisch version of agent to a public repo

parents
Agent for the f-interop platform
#################################
Design
------
The design of the f-interop agent is modular by design.
An agent is made of different processes that connect and exchange messages to each others
using ZMQ sockets.
Core
----
When started, the agent starts up the core module. This component is in charge of launching
all the other components. If new components needs to be added they just need to be launched
from this component.
Core open the default ZMQ socket that is used by other components to communicate with each others.
Error handling
--------------
When there is a Ctrl-C the agent should kill all other components and disconnect as gracefully as possible.
# coding: utf-8
"""
Agent for f-interop
*******************
Installation
------------
The installation for the user is supposed to be as simple as possible. Ideally, the user
should only have the ./finterop tool installed with the relevant dependencies and should
be ready to go.
Features of the agent
----------------------
* The agent must be able to inject packets in the local loop or re-emit packets it receives
from the f-interop backend.
* The agent MUST be able to authenticate itself with the backend.
* The agent will monitor all the network traffic passing through it and send it to the backend.
* The agent isn't the way the user interact with test. It simply connects to f-interop and from there receive
instruction. All the commands send from the agent are for debugging and developing purposes and won't be enabled
by default in the final version.
"""
import logging
import uuid
import click
from connectors.tun import TunConnector
from connectors.core import CoreConnector
from connectors.http import HTTPConnector
from connectors.ping import PingConnector
from connectors.zeromq import ZMQConnector
from connectors.serialconn import SerialConnector
__version__ = (0, 0, 1)
DEFAULT_PLATFORM = 'f-interop.paris.inria.fr'
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
class Agent(object):
"""
Command line interface of the agent
"""
header = """
F-interop agent and management tool.
For more information, visit: http://f-interop.paris.inria.fr.
""",
def __init__(self):
self.cli = click.Group(
add_help_option=Agent.header,
short_help=Agent.header
)
# Options
self.server_option = click.Option(
param_decls=["--server"],
default=DEFAULT_PLATFORM,
required=True,
help="f-interop platform (default: %s)" % DEFAULT_PLATFORM)
self.user_option = click.Option(
param_decls=["--user"],
required=True,
help="F-interop username."
)
self.password_option = click.Option(
param_decls=["--password"],
required=True,
help="F-interop password.",
hide_input=True)
self.session_option = click.Option(
param_decls=["--session"],
required=True,
help="F-interop session id."
)
self.name_option = click.Option(
param_decls=["--name"],
default=str(uuid.uuid1()),
required=True,
help="Agent identity (default: random generated)")
# Commands
self.connect_command = click.Command(
"connect",
callback=self.handle_connect,
params=[self.user_option,
self.password_option,
self.session_option,
self.server_option,
self.name_option],
short_help="Authenticate user")
self.cli.add_command(self.connect_command)
self.plugins = {}
def handle_connect(self, user, password, session, server, name):
"""
Authenticate a USER with an f-interop.
This create a file/token that is reused to access the f-interop platform.
"""
data = {
"user": user,
"password": password,
"session": session,
"server": server,
"name": name
}
log.info("Try to connect with %s" % data)
self.plugins["core"] = CoreConnector(**data)
self.plugins["tun"] = TunConnector(**data)
self.plugins["zmq"] = ZMQConnector(**data)
self.plugins["ping"] = PingConnector(**data)
self.plugins["http"] = HTTPConnector(**data)
self.plugins["serial"] = SerialConnector(**data)
for p in self.plugins.values():
p.start()
def run(self):
self.cli()
if __name__ == "__main__":
agent = Agent()
agent.run()
# try:
# # Loop designed to catch the keyboard interruption
#
# while any(map(lambda x: x.is_alive(), agent.plugins)):
# # print([x.isAlive() for x in plugins])
# pass
#
# except KeyboardInterrupt as SystemExit:
# for plugin in agent.plugins:
# plugin.go_on = False
# time.sleep(1)
# log.critical('! Received keyboard interrupt, quitting threads.\n')
"""
"""
import json
import logging
from multiprocessing import Process
from kombu import Connection
from kombu import Exchange
from kombu import Queue
from kombu.mixins import ConsumerMixin
DEFAULT_EXCHANGE_NAME = "default"
class BaseConsumer(ConsumerMixin):
DEFAULT_EXCHANGE_NAME = "default"
def __init__(self, user, password, session, server, name, consumer_name):
"""
Args:
user: Username
password: User password
session: Test session
server: Backend for the RMQ
name: Identity of the component. Can be an UUID or a human nickname
consumer_name: Name to easily identify a process consuming.
"""
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.DEBUG)
self.user = user
self.password = password
self.session = session
self.server = server
self.name = name
self.consumer_name = consumer_name
self.server_url = 'amqp://{user}:{password}@{server}/{session}'.format(user=user,
password=password,
session=session,
server=server)
self.connection = Connection(self.server_url,
transport_options={'confirm_publish': True})
self.exchange = Exchange(BaseConsumer.DEFAULT_EXCHANGE_NAME,
type="topic",
durable=True)
self.control_queue = Queue("control.{consumer_name}@{name}".format(name=name,
consumer_name=consumer_name),
exchange=self.exchange,
routing_key='control.{consumer_name}.#.{name}'.format(consumer_name=consumer_name,
name=name),
durable=False)
self.data_queue = Queue("data.{consumer_name}@{name}".format(name=name,
consumer_name=consumer_name),
exchange=self.exchange,
routing_key='data.{consumer_name}.#.{name}'.format(consumer_name=consumer_name,
name=name),
durable=False)
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=[self.control_queue],
callbacks=[self.handle_control],
no_ack=True,
accept=['json']),
Consumer(queues=[self.data_queue],
callbacks=[self.handle_data],
no_ack=True,
accept=["json"])
]
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
self.log.info("{consumer_name} ready to consume data. Typical routing key: control.{consumer_name}@{name}".format(consumer_name=self.consumer_name, name=self.name))
def handle_control(self, body, message):
self.log.debug("DEFAULT HANDLE CONTROL")
self.log.debug(("Payload", message.payload))
self.log.debug(("Properties", message.properties))
self.log.debug(("Headers", message.headers))
self.log.debug(("body", message.body))
msg = None
try:
msg = json.loads(body)
self.log.debug(message)
except ValueError as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(body))
if msg is not None:
self.log.debug("Just received that packet")
self.log.debug(msg)
def handle_data(self, body, message):
"""
Args:
msg:
Returns:
"""
self.log.debug("DEFAULT HANDLE DATA")
self.log.debug(("Payload", message.payload))
self.log.debug(("Properties", message.properties))
self.log.debug(("Headers", message.headers))
self.log.debug(("body", message.body))
# msg = None
# try:
# msg = json.loads(body)
# self.log.debug(message)
# except ValueError as e:
# message.ack()
# self.log.error(e)
# self.log.error("Incorrect message: {0}".format(body))
#
# if msg is not None:
# self.log.debug("Just received that packet")
# self.log.debug(msg)
def test_connection(self):
"""
Test if a component can talk on the event bus.
Returns:
"""
# for key in [control key, data key]:
# log.info("Testing on local routing key: %s" % key)
# self.basic_publish(key, "PING!!")
pass
class BaseController(Process):
"""
"""
def __init__(self, name, process_args=None):
if process_args is not None:
super(BaseController, self).__init__(**process_args)
else:
super(BaseController, self).__init__()
self.go_on = True
self.name = name
"""
Plugin to connect to the F-interop backend
"""
import json
import logging
from .base import BaseController, BaseConsumer
__version__ = (0, 0, 1)
log = logging.getLogger(__name__)
class CoreConsumer(BaseConsumer):
"""
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(CoreConsumer, self).__init__(user, password, session, server, name, consumer_name)
def get_consumers(self, Consumer, channel):
return [
Consumer(queues=[self.control_queue],
callbacks=[self.handle_control],
no_ack=True,
accept=['json']),
Consumer(queues=[self.data_queue],
callbacks=[self.handle_data],
no_ack=True,
accept=["json"])
]
def on_consume_ready(self, connection, channel, consumers, wakeup=True, **kwargs):
log.info("Backend ready to consume data")
log.info("-------------------------------------------------")
log.info("Go to this URL: http://{platform}/session/{session}".format(platform=self.server_url,
session=self.session))
log.info("-------------------------------------------------")
def handle_control(self, body, message):
log.debug("DEFAULT HANDLE CONTROL")
log.debug(("Payload", message.payload))
log.debug(("Properties", message.properties))
log.debug(("Headers", message.headers))
log.debug(("body", message.body))
msg = None
try:
msg = json.loads(body)
log.debug(message)
except ValueError as e:
message.ack()
log.error(e)
log.error("Incorrect message: {0}".format(body))
if msg is not None:
log.debug("Just received that packet")
log.debug(msg)
class CoreConnector(BaseController):
"""
"""
NAME = "core"
def __init__(self, **kwargs):
"""
Args:
key:
"""
super(CoreConnector, self).__init__(name=CoreConnector.NAME)
kwargs["consumer_name"] = CoreConnector.NAME
self.consumer = CoreConsumer(**kwargs)
def run(self):
"""
Returns:
"""
self.consumer.run()
import json
__version__ = (0, 0, 1)
import logging
from requests import Request, Session
from .base import BaseController, BaseConsumer
log = logging.getLogger(__name__)
class HTTPConsumer(BaseConsumer):
"""
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(HTTPConsumer, self).__init__(user, password, session, server, name, consumer_name)
def handle_control(self, body, message):
"""
Args:
body:
message:
Returns:
"""
required_keys = {"url", "verb", "data"}
msg = None
try:
msg = json.loads(body)
self.log.debug(message)
except ValueError as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(body))
if msg is not None:
present_keys = set(msg.keys())
if not present_keys.issuperset(required_keys):
self.log.error("Error you need those keys: %s" % required_keys)
else:
s = Session()
req = Request(msg["verb"], msg["url"], data=msg["data"])
prepped = s.prepare_request(req)
resp = s.send(prepped)
self.log.info(resp.status_code)
class HTTPConnector(BaseController):
NAME = "http"
def __init__(self, **kwargs):
super(HTTPConnector, self).__init__(name=HTTPConnector.NAME)
kwargs["consumer_name"] = HTTPConnector.NAME
self.consumer = HTTPConsumer(**kwargs)
self.consumer.log = logging.getLogger(__name__)
self.consumer.log.setLevel(logging.DEBUG)
def run(self):
self.consumer.run()
"""
Create ICMP packets to test if all nodes are up
"""
import json
__version__ = (0, 0, 1)
import logging
from .base import BaseController, BaseConsumer
import subprocess
class PingConsumer(BaseConsumer):
"""
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(PingConsumer, self).__init__(user, password, session, server, name, consumer_name)
def handle_control(self, body, message):
"""
Args:
body:
message:
Returns:
"""
required_keys = {"host"}
msg = None
try:
msg = json.loads(body)
self.log.debug(message)
except ValueError as e:
message.ack()
self.log.error(e)
self.log.error("Incorrect message: {0}".format(body))
if msg is not None:
present_keys = set(msg.keys())
if not present_keys.issuperset(required_keys):
self.log("Error you need those keys: %s" % required_keys)
else:
# Default version is ping
version = {"IPv6": "ping6", "IPv4": "ping"}
executable = version.get(msg["version"], "ping")
# default packet count is 1
count_option = msg.get("count", 1)
# network interface, default is None
interface_option = msg.get("interface", "")
# run the ping
command = [executable, msg["host"], "-c", str(count_option)]
self.log.info("command launched: %s" % command)
p = subprocess.call(command)
self.log.info("result: %s" % p)
class PingConnector(BaseController):
NAME = "ping"
def __init__(self, **kwargs):
super(PingConnector, self).__init__(name=PingConnector.NAME)
kwargs["consumer_name"] = PingConnector.NAME
self.consumer = PingConsumer(**kwargs)
self.consumer.log = logging.getLogger(__name__)
self.consumer.log.setLevel(logging.DEBUG)
def run(self):
self.consumer.run()
"""
"""
import json
import sys
import logging
import serial
import os
from kombu import Connection
from kombu import Queue
import collections
# from finterop import AMQP_SERVER, AMQP_USER, AMQP_PASS, AMQP_VHOST, AMQP_EXCHANGE
from subprocess import Popen, PIPE
from .base import BaseController, BaseConsumer
from threading import Thread
import time
__version__ = (0, 0, 1)
log = logging.getLogger(__name__)
class SerialConsumer(BaseConsumer):
"""
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, name, consumer_name)
self.dispatcher = {
"data.serial.to_forward": self.handle_data,
}
# thread = Thread(target=self.bootstrap(), args=())
# thread.daemon = True
# thread.start()
self.bootstrap()
self.message_count = 0
# print ("it ok")
def bootstrap(self):
self.serial_port = None
try:
self.serial_port = str(os.environ['FINTEROP_CONNECTOR_SERIAL_PORT'])
log.info('FINTEROP_CONNECTOR_SERIAL_PORT env var imported: %s' % self.serial_port)
# open a subprocess to listen the serialport
path = os.path.dirname(os.path.abspath(__file__))
path += "/ReadCOM.py"
print(path)
p = Popen(['python', path, str(self.serial_port), "115200", str(self.name), str(self.server),
str(self.session), str(self.user), str(self.password)], stdin=PIPE, stdout=PIPE, stderr=PIPE)
except KeyError as e:
logging.warning(
'Cannot retrieve environment variables for serial connection: '
'FINTEROP_CONNECTOR_SERIAL_PORT'
'if no sniffer/injector needed for test ignore this warning')
def handle_data(self, body, message):
"""
Function that will handle serial management messages
exchange:
- default
example:
{
}
"""
if self.serial_port is None:
print('error: no serialport initiated')
return
# WRITE RECIEVED DATA INTO serial connector -> to SNIFFER-FWD motes -> Wireless link
path = os.path.dirname(os.path.abspath(__file__))
# path += "/SendCOM.py"
decoder = json.JSONDecoder(object_pairs_hook=collections.OrderedDict)
bodydict = decoder.decode(body)
# p=Popen(['python', path, str(self.serial_port), "115200", str(bodydict['data'])], stdin=PIPE, stdout=PIPE, stderr=PIPE)
# print (path)
usleep = lambda x: time.sleep(x / 1000000.0)
ser = serial.Serial(
port=self.serial_port,
baudrate=115200,
timeout=0.0)
# inputstr=sys.argv[3]
# ser.write(inputstr.decode('hex'))
try:
ser.write(bodydict['data'].decode('hex'))
except:
print('ERROR TRYING TO WRITE IN SERIAL INTERFACE')
usleep(300000)
# p.wait()
print("***************** MESSAGE INJECTED : BACKEND -> WIRELESS LINK *******************")
message.ack()
def handle_control(self, body, message):
msg = None
try:
msg = json.loads(body)
log.debug(message)
except ValueError as e:
message.ack()
log.error(e)
log.error("Incorrect message: {0}".format(body))
return
if msg["_type"] in self.dispatcher.keys():
self.dispatcher[msg["_type"]](msg)
else:
log.debug("Not supported action")
class SerialConnector(BaseController):
"""
"""
NAME = "serial"
def __init__(self, **kwargs):
"""
Args:
key: Serial message
"""
super(SerialConnector, self).__init__(name=SerialConnector.NAME)
self.serial = None
kwargs["consumer_name"] = SerialConnector.NAME
self.consumer = SerialConsumer(**kwargs)
self.consumer.log = logging.getLogger(__name__)
self.consumer.log.setLevel(logging.DEBUG)
# p=Popen(['python', './ReadCOM.py', self.serial_port, "115200", str(self.name), str(self.server), str(self.session), str(self.user),str(self.password)], stdin=PIPE, stdout=PIPE, stderr=PIPE)
def run(self):
self.consumer.run()
# p=Popen(['python', './ReadCOM.py', self.serial_port, "115200", str(self.name), str(self.server), str(self.session), str(self.user),str(self.password)], stdin=PIPE, stdout=PIPE, stderr=PIPE)
"""
"""
import json
import logging
import sys
from kombu import Connection
from kombu import Queue
from connectors.base import BaseController, BaseConsumer
from utils.opentun import OpenTunLinux, OpenTunMACOS
__version__ = (0, 0, 1)
log = logging<