Skip to content

Commit

Permalink
storage: merge resource and version fields in StorageConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <[email protected]>
  • Loading branch information
Iceber committed Jun 20, 2024
1 parent 4056ee4 commit c4805da
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ func NewREST(serializer runtime.NegotiatedSerializer, factory storage.StorageFac
}

*rt = internal.CollectionResourceType{
Group: config.StorageGroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.StorageGroupResource.Resource,
Group: config.StorageResource.Group,
Version: config.StorageResource.Version,
Resource: config.StorageResource.Resource,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kube_state_metrics/metrics_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func init() {
if err != nil {
panic(err)
}
hubGVRs[config.StorageGroupResource.WithVersion(config.MemoryVersion.Version)] = gvr
hubGVRs[config.MemoryResource] = gvr
}
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource sche
if err != nil {
return nil
}
hub := config.StorageGroupResource.WithVersion(config.MemoryVersion.Version)
hub := config.MemoryResource
metricsGVR, ok := hubGVRs[hub]
if !ok {
return nil
Expand Down Expand Up @@ -134,11 +134,11 @@ func (builder *MetricsStoreBuilder) GetMetricStore(cluster string, resource sche
return obj, nil
}

hobj, err := scheme.LegacyResourceScheme.ConvertToVersion(obj.(runtime.Object), config.MemoryVersion)
hobj, err := scheme.LegacyResourceScheme.ConvertToVersion(obj.(runtime.Object), hub.GroupVersion())
if err != nil {
return nil, err
}
if metricsGVR.GroupVersion() == config.MemoryVersion {
if metricsGVR.GroupVersion() == hub.GroupVersion() {
return hobj, nil
}
return scheme.LegacyResourceScheme.ConvertToVersion(hobj, metricsGVR.GroupVersion())
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubeapiserver/restmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ func (m *RESTManager) genLegacyResourceRESTStorage(gvr schema.GroupVersionResour
DefaultQualifiedResource: gvr.GroupResource(),

NewFunc: func() runtime.Object {
obj, _ := scheme.LegacyResourceScheme.New(storageConfig.MemoryVersion.WithKind(kind))
obj, _ := scheme.LegacyResourceScheme.New(storageConfig.MemoryResource.GroupVersion().WithKind(kind))
return obj
},
NewListFunc: func() runtime.Object {
obj, _ := scheme.LegacyResourceScheme.New(storageConfig.MemoryVersion.WithKind(kind + "List"))
obj, _ := scheme.LegacyResourceScheme.New(storageConfig.MemoryResource.GroupVersion().WithKind(kind + "List"))
return obj
},

Expand All @@ -310,12 +310,12 @@ func (m *RESTManager) genUnstructuredRESTStorage(gvr schema.GroupVersionResource
return &resourcerest.RESTStorage{
NewFunc: func() runtime.Object {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(storageConfig.MemoryVersion.WithKind(kind))
obj.SetGroupVersionKind(storageConfig.MemoryResource.GroupVersion().WithKind(kind))
return obj
},
NewListFunc: func() runtime.Object {
obj := &unstructured.UnstructuredList{}
obj.SetGroupVersionKind(storageConfig.MemoryVersion.WithKind(kind + "List"))
obj.SetGroupVersionKind(storageConfig.MemoryResource.GroupVersion().WithKind(kind + "List"))
return obj
},

Expand Down
89 changes: 39 additions & 50 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,33 @@ import (
)

type ResourceStorage struct {
db *gorm.DB
codec runtime.Codec
groupResource schema.GroupResource

storageGroupResource schema.GroupResource
storageVersion schema.GroupVersion
memoryVersion schema.GroupVersion
db *gorm.DB
config storage.ResourceStorageConfig
}

func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
return &storage.ResourceStorageConfig{
Codec: s.codec,
StorageGroupResource: s.storageGroupResource,
StorageVersion: s.storageVersion,
MemoryVersion: s.memoryVersion,
config := s.config
return &config
}

func (s *ResourceStorage) gvrKeyMap() map[string]interface{} {
return map[string]interface{}{
"group": s.config.StorageResource.Group,
"version": s.config.StorageResource.Version,
"resource": s.config.StorageResource.Resource,
}
}

func (s *ResourceStorage) resourceKeyMap(cluster, namespace, name string) map[string]interface{} {
return map[string]interface{}{
"cluster": cluster,
"group": s.config.StorageResource.Group,
"version": s.config.StorageResource.Version,
"resource": s.config.StorageResource.Resource,
"namespace": namespace,
"name": name,
}
}

Expand All @@ -61,7 +74,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
}

var buffer bytes.Buffer
if err := s.codec.Encode(obj, &buffer); err != nil {
if err := s.config.Codec.Encode(obj, &buffer); err != nil {
return err
}

Expand All @@ -71,9 +84,9 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
UID: metaobj.GetUID(),
Name: metaobj.GetName(),
Namespace: metaobj.GetNamespace(),
Group: s.storageGroupResource.Group,
Resource: s.storageGroupResource.Resource,
Version: s.storageVersion.Version,
Group: s.config.StorageResource.Group,
Resource: s.config.StorageResource.Resource,
Version: s.config.StorageResource.Version,
Kind: gvk.Kind,
ResourceVersion: metaobj.GetResourceVersion(),
Object: buffer.Bytes(),
Expand All @@ -94,7 +107,7 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
}

var buffer bytes.Buffer
if err := s.codec.Encode(obj, &buffer); err != nil {
if err := s.config.Codec.Encode(obj, &buffer); err != nil {
return err
}

Expand All @@ -116,14 +129,9 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).Updates(updatedResource)
result := s.db.WithContext(ctx).Model(&Resource{}).
Where(s.resourceKeyMap(cluster, metaobj.GetNamespace(), metaobj.GetName())).
Updates(updatedResource)
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand All @@ -144,14 +152,7 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
}

func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
return s.db.Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
}).Delete(&Resource{})
return s.db.Model(&Resource{}).Where(s.resourceKeyMap(cluster, namespace, name)).Delete(&Resource{})
}

func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error {
Expand All @@ -167,14 +168,7 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
}

func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(s.resourceKeyMap(cluster, namespace, name))
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
Expand All @@ -183,7 +177,7 @@ func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name stri
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
}

obj, _, err := s.codec.Decode(objects[0], nil, into)
obj, _, err := s.config.Codec.Decode(objects[0], nil, into)
if err != nil {
return err
}
Expand All @@ -199,12 +193,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

query := s.db.WithContext(ctx).Model(&Resource{})
query = query.Where(map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
})
query := s.db.WithContext(ctx).Model(&Resource{}).Where(s.gvrKeyMap())
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
return offset, amount, query, result, err
}
Expand All @@ -216,7 +205,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
}

if err := result.From(query); err != nil {
return InterpretDBError(s.storageGroupResource.String(), err)
return InterpretDBError(s.groupResource.String(), err)
}
objects := result.Items()

Expand Down Expand Up @@ -246,7 +235,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
unstructuredList.Items = make([]unstructured.Unstructured, 0, len(objects))
for _, object := range objects {
uObj := &unstructured.Unstructured{}
obj, err := object.ConvertTo(s.codec, uObj)
obj, err := object.ConvertTo(s.config.Codec, uObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -283,7 +272,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
slice := reflect.MakeSlice(v.Type(), len(objects), len(objects))
expected := reflect.New(v.Type().Elem()).Interface().(runtime.Object)
for i, object := range objects {
obj, err := object.ConvertTo(s.codec, expected.DeepCopyObject())
obj, err := object.ConvertTo(s.config.Codec, expected.DeepCopyObject())
if err != nil {
return err
}
Expand All @@ -294,7 +283,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
}

func (s *ResourceStorage) Watch(_ context.Context, _ *internal.ListOptions) (watch.Interface, error) {
return nil, apierrors.NewMethodNotSupported(s.storageGroupResource, "watch")
return nil, apierrors.NewMethodNotSupported(s.groupResource, "watch")
}

func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) {
Expand Down
14 changes: 8 additions & 6 deletions pkg/storage/internalstorage/resource_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"

internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/storageconfig"
)

Expand Down Expand Up @@ -383,7 +384,7 @@ func TestResourceStorage_Update(t *testing.T) {
require.NoError(err)
require.NotNil(config)

rs.codec = config.Codec
rs.config = *config
trueRef := true

obj := &appsv1.Deployment{
Expand Down Expand Up @@ -414,7 +415,7 @@ func TestResourceStorage_Update(t *testing.T) {
require.Len(ownerRef, 1)

var buffer bytes.Buffer
err = rs.codec.Encode(obj, &buffer)
err = rs.config.Codec.Encode(obj, &buffer)
require.NoError(err)

owner := metav1.GetControllerOfNoCopy(metaObj)
Expand Down Expand Up @@ -463,10 +464,11 @@ func TestResourceStorage_Update(t *testing.T) {
assert.NotEqual(resourcesAfterUpdates[0].Object, resourcesAfterCreation[0].Object)
}

func newTestResourceStorage(db *gorm.DB, storageGVK schema.GroupVersionResource) *ResourceStorage {
func newTestResourceStorage(db *gorm.DB, storageResource schema.GroupVersionResource) *ResourceStorage {
return &ResourceStorage{
db: db,
storageGroupResource: storageGVK.GroupResource(),
storageVersion: storageGVK.GroupVersion(),
db: db,
config: storage.ResourceStorageConfig{
StorageResource: storageResource,
},
}
}
8 changes: 3 additions & 5 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string {

func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
return &ResourceStorage{
db: s.db,
codec: config.Codec,
groupResource: config.StorageResource.GroupResource(),

storageGroupResource: config.StorageGroupResource,
storageVersion: config.StorageVersion,
memoryVersion: config.MemoryVersion,
db: s.db,
config: *config,
}, nil
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/memorystorage/memory_resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type ResourceStorage struct {
CrvSynchro *cache.ClusterResourceVersionSynchro
incoming chan ClusterWatchEvent
storageConfig *storage.ResourceStorageConfig
memoryVersion schema.GroupVersion
}

func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
Expand All @@ -62,7 +63,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
return err
}

err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
}
Expand All @@ -76,7 +77,7 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
return err
}

err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
return err
}

err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/memorystorage/memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
storages.Lock()
defer storages.Unlock()

gvr := schema.GroupVersionResource{
Group: config.GroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.GroupResource.Resource,
}

gvr := config.StorageResource
resourceStorage, ok := storages.resourceStorages[gvr]
if ok {
watchCache := resourceStorage.watchCache
Expand All @@ -42,6 +37,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
Codec: config.Codec,
watchCache: watchCache,
storageConfig: config,
memoryVersion: config.MemoryResource.GroupVersion(),
}

storages.resourceStorages[gvr] = resourceStorage
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ type CollectionResourceStorage interface {
}

type ResourceStorageConfig struct {
Namespaced bool
Namespaced bool
GroupResource schema.GroupResource

GroupResource schema.GroupResource
StorageGroupResource schema.GroupResource

MemoryVersion schema.GroupVersion
StorageVersion schema.GroupVersion
StorageResource schema.GroupVersionResource
MemoryResource schema.GroupVersionResource

Codec runtime.Codec
}
Expand Down
Loading

0 comments on commit c4805da

Please sign in to comment.