diff --git a/src/go/rpk/pkg/cli/cluster/storage/BUILD b/src/go/rpk/pkg/cli/cluster/storage/BUILD index 2558cc6521250..e6a19e933988c 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/BUILD +++ b/src/go/rpk/pkg/cli/cluster/storage/BUILD @@ -18,7 +18,6 @@ go_library( "//src/go/rpk/pkg/cli/cluster/storage/recovery", "//src/go/rpk/pkg/config", "//src/go/rpk/pkg/out", - "//src/go/rpk/pkg/publicapi", "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2", "@com_connectrpc_connect//:connect", "@com_github_redpanda_data_common_go_rpadmin//:rpadmin", diff --git a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go index 25753540d694b..0178b558384eb 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go @@ -46,8 +46,8 @@ Cancel a mount/unmount operation out.MaybeDie(err, "invalid migration ID: %v", err) if p.FromCloud { - cl, err := createDataplaneClient(p) - out.MaybeDieErr(err) + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) req := connect.NewRequest( &dataplanev1alpha2.UpdateMountTaskRequest{ diff --git a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go index a29986c8c8f39..07be49b655ede 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/list-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/list-mount.go @@ -60,8 +60,8 @@ Use filter to list only migrations in a specific state var migrations []rpadmin.MigrationState if p.FromCloud { - cl, err := createDataplaneClient(p) - out.MaybeDieErr(err) + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) resp, err := cl.CloudStorage.ListMountTasks(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountTasksRequest{})) out.MaybeDie(err, "unable to list mount/unmount operations: %v", err) diff --git a/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go b/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go index 8508a34ac276e..df0fe5bc20768 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go +++ b/src/go/rpk/pkg/cli/cluster/storage/list-mountable.go @@ -41,8 +41,8 @@ List all mountable topics: var mountableTopics []rpadmin.MountableTopic if p.FromCloud { - cl, err := createDataplaneClient(p) - out.MaybeDieErr(err) + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) resp, err := cl.CloudStorage.ListMountableTopics(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListMountableTopicsRequest{})) out.MaybeDie(err, "unable to list mountable topics: %v", err) diff --git a/src/go/rpk/pkg/cli/cluster/storage/mount.go b/src/go/rpk/pkg/cli/cluster/storage/mount.go index fa3868f92872e..5bf78bcc9fe31 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/mount.go @@ -66,8 +66,8 @@ with my-new-topic as the new topic name if an != "" && strings.ToLower(an) != "kafka" { out.Die("Failed to parse '--to' flag: namespace %q not allowed. Only kafka topics can be mounted in Redpanda Cloud clusters", an) } - cl, err := createDataplaneClient(p) - out.MaybeDieErr(err) + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) topicMount := &dataplanev1alpha2.MountTopicsRequest_TopicMount{ SourceTopicReference: t, diff --git a/src/go/rpk/pkg/cli/cluster/storage/status-mount.go b/src/go/rpk/pkg/cli/cluster/storage/status-mount.go index e5e2fb364e061..e4f89d27507f5 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/status-mount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/status-mount.go @@ -51,8 +51,8 @@ Status for a mount/unmount operation var mState rpadmin.MigrationState if p.FromCloud { - cl, err := createDataplaneClient(p) - out.MaybeDieErr(err) + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) resp, err := cl.CloudStorage.GetMountTask( cmd.Context(), diff --git a/src/go/rpk/pkg/cli/cluster/storage/storage.go b/src/go/rpk/pkg/cli/cluster/storage/storage.go index 97731886d8c32..a9436a93b8325 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/storage.go +++ b/src/go/rpk/pkg/cli/cluster/storage/storage.go @@ -10,11 +10,8 @@ package storage import ( - "fmt" - "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage/recovery" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" - "github.com/redpanda-data/redpanda/src/go/rpk/pkg/publicapi" "github.com/spf13/afero" "github.com/spf13/cobra" ) @@ -35,15 +32,3 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command { ) return cmd } - -func createDataplaneClient(p *config.RpkProfile) (*publicapi.DataPlaneClientSet, error) { - url, err := p.CloudCluster.CheckClusterURL() - if err != nil { - return nil, fmt.Errorf("unable to get cluster information from your profile: %v", err) - } - cl, err := publicapi.NewDataPlaneClientSet(url, p.CurrentAuth().AuthToken) - if err != nil { - return nil, fmt.Errorf("unable to initialize cloud client: %v", err) - } - return cl, nil -} diff --git a/src/go/rpk/pkg/cli/cluster/storage/unmount.go b/src/go/rpk/pkg/cli/cluster/storage/unmount.go index 0a81436f6e172..470d28ad5a9c8 100644 --- a/src/go/rpk/pkg/cli/cluster/storage/unmount.go +++ b/src/go/rpk/pkg/cli/cluster/storage/unmount.go @@ -63,8 +63,8 @@ Unmount topic 'my-topic' from the cluster in the 'my-namespace' if ns != "" && strings.ToLower(ns) != "kafka" { out.Die("Namespace %q not allowed. Only kafka topics can be unmounted in Redpanda Cloud clusters", ns) } - cl, err := createDataplaneClient(p) - out.MaybeDieErr(err) + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) resp, err := cl.CloudStorage.UnmountTopics( cmd.Context(), diff --git a/src/go/rpk/pkg/cli/security/user/BUILD b/src/go/rpk/pkg/cli/security/user/BUILD index 938fb4ea87586..a6f4303f108e1 100644 --- a/src/go/rpk/pkg/cli/security/user/BUILD +++ b/src/go/rpk/pkg/cli/security/user/BUILD @@ -15,6 +15,8 @@ go_library( "//src/go/rpk/pkg/adminapi", "//src/go/rpk/pkg/config", "//src/go/rpk/pkg/out", + "@build_buf_gen_go_redpandadata_dataplane_protocolbuffers_go//redpanda/api/dataplane/v1alpha2", + "@com_connectrpc_connect//:connect", "@com_github_spf13_afero//:afero", "@com_github_spf13_cobra//:cobra", ], diff --git a/src/go/rpk/pkg/cli/security/user/create.go b/src/go/rpk/pkg/cli/security/user/create.go index 1bbacf102afcb..f0c9bb935efb7 100644 --- a/src/go/rpk/pkg/cli/security/user/create.go +++ b/src/go/rpk/pkg/cli/security/user/create.go @@ -15,6 +15,8 @@ import ( "math/big" "strings" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" @@ -57,10 +59,6 @@ acl help text for more info. } p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitNotServerlessAdmin(p) - - cl, err := adminapi.NewClient(cmd.Context(), fs, p) - out.MaybeDie(err, "unable to initialize admin client: %v", err) // Backwards compatibility: we favor the new user // format and the new password flag. If either are @@ -103,8 +101,27 @@ acl help text for more info. out.Die("unsupported mechanism %q", mechanism) } - err = cl.CreateUser(cmd.Context(), user, pass, mechanism) - out.MaybeDie(err, "unable to create user %q: %v", user, err) + if p.FromCloud { + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) + req := connect.NewRequest( + &dataplanev1alpha2.CreateUserRequest{ + User: &dataplanev1alpha2.CreateUserRequest_User{ + Name: user, + Password: pass, + Mechanism: stringToDataplaneMechanism(mechanism), + }, + }, + ) + _, err = cl.User.CreateUser(cmd.Context(), req) + out.MaybeDie(err, "unable to create user %q: %v", user, err) + } else { + cl, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + err = cl.CreateUser(cmd.Context(), user, pass, mechanism) + out.MaybeDie(err, "unable to create user %q: %v", user, err) + } c := credentials{user, "", mechanism} if generated { c.Password = pass // We only want to display the password if it was generated. diff --git a/src/go/rpk/pkg/cli/security/user/delete.go b/src/go/rpk/pkg/cli/security/user/delete.go index 580e9a68ae19d..3ebdd8a7565be 100644 --- a/src/go/rpk/pkg/cli/security/user/delete.go +++ b/src/go/rpk/pkg/cli/security/user/delete.go @@ -12,6 +12,8 @@ package user import ( "fmt" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" @@ -37,10 +39,6 @@ delete any ACLs that may exist for this user. } p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitNotServerlessAdmin(p) - - cl, err := adminapi.NewClient(cmd.Context(), fs, p) - out.MaybeDie(err, "unable to initialize admin client: %v", err) // Backwards compat: we favor the new format (an // argument), but if that is empty, we use the old @@ -54,8 +52,20 @@ delete any ACLs that may exist for this user. out.Die("missing required username argument") } - err = cl.DeleteUser(cmd.Context(), user) - out.MaybeDie(err, "unable to delete user %q: %s", user, err) + if p.FromCloud { + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) + + req := connect.NewRequest(&dataplanev1alpha2.DeleteUserRequest{Name: user}) + _, err = cl.User.DeleteUser(cmd.Context(), req) + out.MaybeDie(err, "unable to delete user %q: %s", user, err) + } else { + cl, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + err = cl.DeleteUser(cmd.Context(), user) + out.MaybeDie(err, "unable to delete user %q: %s", user, err) + } if isText, _, s, err := f.Format(credentials{user, "", ""}); !isText { out.MaybeDie(err, "unable to print response in the required format %q: %v", f.Kind, err) out.Exit(s) diff --git a/src/go/rpk/pkg/cli/security/user/list.go b/src/go/rpk/pkg/cli/security/user/list.go index f61a76674ffc9..23b3d738604c3 100644 --- a/src/go/rpk/pkg/cli/security/user/list.go +++ b/src/go/rpk/pkg/cli/security/user/list.go @@ -10,6 +10,8 @@ package user import ( + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" @@ -30,13 +32,24 @@ func newListUsersCommand(fs afero.Fs, p *config.Params) *cobra.Command { } p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitNotServerlessAdmin(p) - cl, err := adminapi.NewClient(cmd.Context(), fs, p) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + var users []string + if p.FromCloud { + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) - users, err := cl.ListUsers(cmd.Context()) - out.MaybeDie(err, "unable to list users: %v", err) + listUsers, err := cl.User.ListUsers(cmd.Context(), connect.NewRequest(&dataplanev1alpha2.ListUsersRequest{})) + out.MaybeDie(err, "unable to list users: %v", err) + if listUsers != nil { + users = dataplaneListUserToString(listUsers.Msg) + } + } else { + cl, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) + + users, err = cl.ListUsers(cmd.Context()) + out.MaybeDie(err, "unable to list users: %v", err) + } if isText, _, s, err := f.Format(users); !isText { out.MaybeDie(err, "unable to print in the required format %q: %v", f.Kind, err) out.Exit(s) @@ -49,3 +62,13 @@ func newListUsersCommand(fs afero.Fs, p *config.Params) *cobra.Command { }, } } + +func dataplaneListUserToString(resp *dataplanev1alpha2.ListUsersResponse) []string { + var users []string + if resp != nil { + for _, u := range resp.Users { + users = append(users, u.Name) + } + } + return users +} diff --git a/src/go/rpk/pkg/cli/security/user/update.go b/src/go/rpk/pkg/cli/security/user/update.go index 3454d0aa0694c..ff20d90785a7c 100644 --- a/src/go/rpk/pkg/cli/security/user/update.go +++ b/src/go/rpk/pkg/cli/security/user/update.go @@ -13,6 +13,8 @@ import ( "fmt" "strings" + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" + "connectrpc.com/connect" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/out" @@ -30,14 +32,29 @@ func newUpdateCommand(fs afero.Fs, p *config.Params) *cobra.Command { f := p.Formatter p, err := p.LoadVirtualProfile(fs) out.MaybeDie(err, "rpk unable to load config: %v", err) - config.CheckExitNotServerlessAdmin(p) + user := args[0] + if p.FromCloud { + cl, err := p.DataplaneClient() + out.MaybeDie(err, "unable to initialize cloud client: %v", err) - cl, err := adminapi.NewClient(cmd.Context(), fs, p) - out.MaybeDie(err, "unable to initialize admin client: %v", err) + req := connect.NewRequest( + &dataplanev1alpha2.UpdateUserRequest{ + User: &dataplanev1alpha2.UpdateUserRequest_User{ + Name: user, + Password: newPass, + Mechanism: stringToDataplaneMechanism(mechanism), + }, + }, + ) + _, err = cl.User.UpdateUser(cmd.Context(), req) + out.MaybeDie(err, "unable to update the client credentials for user %q: %v", user, err) + } else { + cl, err := adminapi.NewClient(cmd.Context(), fs, p) + out.MaybeDie(err, "unable to initialize admin client: %v", err) - user := args[0] - err = cl.UpdateUser(cmd.Context(), user, newPass, strings.ToUpper(mechanism)) - out.MaybeDie(err, "unable to update the client credentials for user %q: %v", user, err) + err = cl.UpdateUser(cmd.Context(), user, newPass, strings.ToUpper(mechanism)) + out.MaybeDie(err, "unable to update the client credentials for user %q: %v", user, err) + } if isText, _, s, err := f.Format(credentials{user, "", mechanism}); !isText { out.MaybeDie(err, "unable to print credentials in the required format %q: %v", f.Kind, err) out.Exit(s) diff --git a/src/go/rpk/pkg/cli/security/user/user.go b/src/go/rpk/pkg/cli/security/user/user.go index 9f8158a9193dc..efb73acb44d1a 100644 --- a/src/go/rpk/pkg/cli/security/user/user.go +++ b/src/go/rpk/pkg/cli/security/user/user.go @@ -10,6 +10,9 @@ package user import ( + "strings" + + dataplanev1alpha2 "buf.build/gen/go/redpandadata/dataplane/protocolbuffers/go/redpanda/api/dataplane/v1alpha2" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/config" "github.com/spf13/afero" "github.com/spf13/cobra" @@ -39,3 +42,11 @@ true" in the redpanda section of your redpanda.yaml. ) return cmd } + +func stringToDataplaneMechanism(mechanism string) dataplanev1alpha2.SASLMechanism { + m := "SASL_MECHANISM_" + strings.ReplaceAll(strings.ToUpper(mechanism), "-", "_") + if sm, ok := dataplanev1alpha2.SASLMechanism_value[m]; ok { + return dataplanev1alpha2.SASLMechanism(sm) + } + return dataplanev1alpha2.SASLMechanism_SASL_MECHANISM_UNSPECIFIED +} diff --git a/src/go/rpk/pkg/config/BUILD b/src/go/rpk/pkg/config/BUILD index 4bc990ac8774d..bb74b4f006f12 100644 --- a/src/go/rpk/pkg/config/BUILD +++ b/src/go/rpk/pkg/config/BUILD @@ -17,6 +17,7 @@ go_library( "//src/go/rpk/pkg/os", "//src/go/rpk/pkg/out", "//src/go/rpk/pkg/publicapi", + "@com_connectrpc_connect//:connect", "@com_github_mattn_go_isatty//:go-isatty", "@com_github_spf13_afero//:afero", "@com_github_spf13_cobra//:cobra", diff --git a/src/go/rpk/pkg/config/rpk_yaml.go b/src/go/rpk/pkg/config/rpk_yaml.go index 6b5528904db3b..a01437817818b 100644 --- a/src/go/rpk/pkg/config/rpk_yaml.go +++ b/src/go/rpk/pkg/config/rpk_yaml.go @@ -17,12 +17,12 @@ import ( "reflect" "time" + "connectrpc.com/connect" + rpkos "github.com/redpanda-data/redpanda/src/go/rpk/pkg/os" "github.com/redpanda-data/redpanda/src/go/rpk/pkg/publicapi" "github.com/spf13/afero" "go.uber.org/zap" "gopkg.in/yaml.v3" - - rpkos "github.com/redpanda-data/redpanda/src/go/rpk/pkg/os" ) // DefaultRpkYamlPath returns the OS equivalent of ~/.config/rpk/rpk.yaml, if @@ -361,6 +361,16 @@ func (p *RpkProfile) ActualConfig() (*RpkYaml, bool) { return p.c.ActualRpkYaml() } +// DataplaneClient creates a publicapi.DataPlaneClientSet with the information +// loaded in the profile. +func (p *RpkProfile) DataplaneClient(opts ...connect.ClientOption) (*publicapi.DataPlaneClientSet, error) { + url, err := p.CloudCluster.CheckClusterURL() + if err != nil { + return nil, fmt.Errorf("unable to get cluster information from your profile: %v", err) + } + return publicapi.NewDataPlaneClientSet(url, p.CurrentAuth().AuthToken, opts...) +} + // HasClientCredentials returns if both ClientID and ClientSecret are non-empty. func (a *RpkCloudAuth) HasClientCredentials() bool { return a.ClientID != "" && a.ClientSecret != "" diff --git a/src/go/rpk/pkg/publicapi/dataplane.go b/src/go/rpk/pkg/publicapi/dataplane.go index 612a5609f709e..c053ec51996da 100644 --- a/src/go/rpk/pkg/publicapi/dataplane.go +++ b/src/go/rpk/pkg/publicapi/dataplane.go @@ -22,6 +22,7 @@ import ( type DataPlaneClientSet struct { Transform transformServiceClient CloudStorage dataplanev1alpha2connect.CloudStorageServiceClient + User dataplanev1alpha2connect.UserServiceClient } // NewDataPlaneClientSet creates a Public API client set with the service @@ -40,5 +41,6 @@ func NewDataPlaneClientSet(host, authToken string, opts ...connect.ClientOption) return &DataPlaneClientSet{ Transform: newTransformServiceClient(http.DefaultClient, host, authToken, opts...), CloudStorage: dataplanev1alpha2connect.NewCloudStorageServiceClient(http.DefaultClient, host, opts...), + User: dataplanev1alpha2connect.NewUserServiceClient(http.DefaultClient, host, opts...), }, nil }