Skip to content

Commit

Permalink
Add an optional Init func to the server
Browse files Browse the repository at this point in the history
That takes a config struct from the client.

The old way of configuring the server was to pass env vars (which still works), but this was at best very cumbersome.
  • Loading branch information
bep committed Apr 3, 2024
1 parent 380fc7d commit d63216e
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 93 deletions.
98 changes: 73 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
)

// StartClient starts a client for the given options.
func StartClient[Q, M, R any](opts ClientOptions[Q, M, R]) (*Client[Q, M, R], error) {
func StartClient[C, Q, M, R any](opts ClientOptions[C, Q, M, R]) (*Client[C, Q, M, R], error) {
if opts.Codec == nil {
return nil, errors.New("opts: Codec is required")
}
Expand All @@ -37,16 +37,23 @@ func StartClient[Q, M, R any](opts ClientOptions[Q, M, R]) (*Client[Q, M, R], er
return nil, err
}

return &Client[Q, M, R]{
c := &Client[C, Q, M, R]{
rawClient: rawClient,
opts: opts,
}, nil
}

err = c.init(opts.Config)
if err != nil {
return nil, err
}

return c, nil
}

// Client is a strongly typed RPC client.
type Client[Q, M, R any] struct {
type Client[C, Q, M, R any] struct {
rawClient *ClientRaw
opts ClientOptions[Q, M, R]
opts ClientOptions[C, Q, M, R]
}

// Result is the result of a request
Expand Down Expand Up @@ -85,13 +92,49 @@ func (r Result[M, R]) close() {
// MessagesRaw returns the raw messages from the server.
// These are not connected to the request-response flow,
// typically used for log messages etc.
func (c *Client[Q, M, R]) MessagesRaw() <-chan Message {
func (c *Client[C, Q, M, R]) MessagesRaw() <-chan Message {
return c.rawClient.Messages
}

// init passes the configuration to the server.
func (c *Client[C, Q, M, R]) init(cfg C) error {
body, err := c.opts.Codec.Encode(cfg)
if err != nil {
return fmt.Errorf("failed to encode config: %w", err)
}
var (
messagec = make(chan Message, 10)
errc = make(chan error, 1)
)

go func() {
err := c.rawClient.Execute(
func(m *Message) {
m.Body = body
m.Header.Status = MessageStatusInitServer
},
messagec,
)
if err != nil {
errc <- fmt.Errorf("failed to execute init: %w", err)
}
}()

select {
case err := <-errc:
return err
case m := <-messagec:
if m.Header.Status != MessageStatusOK {
return fmt.Errorf("failed to init: %s (error code %d)", m.Body, m.Header.Status)
}
}

return nil
}

// Execute sends the request to the server and returns the result.
// You should check Err() both before and after reading from the messages and receipt channels.
func (c *Client[Q, M, R]) Execute(r Q) Result[M, R] {
func (c *Client[C, Q, M, R]) Execute(r Q) Result[M, R] {
result := Result[M, R]{
messages: make(chan M, 10),
receipt: make(chan R, 1),
Expand All @@ -112,28 +155,31 @@ func (c *Client[Q, M, R]) Execute(r Q) Result[M, R] {

messagesRaw := make(chan Message, 10)
go func() {
err := c.rawClient.Execute(body, messagesRaw)
err := c.rawClient.Execute(func(m *Message) { m.Body = body }, messagesRaw)
if err != nil {
result.errc <- fmt.Errorf("failed to execute: %w", err)
}
}()

for message := range messagesRaw {
if message.Header.Status > MessageStatusContinue && message.Header.Status <= MessageStatusSystemReservedMax {
if message.Header.Status >= MessageStatusErrDecodeFailed && message.Header.Status <= MessageStatusSystemReservedMax {
// All of these are currently error situations produced by the server.
result.errc <- fmt.Errorf("%s (error code %d)", message.Body, message.Header.Status)
return
}

if message.Header.Status == MessageStatusContinue {
switch message.Header.Status {
case MessageStatusContinue:
var resp M
err = c.opts.Codec.Decode(message.Body, &resp)
if err != nil {
result.errc <- err
return
}
result.messages <- resp
} else {
case MessageStatusInitServer:
panic("unexpected status")
default:
// Receipt.
var rec R
err = c.opts.Codec.Decode(message.Body, &rec)
Expand All @@ -152,7 +198,7 @@ func (c *Client[Q, M, R]) Execute(r Q) Result[M, R] {
}

// Close closes the client.
func (c *Client[Q, M, R]) Close() error {
func (c *Client[C, Q, M, R]) Close() error {
return c.rawClient.Close()
}

Expand Down Expand Up @@ -248,10 +294,10 @@ func (c *ClientRaw) Close() error {
// Execute sends body to the server and sends any messages to the messages channel.
// It's safe to call Execute from multiple goroutines.
// The messages channel wil be closed when the call is done.
func (c *ClientRaw) Execute(body []byte, messages chan<- Message) error {
func (c *ClientRaw) Execute(withMessage func(m *Message), messages chan<- Message) error {
defer close(messages)

call, err := c.newCall(body, messages)
call, err := c.newCall(withMessage, messages)
if err != nil {
return err
}
Expand All @@ -276,20 +322,21 @@ func (c *ClientRaw) addErrContext(op string, err error) error {
return fmt.Errorf("%s: %s %s", op, err, c.conn.stdErr.String())
}

func (c *ClientRaw) newCall(body []byte, messages chan<- Message) (*call, error) {
func (c *ClientRaw) newCall(withMessage func(m *Message), messages chan<- Message) (*call, error) {
c.mu.Lock()
c.seq++
id := c.seq
m := Message{
Header: Header{
Version: c.version,
ID: id,
},
}
withMessage(&m)

call := &call{
Done: make(chan *call, 1),
Request: Message{
Header: Header{
Version: c.version,
ID: id,
},
Body: body,
},
Done: make(chan *call, 1),
Request: m,
Messages: messages,
}

Expand Down Expand Up @@ -384,9 +431,10 @@ func (c *ClientRaw) send(call *call) error {
}

// ClientOptions are options for the client.
type ClientOptions[Q, M, R any] struct {
type ClientOptions[C, Q, M, R any] struct {
ClientRawOptions
Codec codecs.Codec
Config C
Codec codecs.Codec // TODO1 could this be in Config somehow?
}

// ClientRawOptions are options for the raw part of the client.
Expand Down
Loading

0 comments on commit d63216e

Please sign in to comment.