diff --git a/appsync_example_test.go b/appsync_example_test.go index f221006..c807fa0 100644 --- a/appsync_example_test.go +++ b/appsync_example_test.go @@ -3,7 +3,8 @@ package appsync_test import ( "encoding/json" "fmt" - "log" + "log/slog" + "os" "strings" "github.com/sony/appsync-client-go/internal/appsynctest" @@ -22,12 +23,14 @@ func ExampleClient_Post_query() { Query: query, }) if err != nil { - log.Fatal(err) + slog.Error("unable to post query", "error", err) + os.Exit(1) } data := new(string) if err := response.DataAs(data); err != nil { - log.Fatalln(err, response) + slog.Error("unable to process data", "error", err, "response", response) + os.Exit(1) } fmt.Println(*data) @@ -47,12 +50,14 @@ func ExampleClient_Post_mutation() { Variables: &variables, }) if err != nil { - log.Fatal(err) + slog.Error("unable to post mutation", "error", err) + os.Exit(1) } data := new(string) if err := response.DataAs(data); err != nil { - log.Fatalln(err, response) + slog.Error("unable to process data", "error", err, "response", response) + os.Exit(1) } fmt.Println(*data) @@ -70,22 +75,27 @@ func ExampleClient_mqtt_subscription() { Query: subscription, }) if err != nil { - log.Fatal(err) + slog.Error("unable to post subscription", "error", err) + os.Exit(1) } ext, err := appsync.NewExtensions(response) if err != nil { - log.Fatalln(err) + slog.Error("unable to process extensions", "error", err) + os.Exit(1) } ch := make(chan *graphql.Response) subscriber := appsync.NewSubscriber(*ext, func(r *graphql.Response) { ch <- r }, - func(err error) { log.Println(err) }, + func(err error) { + slog.Warn("unable to create new subscriber", "error", err) + }, ) if err := subscriber.Start(); err != nil { - log.Fatalln(err) + slog.Error("unable to start subscriber", "error", err) + os.Exit(1) } defer subscriber.Stop() @@ -96,13 +106,15 @@ func ExampleClient_mqtt_subscription() { Variables: &variables, }) if err != nil { - log.Fatal(err) + slog.Error("unable to post mutation", "error", err) + os.Exit(1) } response = <-ch data := new(string) if err := response.DataAs(data); err != nil { - log.Fatalln(err, response) + slog.Error("unable to process data", "error", err, "response", response) + os.Exit(1) } fmt.Println(*data) @@ -124,11 +136,14 @@ func ExampleClient_graphqlws_subscription() { Query: subscription, }, func(r *graphql.Response) { ch <- r }, - func(err error) { log.Println(err) }, + func(err error) { + slog.Warn("unable to create new pure websocket subscriber ", "error", err) + }, ) if err := subscriber.Start(); err != nil { - log.Fatalln(err) + slog.Error("unable to start subscriber", "error", err) + os.Exit(1) } defer subscriber.Stop() @@ -139,13 +154,15 @@ func ExampleClient_graphqlws_subscription() { Variables: &variables, }) if err != nil { - log.Fatal(err) + slog.Error("unable to post mutation", "error", err) + os.Exit(1) } response := <-ch data := new(string) if err := response.DataAs(data); err != nil { - log.Fatalln(err, response) + slog.Error("unable to process data", "error", err, "response", response) + os.Exit(1) } fmt.Println(*data) diff --git a/client.go b/client.go index c6c1df1..8c29a8c 100644 --- a/client.go +++ b/client.go @@ -3,7 +3,7 @@ package appsync import ( "context" "encoding/json" - "log" + "log/slog" "net/http" "time" @@ -45,12 +45,12 @@ func (c *Client) setupHeaders(request graphql.PostRequest) (http.Header, error) jsonBytes, err := json.Marshal(request) if err != nil { - log.Println(err) + slog.Error("unable to marshal request", "error", err, "request", request) return nil, err } h, err := c.signer.signHTTP(jsonBytes) if err != nil { - log.Println(err) + slog.Error("unable to sign request", "error", err, "request", request) return nil, err } for k, vv := range h { @@ -66,7 +66,7 @@ func (c *Client) Post(request graphql.PostRequest) (*graphql.Response, error) { defer c.sleepIfNeeded(request) header, err := c.setupHeaders(request) if err != nil { - log.Println(err) + slog.Error("unable to setup headers", "error", err, "request", request) return nil, err } return c.graphQLAPI.Post(header, request) @@ -76,7 +76,7 @@ func (c *Client) Post(request graphql.PostRequest) (*graphql.Response, error) { func (c *Client) PostAsync(request graphql.PostRequest, callback func(*graphql.Response, error)) (context.CancelFunc, error) { header, err := c.setupHeaders(request) if err != nil { - log.Println(err) + slog.Error("unable to setup headers", "error", err, "request", request) return nil, err } cb := func(g *graphql.Response, err error) { diff --git a/go.mod b/go.mod index 78b6a18..e46ad85 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/sony/appsync-client-go -go 1.20 +go 1.21 require ( github.com/aws/aws-sdk-go v1.44.245 diff --git a/graphql/client.go b/graphql/client.go index 1387dff..b32a178 100644 --- a/graphql/client.go +++ b/graphql/client.go @@ -4,7 +4,7 @@ import ( "bytes" "context" "encoding/json" - "log" + "log/slog" "net/http" "net/url" "time" @@ -69,13 +69,13 @@ func (c *Client) Post(header http.Header, request PostRequest) (*Response, error func (c *Client) PostAsync(header http.Header, request PostRequest, callback func(*Response, error)) (context.CancelFunc, error) { jsonBytes, err := json.Marshal(request) if err != nil { - log.Println(err) + slog.Error("unable to marshal request", "error", err, "request", request) return nil, err } req, err := http.NewRequest("POST", c.endpoint, bytes.NewBuffer(jsonBytes)) if err != nil { - log.Println(err) + slog.Error("unable to create request", "error", err) return nil, err } req.Header = merge(req.Header, merge(c.header, header)) @@ -93,7 +93,7 @@ func (c *Client) PostAsync(header http.Header, request PostRequest, callback fun op := func() error { r, err := c.http.Do(req) if err != nil { - log.Println(err) + slog.Warn("unable to send request", "error", err, "request", request) if err.(*url.Error).Timeout() { c.http.CloseIdleConnections() } @@ -101,21 +101,21 @@ func (c *Client) PostAsync(header http.Header, request PostRequest, callback fun } defer func() { if err := r.Body.Close(); err != nil { - log.Println(err) + slog.Error("unable to close response body", "error", err, "request", request) } }() if r.StatusCode != http.StatusOK { httpErr := httpStatusError{StatusCode: r.StatusCode} if httpErr.shouldRetry() { - log.Println(httpErr) + slog.Error("unable to send request", "error", httpErr, "request", request) return httpErr } return backoff.Permanent(httpErr) } if err := json.NewDecoder(r.Body).Decode(&response); err != nil { - log.Println(err) + slog.Error("unable to decode response", "error", err, "request", request) return backoff.Permanent(err) } response.StatusCode = &r.StatusCode diff --git a/graphql/option.go b/graphql/option.go index 57d7a04..d1f710b 100644 --- a/graphql/option.go +++ b/graphql/option.go @@ -1,7 +1,7 @@ package graphql import ( - "log" + "log/slog" "net/http" "net/url" "time" @@ -29,7 +29,7 @@ func WithHTTPProxy(proxy string) ClientOption { return func(c *Client) { proxy, err := url.Parse(proxy) if err != nil { - log.Println(err) + slog.Warn("unable to parse proxy URL", "error", err) return } if t, ok := c.http.Transport.(*http.Transport); ok { diff --git a/graphql/response.go b/graphql/response.go index 597f2a1..2c44252 100644 --- a/graphql/response.go +++ b/graphql/response.go @@ -20,7 +20,7 @@ func (r *Response) DataAs(v interface{}) error { return fmt.Errorf("data is invalid") } if len(m) != 1 { - return fmt.Errorf("the data is not exist") + return fmt.Errorf("the data does not exist") } for _, value := range m { if b, err := json.Marshal(value); err == nil { diff --git a/internal/appsynctest/server.go b/internal/appsynctest/server.go index 4784bce..25cc16c 100644 --- a/internal/appsynctest/server.go +++ b/internal/appsynctest/server.go @@ -4,8 +4,8 @@ import ( "bytes" "encoding/json" "fmt" - "io/ioutil" - "log" + "io" + "log/slog" "net/http" "net/http/httptest" @@ -88,15 +88,15 @@ func (m *mqttPublisher) Write(payload []byte) (int, error) { for s := range m.mqttSessions { writer, err := s.NextWriter(websocket.BinaryMessage) if err != nil { - log.Println(err) + slog.Error("unable to get next writer", "error", err) continue } if err := pub.Write(writer); err != nil { - log.Println(err) + slog.Error("unable to write packet", "error", err) continue } if err := writer.Close(); err != nil { - log.Println(err) + slog.Error("unable to close writer", "error", err) continue } } @@ -107,7 +107,7 @@ func (m *mqttPublisher) Write(payload []byte) (int, error) { for s := range m.grapqhWsSessions { data := json.RawMessage(fmt.Sprintf(gqlwsdatafmt, "id", string(payload))) if err := s.WriteJSON(data); err != nil { - log.Println(err) + slog.Warn("unable to write json", "error", err) continue } } @@ -140,20 +140,20 @@ func (e *echoResolver) SubscribeToEcho() string { func mqttWsSession(ws *websocket.Conn, onConnected func(ws *websocket.Conn), onDisconnected func(ws *websocket.Conn)) { defer func() { if err := ws.Close(); err != nil { - log.Println(err) + slog.Error("unable to close websocket", "error", err) } }() for { mt, r, err := ws.NextReader() if err != nil { - log.Println(err) + slog.Error("unable to get next reader", "error", err) return } cp, err := packets.ReadPacket(r) if err != nil { - log.Println(err) + slog.Error("unable to read packet", "error", err) return } @@ -178,15 +178,15 @@ func mqttWsSession(ws *websocket.Conn, onConnected func(ws *websocket.Conn), onD writer, err := ws.NextWriter(mt) if err != nil { - log.Println(err) + slog.Error("unable to get next writer", "error", err) return } if err := ack.Write(writer); err != nil { - log.Println(err) + slog.Error("unable to write packet", "error", err) return } if err := writer.Close(); err != nil { - log.Println(err) + slog.Error("unable to close writer", "error", err) return } } @@ -195,7 +195,7 @@ func mqttWsSession(ws *websocket.Conn, onConnected func(ws *websocket.Conn), onD func graphQLWsSession(ws *websocket.Conn, onConnected func(ws *websocket.Conn), onDisconnected func(ws *websocket.Conn)) { defer func() { if err := ws.Close(); err != nil { - log.Println(err) + slog.Error("unable to close websocket", "error", err) } }() @@ -217,7 +217,7 @@ func graphQLWsSession(ws *websocket.Conn, onConnected func(ws *websocket.Conn), onDisconnected(ws) } if err := ws.WriteJSON(ack); err != nil { - log.Println(err) + slog.Error("unable to write json", "error", err) return } } @@ -260,12 +260,12 @@ func newSubscriptionHandlerFunc() http.HandlerFunc { resp := graphql.Response{Extensions: &ext} b, err := json.Marshal(resp) if err != nil { - log.Println(err) + slog.Warn("unable to marshal response", "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } if _, err = w.Write(b); err != nil { - log.Println(err) + slog.Warn("unable to write response", "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -287,6 +287,7 @@ func newMqttWsHandlerFunc(onConnected func(ws *websocket.Conn), onDisconnected f return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { + slog.Warn("unable to upgrade websocket", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) } go mqttWsSession(ws, onConnected, onDisconnected) @@ -301,6 +302,7 @@ func newGraphQLWsHandlerFunc(onConnected func(ws *websocket.Conn), onDisconnecte return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { + slog.Warn("unable to upgrade websocket", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) } go graphQLWsSession(ws, onConnected, onDisconnected) @@ -324,14 +326,14 @@ func newAppSyncEchoHandlerFunc(initialMessage string) http.HandlerFunc { func(ws *websocket.Conn) { delete(grapqhWsSessions, ws) }, ) return func(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { - log.Println(err) + slog.Warn("unable to read body", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) return } // Reset - r.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + r.Body = io.NopCloser(bytes.NewBuffer(body)) if isMqttWs(r) { mqttws.ServeHTTP(w, r) @@ -345,7 +347,7 @@ func newAppSyncEchoHandlerFunc(initialMessage string) http.HandlerFunc { req := new(graphql.PostRequest) if err := json.Unmarshal(body, req); err != nil { - log.Println(err) + slog.Warn("unable to unmarshal request", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -362,7 +364,7 @@ func newAppSyncEchoHandlerFunc(initialMessage string) http.HandlerFunc { subscription.ServeHTTP(w, r) return } - + slog.Warn("unknown request") http.Error(w, err.Error(), http.StatusBadRequest) } } diff --git a/pure_websocket_subscriber.go b/pure_websocket_subscriber.go index 26a7e8a..053e014 100644 --- a/pure_websocket_subscriber.go +++ b/pure_websocket_subscriber.go @@ -6,7 +6,7 @@ import ( "encoding/json" "errors" "fmt" - "log" + "log/slog" "net" "net/http" "time" @@ -94,6 +94,7 @@ func NewPureWebSocketSubscriber(realtimeEndpoint string, request graphql.PostReq onReceive func(response *graphql.Response), onConnectionLost func(err error), opts ...PureWebSocketSubscriberOption) *PureWebSocketSubscriber { + slog.Debug("creating new pure websocket subscriber", "realtimeEndpoint", realtimeEndpoint, "request", request) ctx, cancel := context.WithCancel(context.Background()) p := PureWebSocketSubscriber{ realtimeEndpoint: realtimeEndpoint, @@ -109,7 +110,9 @@ func NewPureWebSocketSubscriber(realtimeEndpoint string, request graphql.PostReq } func (p *PureWebSocketSubscriber) setupHeaders(payload []byte) (map[string]string, error) { + slog.Debug("setting up headers", "payload", string(payload)) if p.sigv4 == nil { + slog.Debug("no sigV4") headers := map[string]string{} for k := range p.header { headers[k] = p.header.Get(k) @@ -117,9 +120,10 @@ func (p *PureWebSocketSubscriber) setupHeaders(payload []byte) (map[string]strin return headers, nil } + slog.Debug("signing ws headers", "payload", string(payload)) headers, err := p.sigv4.signWS(payload) if err != nil { - log.Println(err) + slog.Error("error signing WS headers", "error", err) return nil, err } @@ -131,36 +135,36 @@ func (p *PureWebSocketSubscriber) Start() error { bpayload := []byte("{}") header, err := p.setupHeaders(bpayload) if err != nil { - log.Println(err) + slog.Error("error setting up headers", "error", err) return err } bheader, err := json.Marshal(header) if err != nil { - log.Println(err) + slog.Error("error marshalling headers during Start", "error", err, "header", header) return err } if err := p.op.connect(p.realtimeEndpoint, bheader, bpayload); err != nil { - log.Println(err) + slog.ErrorContext(p.op.ctx, "error connecting to websocket", "error", err, "realtimeEndpoint", p.realtimeEndpoint, "header", bheader, "payload", bpayload) return err } if err := p.op.connectionInit(); err != nil { - log.Println(err) + slog.ErrorContext(p.op.ctx, "error initializing connection", "error", err) return err } brequest, err := json.Marshal(p.request) if err != nil { - log.Println(err) + slog.ErrorContext(p.op.ctx, "error marshalling request", "error", err, "request", p.request) return err } authz, err := p.setupHeaders(brequest) if err != nil { - log.Println(err) + slog.ErrorContext(p.op.ctx, "error setting up headers", "error", err) return err } if err := p.op.start(brequest, authz); err != nil { - log.Println(err) + slog.ErrorContext(p.op.ctx, "error starting subscription", "error", err) return err } @@ -206,7 +210,7 @@ func (r *realtimeWebSocketOperation) readLoop() { defer close(r.completeCh) if err := r.ws.SetReadDeadline(time.Now().Add(defaultTimeout)); err != nil { - log.Println(err) + slog.Error("error setting read deadline", "error", err) return } for { @@ -221,22 +225,27 @@ func (r *realtimeWebSocketOperation) readLoop() { _, payload, err := r.ws.ReadMessage() if err != nil { - log.Println(err) - if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + slog.Warn("connection timeout") r.onConnectionLost(err) + return } + + slog.ErrorContext(r.ctx, "error reading message", "error", err, "payload", string(payload)) return } msg := new(message) if err := json.Unmarshal(payload, msg); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error unmarshalling message", "error", err, "payload", string(payload)) return } handler, ok := handlers[msg.Type] + slog.Debug("msg", "payload", string(payload), "ok", ok) if !ok { - log.Println("invalid message received: " + msg.Type) + slog.Warn("invalid message received", "msgType", msg.Type) continue } if handler(payload) { @@ -257,13 +266,13 @@ func (r *realtimeWebSocketOperation) connect(realtimeEndpoint string, header, pa if err := backoff.Retry(func() error { ws, _, err := websocket.DefaultDialer.DialContext(r.ctx, endpoint, http.Header{"sec-websocket-protocol": []string{"graphql-ws"}}) if err != nil { - log.Print(err) + slog.Error("error connecting to websocket", "error", err) return err } r.ws = ws return nil }, backoff.WithContext(backoff.NewExponentialBackOff(), r.ctx)); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error connecting to websocket", "error", err) return err } @@ -278,7 +287,7 @@ func (r *realtimeWebSocketOperation) connect(realtimeEndpoint string, header, pa func (r *realtimeWebSocketOperation) onConnected(payload []byte) bool { connack := new(connectionAckMessage) if err := json.Unmarshal(payload, connack); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error unmarshalling connection_ack", "error", err) return true } r.connackCh <- *connack @@ -292,11 +301,11 @@ func (r *realtimeWebSocketOperation) connectionInit() error { init, err := json.Marshal(connectionInitMsg) if err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error marshalling connection_init", "error", err) return err } if err := r.ws.WriteMessage(websocket.TextMessage, init); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error writing connection_init", "error", err) return err } connack, ok := <-r.connackCh @@ -314,7 +323,7 @@ func (r *realtimeWebSocketOperation) onKeepAlive([]byte) bool { timeout = r.connectionTimeout } if err := r.ws.SetReadDeadline(time.Now().Add(timeout)); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error setting read deadline", "error", err) return true } return false @@ -338,17 +347,19 @@ func (r *realtimeWebSocketOperation) start(request []byte, authorization map[str b, err := json.Marshal(start) if err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error marshalling start", "error", err) return err } if err := r.ws.WriteMessage(websocket.TextMessage, b); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error writing start", "error", err) } startack, ok := <-r.startackCh if !ok { return errors.New("subscription registration failed") } + r.subscriptionID = startack.ID + slog.Debug("subscriptionID", "id", r.subscriptionID) return nil } @@ -356,7 +367,7 @@ func (r *realtimeWebSocketOperation) start(request []byte, authorization map[str func (r *realtimeWebSocketOperation) onStarted(payload []byte) bool { startack := new(startAckMessage) if err := json.Unmarshal(payload, startack); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error unmarshalling start_ack", "error", err) return true } r.startackCh <- *startack @@ -366,7 +377,7 @@ func (r *realtimeWebSocketOperation) onStarted(payload []byte) bool { func (r *realtimeWebSocketOperation) onData(payload []byte) bool { data := new(processingDataMessage) if err := json.Unmarshal(payload, data); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error unmarshalling onData payload", "error", err) return true } r.onReceive(&graphql.Response{ @@ -383,15 +394,15 @@ func (r *realtimeWebSocketOperation) stop() { stop := stopMessage{message{"stop"}, r.subscriptionID} b, err := json.Marshal(stop) if err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error marshalling stop", "error", err) return } if err := r.ws.WriteMessage(websocket.TextMessage, b); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error writing stop", "error", err) return } if _, ok := <-r.completeCh; !ok { - log.Println("unsubscribe failed") + slog.Warn("subscription stop failed") } r.subscriptionID = "" } @@ -399,7 +410,7 @@ func (r *realtimeWebSocketOperation) stop() { func (r *realtimeWebSocketOperation) onStopped(payload []byte) bool { complete := new(completeMessage) if err := json.Unmarshal(payload, complete); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error unmarshalling onStopped complete msg", "error", err, "payload", string(payload)) return true } r.completeCh <- *complete @@ -412,7 +423,7 @@ func (r *realtimeWebSocketOperation) disconnect() { } if err := r.ws.Close(); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error closing websocket", "error", err) } r.connectionTimeout = 0 r.ws = nil @@ -421,7 +432,7 @@ func (r *realtimeWebSocketOperation) disconnect() { func (r *realtimeWebSocketOperation) onError(payload []byte) bool { em := new(errorMessage) if err := json.Unmarshal(payload, em); err != nil { - log.Println(err) + slog.ErrorContext(r.ctx, "error unmarshalling onError payload", "error", err, "payload", string(payload)) return true } errors := make([]interface{}, len(em.Payload.Errors)) diff --git a/pure_websocket_subscriber_test.go b/pure_websocket_subscriber_test.go index 4fd03e0..995b59b 100644 --- a/pure_websocket_subscriber_test.go +++ b/pure_websocket_subscriber_test.go @@ -2,7 +2,7 @@ package appsync import ( "encoding/json" - "log" + "log/slog" "net/http" "net/http/httptest" "strings" @@ -153,7 +153,7 @@ func TestPureWebSocketSubscriber_AbortStopOnReadMessage(t *testing.T) { func pureWebSocketSession(ws *websocket.Conn, conn_ack_delay, start_ack_delay, complete_delay time.Duration) { defer func() { if err := ws.Close(); err != nil { - log.Println(err) + slog.Error("unable to close websocket", "error", err) } }() @@ -202,25 +202,25 @@ func pureWebSocketSession(ws *websocket.Conn, conn_ack_delay, start_ack_delay, c for { _, payload, err := ws.ReadMessage() if err != nil { - log.Println(err) + slog.Error("unable to read message from websocket", "error", err) return } msg := new(message) if err := json.Unmarshal(payload, msg); err != nil { - log.Println(err) + slog.Error("unable to unmarshal message", "error", err, "message", string(payload)) return } handler, ok := handlers[msg.Type] if !ok { - log.Println("invalid message received: " + msg.Type) + slog.Error("invalid message received", "msgType", msg.Type) continue } out, finish := handler(payload) if err := ws.WriteMessage(websocket.TextMessage, out); err != nil { - log.Println(err) + slog.Error("unable to write message to websocket", "error", err, "message", string(out)) return } if finish { diff --git a/sigv4.go b/sigv4.go index 401df92..8316e76 100644 --- a/sigv4.go +++ b/sigv4.go @@ -6,7 +6,7 @@ import ( "crypto/sha256" "encoding/hex" "errors" - "log" + "log/slog" "net/http" "strconv" "time" @@ -30,22 +30,25 @@ type _signer struct { } func (s *_signer) signHTTP(payload []byte) (http.Header, error) { + slog.Debug("signing http request", "payload", string(payload)) req, err := http.NewRequest("POST", s.url, bytes.NewBuffer(payload)) if err != nil { - log.Println(err) + slog.Error("error creating signing request", "error", err) return nil, err } switch signer := s.sdkSigner.(type) { case *sdkv1_v4.Signer: + slog.Debug("signing request using sdk v1") _, err = signer.Sign(req, bytes.NewReader(payload), "appsync", s.region, time.Now()) if err != nil { - log.Println(err) + slog.Error("error signing request using sdk v1", "error", err) return nil, err } case *sdkv2_v4.Signer: + slog.Debug("signing request using sdk v2") hash := sha256.Sum256(payload) if err := signer.SignHTTP(context.TODO(), *s.creds, req, hex.EncodeToString(hash[:]), "appsync", s.region, time.Now()); err != nil { - log.Println(err) + slog.Error("error signing request using sdk v2", "error", err) return nil, err } default: @@ -55,13 +58,15 @@ func (s *_signer) signHTTP(payload []byte) (http.Header, error) { } func (s *_signer) signWS(payload []byte) (map[string]string, error) { + slog.Debug("signing ws", "payload", string(payload)) url := s.url if bytes.Equal(payload, []byte("{}")) { url = url + "/connect" } + slog.Debug("signing ws url", "url", url) req, err := http.NewRequest("POST", url, bytes.NewBuffer(payload)) if err != nil { - log.Println(err) + slog.Error("error creating request", "error", err) return nil, err } @@ -71,9 +76,10 @@ func (s *_signer) signWS(payload []byte) (map[string]string, error) { switch signer := s.sdkSigner.(type) { case *sdkv1_v4.Signer: + slog.Debug("signing ws using sdk v1") _, err = signer.Sign(req, bytes.NewReader(payload), "appsync", s.region, time.Now()) if err != nil { - log.Println(err) + slog.Error("error signing request", "error", err) return nil, err } return map[string]string{ @@ -86,9 +92,10 @@ func (s *_signer) signWS(payload []byte) (map[string]string, error) { "X-Amz-Security-Token": req.Header.Get("X-Amz-Security-Token"), }, nil case *sdkv2_v4.Signer: + slog.Debug("signing ws using sdk v2") hash := sha256.Sum256(payload) if err := signer.SignHTTP(context.TODO(), *s.creds, req, hex.EncodeToString(hash[:]), "appsync", s.region, time.Now()); err != nil { - log.Println(err) + slog.Error("error signing request", "error", err) return nil, err } @@ -101,12 +108,11 @@ func (s *_signer) signWS(payload []byte) (map[string]string, error) { "x-amz-date": req.Header.Get("x-amz-date"), "Authorization": req.Header.Get("Authorization"), } - token := req.Header.Get("X-Amz-Security-Token") if token != "" { headers["X-Amz-Security-Token"] = token } - + slog.Debug("signed ws headers", "headers", headers) return headers, nil } return map[string]string{}, errors.New("unsupported signer") diff --git a/subscriber.go b/subscriber.go index 2b1ce9c..3d21d41 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,7 +2,7 @@ package appsync import ( "encoding/json" - "log" + "log/slog" "sync" "time" @@ -30,23 +30,25 @@ const ( // NewSubscriber returns a new Subscriber instance. func NewSubscriber(extensions Extensions, callback func(response *graphql.Response), onConnectionLost func(err error)) *Subscriber { + slog.Debug("creating new subscriber", "extensions", extensions) if len(extensions.Subscription.MqttConnections) == 0 { - log.Printf("There is no mqtt connections.\n%+v\n", extensions) + slog.Warn("There is no mqtt connections.", "extensions", extensions) return nil } topic := func() string { if len(extensions.Subscription.NewSubscriptions) > 1 { - log.Printf("Multiple subscriptions are currently not supported.\n%+v\n", extensions) + slog.Warn("Multiple subscriptions are currently not supported.", "extensions", extensions) return "" } for _, v := range extensions.Subscription.NewSubscriptions { + slog.Debug("topics", "topic", v.Topic) return v.Topic } return "" }() if len(topic) == 0 { - log.Printf("There are no new subscriptions.\n%+v\n", extensions) + slog.Warn("There are no new subscriptions.", "extensions", extensions) return nil } @@ -61,7 +63,7 @@ func NewSubscriber(extensions Extensions, callback func(response *graphql.Respon return -1 }() if index < 0 { - log.Printf("There are no new subscriptions.\n%+v\n", extensions) + slog.Warn("There are no new subscriptions.", "extensions", extensions) return nil } @@ -95,6 +97,7 @@ func (r *rwBool) store(b bool) { // Start starts a new subscription. func (s *Subscriber) Start() error { + slog.Debug("starting new subscriber", "clientID", s.clientID, "url", s.url, "topic", s.topic) opts := MQTT.NewClientOptions(). AddBroker(s.url). SetClientID(s.clientID). @@ -116,7 +119,7 @@ func (s *Subscriber) Start() error { } r := new(graphql.Response) if err := json.Unmarshal(msg.Payload(), r); err != nil { - log.Println(err) + slog.Error("unable to unmarshal mqtt message", "error", err, "message", string(msg.Payload())) return } s.callback(r) @@ -124,6 +127,7 @@ func (s *Subscriber) Start() error { subscribe := func() error { if token := c.Subscribe(s.topic, 0, mqttCallback); token.Wait() && token.Error() != nil { + slog.Warn("unable to subscribe to topic", "topic", s.topic, "error", token.Error()) return token.Error() } s.started.store(true) @@ -149,6 +153,7 @@ func (s *Subscriber) Start() error { b := backoff.NewExponentialBackOff() b.MaxElapsedTime = 5 * time.Minute if err := backoff.Retry(connect, b); err != nil { + slog.Warn("unable to connect to mqtt on retry", "error", err) return err } @@ -157,6 +162,7 @@ func (s *Subscriber) Start() error { // Stop ends the subscription. func (s *Subscriber) Stop() { + slog.Debug("stopping subscriber", "topic", s.topic) if s.mqtt == nil { return } @@ -166,6 +172,6 @@ func (s *Subscriber) Stop() { s.mqtt = nil }() if token := s.mqtt.Unsubscribe(s.topic); token.Wait() && token.Error() != nil { - log.Println(token.Error()) + slog.Warn("error in token", "topic", s.topic, "error", token.Error()) } } diff --git a/test/EventApp/appsync_eventapp.go b/test/EventApp/appsync_eventapp.go index e38c708..b70fa0a 100644 --- a/test/EventApp/appsync_eventapp.go +++ b/test/EventApp/appsync_eventapp.go @@ -4,7 +4,8 @@ import ( "context" "flag" "fmt" - "log" + "log/slog" + "os" "strings" "time" @@ -31,7 +32,14 @@ type subscriber interface { } func main() { - log.SetFlags(log.Llongfile) + + handle := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + Level: slog.LevelInfo, + AddSource: true, + }) + + slog.SetDefault(slog.New(handle)) + var ( region = flag.String("region", "", "AppSync API region") url = flag.String("url", "", "AppSync API URL") @@ -52,11 +60,13 @@ func main() { ctx := context.TODO() cfg, err := config.LoadDefaultConfig(ctx) if err != nil { - log.Fatalln(err) + slog.Error("unable to load default config", "error", err) + os.Exit(1) } creds, err := cfg.Credentials.Retrieve(ctx) if err != nil { - log.Fatalln(err) + slog.Error("unable to retrieve credentials", "error", err) + os.Exit(1) } signer := sdkv2_v4.NewSigner() opt = appsync.WithIAMAuthorizationV2(signer, creds, *region, *url) @@ -64,10 +74,10 @@ func main() { } client := appsync.NewClient(appsync.NewGraphQLClient(graphql.NewClient(*url)), opt) - log.Println("mutation createEvent()") + slog.Info("mutation createEvent()") event := createEvent(client) - log.Println("subscription subscribeToEventComments()") + slog.Info("subscription subscribeToEventComments()") ch := make(chan *graphql.Response) defer close(ch) var s subscriber @@ -77,23 +87,26 @@ func main() { case "graphql-ws": s = wsSubscribeToEventComments(*url, sOpt, event, ch) default: - log.Fatalln("unsupported protocol: " + *protocol) + slog.Error("unsupported protocol", "protocol", *protocol) + os.Exit(1) } if err := s.Start(); err != nil { - log.Fatalln(err) + slog.Error("unable to start subscriber", "error", err) + os.Exit(1) } defer s.Stop() - log.Println("mutation commentOnEvent()") + slog.Info("mutation commentOnEvent()") commentOnEvent(client, event) msg, ok := <-ch if !ok { - log.Fatal("ch has been closed.") + slog.Error("ch has been closed.") + os.Exit(1) } - log.Println("comment received") + slog.Info("comment received") _, _ = pp.Println(msg) - log.Println("mutation deleteEvent()") + slog.Info("mutation deleteEvent()") deleteEvent(client, event) } @@ -112,13 +125,15 @@ mutation { Query: mutation, }) if err != nil { - log.Fatalln(err) + slog.Error("unable to create postRequest", "error", err) + os.Exit(1) } _, _ = pp.Println(res) ev := new(event) if err := res.DataAs(ev); err != nil { - log.Fatalln(err) + slog.Error("unable to process event", "error", err) + os.Exit(1) } return ev } @@ -133,23 +148,28 @@ subscription { createdAt } }`, e.ID) - subreq := graphql.PostRequest{ + subReq := graphql.PostRequest{ Query: subscription, } - res, err := c.Post(subreq) + res, err := c.Post(subReq) if err != nil { - log.Fatalln(err) + slog.Error("unable to subscribe", "error", err) + os.Exit(1) } _, _ = pp.Println(res) ext, err := appsync.NewExtensions(res) if err != nil { - log.Fatalln(err) + slog.Error("unable to process extensions", "error", err) + os.Exit(1) } return appsync.NewSubscriber(*ext, func(r *graphql.Response) { ch <- r }, - func(err error) { log.Println(err) }) + func(err error) { + slog.Error("unable to create new subscriber", "error", err) + os.Exit(1) + }) } @@ -169,7 +189,10 @@ subscription { realtime := strings.Replace(strings.Replace(url, "https", "wss", 1), "appsync-api", "appsync-realtime-api", 1) return appsync.NewPureWebSocketSubscriber(realtime, subreq, func(r *graphql.Response) { ch <- r }, - func(err error) { log.Println(err) }, + func(err error) { + slog.Error("unable to create new prue websocket subscriber", "error", err) + os.Exit(1) + }, opt, ) } @@ -188,7 +211,8 @@ mutation { Query: mutation, }) if err != nil { - log.Fatalln(err) + slog.Error("unable to comment on event", "error", err) + os.Exit(1) } _, _ = pp.Println(res) } @@ -208,7 +232,8 @@ mutation { Query: mutation, }) if err != nil { - log.Fatalln(err) + slog.Error("unable to delete event", "error", err) + os.Exit(1) } _, _ = pp.Println(res) }