@@ -20,8 +20,9 @@ import (
20
20
"github.com/prometheus/client_golang/prometheus"
21
21
"github.com/prometheus/client_golang/prometheus/promauto"
22
22
"github.com/wormhole-foundation/wormhole-monitor/fly/common"
23
- "github.com/wormhole-foundation/wormhole-monitor/fly/pkg/db "
23
+ "github.com/wormhole-foundation/wormhole-monitor/fly/pkg/bigtable "
24
24
"github.com/wormhole-foundation/wormhole-monitor/fly/pkg/historical_uptime"
25
+ "github.com/wormhole-foundation/wormhole-monitor/fly/pkg/types"
25
26
"github.com/wormhole-foundation/wormhole-monitor/fly/utils"
26
27
"github.com/wormhole-foundation/wormhole/sdk/vaa"
27
28
"go.uber.org/zap"
31
32
rootCtx context.Context
32
33
rootCtxCancel context.CancelFunc
33
34
34
- dataDir string
35
35
p2pNetworkID string
36
36
p2pPort uint
37
37
p2pBootstrap string
40
40
ethRpcUrl string
41
41
coreBridgeAddr string
42
42
promRemoteURL string
43
+
44
+ gcpProjectId string
45
+ useBigtableEmulator bool // only use this in local development
46
+ bigTableEmulatorHost string = "" // required if using emulator
47
+ gcpCredentialsFile string = "" // required if not using emulator
48
+ bigTableInstanceId string
43
49
)
44
50
45
51
var (
@@ -96,7 +102,6 @@ func loadEnvVars() {
96
102
if err != nil {
97
103
log .Fatal ("Error loading .env file" )
98
104
}
99
- dataDir = verifyEnvVar ("DATA_DIR" )
100
105
p2pNetworkID = verifyEnvVar ("P2P_NETWORK_ID" )
101
106
port , err := strconv .ParseUint (verifyEnvVar ("P2P_PORT" ), 10 , 32 )
102
107
if err != nil {
@@ -108,6 +113,18 @@ func loadEnvVars() {
108
113
ethRpcUrl = verifyEnvVar ("ETH_RPC_URL" )
109
114
coreBridgeAddr = verifyEnvVar ("CORE_BRIDGE_ADDR" )
110
115
promRemoteURL = verifyEnvVar ("PROM_REMOTE_URL" )
116
+
117
+ gcpProjectId = verifyEnvVar ("GCP_PROJECT_ID" )
118
+ bigTableInstanceId = verifyEnvVar ("BIGTABLE_INSTANCE_ID" )
119
+ useBigtableEmulator , err = strconv .ParseBool (verifyEnvVar ("USE_BIGTABLE_EMULATOR" ))
120
+ if err != nil {
121
+ log .Fatal ("Error parsing USE_BIGTABLE_EMULATOR" )
122
+ }
123
+ if useBigtableEmulator {
124
+ bigTableEmulatorHost = verifyEnvVar ("BIGTABLE_EMULATOR_HOST" )
125
+ } else {
126
+ gcpCredentialsFile = verifyEnvVar ("GCP_CREDENTIALS_FILE" )
127
+ }
111
128
}
112
129
113
130
func verifyEnvVar (key string ) string {
@@ -180,7 +197,7 @@ func initPromScraper(promRemoteURL string, logger *zap.Logger, errC chan error)
180
197
}
181
198
}
182
199
183
- func initObservationScraper (db * db. Database , logger * zap.Logger , errC chan error ) {
200
+ func initObservationScraper (db * bigtable. BigtableDB , logger * zap.Logger , errC chan error ) {
184
201
node_common .StartRunnable (rootCtx , errC , false , "observation_scraper" , func (ctx context.Context ) error {
185
202
t := time .NewTicker (15 * time .Second )
186
203
@@ -189,20 +206,35 @@ func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error
189
206
case <- ctx .Done ():
190
207
return nil
191
208
case <- t .C :
192
- messages , err := db .QueryMessagesByIndex (false , common .ExpiryDuration )
209
+ messageObservations := make (map [types.MessageID ][]* types.Observation )
210
+
211
+ messages , err := db .GetUnprocessedMessagesBeforeCutOffTime (ctx , time .Now ().Add (- common .ExpiryDuration ))
193
212
if err != nil {
194
213
logger .Error ("QueryMessagesByIndex error" , zap .Error (err ))
195
214
continue
196
215
}
197
216
217
+ for _ , message := range messages {
218
+ observations , err := db .GetObservationsByMessageID (ctx , string (message .MessageID ))
219
+ if err != nil {
220
+ logger .Error ("GetObservationsByMessageID error" ,
221
+ zap .Error (err ),
222
+ zap .String ("messageId" , string (message .MessageID )),
223
+ )
224
+ continue
225
+ }
226
+
227
+ messageObservations [message .MessageID ] = observations
228
+ }
229
+
198
230
// Tally the number of messages for each chain
199
231
messagesPerChain := historical_uptime .TallyMessagesPerChain (logger , messages )
200
232
201
233
// Initialize the missing observations count for each guardian for each chain
202
234
guardianMissingObservations := historical_uptime .InitializeMissingObservationsCount (logger , messages , messagesPerChain )
203
235
204
236
// Decrement the missing observations count for each observed message
205
- historical_uptime .DecrementMissingObservationsCount (logger , guardianMissingObservations , messages )
237
+ historical_uptime .DecrementMissingObservationsCount (logger , guardianMissingObservations , messageObservations )
206
238
207
239
// Update the metrics with the final count of missing observations
208
240
historical_uptime .UpdateMetrics (guardianMissedObservations , guardianMissingObservations )
@@ -211,25 +243,6 @@ func initObservationScraper(db *db.Database, logger *zap.Logger, errC chan error
211
243
})
212
244
}
213
245
214
- func initDatabaseCleanUp (db * db.Database , logger * zap.Logger , errC chan error ) {
215
- node_common .StartRunnable (rootCtx , errC , false , "db_cleanup" , func (ctx context.Context ) error {
216
- t := time .NewTicker (common .DatabaseCleanUpInterval )
217
-
218
- for {
219
- select {
220
- case <- ctx .Done ():
221
- return nil
222
- case <- t .C :
223
- err := db .RemoveObservationsByIndex (true , common .ExpiryDuration )
224
- if err != nil {
225
- logger .Error ("RemoveObservationsByIndex error" , zap .Error (err ))
226
- }
227
- }
228
- }
229
- })
230
-
231
- }
232
-
233
246
func main () {
234
247
loadEnvVars ()
235
248
p2pBootstrap = "/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC,/dns4/wormhole.mcf.rocks/udp/8999/quic/p2p/12D3KooWDZVv7BhZ8yFLkarNdaSWaB43D6UbQwExJ8nnGAEmfHcU,/dns4/wormhole-v2-mainnet-bootstrap.staking.fund/udp/8999/quic/p2p/12D3KooWG8obDX9DNi1KUwZNu9xkGwfKqTp2GFwuuHpWZ3nQruS1"
@@ -283,12 +296,14 @@ func main() {
283
296
}
284
297
gst .Set (& gs )
285
298
286
- db := db .OpenDb (logger , & dataDir )
299
+ db , err := bigtable .NewBigtableDB (rootCtx , gcpProjectId , bigTableInstanceId , gcpCredentialsFile , bigTableEmulatorHost , useBigtableEmulator )
300
+ if err != nil {
301
+ logger .Fatal ("Failed to create bigtable db" , zap .Error (err ))
302
+ }
287
303
promErrC := make (chan error )
288
304
// Start Prometheus scraper
289
305
initPromScraper (promRemoteURL , logger , promErrC )
290
306
initObservationScraper (db , logger , promErrC )
291
- initDatabaseCleanUp (db , logger , promErrC )
292
307
293
308
go func () {
294
309
for {
0 commit comments