-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraingauge.py
81 lines (62 loc) · 2.32 KB
/
raingauge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import logging
from time import sleep
from random import randrange
import lewas
from lewas.models import Measurement, Instrument
from lewas.stores import RESTStore
from lewas.leapi import fields as leapiFields
import lewas.rpi as rpi
import RPi.GPIO as GPIO
from lewas.sources import GPIOEventSource
logger = logging.getLogger(__name__)
import os


def _is_arm_machine(machine):
    """Return True when *machine* names an ARM CPU architecture.

    Args:
        machine: Hardware identifier string as reported in the fifth field
            of ``os.uname()`` (for example ``'armv7l'`` or ``'aarch64'``).

    Returns:
        bool: True for 32-bit ARM names (``'arm...'``) and for the 64-bit
        name ``'aarch64'``; False for anything else.
    """
    return machine.startswith(('arm', 'aarch64'))


# This module treats "host CPU is ARM" as "running on a Raspberry Pi".
# A plain three-character 'arm' prefix comparison misses 64-bit systems,
# which report 'aarch64', so both spellings are accepted here.  Platforms
# without os.uname() are reported as "not a Raspberry Pi" instead of failing
# at import time.
__RPi__ = hasattr(os, 'uname') and _is_arm_machine(os.uname()[4])
class RandomAccumulator:
    """Stand-in tip counter whose ``count`` is a new random integer per read.

    Args:
        start: Smallest value ``count`` may produce.
        stop: Exclusive upper bound for ``count``.
    """

    def __init__(self, start, stop):
        self._start, self._stop = start, stop

    @property
    def count(self):
        """Return a random integer ``n`` with ``start <= n < stop``."""
        lower, upper = self._start, self._stop
        return randrange(lower, upper)
def raingauge_reader(accumulator, interval):
    """Yield ``accumulator.count`` forever, pausing between successive reads.

    Args:
        accumulator: Any object exposing a ``count`` attribute.
        interval: Pause, in seconds, passed to ``time.sleep`` after each
            value has been consumed and before the next one is read.

    Yields:
        The value of ``accumulator.count`` at the moment of each read.
    """
    # The first reading is produced immediately; every later reading is
    # preceded by one pause.
    yield accumulator.count
    while True:
        sleep(interval)
        yield accumulator.count
def parser(args):
    """Build a converter from raw tip counts to rain-intensity measurements.

    Args:
        args: Either an object exposing the instrument configuration as
            ``args.config``, or the configuration mapping itself.  The
            mapping must provide:

            * ``'inches_per_tip'`` - rain depth represented by one tip.
            * ``'report_interval'`` - length of one reporting period.  It is
              divided into the number of seconds in the unit's time part,
              so it is expected to be in seconds.
            * ``'units'`` - text of the form ``<length>/<time>`` where the
              time part is ``h`` (hour) or ``m`` (minute).

    Returns:
        A function ``parse(tips)`` that returns a ``Measurement`` whose
        value is ``tips * inches_per_tip * scale``, tagged
        ``('rain', 'intensity')`` and carrying the configured units.

    Raises:
        KeyError: A required key is missing, or the time part of ``units``
            is neither ``h`` nor ``m``.
        IndexError: ``units`` contains no ``/``.
        ValueError: A numeric setting cannot be converted with ``float``.
        ZeroDivisionError: ``report_interval`` is zero.
    """
    # Accept the bare mapping as well as a wrapper with a ``config``
    # attribute; the __main__ section of this file passes the mapping itself.
    config = getattr(args, 'config', args)

    inches_per_tip = float(config['inches_per_tip'])
    report_interval = float(config['report_interval'])
    units = config['units']

    # Seconds contained in the time part of the unit ("in/h" -> one hour).
    unit_time = units.split('/')[1]
    time_scale = {'h': 60 * 60, 'm': 60}
    # Number of reporting periods that fit into one unit of time; multiplying
    # a per-period amount by this gives a per-unit-time rate.
    scale = time_scale[unit_time] / report_interval

    # Module-named logger (same name the rest of the file logs under) rather
    # than the root logger; arguments are formatted only if DEBUG is enabled.
    log = logging.getLogger(__name__)

    def parse(tips):
        """Convert a tip count for one reporting period into a Measurement."""
        log.debug('tips: %s, inches_per_tip: %s, scale: %s',
                  tips, inches_per_tip, scale)
        return Measurement(tips * inches_per_tip * scale,
                           ('rain', 'intensity'), units)

    return parse
def datasource(config):
    """Create the source of rain gauge readings described by *config*.

    Args:
        config: Mapping with ``'report_interval'`` (converted with
            ``float``).  When the module-level ``__RPi__`` flag is true it
            must also provide ``'inpin'`` and ``'outpin'`` and may provide
            ``'bouncetime'`` (defaults to 300).

    Returns:
        A ``GPIOEventSource`` watching for rising edges when ``__RPi__`` is
        true; otherwise a ``raingauge_reader`` generator fed by a
        ``RandomAccumulator(0, 5)``.
    """
    logger.info('creating datasource')
    report_period = float(config['report_interval'])

    if __RPi__:
        # Read every setting before touching the GPIO helpers.
        in_channel = config['inpin']
        out_channel = config['outpin']
        debounce = config.get('bouncetime', 300)

        gpio_pins = [
            rpi.inpin(in_channel, pull_up_down=GPIO.PUD_DOWN),
            rpi.outpin(out_channel, GPIO.HIGH),
        ]
        return GPIOEventSource(
            gpio_pins,
            interval=report_period,
            direction=GPIO.RISING,
            bouncetime=debounce,
        )

    # No Raspberry Pi hardware detected: fall back to random tip counts.
    logger.info('not running on a Raspberry Pi, using RandomAccumulator')
    return raingauge_reader(RandomAccumulator(0, 5), report_period)
class RainGauge(Instrument):
    """``Instrument`` subclass representing the tipping rain gauge."""

    # Name this instrument class is registered under.
    # NOTE(review): how Instrument consumes __name__ is not visible here.
    __name__ = 'tipping_raingauge'
if __name__ == '__main__':
    # Stand-alone run: load the configuration from ./config and set up the
    # REST store the measurements are sent to.
    settings = lewas.readConfig('./config')
    gauge_settings = settings.instruments['raingauge']
    store = RESTStore(settings, fields=leapiFields)

    # Readings here always come from the random stand-in (values 0-4), one
    # per reporting period; datasource() is not used by this section.
    report_period = float(gauge_settings['report_interval'])
    readings = raingauge_reader(RandomAccumulator(0, 5), report_period)

    gauge = RainGauge(
        readings,
        parser=parser(gauge_settings),
        site=settings.site,
    )
    gauge.run(store)