Commit 6f16d072 authored by Federico Sismondi's avatar Federico Sismondi
Browse files

Add files and updates for handling <python3 app.py push-locally-collected >...

Add files and updates for handling <python3 app.py push-locally-collected > for both fountain and watering use cases of carouge
parent 4825a029
...@@ -55,7 +55,7 @@ http_header_post = { ...@@ -55,7 +55,7 @@ http_header_post = {
def get_ngsiv2_typed_description(val, force_date_modified=None): def get_ngsiv2_typed_description(val, force_date_modified=None):
""" """
This function updates the attributes data format and meta data for aligining it to ngsi v2 This function updates the attributes data format and meta data for aligning it to ngsi v2
:param val: Value to be added to entity as attribute :param val: Value to be added to entity as attribute
:param force_date_modified: Optional datetime for updating dateModified meta data of attribute :param force_date_modified: Optional datetime for updating dateModified meta data of attribute
:return: Dictionary of attribute typed and formated as NGSI-v2 :return: Dictionary of attribute typed and formated as NGSI-v2
......
{
"confirmed": false,
"cr_used": "4/5",
"dataFrame": "gAD/gw==",
"data_format": "base64",
"decrypted": true,
"devaddr": 20906033,
"deveui": "70b3d5fffebf6952",
"device_redundancy": 1,
"dr_used": "SF7BW125",
"early": false,
"fcnt": 62,
"freq": 867500000,
"gtw_info": [
{
"ant": 0,
"foff": 0,
"gtw_id": "000000000800000a",
"rssi": -62,
"snr": 9.8
}
],
"id": 1602783550187,
"live": true,
"mac_msg": "QDEAPwGAPgADvUapqxAsnsE=",
"port": 3,
"rssi": -62,
"sf_used": 7,
"snr": 9.8,
"time_on_air_ms": 51.456,
"timestamp": "2020-10-15T17:39:10.187Z"
}
{
"confirmed": false,
"cr_used": "4/5",
"dataFrame": "gAD/gw==",
"data_format": "base64",
"decrypted": true,
"devaddr": 20906033,
"deveui": "70b3d5fffebf6952",
"device_redundancy": 1,
"dr_used": "SF7BW125",
"early": false,
"fcnt": 85,
"freq": 868300000,
"gtw_info": [
{
"ant": 0,
"foff": 0,
"gtw_id": "000000000800000a",
"rssi": -60,
"snr": 8.2
}
],
"id": 1602824959350,
"live": true,
"mac_msg": "QDEAPwGAVQADNlKYndMUonM=",
"port": 3,
"rssi": -60,
"sf_used": 7,
"snr": 8.2,
"time_on_air_ms": 51.456,
"timestamp": "2020-10-16T05:09:19.350Z"
}
{
"confirmed": false,
"cr_used": "4/5",
"dataFrame": "+AACsgD/AGcARgMr",
"data_format": "base64",
"decrypted": true,
"devaddr": 4063278,
"deveui": "70b3d5fffebf6a44",
"device_redundancy": 1,
"dr_used": "SF7BW125",
"early": false,
"fcnt": 1626,
"freq": 868500000,
"gtw_info": [
{
"ant": 0,
"foff": 0,
"gtw_id": "000000000800000a",
"rssi": -45,
"snr": 8.2
}
],
"id": 1605542051059,
"live": true,
"mac_msg": "QC4APgCAWgYDf71VvqJ/CW0/c2cjGepvHA==",
"port": 3,
"rssi": -45,
"sf_used": 7,
"snr": 8.2,
"time_on_air_ms": 61.696,
"timestamp": "2020-11-16T15:54:11.059Z"
}
{
"confirmed": false,
"cr_used": "4/5",
"dataFrame": "+AACugD+AGAASQM1",
"data_format": "base64",
"decrypted": true,
"devaddr": 4063278,
"deveui": "70b3d5fffebf6a44",
"device_redundancy": 1,
"dr_used": "SF7BW125",
"early": false,
"fcnt": 1761,
"freq": 868300000,
"gtw_info": [
{
"ant": 0,
"foff": 0,
"gtw_id": "000000000800000a",
"rssi": -47,
"snr": 9.2
}
],
"id": 1605781541945,
"live": true,
"mac_msg": "QC4APgCA4QYDQivbkgh6jLg8AoIEtH0+SA==",
"port": 3,
"rssi": -47,
"sf_used": 7,
"snr": 9.2,
"time_on_air_ms": 61.696,
"timestamp": "2020-11-19T10:25:41.945Z"
}
dump dir for collected raw data dump dir for collected raw data
===============================
data structure pushed by devices LoRa devices is complex, please look at app.py to understand how values are multiplexed
into received dataFrame
...@@ -12,6 +12,7 @@ In simple words this module: ...@@ -12,6 +12,7 @@ In simple words this module:
import re import re
import os import os
import json import json
import sys
import base64 import base64
import time import time
import dateutil.parser import dateutil.parser
...@@ -26,24 +27,21 @@ app = Flask(__name__, ) ...@@ -26,24 +27,21 @@ app = Flask(__name__, )
print("Starting to collect..") print("Starting to collect..")
# get config from environment # get config from environment
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='.') COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='raw_data')
URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost')) URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost'))
LORA_PLATFORM_URL = os.getenv('LORA_PLATFORM_URL') LORA_PLATFORM_URL = os.getenv('LORA_PLATFORM_URL')
LORA_PLATFORM_SECRET_FILE = os.getenv('LORA_PLATFORM_SECRET_FILE', 'secret_carouge_lora') LORA_PLATFORM_SECRET_FILE = os.getenv('LORA_PLATFORM_SECRET_FILE', 'secret_carouge_lora')
with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
secret = file.read().replace('\n', '')
assert secret, 'No auth token defined for LoRa platform API'
# AUTH Header for lora platform
h = {'Authorization': secret}
# URN schema for Device entities # URN schema for Device entities
URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-" URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-"
URN_BASE_FLOWERBED_ENTITY = "urn:ngsi-ld:FlowerBed:FlowerBed-" URN_BASE_FLOWERBED_ENTITY = "urn:ngsi-ld:FlowerBed:FlowerBed-"
# HISTORICAL API (QuantumLeap):
NOTIFICATION_URL = 'http://{}:8668/v2/notify'.format(os.getenv('ORION_HOST', default='localhost'))
NOTIF_FLOWERBED = '5f071e73e44ddaac278abd3d'
NOTIF_DEVICE = '5fbf8489da7e7c9ac9757a7c'
# build header for POST # build header for POST
http_header_post = { http_header_post = {
'Fiware-Service': 'carouge', 'Fiware-Service': 'carouge',
...@@ -58,32 +56,68 @@ http_header_get = { ...@@ -58,32 +56,68 @@ http_header_get = {
} }
def get_ngsiv2_typed_description(val): def get_ngsiv2_typed_description(val, force_date_modified=None):
"""
This function updates the attributes data format and meta data for aligning it to ngsi v2
:param val: Value to be added to entity as attribute
:param force_date_modified: Optional datetime for updating dateModified meta data of attribute
:return: Dictionary of attribute typed and formated as NGSI-v2
"""
ret = {}
if force_date_modified and isinstance(force_date_modified, datetime.datetime):
# adds datetime metadata
ret.update(
{"metadata":
{"dateModified":
{"type": "DateTime", "value": str(force_date_modified.isoformat()).replace("+00:00", "Z")}
}
}
)
if force_date_modified and isinstance(force_date_modified, str):
# adds datetime metadata
ret.update(
{"metadata":
{"dateModified": {"type": "DateTime", "value": force_date_modified}}
}
)
assert "Z" in force_date_modified, "Only Zulu time is accepted"
if isinstance(val, Point): if isinstance(val, Point):
return {'type': 'geo:json', 'value': dict(val)} ret.update({'type': 'geo:json', 'value': dict(val)})
elif isinstance(val, int) or isinstance(val, float): elif isinstance(val, int) or isinstance(val, float):
return {'type': 'Number', 'value': val} ret.update({'type': 'Number', 'value': val})
elif isinstance(val, str): elif isinstance(val, str):
pat = r'[<>"\'=;()]' # see https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html pat = r'[<>"\'=;()]' # see https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html
r = re.compile(pat) r = re.compile(pat)
if r.search(val) is None: if r.search(val) is None:
new_val = val new_val = val
else: else:
new_val = re.sub(pat, '', val) new_val = re.sub(pat, '', val)
app.logger.warning("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val)) app.logger.warning("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val))
return {'type': 'Text', 'value': new_val}
ret.update({'type': 'Text', 'value': new_val})
elif isinstance(val, datetime.datetime): elif isinstance(val, datetime.datetime):
# replace +00:00 is the same as Z but Orion doesnt like it :/ # replace +00:00 is the same as Z but Orion doesnt like it :/
return {'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")} ret.update({'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")})
elif isinstance(val, dict): elif isinstance(val, dict):
return {'value': val} # bypassed ret.update({'value': val}) # bypassed
elif isinstance(val, list): elif isinstance(val, list):
return {'value': val} # bypassed ret.update({'value': val}) # bypassed
elif isinstance(val, type(None)): elif isinstance(val, type(None)):
return {'type': 'Text', 'value': None} ret.update({'type': 'Text', 'value': None})
else: else:
raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val))) raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
return ret
# = = = = = = = = API HANDLERS = = = = = = = = = = = # = = = = = = = = API HANDLERS = = = = = = = = = = =
...@@ -213,7 +247,7 @@ def _get_reading(message_data): ...@@ -213,7 +247,7 @@ def _get_reading(message_data):
# deplexing data: # deplexing data:
# 42 -> it's a data frame! # 42 -> it's a data frame!
# c0 -> # c0 -> ?
# 01 -> configured as 0-10v input for channel A # 01 -> configured as 0-10v input for channel A
# 02a0fb -> data channel A # 02a0fb -> data channel A
# 01 -> configured as 0-10v input for channel A # 01 -> configured as 0-10v input for channel A
...@@ -237,50 +271,16 @@ def _get_reading(message_data): ...@@ -237,50 +271,16 @@ def _get_reading(message_data):
raise InvalidUsage(message=msg) raise InvalidUsage(message=msg)
# = = = = = = = = CONTEXT BROKER FUNCTIONS = = = = = = = = def get_model_device(deveui,
def _update_flowerbed_entity(soil_moisture, sensor_eui): comment = None,
flowerbed_id = None longitude = None,
latitude = None,
# check if refDevice attr defined for any FlowerBed last_reception = None,
url = "{}/v2/entities/?q=refDevice=={}&type=FlowerBed".format(URL_BASE, sensor_eui) dataFrame = None,
r = client_request.get( dataFrame_rssi = None,
url=url, dataFrame_timestamp = None,
headers=http_header_get meta_datetime=None):
) """ Returns data dict of Device entity following Device data model
if r.ok and r.json():
flowerbed_id = r.json()[0]['id']
if not flowerbed_id:
msg = 'No FlowerBed associated to sensor {}'.format(sensor_eui)
app.logger.error(msg)
return msg
app.logger.debug('Found FlowerBed {} associated to sensor {}'.format(flowerbed_id, sensor_eui))
body = {"soilMoisture": get_ngsiv2_typed_description(soil_moisture)}
url = "{}/v2/entities/{}/attrs".format(URL_BASE, flowerbed_id)
r = client_request.patch(
url=url,
headers=http_header_post,
json=body
)
app.logger.debug('Sending data to CTX broker: {}'.format(body))
if not (200 <= r.status_code < 300):
app.logger.error('HTTP status code not 2xx, something went wrong..')
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "Got error {}".format(r.reason)
else:
app.logger.info('Entity updated (code {}), url {}'.format(r.status_code, url))
return "ok"
def _update_device_entity(deveui, comment, longitude, latitude, last_reception, dataFrame,
dataFrame_rssi, dataFrame_timestamp):
""" Creates/Updates LoRa devs into ctx broker
Output Data model: Output Data model:
-id (static) -id (static)
...@@ -312,23 +312,25 @@ def _update_device_entity(deveui, comment, longitude, latitude, last_reception, ...@@ -312,23 +312,25 @@ def _update_device_entity(deveui, comment, longitude, latitude, last_reception,
# static values # static values
body.update({'id': '{}{}'.format(URN_BASE_DEVICE_ENTITY, deveui[-4:])}) body.update({'id': '{}{}'.format(URN_BASE_DEVICE_ENTITY, deveui[-4:])})
body.update({'type': 'Device'}) body.update({'type': 'Device'})
body.update({'serialNumber': get_ngsiv2_typed_description(deveui)}) body.update({'serialNumber': get_ngsiv2_typed_description(deveui,meta_datetime)})
body.update({'controlledProperty': get_ngsiv2_typed_description(['soil_moisture'])}) body.update({'controlledProperty': get_ngsiv2_typed_description(['soil_moisture'],meta_datetime)})
body.update({'owner': get_ngsiv2_typed_description('Carouge')}) body.update({'owner': get_ngsiv2_typed_description('Carouge',meta_datetime)})
body.update({'source': get_ngsiv2_typed_description(LORA_PLATFORM_URL)}) body.update({'source': get_ngsiv2_typed_description(LORA_PLATFORM_URL,meta_datetime)})
# dynamic/real-time values # dynamic/real-time values
if longitude and latitude: if longitude and latitude:
body.update({'location': get_ngsiv2_typed_description(Point([longitude, latitude]))}) body.update({'location': get_ngsiv2_typed_description(Point([longitude, latitude]),meta_datetime)})
else:
body.update({'location': get_ngsiv2_typed_description(Point([0, 0]), meta_datetime)})
if comment: if comment:
body.update({'description': get_ngsiv2_typed_description(comment)}) body.update({'description': get_ngsiv2_typed_description(comment,meta_datetime)})
# parse device name hidden inside comment field # parse device name hidden inside comment field
pattern = r'{device_name}=([^\s]+)' pattern = r'{device_name}=([^\s]+)'
result = re.search(pattern, comment) result = re.search(pattern, comment)
if result: if result:
body.update({'name': get_ngsiv2_typed_description(result.groups()[0])}) body.update({'name': get_ngsiv2_typed_description(result.groups()[0],meta_datetime)})
if last_reception or dataFrame_timestamp: if last_reception or dataFrame_timestamp:
in_date = None in_date = None
...@@ -345,17 +347,189 @@ def _update_device_entity(deveui, comment, longitude, latitude, last_reception, ...@@ -345,17 +347,189 @@ def _update_device_entity(deveui, comment, longitude, latitude, last_reception,
except TypeError: except TypeError:
pass pass
if in_date: if in_date:
body.update({'dateLastValueReported': get_ngsiv2_typed_description(in_date)}) body.update({'dateLastValueReported': get_ngsiv2_typed_description(in_date,meta_datetime)})
else: else:
app.logger.warning("No date parsed for updaging Device field: dateLastValueReported") app.logger.warning("No date parsed for updating Device field: dateLastValueReported")
if dataFrame: if dataFrame:
moist, bat = _get_reading(dataFrame) moist, bat = _get_reading(dataFrame)
body.update({'value': get_ngsiv2_typed_description(moist)}) body.update({'value': get_ngsiv2_typed_description(moist,meta_datetime)})
body.update({'batteryLevel': get_ngsiv2_typed_description(bat)}) body.update({'batteryLevel': get_ngsiv2_typed_description(bat,meta_datetime)})
if dataFrame_rssi: if dataFrame_rssi:
body.update({'rssi': get_ngsiv2_typed_description(dataFrame_rssi)}) body.update({'rssi': get_ngsiv2_typed_description(dataFrame_rssi,meta_datetime)})
return body
# = = = = = = = = CONTEXT BROKER FUNCTIONS = = = = = = = =
def _update_flowerbed_entity(soil_moisture, sensor_eui):
flowerbed_id = None
# check if refDevice attr defined for any FlowerBed
url = "{}/v2/entities/?q=refDevice=={}&type=FlowerBed".format(URL_BASE, sensor_eui)
r = client_request.get(
url=url,
headers=http_header_get
)
if r.ok and r.json():
flowerbed_id = r.json()[0]['id']
if not flowerbed_id:
msg = 'No FlowerBed associated to sensor {}'.format(sensor_eui)
app.logger.error(msg)
return msg
app.logger.debug('Found FlowerBed {} associated to sensor {}'.format(flowerbed_id, sensor_eui))
body = {"soilMoisture": get_ngsiv2_typed_description(soil_moisture)}
url = "{}/v2/entities/{}/attrs".format(URL_BASE, flowerbed_id)
r = client_request.patch(
url=url,
headers=http_header_post,
json=body
)
app.logger.debug('Sending data to CTX broker: {}'.format(body))
if not (200 <= r.status_code < 300):
app.logger.error('HTTP status code not 2xx, something went wrong..')
app.logger.error(r.status_code)
app.logger.error(r.reason)
app.logger.error(r.json())
return "Got error {}".format(r.reason)
else:
app.logger.info('Entity updated (code {}), url {}'.format(r.status_code, url))
return "ok"
def _update_historical_db_from_local_raw_files():
"""Posts data to Quantum Leap API directly as notification (without passing through Context Manager)
Used for pushing locally collected data to historical API service
"""
json_files = [os.path.join(COLLECTION_DIR, x) for x in os.listdir(COLLECTION_DIR) if x.endswith("json")]
data_devices = []
data_flowrebed = []
for json_file in json_files:
if 'merged_output' in json_file:
continue
# get data from file
with open(json_file, "r") as f:
json_data = json.load(f)
try:
sensor_eui = json_data['deveui']
except TypeError:
print("Error found trying to parse json for file {}".format(json_file))
print(json_data)
sys.exit(2)
flowerbed_id = None
if not json_data['dataFrame'] or json_data['dataFrame'] == '':
print("(!) Warning found emtpy data frame from device {}".format(sensor_eui))
# generate Device entity
device_entity = get_model_device(
deveui=sensor_eui,
dataFrame=json_data['dataFrame'],
dataFrame_rssi=json_data['rssi'],
dataFrame_timestamp=json_data['timestamp'],
meta_datetime=json_data['timestamp']
)
# add to list of data to be sent to quantum leap
data_devices.append(device_entity)
# generate FlowerBed entity
# first lets get the FlowerBed id from the deveui
# check if refDevice attr defined for any FlowerBed
url = "{}/v2/entities/?q=refDevice=={}&type=FlowerBed".format(URL_BASE, sensor_eui)
r = client_request.get(
url=url,
headers=http_header_get
)
if r.ok and r.json():
flowerbed_id = r.json()[0]['id']
if not flowerbed_id:
msg = 'No FlowerBed associated to sensor {}'.format(sensor_eui)
app.logger.error(msg)
continue
else:
moist, _ = _get_reading(json_data['dataFrame'])
flowerbed_entity = {
'id': flowerbed_id,
'type': 'FlowerBed',
'soilMoisture': get_ngsiv2_typed_description(moist, json_data['timestamp']),
'refDevice':get_ngsiv2_typed_description(sensor_eui, json_data['timestamp']),
}
# add to list of data to be sent to quantum leap
data_flowrebed.append(flowerbed_entity)
# dump to file to check what will be sent
with open(os.path.join(COLLECTION_DIR, "merged_output_device.json"), "w") as f:
json.dump(data_devices, f)
# dump to file to check what will be sent
with open(os.path.join(COLLECTION_DIR, "merged_output_flowerbed.json"), "w") as f:
json.dump(data_flowrebed, f)
## uncoment this for verifying content of files before pushing!
#sys.exit(2)
r = client_request.post(
url=NOTIFICATION_URL,
headers=http_header_post,
json={"subscriptionId": NOTIF_DEVICE, "data": data_devices}
)
if not (200 <= r.status_code < 300):
print('HTTP status code not 2xx, something went wrong..')
print(r.status_code)
print(r.reason)
print(r.json())
else:
print('History has been rewritten for DEVICE entities! (HTTP response is 2xx)')
r = client_request.post(
url=NOTIFICATION_URL,
headers=http_header_post,
json={"subscriptionId": NOTIF_FLOWERBED, "data": data_flowrebed}
)
if not (200 <= r.status_code < 300):
print('HTTP status code not 2xx, something went wrong..')
print(r.status_code)
print(r.reason)
print(r.json())
else: