From ce1b3f9dd88335235709c9e39ab24c8a6361c875 Mon Sep 17 00:00:00 2001 From: Kazuhito NAKAMURA Date: Mon, 22 Mar 2021 12:08:35 +0900 Subject: [PATCH] try to reduce cyclomatic complexity --- pure_websocket_subscriber.go | 149 +++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 68 deletions(-) diff --git a/pure_websocket_subscriber.go b/pure_websocket_subscriber.go index 76cee06..3c94ce5 100644 --- a/pure_websocket_subscriber.go +++ b/pure_websocket_subscriber.go @@ -198,6 +198,8 @@ func (p *PureWebSocketSubscriber) Stop() { p.op.disconnect() } +const defaultTimeout = time.Duration(300000) * time.Millisecond + type realtimeWebSocketOperation struct { onReceive func(response *graphql.Response) onConnectionLost func(err error) @@ -220,80 +222,21 @@ func (r *realtimeWebSocketOperation) readLoop() { defer close(r.startackCh) defer close(r.completeCh) - const defaultTimeout = time.Duration(300000) * time.Millisecond if err := r.ws.SetReadDeadline(time.Now().Add(defaultTimeout)); err != nil { log.Println(err) return } for { handlers := map[string]func(b []byte) (finish bool){ - "connection_ack": func(b []byte) bool { - connack := new(connectionAckMessage) - if err := json.Unmarshal(b, connack); err != nil { - log.Println(err) - return true - } - r.connackCh <- *connack - return false - }, - "ka": func(b []byte) bool { - timeout := defaultTimeout - if r.connectionTimeout != 0 { - timeout = r.connectionTimeout - } - if err := r.ws.SetReadDeadline(time.Now().Add(timeout)); err != nil { - log.Println(err) - return true - } - return false - }, - "start_ack": func(b []byte) bool { - startack := new(startAckMessage) - if err := json.Unmarshal(b, startack); err != nil { - log.Println(err) - return true - } - r.startackCh <- *startack - return false - }, - "data": func(b []byte) bool { - data := new(processingDataMessage) - if err := json.Unmarshal(b, data); err != nil { - log.Println(err) - return true - } - r.onReceive(&graphql.Response{ - Data: data.Payload.Data, - }) - return false - }, - "complete": func(b []byte) bool { - complete := new(completeMessage) - if err := json.Unmarshal(b, complete); err != nil { - log.Println(err) - return true - } - r.completeCh <- *complete - return true - }, - "error": func(b []byte) bool { - em := new(errorMessage) - if err := json.Unmarshal(b, em); err != nil { - log.Println(err) - return true - } - errors := make([]interface{}, len(em.Payload.Errors)) - for i, e := range em.Payload.Errors { - errors[i] = e - } - r.onReceive(&graphql.Response{ - Errors: &errors, - }) - return true - }, + "connection_ack": r.onConnected, + "ka": r.onKeepAlive, + "start_ack": r.onStarted, + "data": r.onData, + "complete": r.onStopped, + "error": r.onError, } - _, b, err := r.ws.ReadMessage() + _, payload, err := r.ws.ReadMessage() if err != nil { log.Println(err) if strings.Contains(err.Error(), "i/o timeout") { @@ -303,7 +246,7 @@ func (r *realtimeWebSocketOperation) readLoop() { } msg := new(message) - if err := json.Unmarshal(b, msg); err != nil { + if err := json.Unmarshal(payload, msg); err != nil { log.Println(err) return } @@ -313,7 +256,7 @@ func (r *realtimeWebSocketOperation) readLoop() { log.Println("invalid message received") continue } - if handler(b) { + if handler(payload) { return } } @@ -348,6 +291,16 @@ func (r *realtimeWebSocketOperation) connect(realtimeEndpoint string, header, pa return nil } +func (r *realtimeWebSocketOperation) onConnected(payload []byte) bool { + connack := new(connectionAckMessage) + if err := json.Unmarshal(payload, connack); err != nil { + log.Println(err) + return true + } + r.connackCh <- *connack + return false +} + func (r *realtimeWebSocketOperation) connectionInit() error { if r.connectionTimeout != 0 { return errors.New("already connection initialized") @@ -371,6 +324,18 @@ func (r *realtimeWebSocketOperation) connectionInit() error { return nil } +func (r *realtimeWebSocketOperation) onKeepAlive([]byte) bool { + timeout := defaultTimeout + if r.connectionTimeout != 0 { + timeout = r.connectionTimeout + } + if err := r.ws.SetReadDeadline(time.Now().Add(timeout)); err != nil { + log.Println(err) + return true + } + return false +} + func (r *realtimeWebSocketOperation) start(request []byte, authorization map[string]string) error { if len(r.subscriptionID) != 0 { return errors.New("already started") @@ -404,6 +369,28 @@ func (r *realtimeWebSocketOperation) start(request []byte, authorization map[str return nil } +func (r *realtimeWebSocketOperation) onStarted(payload []byte) bool { + startack := new(startAckMessage) + if err := json.Unmarshal(payload, startack); err != nil { + log.Println(err) + return true + } + r.startackCh <- *startack + return false +} + +func (r *realtimeWebSocketOperation) onData(payload []byte) bool { + data := new(processingDataMessage) + if err := json.Unmarshal(payload, data); err != nil { + log.Println(err) + return true + } + r.onReceive(&graphql.Response{ + Data: data.Payload.Data, + }) + return false +} + func (r *realtimeWebSocketOperation) stop() { if len(r.subscriptionID) == 0 { return @@ -425,6 +412,16 @@ func (r *realtimeWebSocketOperation) stop() { r.subscriptionID = "" } +func (r *realtimeWebSocketOperation) onStopped(payload []byte) bool { + complete := new(completeMessage) + if err := json.Unmarshal(payload, complete); err != nil { + log.Println(err) + return true + } + r.completeCh <- *complete + return true +} + func (r *realtimeWebSocketOperation) disconnect() { if r.ws == nil { return @@ -436,3 +433,19 @@ func (r *realtimeWebSocketOperation) disconnect() { r.connectionTimeout = 0 r.ws = nil } + +func (r *realtimeWebSocketOperation) onError(payload []byte) bool { + em := new(errorMessage) + if err := json.Unmarshal(payload, em); err != nil { + log.Println(err) + return true + } + errors := make([]interface{}, len(em.Payload.Errors)) + for i, e := range em.Payload.Errors { + errors[i] = e + } + r.onReceive(&graphql.Response{ + Errors: &errors, + }) + return true +}