Skip to content

Commit a7fa8bc

Browse files
committed
Accomulate in db Counters, Refactor db.go, add configCounters
1 parent 8f71c80 commit a7fa8bc

File tree

4 files changed

+119
-64
lines changed

4 files changed

+119
-64
lines changed

config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@ type configT struct {
3131
Pass string `yaml:"pass"`
3232
Name string `yaml:"name"`
3333
}
34+
Counters struct {
35+
WaterC []string `yaml:"water-c"`
36+
WaterH []string `yaml:"water-h"`
37+
}
3438
}
3539

3640
var config = configT{}
41+
var configCounters = make(map[string]uint8)
3742

3843
// TODO: switch to github.com/spf13/viper
3944
func configLoad() {
@@ -93,6 +98,15 @@ func configLoad() {
9398
check(err)
9499

95100
configPrintSummary()
101+
102+
//make optimize varibles
103+
for _, v := range config.Counters.WaterC {
104+
configCounters[v] = 1 // water cold
105+
}
106+
107+
for _, v := range config.Counters.WaterH {
108+
configCounters[v] = 2 // water hot
109+
}
96110
}
97111

98112
func logConfigItem(key string, rawValue interface{}) {

config.yml.sample

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,7 @@ clickhouse:
3535
pass: '*secret*'
3636
# HOMER_CLICKHOUSE_NAME - database name
3737
name: 'homer'
38+
39+
counters:
40+
water-c: ['water-c']
41+
water-h: ['water-h']

db.go

Lines changed: 99 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ package main
44

55
import (
66
"time"
7-
// "reflect"
7+
//"reflect"
88
"sort"
9+
"strconv"
910
"sync"
1011

1112
"github.com/montanaflynn/stats"
@@ -14,7 +15,7 @@ import (
1415
type dbItemT struct {
1516
ColName string
1617
ColType string
17-
ColVal float64
18+
ColVal string
1819
Time int64
1920
}
2021

@@ -31,12 +32,14 @@ func (q *dbQueueT) getChan() <-chan dbItemT {
3132
}
3233

3334
type dbT struct {
34-
m map[int64][]dbItemT
35+
m map[int64][]dbItemT
36+
clicks map[uint8][]int64
3537
}
3638

3739
func newDatabase() *dbT {
3840
return &dbT{
39-
m: make(map[int64][]dbItemT),
41+
m: make(map[int64][]dbItemT),
42+
clicks: make(map[uint8][]int64),
4043
}
4144
}
4245

@@ -45,7 +48,7 @@ func (c *dbT) Load(key int64) ([]dbItemT, bool) {
4548
return val, ok
4649
}
4750

48-
func (c *dbT) Add(key int64, item dbItemT) {
51+
func (c *dbT) AddMetric(key int64, item dbItemT) {
4952
column, ok := c.Load(key)
5053
if !ok {
5154
column = make([]dbItemT, 0, clickhouseMetricCount+1)
@@ -55,6 +58,48 @@ func (c *dbT) Add(key int64, item dbItemT) {
5558
c.m[key] = column
5659
}
5760

61+
func (c *dbT) AddClick(key uint8, time int64) {
62+
click, ok := c.clicks[key]
63+
if !ok {
64+
click = make([]int64, 0, 10)
65+
}
66+
67+
click = append(click, time)
68+
c.clicks[key] = click
69+
}
70+
71+
func (c *dbT) popMetricsFrom(from int64) (rows map[string][]float64) {
72+
keys := make([]int64, 0, len(c.m))
73+
for k := range c.m {
74+
keys = append(keys, k)
75+
}
76+
77+
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
78+
rows = make(map[string][]float64)
79+
for _, key := range keys {
80+
81+
if key >= from {
82+
//log.Debugf("key [%d] %v", key, item);
83+
for _, item := range c.m[key] {
84+
rowKey := item.ColName + ":" + item.ColType
85+
value, err := strconv.ParseFloat(item.ColVal, 64)
86+
if err != nil {
87+
log.Errorf("[%d] %s %s %s convert to float: %s", item.Time, item.ColName, item.ColType, item.ColVal, err)
88+
continue
89+
}
90+
91+
rows[rowKey] = append(rows[rowKey], value)
92+
}
93+
delete(c.m, key)
94+
} else {
95+
// FIXME need send it too (its late data)
96+
log.Warnf("key [%d] %v", key, c.m[key])
97+
}
98+
}
99+
100+
return rows
101+
}
102+
58103
var dbQueue dbQueueT
59104
var database *dbT
60105
var dbShutdownChan = make(chan bool)
@@ -68,45 +113,53 @@ func init() {
68113
}
69114
}
70115

71-
func dbAddMetric(fieldName string, fieldType string, valueInterface float64, time int64) {
116+
func dbInsert(fieldName string, fieldType string, valueInterface string, time int64) {
72117
dbQueue.Add(dbItemT{fieldName, fieldType, valueInterface, time})
73118
}
74119

75-
func dbAddEvent(fieldName string, fieldType string, valueInterface uint64, time int64) {
76-
//dbQueue.Add(dbItemT{fieldName, fieldType, valueInterface, time})
120+
func getClickTime(item dbItemT) (timestamp int64) {
121+
timestamp = item.Time
122+
value, err := strconv.ParseUint(item.ColVal, 10, 64)
123+
if err != nil {
124+
// handle error
125+
log.Errorf("Cant convert to time %s %s %s convert to int: %s", item.ColName, item.ColType, item.ColVal, err)
126+
return timestamp
127+
}
128+
129+
if value > 1000000000 {
130+
timestamp = int64(value)
131+
}
132+
133+
return timestamp
77134
}
78135

79136
func dbStore(item dbItemT) {
80137
//value := reflect.ValueOf(item.ColVal)
81138
//valueType := value.Type()
82139
log.Debugf("DB [%d] %s:%s = %v", item.Time, item.ColName, item.ColType, item.ColVal)
83-
database.Add(item.Time, item)
140+
switch item.ColType {
141+
case "count":
142+
if counterID, ok := configCounters[item.ColName]; ok {
143+
database.AddClick(counterID, getClickTime(item))
144+
} else {
145+
log.Warn("Unknown counter:", item.ColName)
146+
}
147+
case "move":
148+
log.Warn("MOVE")
149+
case "led":
150+
log.Warn("LED")
151+
case "temp", "humd", "pres":
152+
database.AddMetric(item.Time, item)
153+
default:
154+
log.Warnf("Unknown item.ColType: %s", item.ColType)
155+
}
84156
}
85157

86-
func dbDoTransfer() {
158+
func dbSaveMetrics() {
87159
now := time.Now()
88160
var timestamp = now.Unix()
89161
from := timestamp - 5
90-
keys := make([]int64, 0, len(database.m))
91-
for k := range database.m {
92-
keys = append(keys, k)
93-
}
94-
95-
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
96-
rows := make(map[string][]float64)
97-
for _, key := range keys {
98-
if key >= from {
99-
//log.Debugf("key [%d] %v", key, item);
100-
for _, item := range database.m[key] {
101-
rowKey := item.ColName + ":" + item.ColType
102-
rows[rowKey] = append(rows[rowKey], item.ColVal)
103-
}
104-
delete(database.m, key)
105-
} else {
106-
// FIXME need send it too (its late data)
107-
log.Warnf("key [%d] %v", key, database.m[key])
108-
}
109-
}
162+
rows := database.popMetricsFrom(from)
110163

111164
row := make(map[string]float64) // Result
112165
nowrow := make(map[string]float64) //next old row
@@ -133,23 +186,37 @@ func dbDoTransfer() {
133186
go clickhouseMetricInsert(timestamp, row)
134187
}
135188

189+
func dbProcessEvents() {
190+
// TODO
191+
// get current time in HH:mm
192+
// create consumption speed stats
193+
// save to db
194+
}
195+
136196
// resend accomulated data to clickhouse in one row per sec
137197
func dbLoop(ms uint32) {
138198
dbWg.Add(1)
139199
defer dbWg.Done()
140200
ticker := time.NewTicker(time.Millisecond * time.Duration(ms))
201+
tickerCounter := time.NewTicker(60 * time.Second)
141202
defer ticker.Stop()
203+
defer tickerCounter.Stop()
142204
itemChan := dbQueue.getChan()
143205
for {
144206
select {
145207
case item := <-itemChan:
146208
dbStore(item)
147209
case <-ticker.C:
148210
log.Debug("Make clickhouse row")
149-
dbDoTransfer()
211+
dbSaveMetrics()
212+
case <-ticker.C:
213+
log.Debug("Make counter row")
214+
dbProcessEvents()
150215
case <-dbShutdownChan:
151216
log.Debug("DB shuting down")
152-
dbDoTransfer()
217+
// TODO make it in gorutines/parallel
218+
dbSaveMetrics()
219+
dbProcessEvents()
153220
return
154221
}
155222
}

esp.go

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9-
"strconv"
109
"strings"
1110
"time"
1211
)
@@ -82,32 +81,6 @@ func espHandleStat(payload []byte, espName string) {
8281
espName, packet.Time, packet.Ap, packet.Rssi, packet.Free, packet.Vcc, packet.Uptime)
8382
}
8483

85-
func espHandleEvent(espRoom, espTheme, payloadStr string, timestamp int64) {
86-
value, err := strconv.ParseUint(payloadStr, 10, 64)
87-
if err != nil {
88-
// handle error
89-
log.Errorf("[%d] %s %s %s convert to int: %s", timestamp, espRoom, espTheme, payloadStr, err)
90-
return
91-
}
92-
93-
if value > 1000000000 {
94-
timestamp = int64(value)
95-
}
96-
97-
dbAddEvent(espRoom, espTheme, value, timestamp)
98-
}
99-
100-
func espHandleMetric(espRoom, espTheme, payloadStr string, timestamp int64) {
101-
value, err := strconv.ParseFloat(payloadStr, 64)
102-
if err != nil {
103-
// handle error
104-
log.Errorf("[%d] %s %s %s convert to float: %s", timestamp, espRoom, espTheme, payloadStr, err)
105-
return
106-
}
107-
108-
dbAddMetric(espRoom, espTheme, value, timestamp)
109-
}
110-
11184
func parseTopic(topic string) (espName, espTheme, espTag string, err error) {
11285
s := strings.Split(topic, "/")
11386
if s[1] == "esp" {
@@ -153,11 +126,8 @@ func espMessageHandler(topic string, payload []byte) {
153126

154127
switch espTheme {
155128
// int
156-
case "count", "move", "led":
157-
espHandleEvent(espRoom, espTheme, payloadStr, timestamp)
158-
// float
159-
case "temp", "humd", "pres":
160-
espHandleMetric(espRoom, espTheme, payloadStr, timestamp)
129+
case "count", "move", "led", "temp", "humd", "pres":
130+
dbInsert(espRoom, espTheme, payloadStr, timestamp)
161131

162132
case "debug":
163133
log.Debugf("Debug")

0 commit comments

Comments
 (0)