Commit 16f6ba45 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'master' into 'serial_connector'

# Conflicts:
#   utils/opentun.py
parents 562fc0f4 f589a89c
......@@ -64,7 +64,10 @@ F-interop agent and management tool.
Please use the following format to connect to the f-interop server:
sudo python -m agent connect --url amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX --name coap_client_agent
sudo python -m agent connect
--url amqp://alfredo:zitarrosa@exampleRmqHost[:port]/sessionXX
--exchange myExchange
--name coap_client_agent
For more information, visit: http://doc.f-interop.eu
""",
......@@ -84,6 +87,12 @@ For more information, visit: http://doc.f-interop.eu
required=True,
help="AMQP url provided by F-Interop")
self.session_amqp_exchange = click.Option(
param_decls=["--exchange"],
default="amq.topic",
required=False,
help="AMQP exchange used in the session")
self.name_option = click.Option(
param_decls=["--name"],
default=str(uuid.uuid1()),
......@@ -97,6 +106,7 @@ For more information, visit: http://doc.f-interop.eu
callback=self.handle_connect,
params=[
self.session_url,
self.session_amqp_exchange,
self.name_option,
],
short_help="Authenticate user"
......@@ -106,7 +116,7 @@ For more information, visit: http://doc.f-interop.eu
self.plugins = {}
def handle_connect(self, url, name):
def handle_connect(self, url, exchange, name):
"""
Authenticate USER and create agent connection to f-interop.
......@@ -121,6 +131,9 @@ For more information, visit: http://doc.f-interop.eu
"name": name,
}
if exchange:
data.update({'exchange':exchange})
if p.port:
data.update({"server": "{}:{}".format(p.hostname, p.port)})
......
......@@ -16,7 +16,7 @@ DEFAULT_EXCHANGE_NAME = "amq.topic"
class BaseConsumer(ConsumerMixin):
DEFAULT_EXCHANGE_NAME = "amq.topic"
def __init__(self, user, password, session, server, name, consumer_name):
def __init__(self, user, password, session, server, exchange, name, consumer_name):
"""
Args:
......@@ -44,9 +44,14 @@ class BaseConsumer(ConsumerMixin):
self.connection = Connection(self.server_url,
transport_options={'confirm_publish': True})
self.exchange = Exchange(BaseConsumer.DEFAULT_EXCHANGE_NAME,
type="topic",
durable=True)
if exchange:
self.exchange = Exchange(exchange,
type="topic",
durable=True)
else:
self.exchange = Exchange(BaseConsumer.DEFAULT_EXCHANGE_NAME,
type="topic",
durable=True)
self.control_queue = Queue("control.{consumer_name}@{name}".format(name=name,
consumer_name=consumer_name),
......
......@@ -19,8 +19,8 @@ class CoreConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(CoreConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(CoreConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def get_consumers(self, Consumer, channel):
return [
......
......@@ -16,8 +16,8 @@ class HTTPConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(HTTPConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(HTTPConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def handle_control(self, body, message):
"""
......
......@@ -16,8 +16,8 @@ class PingConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(PingConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(PingConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
def handle_control(self, body, message):
"""
......
......@@ -20,8 +20,8 @@ class SerialConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(SerialConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
"data.serial.to_forward": self.handle_data,
}
......
......@@ -20,8 +20,8 @@ class TunConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
super(TunConsumer, self).__init__(user, password, session, server, name, consumer_name)
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(TunConsumer, self).__init__(user, password, session, server, exchange, name, consumer_name)
self.dispatcher = {
"tun.start": self.handle_start,
}
......@@ -71,6 +71,7 @@ class TunConsumer(BaseConsumer):
params = {
'rmq_connection': self.connection,
'rmq_exchange': self.exchange,
'name': self.name,
'ipv6_host': ipv6_host,
'ipv6_prefix': ipv6_prefix,
......@@ -169,6 +170,8 @@ class TunConsumer(BaseConsumer):
self.log.debug("Not supported action")
class TunConnector(BaseController):
"""
......
......@@ -23,11 +23,12 @@ class ZMQConsumer(BaseConsumer):
AMQP helper
"""
def __init__(self, user, password, session, server, name, consumer_name):
def __init__(self, user, password, session, server, exchange, name, consumer_name):
super(ZMQConsumer, self).__init__(user,
password,
session,
server,
exchange,
name,
consumer_name)
......
......@@ -9,6 +9,7 @@ import time
logging.getLogger('pika').setLevel(logging.INFO)
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
queue_name = 'unittest_packet_router'
RETRY_PERIOD = 1
def publish_tun_start(exchange, channel, agent_id, ipv6_host, ipv6_prefix, ipv6_no_forwarding=False):
......@@ -52,8 +53,10 @@ def publish_tun_bootrap_success(exchange, channel, agent_id):
def check_response(channel, queue_name, agent_id):
method, header, body = channel.basic_get(queue=queue_name)
if body is not None:
try:
body_dict = json.loads(body.decode('utf-8'))
print('got message: %s'%body_dict)
if body_dict['_type'] == "tun.started" and body_dict['name'] == agent_id:
return True
except Exception as e:
......@@ -80,10 +83,11 @@ def bootstrap(amqp_url, amqp_exchange, agent_id, ipv6_host, ipv6_prefix, ipv6_no
for i in range(1, 4):
logging.debug("Let's start the bootstrap the agent %s try number %d" % (agent_id, i))
publish_tun_start(amqp_exchange, channel, agent_id, ipv6_host, ipv6_prefix, ipv6_no_forwarding)
time.sleep(4)
time.sleep(RETRY_PERIOD)
if check_response(channel, agent_event_q, agent_id):
logging.debug("Agent tun bootstrapped")
publish_tun_bootrap_success(channel, agent_id)
publish_tun_bootrap_success(amqp_exchange, channel, agent_id)
break
elif i < 3:
pass
else:
......
......@@ -333,7 +333,7 @@ class OpenTunLinux(object):
# RMQ setups
self.connection = rmq_connection
self.producer = self.connection.Producer(serializer='json')
self.exchange = Exchange(exchange, type="topic", durable=True)
self.exchange = rmq_exchange
self.name = name
self.packet_count = 0
......@@ -557,7 +557,7 @@ class OpenTunMACOS(object):
# RMQ setups
self.connection = rmq_connection
self.producer = self.connection.Producer(serializer='json')
self.exchange = Exchange(exchange, type="topic", durable=True)
self.exchange = rmq_exchange
self.name = name
self.tun_name = ''
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment