Add configurable pubsub package so we can increase the number of publishers on topics #261

Closed. Wants to merge 46 commits.

Changes from 17 commits (46 commits in total).
883ff61
Use gcppubsub client when gcp pubsub is used
Hamishpk Oct 18, 2023
908f968
fix build files
Hamishpk Oct 18, 2023
6e4f1ca
cleanup
Hamishpk Oct 18, 2023
4892097
Use url opener rather than openTopic
Hamishpk Oct 19, 2023
f19013b
Add metric
Hamishpk Oct 19, 2023
67f84fe
fix lint
Hamishpk Oct 19, 2023
73ec819
Add failure to publish on message metrics
Hamishpk Oct 19, 2023
8a3ad33
use new pubsub opts in api server
Hamishpk Oct 20, 2023
0817754
Add and implement api pubsub opts
Hamishpk Oct 20, 2023
9a3536e
Add batch size and pollers to test struct
Hamishpk Oct 20, 2023
d8b020d
Fix tests
Hamishpk Oct 20, 2023
6bd56c1
Move pubsub opts to api package and actually make things configurable
Hamishpk Oct 23, 2023
ac22373
Fix subscription name when instance name isn't passed
Hamishpk Oct 23, 2023
4397456
make configurable pubsub package
Hamishpk Oct 23, 2023
61c0f51
add topic opts
Hamishpk Oct 23, 2023
f547bc8
lint
Hamishpk Oct 23, 2023
5eb1286
fix import
Hamishpk Oct 23, 2023
5bcf832
Remove topic config options in favour of passing them in the url
Hamishpk Oct 23, 2023
5701d14
Group pubsub opts for api server and add metrics related to failing t…
Hamishpk Oct 26, 2023
defae24
Generate release (#264)
Hamishpk Oct 26, 2023
c2e82d5
Correctly construct subscript name in api server (#265)
Hamishpk Oct 26, 2023
1760ac2
Fix bug in api server (#266)
Hamishpk Oct 26, 2023
b9ca40a
Update gocloud to v0.34.0 (#267)
Hamishpk Oct 26, 2023
6554703
Feat: Add rate limit to Redis client (#268)
Garbett1 Oct 29, 2023
e04ab39
make configurable pubsub package so we can configure the number of
Hamishpk Oct 18, 2023
af61dde
Group pubsub opts for api server and add metrics related to failing t…
Hamishpk Oct 26, 2023
81d9fe3
Randomly offset retention and expiration time. (#270)
fische Nov 10, 2023
f4bee82
Don't use Allow, because that consumes tokens (#272)
Garbett1 Nov 10, 2023
e2bd815
Fix runlocal (#271)
Tatskaari Nov 10, 2023
4634d00
Start receiving from queue only once we got all inflight executions. …
fische Nov 14, 2023
a75c4b8
Add GCP Cloud profiler support and update Go to 1.21 (#274)
Garbett1 Jan 6, 2024
f18e595
Bump to 11.7.0 (#275)
Garbett1 Jan 8, 2024
90fd7e8
Update remote apis to correct (#276)
Garbett1 Jan 15, 2024
51d669d
Set lowercase names for the profiler to work. (#277)
Garbett1 Feb 12, 2024
226af38
Increase timeout for UploadIfMissing (#278)
Hamishpk Feb 14, 2024
8365834
Set limiter before context in upload one (#279)
Hamishpk Feb 16, 2024
2c8ba3b
Bump GHA versions (#280)
peterebden Feb 19, 2024
9cc4d69
Periodically delete jobs in Mettle api server (#281)
Hamishpk Feb 21, 2024
4004669
Don't stop delete jobs ticker (#282)
Hamishpk Feb 22, 2024
7921cc0
Refactor gauge strategy (#284)
Garbett1 Mar 4, 2024
b5ca967
Reduce verbosity of api server logs (#286)
Hamishpk Mar 5, 2024
c38775b
Make startup logs more descriptive in elan (#288)
Hamishpk Mar 5, 2024
6955f2f
Reduce verbosity of worker logs (#287)
isobelormiston Mar 5, 2024
cad62f0
Make platform details optional (#285)
Garbett1 Mar 6, 2024
9c1ea8c
Update job deletion logic + propagate update times (#283)
Garbett1 Mar 6, 2024
2efe507
make configurable pubsub package
Hamishpk Oct 18, 2023
57 changes: 47 additions & 10 deletions mettle/api/api.go
@@ -70,6 +70,21 @@ var totalFailedPubSubMessages = prometheus.NewCounter(prometheus.CounterOpts{
Help: "Number of times the Pub/Sub pool has failed",
})

var noExecutionInProgress = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "no_execution_in_progress",
})

var requestPublishFailure = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "request_publish_failures",
})

var responsePublishFailure = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "response_publish_failures",
})

var timeToComplete = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "mettle",
Name: "time_to_complete_action_secs",
@@ -83,6 +98,9 @@ var metrics = []prometheus.Collector{
totalSuccessfulActions,
timeToComplete,
totalFailedPubSubMessages,
noExecutionInProgress,
requestPublishFailure,
responsePublishFailure,
}

func init() {
@@ -91,36 +109,52 @@ func init() {
}
}

// PubSubOpts holds information to configure queue options in the api server
type PubSubOpts struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
NumPublishers int `long:"num_publishers" env:"API_NUM_PUBLISHERS" default:"2"`
SubscriptionBatchSize uint `long:"subscription_batch_size" env:"API_SUBSCRIPTION" default:"100"`
TopicBatchSize int `long:"topic_batch_size" env:"API_TOPIC_BATCH_SIZE" default:"1000"`
}

// ServeForever serves on the given port until terminated.
func ServeForever(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) {
s, lis, err := serve(opts, name, requestQueue, responseQueue, preResponseQueue, apiURL, connTLS, allowedPlatform, storageURL, storageTLS, numPollers, responseBatchSize)
func ServeForever(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) {
s, lis, err := serve(opts, name, queueOpts, apiURL, connTLS, allowedPlatform, storageURL, storageTLS)
if err != nil {
log.Fatalf("%s", err)
}
grpcutil.ServeForever(lis, s)
}

func serve(opts grpcutil.Opts, name, requestQueue, responseQueue, preResponseQueue, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool, numPollers int, responseBatchSize uint) (*grpc.Server, net.Listener, error) {
func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, connTLS bool, allowedPlatform map[string][]string, storageURL string, storageTLS bool) (*grpc.Server, net.Listener, error) {
responseSubscriptionName := queueOpts.ResponseQueue
if name == "" {
name = "mettle API server"
} else {
responseSubscriptionName = responseSubscriptionName + name
}

log.Notice("Contacting CAS server on %s...", storageURL)
client, err := rexclient.New(name, storageURL, storageTLS, "")
if err != nil {
return nil, nil, err
}
if numPollers < 1 {
return nil, nil, fmt.Errorf("too few pollers specified: %d", numPollers)
if queueOpts.NumPollers < 1 {
return nil, nil, fmt.Errorf("too few pollers specified: %d", queueOpts.NumPollers)
}
srv := &server{
name: name,
requests: common.MustOpenTopic(requestQueue),
responses: common.MustOpenSubscription(responseQueue, responseBatchSize),
preResponses: common.MustOpenTopic(preResponseQueue),
requests: common.MustOpenTopic(queueOpts.RequestQueue, queueOpts.TopicBatchSize, queueOpts.NumPublishers),
responses: common.MustOpenSubscription(responseSubscriptionName, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue, queueOpts.TopicBatchSize, queueOpts.NumPublishers),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: numPollers,
numPollers: queueOpts.NumPollers,
}
log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
@@ -276,6 +310,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := common.PublishWithOrderingKey(ctx, s.preResponses, b, req.ActionDigest.Hash, s.name); err != nil {
responsePublishFailure.Inc()
log.Error("Failed to communicate pre-response message: %s", err)
}
b, _ = proto.Marshal(req)
@@ -285,6 +320,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
Body: b,
Metadata: platform,
}); err != nil {
requestPublishFailure.Inc()
log.Error("Failed to submit work to stream: %s", err)
return err
}
Expand Down Expand Up @@ -313,7 +349,8 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution
digest := &pb.Digest{Hash: req.Name}
ch, _ := s.eventStream(digest, false)
if ch == nil {
log.Warning("Request for execution %s which is not in progress", req.Name)
log.Error("Request for execution %s which is not in progress", req.Name)
noExecutionInProgress.Inc()
return status.Errorf(codes.NotFound, "No execution in progress for %s", req.Name)
}
return s.streamEvents(digest, ch, stream)
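For context on how the grouped options introduced above are meant to be consumed, here is a minimal sketch of constructing PubSubOpts and calling the new ServeForever signature. The import paths, queue URLs, addresses, and option values are placeholders assumed for illustration, not values from this change.

package main

import (
	"github.com/thought-machine/please-servers/grpcutil"
	"github.com/thought-machine/please-servers/mettle/api"
)

func main() {
	// Sketch only: every value below is a placeholder.
	queueOpts := api.PubSubOpts{
		RequestQueue:          "gcppubsub://projects/demo/topics/mettle-requests",
		ResponseQueue:         "gcppubsub://projects/demo/subscriptions/mettle-responses",
		PreResponseQueue:      "gcppubsub://projects/demo/topics/mettle-pre-responses",
		NumPollers:            10,
		NumPublishers:         2,
		SubscriptionBatchSize: 100,
		TopicBatchSize:        1000,
	}
	// Serves until terminated, using the queue options grouped into one struct.
	api.ServeForever(grpcutil.Opts{Host: "127.0.0.1", Port: 7778}, "", queueOpts,
		"", false, map[string][]string{}, "127.0.0.1:7777", false)
}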
15 changes: 11 additions & 4 deletions mettle/api/api_test.go
@@ -239,13 +239,20 @@ func setupServers(t *testing.T) (pb.ExecutionClient, *executor, *grpc.Server) {
casaddr := setupCASServer()
requests := fmt.Sprintf("mem://requests%d", queueID)
responses := fmt.Sprintf("mem://responses%d", queueID)
queues := PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
PreResponseQueue: responses,
NumPollers: 1,
SubscriptionBatchSize: 1,
}
queueID++
common.MustOpenTopic(requests) // Ensure these are created before anything tries
common.MustOpenTopic(responses) // to open a subscription to either.
common.MustOpenTopic(requests, 1, 1) // Ensure these are created before anything tries
common.MustOpenTopic(responses, 1, 1) // to open a subscription to either.
s, lis, err := serve(grpcutil.Opts{
Host: "127.0.0.1",
Port: 0,
}, "", requests, responses, responses, "", true, map[string][]string{}, casaddr, false, 1, 1)
}, "", queues, "", true, map[string][]string{}, casaddr, false)
require.NoError(t, err)
go s.Serve(lis)
conn, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
@@ -329,7 +336,7 @@ type executor struct {
func newExecutor(requests, responses string) *executor {
return &executor{
requests: common.MustOpenSubscription(requests, 1),
responses: common.MustOpenTopic(responses),
responses: common.MustOpenTopic(responses, 1, 1),
}
}

4 changes: 4 additions & 0 deletions mettle/common/BUILD
@@ -9,11 +9,15 @@ go_library(
"///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
"///third_party/go/gocloud.dev//pubsub",
"///third_party/go/gocloud.dev//pubsub/gcppubsub",
"///third_party/go/gocloud.dev//pubsub/batcher",
"///third_party/go/gocloud.dev//gcp",
"///third_party/go/google.golang.org_genproto//googleapis/longrunning",
"///third_party/go/google.golang.org_genproto//googleapis/pubsub/v1",
"///third_party/go/google.golang.org_grpc//codes",
"///third_party/go/google.golang.org_grpc//status",
"//mettle/mempubsub",
"//mettle/configurablepubsub",

],
)

35 changes: 30 additions & 5 deletions mettle/common/common.go
@@ -23,8 +23,9 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thought-machine/please-servers/mettle/mempubsub" // Register our custom mempubsub scheme
_ "gocloud.dev/pubsub/gcppubsub" // And gocloud's gcppubsub provider
// Register our custom pubsub schemes
_ "github.com/thought-machine/please-servers/mettle/configurablepubsub" // blank import to register schema

[GitHub Actions / lint] Check failure on line 27 in mettle/common/common.go: could not import github.com/thought-machine/please-servers/mettle/configurablepubsub
"github.com/thought-machine/please-servers/mettle/mempubsub"
)

var log = logging.MustGetLogger()
@@ -77,15 +78,39 @@
}

// MustOpenTopic opens a topic, which must have been created ahead of time.
func MustOpenTopic(url string) *pubsub.Topic {
t, err := pubsub.OpenTopic(context.Background(), url)
// Batch size and number of publishers are configurable for GCP queues only.
func MustOpenTopic(url string, batchSize, numPublishers int) *pubsub.Topic {
u := addTopicOpts(url, batchSize, numPublishers)
t, err := pubsub.OpenTopic(context.Background(), u)
if err != nil {
log.Fatalf("Failed to open topic %s: %s", url, err)
log.Fatalf("Failed to open topic %s: %s", u, err)
}
log.Debug("Opened topic %s", url)
return t
}

func addTopicOpts(in string, batchSize, numPublishers int) string {
u, err := url.Parse(in)
if err != nil {
// It's not clear exactly how we can even get here; url.Parse seems to pretty much never
// return an error. Anyway, panicking at this point shouldn't be an issue.
panic(err)
}
v := u.Query()
if batchSize > 1 {
v.Add("max_send_batch_size", strconv.Itoa(batchSize))
}
if numPublishers > 1 {
if strings.HasPrefix(in, "configurable") {
v.Add("num_handlers", strconv.Itoa(numPublishers))
} else {
log.Fatal("Can only set numPublishers on configurablepubsub queues")
}
}
u.RawQuery = v.Encode()
return u.String()
}

func handleSignals(cancel context.CancelFunc, s Shutdownable) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGTERM)
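To make the behaviour of addTopicOpts concrete, here is a rough worked example with made-up project and topic names; the query parameters come out in alphabetical order because url.Values.Encode sorts keys.

// Hypothetical call from within the common package, illustrative values only.
u := addTopicOpts("configurable-gcppubsub://projects/demo/topics/requests", 500, 4)
// u == "configurable-gcppubsub://projects/demo/topics/requests?max_send_batch_size=500&num_handlers=4"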
10 changes: 10 additions & 0 deletions mettle/configurablepubsub/BUILD
@@ -0,0 +1,10 @@
go_library(
name = "configurablepubsub",
srcs = ["configurablepubsub.go"],
visibility = ["//mettle/..."],
deps = [
"///third_party/go/gocloud.dev//pubsub",
"///third_party/go/gocloud.dev//pubsub/gcppubsub",
"///third_party/go/gocloud.dev//gcp",
],
)
87 changes: 87 additions & 0 deletions mettle/configurablepubsub/configurablepubsub.go
@@ -0,0 +1,87 @@
// configurablepubsub is a wrapper around the gcppubsub package that allows us to configure the number of handlers.

[GitHub Actions / lint] Check failure on line 1 in mettle/configurablepubsub/configurablepubsub.go: # github.com/thought-machine/please-servers/mettle/configurablepubsub
package configurablepubsub

import (
"context"
"fmt"
"net/url"
"path"
"regexp"
"strconv"
"strings"

"gocloud.dev/gcp"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/gcppubsub"
)

const Scheme = "configurable-gcppubsub"

Contributor commented:
That's a very long name for a scheme 😅

Contributor Author replied:
haha I shortened it to tm-gcppubsub. I wouldn't be against changing the package name to this also but the name is pretty descriptive.

var topicPathRE = regexp.MustCompile("^projects/.+/topics/.+$")

func init() {
uo := &urlOpener{}
pubsub.DefaultURLMux().RegisterTopic(Scheme, uo)
}

type urlOpener struct {
}

func (uo *urlOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error) {
opts := gcppubsub.TopicOptions{}
for param, value := range u.Query() {
switch param {
case "max_send_batch_size":
maxBatchSize, err := queryParameterInt(value)
if err != nil {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q: %v", u, param, err)
}

if maxBatchSize <= 0 || maxBatchSize > 1000 {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q: must be between 1 and 1000", u, param)
}

opts.BatcherOptions.MaxBatchSize = maxBatchSize
case "num_handlers":
maxHandlers, err := queryParameterInt(value)
if err != nil {
return nil, fmt.Errorf("open topic %v: invalid query parameter %q: %v", u, param, err)
}
if maxHandlers < 1 {
return nil, fmt.Errorf("open topic %v: numHandlers cannot be less than 1")
}
opts.BatcherOptions.MaxHandlers = maxHandlers
default:
return nil, fmt.Errorf("open topic %v: invalid query parameter %q", u, param)
}
}

creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
return nil, err
}
conn, _, err := gcppubsub.Dial(ctx, creds.TokenSource)
if err != nil {
return nil, err
}

pc, err := gcppubsub.PublisherClient(ctx, conn)
if err != nil {
return nil, err
}
topicPath := path.Join(u.Host, u.Path)
if topicPathRE.MatchString(topicPath) {
return gcppubsub.OpenTopicByPath(pc, topicPath, &opts)
}
// Shortened form?
topicName := strings.TrimPrefix(u.Path, "/")
return gcppubsub.OpenTopic(pc, gcp.ProjectID(u.Host), topicName, &opts), nil
Comment on lines +69 to +79

Contributor commented:
What about using gcppubsub.URLOpener here? You initialise it with the connection and topic options and then just call OpenTopicURL with the URL. That way we can still benefit from their implementation and their extra params.

Contributor Author replied:
The reason I didn't go for this is that I wanted to be able to manipulate the URL passed into the service: we take a URL, get the params from it, reconstruct the URL, modify the scheme, and then pass it to the URL opener. If the scheme changed, this function would need to change too. We still get the default GCP opts doing it this way. What do you think?
}

func queryParameterInt(value []string) (int, error) {
if len(value) > 1 {
return 0, fmt.Errorf("expected only one parameter value, got: %v", len(value))
}

return strconv.Atoi(value[0])
}
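A minimal usage sketch of the new scheme once this package is blank-imported. The project and topic names are placeholders, GCP credentials are assumed to be available from the environment, and the scheme name reflects this revision (it was later shortened per the review discussion above).

package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"

	// Blank import registers the configurable-gcppubsub scheme on the default URL mux.
	_ "github.com/thought-machine/please-servers/mettle/configurablepubsub"
)

func main() {
	ctx := context.Background()
	// Placeholder topic; num_handlers sets the number of concurrent publish handlers,
	// max_send_batch_size caps messages per publish request (1-1000).
	topic, err := pubsub.OpenTopic(ctx, "configurable-gcppubsub://projects/demo/topics/requests?num_handlers=4&max_send_batch_size=500")
	if err != nil {
		log.Fatalf("Failed to open topic: %s", err)
	}
	defer topic.Shutdown(ctx)
	if err := topic.Send(ctx, &pubsub.Message{Body: []byte("hello")}); err != nil {
		log.Fatalf("Failed to publish: %s", err)
	}
}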
27 changes: 13 additions & 14 deletions mettle/main.go
@@ -52,16 +52,9 @@ var opts = struct {
URL string `long:"url" description:"URL for communicating with other API servers"`
TLS bool `long:"tls" description:"Use TLS for communication between api servers"`
} `group:"Options controlling communication with other API servers for bootstrapping zero-downtime deployments." namespace:"api"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues struct {
RequestQueue string `short:"q" long:"request_queue" env:"API_REQUEST_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending requests, e.g. gcppubsub://my-request-queue"`
ResponseQueue string `short:"r" long:"response_queue" env:"API_RESPONSE_QUEUE" required:"true" description:"URL defining the pub/sub queue to connect to for sending responses, e.g. gcppubsub://my-response-queue"`
ResponseQueueSuffix string `long:"response_queue_suffix" env:"API_RESPONSE_QUEUE_SUFFIX" description:"Suffix to apply to the response queue name"`
PreResponseQueue string `long:"pre_response_queue" env:"API_PRE_RESPONSE_QUEUE" required:"true" description:"URL describing the pub/sub queue to connect to for preloading responses to other servers"`
NumPollers int `long:"num_pollers" env:"API_NUM_POLLERS" default:"10"`
ResponseBatchSize uint `long:"response_batch_size" env:"API_RESPONSE_BATCH_SIZE" default:"100"`
} `group:"Options controlling the pub/sub queues"`
Storage StorageOpts `group:"Options controlling communication with the CAS server" namespace:"storage"`
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Queues api.PubSubOpts `group:"Options controlling the pub/sub queues"`
AllowedPlatform map[string][]string `long:"allowed_platform" description:"Allowed values for platform properties"`
} `command:"api" description:"Start as an API server"`
Worker struct {
@@ -170,21 +163,27 @@ func main() {
if cmd == "dual" {
const requests = "mem://requests"
const responses = "mem://responses"
queues := api.PubSubOpts{
RequestQueue: requests,
ResponseQueue: responses,
NumPollers: 1,
}

// Must ensure the topics are created ahead of time.
common.MustOpenTopic(requests)
common.MustOpenTopic(responses)
common.MustOpenTopic(requests, 1, 1)
common.MustOpenTopic(responses, 1, 1)
if opts.Dual.NumWorkers == 0 {
opts.Dual.NumWorkers = runtime.NumCPU()
}
for i := 0; i < opts.Dual.NumWorkers; i++ {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, opts.Dual.Redis.URL, opts.Dual.Redis.ReadURL, opts.Dual.Redis.ReadPassword(), opts.Dual.Redis.CAFile, opts.Dual.Redis.TLS, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", requests, responses, responses, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize)
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
} else if cmd == "worker" {
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, opts.Worker.Redis.URL, opts.Worker.Redis.ReadURL, opts.Worker.Redis.ReadPassword(), opts.Worker.Redis.CAFile, opts.Worker.Redis.TLS, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.RequestQueue, opts.API.Queues.ResponseQueue+opts.API.Queues.ResponseQueueSuffix, opts.API.Queues.PreResponseQueue, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS, opts.API.Queues.NumPollers, opts.API.Queues.ResponseBatchSize)
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
} else if err := one(); err != nil {
log.Fatalf("%s", err)
}
4 changes: 2 additions & 2 deletions mettle/worker/worker.go
@@ -154,7 +154,7 @@ func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, c
// RunOne runs one single request, returning any error received.
func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tokenFile, redis, readRedis, redisPassword, redisCAFile string, redisTLS bool, cachePrefix, cacheParts []string, clean, secureStorage bool, digest *pb.Digest) error {
// Must create this to submit on first
topic := common.MustOpenTopic("mem://requests")
topic := common.MustOpenTopic("mem://requests", 1, 1)
w, err := initialiseWorker(instanceName, "mem://requests", "mem://responses", name, storage, dir, cacheDir, "", sandbox, altSandbox, "", "", tokenFile, redis, readRedis, redisPassword, redisCAFile, redisTLS, cachePrefix, cacheParts, clean, secureStorage, 0, math.MaxInt64, 100.0, "", nil, 0)
if err != nil {
return err
@@ -290,7 +290,7 @@ func initialiseWorker(instanceName, requestQueue, responseQueue, name, storage,

w := &worker{
requests: common.MustOpenSubscription(requestQueue, 1),
responses: common.MustOpenTopic(responseQueue),
responses: common.MustOpenTopic(responseQueue, 1, 1),
ackExtension: ackExtension,
client: client,
rclient: rexclient.Uninitialised(),