|
8 | 8 | "os"
|
9 | 9 | "path/filepath"
|
10 | 10 | "slices"
|
| 11 | + "strconv" |
11 | 12 | "strings"
|
12 | 13 | "sync"
|
13 | 14 | "sync/atomic"
|
@@ -42,6 +43,9 @@ import (
|
42 | 43 | rutils "go.viam.com/rdk/utils"
|
43 | 44 | )
|
44 | 45 |
|
| 46 | +// tcpPortRange is the beginning of the port range. Only used when ViamTCPSockets() = true. |
| 47 | +const tcpPortRange = 13500 |
| 48 | + |
45 | 49 | var (
|
46 | 50 | validateConfigTimeout = 5 * time.Second
|
47 | 51 | errMessageExitStatus143 = "exit status 143"
|
@@ -70,6 +74,7 @@ func NewManager(
|
70 | 74 | restartCtxCancel: restartCtxCancel,
|
71 | 75 | packagesDir: options.PackagesDir,
|
72 | 76 | ftdc: options.FTDC,
|
| 77 | + nextPort: tcpPortRange, |
73 | 78 | }
|
74 | 79 | }
|
75 | 80 |
|
@@ -102,8 +107,9 @@ type module struct {
|
102 | 107 | inStartup atomic.Bool
|
103 | 108 | inRecoveryLock sync.Mutex
|
104 | 109 | logger logging.Logger
|
105 |
| - |
106 |
| - ftdc *ftdc.FTDC |
| 110 | + ftdc *ftdc.FTDC |
| 111 | + // port stores the listen port of this module when ViamTCPSockets() = true. |
| 112 | + port int |
107 | 113 | }
|
108 | 114 |
|
109 | 115 | type addedResource struct {
|
@@ -183,8 +189,9 @@ type Manager struct {
|
183 | 189 | removeOrphanedResources func(ctx context.Context, rNames []resource.Name)
|
184 | 190 | restartCtx context.Context
|
185 | 191 | restartCtxCancel context.CancelFunc
|
186 |
| - |
187 |
| - ftdc *ftdc.FTDC |
| 192 | + ftdc *ftdc.FTDC |
| 193 | + // nextPort manages ports when ViamTCPSockets() = true. |
| 194 | + nextPort int |
188 | 195 | }
|
189 | 196 |
|
190 | 197 | // Close terminates module connections and processes.
|
@@ -326,7 +333,9 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, moduleLogger lo
|
326 | 333 | resources: map[resource.Name]*addedResource{},
|
327 | 334 | logger: moduleLogger,
|
328 | 335 | ftdc: mgr.ftdc,
|
| 336 | + port: mgr.nextPort, |
329 | 337 | }
|
| 338 | + mgr.nextPort++ |
330 | 339 |
|
331 | 340 | if err := mgr.startModule(ctx, mod); err != nil {
|
332 | 341 | return err
|
@@ -996,8 +1005,12 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
|
996 | 1005 | func (m *module) dial() error {
|
997 | 1006 | // TODO(PRODUCT-343): session support probably means interceptors here
|
998 | 1007 | var err error
|
| 1008 | + addrToDial := m.addr |
| 1009 | + if !rutils.TCPRegex.MatchString(addrToDial) { |
| 1010 | + addrToDial = "unix://" + m.addr |
| 1011 | + } |
999 | 1012 | conn, err := grpc.Dial( //nolint:staticcheck
|
1000 |
| - "unix://"+m.addr, |
| 1013 | + addrToDial, |
1001 | 1014 | grpc.WithTransportCredentials(insecure.NewCredentials()),
|
1002 | 1015 | grpc.WithChainUnaryInterceptor(
|
1003 | 1016 | rdkgrpc.EnsureTimeoutUnaryClientInterceptor,
|
@@ -1104,11 +1117,16 @@ func (m *module) startProcess(
|
1104 | 1117 | packagesDir string,
|
1105 | 1118 | ) error {
|
1106 | 1119 | var err error
|
1107 |
| - // append a random alpha string to the module name while creating a socket address to avoid conflicts |
1108 |
| - // with old versions of the module. |
1109 |
| - if m.addr, err = modlib.CreateSocketAddress( |
1110 |
| - filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil { |
1111 |
| - return err |
| 1120 | + |
| 1121 | + if rutils.ViamTCPSockets() { |
| 1122 | + m.addr = "127.0.0.1:" + strconv.Itoa(m.port) |
| 1123 | + } else { |
| 1124 | + // append a random alpha string to the module name while creating a socket address to avoid conflicts |
| 1125 | + // with old versions of the module. |
| 1126 | + if m.addr, err = modlib.CreateSocketAddress( |
| 1127 | + filepath.Dir(parentAddr), fmt.Sprintf("%s-%s", m.cfg.Name, utils.RandomAlphaString(5))); err != nil { |
| 1128 | + return err |
| 1129 | + } |
1112 | 1130 | }
|
1113 | 1131 |
|
1114 | 1132 | // We evaluate the Module's ExePath absolutely in the viam-server process so that
|
@@ -1176,12 +1194,15 @@ func (m *module) startProcess(
|
1176 | 1194 | )
|
1177 | 1195 | }
|
1178 | 1196 | }
|
1179 |
| - err = modlib.CheckSocketOwner(m.addr) |
1180 |
| - if errors.Is(err, fs.ErrNotExist) { |
1181 |
| - continue |
1182 |
| - } |
1183 |
| - if err != nil { |
1184 |
| - return errors.WithMessage(err, "module startup failed") |
| 1197 | + if !rutils.TCPRegex.MatchString(m.addr) { |
| 1198 | + // note: we don't do this check in TCP mode because TCP addresses are not file paths and will fail check. |
| 1199 | + err = modlib.CheckSocketOwner(m.addr) |
| 1200 | + if errors.Is(err, fs.ErrNotExist) { |
| 1201 | + continue |
| 1202 | + } |
| 1203 | + if err != nil { |
| 1204 | + return errors.WithMessage(err, "module startup failed") |
| 1205 | + } |
1185 | 1206 | }
|
1186 | 1207 | break
|
1187 | 1208 | }
|
|
0 commit comments