Commit 1261c1e2 authored by Federico Sismondi's avatar Federico Sismondi

now agent supports dynamically selecting the exchange to be used

parent 78d7f10d
......@@ -62,7 +62,10 @@ F-interop agent and management tool.
Please use the following format to connect to the f-interop server:
sudo python -m agent connect --url amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX --name coap_client_agent
sudo python -m agent connect
--url amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX
--exchange myExchange
--name coap_client_agent
For more information, visit: http://doc.f-interop.eu
""",
......@@ -80,6 +83,12 @@ For more information, visit: http://doc.f-interop.eu
required=True,
help="AMQP url provided by F-Interop")
self.session_amqp_exchange = click.Option(
param_decls=["--exchange"],
default="amq.topic",
required=False,
help="AMQP exchange used in the session")
self.name_option = click.Option(
param_decls=["--name"],
default=str(uuid.uuid1()),
......@@ -93,6 +102,7 @@ For more information, visit: http://doc.f-interop.eu
callback=self.handle_connect,
params=[
self.session_url,
self.session_amqp_exchange,
self.name_option,
],
short_help="Authenticate user"
......@@ -102,7 +112,7 @@ For more information, visit: http://doc.f-interop.eu
self.plugins = {}
def handle_connect(self, url, name):
def handle_connect(self, url, exchange, name):
"""
Authenticate USER and create agent connection to f-interop.
......@@ -117,6 +127,9 @@ For more information, visit: http://doc.f-interop.eu
"name": name,
}
if exchange:
data.update({'exchange':exchange})
if p.port:
data.update({"server": "{}:{}".format(p.hostname, p.port)})
......
......@@ -16,7 +16,7 @@ DEFAULT_EXCHANGE_NAME = "amq.topic"
class BaseConsumer(ConsumerMixin):
DEFAULT_EXCHANGE_NAME = "amq.topic"
def __init__(self, user, password, session, server, name, consumer_name):
def __init__(self, user, password, session, server, exchange, name, consumer_name):
"""
Args:
......@@ -44,9 +44,14 @@ class BaseConsumer(ConsumerMixin):
self.connection = Connection(self.server_url,
transport_options={'confirm_publish': True})
self.exchange = Exchange(BaseConsumer.DEFAULT_EXCHANGE_NAME,
type="topic",
durable=True)
if exchange:
self.exchange = Exchange(exchange,
type="topic",
durable=True)
else:
self.exchange = Exchange(BaseConsumer.DEFAULT_EXCHANGE_NAME,
type="topic",
durable=True)
self.control_queue = Queue("control.{consumer_name}@{name}".format(name=name,
consumer_name=consumer_name),
......
......@@ -19,8 +19,8 @@ class CoreConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(CoreConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(CoreConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def get_consumers(self, Consumer, channel):
return [
......
......@@ -16,8 +16,8 @@ class HTTPConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(HTTPConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(HTTPConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def handle_control(self, body, message):
"""
......
......@@ -16,8 +16,8 @@ class PingConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(PingConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(PingConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def handle_control(self, body, message):
"""
......
......@@ -22,8 +22,8 @@ class SerialConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
"data.serial.to_forward": self.handle_data,
}
......
......@@ -22,8 +22,8 @@ class TunConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(TunConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(TunConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
"tun.start": self.handle_start,
}
......@@ -73,6 +73,7 @@ class TunConsumer(BaseConsumer):
params = {
'rmq_connection': self.connection,
'rmq_exchange': self.exchange,
'name': self.name,
'ipv6_host': ipv6_host,
'ipv6_prefix': ipv6_prefix,
......@@ -172,6 +173,8 @@ class TunConsumer(BaseConsumer):
self.log.debug("Not supported action")
class TunConnector(BaseController):
"""
......
......@@ -23,11 +23,12 @@ class ZMQConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(ZMQConsumer, self).__init__(user,
password,
session,
server,
exchange,
name,
consumer_name)
......
......@@ -83,7 +83,7 @@ def bootstrap(amqp_url, amqp_exchange, agent_id, ipv6_host, ipv6_prefix, ipv6_no
time.sleep(4)
if check_response(channel, agent_event_q, agent_id):
logging.debug("Agent tun bootstrapped")
publish_tun_bootrap_success(channel, agent_id)
publish_tun_bootrap_success(amqp_exchange, channel, agent_id)
elif i < 3:
pass
else:
......
......@@ -44,8 +44,8 @@ def buf2int(buf):
def formatStringBuf(buf):
return '({0:>2}B) {1}'.format(
len(buf),
'-'.join(["%02x" % ord(b) for b in buf]),
len(buf),
'-'.join(["%02x" % ord(b) for b in buf]),
)
......@@ -55,8 +55,8 @@ def formatBuf(buf):
``[0xab,0xcd,0xef,0x00] -> '(4B) ab-cd-ef-00'``
"""
return '({0:>2}B) {1}'.format(
len(buf),
'-'.join(["%02x" % b for b in buf]),
len(buf),
'-'.join(["%02x" % b for b in buf]),
)
......@@ -72,8 +72,8 @@ def formatAddr(addr):
def formatThreadList():
return '\nActive threads ({0})\n {1}'.format(
threading.activeCount(),
'\n '.join([t.name for t in threading.enumerate()]),
threading.activeCount(),
'\n '.join([t.name for t in threading.enumerate()]),
)
......@@ -325,14 +325,14 @@ class OpenTunLinux(object):
Class which interfaces between a TUN virtual interface and an EventBus.
"""
def __init__(self, name, rmq_connection, exchange="amq.topic",
ipv6_prefix=None, ipv6_host=None, ipv6_no_forwarding = None,
def __init__(self, name, rmq_connection, rmq_exchange,
ipv6_prefix=None, ipv6_host=None, ipv6_no_forwarding=None,
ipv4_host=None, ipv4_network=None, ipv4_netmask=None):
# RMQ setups
self.connection = rmq_connection
self.producer = self.connection.Producer(serializer='json')
self.exchange = Exchange(exchange, type="topic", durable=True)
self.exchange = rmq_exchange
self.name = name
self.packet_count = 0
......@@ -481,8 +481,8 @@ class OpenTunLinux(object):
TUN interface.
"""
return TunReadThread(
self.tunIf,
self._tunToEventBus
self.tunIf,
self._tunToEventBus
)
def _tunToEventBus(self, data):
......@@ -549,14 +549,14 @@ class OpenTunMACOS(object):
Class which interfaces between a TUN virtual interface and an EventBus.
'''
def __init__(self, name, rmq_connection, exchange="amq.topic",
ipv6_prefix=None, ipv6_host=None, ipv6_no_forwarding = None,
def __init__(self, name, rmq_connection, rmq_exchange,
ipv6_prefix=None, ipv6_host=None, ipv6_no_forwarding=None,
ipv4_host=None, ipv4_network=None, ipv4_netmask=None):
# RMQ setups
self.connection = rmq_connection
self.producer = self.connection.Producer(serializer='json')
self.exchange = Exchange(exchange, type="topic", durable=True)
self.exchange = rmq_exchange
self.name = name
self.tun_name = ''
......@@ -588,7 +588,6 @@ class OpenTunMACOS(object):
ipv4_netmask = [255, 255, 0, 0]
self.ipv4_netmask = ipv4_netmask
log.debug("IP info")
log.debug('ipv6_prefix: ' + self.ipv6_prefix)
log.debug('ipv6_host: ' + self.ipv6_host)
......@@ -604,8 +603,6 @@ class OpenTunMACOS(object):
else:
self.tunReadThread = None
# ======================== public ==========================================
# ======================== private =========================================
......@@ -695,8 +692,8 @@ class OpenTunMACOS(object):
TUN interface.
'''
return TunReadThread(
self.tunIf,
self._tunToEventBus
self.tunIf,
self._tunToEventBus
)
def _tunToEventBus(self, data):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment