@@ -20,14 +20,16 @@ var clickhouseMetrics = make(map[string]struct{})
20
20
var clickhouseDb * sql.DB
21
21
var clickhouseCtx = context .Background ()
22
22
23
+ const clickhouseMetricType = "Nullable(Float32)"
24
+
23
25
func clickhouseCheckFieldName (name string ) (result bool ) {
24
26
re := regexp .MustCompile (`^[^a-zA-Z0-9:\-_]+$` )
25
27
result = re .MatchString (name )
26
28
return ! result
27
29
}
28
30
29
31
func clickhouseAddMetric (fieldName , fieldType string ) {
30
- if fieldType == "Nullable(Float32)" {
32
+ if fieldType == clickhouseMetricType {
31
33
clickhouseMetrics [fieldName ] = struct {}{}
32
34
clickhouseMetricCount = len (clickhouseMetrics )
33
35
log .Info ("Metric init " , fieldName )
@@ -125,6 +127,24 @@ func clickhouseWaitConnection() {
125
127
}
126
128
}
127
129
130
+ func clickhouseCreateMetric (name string ) (ok bool ) {
131
+ if ok := clickhouseCheckFieldName (name ); ok {
132
+ _ , err := clickhouseDb .Exec ("ALTER TABLE `metrics` ADD COLUMN `$1` " + clickhouseMetricType , name )
133
+ if err != nil {
134
+ log .Errorf ("Cant ADD COLUMN [%s] to database: %v" , name , err )
135
+ return false
136
+ }
137
+
138
+ // TODO check error inside clickhouseAddMetric
139
+ clickhouseAddMetric (name , clickhouseMetricType )
140
+ } else {
141
+ log .Warnf ("Invalid COLUMN name [%s]" , name )
142
+ return false
143
+ }
144
+
145
+ return true
146
+ }
147
+
128
148
func clickhouseMetricInsert (timestamp int64 , row map [string ]float64 ) {
129
149
clickhouseWaitConnection ()
130
150
@@ -137,18 +157,8 @@ func clickhouseMetricInsert(timestamp int64, row map[string]float64) {
137
157
// check key name column exist in our ckickhouse DDL
138
158
if _ , ok := clickhouseMetrics [key ]; ! ok {
139
159
// if not exist - create it by ALTER TABLE values ADD key float8
140
- if ok := clickhouseCheckFieldName (key ); ok {
141
- _ , err := clickhouseDb .Exec ("ALTER TABLE `metrics` ADD COLUMN `$1` Nullable(Float32)" , key )
142
- if err != nil {
143
- log .Errorf ("Cant ADD COLUMN [%s] to database: %v" , key , err )
144
- // TODO save request for feature send it to database (and return)
145
- continue
146
- }
147
-
148
- // TODO check error inside clickhouseAddMetric
149
- clickhouseAddMetric (key , "Nullable(Float32)" )
150
- } else {
151
- log .Warnf ("Invalid COLUMN name [%s]" , key )
160
+ if ok := clickhouseCreateMetric (key ); ! ok {
161
+ // TODO save request for feature send it to database (and return)
152
162
continue
153
163
}
154
164
}
0 commit comments