Skip to content

Commit ad5d6f4

Browse files
committed
Fly: Remove references to SignedObservation
1 parent 50325da commit ad5d6f4

File tree

11 files changed

+334
-196
lines changed

11 files changed

+334
-196
lines changed

.github/workflows/test.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ jobs:
4141
- name: Set up Go
4242
uses: actions/setup-go@v3
4343
with:
44-
go-version: 1.21
44+
go-version: 1.23
4545
- name: Build
4646
run: cd fly && go build -v ./...

fly/cmd/healthcheck/main.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func main() {
7070
defer localCancel()
7171
hbReceived := false
7272
var addrInfo peer.AddrInfo
73-
observationsReceived := 0
73+
observationBatchesReceived := 0
7474
components := p2p.DefaultComponents()
7575
components.Port = p2pPort
7676
host, err := p2p.NewHost(logger, localContext, p2pNetworkID, p2pBootstrap, components, priv)
@@ -116,10 +116,10 @@ func main() {
116116
hbReceived = true
117117
}
118118
}
119-
case *gossipv1.GossipMessage_SignedObservation:
120-
logger.Debug("received observation")
121-
if bytes.Equal(m.SignedObservation.Addr, guardianPubKey) {
122-
observationsReceived++
119+
case *gossipv1.GossipMessage_SignedObservationBatch:
120+
logger.Debug("received observation batch")
121+
if bytes.Equal(m.SignedObservationBatch.Addr, guardianPubKey) {
122+
observationBatchesReceived++
123123
}
124124
}
125125
}
@@ -144,8 +144,8 @@ func main() {
144144
} else {
145145
fmt.Println("❌ NO HEARTBEAT RECEIVED")
146146
}
147-
if observationsReceived > 0 {
148-
fmt.Printf("✅ %d observations received\n", observationsReceived)
147+
if observationBatchesReceived > 0 {
148+
fmt.Printf("✅ %d observationBatches received\n", observationBatchesReceived)
149149
} else {
150150
fmt.Println("❌ NO OBSERVATIONS RECEIVED")
151151
}

fly/cmd/heartbeats/main.go

+9-42
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ func main() {
183183
type GossipMsgType int16
184184

185185
const (
186-
GSM_signedObservation GossipMsgType = iota
187-
GSM_signedObservationInBatch
186+
GSM_signedObservationInBatch GossipMsgType = iota
188187
GSM_signedObservationBatch
189188
GSM_tbObservation
190189
GSM_signedHeartbeat
@@ -196,7 +195,6 @@ func main() {
196195
)
197196

198197
// Inbound observations
199-
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
200198
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)
201199

202200
// Inbound observation requests
@@ -266,12 +264,12 @@ func main() {
266264

267265
gossipMsgTable := table.NewWriter()
268266
gossipMsgTable.SetOutputMirror(os.Stdout)
269-
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
267+
gossipMsgTable.AppendHeader(table.Row{"#", "Guardian", "ObsvInB", "ObsvB", "TB_OBsv", "HB", "VAA", "Obsv_Req", "Chain_Gov_Cfg", "Chain_Gov_Status"})
270268
gossipMsgTable.SetStyle(table.StyleColoredDark)
271269

272270
obsvRateTable := table.NewWriter()
273271
obsvRateTable.SetOutputMirror(os.Stdout)
274-
obsvRateTable.AppendHeader(table.Row{"#", "Guardian", "Obsv", "1%", "2%", "3%", "4%", "5%", "6%", "7%", "8%", "9%", "10%"})
272+
obsvRateTable.AppendHeader(table.Row{"#", "Guardian", "1%", "2%", "3%", "4%", "5%", "6%", "7%", "8%", "9%", "10%"})
275273
obsvRateTable.SetStyle(table.StyleColoredDark)
276274

277275
guardianTable := table.NewWriter()
@@ -338,41 +336,11 @@ func main() {
338336

339337
// Just count observations
340338
go func() {
341-
uniqueObs := map[string]struct{}{}
342339
uniqueObsInBatch := map[string]struct{}{}
343340
for {
344341
select {
345342
case <-rootCtx.Done():
346343
return
347-
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
348-
spl := strings.Split(o.Msg.MessageId, "/")
349-
emitter := strings.ToLower(spl[1])
350-
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
351-
idx := guardianIndexMap[strings.ToLower(addr)]
352-
if knownEmitters[emitter] {
353-
gossipCounter[idx][GSM_tbObservation]++
354-
gossipCounter[totalsRow][GSM_tbObservation]++
355-
}
356-
if handleObsv(uint(idx)) {
357-
obsvRateTable.ResetRows()
358-
for i := 0; i < numGuardians; i++ {
359-
obsvRateTable.AppendRow(table.Row{i, obsvRateRows[int(i)].guardianName, obsvRateRows[int(i)].obsvCount, obsvRateRows[uint(i)].percents[0], obsvRateRows[uint(i)].percents[1], obsvRateRows[uint(i)].percents[2], obsvRateRows[uint(i)].percents[3], obsvRateRows[uint(i)].percents[4], obsvRateRows[uint(i)].percents[5], obsvRateRows[uint(i)].percents[6], obsvRateRows[uint(i)].percents[7], obsvRateRows[uint(i)].percents[8], obsvRateRows[uint(i)].percents[9]})
360-
}
361-
}
362-
gossipCounter[idx][GSM_signedObservation]++
363-
gossipCounter[totalsRow][GSM_signedObservation]++
364-
365-
if *loadTesting {
366-
uniqueObs[hex.EncodeToString(o.Msg.Hash)] = struct{}{}
367-
gossipCounter[uniqueRow][GSM_signedObservation] = len(uniqueObs)
368-
}
369-
370-
gossipLock.Lock()
371-
gossipMsgTable.ResetRows()
372-
for idx, r := range gossipCounter {
373-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
374-
}
375-
gossipLock.Unlock()
376344
case batch := <-batchObsvC:
377345
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
378346
idx := guardianIndexMap[strings.ToLower(addr)]
@@ -406,7 +374,7 @@ func main() {
406374
gossipLock.Lock()
407375
gossipMsgTable.ResetRows()
408376
for idx, r := range gossipCounter {
409-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
377+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
410378
}
411379
gossipLock.Unlock()
412380
}
@@ -428,7 +396,7 @@ func main() {
428396
gossipLock.Lock()
429397
gossipMsgTable.ResetRows()
430398
for idx, r := range gossipCounter {
431-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
399+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
432400
}
433401
gossipLock.Unlock()
434402
}
@@ -460,7 +428,7 @@ func main() {
460428
gossipLock.Lock()
461429
gossipMsgTable.ResetRows()
462430
for idx, r := range gossipCounter {
463-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
431+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
464432
}
465433
gossipLock.Unlock()
466434
}
@@ -529,7 +497,7 @@ func main() {
529497
gossipLock.Lock()
530498
gossipMsgTable.ResetRows()
531499
for idx, r := range gossipCounter {
532-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
500+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
533501
}
534502
gossipLock.Unlock()
535503
if activeTable == 0 {
@@ -564,7 +532,7 @@ func main() {
564532
gossipLock.Lock()
565533
gossipMsgTable.ResetRows()
566534
for idx, r := range gossipCounter {
567-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
535+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
568536
}
569537
gossipLock.Unlock()
570538
}
@@ -585,7 +553,7 @@ func main() {
585553
gossipLock.Lock()
586554
gossipMsgTable.ResetRows()
587555
for idx, r := range gossipCounter {
588-
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7], r[8]})
556+
gossipMsgTable.AppendRow(table.Row{idx, guardianIndexToNameMap[idx], r[0], r[1], r[2], r[3], r[4], r[5], r[6], r[7]})
589557
}
590558
gossipLock.Unlock()
591559
}
@@ -610,7 +578,6 @@ func main() {
610578
gst,
611579
rootCtxCancel,
612580
p2p.WithComponents(components),
613-
p2p.WithSignedObservationListener(obsvC),
614581
p2p.WithSignedObservationBatchListener(batchObsvC),
615582
p2p.WithSignedVAAListener(signedInC),
616583
p2p.WithObservationRequestListener(obsvReqC),

fly/cmd/historical_uptime/main.go

-12
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,9 @@ func main() {
294294
defer rootCtxCancel()
295295

296296
// Inbound observations
297-
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
298297
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
299298

300299
// Add channel capacity checks
301-
go monitorChannelCapacity(rootCtx, logger, "obsvC", obsvC)
302300
go monitorChannelCapacity(rootCtx, logger, "batchObsvC", batchObsvC)
303301

304302
// Heartbeat updates
@@ -370,15 +368,6 @@ func main() {
370368
}
371369
logger.Info("Observation cleanup completed.")
372370
return
373-
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
374-
obs := historical_uptime.CreateNewObservation(o.Msg.MessageId, o.Msg.Addr, o.Timestamp, o.Msg.Addr)
375-
observationBatch = append(observationBatch, obs)
376-
377-
// if it reaches batchSize then process this batch
378-
if len(observationBatch) >= batchSize {
379-
historical_uptime.ProcessObservationBatch(*db, logger, observationBatch)
380-
observationBatch = observationBatch[:0] // Clear the batch
381-
}
382371
case batch := <-batchObsvC:
383372
for _, signedObs := range batch.Msg.Observations {
384373
obs := historical_uptime.CreateNewObservation(signedObs.MessageId, signedObs.Signature, batch.Timestamp, signedObs.TxHash)
@@ -477,7 +466,6 @@ func main() {
477466
gst,
478467
rootCtxCancel,
479468
p2p.WithComponents(components),
480-
p2p.WithSignedObservationListener(obsvC),
481469
p2p.WithSignedObservationBatchListener(batchObsvC),
482470
)
483471
if err != nil {

fly/cmd/observation_stats/main.go

-13
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ func main() {
6767
defer rootCtxCancel()
6868

6969
// Inbound observations
70-
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
7170
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
7271

7372
// Guardian set state managed by processor
@@ -93,17 +92,6 @@ func main() {
9392
select {
9493
case <-rootCtx.Done():
9594
return
96-
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
97-
if o.Msg.MessageId[:3] != "26/" && o.Msg.MessageId[:2] != "7/" {
98-
ga := eth_common.BytesToAddress(o.Msg.Addr).String()
99-
if _, ok := obsvByHash[o.Msg.MessageId]; !ok {
100-
obsvByHash[o.Msg.MessageId] = map[string]time.Time{}
101-
}
102-
if _, ok := obsvByHash[o.Msg.MessageId][ga]; !ok {
103-
obsvByHash[o.Msg.MessageId][ga] = time.Now()
104-
}
105-
logger.Warn("status", zap.String("id", o.Msg.MessageId), zap.Any("msg", obsvByHash[o.Msg.MessageId]))
106-
}
10795
case batch := <-batchObsvC:
10896
for _, o := range batch.Msg.Observations {
10997
if o.MessageId[:3] != "26/" && o.MessageId[:2] != "7/" {
@@ -139,7 +127,6 @@ func main() {
139127
gst,
140128
rootCtxCancel,
141129
p2p.WithComponents(components),
142-
p2p.WithSignedObservationListener(obsvC),
143130
p2p.WithSignedObservationBatchListener(batchObsvC),
144131
)
145132
if err != nil {

fly/cmd/prom_gossip/main.go

-25
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ func main() {
188188
defer rootCtxCancel()
189189

190190
// Inbound observations
191-
obsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], 20000)
192191
batchObsvC := make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 20000)
193192

194193
// Inbound observation requests
@@ -248,29 +247,6 @@ func main() {
248247
afterCount := len(uniqueObs)
249248
logger.Info("Cleaned up unique observations cache", zap.Int("beforeCount", beforeCount), zap.Int("afterCount", afterCount), zap.Int("cleanedUpCount", beforeCount-afterCount))
250249
timer.Reset(delay)
251-
case o := <-obsvC: // TODO: Rip out this code once we cut over to batching.
252-
gossipByType.WithLabelValues("observation").Inc()
253-
spl := strings.Split(o.Msg.MessageId, "/")
254-
chain, err := parseChainID(spl[0])
255-
if err != nil {
256-
chain = vaa.ChainIDUnset
257-
}
258-
emitter := strings.ToLower(spl[1])
259-
addr := "0x" + string(hex.EncodeToString(o.Msg.Addr))
260-
name := addr
261-
idx, found := guardianIndexMap[strings.ToLower(addr)]
262-
if found {
263-
name = guardianIndexToNameMap[idx]
264-
}
265-
observationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
266-
if knownEmitters[emitter] {
267-
tbObservationsByGuardianPerChain.WithLabelValues(name, chain.String()).Inc()
268-
}
269-
hash := hex.EncodeToString(o.Msg.Hash)
270-
if _, exists := uniqueObs[hash]; !exists {
271-
uniqueObservationsCounter.Inc()
272-
}
273-
uniqueObs[hash] = time.Now()
274250
case batch := <-batchObsvC:
275251
gossipByType.WithLabelValues("batch_observation").Inc()
276252
addr := "0x" + string(hex.EncodeToString(batch.Msg.Addr))
@@ -431,7 +407,6 @@ func main() {
431407
gst,
432408
rootCtxCancel,
433409
p2p.WithComponents(components),
434-
p2p.WithSignedObservationListener(obsvC),
435410
p2p.WithSignedObservationBatchListener(batchObsvC),
436411
p2p.WithSignedVAAListener(signedInC),
437412
p2p.WithObservationRequestListener(obsvReqC),

fly/cmd/track_gossip/main.go

-4
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func main() {
8484
defer rootCtxCancel()
8585

8686
// Inbound observations
87-
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
8887
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
8988

9089
// Inbound observation requests
@@ -128,8 +127,6 @@ func main() {
128127
select {
129128
case <-rootCtx.Done():
130129
return
131-
case <-obsvC: // TODO: Rip out this code once we cut over to batching.
132-
numObs++
133130
case batch := <-batchObsvC:
134131
numObs += len(batch.Msg.Observations)
135132
case <-signedInC:
@@ -193,7 +190,6 @@ func main() {
193190
gst,
194191
rootCtxCancel,
195192
p2p.WithComponents(components),
196-
p2p.WithSignedObservationListener(obsvC),
197193
p2p.WithSignedObservationBatchListener(batchObsvC),
198194
p2p.WithSignedVAAListener(signedInC),
199195
p2p.WithObservationRequestListener(obsvReqC),

fly/cmd/track_pyth/main.go

+1-11
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,6 @@ func main() {
126126
defer rootCtxCancel()
127127

128128
// Inbound observations
129-
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
130129
batchObsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 1024)
131130

132131
// Inbound signed VAAs
@@ -160,14 +159,6 @@ func main() {
160159
select {
161160
case <-rootCtx.Done():
162161
return
163-
case m := <-obsvC: // TODO: Rip out this code once we cut over to batching.
164-
obs := &gossipv1.Observation{
165-
Hash: m.Msg.Hash,
166-
Signature: m.Msg.Signature,
167-
TxHash: m.Msg.TxHash,
168-
MessageId: m.Msg.MessageId,
169-
}
170-
handleObservation(logger, gs, m.Msg.Addr, obs)
171162
case batch := <-batchObsvC:
172163
for _, o := range batch.Msg.Observations {
173164
handleObservation(logger, gs, batch.Msg.Addr, o)
@@ -235,7 +226,6 @@ func main() {
235226
gst,
236227
rootCtxCancel,
237228
p2p.WithComponents(components),
238-
p2p.WithSignedObservationListener(obsvC),
239229
p2p.WithSignedObservationBatchListener(batchObsvC),
240230
p2p.WithSignedVAAListener(signedInC),
241231
)
@@ -403,7 +393,7 @@ func handleObservation(logger *zap.Logger, gs common.GuardianSet, addr []byte, m
403393
return
404394
}
405395

406-
// Hooray! Now, we have verified all fields on SignedObservation and know that it includes
396+
// Hooray! Now, we have verified all fields on observation and know that it includes
407397
// a valid signature by an active guardian. We still don't fully trust them, as they may be
408398
// byzantine, but now we know who we're dealing with.
409399

0 commit comments

Comments
 (0)