From 2a1baa1d16f2e6a76cd3d010cecf58c82097f928 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Sat, 24 Feb 2024 05:49:33 +0100 Subject: [PATCH] go/keymanager/churp: Implement backend --- .../cometbft/apps/keymanager/churp/query.go | 36 +++ .../cometbft/apps/keymanager/query.go | 25 +- .../cometbft/apps/keymanager/secrets/query.go | 12 +- .../cometbft/keymanager/churp/client.go | 118 ++++++++++ .../cometbft/keymanager/keymanager.go | 20 +- go/keymanager/api/api.go | 4 + go/keymanager/api/grpc.go | 8 + go/keymanager/churp/api.go | 8 + go/keymanager/churp/backend.go | 27 +++ go/keymanager/churp/grpc.go | 221 ++++++++++++++++++ 10 files changed, 468 insertions(+), 11 deletions(-) create mode 100644 go/consensus/cometbft/apps/keymanager/churp/query.go create mode 100644 go/consensus/cometbft/keymanager/churp/client.go create mode 100644 go/keymanager/churp/backend.go create mode 100644 go/keymanager/churp/grpc.go diff --git a/go/consensus/cometbft/apps/keymanager/churp/query.go b/go/consensus/cometbft/apps/keymanager/churp/query.go new file mode 100644 index 00000000000..15345477c87 --- /dev/null +++ b/go/consensus/cometbft/apps/keymanager/churp/query.go @@ -0,0 +1,36 @@ +package churp + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common" + churpState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/churp/state" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" +) + +// Query is the key manager query interface. +type Query interface { + Status(context.Context, common.Namespace, uint8) (*churp.Status, error) + Statuses(context.Context, common.Namespace) ([]*churp.Status, error) + AllStatuses(context.Context) ([]*churp.Status, error) +} + +type querier struct { + state *churpState.ImmutableState +} + +func (kq *querier) Status(ctx context.Context, runtimeID common.Namespace, churpID uint8) (*churp.Status, error) { + return kq.state.Status(ctx, runtimeID, churpID) +} + +func (kq *querier) Statuses(ctx context.Context, runtimeID common.Namespace) ([]*churp.Status, error) { + return kq.state.Statuses(ctx, runtimeID) +} + +func (kq *querier) AllStatuses(ctx context.Context) ([]*churp.Status, error) { + return kq.state.AllStatuses(ctx) +} + +func NewQuery(state *churpState.ImmutableState) Query { + return &querier{state} +} diff --git a/go/consensus/cometbft/apps/keymanager/query.go b/go/consensus/cometbft/apps/keymanager/query.go index 233f2d2ab6c..f18866f2f21 100644 --- a/go/consensus/cometbft/apps/keymanager/query.go +++ b/go/consensus/cometbft/apps/keymanager/query.go @@ -4,6 +4,8 @@ import ( "context" abciAPI "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api" + "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/churp" + churpState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/churp/state" "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/secrets" secretsState "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager/secrets/state" ) @@ -11,6 +13,7 @@ import ( // Query is the key manager query interface. type Query interface { Secrets() secrets.Query + Churp() churp.Query } // QueryFactory is the key manager query factory. @@ -20,19 +23,33 @@ type QueryFactory struct { // QueryAt returns the key manager query interface for a specific height. func (sf *QueryFactory) QueryAt(ctx context.Context, height int64) (Query, error) { - state, err := secretsState.NewImmutableState(ctx, sf.state, height) // TODO: not ok + secretsState, err := secretsState.NewImmutableState(ctx, sf.state, height) if err != nil { return nil, err } - return &keymanagerQuerier{state}, nil + + churpState, err := churpState.NewImmutableState(ctx, sf.state, height) + if err != nil { + return nil, err + } + + return &keymanagerQuerier{ + secretsState: secretsState, + churpState: churpState, + }, nil } type keymanagerQuerier struct { - state *secretsState.ImmutableState + secretsState *secretsState.ImmutableState + churpState *churpState.ImmutableState } func (kq *keymanagerQuerier) Secrets() secrets.Query { - return secrets.NewQuery(kq.state) + return secrets.NewQuery(kq.secretsState) +} + +func (kq *keymanagerQuerier) Churp() churp.Query { + return churp.NewQuery(kq.churpState) } func (app *keymanagerApplication) QueryFactory() interface{} { diff --git a/go/consensus/cometbft/apps/keymanager/secrets/query.go b/go/consensus/cometbft/apps/keymanager/secrets/query.go index 44b23572f41..02461ebd5eb 100644 --- a/go/consensus/cometbft/apps/keymanager/secrets/query.go +++ b/go/consensus/cometbft/apps/keymanager/secrets/query.go @@ -21,20 +21,20 @@ type querier struct { state *secretsState.ImmutableState } -func (kq *querier) Status(ctx context.Context, id common.Namespace) (*secrets.Status, error) { - return kq.state.Status(ctx, id) +func (kq *querier) Status(ctx context.Context, runtimeID common.Namespace) (*secrets.Status, error) { + return kq.state.Status(ctx, runtimeID) } func (kq *querier) Statuses(ctx context.Context) ([]*secrets.Status, error) { return kq.state.Statuses(ctx) } -func (kq *querier) MasterSecret(ctx context.Context, id common.Namespace) (*secrets.SignedEncryptedMasterSecret, error) { - return kq.state.MasterSecret(ctx, id) +func (kq *querier) MasterSecret(ctx context.Context, runtimeID common.Namespace) (*secrets.SignedEncryptedMasterSecret, error) { + return kq.state.MasterSecret(ctx, runtimeID) } -func (kq *querier) EphemeralSecret(ctx context.Context, id common.Namespace) (*secrets.SignedEncryptedEphemeralSecret, error) { - return kq.state.EphemeralSecret(ctx, id) +func (kq *querier) EphemeralSecret(ctx context.Context, runtimeID common.Namespace) (*secrets.SignedEncryptedEphemeralSecret, error) { + return kq.state.EphemeralSecret(ctx, runtimeID) } func (kq *querier) Genesis(ctx context.Context) (*secrets.Genesis, error) { diff --git a/go/consensus/cometbft/keymanager/churp/client.go b/go/consensus/cometbft/keymanager/churp/client.go new file mode 100644 index 00000000000..aa569da56f8 --- /dev/null +++ b/go/consensus/cometbft/keymanager/churp/client.go @@ -0,0 +1,118 @@ +package churp + +import ( + "context" + + cmtabcitypes "github.com/cometbft/cometbft/abci/types" + "github.com/eapache/channels" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/common/pubsub" + consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" + "github.com/oasisprotocol/oasis-core/go/consensus/api/events" + app "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" + "github.com/oasisprotocol/oasis-core/go/registry/api" +) + +type ServiceClient struct { + logger *logging.Logger + + querier *app.QueryFactory + statusNotifier *pubsub.Broker +} + +// Status implements churp.Backend. +func (sc *ServiceClient) Status(ctx context.Context, query *churp.StatusQuery) (*churp.Status, error) { + q, err := sc.querier.QueryAt(ctx, query.Height) + if err != nil { + return nil, err + } + + return q.Churp().Status(ctx, query.RuntimeID, query.ChurpID) +} + +// Statuses implements churp.Backend. +func (sc *ServiceClient) Statuses(ctx context.Context, query *api.NamespaceQuery) ([]*churp.Status, error) { + q, err := sc.querier.QueryAt(ctx, query.Height) + if err != nil { + return nil, err + } + + return q.Churp().Statuses(ctx, query.ID) +} + +// AllStatuses implements churp.Backend. +func (sc *ServiceClient) AllStatuses(ctx context.Context, height int64) ([]*churp.Status, error) { + q, err := sc.querier.QueryAt(ctx, height) + if err != nil { + return nil, err + } + + return q.Churp().AllStatuses(ctx) +} + +// WatchStatuses implements churp.Backend. +func (sc *ServiceClient) WatchStatuses() (<-chan *churp.Status, *pubsub.Subscription) { + sub := sc.statusNotifier.Subscribe() + ch := make(chan *churp.Status) + sub.Unwrap(ch) + + return ch, sub +} + +func (sc *ServiceClient) DeliverEvent(ev *cmtabcitypes.Event) error { + for _, pair := range ev.GetAttributes() { + key := pair.GetKey() + val := pair.GetValue() + + if events.IsAttributeKind(key, &churp.CreateEvent{}) { + var event churp.CreateEvent + if err := events.DecodeValue(val, &event); err != nil { + sc.logger.Error("worker: failed to get status from tag", + "err", err, + ) + continue + } + + sc.statusNotifier.Broadcast(event.Status) + } + if events.IsAttributeKind(key, &churp.UpdateEvent{}) { + var event churp.UpdateEvent + if err := events.DecodeValue(val, &event); err != nil { + sc.logger.Error("worker: failed to get status from tag", + "err", err, + ) + continue + } + + sc.statusNotifier.Broadcast(event.Status) + } + } + return nil +} + +// New constructs a new CometBFT backed key manager secrets management Backend +// instance. +func New(ctx context.Context, querier *app.QueryFactory) (*ServiceClient, error) { + sc := ServiceClient{ + logger: logging.GetLogger("cometbft/keymanager/churp"), + querier: querier, + } + sc.statusNotifier = pubsub.NewBrokerEx(func(ch channels.Channel) { + statuses, err := sc.AllStatuses(ctx, consensus.HeightLatest) + if err != nil { + sc.logger.Error("status notifier: unable to get a list of statuses", + "err", err, + ) + return + } + + wr := ch.In() + for _, v := range statuses { + wr <- v + } + }) + + return &sc, nil +} diff --git a/go/consensus/cometbft/keymanager/keymanager.go b/go/consensus/cometbft/keymanager/keymanager.go index fdbc9435ece..fd71c9b829f 100644 --- a/go/consensus/cometbft/keymanager/keymanager.go +++ b/go/consensus/cometbft/keymanager/keymanager.go @@ -12,8 +12,10 @@ import ( tmapi "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/api" app "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/apps/keymanager" + "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/keymanager/churp" "github.com/oasisprotocol/oasis-core/go/consensus/cometbft/keymanager/secrets" "github.com/oasisprotocol/oasis-core/go/keymanager/api" + churpAPI "github.com/oasisprotocol/oasis-core/go/keymanager/churp" secretsAPI "github.com/oasisprotocol/oasis-core/go/keymanager/secrets" ) @@ -27,6 +29,7 @@ type serviceClient struct { tmapi.BaseServiceClient secretsClient *secrets.ServiceClient + churpClient *churp.ServiceClient } // Implements api.Backend. @@ -44,6 +47,11 @@ func (sc *serviceClient) Secrets() secretsAPI.Backend { return sc.secretsClient } +// Implements api.Backend. +func (sc *serviceClient) Churp() churpAPI.Backend { + return sc.churpClient +} + // Implements api.ServiceClient. func (sc *serviceClient) ServiceDescriptor() tmapi.ServiceDescriptor { return tmapi.NewStaticServiceDescriptor(api.ModuleName, app.EventType, []cmtpubsub.Query{app.QueryApp}) @@ -51,7 +59,10 @@ func (sc *serviceClient) ServiceDescriptor() tmapi.ServiceDescriptor { // Implements api.ServiceClient. func (sc *serviceClient) DeliverEvent(_ context.Context, _ int64, _ cmttypes.Tx, ev *cmtabcitypes.Event) error { - return sc.secretsClient.DeliverEvent(ev) + if err := sc.secretsClient.DeliverEvent(ev); err != nil { + return err + } + return sc.churpClient.DeliverEvent(ev) } // New constructs a new CometBFT backed key manager management Backend @@ -63,13 +74,20 @@ func New(ctx context.Context, backend tmapi.Backend) (ServiceClient, error) { } querier := a.QueryFactory().(*app.QueryFactory) + secretsClient, err := secrets.New(ctx, querier) if err != nil { return nil, fmt.Errorf("cometbft/keymanager: failed to create secrets client: %w", err) } + churpClient, err := churp.New(ctx, querier) + if err != nil { + return nil, fmt.Errorf("cometbft/keymanager: failed to create churp client: %w", err) + } + sc := serviceClient{ secretsClient: secretsClient, + churpClient: churpClient, } return &sc, nil diff --git a/go/keymanager/api/api.go b/go/keymanager/api/api.go index 1ceaf9ce6fe..ae465e9e6a3 100644 --- a/go/keymanager/api/api.go +++ b/go/keymanager/api/api.go @@ -9,6 +9,7 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/crypto/signature" memorySigner "github.com/oasisprotocol/oasis-core/go/common/crypto/signature/signers/memory" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" "github.com/oasisprotocol/oasis-core/go/keymanager/secrets" ) @@ -41,6 +42,9 @@ type Backend interface { // Secrets returns the key manager secrets management implementation. Secrets() secrets.Backend + + // Churp returns the key manager CHURP management implementation. + Churp() churp.Backend } // Genesis is the key manager management genesis state. diff --git a/go/keymanager/api/grpc.go b/go/keymanager/api/grpc.go index 094bb7e48d3..d3d6aa524cf 100644 --- a/go/keymanager/api/grpc.go +++ b/go/keymanager/api/grpc.go @@ -3,26 +3,34 @@ package api import ( "google.golang.org/grpc" + "github.com/oasisprotocol/oasis-core/go/keymanager/churp" "github.com/oasisprotocol/oasis-core/go/keymanager/secrets" ) // RegisterService registers a new keymanager backend service with the given gRPC server. func RegisterService(server *grpc.Server, service Backend) { secrets.RegisterService(server, service.Secrets()) + churp.RegisterService(server, service.Churp()) } // KeymanagerClient is a gRPC keymanager client. type KeymanagerClient struct { secretsClient *secrets.Client + churpClient *churp.Client } func (c *KeymanagerClient) Secrets() *secrets.Client { return c.secretsClient } +func (c *KeymanagerClient) Churp() *churp.Client { + return c.churpClient +} + // NewKeymanagerClient creates a new gRPC keymanager client service. func NewKeymanagerClient(c *grpc.ClientConn) *KeymanagerClient { return &KeymanagerClient{ secretsClient: secrets.NewClient(c), + churpClient: churp.NewClient(c), } } diff --git a/go/keymanager/churp/api.go b/go/keymanager/churp/api.go index e52e7291cb9..f092725dbe5 100644 --- a/go/keymanager/churp/api.go +++ b/go/keymanager/churp/api.go @@ -1,6 +1,7 @@ package churp import ( + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/errors" "github.com/oasisprotocol/oasis-core/go/consensus/api/transaction" ) @@ -67,3 +68,10 @@ func NewUpdateTx(nonce uint64, fee *transaction.Fee, req *UpdateRequest) *transa func NewApplyTx(nonce uint64, fee *transaction.Fee, req *SignedApplicationRequest) *transaction.Transaction { return transaction.NewTransaction(nonce, fee, MethodApply, req) } + +// StatusQuery is a status query by CHURP and runtime ID. +type StatusQuery struct { + Height int64 `json:"height"` + RuntimeID common.Namespace `json:"runtime_id"` + ChurpID uint8 `json:"churp_id"` +} diff --git a/go/keymanager/churp/backend.go b/go/keymanager/churp/backend.go new file mode 100644 index 00000000000..e475411c151 --- /dev/null +++ b/go/keymanager/churp/backend.go @@ -0,0 +1,27 @@ +package churp + +import ( + "context" + + "github.com/oasisprotocol/oasis-core/go/common/pubsub" + registry "github.com/oasisprotocol/oasis-core/go/registry/api" +) + +// Backend is a CHURP management implementation. +type Backend interface { + // Status returns the CHURP status for the specified runtime and CHURP + // instance. + Status(context.Context, *StatusQuery) (*Status, error) + + // Statuses returns the CHURP statuses for the specified runtime. + Statuses(context.Context, *registry.NamespaceQuery) ([]*Status, error) + + // AllStatuses returns the CHURP statuses for all runtimes. + AllStatuses(context.Context, int64) ([]*Status, error) + + // WatchStatuses returns a channel that produces a stream of messages + // containing CHURP statuses as they change over time. + // + // Upon subscription the current statuses are sent immediately. + WatchStatuses() (<-chan *Status, *pubsub.Subscription) +} diff --git a/go/keymanager/churp/grpc.go b/go/keymanager/churp/grpc.go new file mode 100644 index 00000000000..fa383f9471b --- /dev/null +++ b/go/keymanager/churp/grpc.go @@ -0,0 +1,221 @@ +package churp + +import ( + "context" + + "google.golang.org/grpc" + + cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc" + "github.com/oasisprotocol/oasis-core/go/common/pubsub" + registry "github.com/oasisprotocol/oasis-core/go/registry/api" +) + +var ( + // serviceName is the gRPC service name. + serviceName = cmnGrpc.NewServiceName("KeyManager.Churp") + + // methodGetStatus is the GetStatus method. + methodStatus = serviceName.NewMethod("Status", StatusQuery{}) + // methodGetStatuses is the GetStatuses method. + methodStatuses = serviceName.NewMethod("Statuses", registry.NamespaceQuery{}) + // methodGetStatuses is the GetStatuses method. + methodAllStatuses = serviceName.NewMethod("AllStatuses", int64(0)) + + // methodWatchStatuses is the WatchStatuses method. + methodWatchStatuses = serviceName.NewMethod("WatchStatuses", nil) + + // serviceDesc is the gRPC service descriptor. + serviceDesc = grpc.ServiceDesc{ + ServiceName: string(serviceName), + HandlerType: (*Backend)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: methodStatus.ShortName(), + Handler: handlerStatus, + }, + { + MethodName: methodStatuses.ShortName(), + Handler: handlerStatuses, + }, + { + MethodName: methodAllStatuses.ShortName(), + Handler: handlerAllStatuses, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: methodWatchStatuses.ShortName(), + Handler: handlerWatchStatuses, + ServerStreams: true, + }, + }, + } +) + +func handlerStatus( + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var query StatusQuery + if err := dec(&query); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).Status(ctx, &query) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodStatus.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).Status(ctx, req.(*StatusQuery)) + } + return interceptor(ctx, &query, info, handler) +} + +func handlerStatuses( + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var query registry.NamespaceQuery + if err := dec(&query); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).Statuses(ctx, &query) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodStatuses.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).Statuses(ctx, req.(*registry.NamespaceQuery)) + } + return interceptor(ctx, &query, info, handler) +} + +func handlerAllStatuses( + srv interface{}, + ctx context.Context, + dec func(interface{}) error, + interceptor grpc.UnaryServerInterceptor, +) (interface{}, error) { + var height int64 + if err := dec(&height); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(Backend).AllStatuses(ctx, height) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: methodAllStatuses.FullName(), + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(Backend).AllStatuses(ctx, req.(int64)) + } + return interceptor(ctx, height, info, handler) +} + +func handlerWatchStatuses(srv interface{}, stream grpc.ServerStream) error { + if err := stream.RecvMsg(nil); err != nil { + return err + } + + ctx := stream.Context() + ch, sub := srv.(Backend).WatchStatuses() + defer sub.Close() + + for { + select { + case stat, ok := <-ch: + if !ok { + return nil + } + + if err := stream.SendMsg(stat); err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// RegisterService registers a new keymanager CHURP backend service with the given gRPC server. +func RegisterService(server *grpc.Server, service Backend) { + server.RegisterService(&serviceDesc, service) +} + +// Client is a gRPC keymanager secrets client. +type Client struct { + conn *grpc.ClientConn +} + +func (c *Client) Status(ctx context.Context, query *StatusQuery) (*Status, error) { + var resp Status + if err := c.conn.Invoke(ctx, methodStatus.FullName(), query, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *Client) Statuses(ctx context.Context, query *registry.NamespaceQuery) (*Status, error) { + var resp Status + if err := c.conn.Invoke(ctx, methodStatuses.FullName(), query, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *Client) AllStatuses(ctx context.Context, height int64) ([]*Status, error) { + var resp []*Status + if err := c.conn.Invoke(ctx, methodAllStatuses.FullName(), height, &resp); err != nil { + return nil, err + } + return resp, nil +} + +func (c *Client) WatchStatuses(ctx context.Context) (<-chan *Status, pubsub.ClosableSubscription, error) { + ctx, sub := pubsub.NewContextSubscription(ctx) + + stream, err := c.conn.NewStream(ctx, &serviceDesc.Streams[0], methodWatchStatuses.FullName()) + if err != nil { + return nil, nil, err + } + if err = stream.SendMsg(nil); err != nil { + return nil, nil, err + } + if err = stream.CloseSend(); err != nil { + return nil, nil, err + } + + ch := make(chan *Status) + go func() { + defer close(ch) + + for { + var stat Status + if serr := stream.RecvMsg(&stat); serr != nil { + return + } + + select { + case ch <- &stat: + case <-ctx.Done(): + return + } + } + }() + + return ch, sub, nil +} + +// NewClient creates a new gRPC keymanager CHURP client service. +func NewClient(c *grpc.ClientConn) *Client { + return &Client{c} +}