Skip to content

Commit

Permalink
First draft handling incoming queries from plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
tinyzimmer committed Jul 21, 2023
1 parent 09ff574 commit d04eec4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/vishvananda/netlink v1.0.0
github.com/webmeshproj/api v0.1.4-0.20230720215736-045051c251f1
github.com/webmeshproj/api v0.1.4-0.20230721143731-964f8f403a0f
golang.org/x/crypto v0.9.0
golang.org/x/exp v0.0.0-20230519143937-03e91628a987
golang.org/x/sync v0.1.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ github.com/webmeshproj/api v0.1.4-0.20230720212546-ce2b95fa8e70 h1:wJjFVSC9zx2cz
github.com/webmeshproj/api v0.1.4-0.20230720212546-ce2b95fa8e70/go.mod h1:ymJL82yEGc/xw880DAb65pBJShkBqkKOl3sCOBq7NVk=
github.com/webmeshproj/api v0.1.4-0.20230720215736-045051c251f1 h1:KO4db6uN1iRO/BDQa70GC3UfAGSrYyV1F7C/5hNcAD4=
github.com/webmeshproj/api v0.1.4-0.20230720215736-045051c251f1/go.mod h1:ymJL82yEGc/xw880DAb65pBJShkBqkKOl3sCOBq7NVk=
github.com/webmeshproj/api v0.1.4-0.20230721143731-964f8f403a0f h1:2Fdyi8nVqALQSZgVETZT4DCQ5fbyM4Qvm3/9nY2yvQk=
github.com/webmeshproj/api v0.1.4-0.20230721143731-964f8f403a0f/go.mod h1:ymJL82yEGc/xw880DAb65pBJShkBqkKOl3sCOBq7NVk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
52 changes: 51 additions & 1 deletion pkg/plugins/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/webmeshproj/node/pkg/context"
"github.com/webmeshproj/node/pkg/meshdb/models"
"github.com/webmeshproj/node/pkg/meshdb/raftlogs"
"github.com/webmeshproj/node/pkg/plugins/clients"
)

Expand Down Expand Up @@ -226,7 +227,56 @@ func (m *manager) Close() error {
}

// handleQueries handles SQL queries from plugins.
func (m *manager) handleQueries() {}
func (m *manager) handleQueries() {
for plugin, client := range m.plugins {
ctx := context.Background()
q, err := client.Query(ctx)
if err != nil {
if status.Code(err) == codes.Unimplemented {
m.log.Debug("plugin does not implement queries", "plugin", plugin)
continue
}
m.log.Error("start query stream", "plugin", plugin, "error", err)
continue
}
go m.handleQueryClient(plugin, client, q)
}
}

// handleQueryClient handles a query client.
func (m *manager) handleQueryClient(plugin string, client clients.PluginClient, queries v1.Plugin_QueryClient) {
defer func() {
if err := queries.CloseSend(); err != nil {
m.log.Error("close query stream", "plugin", plugin, "error", err)
}
}()
for {
query, err := queries.Recv()
if err != nil {
if err == io.EOF {
return
}
// TODO: restart the stream?
m.log.Error("receive query", "plugin", plugin, "error", err)
return
}
var result v1.PluginSQLQueryResult
result.Id = query.GetId()
res, err := raftlogs.QueryWithQuerier(queries.Context(), m.db, query.GetQuery())
if err != nil {
m.log.Error("query", "plugin", plugin, "error", err)
result.Error = err.Error()
} else {
result.Result = res
}
err = queries.Send(&result)
if err != nil {
// TODO: restart the stream?
m.log.Error("send query result", "plugin", plugin, "error", err)
return
}
}
}

func (m *manager) newAuthRequest(ctx context.Context) *v1.AuthenticationRequest {
var req v1.AuthenticationRequest
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ var (
// NewManager creates a new plugin manager.
func NewManager(ctx context.Context, db models.DBTX, opts *Options) (Manager, error) {
var auth, ipamv4, ipamv6 clients.PluginClient
registered := make(map[string]clients.PluginClient)
allPlugins := make(map[string]clients.PluginClient)
stores := make([]clients.PluginClient, 0)
emitters := make([]clients.PluginClient, 0)
log := context.LoggerFrom(ctx)
Expand Down Expand Up @@ -101,7 +101,7 @@ func NewManager(ctx context.Context, db models.DBTX, opts *Options) (Manager, er
if err != nil {
return nil, fmt.Errorf("configure plugin %q: %w", name, err)
}
registered[name] = plugin
allPlugins[name] = plugin
}
// If both IPAM plugins are unconfigured, use the in-process IPAM plugin.
if ipamv4 == nil && ipamv6 == nil {
Expand All @@ -120,7 +120,7 @@ func NewManager(ctx context.Context, db models.DBTX, opts *Options) (Manager, er
ipamv6: ipamv6,
stores: stores,
emitters: emitters,
plugins: registered,
plugins: allPlugins,
log: slog.Default().With("component", "plugin-manager"),
}
go m.handleQueries()
Expand Down

0 comments on commit d04eec4

Please sign in to comment.