Skip to content

Commit

Permalink
implement server side rpc call for peering
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Oct 17, 2024
1 parent 5005439 commit 0da944b
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 10 deletions.
9 changes: 9 additions & 0 deletions rbac/provider-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,12 @@ rules:
verbs:
- get
- list
- apiGroups:
- ocs.openshift.io
resources:
- storageclusterpeers
- storageclusterpeers/status
verbs:
- get
- list
- update
98 changes: 88 additions & 10 deletions services/provider/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ const (

type OCSProviderServer struct {
pb.UnimplementedOCSProviderServer
client client.Client
consumerManager *ocsConsumerManager
storageRequestManager *storageRequestManager
namespace string
client client.Client
consumerManager *ocsConsumerManager
storageRequestManager *storageRequestManager
storageClusterPeerManager *storageClusterPeerManager
namespace string
}

func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderServer, error) {
Expand All @@ -88,11 +89,17 @@ func NewOCSProviderServer(ctx context.Context, namespace string) (*OCSProviderSe
return nil, fmt.Errorf("failed to create new StorageRequest instance. %v", err)
}

storageClusterPeerManager, err := newStorageClusterPeerManager(client, namespace)
if err != nil {
return nil, fmt.Errorf("failed to create new StorageClusterPeer instance. %v", err)
}

return &OCSProviderServer{
client: client,
consumerManager: consumerManager,
storageRequestManager: storageRequestManager,
namespace: namespace,
client: client,
consumerManager: consumerManager,
storageRequestManager: storageRequestManager,
storageClusterPeerManager: storageClusterPeerManager,
namespace: namespace,
}, nil
}

Expand Down Expand Up @@ -893,6 +900,77 @@ func extractMonitorIps(data string) ([]string, error) {
return ips, nil
}

func (s *OCSProviderServer) PeerStorageCluster(_ context.Context, _ *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) {
return &pb.PeerStorageClusterResponse{}, nil
func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.PeerStorageClusterRequest) (*pb.PeerStorageClusterResponse, error) {

pubKey, err := s.getOnboardingValidationKey(ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to get public key to validate onboarding ticket for StorageCluster %q. %v", req.StorageClusterUID, err)
}

onboardingToken, err := decodeAndValidateTicket(req.OnboardingToken, pubKey)
if err != nil {
klog.Errorf("failed to validate onboarding ticket for StorageCluster %q. %v", req.StorageClusterUID, err)
return nil, status.Errorf(codes.InvalidArgument, "onboarding ticket is not valid. %v", err)
}

if onboardingToken.SubjectRole != services.PeerRole {
err := fmt.Errorf("unsupported ticket role for StorageCluster %q, found %s, expected %s", req.StorageClusterUID, onboardingToken.SubjectRole, services.ClientRole)
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

storageCluster, err := s.findStorageClusterWithUID(ctx, onboardingToken.StorageClusterUID)
if err != nil {
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

storageClusterPeer, err := s.storageClusterPeerManager.FindStorageClusterPeerWithStorageClusterID(ctx, req.StorageClusterUID)
if err != nil {
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

//TODO: Should we store the onboarding token in storageClusterPeer?

err = s.storageClusterPeerManager.UpdateStorageClusterPeerStatus(ctx, storageClusterPeer, req.StorageClusterUID)
if err != nil {
klog.Error(err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}

return &pb.PeerStorageClusterResponse{StorageClusterUID: string(storageCluster.UID)}, nil
}

func (s *OCSProviderServer) findStorageClusterWithUID(ctx context.Context, uid string) (*ocsv1.StorageCluster, error) {
storageClusterList := &ocsv1.StorageClusterList{}
err := s.client.List(ctx, storageClusterList, client.InNamespace(s.namespace))
if err != nil {
return nil, err
}
for i := range storageClusterList.Items {
if string(storageClusterList.Items[i].UID) == uid {
return &storageClusterList.Items[i], nil
}
}
return nil, fmt.Errorf("storage cluster with uid %q not found", uid)
}

func decodeToken(ticket string) (*services.OnboardingTicket, error) {
ticketArr := strings.Split(string(ticket), ".")
if len(ticketArr) != 2 {
return nil, fmt.Errorf("invalid ticket")
}

message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
return nil, fmt.Errorf("failed to decode onboarding ticket: %v", err)
}

var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err)
}
return &ticketData, nil
}
51 changes: 51 additions & 0 deletions services/provider/server/storageclusterpeer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package server

import (
"context"
"fmt"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type storageClusterPeerManager struct {
client client.Client
namespace string
}

func newStorageClusterPeerManager(cl client.Client, namespace string) (*storageClusterPeerManager, error) {
return &storageClusterPeerManager{
client: cl,
namespace: namespace,
}, nil
}

func (s *storageClusterPeerManager) FindStorageClusterPeerWithStorageClusterID(ctx context.Context, storageClusterUID string) (*ocsv1.StorageClusterPeer, error) {
storageClusterPeerList := &ocsv1.StorageClusterPeerList{}
err := s.client.List(ctx, storageClusterPeerList, client.InNamespace(s.namespace))
if err != nil {
return nil, err
}
for i := range storageClusterPeerList.Items {
token, err := decodeToken(storageClusterPeerList.Items[i].Spec.OnboardingToken)
if err != nil {
return nil, err
}
if token.StorageClusterUID == storageClusterUID {
return &storageClusterPeerList.Items[i], nil
}
}
return nil, fmt.Errorf("StorageClusterPeer linked to StorageCluster with uid %q not found", storageClusterUID)
}

func (s *storageClusterPeerManager) UpdateStorageClusterPeerStatus(ctx context.Context, storageClusterPeer *ocsv1.StorageClusterPeer, storageClusterUID string) error {

storageClusterPeer.Status.RemoteStorageClusterUID = storageClusterUID

if err := s.client.Status().Update(ctx, storageClusterPeer); err != nil {
return fmt.Errorf("failed to patch Status for StorageClusterPeer %v: %v", storageClusterPeer.Name, err)
}
klog.Infof("successfully updated Status for StorageConsumer %v", storageClusterPeer.Name)
return nil
}

0 comments on commit 0da944b

Please sign in to comment.