Skip to content

Commit

Permalink
rpc: add header_added
Browse files Browse the repository at this point in the history
New filter and notification subscription.

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Dec 6, 2023
1 parent 441eb8a commit dda3466
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 1 deletion.
15 changes: 15 additions & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,21 @@ unsubloop:
}
}

func (bc *Blockchain) SubscribeForHeaders(ch chan *block.Header) {
bc.subCh <- ch
}

func (bc *Blockchain) UnsubscribeFromHeaders(ch chan *block.Header) {
unsubloop:
for {
select {
case <-ch:
case bc.unsubCh <- ch:
break unsubloop
}
}
}

// CalculateClaimable calculates the amount of GAS generated by owning specified
// amount of NEO between specified blocks.
func (bc *Blockchain) CalculateClaimable(acc util.Uint160, endHeight uint32) (*big.Int, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/neorpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (
ExecutionEventID
// NotaryRequestEventID is used for the `notary_request_event` event.
NotaryRequestEventID
// HeaderEventID is used for the `header_added` event.
HeaderEventID
// MissedEventID notifies user of missed events.
MissedEventID EventID = 255
)
Expand All @@ -39,6 +41,8 @@ func (e EventID) String() string {
return "transaction_executed"
case NotaryRequestEventID:
return "notary_request_event"
case HeaderEventID:
return "header_added"
case MissedEventID:
return "event_missed"
default:
Expand All @@ -59,6 +63,8 @@ func GetEventIDFromString(s string) (EventID, error) {
return ExecutionEventID, nil
case "notary_request_event":
return NotaryRequestEventID, nil
case "header_added":
return HeaderEventID, nil
case "event_missed":
return MissedEventID, nil
default:
Expand Down
16 changes: 16 additions & 0 deletions pkg/neorpc/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,24 @@ type (
State *string `json:"state,omitempty"`
Container *util.Uint256 `json:"container,omitempty"`
}
HeaderFilter struct {
Index *uint32 `json:"index,omitempty"`
}
)

// Copy creates a deep copy of the HeaderFilter. It handles nil HeaderFilter correctly.
func (f *HeaderFilter) Copy() *HeaderFilter {
if f == nil {
return nil
}
var res = new(HeaderFilter)
if f.Index != nil {
res.Index = new(uint32)
*res.Index = *f.Index
}
return res
}

// Copy creates a deep copy of the BlockFilter. It handles nil BlockFilter correctly.
func (f *BlockFilter) Copy() *BlockFilter {
if f == nil {
Expand Down
62 changes: 62 additions & 0 deletions pkg/rpcclient/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,46 @@ type requestResponse struct {
RawParams []json.RawMessage `json:"params,omitempty"`
}

type headerReceiver struct {
filter *neorpc.HeaderFilter
ch chan<- *block.Header
}

func (r *headerReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Header):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Header)
}
return true, false
}
return false, false
}

func (r *headerReceiver) Receiver() any {
return r.ch
}

func (r *headerReceiver) EventID() neorpc.EventID {
return neorpc.HeaderEventID
}

func (r *headerReceiver) Filter() any {
if r.filter == nil {
return nil
}
return *r.filter
}

func (r *headerReceiver) Close() {
close(r.ch)
}

const (
// Message limit for receiving side.
wsReadLimit = 10 * 1024 * 1024
Expand Down Expand Up @@ -520,6 +560,8 @@ readloop:
ntf.Value = new(state.AppExecResult)
case neorpc.NotaryRequestEventID:
ntf.Value = new(result.NotaryRequestEvent)
case neorpc.HeaderEventID:
ntf.Value = &block.Header{}
case neorpc.MissedEventID:
// No value.
default:
Expand Down Expand Up @@ -828,6 +870,26 @@ func (c *WSClient) ReceiveNotaryRequests(flt *neorpc.TxFilter, rcvr chan<- *resu
return c.performSubscription(params, r)
}

// ReceiveHeaderRequests registers provided channel as a receiver for new header
// events. Events can be filtered by the given HeaderFilter, nil value doesn't add
// any filter. See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceiveHeaderRequests(flt *neorpc.HeaderFilter, rcvr chan<- *block.Header) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"header_added"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &headerReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)

}

// Unsubscribe removes subscription for the given event stream. It will return an
// error in case if there's no subscription with the provided ID. Call to Unsubscribe
// doesn't block notifications receive process for given subscriber, thus, ensure
Expand Down
28 changes: 28 additions & 0 deletions pkg/services/rpcsrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ type (
VerifyWitness(util.Uint160, hash.Hashable, *transaction.Witness, int64) (int64, error)
mempool.Feer // fee interface
ContractStorageSeeker
SubscribeForHeaders(ch chan *block.Header)
UnsubscribeFromHeaders(ch chan *block.Header)
}

// ContractStorageSeeker is the interface `findstorage*` handlers need to be able to
Expand Down Expand Up @@ -155,12 +157,14 @@ type (
notificationSubs int
transactionSubs int
notaryRequestSubs int
headerSubs int

blockCh chan *block.Block
executionCh chan *state.AppExecResult
notificationCh chan *state.ContainedNotificationEvent
transactionCh chan *transaction.Transaction
notaryRequestCh chan mempoolevent.Event
headerCh chan *block.Header
subEventsToExitCh chan struct{}
}

Expand Down Expand Up @@ -361,6 +365,7 @@ func New(chain Ledger, conf config.RPC, coreServer *network.Server,
notificationCh: make(chan *state.ContainedNotificationEvent),
transactionCh: make(chan *transaction.Transaction),
notaryRequestCh: make(chan mempoolevent.Event),
headerCh: make(chan *block.Header),
subEventsToExitCh: make(chan struct{}),
}
}
Expand Down Expand Up @@ -2750,6 +2755,11 @@ func (s *Server) subscribe(reqParams params.Params, sub *subscriber) (any, *neor
} else if err == nil {
err = errors.New("invalid state")
}

case neorpc.HeaderEventID:
flt := new(neorpc.HeaderFilter)
err = jd.Decode(flt)
filter = *flt
}
if err != nil {
return nil, neorpc.WrapErrorWithData(neorpc.ErrInvalidParams, err.Error())
Expand Down Expand Up @@ -2813,6 +2823,12 @@ func (s *Server) subscribeToChannel(event neorpc.EventID) {
s.coreServer.SubscribeForNotaryRequests(s.notaryRequestCh)
}
s.notaryRequestSubs++
case neorpc.HeaderEventID:
if s.headerSubs == 0 {
s.chain.SubscribeForHeaders(s.headerCh)
}
s.headerSubs++

}
}

Expand Down Expand Up @@ -2868,6 +2884,11 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) {
if s.notaryRequestSubs == 0 {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
case neorpc.HeaderEventID:
s.headerSubs--
if s.headerSubs == 0 {
s.chain.UnsubscribeFromHeaders(s.headerCh)
}
}
}

Expand Down Expand Up @@ -2917,6 +2938,10 @@ chloop:
Type: e.Type,
NotaryRequest: e.Data.(*payload.P2PNotaryRequest),
}
case header := <-s.headerCh:
resp.Event = neorpc.HeaderEventID
resp.Payload[0] = header

}
s.subsLock.RLock()
subloop:
Expand Down Expand Up @@ -2969,6 +2994,7 @@ chloop:
s.chain.UnsubscribeFromTransactions(s.transactionCh)
s.chain.UnsubscribeFromNotifications(s.notificationCh)
s.chain.UnsubscribeFromExecutions(s.executionCh)
s.chain.UnsubscribeFromHeaders(s.headerCh)
if s.chain.P2PSigExtensionsEnabled() {
s.coreServer.UnsubscribeFromNotaryRequests(s.notaryRequestCh)
}
Expand All @@ -2981,6 +3007,7 @@ drainloop:
case <-s.notificationCh:
case <-s.transactionCh:
case <-s.notaryRequestCh:
case <-s.headerCh:
default:
break drainloop
}
Expand All @@ -2992,6 +3019,7 @@ drainloop:
close(s.notificationCh)
close(s.executionCh)
close(s.notaryRequestCh)
close(s.headerCh)
// notify Shutdown routine
close(s.subEventsToExitCh)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/rpcsrv/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func callUnsubscribe(t *testing.T, ws *websocket.Conn, msgs <-chan []byte, id st

func TestSubscriptions(t *testing.T) {
var subIDs = make([]string, 0)
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event"}
var subFeeds = []string{"block_added", "transaction_added", "notification_from_execution", "transaction_executed", "notary_request_event", "header_added"}

chain, rpcSrv, c, respMsgs, finishedFlag := initCleanServerAndWSClient(t)
defer chain.Close()
Expand Down

0 comments on commit dda3466

Please sign in to comment.