Skip to content

Commit

Permalink
rpc: add new header_added event subscription
Browse files Browse the repository at this point in the history
New event is to notify the user about header's content by the moment
when block is stored (which happens after block's processing). This is
needed for proper Waiter work.

Closes #2751.

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Dec 9, 2023
1 parent afca64f commit c5906be
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 2 deletions.
37 changes: 37 additions & 0 deletions docs/notifications.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ Recognized stream names:
Filter: `sender` field containing a string with hex-encoded Uint160 (LE
representation) for notary request's `Sender` and/or `signer` in the same
format for one of main transaction's `Signers`.
* `header_of_added_block`
Filter: `primary` as an integer with primary (speaker) node index from
ConsensusData and/or `since` field as an integer value with block
index starting from which new block notifications will be received and/or
`till` field as an integer values containing block index till which new
block notifications will be received.

Response: returns subscription ID (string) as a result. This ID can be used to
cancel this subscription and has no meaning other than that.
Expand Down Expand Up @@ -531,6 +537,37 @@ Example:
}
```

### `header_of_added_block` notification
Notification is sent when new block is stored, which is happens after block's processing.

Example:

```
{
"jsonrpc": "2.0",
"method": "header_of_added_block",
"params": [
{
"hash": "0x5d39141665a61c70d2ddbb997326e86e4b7f5eb34be43c7cd7823c4be513a71b",
"index": 8,
"merkleroot": "0x0000000000000000000000000000000000000000000000000000000000000000",
"nextconsensus": "NVTiAjNgagDkTr5HTzDmQP9kPwPHN5BgVq",
"nonce": "0000000000000000",
"previousblockhash": "0xec041b057433991e1a9aca10babcff850874addaf894d71f2946af2623bd6a9f",
"primary": 3,
"time": 1702111922000,
"version": 0,
"witnesses": [
{
"invocation": "DEB699tMKdFwSXyFBT2iMZcibYIuqJacmX95RZzDUCjGnAxUmG/OKXM/qnIj7848jmOGnwctZPXyAN9KByRUlWQfDEC+57OA4KzJqK5lbDSj7CpLCYUgJ5VOmgAbdi3etEAZWhEaIJF8f7Se6OKNP24H9Ud/Hugv6JBZ71pMSnHPIhjfDECfWJHGcU9ZqYkgdyjR37juu979fzB2E4JxAXVier4ImnAGfUP99441Vp2iskYDCGh0shX3v0Ym3UUOv+A1Hkhy",
"verification": "EwwhAhA6f33QFlWFl/eWDSfFFqQ5T9loueZRVetLAT5AQEBuDCECp7xV/oaE4BGXaNEEujB5W9zIZhnoZK3SYVZyPtGFzWIMIQKzYiv0AXvf4xfFiu1fTHU/IGt9uJYEb6fXdLvEv3+NwgwhA9kMB99j5pDOd5EuEKtRrMlEtmhgI3tgjE+PgwnnHuaZFEGe0Nw6"
}
]
}
]
}
```

### `event_missed` notification

Never has any parameters. Example:
Expand Down
32 changes: 32 additions & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,13 +1331,16 @@ func (bc *Blockchain) notificationDispatcher() {
txFeed = make(map[chan *transaction.Transaction]bool)
notificationFeed = make(map[chan *state.ContainedNotificationEvent]bool)
executionFeed = make(map[chan *state.AppExecResult]bool)
headerFeed = make(map[chan *block.Header]bool)
)
for {
select {
case <-bc.stopCh:
return
case sub := <-bc.subCh:
switch ch := sub.(type) {
case chan *block.Header:
headerFeed[ch] = true
case chan *block.Block:
blockFeed[ch] = true
case chan *transaction.Transaction:
Expand All @@ -1351,6 +1354,8 @@ func (bc *Blockchain) notificationDispatcher() {
}
case unsub := <-bc.unsubCh:
switch ch := unsub.(type) {
case chan *block.Header:
delete(headerFeed, ch)
case chan *block.Block:
delete(blockFeed, ch)
case chan *transaction.Transaction:
Expand Down Expand Up @@ -1423,6 +1428,9 @@ func (bc *Blockchain) notificationDispatcher() {
}
}
}
for ch := range headerFeed {
ch <- &event.block.Header
}
for ch := range blockFeed {
ch <- event.block
}
Expand Down Expand Up @@ -2313,6 +2321,16 @@ func (bc *Blockchain) SubscribeForExecutions(ch chan *state.AppExecResult) {
bc.subCh <- ch
}

// SubscribeForPersistedBlockHeaders adds given channel to new header event broadcasting, so
// when there is a new header added to the chain you'll receive it via this
// channel. Make sure it's read from regularly as not reading these events might
// affect other Blockchain functions. Make sure you're not changing the received
// headers, as it may affect the functionality of Blockchain and other
// subscribers.
func (bc *Blockchain) SubscribeForPersistedBlockHeaders(ch chan *block.Header) {
bc.subCh <- ch
}

// UnsubscribeFromBlocks unsubscribes given channel from new block notifications,
// you can close it afterwards. Passing non-subscribed channel is a no-op, but
// the method can read from this channel (discarding any read data).
Expand Down Expand Up @@ -2370,6 +2388,20 @@ unsubloop:
}
}

// UnsubscribeForPersistedBlockHeaders unsubscribes given channel from new header
// notifications, you can close it afterwards. Passing non-subscribed channel is
// a no-op, but the method can read from this channel (discarding any read data).
func (bc *Blockchain) UnsubscribeForPersistedBlockHeaders(ch chan *block.Header) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
}

// CalculateClaimable calculates the amount of GAS generated by owning specified
// amount of NEO between specified blocks.
func (bc *Blockchain) CalculateClaimable(acc util.Uint160, endHeight uint32) (*big.Int, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/neorpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
ExecutionEventID
// NotaryRequestEventID is used for the `notary_request_event` event.
NotaryRequestEventID
// HeaderEventID is used for the `header_of_added_block` event.
HeaderEventID
// MissedEventID notifies user of missed events.
MissedEventID EventID = 255
)
Expand All @@ -39,6 +41,8 @@ func (e EventID) String() string {
return "transaction_executed"
case NotaryRequestEventID:
return "notary_request_event"
case HeaderEventID:
return "header_of_added_block"
case MissedEventID:
return "event_missed"
default:
Expand All @@ -59,6 +63,8 @@ func GetEventIDFromString(s string) (EventID, error) {
return ExecutionEventID, nil
case "notary_request_event":
return NotaryRequestEventID, nil
case "header_of_added_block":
return HeaderEventID, nil
case "event_missed":
return MissedEventID, nil
default:
Expand Down
7 changes: 7 additions & 0 deletions pkg/neorpc/rpcevent/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ func Matches(f Comparator, r Container) bool {
return true
}
switch f.EventID() {
case neorpc.HeaderEventID:
filt := filter.(neorpc.BlockFilter)
b := r.EventPayload().(*block.Header)
primaryOk := filt.Primary == nil || *filt.Primary == int(b.PrimaryIndex)
sinceOk := filt.Since == nil || *filt.Since <= b.Index
tillOk := filt.Till == nil || b.Index <= *filt.Till
return primaryOk && sinceOk && tillOk
case neorpc.BlockEventID:
filt := filter.(neorpc.BlockFilter)
b := r.EventPayload().(*block.Block)
Expand Down
46 changes: 46 additions & 0 deletions pkg/neorpc/rpcevent/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func TestMatches(t *testing.T) {
Header: block.Header{PrimaryIndex: byte(primary), Index: index},
},
}
headerContainer := testContainer{
id: neorpc.HeaderEventID,
pld: &block.Header{PrimaryIndex: byte(primary), Index: index},
}
st := vmstate.Halt
goodState := st.String()
badState := "FAULT"
Expand Down Expand Up @@ -145,6 +149,48 @@ func TestMatches(t *testing.T) {
container: bContainer,
expected: true,
},
{
name: "header, no filter",
comparator: testComparator{id: neorpc.HeaderEventID},
container: headerContainer,
expected: true,
},
{
name: "header, primary mismatch",
comparator: testComparator{
id: neorpc.HeaderEventID,
filter: neorpc.BlockFilter{Primary: &badPrimary},
},
container: headerContainer,
expected: false,
},
{
name: "header, since mismatch",
comparator: testComparator{
id: neorpc.HeaderEventID,
filter: neorpc.BlockFilter{Since: &badHigherIndex},
},
container: headerContainer,
expected: false,
},
{
name: "header, till mismatch",
comparator: testComparator{
id: neorpc.HeaderEventID,
filter: neorpc.BlockFilter{Till: &badLowerIndex},
},
container: headerContainer,
expected: false,
},
{
name: "header, filter match",
comparator: testComparator{
id: neorpc.HeaderEventID,
filter: neorpc.BlockFilter{Primary: &primary, Since: &index, Till: &index},
},
container: headerContainer,
expected: true,
},
{
name: "transaction, no filter",
comparator: testComparator{id: neorpc.TransactionEventID},
Expand Down
73 changes: 73 additions & 0 deletions pkg/rpcclient/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,52 @@ func (r *blockReceiver) Close() {
close(r.ch)
}

// headerReceiver stores information about header events subscriber.
type headerReceiver struct {
filter *neorpc.BlockFilter
ch chan<- *block.Header
}

// EventID implements neorpc.Comparator interface.
func (r *headerReceiver) EventID() neorpc.EventID {
return neorpc.HeaderEventID
}

// Filter implements neorpc.Comparator interface.
func (r *headerReceiver) Filter() any {
if r.filter == nil {
return nil
}
return *r.filter
}

// Receiver implements notificationReceiver interface.
func (r *headerReceiver) Receiver() any {
return r.ch
}

// TrySend implements notificationReceiver interface.
func (r *headerReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Header):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Header)
}
return true, false
}
return false, false
}

// Close implements notificationReceiver interface.
func (r *headerReceiver) Close() {
close(r.ch)
}

// txReceiver stores information about transaction events subscriber.
type txReceiver struct {
filter *neorpc.TxFilter
Expand Down Expand Up @@ -520,6 +566,14 @@ readloop:
ntf.Value = new(state.AppExecResult)
case neorpc.NotaryRequestEventID:
ntf.Value = new(result.NotaryRequestEvent)
case neorpc.HeaderEventID:
sr, err := c.stateRootInHeader()
if err != nil {
// Client is not initialized.
connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
break readloop
}
ntf.Value = block.New(sr).Header
case neorpc.MissedEventID:
// No value.
default:
Expand Down Expand Up @@ -831,6 +885,25 @@ func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *resu
return c.performSubscription(params, r)
}

// ReceivePersistedHeader registers provided channel as a receiver for new header
// events. Events can be filtered by the given HeaderFilter, nil value doesn't add
// any filter. See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceivePersistedHeader(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"header_of_added_block"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &headerReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}

// Unsubscribe removes subscription for the given event stream. It will return an
// error in case if there's no subscription with the provided ID. Call to Unsubscribe
// doesn't block notifications receive process for given subscriber, thus, ensure
Expand Down
68 changes: 68 additions & 0 deletions pkg/rpcclient/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,74 @@ func TestWSFilteredSubscriptions(t *testing.T) {
clientCode func(*testing.T, *WSClient)
serverCode func(*testing.T, *params.Params)
}{
{"header primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
_, err := wsc.ReceivePersistedHeader(&neorpc.BlockFilter{Primary: &primary}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 3, *filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"header since",
func(t *testing.T, wsc *WSClient) {
var since uint32 = 3
_, err := wsc.ReceivePersistedHeader(&neorpc.BlockFilter{Since: &since}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, (*uint32)(nil), filt.Till)
},
},
{"header till",
func(t *testing.T, wsc *WSClient) {
var till uint32 = 3
_, err := wsc.ReceivePersistedHeader(&neorpc.BlockFilter{Till: &till}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, (*int)(nil), filt.Primary)
require.Equal(t, (*uint32)(nil), filt.Since)
require.Equal(t, (uint32)(3), *filt.Till)
},
},
{"header primary, since and till",
func(t *testing.T, wsc *WSClient) {
var (
since uint32 = 3
primary = 2
till uint32 = 5
)
_, err := wsc.ReceivePersistedHeader(&neorpc.BlockFilter{
Primary: &primary,
Since: &since,
Till: &till,
}, make(chan *block.Header))
require.NoError(t, err)
},
func(t *testing.T, p *params.Params) {
param := p.Value(1)
filt := new(neorpc.BlockFilter)
require.NoError(t, json.Unmarshal(param.RawMessage, filt))
require.Equal(t, 2, *filt.Primary)
require.Equal(t, uint32(3), *filt.Since)
require.Equal(t, uint32(5), *filt.Till)
},
},
{"blocks primary",
func(t *testing.T, wsc *WSClient) {
primary := 3
Expand Down
Loading

0 comments on commit c5906be

Please sign in to comment.