Commit f2c213ec authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Add() /Device entities to DCA collection in CTX broker. Still pending...

Add() /Device entities to DCA collection in CTX broker. Still pending aggregation and pushind data to FlowerBoxes based on refDev
parent ba7a170b
......@@ -5,11 +5,15 @@ Initializes FlowerBoxes data using LoRa platform metadata about sensors.
Listens to LoRa platform POSTS (sensor readings), reformats data, and forwards to IoT platform.
"""
import re
import os
import json
import base64
import time
import unidecode
import dateutil.parser
import datetime
from geojson import Point
from flask import Flask, jsonify
from flask import request
......@@ -30,27 +34,40 @@ with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
assert secret, 'No auth token defined for LoRa platform API'
# AUTH Header for lora platform
h = {'Authorization': secret}
# URN schema for Device entities
URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-"
URN_BASE_FLOWERBED_ENTITY = "urn:ngsi-ld:FlowerBed:FlowerBed-"
# build header for IoT platform POSTs
http_header_post = {
'Fiware-Service': 'carouge',
'Content-Type': 'application/json',
}
# maps EUI to (URI, coords , operator_comment)
mapping_deveui_to_fiware_instance_meta = {
"0018b20000020a6a": ['urn:ngsi-ld:FlowerBed:FlowerBed-5', Point([0,0]), ''],
"0018b20000020a80": ['urn:ngsi-ld:FlowerBed:FlowerBed-4', Point([0,0]), ''],
"0018b20000020a35": ['urn:ngsi-ld:FlowerBed:FlowerBed-2', Point([0,0]), ''],
"0018b20000020a81": ['urn:ngsi-ld:FlowerBed:FlowerBed-8', Point([0,0]), ''],
"0018b20000020a83": ['urn:ngsi-ld:FlowerBed:FlowerBed-6', Point([0,0]), ''],
"0018b20000020a7d": ['urn:ngsi-ld:FlowerBed:FlowerBed-3', Point([0,0]), ''],
"0018b20000020a7c": ['urn:ngsi-ld:FlowerBed:FlowerBed-1', Point([0,0]), ''],
}
def get_ngsiv2_typed_description(val):
if isinstance(val, Point):
return {'type': 'geo:json', 'value': dict(val)}
elif isinstance(val, int) or isinstance(val, float):
return {'type': 'Number', 'value': val}
elif isinstance(val, str):
return {'type': 'Text', 'value': val}
elif isinstance(val, datetime.datetime):
return {'type': 'DateTime', 'value': val.isoformat()}
elif isinstance(val, dict):
return {'value': val} # bypassed
elif isinstance(val, list):
return {'value': val} # bypassed
elif isinstance(val, type(None)):
return {'type': 'Text', 'value': None}
else:
raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
# API errors & handlers
# = = = = = = = = API HANDLERS = = = = = = = = = = =
class InvalidUsage(Exception):
status_code = 400
......@@ -81,7 +98,8 @@ def healthcheck():
return 'This service is up and running!'
# Utility functions
# = = = = = = = = MOISTURE SENSORS AUX FUNCTIONS = = = = = = = =
def _log_level_of_moisture(cb, sensor):
""" Recommended levels for standard plants:
0-10 Centibars = Saturated soil
......@@ -152,19 +170,95 @@ def _get_reading(message_data):
raise InvalidUsage(message=msg)
def _forward_lora_devs_metadata_to_ctx_broker(dev_eui, entity_id, location, operator_comment):
app.logger.info("Initializing IoT sensor meta (entity {})".format(entity_id))
url = "{}/v2/entities/{}/attrs".format(URL_BASE, entity_id)
payload = {
#'boxId': {'value': entity_id.split('-')[-1]},
'refDevice': {'value': dev_eui},
#'boxOperatorComment': {'value': operator_comment},
'location': {'type': 'geo:json', 'value': {'coordinates': location, 'type': 'Point'}},
}
# = = = = = = = = CONTEXT BROKER FUNCTIONS = = = = = = = =
def _forward_lora_dev_data_to_ctx_broker(deveui, comment, longitude, latitude, last_reception, dataFrame,
dataFrame_rssi, dataFrame_timestamp):
""" Creates/Updates LoRa devs into ctx broker
Output Data model:
-id (static)
-type (static)
-serialNumber (static)
-controlledProperty (static)
-owner (static)
-source (static)
-description
-name
-dateLastValueReported
-value
-location
-rssi
NOTES:
name is regexed from LoRa platform <comment> field of device,
e.g. This device is below the big tree next to the tram station {device_name}=dev_2 , note by Maurizio!"
incurs into having name="dev_2"
"""
assert deveui
body = {}
app.logger.info('Creating/Updating devices in CTX broker for deveui {}'.format(deveui))
# static values
body.update({'id': '{}{}'.format(URN_BASE_DEVICE_ENTITY, deveui[-4:])})
body.update({'type': 'Device'})
body.update({'serialNumber': get_ngsiv2_typed_description(deveui)})
body.update({'controlledProperty': get_ngsiv2_typed_description(['soil_moisture'])})
body.update({'owner': get_ngsiv2_typed_description('Carouge')})
body.update({'source': get_ngsiv2_typed_description(LORA_PLATFORM_URL)})
# dynamic/real-time values
if longitude and latitude:
body.update({'location': get_ngsiv2_typed_description(Point([longitude, latitude]))})
if comment:
# do some special chars cleaning
unaccented_comment = unidecode.unidecode(comment)
unaccented_comment = unaccented_comment.replace("'", "")
unaccented_comment = unaccented_comment.replace("(", "")
unaccented_comment = unaccented_comment.replace(")", "")
body.update({'description': get_ngsiv2_typed_description(unaccented_comment)})
# parse device name hiden inside comment field
pattern = r'{device_name}=([^\s]+)'
result = re.search(pattern, unaccented_comment)
if result:
body.update({'name': get_ngsiv2_typed_description(result.groups()[0])})
if last_reception or dataFrame_timestamp:
in_date = None
try:
in_date = dateutil.parser.isoparse(last_reception)
assert in_date and isinstance(in_date, datetime.datetime), \
'Didnt get expected type from LoRa platform for last_reception'
except TypeError:
pass
try:
in_date = dateutil.parser.isoparse(dataFrame_timestamp)
assert in_date and isinstance(in_date, datetime.datetime), \
'Didnt get expected type from LoRa platform for dataFrame_timestamp'
except TypeError:
pass
if in_date:
body.update({'dateLastValueReported': get_ngsiv2_typed_description(in_date)})
else:
app.logger.warning("No date parsed for updaging Device field: dateLastValueReported")
if dataFrame:
body.update({'value': get_ngsiv2_typed_description(_get_reading(dataFrame))})
if dataFrame_rssi:
body.update({'rssi': get_ngsiv2_typed_description(dataFrame_rssi)})
r = client_request.patch(
app.logger.debug('Sending data to CTX broker: {}'.format(body))
url = "{}/v2/entities/?options=upsert".format(URL_BASE)
r = client_request.post(
url=url,
json=payload,
json=body,
headers=http_header_post
)
......@@ -173,46 +267,13 @@ def _forward_lora_devs_metadata_to_ctx_broker(dev_eui, entity_id, location, oper
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "got error {}".format(r.reason)
return "Got error {}".format(r.reason)
else:
app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
return "ok"
def _forward_reading_to_ctx_broker():
message_data = request.json['dataFrame']
sensor_reading = _get_reading(message_data)
if sensor_reading:
timestamp = request.json['timestamp']
dev_eui = request.json['deveui']
entity_id, _, _ = mapping_deveui_to_fiware_instance_meta[dev_eui]
_log_level_of_moisture(sensor_reading, entity_id)
app.logger.info("Forwarding IoT reading to IoT platform (entity {})".format(entity_id))
url = "{}/v2/entities/{}/attrs".format(URL_BASE, entity_id)
payload = {
'soilMoisture': {'value': sensor_reading},
#'dateObserved': {'type': 'DateTime', 'value': timestamp},
}
r = client_request.patch(
url=url,
json=payload,
headers=http_header_post
)
if not (200 <= r.status_code < 300):
app.logger.error('HTTP status code not 2xx, something went wrong..')
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "got error {}".format(r.reason)
else:
app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
return "ok"
# = = = = = = = = OTHER AUX FUNCTIONS = = = = = = = =
def _dump_raw_data_to_filesystem():
try:
filepath = os.path.join(COLLECTION_DIR, '{}.json'.format(int(time.time())))
......@@ -238,7 +299,7 @@ def _dummy_post_handler():
return 'Got POST with json %s' % request.json
# Flask route handlers
# = = = = = = = = FLASK ROUTES = = = = = = = =
@app.route('/dca-carouge-watering-sensed-data', methods=['POST'])
def post_dca_carouge_watering_sensed_data():
return _dummy_post_handler()
......@@ -253,11 +314,19 @@ def post_dca_carouge_watering_sensed_data_test():
def post_dca_carouge_watering_sensed_data_payload():
resp = {}
resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
resp.update({"message_broker_fw": _forward_reading_to_ctx_broker()})
resp.update({"update_devices_entities": _forward_lora_dev_data_to_ctx_broker(
deveui=request.json['deveui'],
comment=None,
longitude=None,
latitude=None,
last_reception=None,
dataFrame=request.json['dataFrame'],
dataFrame_rssi=request.json['rssi'],
dataFrame_timestamp=request.json['timestamp']
)})
# resp.update({'description': _dummy_post_handler()})
return resp
@app.route('/dca-carouge-watering-sensed-data/rest/callback/nodeinfo', methods=['PUT'])
def put_dca_carouge_watering_sensed_node_info():
_dummy_post_handler()
......@@ -274,44 +343,26 @@ def get_dca_carouge_watering_sensed_data_callback():
raise InvalidUsage(message='This is still not implemented')
# = = = = = = = = MAIN = = = = = = = =
def init():
# initialization stuff
# initialization of devices info in CTX broker
# pull data from LoRa platform
lora_nodes_info = client_request.get(url=LORA_PLATFORM_URL + '/rest/nodes', headers=h).json()
# push data to CTX broker
for i in lora_nodes_info:
dev = i['deveui']
coords = [i['longitude'], i['latitude']]
# do some special chars cleaning
unaccented_comment = unidecode.unidecode(i['comment'])
unaccented_comment = unaccented_comment.replace("'", "")
unaccented_comment = unaccented_comment.replace("(", "")
unaccented_comment = unaccented_comment.replace(")", "")
try:
uri, _, _ = mapping_deveui_to_fiware_instance_meta[dev]
except KeyError:
uri = 'urn:ngsi-ld:FlowerBed:FlowerBed-{}'.format(dev)
print('Unkown LoRa device found for app, adding it as {}'.format(uri))
mapping_deveui_to_fiware_instance_meta[dev] = [uri, coords, unaccented_comment]
print('Checking if LoRa devices already exist in context manager')
for dev_eui, dev_info in mapping_deveui_to_fiware_instance_meta.items():
url = "{}/v2/entities/{}".format(URL_BASE, dev_info[0])
r = client_request.get(
url=url,
headers={'Fiware-Service': 'carouge', }
_forward_lora_dev_data_to_ctx_broker(
deveui=i['deveui'],
comment=i['comment'],
longitude=i['longitude'],
latitude=i['latitude'],
last_reception=i['last_reception'],
dataFrame=None,
dataFrame_rssi=None,
dataFrame_timestamp=None
)
if 200 <= r.status_code < 300:
print('Entity found: {}'.format(url))
else:
msg = 'Entity not found, please create if before restarting DCA component: {}'.format(url)
raise SystemError(msg)
_forward_lora_devs_metadata_to_ctx_broker(dev_eui, *dev_info)
def main():
app.run(host="0.0.0.0", debug=True, port=80)
......
......@@ -2,3 +2,4 @@ flask
requests
unidecode
geojson
python-dateutil
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment