Skip to content

Commit 7824694

Browse files
authored
RSDK-9818: Annotate gRPC requests from modules to the viam-server with module names. (viamrobotics#4749)
1 parent 56719cc commit 7824694

File tree

8 files changed

+200
-3
lines changed

8 files changed

+200
-3
lines changed

grpc/interceptors.go

+60
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"google.golang.org/grpc"
8+
"google.golang.org/grpc/metadata"
89
)
910

1011
// DefaultMethodTimeout is the default context timeout for all inbound gRPC
@@ -43,3 +44,62 @@ func EnsureTimeoutUnaryClientInterceptor(
4344

4445
return invoker(ctx, method, req, reply, cc, opts...)
4546
}
47+
48+
// The following code is for appending/extracting grpc metadata regarding module names/origins via
49+
// contexts.
50+
type modNameKeyType int
51+
52+
const modNameKeyID = modNameKeyType(iota)
53+
54+
// GetModuleName returns the module name (if any) the request came from. The module name will match
55+
// a string from the robot config.
56+
func GetModuleName(ctx context.Context) string {
57+
valI := ctx.Value(modNameKeyID)
58+
if val, ok := valI.(string); ok {
59+
return val
60+
}
61+
62+
return ""
63+
}
64+
65+
const modNameMetadataKey = "modName"
66+
67+
// ModInterceptors takes a user input `ModName` and exposes an interceptor method that will attach
68+
// it to outgoing gRPC requests.
69+
type ModInterceptors struct {
70+
ModName string
71+
}
72+
73+
// UnaryClientInterceptor adds a module name to any outgoing unary gRPC request.
74+
func (mc *ModInterceptors) UnaryClientInterceptor(
75+
ctx context.Context,
76+
method string,
77+
req, reply interface{},
78+
cc *grpc.ClientConn,
79+
invoker grpc.UnaryInvoker,
80+
opts ...grpc.CallOption,
81+
) error {
82+
ctx = metadata.AppendToOutgoingContext(ctx, modNameMetadataKey, mc.ModName)
83+
return invoker(ctx, method, req, reply, cc, opts...)
84+
}
85+
86+
// ModNameUnaryServerInterceptor checks the incoming RPC metadata for a module name and attaches any
87+
// information to a context that can be retrieved with `GetModuleName`.
88+
func ModNameUnaryServerInterceptor(
89+
ctx context.Context,
90+
req interface{},
91+
info *grpc.UnaryServerInfo,
92+
handler grpc.UnaryHandler,
93+
) (interface{}, error) {
94+
meta, ok := metadata.FromIncomingContext(ctx)
95+
if !ok {
96+
return handler(ctx, req)
97+
}
98+
99+
values := meta.Get(modNameMetadataKey)
100+
if len(values) == 1 {
101+
ctx = context.WithValue(ctx, modNameKeyID, values[0])
102+
}
103+
104+
return handler(ctx, req)
105+
}

module/modmanager/manager.go

+1
Original file line numberDiff line numberDiff line change
@@ -1436,6 +1436,7 @@ func getFullEnvironment(
14361436
environment := map[string]string{
14371437
"VIAM_HOME": viamHomeDir,
14381438
"VIAM_MODULE_DATA": dataDir,
1439+
"VIAM_MODULE_NAME": cfg.Name,
14391440
}
14401441
if cfg.Type == config.ModuleTypeRegistry {
14411442
environment["VIAM_MODULE_ID"] = cfg.ModuleID

module/module.go

+21-1
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ type peerResourceState struct {
175175

176176
// Module represents an external resource module that services components/services.
177177
type Module struct {
178+
// The name of the module as per the robot config. This value is communicated via the
179+
// `VIAM_MODULE_NAME` env var.
180+
name string
181+
178182
shutdownCtx context.Context
179183
shutdownFn context.CancelFunc
180184
parent *client.RobotClient
@@ -219,7 +223,12 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
219223
}
220224

221225
cancelCtx, cancel := context.WithCancel(context.Background())
226+
227+
// If the env variable does not exist, the empty string is returned.
228+
modName, _ := os.LookupEnv("VIAM_MODULE_NAME")
229+
222230
m := &Module{
231+
name: modName,
223232
shutdownCtx: cancelCtx,
224233
shutdownFn: cancel,
225234
logger: logger,
@@ -369,7 +378,18 @@ func (m *Module) connectParent(ctx context.Context) error {
369378
clientLogger := logging.NewLogger("networking.module-connection")
370379
clientLogger.SetLevel(m.logger.GetLevel())
371380
// TODO(PRODUCT-343): add session support to modules
372-
rc, err := client.New(ctx, fullAddr, clientLogger, client.WithDisableSessions())
381+
382+
connectOptions := []client.RobotClientOption{
383+
client.WithDisableSessions(),
384+
}
385+
386+
// Modules compiled against newer SDKs may be running against older `viam-server`s that do not
387+
// provide the module name as an env variable.
388+
if m.name != "" {
389+
connectOptions = append(connectOptions, client.WithModName(m.name))
390+
}
391+
392+
rc, err := client.New(ctx, fullAddr, m.logger, connectOptions...)
373393
if err != nil {
374394
return err
375395
}

module/testmodule/main.go

+21-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"go.viam.com/rdk/components/generic"
1515
"go.viam.com/rdk/components/motor"
16+
"go.viam.com/rdk/components/sensor"
1617
"go.viam.com/rdk/logging"
1718
"go.viam.com/rdk/module"
1819
"go.viam.com/rdk/resource"
@@ -106,9 +107,23 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err
106107
func newHelper(
107108
ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger,
108109
) (resource.Resource, error) {
110+
var dependsOnSensor sensor.Sensor
111+
var err error
112+
if len(conf.DependsOn) > 0 {
113+
dependsOnSensor, err = sensor.FromDependencies(deps, conf.DependsOn[0])
114+
if err != nil {
115+
return nil, err
116+
}
117+
}
118+
119+
if len(deps) > 0 && dependsOnSensor == nil {
120+
return nil, fmt.Errorf("sensor not found in deps: %v", deps)
121+
}
122+
109123
return &helper{
110-
Named: conf.ResourceName().AsNamed(),
111-
logger: logger,
124+
Named: conf.ResourceName().AsNamed(),
125+
logger: logger,
126+
dependsOnSensor: dependsOnSensor,
112127
}, nil
113128
}
114129

@@ -117,6 +132,7 @@ type helper struct {
117132
resource.TriviallyCloseable
118133
logger logging.Logger
119134
numReconfigurations int
135+
dependsOnSensor sensor.Sensor
120136
}
121137

122138
// DoCommand looks up the "real" command from the map it's passed.
@@ -191,6 +207,9 @@ func (h *helper) DoCommand(ctx context.Context, req map[string]interface{}) (map
191207
return map[string]any{}, nil
192208
case "get_num_reconfigurations":
193209
return map[string]any{"num_reconfigurations": h.numReconfigurations}, nil
210+
case "do_readings_on_dep":
211+
_, err := h.dependsOnSensor.Readings(ctx, nil)
212+
return nil, err
194213
default:
195214
return nil, fmt.Errorf("unknown command string %s", cmd)
196215
}

robot/client/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,11 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
291291
rpc.WithStreamClientInterceptor(streamClientInterceptor()),
292292
)
293293

294+
if rOpts.modName != "" {
295+
inter := &grpc.ModInterceptors{ModName: rOpts.modName}
296+
rc.dialOptions = append(rc.dialOptions, rpc.WithUnaryClientInterceptor(inter.UnaryClientInterceptor))
297+
}
298+
294299
if err := rc.Connect(ctx); err != nil {
295300
return nil, err
296301
}

robot/client/client_options.go

+10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type robotClientOpts struct {
3232

3333
// controls whether or not sessions are disabled.
3434
disableSessions bool
35+
36+
modName string
3537
}
3638

3739
// RobotClientOption configures how we set up the connection.
@@ -56,6 +58,14 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption {
5658
}
5759
}
5860

61+
// WithModName attaches a unary interceptor that attaches the module name for each outgoing gRPC
62+
// request. Should only be used in Viam module library code.
63+
func WithModName(modName string) RobotClientOption {
64+
return newFuncRobotClientOption(func(o *robotClientOpts) {
65+
o.modName = modName
66+
})
67+
}
68+
5969
// WithRefreshEvery returns a RobotClientOption for how often to refresh the status/parts of the
6070
// robot.
6171
func WithRefreshEvery(refreshEvery time.Duration) RobotClientOption {

robot/impl/local_robot_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -4534,3 +4534,81 @@ func TestRemovingOfflineRemotes(t *testing.T) {
45344534
cancelReconfig()
45354535
wg.Wait()
45364536
}
4537+
4538+
// TestModuleNamePassing asserts that module names are passed from viam-server -> module
4539+
// properly. Such that incoming requests from module -> viam-server identify themselves. And can be
4540+
// observed on contexts via `[r]grpc.GetModuleName(ctx)`.
4541+
func TestModuleNamePassing(t *testing.T) {
4542+
logger := logging.NewTestLogger(t)
4543+
4544+
ctx := context.Background()
4545+
4546+
// We will inject a `ReadingsFunc` handler. The request should come from the `testmodule` and
4547+
// the interceptors should pass along a module name. Which will get captured in the
4548+
// `moduleNameCh` that the end of the test will assert on.
4549+
//
4550+
// The channel must be buffered to such that the `ReadingsFunc` returns without waiting on a
4551+
// reader of the channel.
4552+
moduleNameCh := make(chan string, 1)
4553+
callbackSensor := &inject.Sensor{
4554+
ReadingsFunc: func(ctx context.Context, extra map[string]any) (map[string]any, error) {
4555+
moduleNameCh <- rgrpc.GetModuleName(ctx)
4556+
return map[string]any{
4557+
"reading": 42,
4558+
}, nil
4559+
},
4560+
CloseFunc: func(ctx context.Context) error {
4561+
return nil
4562+
},
4563+
}
4564+
4565+
// The resource registry is a global. We must use unique model names to avoid unexpected
4566+
// collisions.
4567+
callbackModelName := resource.DefaultModelFamily.WithModel(utils.RandomAlphaString(8))
4568+
resource.RegisterComponent(
4569+
sensor.API,
4570+
callbackModelName,
4571+
resource.Registration[sensor.Sensor, resource.NoNativeConfig]{Constructor: func(
4572+
ctx context.Context,
4573+
deps resource.Dependencies,
4574+
conf resource.Config,
4575+
logger logging.Logger,
4576+
) (sensor.Sensor, error) {
4577+
// Be lazy -- just return an a singleton object.
4578+
return callbackSensor, nil
4579+
}})
4580+
4581+
const moduleName = "fancy_module_name"
4582+
localRobot := setupLocalRobot(t, ctx, &config.Config{
4583+
Modules: []config.Module{
4584+
{
4585+
Name: moduleName,
4586+
ExePath: rtestutils.BuildTempModule(t, "module/testmodule"),
4587+
Type: config.ModuleTypeLocal,
4588+
},
4589+
},
4590+
Components: []resource.Config{
4591+
// We will invoke a special `DoCommand` on `modularComp`. It will expect its `DependsOn:
4592+
// "foo"` to be a sensor. And call the `Readings` API on that sensor.
4593+
{
4594+
Name: "modularComp",
4595+
API: generic.API,
4596+
Model: resource.NewModel("rdk", "test", "helper"),
4597+
DependsOn: []string{"foo"},
4598+
},
4599+
// `foo` will be a sensor that we've instrumented with the injected `ReadingsFunc`.
4600+
{
4601+
Name: "foo",
4602+
API: sensor.API,
4603+
Model: callbackModelName,
4604+
},
4605+
},
4606+
}, logger)
4607+
4608+
res, err := localRobot.ResourceByName(generic.Named("modularComp"))
4609+
test.That(t, err, test.ShouldBeNil)
4610+
4611+
_, err = res.DoCommand(ctx, map[string]interface{}{"command": "do_readings_on_dep"})
4612+
test.That(t, err, test.ShouldBeNil)
4613+
test.That(t, <-moduleNameCh, test.ShouldEqual, moduleName)
4614+
}

robot/web/web.go

+4
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ func (svc *webService) StartModule(ctx context.Context) error {
194194

195195
unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor)
196196

197+
// Attach the module name (as defined by the robot config) to the handler context. Can be
198+
// accessed via `grpc.GetModuleName`.
199+
unaryInterceptors = append(unaryInterceptors, grpc.ModNameUnaryServerInterceptor)
200+
197201
opManager := svc.r.OperationManager()
198202
unaryInterceptors = append(unaryInterceptors,
199203
opManager.UnaryServerInterceptor, logging.UnaryServerInterceptor)

0 commit comments

Comments
 (0)