Commit cc65e678 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Merge branch 'dca_watering' into 'master'

Dca watering

See merge request !11
parents 8826b276 4b256ec2
#!/usr/bin/env python3
"""
Initializes FlowerBoxes data using LoRa platform metadata about sensors.
Listens to LoRa platform POSTS (sensor readings), reformats data, and forwards to IoT platform.
In simple words this module:
- Initializes Device entities
- Updates Device and Flowerbed entities' attributes.
# datetime for now should be datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
"""
import re
import os
import json
import base64
import time
import unidecode
import dateutil.parser
import datetime
from geojson import Point
from flask import Flask, jsonify
from flask import request
......@@ -30,27 +37,55 @@ with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
assert secret, 'No auth token defined for LoRa platform API'
# AUTH Header for lora platform
h = {'Authorization': secret}
# build header for IoT platform POSTs
# URN schema for Device entities
URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-"
URN_BASE_FLOWERBED_ENTITY = "urn:ngsi-ld:FlowerBed:FlowerBed-"
# build header for POST
http_header_post = {
'Fiware-Service': 'carouge',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# maps EUI to (URI, coords , operator_comment)
mapping_deveui_to_fiware_instance_meta = {
"0018b20000020a6a": ['urn:ngsi-ld:FlowerBed:FlowerBed-5', Point([0,0]), ''],
"0018b20000020a80": ['urn:ngsi-ld:FlowerBed:FlowerBed-4', Point([0,0]), ''],
"0018b20000020a35": ['urn:ngsi-ld:FlowerBed:FlowerBed-2', Point([0,0]), ''],
"0018b20000020a81": ['urn:ngsi-ld:FlowerBed:FlowerBed-8', Point([0,0]), ''],
"0018b20000020a83": ['urn:ngsi-ld:FlowerBed:FlowerBed-6', Point([0,0]), ''],
"0018b20000020a7d": ['urn:ngsi-ld:FlowerBed:FlowerBed-3', Point([0,0]), ''],
"0018b20000020a7c": ['urn:ngsi-ld:FlowerBed:FlowerBed-1', Point([0,0]), ''],
# build header for GET
http_header_get = {
'Fiware-Service': 'carouge',
'Accept': 'application/json',
}
# API errors & handlers
def get_ngsiv2_typed_description(val):
if isinstance(val, Point):
return {'type': 'geo:json', 'value': dict(val)}
elif isinstance(val, int) or isinstance(val, float):
return {'type': 'Number', 'value': val}
elif isinstance(val, str):
pat = r'[<>"\'=;()]' # see https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html
r = re.compile(pat)
if r.search(val) is None:
new_val = val
else:
new_val = re.sub(pat, '', val)
app.logger.warning("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val))
return {'type': 'Text', 'value': new_val}
elif isinstance(val, datetime.datetime):
# replace +00:00 is the same as Z but Orion doesnt like it :/
return {'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")}
elif isinstance(val, dict):
return {'value': val} # bypassed
elif isinstance(val, list):
return {'value': val} # bypassed
elif isinstance(val, type(None)):
return {'type': 'Text', 'value': None}
else:
raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
# = = = = = = = = API HANDLERS = = = = = = = = = = =
class InvalidUsage(Exception):
status_code = 400
......@@ -81,7 +116,8 @@ def healthcheck():
return 'This service is up and running!'
# Utility functions
# = = = = = = = = MOISTURE SENSORS AUX FUNCTIONS = = = = = = = =
def _log_level_of_moisture(cb, sensor):
""" Recommended levels for standard plants:
0-10 Centibars = Saturated soil
......@@ -96,17 +132,17 @@ def _log_level_of_moisture(cb, sensor):
"""
if cb < 10:
app.logger.warning("{} indicates soil is saturated wet - {} cb!".format(sensor, cb))
app.logger.warning("{} indicates soil is saturated wet: {} cb!".format(sensor, cb))
elif cb < 30:
app.logger.info("{} indicates soil is adequately wet - {} cb".format(sensor, cb))
app.logger.info("{} indicates soil is adequately wet: {} cb".format(sensor, cb))
elif cb < 60:
app.logger.info("{} indicates soil in usual range of irrigation for moist soils - {} cb".format(sensor, cb))
app.logger.info("{} indicates soil in usual range of irrigation for moist soils: {} cb".format(sensor, cb))
elif cb < 100:
app.logger.warning("{} indicates soil needs irrigation - {} cb".format(sensor, cb))
app.logger.warning("{} indicates soil needs irrigation: {} cb".format(sensor, cb))
elif cb < 200:
app.logger.warning("{} indicates soil is dangerously dry - {} cb".format(sensor, cb))
app.logger.warning("{} indicates soil is dangerously dry: {} cb".format(sensor, cb))
else:
app.logger.error("Measurement not in expected range - {} cb".format(cb))
app.logger.error("Measurement not in expected range: {} cb".format(cb))
def _conv_micro_volts_to_moiture_cb(micro_volts):
......@@ -152,20 +188,33 @@ def _get_reading(message_data):
raise InvalidUsage(message=msg)
def _forward_lora_devs_metadata_to_ctx_broker(dev_eui, entity_id, location, operator_comment):
app.logger.info("Initializing IoT sensor meta (entity {})".format(entity_id))
url = "{}/v2/entities/{}/attrs".format(URL_BASE, entity_id)
payload = {
#'boxId': {'value': entity_id.split('-')[-1]},
'refDevice': {'value': dev_eui},
#'boxOperatorComment': {'value': operator_comment},
'location': {'type': 'geo:json', 'value': {'coordinates': location, 'type': 'Point'}},
}
# = = = = = = = = CONTEXT BROKER FUNCTIONS = = = = = = = =
def _update_flowerbed_entity(soil_moisture, sensor_eui):
flowerbed_id = None
# check if refDevice attr defined for any FlowerBed
url = "{}/v2/entities/?q=refDevice=={}&type=FlowerBed".format(URL_BASE, sensor_eui)
r = client_request.get(
url=url,
headers=http_header_get
)
if r.ok and r.json():
flowerbed_id = r.json()[0]['id']
if not flowerbed_id:
msg = 'No FlowerBed associated to sensor {}'.format(sensor_eui)
app.logger.warning(msg)
return msg
app.logger.debug('Found FlowerBed {} associated to sensor {}'.format(flowerbed_id, sensor_eui))
url = "{}/v2/entities/{}/attrs".format(URL_BASE, flowerbed_id)
r = client_request.patch(
url=url,
json=payload,
headers=http_header_post
headers=http_header_post,
json={
"soilMoisture": get_ngsiv2_typed_description(soil_moisture)
}
)
if not (200 <= r.status_code < 300):
......@@ -173,46 +222,109 @@ def _forward_lora_devs_metadata_to_ctx_broker(dev_eui, entity_id, location, oper
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "got error {}".format(r.reason)
return "Got error {}".format(r.reason)
else:
app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
return "ok"
def _forward_reading_to_ctx_broker():
message_data = request.json['dataFrame']
sensor_reading = _get_reading(message_data)
def _update_device_entity(deveui, comment, longitude, latitude, last_reception, dataFrame,
dataFrame_rssi, dataFrame_timestamp):
""" Creates/Updates LoRa devs into ctx broker
Output Data model:
-id (static)
-type (static)
-serialNumber (static)
-controlledProperty (static)
-owner (static)
-source (static)
-description
-name
-dateLastValueReported
-value
-location
-rssi
NOTES:
name is regexed from LoRa platform <comment> field of device,
e.g. This device is below the big tree next to the tram station {device_name}=dev_2 , note by Maurizio!"
incurs into having name="dev_2"
if sensor_reading:
timestamp = request.json['timestamp']
dev_eui = request.json['deveui']
entity_id, _, _ = mapping_deveui_to_fiware_instance_meta[dev_eui]
"""
assert deveui
_log_level_of_moisture(sensor_reading, entity_id)
app.logger.info("Forwarding IoT reading to IoT platform (entity {})".format(entity_id))
url = "{}/v2/entities/{}/attrs".format(URL_BASE, entity_id)
payload = {
'soilMoisture': {'value': sensor_reading},
#'dateObserved': {'type': 'DateTime', 'value': timestamp},
}
body = {}
r = client_request.patch(
url=url,
json=payload,
headers=http_header_post
)
app.logger.info('Creating/Updating devices in CTX broker for deveui {}'.format(deveui))
# static values
body.update({'id': '{}{}'.format(URN_BASE_DEVICE_ENTITY, deveui[-4:])})
body.update({'type': 'Device'})
body.update({'serialNumber': get_ngsiv2_typed_description(deveui)})
body.update({'controlledProperty': get_ngsiv2_typed_description(['soil_moisture'])})
body.update({'owner': get_ngsiv2_typed_description('Carouge')})
body.update({'source': get_ngsiv2_typed_description(LORA_PLATFORM_URL)})
if not (200 <= r.status_code < 300):
app.logger.error('HTTP status code not 2xx, something went wrong..')
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "got error {}".format(r.reason)
# dynamic/real-time values
if longitude and latitude:
body.update({'location': get_ngsiv2_typed_description(Point([longitude, latitude]))})
if comment:
body.update({'description': get_ngsiv2_typed_description(comment)})
# parse device name hidden inside comment field
pattern = r'{device_name}=([^\s]+)'
result = re.search(pattern, comment)
if result:
body.update({'name': get_ngsiv2_typed_description(result.groups()[0])})
if last_reception or dataFrame_timestamp:
in_date = None
try:
in_date = dateutil.parser.isoparse(last_reception)
assert in_date and isinstance(in_date, datetime.datetime), \
'Didnt get expected type from LoRa platform for last_reception'
except TypeError:
pass
try:
in_date = dateutil.parser.isoparse(dataFrame_timestamp)
assert in_date and isinstance(in_date, datetime.datetime), \
'Didnt get expected type from LoRa platform for dataFrame_timestamp'
except TypeError:
pass
if in_date:
body.update({'dateLastValueReported': get_ngsiv2_typed_description(in_date)})
else:
app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
return "ok"
app.logger.warning("No date parsed for updaging Device field: dateLastValueReported")
if dataFrame:
body.update({'value': get_ngsiv2_typed_description(_get_reading(dataFrame))})
if dataFrame_rssi:
body.update({'rssi': get_ngsiv2_typed_description(dataFrame_rssi)})
app.logger.debug('Sending data to CTX broker: {}'.format(body))
url = "{}/v2/entities/?options=upsert".format(URL_BASE)
r = client_request.post(
url=url,
json=body,
headers=http_header_post
)
if not (200 <= r.status_code < 300):
app.logger.error('HTTP status code not 2xx, something went wrong..')
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "Got error {}".format(r.reason)
else:
app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
return "ok"
# = = = = = = = = OTHER AUX FUNCTIONS = = = = = = = =
def _dump_raw_data_to_filesystem():
app.logger.info('dumping request to FS')
try:
......@@ -239,7 +351,7 @@ def _dummy_post_handler():
return 'Got POST with json %s' % request.json
# Flask route handlers
# = = = = = = = = FLASK ROUTES = = = = = = = =
@app.route('/dca-carouge-watering-sensed-data', methods=['POST'])
def post_dca_carouge_watering_sensed_data():
return _dummy_post_handler()
......@@ -252,10 +364,27 @@ def post_dca_carouge_watering_sensed_data_test():
@app.route('/dca-carouge-watering-sensed-data/rest/callback/payloads/ul', methods=['POST'])
def post_dca_carouge_watering_sensed_data_payload():
# get values
soil_humidity, sensor_eui = _get_reading(request.json['dataFrame']), request.json['deveui']
# log reading
_log_level_of_moisture(soil_humidity, sensor_eui)
# push data to context broker
resp = {}
resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
resp.update({"message_broker_fw": _forward_reading_to_ctx_broker()})
# resp.update({'description': _dummy_post_handler()})
resp.update({"update_flowerbed_entities": _update_flowerbed_entity(soil_humidity, sensor_eui)})
resp.update({"update_device_entities": _update_device_entity(
deveui=request.json['deveui'],
comment=None,
longitude=None,
latitude=None,
last_reception=None,
dataFrame=request.json['dataFrame'],
dataFrame_rssi=request.json['rssi'],
dataFrame_timestamp=request.json['timestamp']
)})
return resp
......@@ -275,44 +404,26 @@ def get_dca_carouge_watering_sensed_data_callback():
raise InvalidUsage(message='This is still not implemented')
# = = = = = = = = MAIN = = = = = = = =
def init():
# initialization stuff
# initialization of devices info in CTX broker
# pull data from LoRa platform
lora_nodes_info = client_request.get(url=LORA_PLATFORM_URL + '/rest/nodes', headers=h).json()
# push data to CTX broker
for i in lora_nodes_info:
dev = i['deveui']
coords = [i['longitude'], i['latitude']]
# do some special chars cleaning
unaccented_comment = unidecode.unidecode(i['comment'])
unaccented_comment = unaccented_comment.replace("'", "")
unaccented_comment = unaccented_comment.replace("(", "")
unaccented_comment = unaccented_comment.replace(")", "")
try:
uri, _, _ = mapping_deveui_to_fiware_instance_meta[dev]
except KeyError:
uri = 'urn:ngsi-ld:FlowerBed:FlowerBed-{}'.format(dev)
print('Unkown LoRa device found for app, adding it as {}'.format(uri))
mapping_deveui_to_fiware_instance_meta[dev] = [uri, coords, unaccented_comment]
print('Checking if LoRa devices already exist in context manager')
for dev_eui, dev_info in mapping_deveui_to_fiware_instance_meta.items():
url = "{}/v2/entities/{}".format(URL_BASE, dev_info[0])
r = client_request.get(
url=url,
headers={'Fiware-Service': 'carouge', }
_update_device_entity(
deveui=i['deveui'],
comment=i['comment'],
longitude=i['longitude'],
latitude=i['latitude'],
last_reception=i['last_reception'],
dataFrame=None,
dataFrame_rssi=None,
dataFrame_timestamp=None
)
if 200 <= r.status_code < 300:
print('Entity found: {}'.format(url))
else:
msg = 'Entity not found, please create if before restarting DCA component: {}'.format(url)
raise SystemError(msg)
_forward_lora_devs_metadata_to_ctx_broker(dev_eui, *dev_info)
def main():
app.run(host="0.0.0.0", debug=True, port=80)
......
......@@ -2,3 +2,4 @@ flask
requests
unidecode
geojson
python-dateutil
......@@ -39,8 +39,10 @@ http_header_post = {
}
# build header for GET
http_header_get = http_header_post.copy()
http_header_get.pop('Content-Type')
http_header_get = {
'Fiware-Service': 'carouge',
'Accept': 'application/json',
}
logger.info('Config: \n\tPOST_DATA_PERIOD: {} seconds \n\tURL: {}'.format(POST_DATA_PERIOD, URL_BASE))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment