Skip to content

Commit

Permalink
storage: merge resource and version fields in StorageConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <[email protected]>
  • Loading branch information
Iceber committed Jun 19, 2024
1 parent 83cb0f0 commit 200cb60
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 85 deletions.
89 changes: 39 additions & 50 deletions pkg/storage/internalstorage/resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,33 @@ import (
)

type ResourceStorage struct {
db *gorm.DB
codec runtime.Codec
groupResource schema.GroupResource

storageGroupResource schema.GroupResource
storageVersion schema.GroupVersion
memoryVersion schema.GroupVersion
db *gorm.DB
config storage.ResourceStorageConfig
}

func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
return &storage.ResourceStorageConfig{
Codec: s.codec,
StorageGroupResource: s.storageGroupResource,
StorageVersion: s.storageVersion,
MemoryVersion: s.memoryVersion,
config := s.config
return &config
}

func (s *ResourceStorage) gvrKeyMap() map[string]interface{} {
return map[string]interface{}{
"group": s.config.StorageResource.Group,
"version": s.config.StorageResource.Version,
"resource": s.config.StorageResource.Resource,
}
}

func (s *ResourceStorage) resourceKeyMap(cluster, namespace, name string) map[string]interface{} {
return map[string]interface{}{
"cluster": cluster,
"group": s.config.StorageResource.Group,
"version": s.config.StorageResource.Version,
"resource": s.config.StorageResource.Resource,
"namespace": namespace,
"name": name,
}
}

Expand All @@ -61,7 +74,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
}

var buffer bytes.Buffer
if err := s.codec.Encode(obj, &buffer); err != nil {
if err := s.config.Codec.Encode(obj, &buffer); err != nil {
return err
}

Expand All @@ -71,9 +84,9 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
UID: metaobj.GetUID(),
Name: metaobj.GetName(),
Namespace: metaobj.GetNamespace(),
Group: s.storageGroupResource.Group,
Resource: s.storageGroupResource.Resource,
Version: s.storageVersion.Version,
Group: s.config.StorageResource.Group,
Resource: s.config.StorageResource.Resource,
Version: s.config.StorageResource.Version,
Kind: gvk.Kind,
ResourceVersion: metaobj.GetResourceVersion(),
Object: buffer.Bytes(),
Expand All @@ -94,7 +107,7 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
}

var buffer bytes.Buffer
if err := s.codec.Encode(obj, &buffer); err != nil {
if err := s.config.Codec.Encode(obj, &buffer); err != nil {
return err
}

Expand All @@ -116,14 +129,9 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
}

result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": metaobj.GetNamespace(),
"name": metaobj.GetName(),
}).Updates(updatedResource)
result := s.db.WithContext(ctx).Model(&Resource{}).
Where(s.resourceKeyMap(cluster, metaobj.GetNamespace(), metaobj.GetName())).
Updates(updatedResource)
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
}

Expand All @@ -144,14 +152,7 @@ func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object,
}

func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
return s.db.Model(&Resource{}).Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
}).Delete(&Resource{})
return s.db.Model(&Resource{}).Where(s.resourceKeyMap(cluster, namespace, name)).Delete(&Resource{})
}

func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error {
Expand All @@ -167,14 +168,7 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
}

func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{
"cluster": cluster,
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
"namespace": namespace,
"name": name,
})
return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(s.resourceKeyMap(cluster, namespace, name))
}

func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
Expand All @@ -183,7 +177,7 @@ func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name stri
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
}

obj, _, err := s.codec.Decode(objects[0], nil, into)
obj, _, err := s.config.Codec.Decode(objects[0], nil, into)
if err != nil {
return err
}
Expand All @@ -199,12 +193,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
result = &ResourceMetadataList{}
}

query := s.db.WithContext(ctx).Model(&Resource{})
query = query.Where(map[string]interface{}{
"group": s.storageGroupResource.Group,
"version": s.storageVersion.Version,
"resource": s.storageGroupResource.Resource,
})
query := s.db.WithContext(ctx).Model(&Resource{}).Where(s.gvrKeyMap())
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
return offset, amount, query, result, err
}
Expand All @@ -216,7 +205,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
}

if err := result.From(query); err != nil {
return InterpretDBError(s.storageGroupResource.String(), err)
return InterpretDBError(s.groupResource.String(), err)
}
objects := result.Items()

Expand Down Expand Up @@ -246,7 +235,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
unstructuredList.Items = make([]unstructured.Unstructured, 0, len(objects))
for _, object := range objects {
uObj := &unstructured.Unstructured{}
obj, err := object.ConvertTo(s.codec, uObj)
obj, err := object.ConvertTo(s.config.Codec, uObj)
if err != nil {
return err
}
Expand Down Expand Up @@ -283,7 +272,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
slice := reflect.MakeSlice(v.Type(), len(objects), len(objects))
expected := reflect.New(v.Type().Elem()).Interface().(runtime.Object)
for i, object := range objects {
obj, err := object.ConvertTo(s.codec, expected.DeepCopyObject())
obj, err := object.ConvertTo(s.config.Codec, expected.DeepCopyObject())
if err != nil {
return err
}
Expand All @@ -294,7 +283,7 @@ func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, o
}

func (s *ResourceStorage) Watch(_ context.Context, _ *internal.ListOptions) (watch.Interface, error) {
return nil, apierrors.NewMethodNotSupported(s.storageGroupResource, "watch")
return nil, apierrors.NewMethodNotSupported(s.groupResource, "watch")
}

func applyListOptionsToResourceQuery(db *gorm.DB, query *gorm.DB, opts *internal.ListOptions) (int64, *int64, *gorm.DB, error) {
Expand Down
8 changes: 3 additions & 5 deletions pkg/storage/internalstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ func (s *StorageFactory) GetSupportedRequestVerbs() []string {

func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) {
return &ResourceStorage{
db: s.db,
codec: config.Codec,
groupResource: config.StorageResource.GroupResource(),

storageGroupResource: config.StorageGroupResource,
storageVersion: config.StorageVersion,
memoryVersion: config.MemoryVersion,
db: s.db,
config: *config,
}, nil
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/memorystorage/memory_resource_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type ResourceStorage struct {
CrvSynchro *cache.ClusterResourceVersionSynchro
incoming chan ClusterWatchEvent
storageConfig *storage.ResourceStorageConfig
memoryVersion schema.GroupVersion
}

func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
Expand All @@ -62,7 +63,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
return err
}

err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
err = s.watchCache.Add(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
}
Expand All @@ -76,7 +77,7 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
return err
}

err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
err = s.watchCache.Update(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
return err
}

err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.storageConfig.MemoryVersion)
err = s.watchCache.Delete(obj, cluster, resourceVersion, s.storageConfig.Codec, s.memoryVersion)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to add watch event object (%#v) to store: %v", obj, err))
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/storage/memorystorage/memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
storages.Lock()
defer storages.Unlock()

gvr := schema.GroupVersionResource{
Group: config.GroupResource.Group,
Version: config.StorageVersion.Version,
Resource: config.GroupResource.Resource,
}

gvr := config.StorageResource
resourceStorage, ok := storages.resourceStorages[gvr]
if ok {
watchCache := resourceStorage.watchCache
Expand All @@ -42,6 +37,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
Codec: config.Codec,
watchCache: watchCache,
storageConfig: config,
memoryVersion: config.MemoryResource.GroupVersion(),
}

storages.resourceStorages[gvr] = resourceStorage
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ type CollectionResourceStorage interface {
}

type ResourceStorageConfig struct {
Namespaced bool
Namespaced bool
GroupResource schema.GroupResource

GroupResource schema.GroupResource
StorageGroupResource schema.GroupResource

MemoryVersion schema.GroupVersion
StorageVersion schema.GroupVersion
StorageResource schema.GroupVersionResource
MemoryResource schema.GroupVersionResource

Codec runtime.Codec
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/storageconfig/storageconfig_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ func (g *StorageConfigFactory) NewUnstructuredConfig(gvr schema.GroupVersionReso
"unstructuredObjectStorage",
)
return &storage.ResourceStorageConfig{
GroupResource: gvr.GroupResource(),
StorageGroupResource: gvr.GroupResource(),
Codec: codec,
StorageVersion: version,
MemoryVersion: version,
Namespaced: namespaced,
Namespaced: namespaced,
GroupResource: gvr.GroupResource(),

StorageResource: gvr,
MemoryResource: gvr,
Codec: codec,
}, nil
}

Expand Down Expand Up @@ -113,11 +113,11 @@ func (g *StorageConfigFactory) NewLegacyResourceConfig(gr schema.GroupResource,
}

return &storage.ResourceStorageConfig{
GroupResource: gr,
StorageGroupResource: chosenStorageResource,
Codec: codec,
StorageVersion: codecConfig.StorageVersion,
MemoryVersion: memoryVersion,
Namespaced: namespaced,
Namespaced: namespaced,
GroupResource: gr,

StorageResource: chosenStorageResource.WithVersion(storageVersion.Version),
MemoryResource: chosenStorageResource.WithVersion(memoryVersion.Version),
Codec: codec,
}, nil
}
6 changes: 3 additions & 3 deletions pkg/synchromanager/clustersynchro/resource_negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ func (negotiator *ResourceNegotiator) NegotiateSyncResources(syncResources []clu
continue
}

storageGVR := storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version)
storageGVR := storageConfig.StorageResource
syncCondition.StorageVersion = storageGVR.Version
if syncGR != storageConfig.StorageGroupResource {
syncCondition.StorageResource = storageConfig.StorageGroupResource.String()
if syncGR != storageGVR.GroupResource() {
syncCondition.StorageResource = storageGVR.GroupResource().String()
}
groupResourceStatus.addSyncCondition(syncGVR, syncCondition)

Expand Down

0 comments on commit 200cb60

Please sign in to comment.