Skip to content

Commit

Permalink
Expose the client utils for external integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
Praveenrajmani committed Dec 26, 2023
1 parent c580047 commit fd0d1cd
Show file tree
Hide file tree
Showing 34 changed files with 414 additions and 395 deletions.
3 changes: 1 addition & 2 deletions cmd/kubectl-directpv/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/minio/directpv/pkg/k8s"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -141,7 +140,7 @@ func cleanMain(ctx context.Context) {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()

resultCh := volume.NewLister().
resultCh := client.NewVolumeLister().
NodeSelector(toLabelValues(nodesArgs)).
DriveNameSelector(toLabelValues(drivesArgs)).
DriveIDSelector(toLabelValues(driveIDArgs)).
Expand Down
6 changes: 2 additions & 4 deletions cmd/kubectl-directpv/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -121,7 +119,7 @@ func cordonMain(ctx context.Context) {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()

resultCh := drive.NewLister().
resultCh := client.NewDriveLister().
NodeSelector(toLabelValues(nodesArgs)).
DriveNameSelector(toLabelValues(drivesArgs)).
StatusSelector(driveStatusSelectors).
Expand All @@ -141,7 +139,7 @@ func cordonMain(ctx context.Context) {

volumes := result.Drive.GetVolumes()
if len(volumes) != 0 {
for vresult := range volume.NewLister().VolumeNameSelector(volumes).IgnoreNotFound(true).List(ctx) {
for vresult := range client.NewVolumeLister().VolumeNameSelector(volumes).IgnoreNotFound(true).List(ctx) {
if vresult.Err != nil {
utils.Eprintf(quietFlag, true, "%v\n", vresult.Err)
os.Exit(1)
Expand Down
79 changes: 6 additions & 73 deletions cmd/kubectl-directpv/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/node"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -95,33 +94,6 @@ func validateDiscoverCmd() error {
return validateDriveNameArgs()
}

func toInitConfig(resultMap map[directpvtypes.NodeID][]types.Device) InitConfig {
nodeInfo := []NodeInfo{}
initConfig := NewInitConfig()
for node, devices := range resultMap {
driveInfo := []DriveInfo{}
for _, device := range devices {
if device.DeniedReason != "" {
continue
}
driveInfo = append(driveInfo, DriveInfo{
ID: device.ID,
Name: device.Name,
Size: device.Size,
Make: device.Make,
FS: device.FSType,
Select: driveSelectedValue,
})
}
nodeInfo = append(nodeInfo, NodeInfo{
Name: node,
Drives: driveInfo,
})
}
initConfig.Nodes = nodeInfo
return initConfig
}

func showDevices(resultMap map[directpvtypes.NodeID][]types.Device) error {
writer := newTableWriter(
table.Row{
Expand Down Expand Up @@ -189,7 +161,7 @@ func showDevices(resultMap map[directpvtypes.NodeID][]types.Device) error {
return nil
}

func writeInitConfig(config InitConfig) error {
func writeInitConfig(config types.InitConfig) error {
f, err := os.OpenFile(outputFile, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644)
if err != nil {
return err
Expand Down Expand Up @@ -232,7 +204,7 @@ func discoverDevices(ctx context.Context, nodes []types.Node, teaProgram *tea.Pr
ctx, cancel := context.WithTimeout(ctx, nodeListTimeout)
defer cancel()

eventCh, stop, err := node.NewLister().
eventCh, stop, err := client.NewNodeLister().
NodeSelector(toLabelValues(nodeNames)).
Watch(ctx)
if err != nil {
Expand All @@ -253,7 +225,7 @@ func discoverDevices(ctx context.Context, nodes []types.Node, teaProgram *tea.Pr
}
switch event.Type {
case watch.Modified, watch.Added:
node := event.Node
node := event.Item
if !node.Spec.Refresh {
devices[directpvtypes.NodeID(node.Name)] = node.GetDevicesByNames(drivesArgs)
if teaProgram != nil {
Expand All @@ -280,52 +252,13 @@ func discoverDevices(ctx context.Context, nodes []types.Node, teaProgram *tea.Pr
}
}

func syncNodes(ctx context.Context) (err error) {
csiNodes, err := getCSINodes(ctx)
if err != nil {
return fmt.Errorf("unable to get CSI nodes; %w", err)
}

nodes, err := node.NewLister().Get(ctx)
if err != nil {
return fmt.Errorf("unable to get nodes; %w", err)
}

var nodeNames []string
for _, node := range nodes {
nodeNames = append(nodeNames, node.Name)
}

// Add missing nodes.
for _, csiNode := range csiNodes {
if !utils.Contains(nodeNames, csiNode) {
node := types.NewNode(directpvtypes.NodeID(csiNode), nil)
node.Spec.Refresh = true
if _, err = client.NodeClient().Create(ctx, node, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to create node %v; %w", csiNode, err)
}
}
}

// Remove non-existing nodes.
for _, nodeName := range nodeNames {
if !utils.Contains(csiNodes, nodeName) {
if err = client.NodeClient().Delete(ctx, nodeName, metav1.DeleteOptions{}); err != nil {
return fmt.Errorf("unable to remove non-existing node %v; %w", nodeName, err)
}
}
}

return nil
}

func discoverMain(ctx context.Context) {
if err := syncNodes(ctx); err != nil {
if err := client.SyncNodes(ctx); err != nil {
utils.Eprintf(quietFlag, true, "sync failed; %v\n", err)
os.Exit(1)
}

nodes, err := node.NewLister().
nodes, err := client.NewNodeLister().
NodeSelector(toLabelValues(nodesArgs)).
Get(ctx)
if err != nil {
Expand Down Expand Up @@ -374,7 +307,7 @@ func discoverMain(ctx context.Context) {
os.Exit(1)
}

if err := writeInitConfig(toInitConfig(resultMap)); err != nil {
if err := writeInitConfig(types.ToInitConfig(resultMap)); err != nil {
utils.Eprintf(quietFlag, true, "unable to write init config; %v\n", err)
} else if !quietFlag {
color.HiGreen("Generated '%s' successfully.", outputFile)
Expand Down
9 changes: 4 additions & 5 deletions cmd/kubectl-directpv/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import (
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/drive"
"github.com/minio/directpv/pkg/k8s"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -66,7 +65,7 @@ func infoMain(ctx context.Context) {
os.Exit(1)
}

nodeList, err := getCSINodes(ctx)
nodeList, err := client.GetCSINodes(ctx)
if err != nil {
utils.Eprintf(quietFlag, true, "%v\n", err)
os.Exit(1)
Expand All @@ -77,13 +76,13 @@ func infoMain(ctx context.Context) {
os.Exit(1)
}

drives, err := drive.NewLister().Get(ctx)
drives, err := client.NewDriveLister().Get(ctx)
if err != nil {
utils.Eprintf(quietFlag, true, "unable to get drive list; %v\n", err)
os.Exit(1)
}

volumes, err := volume.NewLister().Get(ctx)
volumes, err := client.NewVolumeLister().Get(ctx)
if err != nil {
utils.Eprintf(quietFlag, true, "unable to get volume list; %v\n", err)
os.Exit(1)
Expand Down
33 changes: 5 additions & 28 deletions cmd/kubectl-directpv/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ import (

tea "github.com/charmbracelet/bubbletea"
"github.com/fatih/color"
"github.com/google/uuid"
"github.com/jedib0t/go-pretty/v6/table"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/initrequest"
"github.com/minio/directpv/pkg/types"
"github.com/minio/directpv/pkg/utils"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -86,26 +84,6 @@ func init() {
addDangerousFlag(initCmd, "Perform initialization of drives which will permanently erase existing data")
}

func toInitRequestObjects(config *InitConfig, requestID string) (initRequests []types.InitRequest) {
for _, node := range config.Nodes {
initDevices := []types.InitDevice{}
for _, device := range node.Drives {
if strings.ToLower(device.Select) != driveSelectedValue {
continue
}
initDevices = append(initDevices, types.InitDevice{
ID: device.ID,
Name: device.Name,
Force: device.FS != "",
})
}
if len(initDevices) > 0 {
initRequests = append(initRequests, *types.NewInitRequest(requestID, node.Name, initDevices))
}
}
return
}

func showResults(results []initResult) {
if len(results) == 0 {
return
Expand Down Expand Up @@ -195,7 +173,7 @@ func initDevices(ctx context.Context, initRequests []types.InitRequest, requestI
ctx, cancel := context.WithTimeout(ctx, initRequestListTimeout)
defer cancel()

eventCh, stop, err := initrequest.NewLister().
eventCh, stop, err := client.NewInitRequestLister().
RequestIDSelector(toLabelValues([]string{requestID})).
Watch(ctx)
if err != nil {
Expand All @@ -216,7 +194,7 @@ func initDevices(ctx context.Context, initRequests []types.InitRequest, requestI
}
switch event.Type {
case watch.Modified, watch.Added:
initReq := event.InitRequest
initReq := event.Item
if initReq.Status.Status != directpvtypes.InitStatusPending {
results = append(results, initResult{
requestID: initReq.Name,
Expand Down Expand Up @@ -250,13 +228,13 @@ func initDevices(ctx context.Context, initRequests []types.InitRequest, requestI
}
}

func readInitConfig(inputFile string) (*InitConfig, error) {
func readInitConfig(inputFile string) (*types.InitConfig, error) {
f, err := os.Open(inputFile)
if err != nil {
return nil, err
}
defer f.Close()
return parseInitConfig(f)
return types.ParseInitConfig(f)
}

func initMain(ctx context.Context, inputFile string) {
Expand All @@ -265,8 +243,7 @@ func initMain(ctx context.Context, inputFile string) {
utils.Eprintf(quietFlag, true, "unable to read the input file; %v", err.Error())
os.Exit(1)
}
requestID := uuid.New().String()
initRequests := toInitRequestObjects(initConfig, requestID)
initRequests, requestID := initConfig.ToInitRequestObjects()
if len(initRequests) == 0 {
utils.Eprintf(false, false, "%v\n", color.HiYellowString("No drives are available to init"))
os.Exit(1)
Expand Down
61 changes: 0 additions & 61 deletions cmd/kubectl-directpv/init_config.go

This file was deleted.

4 changes: 2 additions & 2 deletions cmd/kubectl-directpv/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
"github.com/fatih/color"
"github.com/jedib0t/go-pretty/v6/table"
directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types"
"github.com/minio/directpv/pkg/client"
"github.com/minio/directpv/pkg/consts"
"github.com/minio/directpv/pkg/installer"
legacyclient "github.com/minio/directpv/pkg/legacy/client"
"github.com/minio/directpv/pkg/utils"
"github.com/minio/directpv/pkg/volume"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -221,7 +221,7 @@ func getLegacyFlag(ctx context.Context) bool {
ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()

resultCh := volume.NewLister().
resultCh := client.NewVolumeLister().
LabelSelector(
map[directpvtypes.LabelKey]directpvtypes.LabelValue{
directpvtypes.MigratedLabelKey: "true",
Expand Down
Loading

0 comments on commit fd0d1cd

Please sign in to comment.