Skip to content

Commit

Permalink
tidy: reorganize plugindb and pull in API support for writes
Browse files Browse the repository at this point in the history
  • Loading branch information
tinyzimmer committed Oct 31, 2023
1 parent 697179c commit 9a84dad
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 132 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/vishvananda/netlink v1.2.1-beta.2
github.com/webmeshproj/api v0.10.1
github.com/webmeshproj/api v0.10.2
golang.org/x/crypto v0.14.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1572,8 +1572,8 @@ github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1Y
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/webmeshproj/api v0.10.1 h1:3e2/C2KcYL2JEBrmBS/KgjxxEkVxkwKWxU/g6w645x8=
github.com/webmeshproj/api v0.10.1/go.mod h1:lFGd4eyG9wr0mQiAJxQDBJ6VwEe5EC0AXKZjTv6ZA3U=
github.com/webmeshproj/api v0.10.2 h1:3LQPB9A1s1o3NSQX2JlAzY1stncIcrOWa4q/QjaaU6I=
github.com/webmeshproj/api v0.10.2/go.mod h1:lFGd4eyG9wr0mQiAJxQDBJ6VwEe5EC0AXKZjTv6ZA3U=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package plugindb contains an interface for performing storage queries
// over the storage APIs.
package plugindb

import (
Expand All @@ -24,7 +22,6 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/dominikbraun/graph"
v1 "github.com/webmeshproj/api/go/v1"
Expand Down Expand Up @@ -52,99 +49,6 @@ func OpenDB(srv QueryServer) storage.MeshDB {
return meshdb.New(&PluginDataStore{QueryServer: srv})
}

// OpenKeyVal opens a new key-value store connection to a plugin query stream.
func OpenKeyVal(srv QueryServer) storage.MeshStorage {
return &PluginMeshStorage{QueryServer: srv}
}

// PluginMeshStorage implements a mesh key-value store over a plugin query stream.
type PluginMeshStorage struct {
QueryServer
mu sync.Mutex
}

// GetValue returns the value of a key.
func (p *PluginMeshStorage) GetValue(ctx context.Context, key []byte) ([]byte, error) {
p.mu.Lock()
defer p.mu.Unlock()
if !types.IsValidPathID(string(key)) {
return nil, errors.ErrInvalidKey
}
err := p.Send(&v1.QueryRequest{
Command: v1.QueryRequest_GET,
Type: v1.QueryRequest_VALUE,
Query: types.NewQueryFilters().WithID(string(key)).Encode(),
})
if err != nil {
return nil, err
}
resp, err := p.Recv()
if err != nil {
return nil, err
}
if resp.GetError() != "" {
if strings.Contains(err.Error(), "not found") {
return nil, errors.ErrKeyNotFound
}
return nil, fmt.Errorf(resp.GetError())
}
if len(resp.GetItems()) == 0 {
return nil, errors.ErrKeyNotFound
}
return resp.GetItems()[0], nil
}

func (p *PluginMeshStorage) PutValue(ctx context.Context, key, value []byte, ttl time.Duration) error {
return errors.ErrNotStorageNode
}

func (p *PluginMeshStorage) Delete(ctx context.Context, key []byte) error {
return errors.ErrNotStorageNode
}

func (p *PluginMeshStorage) ListKeys(ctx context.Context, prefix []byte) ([][]byte, error) {
p.mu.Lock()
defer p.mu.Unlock()
err := p.Send(&v1.QueryRequest{
Command: v1.QueryRequest_LIST,
Type: v1.QueryRequest_KEYS,
Query: types.NewQueryFilters().WithID(string(prefix)).Encode(),
})
if err != nil {
return nil, err
}
resp, err := p.Recv()
if err != nil {
return nil, err
}
return resp.GetItems(), nil
}

func (p *PluginMeshStorage) IterPrefix(ctx context.Context, prefix []byte, fn storage.PrefixIterator) error {
keys, err := p.ListKeys(ctx, prefix)
if err != nil {
return err
}
for _, key := range keys {
value, err := p.GetValue(ctx, key)
if err != nil {
return err
}
if err := fn(key, value); err != nil {
return err
}
}
return nil
}

func (p *PluginMeshStorage) Subscribe(ctx context.Context, prefix []byte, fn storage.KVSubscribeFunc) (context.CancelFunc, error) {
return func() {}, errors.ErrNotStorageNode
}

func (p *PluginMeshStorage) Close() error {
return nil
}

// PluginDataStore implements a mesh data store over a plugin query stream.
type PluginDataStore struct {
QueryServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Package plugindb contains an interface for performing storage queries
// over the storage APIs.
package plugindb

import (
Expand Down
124 changes: 124 additions & 0 deletions pkg/plugins/plugindb/kv_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2023 Avi Zimmerman <[email protected]>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugindb

import (
"fmt"
"strings"
"sync"
"time"

v1 "github.com/webmeshproj/api/go/v1"

"github.com/webmeshproj/webmesh/pkg/context"
"github.com/webmeshproj/webmesh/pkg/storage"
"github.com/webmeshproj/webmesh/pkg/storage/errors"
"github.com/webmeshproj/webmesh/pkg/storage/types"
)

// OpenKeyVal opens a new key-value store connection to a plugin query stream.
func OpenKeyVal(srv QueryServer) storage.MeshStorage {
return &PluginMeshStorage{QueryServer: srv}
}

// PluginMeshStorage implements a mesh key-value store over a plugin query stream.
type PluginMeshStorage struct {
QueryServer
mu sync.Mutex
}

// GetValue returns the value of a key.
func (p *PluginMeshStorage) GetValue(ctx context.Context, key []byte) ([]byte, error) {
p.mu.Lock()
defer p.mu.Unlock()
if !types.IsValidPathID(string(key)) {
return nil, errors.ErrInvalidKey
}
err := p.Send(&v1.QueryRequest{
Command: v1.QueryRequest_GET,
Type: v1.QueryRequest_VALUE,
Query: types.NewQueryFilters().WithID(string(key)).Encode(),
})
if err != nil {
return nil, err
}
resp, err := p.Recv()
if err != nil {
return nil, err
}
if resp.GetError() != "" {
if strings.Contains(err.Error(), "not found") {
return nil, errors.ErrKeyNotFound
}
return nil, fmt.Errorf(resp.GetError())
}
if len(resp.GetItems()) == 0 {
return nil, errors.ErrKeyNotFound
}
return resp.GetItems()[0], nil
}

func (p *PluginMeshStorage) PutValue(ctx context.Context, key, value []byte, ttl time.Duration) error {
return errors.ErrNotStorageNode
}

func (p *PluginMeshStorage) Delete(ctx context.Context, key []byte) error {
return errors.ErrNotStorageNode
}

func (p *PluginMeshStorage) ListKeys(ctx context.Context, prefix []byte) ([][]byte, error) {
p.mu.Lock()
defer p.mu.Unlock()
err := p.Send(&v1.QueryRequest{
Command: v1.QueryRequest_LIST,
Type: v1.QueryRequest_KEYS,
Query: types.NewQueryFilters().WithID(string(prefix)).Encode(),
})
if err != nil {
return nil, err
}
resp, err := p.Recv()
if err != nil {
return nil, err
}
return resp.GetItems(), nil
}

func (p *PluginMeshStorage) IterPrefix(ctx context.Context, prefix []byte, fn storage.PrefixIterator) error {
keys, err := p.ListKeys(ctx, prefix)
if err != nil {
return err
}
for _, key := range keys {
value, err := p.GetValue(ctx, key)
if err != nil {
return err
}
if err := fn(key, value); err != nil {
return err
}
}
return nil
}

func (p *PluginMeshStorage) Subscribe(ctx context.Context, prefix []byte, fn storage.KVSubscribeFunc) (context.CancelFunc, error) {
return func() {}, errors.ErrNotStorageNode
}

func (p *PluginMeshStorage) Close() error {
return nil
}
19 changes: 19 additions & 0 deletions pkg/plugins/plugindb/plugindb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2023 Avi Zimmerman <[email protected]>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package plugindb contains an interface for performing storage queries
// over the storage APIs.
package plugindb
63 changes: 32 additions & 31 deletions pkg/storage/types/storage_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type StorageQuery struct {

// ParseStorageQuery parses a storage query.
func ParseStorageQuery(query *v1.QueryRequest) (StorageQuery, error) {
filters := parseFilters(query)
filters := ParseQueryFilters(query)
if query.GetCommand() == v1.QueryRequest_GET {
// The filter should always contain an ID or pub key
// unless its for raw network or rbac state.
Expand Down Expand Up @@ -113,7 +113,7 @@ const (
// IsValid returns true if the filter type is valid.
func (f FilterType) IsValid() bool {
switch f {
case FilterTypeID, FilterTypePubKey:
case FilterTypeID, FilterTypePubKey, FilterTypeSourceID, FilterTypeTargetID, FilterTypeNodeID, FilterTypeCIDR:
return true
default:
return false
Expand All @@ -123,11 +123,41 @@ func (f FilterType) IsValid() bool {
// QueryFilters is a list of parsed filters for a storage query.
type QueryFilters []QueryFilter

// QueryFilter is a parsed filter for a storage query.
type QueryFilter struct {
// The type of filter.
Type FilterType
// Value is the value of the filter.
Value string
}

// NewQueryFilters returns a new list of query filters.
func NewQueryFilters(filters ...QueryFilter) QueryFilters {
return filters
}

// ParseQueryFilters parses the query filters from a query request.
func ParseQueryFilters(req *v1.QueryRequest) QueryFilters {
query := req.GetQuery()
fields := strings.Split(query, ",")
filters := make(QueryFilters, 0, len(fields))
for _, field := range fields {
parts := strings.Split(field, "=")
if len(parts) != 2 {
continue
}
ftype := FilterType(parts[0])
if !ftype.IsValid() {
continue
}
filters = append(filters, QueryFilter{
Type: ftype,
Value: parts[1],
})
}
return filters
}

// Encode encodes the query filters into a string.
func (q QueryFilters) Encode() string {
var sb strings.Builder
Expand Down Expand Up @@ -250,32 +280,3 @@ func (q QueryFilters) GetByType(ftype FilterType) (QueryFilter, bool) {
}
return QueryFilter{}, false
}

// QueryFilter is a parsed filter for a storage query.
type QueryFilter struct {
// The type of filter.
Type FilterType
// Value is the value of the filter.
Value string
}

func parseFilters(req *v1.QueryRequest) QueryFilters {
query := req.GetQuery()
fields := strings.Split(query, ",")
filters := make(QueryFilters, 0, len(fields))
for _, field := range fields {
parts := strings.Split(field, "=")
if len(parts) != 2 {
continue
}
ftype := FilterType(parts[0])
if !ftype.IsValid() {
continue
}
filters = append(filters, QueryFilter{
Type: ftype,
Value: parts[1],
})
}
return filters
}

0 comments on commit 9a84dad

Please sign in to comment.