Skip to content

Commit

Permalink
Merge pull request #252 from red-hat-storage/sync_us--devel
Browse files Browse the repository at this point in the history
Syncing latest changes from upstream devel for ceph-csi
  • Loading branch information
openshift-merge-bot[bot] authored Feb 9, 2024
2 parents b880a41 + 5afc6fd commit c319e77
Show file tree
Hide file tree
Showing 7 changed files with 574 additions and 4 deletions.
16 changes: 16 additions & 0 deletions internal/csi-common/controllerserver-default.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,19 @@ func (cs *DefaultControllerServer) ControllerGetCapabilities(
Capabilities: cs.Driver.capabilities,
}, nil
}

// GroupControllerGetCapabilities implements the default
// GroupControllerGetCapabilities GRPC callout.
func (cs *DefaultControllerServer) GroupControllerGetCapabilities(
ctx context.Context,
req *csi.GroupControllerGetCapabilitiesRequest,
) (*csi.GroupControllerGetCapabilitiesResponse, error) {
log.TraceLog(ctx, "Using default GroupControllerGetCapabilities")
if cs.Driver == nil {
return nil, status.Error(codes.Unimplemented, "Group controller server is not enabled")
}

return &csi.GroupControllerGetCapabilitiesResponse{
Capabilities: cs.Driver.groupCapabilities,
}, nil
}
38 changes: 35 additions & 3 deletions internal/csi-common/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type CSIDriver struct {
nodeID string
version string
// topology constraints that this nodeserver will advertise
topology map[string]string
capabilities []*csi.ControllerServiceCapability
vc []*csi.VolumeCapability_AccessMode
topology map[string]string
capabilities []*csi.ControllerServiceCapability
groupCapabilities []*csi.GroupControllerServiceCapability
vc []*csi.VolumeCapability_AccessMode
}

// NewCSIDriver Creates a NewCSIDriver object. Assumes vendor
Expand Down Expand Up @@ -116,3 +117,34 @@ func (d *CSIDriver) AddVolumeCapabilityAccessModes(
func (d *CSIDriver) GetVolumeCapabilityAccessModes() []*csi.VolumeCapability_AccessMode {
return d.vc
}

// AddControllerServiceCapabilities stores the group controller capabilities
// in driver object.
func (d *CSIDriver) AddGroupControllerServiceCapabilities(cl []csi.GroupControllerServiceCapability_RPC_Type) {
csc := make([]*csi.GroupControllerServiceCapability, 0, len(cl))

for _, c := range cl {
log.DefaultLog("Enabling group controller service capability: %v", c.String())
csc = append(csc, NewGroupControllerServiceCapability(c))
}

d.groupCapabilities = csc
}

// ValidateGroupControllerServiceRequest validates the group controller
// plugin capabilities.
//
//nolint:interfacer // c can be of type fmt.Stringer, but that does not make the API clearer
func (d *CSIDriver) ValidateGroupControllerServiceRequest(c csi.GroupControllerServiceCapability_RPC_Type) error {
if c == csi.GroupControllerServiceCapability_RPC_UNKNOWN {
return nil
}

for _, capability := range d.groupCapabilities {
if c == capability.GetRpc().GetType() {
return nil
}
}

return status.Error(codes.InvalidArgument, c.String())
}
4 changes: 4 additions & 0 deletions internal/csi-common/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Servers struct {
IS csi.IdentityServer
CS csi.ControllerServer
NS csi.NodeServer
GS csi.GroupControllerServer
}

// NewNonBlockingGRPCServer return non-blocking GRPC.
Expand Down Expand Up @@ -109,6 +110,9 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) {
if srv.NS != nil {
csi.RegisterNodeServer(server, srv.NS)
}
if srv.GS != nil {
csi.RegisterGroupControllerServer(server, srv.GS)
}

log.DefaultLog("Listening for connections on address: %#v", listener.Addr())
err = server.Serve(listener)
Expand Down
19 changes: 19 additions & 0 deletions internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ func NewControllerServiceCapability(ctrlCap csi.ControllerServiceCapability_RPC_
}
}

// NewGroupControllerServiceCapability returns group controller capabilities.
func NewGroupControllerServiceCapability(ctrlCap csi.GroupControllerServiceCapability_RPC_Type,
) *csi.GroupControllerServiceCapability {
return &csi.GroupControllerServiceCapability{
Type: &csi.GroupControllerServiceCapability_Rpc{
Rpc: &csi.GroupControllerServiceCapability_RPC{
Type: ctrlCap,
},
},
}
}

// NewMiddlewareServerOption creates a new grpc.ServerOption that configures a
// common format for log messages and other gRPC related handlers.
func NewMiddlewareServerOption() grpc.ServerOption {
Expand Down Expand Up @@ -133,6 +145,13 @@ func getReqID(req interface{}) string {

case *csi.NodeExpandVolumeRequest:
reqID = r.VolumeId

case *csi.CreateVolumeGroupSnapshotRequest:
reqID = r.Name
case *csi.DeleteVolumeGroupSnapshotRequest:
reqID = r.GroupSnapshotId
case *csi.GetVolumeGroupSnapshotRequest:
reqID = r.GroupSnapshotId
}

return reqID
Expand Down
10 changes: 10 additions & 0 deletions internal/csi-common/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ func TestGetReqID(t *testing.T) {
&csi.NodeExpandVolumeRequest{
VolumeId: fakeID,
},

&csi.CreateVolumeGroupSnapshotRequest{
Name: fakeID,
},
&csi.DeleteVolumeGroupSnapshotRequest{
GroupSnapshotId: fakeID,
},
&csi.GetVolumeGroupSnapshotRequest{
GroupSnapshotId: fakeID,
},
}
for _, r := range req {
if got := getReqID(r); got != fakeID {
Expand Down
57 changes: 56 additions & 1 deletion internal/journal/omap.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package journal
import (
"context"
"errors"
"fmt"

"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -79,7 +80,7 @@ func getOMapValues(
log.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v",
poolName, namespace, oid, err)

return nil, util.JoinErrors(util.ErrKeyNotFound, err)
return nil, fmt.Errorf("%w: %w", util.ErrKeyNotFound, err)
}

return nil, err
Expand Down Expand Up @@ -168,3 +169,57 @@ func omapPoolError(err error) error {

return err
}

// listOMapValues fetches all omap values for a given oid, prefix, and namespace.
func listOMapValues(
ctx context.Context,
conn *Connection,
poolName, namespace, oid, prefix string,
) (map[string]string, error) {
// fetch and configure the rados ioctx
ioctx, err := conn.conn.GetIoctx(poolName)
if err != nil {
return nil, omapPoolError(err)
}
defer ioctx.Destroy()

if namespace != "" {
ioctx.SetNamespace(namespace)
}

results := map[string]string{}

numKeys := uint64(0)
startAfter := ""
for {
prevNumKeys := numKeys
err = ioctx.ListOmapValues(
oid, startAfter, prefix, chunkSize,
func(key string, value []byte) {
numKeys++
startAfter = key
results[key] = string(value)
},
)
// if we hit an error, or no new keys were seen, exit the loop
if err != nil || numKeys == prevNumKeys {
break
}
}

if err != nil {
if errors.Is(err, rados.ErrNotFound) {
log.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v",
poolName, namespace, oid, err)

return nil, fmt.Errorf("%w: %w", util.ErrKeyNotFound, err)
}

return nil, err
}

log.DebugLog(ctx, "got omap values: (pool=%q, namespace=%q, name=%q): %+v",
poolName, namespace, oid, results)

return results, nil
}
Loading

0 comments on commit c319e77

Please sign in to comment.