app.py 15.8 KB
Newer Older
1
2
3
4
5
#!/usr/bin/env python3

"""
Listens to LoRa platform POSTS (sensor readings), reformats data, and forwards to IoT platform.

6
7
8
9
10
In simple words this module:
- Initializes Device entities
- Updates Device and Flowerbed entities' attributes.

# datetime for now should be datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
11
"""
12
import re
13
14
import os
import json
15
import base64
16
import time
17
18
19
import dateutil.parser
import datetime

20
21
22
23
from geojson import Point
from flask import Flask, jsonify
from flask import request
import requests as client_request
24

25
26
app = Flask(__name__, )
print("Starting to collect..")
27
28

# get config from environment
29
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='.')
30
31
URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost'))

32
LORA_PLATFORM_URL = os.getenv('LORA_PLATFORM_URL')
33
34
35
36
LORA_PLATFORM_SECRET_FILE = os.getenv('LORA_PLATFORM_SECRET_FILE', 'secret_carouge_lora')

with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
    secret = file.read().replace('\n', '')
37

38
39
assert secret, 'No auth token defined for LoRa platform API'

40
# AUTH Header for lora platform
41
42
h = {'Authorization': secret}

43
44
45
46
# URN schema for Device entities
URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-"
URN_BASE_FLOWERBED_ENTITY = "urn:ngsi-ld:FlowerBed:FlowerBed-"

47
# build header for POST
48
49
50
http_header_post = {
    'Fiware-Service': 'carouge',
    'Content-Type': 'application/json',
51
    'Accept': 'application/json',
52
}
53

54
55
56
57
# build header for GET
http_header_get = {
    'Fiware-Service': 'carouge',
    'Accept': 'application/json',
58
59
}

60

61
62
63
64
65
66
def get_ngsiv2_typed_description(val):
    if isinstance(val, Point):
        return {'type': 'geo:json', 'value': dict(val)}
    elif isinstance(val, int) or isinstance(val, float):
        return {'type': 'Number', 'value': val}
    elif isinstance(val, str):
67
        pat = r'[<>"\'=;()]'  # see https://fiware-orion.readthedocs.io/en/master/user/forbidden_characters/index.html
68
69
70
71
        r = re.compile(pat)
        if r.search(val) is None:
            new_val = val
        else:
72
73
            new_val = re.sub(pat, '', val)
            app.logger.warning("Special char(s) deleted. This <{}> turned into this <{}>".format(val, new_val))
74
        return {'type': 'Text', 'value': new_val}
75
    elif isinstance(val, datetime.datetime):
76
77
        # replace +00:00 is the same as Z but Orion doesnt like it :/
        return {'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")}
78
79
80
81
82
83
84
85
    elif isinstance(val, dict):
        return {'value': val}  # bypassed
    elif isinstance(val, list):
        return {'value': val}  # bypassed
    elif isinstance(val, type(None)):
        return {'type': 'Text', 'value': None}
    else:
        raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
86

87
88

# = = = = = = = = API HANDLERS = = = = = = = =  = = =
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

class InvalidUsage(Exception):
    status_code = 400

    def __init__(self, message, status_code=None, payload=None):
        Exception.__init__(self)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or ())
        rv['message'] = self.message
        return rv


@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
    response = jsonify(error.to_dict())
    response.status_code = error.status_code
    return response


113
114
115
116
117
118
@app.route('/')
@app.route('/healthcheck')
def healthcheck():
    return 'This service is up and running!'


119
120
# = = = = = = = = MOISTURE SENSORS AUX FUNCTIONS = = = = = = = =

121
def _log_level_of_moisture(cb, sensor):
122
    """ Recommended levels for standard plants:
123
124
125
126
127
128
129
130
131
132
133
134
        0-10 Centibars = Saturated soil
        10-30 Centibars = Soil is adequately wet (except coarse sands, which are drying)
        30-60 Centibars = Usual range for irrigation (most soils)
        60-100 Centibars = Usual range for irrigation in heavy clay
        100-200 Centibars = Soil is becoming dangerously dry- proceed with caution!

    :param cb:
    :param sensor:
    :return:
    """

    if cb < 10:
135
        app.logger.warning("{} indicates soil is saturated wet: {} cb!".format(sensor, cb))
136
    elif cb < 30:
137
        app.logger.info("{} indicates soil is adequately wet: {} cb".format(sensor, cb))
138
    elif cb < 60:
139
        app.logger.info("{} indicates soil in usual range of irrigation for moist soils: {} cb".format(sensor, cb))
140
    elif cb < 100:
141
        app.logger.warning("{} indicates soil needs irrigation: {} cb".format(sensor, cb))
142
    elif cb < 200:
143
        app.logger.warning("{} indicates soil is dangerously dry: {} cb".format(sensor, cb))
144
    else:
145
        app.logger.error("Measurement not in expected range: {} cb".format(cb))
146

Federico Sismondi's avatar
Federico Sismondi committed
147

148
149
150
151
152
153
154
def _log_level_of_battery_level(bat, sensor):

    if bat < 0.5:
        app.logger.warning("{} battery level is critically low: {} %!".format(sensor, bat))
    else:
        app.logger.info("{} battery level is: {} %".format(sensor, bat))

155
156
157

def _conv_micro_volts_to_moiture_cb(micro_volts):
    """
158
159
    Sensor Voltage range is from 0V to 3V.
    This voltage range is linearly dependent to the real value of the sensor, which is expressed in cb (kPa):
160
161
162
163
164
165

    - 0V = Plants are wet - do not need watering (0cb)
    - 3V = Plant are dry - need watering (239cb)

    :param micro_volts:
    :return: pressure (in cb ~ KPa)
166

167
168
169
170
171
172
    """
    assert type(micro_volts) is int

    if micro_volts == 0:
        return 0

173
    return int((micro_volts * 239) / 3000000)
174
175


176
177
178
179
180
181
182
183
184
185
186
187
188
189
def _conv_micro_volts_to_battery_level(micro_volts):
    """Battery is 0-12v
    but is connected with a Voltage divider R1=R2= 5 K ohm
    https://en.wikipedia.org/wiki/Voltage_divider

    - measured voltage is half on the real value
    - measured voltage is in micro volts

    hence:
        bat_percetage = 100 * (v_input /1.000.000 ) * 2 / 12

    :param micro_volts:
    :return: battery percentage 0-100%

190
    """
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
    assert type(micro_volts) is int

    if micro_volts == 0:
        return 0

    return 2 * (micro_volts / 1000000) / 12


def _get_reading(message_data):
    """ Lora Bridge device has to channels (A and B).
    channel A is connected to moisture sensors
    channel B is connected to battery voltage output

    To read correctly the value from the channel A, you should take the byte 3 to 5 (included).
    To read correctly the value from the channel B, you should take the byte 6 to 8 (included).
    Then, you should decoded from Hex to Decimal.
    IMPORTANT: The value is expressed in µV if sensor is configured as voltage input (1V = 1.000.000µV)
    For more info look at LoRaWAN 863-870 ANALOG PWR - Guide utilisateur / User guide version V2.0.2


    >>> base64.standard_b64decode("QsABAqD7AVL5yQ==").hex()
    '42c00102a0fb0152f9c9'

    # deplexing data:
    # 42 -> it's a data frame!
    # c0 ->
    # 01 -> configured as 0-10v input for channel A
    # 02a0fb -> data channel A
    # 01 -> configured as 0-10v input for channel A
    # 52f9c9 -> data channel B
221
222

    :param message_data:
223
    :return: moisture, battery_level
224
225
    """

226
227
    reading_hex = base64.standard_b64decode(message_data).hex()

228
    if reading_hex[0:2] == '42':  # it's a dataFrame
229
230
231
232

        moisture = _conv_micro_volts_to_moiture_cb(int(reading_hex[6:12], 16))
        battery_level = _conv_micro_volts_to_battery_level(int(reading_hex[14:20], 16))
        return moisture, battery_level
233
234

    else:
235
236
237
        msg = 'Not a sensor reading (expected 0x42) {}'.format(reading_hex)
        app.logger.warning(msg)
        raise InvalidUsage(message=msg)
238
239


240
# = = = = = = = = CONTEXT BROKER FUNCTIONS = = = = = = = =
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def _update_flowerbed_entity(soil_moisture, sensor_eui):
    flowerbed_id = None

    # check if refDevice attr defined for any FlowerBed
    url = "{}/v2/entities/?q=refDevice=={}&type=FlowerBed".format(URL_BASE, sensor_eui)
    r = client_request.get(
        url=url,
        headers=http_header_get
    )
    if r.ok and r.json():
        flowerbed_id = r.json()[0]['id']

    if not flowerbed_id:
        msg = 'No FlowerBed associated to sensor {}'.format(sensor_eui)
        app.logger.warning(msg)
        return msg
257

258
259
    app.logger.debug('Found FlowerBed {} associated to sensor {}'.format(flowerbed_id, sensor_eui))

Federico Sismondi's avatar
Federico Sismondi committed
260
    body = {"soilMoisture": get_ngsiv2_typed_description(soil_moisture)}
261
    url = "{}/v2/entities/{}/attrs".format(URL_BASE, flowerbed_id)
262
263
    r = client_request.patch(
        url=url,
264
        headers=http_header_post,
Federico Sismondi's avatar
Federico Sismondi committed
265
        json=body
266
267
    )

Federico Sismondi's avatar
Federico Sismondi committed
268
269
    app.logger.debug('Sending data to CTX broker: {}'.format(body))

270
271
272
273
274
    if not (200 <= r.status_code < 300):
        app.logger.error('HTTP status code not 2xx, something went wrong..')
        app.logger.error(r.status_code)
        app.logger.error(r.reason)
        app.logger.error(r.json())
275
        return "Got error {}".format(r.reason)
276
    else:
Federico Sismondi's avatar
Federico Sismondi committed
277
        app.logger.info('Entity updated (code {}), url {}'.format(r.status_code, url))
278
279
280
        return "ok"


281
282
def _update_device_entity(deveui, comment, longitude, latitude, last_reception, dataFrame,
                          dataFrame_rssi, dataFrame_timestamp):
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
    """ Creates/Updates LoRa devs into ctx broker

    Output Data model:
        -id (static)
        -type (static)
        -serialNumber (static)
        -controlledProperty (static)
        -owner (static)
        -source (static)
        -description
        -name
        -dateLastValueReported
        -value
        -location
        -rssi
298
        -batteryLevel
299
300
301
302
303

    NOTES:
        name is regexed from LoRa platform <comment> field of device,
        e.g. This device is below the big tree next to the tram station {device_name}=dev_2 , note by Maurizio!"
        incurs into having name="dev_2"
304

305
306
    """
    assert deveui
307

308
    body = {}
309

310
311
312
313
314
315
316
317
318
    app.logger.info('Creating/Updating devices in CTX broker for deveui {}'.format(deveui))

    # static values
    body.update({'id': '{}{}'.format(URN_BASE_DEVICE_ENTITY, deveui[-4:])})
    body.update({'type': 'Device'})
    body.update({'serialNumber': get_ngsiv2_typed_description(deveui)})
    body.update({'controlledProperty': get_ngsiv2_typed_description(['soil_moisture'])})
    body.update({'owner': get_ngsiv2_typed_description('Carouge')})
    body.update({'source': get_ngsiv2_typed_description(LORA_PLATFORM_URL)})
319

320
321
322
323
324
    # dynamic/real-time values
    if longitude and latitude:
        body.update({'location': get_ngsiv2_typed_description(Point([longitude, latitude]))})

    if comment:
325
        body.update({'description': get_ngsiv2_typed_description(comment)})
326

Federico Sismondi's avatar
Federico Sismondi committed
327
        # parse device name hidden inside comment field
328
        pattern = r'{device_name}=([^\s]+)'
Federico Sismondi's avatar
Federico Sismondi committed
329
        result = re.search(pattern, comment)
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
        if result:
            body.update({'name': get_ngsiv2_typed_description(result.groups()[0])})

    if last_reception or dataFrame_timestamp:
        in_date = None
        try:
            in_date = dateutil.parser.isoparse(last_reception)
            assert in_date and isinstance(in_date, datetime.datetime), \
                'Didnt get expected type from LoRa platform for last_reception'
        except TypeError:
            pass
        try:
            in_date = dateutil.parser.isoparse(dataFrame_timestamp)
            assert in_date and isinstance(in_date, datetime.datetime), \
                'Didnt get expected type from LoRa platform for dataFrame_timestamp'
        except TypeError:
            pass
        if in_date:
            body.update({'dateLastValueReported': get_ngsiv2_typed_description(in_date)})
349
        else:
350
351
352
            app.logger.warning("No date parsed for updaging Device field: dateLastValueReported")

    if dataFrame:
353
354
355
356
        moist, bat = _get_reading(dataFrame)
        body.update({'value': get_ngsiv2_typed_description(moist)})
        body.update({'batteryLevel': get_ngsiv2_typed_description(bat)})

357
358
    if dataFrame_rssi:
        body.update({'rssi': get_ngsiv2_typed_description(dataFrame_rssi)})
359

360
361
362
363
    app.logger.debug('Sending data to CTX broker: {}'.format(body))

    url = "{}/v2/entities/?options=upsert".format(URL_BASE)
    r = client_request.post(
364
        url=url,
365
        json=body,
366
367
368
369
370
371
372
373
        headers=http_header_post
    )

    if not (200 <= r.status_code < 300):
        app.logger.error('HTTP status code not 2xx, something went wrong..')
        app.logger.error(r.status_code)
        app.logger.error(r.reason)
        app.logger.error(r.json())
374
        return "Got error {}".format(r.reason)
375
    else:
Federico Sismondi's avatar
Federico Sismondi committed
376
        app.logger.info('Entity updated (code {}), url {}'.format(r.status_code, url))
377
378
379
        return "ok"


380
# = = = = = = = = OTHER AUX FUNCTIONS = = = = = = = =
381
def _dump_raw_data_to_filesystem():
382
    app.logger.info('dumping request to FS')
383
384
    try:
        filepath = os.path.join(COLLECTION_DIR, '{}.json'.format(int(time.time())))
385

386
387
388
389
390
391
392
393
394
395
        with open(filepath, encoding='utf-8', mode='w') as f:
            json.dump(
                obj=request.json,
                fp=f
            )

    except Exception as e:
        app.logger.error(e)
        return 'error'

396
    app.logger.info("Dumped raw data at file {}".format(filepath))
397
398
399
    return 'ok'


400
def _dummy_post_handler():
401
402
403
404
405
406
407
408
    if request.data:
        app.logger.info("Request data: %s" % request.data)
    if request.form:
        app.logger.info("Request form: %s" % request.form)

    return 'Got POST with json %s' % request.json


409
# = = = = = = = = FLASK ROUTES = = = = = = = =
410
411
@app.route('/dca-carouge-watering-sensed-data', methods=['POST'])
def post_dca_carouge_watering_sensed_data():
412
    return _dummy_post_handler()
413

414
415
416

@app.route('/dca-carouge-watering-sensed-data/test ', methods=['POST'])
def post_dca_carouge_watering_sensed_data_test():
417
    return _dummy_post_handler()
418
419
420
421


@app.route('/dca-carouge-watering-sensed-data/rest/callback/payloads/ul', methods=['POST'])
def post_dca_carouge_watering_sensed_data_payload():
422
    # get values
423
424
425

    soil_humidity, battery_level = _get_reading(request.json['dataFrame'])
    sensor_eui = request.json['deveui']
426

427
    # log reading
428
    _log_level_of_moisture(soil_humidity, sensor_eui)
429
    _log_level_of_battery_level(battery_level,sensor_eui)
430

431
    # saves locally and pushed data to FlowerBed and Device entities
432
433
    resp = {}
    resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
434
435
436
437
438
439
440
441
442
443
444
445
446
447
    resp.update({"update_flowerbed_entities": _update_flowerbed_entity(
        soil_humidity,
        sensor_eui
    )})
    resp.update({"update_device_entities": _update_device_entity(
        deveui=request.json['deveui'],
        comment=None,
        longitude=None,
        latitude=None,
        last_reception=None,
        dataFrame=request.json['dataFrame'],
        dataFrame_rssi=request.json['rssi'],
        dataFrame_timestamp=request.json['timestamp']
    )})
448

449
    return resp
450
451


452
453
454
@app.route('/dca-carouge-watering-sensed-data/rest/callback/nodeinfo', methods=['PUT'])
def put_dca_carouge_watering_sensed_node_info():
    _dummy_post_handler()
455
456
457
    raise InvalidUsage(message='This is still not implemented', payload=request.json)


458
459
@app.route('/dca-carouge-watering-sensed-data', methods=['GET'])
def get_dca_carouge_watering_sensed_data():
460
    raise InvalidUsage(message='This is still not implemented')
461

462
463
464

@app.route('/dca-carouge-watering-sensed-data/rest/callback/payloads/ul', methods=['GET'])
def get_dca_carouge_watering_sensed_data_callback():
465
    raise InvalidUsage(message='This is still not implemented')
466
467


468
# = = = = = = = = MAIN = = = = = = = =
469
def init():
470
471
472
    # initialization of devices info in CTX broker

    # pull data from LoRa platform
473
474
    lora_nodes_info = client_request.get(url=LORA_PLATFORM_URL + '/rest/nodes', headers=h).json()

475
    # push data to CTX broker
476
    for i in lora_nodes_info:
477
        _update_device_entity(
478
479
480
481
482
483
484
485
            deveui=i['deveui'],
            comment=i['comment'],
            longitude=i['longitude'],
            latitude=i['latitude'],
            last_reception=i['last_reception'],
            dataFrame=None,
            dataFrame_rssi=None,
            dataFrame_timestamp=None
486
487
488
489
        )


def main():
490
    app.run(host="0.0.0.0", debug=True, port=80)
491

492
493
494
495

if __name__ == "__main__":
    init()
    main()