From 43003d4a711654a6fc9751e87921bba2d4a2f78f Mon Sep 17 00:00:00 2001 From: Injun Song Date: Fri, 12 Jul 2024 01:50:30 +0900 Subject: [PATCH] fix(metarepos): check the NodeID for adding and removing peers This PR makes the MetadataRepository server validate the NodeID while processing RPCs such as AddPeer and RemovePeer. When the given NodeID is invalid, it returns the gRPC InvalidArgument status code. In the case of AddPeer, it prevents unexpected peer addition that has an invalid NodeID. --- .../metarepos/raft_metadata_repository.go | 8 + .../raft_metadata_repository_test.go | 174 ++++++++++++++++++ proto/mrpb/management.pb.go | 70 ++++--- proto/mrpb/management.proto | 38 ++-- 4 files changed, 252 insertions(+), 38 deletions(-) diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 25b1a3b46..03bae46ae 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -1362,6 +1362,10 @@ func (mr *RaftMetadataRepository) Unseal(ctx context.Context, lsID types.LogStre } func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID, url string) error { + if nodeID == types.InvalidNodeID { + return status.Error(codes.InvalidArgument, "invalid node id") + } + if mr.membership.IsMember(nodeID) || mr.membership.IsLearner(nodeID) { return status.Errorf(codes.AlreadyExists, "node %d, addr:%s", nodeID, url) @@ -1394,6 +1398,10 @@ func (mr *RaftMetadataRepository) AddPeer(ctx context.Context, _ types.ClusterID } func (mr *RaftMetadataRepository) RemovePeer(ctx context.Context, _ types.ClusterID, nodeID types.NodeID) error { + if nodeID == types.InvalidNodeID { + return status.Error(codes.InvalidArgument, "invalid node id") + } + if !mr.membership.IsMember(nodeID) && !mr.membership.IsLearner(nodeID) { return status.Errorf(codes.NotFound, "node %d", nodeID) diff --git a/internal/metarepos/raft_metadata_repository_test.go b/internal/metarepos/raft_metadata_repository_test.go index 55769bc8b..2d296777b 100644 --- a/internal/metarepos/raft_metadata_repository_test.go +++ b/internal/metarepos/raft_metadata_repository_test.go @@ -11,6 +11,8 @@ import ( "github.com/pkg/errors" . "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/v3/raftpb" "go.uber.org/goleak" "go.uber.org/multierr" @@ -3072,3 +3074,175 @@ func TestMain(m *testing.M) { ), ) } + +func TestMetadataRepository_AddPeer(t *testing.T) { + const clusterID = types.ClusterID(1) + + tcs := []struct { + name string + testf func(t *testing.T, server *RaftMetadataRepository, client mrpb.ManagementClient) + }{ + { + name: "InvalidNodeID", + testf: func(t *testing.T, _ *RaftMetadataRepository, client mrpb.ManagementClient) { + _, err := client.AddPeer(context.Background(), &mrpb.AddPeerRequest{ + ClusterID: clusterID, + NodeID: types.InvalidNodeID, + Url: "http://127.0.0.1:11000", + }) + require.Error(t, err) + require.Equal(t, codes.InvalidArgument, status.Code(err)) + }, + }, + { + name: "AlreadyExists", + testf: func(t *testing.T, server *RaftMetadataRepository, client mrpb.ManagementClient) { + _, err := client.AddPeer(context.Background(), &mrpb.AddPeerRequest{ + ClusterID: clusterID, + NodeID: server.nodeID, + Url: server.raftNode.url, + }) + require.Error(t, err) + require.Equal(t, codes.AlreadyExists, status.Code(err)) + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + portLease, err := ports.ReserveWeaklyWithRetry(10000) + require.NoError(t, err) + + peer := fmt.Sprintf("http://127.0.0.1:%d", portLease.Base()) + node := NewRaftMetadataRepository( + WithClusterID(clusterID), + WithRaftAddress(peer), + WithRPCAddress("127.0.0.1:0"), + WithRaftDirectory(t.TempDir()+"/raftdata"), + ) + t.Cleanup(func() { + err := node.Close() + require.NoError(t, err) + }) + + node.Run() + + // Wait for initialization + require.EventuallyWithT(t, func(collect *assert.CollectT) { + addr := node.endpointAddr.Load() + if !assert.NotNil(collect, addr) { + return + } + }, 3*time.Second, 100*time.Millisecond) + addr := node.endpointAddr.Load().(string) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + conn, err := rpc.NewConn(context.Background(), addr) + assert.NoError(collect, err) + defer func() { + err := conn.Close() + assert.NoError(collect, err) + }() + + healthClient := grpc_health_v1.NewHealthClient(conn.Conn) + _, err = healthClient.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}) + assert.NoError(collect, err) + }, 3*time.Second, 100*time.Millisecond) + + conn, err := rpc.NewConn(context.Background(), addr) + require.NoError(t, err) + t.Cleanup(func() { + _ = conn.Close() + require.NoError(t, err) + }) + + client := mrpb.NewManagementClient(conn.Conn) + tc.testf(t, node, client) + }) + } +} + +func TestMetadataRepository_RemovePeer(t *testing.T) { + const clusterID = types.ClusterID(1) + + tcs := []struct { + name string + testf func(t *testing.T, server *RaftMetadataRepository, client mrpb.ManagementClient) + }{ + { + name: "InvalidNodeID", + testf: func(t *testing.T, _ *RaftMetadataRepository, client mrpb.ManagementClient) { + _, err := client.RemovePeer(context.Background(), &mrpb.RemovePeerRequest{ + ClusterID: clusterID, + NodeID: types.InvalidNodeID, + }) + require.Error(t, err) + require.Equal(t, codes.InvalidArgument, status.Code(err)) + }, + }, + { + name: "NotFound", + testf: func(t *testing.T, server *RaftMetadataRepository, client mrpb.ManagementClient) { + _, err := client.RemovePeer(context.Background(), &mrpb.RemovePeerRequest{ + ClusterID: clusterID, + NodeID: server.nodeID + 1, + }) + require.Error(t, err) + require.Equal(t, codes.NotFound, status.Code(err)) + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + portLease, err := ports.ReserveWeaklyWithRetry(10000) + require.NoError(t, err) + + peer := fmt.Sprintf("http://127.0.0.1:%d", portLease.Base()) + node := NewRaftMetadataRepository( + WithClusterID(clusterID), + WithRaftAddress(peer), + WithRPCAddress("127.0.0.1:0"), + WithRaftDirectory(t.TempDir()+"/raftdata"), + ) + t.Cleanup(func() { + err := node.Close() + require.NoError(t, err) + }) + + node.Run() + + // Wait for initialization + require.EventuallyWithT(t, func(collect *assert.CollectT) { + addr := node.endpointAddr.Load() + if !assert.NotNil(collect, addr) { + return + } + }, 3*time.Second, 100*time.Millisecond) + addr := node.endpointAddr.Load().(string) + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + conn, err := rpc.NewConn(context.Background(), addr) + assert.NoError(collect, err) + defer func() { + err := conn.Close() + assert.NoError(collect, err) + }() + + healthClient := grpc_health_v1.NewHealthClient(conn.Conn) + _, err = healthClient.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}) + assert.NoError(collect, err) + }, 3*time.Second, 100*time.Millisecond) + + conn, err := rpc.NewConn(context.Background(), addr) + require.NoError(t, err) + t.Cleanup(func() { + _ = conn.Close() + require.NoError(t, err) + }) + + client := mrpb.NewManagementClient(conn.Conn) + tc.testf(t, node, client) + }) + } +} diff --git a/proto/mrpb/management.pb.go b/proto/mrpb/management.pb.go index c9683738f..9c29270e8 100644 --- a/proto/mrpb/management.pb.go +++ b/proto/mrpb/management.pb.go @@ -33,7 +33,7 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package // AddPeerRequest is a request message for AddPeer RPC. // -// TODO: TODO: Define a new message representing a new peer, such as "Peer" or +// TODO: Define a new message representing a new peer, such as "Peer" or // "PeerInfo" and use it rather than primitive-type fields. // See: // - https://protobuf.dev/programming-guides/api/#dont-include-primitive-types @@ -458,21 +458,31 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type ManagementClient interface { - // AddPeer is a remote procedure to add a new node to the Raft cluster. If the - // node is already a member or learner, it fails and returns the gRPC status - // code "AlreadyExists". Users can cancel this RPC, but it doesn't guarantee - // that adding a new peer is not handled. + // AddPeer adds a new node to the Raft cluster. // - // TODO: Check if the cluster ID is the same as the current node's. If they - // are not the same, return a proper gRPC status code. + // It takes an AddPeerRequest as an argument and checks the validity of the + // given Node ID. If the Node ID is invalid, it returns a gRPC status code + // "InvalidArgument". If the node is already a member or learner, it returns a + // gRPC status code "AlreadyExists". Upon successful execution, this operation + // returns an instance of google.protobuf.Empty. + // + // Note that users can cancel this operation, but cancellation does not + // guarantee that the addition of a new peer will not be handled. + // + // TODO: Implement a check for the cluster ID. AddPeer(ctx context.Context, in *AddPeerRequest, opts ...grpc.CallOption) (*types.Empty, error) - // RemovePeer is a remote procedure to remove a node from the Raft cluster. If - // the node is neither a member nor a learner of the cluster, it fails and - // returns the gRPC status code "NotFound". Users can cancel this RPC, but it - // doesn't guarantee that the node will not be removed. + // RemovePeer removes a specific node from a Raft cluster. // - // TODO: Check if the cluster ID is the same as the current node's. If they - // are not the same, return a proper gRPC status code. + // It takes a RemovePeerRequest as an argument and checks the validity of the + // Node ID. If the Node ID is invalid, it returns a gRPC status code + // "InvalidArgument". If the node is neither a member nor a learner in the + // cluster, it returns a gRPC status code "NotFound". Upon successful + // execution, this operation returns an instance of google.protobuf.Empty. + // + // Note that although users can cancel this operation, cancellation does not + // guarantee that the node will not be removed. + // + // TODO: Implement a check for the cluster ID. RemovePeer(ctx context.Context, in *RemovePeerRequest, opts ...grpc.CallOption) (*types.Empty, error) // GetClusterInfo is a remote procedure used to retrieve information about the // Raft cluster, specifically the ClusterInfo. If the current node is not a @@ -524,21 +534,31 @@ func (c *managementClient) GetClusterInfo(ctx context.Context, in *GetClusterInf // ManagementServer is the server API for Management service. type ManagementServer interface { - // AddPeer is a remote procedure to add a new node to the Raft cluster. If the - // node is already a member or learner, it fails and returns the gRPC status - // code "AlreadyExists". Users can cancel this RPC, but it doesn't guarantee - // that adding a new peer is not handled. + // AddPeer adds a new node to the Raft cluster. // - // TODO: Check if the cluster ID is the same as the current node's. If they - // are not the same, return a proper gRPC status code. + // It takes an AddPeerRequest as an argument and checks the validity of the + // given Node ID. If the Node ID is invalid, it returns a gRPC status code + // "InvalidArgument". If the node is already a member or learner, it returns a + // gRPC status code "AlreadyExists". Upon successful execution, this operation + // returns an instance of google.protobuf.Empty. + // + // Note that users can cancel this operation, but cancellation does not + // guarantee that the addition of a new peer will not be handled. + // + // TODO: Implement a check for the cluster ID. AddPeer(context.Context, *AddPeerRequest) (*types.Empty, error) - // RemovePeer is a remote procedure to remove a node from the Raft cluster. If - // the node is neither a member nor a learner of the cluster, it fails and - // returns the gRPC status code "NotFound". Users can cancel this RPC, but it - // doesn't guarantee that the node will not be removed. + // RemovePeer removes a specific node from a Raft cluster. // - // TODO: Check if the cluster ID is the same as the current node's. If they - // are not the same, return a proper gRPC status code. + // It takes a RemovePeerRequest as an argument and checks the validity of the + // Node ID. If the Node ID is invalid, it returns a gRPC status code + // "InvalidArgument". If the node is neither a member nor a learner in the + // cluster, it returns a gRPC status code "NotFound". Upon successful + // execution, this operation returns an instance of google.protobuf.Empty. + // + // Note that although users can cancel this operation, cancellation does not + // guarantee that the node will not be removed. + // + // TODO: Implement a check for the cluster ID. RemovePeer(context.Context, *RemovePeerRequest) (*types.Empty, error) // GetClusterInfo is a remote procedure used to retrieve information about the // Raft cluster, specifically the ClusterInfo. If the current node is not a diff --git a/proto/mrpb/management.proto b/proto/mrpb/management.proto index 651df504a..ad5f54e9a 100644 --- a/proto/mrpb/management.proto +++ b/proto/mrpb/management.proto @@ -16,7 +16,7 @@ option (gogoproto.goproto_sizecache_all) = false; // AddPeerRequest is a request message for AddPeer RPC. // -// TODO: TODO: Define a new message representing a new peer, such as "Peer" or +// TODO: Define a new message representing a new peer, such as "Peer" or // "PeerInfo" and use it rather than primitive-type fields. // See: // - https://protobuf.dev/programming-guides/api/#dont-include-primitive-types @@ -96,22 +96,34 @@ message GetClusterInfoResponse { // Management service manages the Raft cluster of the Metadata Repository. service Management { - // AddPeer is a remote procedure to add a new node to the Raft cluster. If the - // node is already a member or learner, it fails and returns the gRPC status - // code "AlreadyExists". Users can cancel this RPC, but it doesn't guarantee - // that adding a new peer is not handled. + // AddPeer adds a new node to the Raft cluster. // - // TODO: Check if the cluster ID is the same as the current node's. If they - // are not the same, return a proper gRPC status code. + // It takes an AddPeerRequest as an argument and checks the validity of the + // given Node ID. If the Node ID is invalid, it returns a gRPC status code + // "InvalidArgument". If the node is already a member or learner, it returns a + // gRPC status code "AlreadyExists". Upon successful execution, this operation + // returns an instance of google.protobuf.Empty. + // + // Note that users can cancel this operation, but cancellation does not + // guarantee that the addition of a new peer will not be handled. + // + // TODO: Implement a check for the cluster ID. rpc AddPeer(AddPeerRequest) returns (google.protobuf.Empty) {} - // RemovePeer is a remote procedure to remove a node from the Raft cluster. If - // the node is neither a member nor a learner of the cluster, it fails and - // returns the gRPC status code "NotFound". Users can cancel this RPC, but it - // doesn't guarantee that the node will not be removed. + + // RemovePeer removes a specific node from a Raft cluster. // - // TODO: Check if the cluster ID is the same as the current node's. If they - // are not the same, return a proper gRPC status code. + // It takes a RemovePeerRequest as an argument and checks the validity of the + // Node ID. If the Node ID is invalid, it returns a gRPC status code + // "InvalidArgument". If the node is neither a member nor a learner in the + // cluster, it returns a gRPC status code "NotFound". Upon successful + // execution, this operation returns an instance of google.protobuf.Empty. + // + // Note that although users can cancel this operation, cancellation does not + // guarantee that the node will not be removed. + // + // TODO: Implement a check for the cluster ID. rpc RemovePeer(RemovePeerRequest) returns (google.protobuf.Empty) {} + // GetClusterInfo is a remote procedure used to retrieve information about the // Raft cluster, specifically the ClusterInfo. If the current node is not a // member of the cluster, it will fail and return the gRPC status code