app.py 12.3 KB
Newer Older
1
2
3
4
5
6
7
#!/usr/bin/env python3

"""
Initializes FlowerBoxes data using LoRa platform metadata about sensors.
Listens to LoRa platform POSTS (sensor readings), reformats data, and forwards to IoT platform.

"""
8
import re
9
10
import os
import json
11
import base64
12
import time
13
import unidecode
14
15
16
import dateutil.parser
import datetime

17
18
19
20
from geojson import Point
from flask import Flask, jsonify
from flask import request
import requests as client_request
21

22
23
app = Flask(__name__, )
print("Starting to collect..")
24
25

# get config from environment
26
COLLECTION_DIR = os.getenv('COLLECTION_DIR', default='.')
27
28
URL_BASE = 'http://{}:1026'.format(os.getenv('ORION_HOST', default='localhost'))

29
30
31
32
33
LORA_PLATFORM_URL = 'https://{}'.format(os.getenv('LORA_PLATFORM_SERVER', default='lora-ns.sig-ge.ch:443'))
LORA_PLATFORM_SECRET_FILE = os.getenv('LORA_PLATFORM_SECRET_FILE', 'secret_carouge_lora')

with open(LORA_PLATFORM_SECRET_FILE, 'r') as file:
    secret = file.read().replace('\n', '')
34

35
36
assert secret, 'No auth token defined for LoRa platform API'

37
# AUTH Header for lora platform
38
39
h = {'Authorization': secret}

40
41
42
43
# URN schema for Device entities
URN_BASE_DEVICE_ENTITY = "urn:ngsi-ld:Device:Device-"
URN_BASE_FLOWERBED_ENTITY = "urn:ngsi-ld:FlowerBed:FlowerBed-"

44
# build header for IoT platform POSTs
45
46
47
48
http_header_post = {
    'Fiware-Service': 'carouge',
    'Content-Type': 'application/json',
}
49

50

51
52
53
54
55
56
def get_ngsiv2_typed_description(val):
    if isinstance(val, Point):
        return {'type': 'geo:json', 'value': dict(val)}
    elif isinstance(val, int) or isinstance(val, float):
        return {'type': 'Number', 'value': val}
    elif isinstance(val, str):
57
58
59
60
61
62
63
64
        # do some special chars cleaning
        special_chars = ["'", "(", ")", "{", "}", "|"]
        unaccented_comment = unidecode.unidecode(val)
        for ch in special_chars:
            unaccented_comment = unaccented_comment.replace(ch, "")
            app.logger.warning("Special char deleted. This <{}> turned into this <{}>".format(val,unaccented_comment))
        return {'type': 'Text', 'value': unaccented_comment}

65
    elif isinstance(val, datetime.datetime):
66
67
        # replace +00:00 is the same as Z but Orion doesnt like it :/
        return {'type': 'DateTime', 'value': str(val.isoformat()).replace("+00:00", "Z")}
68
69
70
71
72
73
74
75
    elif isinstance(val, dict):
        return {'value': val}  # bypassed
    elif isinstance(val, list):
        return {'value': val}  # bypassed
    elif isinstance(val, type(None)):
        return {'type': 'Text', 'value': None}
    else:
        raise NotImplementedError("Couldn't get NGSIv2 type for object {} of type {}".format(val, type(val)))
76

77
78

# = = = = = = = = API HANDLERS = = = = = = = =  = = =
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

class InvalidUsage(Exception):
    status_code = 400

    def __init__(self, message, status_code=None, payload=None):
        Exception.__init__(self)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or ())
        rv['message'] = self.message
        return rv


@app.errorhandler(InvalidUsage)
def handle_invalid_usage(error):
    response = jsonify(error.to_dict())
    response.status_code = error.status_code
    return response


103
104
105
106
107
108
@app.route('/')
@app.route('/healthcheck')
def healthcheck():
    return 'This service is up and running!'


109
110
# = = = = = = = = MOISTURE SENSORS AUX FUNCTIONS = = = = = = = =

111
def _log_level_of_moisture(cb, sensor):
112
    """ Recommended levels for standard plants:
113
114
115
116
117
118
119
120
121
122
123
124
        0-10 Centibars = Saturated soil
        10-30 Centibars = Soil is adequately wet (except coarse sands, which are drying)
        30-60 Centibars = Usual range for irrigation (most soils)
        60-100 Centibars = Usual range for irrigation in heavy clay
        100-200 Centibars = Soil is becoming dangerously dry- proceed with caution!

    :param cb:
    :param sensor:
    :return:
    """

    if cb < 10:
125
        app.logger.warning("{} indicates soil is saturated wet - {} cb!".format(sensor, cb))
126
    elif cb < 30:
127
        app.logger.info("{} indicates soil is adequately wet - {} cb".format(sensor, cb))
128
    elif cb < 60:
129
        app.logger.info("{} indicates soil in usual range of irrigation for moist soils - {} cb".format(sensor, cb))
130
    elif cb < 100:
131
        app.logger.warning("{} indicates soil needs irrigation - {} cb".format(sensor, cb))
132
    elif cb < 200:
133
        app.logger.warning("{} indicates soil is dangerously dry - {} cb".format(sensor, cb))
134
    else:
135
        app.logger.error("Measurement not in expected range - {} cb".format(cb))
136
137
138
139


def _conv_micro_volts_to_moiture_cb(micro_volts):
    """
140
141
    Lora Bridge device has to channels (A and B).
    Only the channel A is connected. To read correctly the value from the channel A, you should take the byte 3 to 5.
142
143
    Then, you should decoded from Hex to Decimal. IMPORTANT: The value is expressed in µV. (1V = 1.000.000µV)

144
145
    Sensor Voltage range is from 0V to 3V.
    This voltage range is linearly dependent to the real value of the sensor, which is expressed in cb (kPa):
146
147
148
149
150
151
152
153
154
155
156
157

    - 0V = Plants are wet - do not need watering (0cb)
    - 3V = Plant are dry - need watering (239cb)

    :param micro_volts:
    :return: pressure (in cb ~ KPa)
    """
    assert type(micro_volts) is int

    if micro_volts == 0:
        return 0

158
    return int((micro_volts * 239) / 3000000)
159
160


161
162
163
164
165
166
167
168
169
def _get_reading(message_data):
    """
    >>> base64.standard_b64decode("QmABACcnAQAAAA==").hex()
    '42600100272701000000'

    :param message_data:
    :return:
    """

170
171
    reading_hex = base64.standard_b64decode(message_data).hex()

172
    if reading_hex[0:2] == '42':  # it's a dataFrame
173
        return _conv_micro_volts_to_moiture_cb(int(reading_hex[6:12], 16))
174
175

    else:
176
177
178
        msg = 'Not a sensor reading (expected 0x42) {}'.format(reading_hex)
        app.logger.warning(msg)
        raise InvalidUsage(message=msg)
179
180


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# = = = = = = = = CONTEXT BROKER FUNCTIONS = = = = = = = =

def _forward_lora_dev_data_to_ctx_broker(deveui, comment, longitude, latitude, last_reception, dataFrame,
                                         dataFrame_rssi, dataFrame_timestamp):
    """ Creates/Updates LoRa devs into ctx broker

    Output Data model:
        -id (static)
        -type (static)
        -serialNumber (static)
        -controlledProperty (static)
        -owner (static)
        -source (static)
        -description
        -name
        -dateLastValueReported
        -value
        -location
        -rssi

    NOTES:
        name is regexed from LoRa platform <comment> field of device,
        e.g. This device is below the big tree next to the tram station {device_name}=dev_2 , note by Maurizio!"
        incurs into having name="dev_2"

    """
    assert deveui

    body = {}

    app.logger.info('Creating/Updating devices in CTX broker for deveui {}'.format(deveui))

    # static values
    body.update({'id': '{}{}'.format(URN_BASE_DEVICE_ENTITY, deveui[-4:])})
    body.update({'type': 'Device'})
    body.update({'serialNumber': get_ngsiv2_typed_description(deveui)})
    body.update({'controlledProperty': get_ngsiv2_typed_description(['soil_moisture'])})
    body.update({'owner': get_ngsiv2_typed_description('Carouge')})
    body.update({'source': get_ngsiv2_typed_description(LORA_PLATFORM_URL)})

    # dynamic/real-time values
    if longitude and latitude:
        body.update({'location': get_ngsiv2_typed_description(Point([longitude, latitude]))})

    if comment:
226
        body.update({'description': get_ngsiv2_typed_description(comment)})
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257

        # parse device name hiden inside comment field
        pattern = r'{device_name}=([^\s]+)'
        result = re.search(pattern, unaccented_comment)
        if result:
            body.update({'name': get_ngsiv2_typed_description(result.groups()[0])})

    if last_reception or dataFrame_timestamp:
        in_date = None
        try:
            in_date = dateutil.parser.isoparse(last_reception)
            assert in_date and isinstance(in_date, datetime.datetime), \
                'Didnt get expected type from LoRa platform for last_reception'
        except TypeError:
            pass
        try:
            in_date = dateutil.parser.isoparse(dataFrame_timestamp)
            assert in_date and isinstance(in_date, datetime.datetime), \
                'Didnt get expected type from LoRa platform for dataFrame_timestamp'
        except TypeError:
            pass
        if in_date:
            body.update({'dateLastValueReported': get_ngsiv2_typed_description(in_date)})
        else:
            app.logger.warning("No date parsed for updaging Device field: dateLastValueReported")

    if dataFrame:
        body.update({'value': get_ngsiv2_typed_description(_get_reading(dataFrame))})

    if dataFrame_rssi:
        body.update({'rssi': get_ngsiv2_typed_description(dataFrame_rssi)})
258

259
260
261
262
    app.logger.debug('Sending data to CTX broker: {}'.format(body))

    url = "{}/v2/entities/?options=upsert".format(URL_BASE)
    r = client_request.post(
263
        url=url,
264
        json=body,
265
266
267
268
269
270
271
272
        headers=http_header_post
    )

    if not (200 <= r.status_code < 300):
        app.logger.error('HTTP status code not 2xx, something went wrong..')
        app.logger.error(r.status_code)
        app.logger.error(r.reason)
        app.logger.error(r.json())
273
        return "Got error {}".format(r.reason)
274
275
276
277
278
    else:
        app.logger.info('HTTP response is {} - Entity updated'.format(r.status_code))
        return "ok"


279
# = = = = = = = = OTHER AUX FUNCTIONS = = = = = = = =
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def _dump_raw_data_to_filesystem():
    try:
        filepath = os.path.join(COLLECTION_DIR, '{}.json'.format(int(time.time())))
        with open(filepath, encoding='utf-8', mode='w') as f:
            json.dump(
                obj=request.json,
                fp=f
            )

    except Exception as e:
        app.logger.error(e)
        return 'error'

    return 'ok'


296
def _dummy_post_handler():
297
298
299
300
301
302
303
304
    if request.data:
        app.logger.info("Request data: %s" % request.data)
    if request.form:
        app.logger.info("Request form: %s" % request.form)

    return 'Got POST with json %s' % request.json


305
# = = = = = = = = FLASK ROUTES = = = = = = = =
306
307
@app.route('/dca-carouge-watering-sensed-data', methods=['POST'])
def post_dca_carouge_watering_sensed_data():
308
    return _dummy_post_handler()
309

310
311
312

@app.route('/dca-carouge-watering-sensed-data/test ', methods=['POST'])
def post_dca_carouge_watering_sensed_data_test():
313
    return _dummy_post_handler()
314
315
316
317
318
319


@app.route('/dca-carouge-watering-sensed-data/rest/callback/payloads/ul', methods=['POST'])
def post_dca_carouge_watering_sensed_data_payload():
    resp = {}
    resp.update({"fs_dump": _dump_raw_data_to_filesystem()})
320
321
322
323
324
325
326
327
328
329
    resp.update({"update_devices_entities": _forward_lora_dev_data_to_ctx_broker(
        deveui=request.json['deveui'],
        comment=None,
        longitude=None,
        latitude=None,
        last_reception=None,
        dataFrame=request.json['dataFrame'],
        dataFrame_rssi=request.json['rssi'],
        dataFrame_timestamp=request.json['timestamp']
    )})
330
    # resp.update({'description': _dummy_post_handler()})
331
    return resp
332

333
334
335
@app.route('/dca-carouge-watering-sensed-data/rest/callback/nodeinfo', methods=['PUT'])
def put_dca_carouge_watering_sensed_node_info():
    _dummy_post_handler()
336
337
338
    raise InvalidUsage(message='This is still not implemented', payload=request.json)


339
340
@app.route('/dca-carouge-watering-sensed-data', methods=['GET'])
def get_dca_carouge_watering_sensed_data():
341
    raise InvalidUsage(message='This is still not implemented')
342

343
344
345

@app.route('/dca-carouge-watering-sensed-data/rest/callback/payloads/ul', methods=['GET'])
def get_dca_carouge_watering_sensed_data_callback():
346
    raise InvalidUsage(message='This is still not implemented')
347
348


349
# = = = = = = = = MAIN = = = = = = = =
350
def init():
351
352
353
    # initialization of devices info in CTX broker

    # pull data from LoRa platform
354
355
    lora_nodes_info = client_request.get(url=LORA_PLATFORM_URL + '/rest/nodes', headers=h).json()

356
    # push data to CTX broker
357
    for i in lora_nodes_info:
358
359
360
361
362
363
364
365
366
        _forward_lora_dev_data_to_ctx_broker(
            deveui=i['deveui'],
            comment=i['comment'],
            longitude=i['longitude'],
            latitude=i['latitude'],
            last_reception=i['last_reception'],
            dataFrame=None,
            dataFrame_rssi=None,
            dataFrame_timestamp=None
367
368
369
370
        )


def main():
371
    app.run(host="0.0.0.0", debug=True, port=80)
372

373
374
375
376

if __name__ == "__main__":
    init()
    main()