Skip to content

Commit

Permalink
try to reduce cyclomatic complexity
Browse files Browse the repository at this point in the history
  • Loading branch information
KazuhitoNakamura committed Mar 22, 2021
1 parent e242418 commit ce1b3f9
Showing 1 changed file with 81 additions and 68 deletions.
149 changes: 81 additions & 68 deletions pure_websocket_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ func (p *PureWebSocketSubscriber) Stop() {
p.op.disconnect()
}

const defaultTimeout = time.Duration(300000) * time.Millisecond

type realtimeWebSocketOperation struct {
onReceive func(response *graphql.Response)
onConnectionLost func(err error)
Expand All @@ -220,80 +222,21 @@ func (r *realtimeWebSocketOperation) readLoop() {
defer close(r.startackCh)
defer close(r.completeCh)

const defaultTimeout = time.Duration(300000) * time.Millisecond
if err := r.ws.SetReadDeadline(time.Now().Add(defaultTimeout)); err != nil {
log.Println(err)
return
}
for {
handlers := map[string]func(b []byte) (finish bool){
"connection_ack": func(b []byte) bool {
connack := new(connectionAckMessage)
if err := json.Unmarshal(b, connack); err != nil {
log.Println(err)
return true
}
r.connackCh <- *connack
return false
},
"ka": func(b []byte) bool {
timeout := defaultTimeout
if r.connectionTimeout != 0 {
timeout = r.connectionTimeout
}
if err := r.ws.SetReadDeadline(time.Now().Add(timeout)); err != nil {
log.Println(err)
return true
}
return false
},
"start_ack": func(b []byte) bool {
startack := new(startAckMessage)
if err := json.Unmarshal(b, startack); err != nil {
log.Println(err)
return true
}
r.startackCh <- *startack
return false
},
"data": func(b []byte) bool {
data := new(processingDataMessage)
if err := json.Unmarshal(b, data); err != nil {
log.Println(err)
return true
}
r.onReceive(&graphql.Response{
Data: data.Payload.Data,
})
return false
},
"complete": func(b []byte) bool {
complete := new(completeMessage)
if err := json.Unmarshal(b, complete); err != nil {
log.Println(err)
return true
}
r.completeCh <- *complete
return true
},
"error": func(b []byte) bool {
em := new(errorMessage)
if err := json.Unmarshal(b, em); err != nil {
log.Println(err)
return true
}
errors := make([]interface{}, len(em.Payload.Errors))
for i, e := range em.Payload.Errors {
errors[i] = e
}
r.onReceive(&graphql.Response{
Errors: &errors,
})
return true
},
"connection_ack": r.onConnected,
"ka": r.onKeepAlive,
"start_ack": r.onStarted,
"data": r.onData,
"complete": r.onStopped,
"error": r.onError,
}

_, b, err := r.ws.ReadMessage()
_, payload, err := r.ws.ReadMessage()
if err != nil {
log.Println(err)
if strings.Contains(err.Error(), "i/o timeout") {
Expand All @@ -303,7 +246,7 @@ func (r *realtimeWebSocketOperation) readLoop() {
}

msg := new(message)
if err := json.Unmarshal(b, msg); err != nil {
if err := json.Unmarshal(payload, msg); err != nil {
log.Println(err)
return
}
Expand All @@ -313,7 +256,7 @@ func (r *realtimeWebSocketOperation) readLoop() {
log.Println("invalid message received")
continue
}
if handler(b) {
if handler(payload) {
return
}
}
Expand Down Expand Up @@ -348,6 +291,16 @@ func (r *realtimeWebSocketOperation) connect(realtimeEndpoint string, header, pa
return nil
}

func (r *realtimeWebSocketOperation) onConnected(payload []byte) bool {
connack := new(connectionAckMessage)
if err := json.Unmarshal(payload, connack); err != nil {
log.Println(err)
return true
}
r.connackCh <- *connack
return false
}

func (r *realtimeWebSocketOperation) connectionInit() error {
if r.connectionTimeout != 0 {
return errors.New("already connection initialized")
Expand All @@ -371,6 +324,18 @@ func (r *realtimeWebSocketOperation) connectionInit() error {
return nil
}

func (r *realtimeWebSocketOperation) onKeepAlive([]byte) bool {
timeout := defaultTimeout
if r.connectionTimeout != 0 {
timeout = r.connectionTimeout
}
if err := r.ws.SetReadDeadline(time.Now().Add(timeout)); err != nil {
log.Println(err)
return true
}
return false
}

func (r *realtimeWebSocketOperation) start(request []byte, authorization map[string]string) error {
if len(r.subscriptionID) != 0 {
return errors.New("already started")
Expand Down Expand Up @@ -404,6 +369,28 @@ func (r *realtimeWebSocketOperation) start(request []byte, authorization map[str
return nil
}

func (r *realtimeWebSocketOperation) onStarted(payload []byte) bool {
startack := new(startAckMessage)
if err := json.Unmarshal(payload, startack); err != nil {
log.Println(err)
return true
}
r.startackCh <- *startack
return false
}

func (r *realtimeWebSocketOperation) onData(payload []byte) bool {
data := new(processingDataMessage)
if err := json.Unmarshal(payload, data); err != nil {
log.Println(err)
return true
}
r.onReceive(&graphql.Response{
Data: data.Payload.Data,
})
return false
}

func (r *realtimeWebSocketOperation) stop() {
if len(r.subscriptionID) == 0 {
return
Expand All @@ -425,6 +412,16 @@ func (r *realtimeWebSocketOperation) stop() {
r.subscriptionID = ""
}

func (r *realtimeWebSocketOperation) onStopped(payload []byte) bool {
complete := new(completeMessage)
if err := json.Unmarshal(payload, complete); err != nil {
log.Println(err)
return true
}
r.completeCh <- *complete
return true
}

func (r *realtimeWebSocketOperation) disconnect() {
if r.ws == nil {
return
Expand All @@ -436,3 +433,19 @@ func (r *realtimeWebSocketOperation) disconnect() {
r.connectionTimeout = 0
r.ws = nil
}

func (r *realtimeWebSocketOperation) onError(payload []byte) bool {
em := new(errorMessage)
if err := json.Unmarshal(payload, em); err != nil {
log.Println(err)
return true
}
errors := make([]interface{}, len(em.Payload.Errors))
for i, e := range em.Payload.Errors {
errors[i] = e
}
r.onReceive(&graphql.Response{
Errors: &errors,
})
return true
}

0 comments on commit ce1b3f9

Please sign in to comment.