Browse Source

Add heartbeat ACK response and error handling (#396)

* Add heartbeat ACK response and error handling

- Error when sending a heartbeat now triggers a reconnection
- Op7 now triggers a reconnection
- Session now tracks the last heartbeat ACK that was recieved. If the
last ACK is more than FailedHeartbeatAcks*heartbeatinterval in the past,
this is treated as a dead connection and a reconnection is forced.

* Address @iopred comments
Erik McClure 7 years ago
parent
commit
bb4b96e26d
3 changed files with 33 additions and 8 deletions
  1. 1 0
      discord.go
  2. 3 0
      structs.go
  3. 29 8
      wsapi.go

+ 1 - 0
discord.go

@@ -59,6 +59,7 @@ func New(args ...interface{}) (s *Session, err error) {
 		MaxRestRetries:         3,
 		Client:                 &http.Client{Timeout: (20 * time.Second)},
 		sequence:               new(int64),
+		LastHeartbeatAck:       time.Now().UTC(),
 	}
 
 	// If no arguments are passed return the empty Session interface.

+ 3 - 0
structs.go

@@ -78,6 +78,9 @@ type Session struct {
 	// The http client used for REST requests
 	Client *http.Client
 
+	// Stores the last HeartbeatAck that was recieved (in UTC)
+	LastHeartbeatAck time.Time
+
 	// Event handlers
 	handlersMu   sync.RWMutex
 	handlers     map[string][]*eventHandlerInstance

+ 29 - 8
wsapi.go

@@ -131,6 +131,7 @@ func (s *Session) Open() (err error) {
 	// lock.
 	s.listening = make(chan interface{})
 	go s.listen(s.wsConn, s.listening)
+	s.LastHeartbeatAck = time.Now().UTC()
 
 	s.Unlock()
 
@@ -199,10 +200,13 @@ type helloOp struct {
 	Trace             []string      `json:"_trace"`
 }
 
+// Number of heartbeat intervals to wait until forcing a connection restart.
+const FailedHeartbeatAcks time.Duration = 5 * time.Millisecond
+
 // heartbeat sends regular heartbeats to Discord so it knows the client
 // is still connected.  If you do not send these heartbeats Discord will
 // disconnect the websocket connection after a few seconds.
-func (s *Session) heartbeat(wsConn *websocket.Conn, listening <-chan interface{}, i time.Duration) {
+func (s *Session) heartbeat(wsConn *websocket.Conn, listening <-chan interface{}, heartbeatIntervalMsec time.Duration) {
 
 	s.log(LogInformational, "called")
 
@@ -211,20 +215,26 @@ func (s *Session) heartbeat(wsConn *websocket.Conn, listening <-chan interface{}
 	}
 
 	var err error
-	ticker := time.NewTicker(i * time.Millisecond)
+	ticker := time.NewTicker(heartbeatIntervalMsec * time.Millisecond)
 	defer ticker.Stop()
 
 	for {
+		s.RLock()
+		last := s.LastHeartbeatAck
+		s.RUnlock()
 		sequence := atomic.LoadInt64(s.sequence)
 		s.log(LogInformational, "sending gateway websocket heartbeat seq %d", sequence)
 		s.wsMutex.Lock()
 		err = wsConn.WriteJSON(heartbeatOp{1, sequence})
 		s.wsMutex.Unlock()
-		if err != nil {
-			s.log(LogError, "error sending heartbeat to gateway %s, %s", s.gateway, err)
-			s.Lock()
-			s.DataReady = false
-			s.Unlock()
+		if err != nil || time.Now().UTC().Sub(last) > (heartbeatIntervalMsec*FailedHeartbeatAcks) {
+			if err != nil {
+				s.log(LogError, "error sending heartbeat to gateway %s, %s", s.gateway, err)
+			} else {
+				s.log(LogError, "haven't gotten a heartbeat ACK in %v, triggering a reconnection", time.Now().UTC().Sub(last))
+			}
+			s.Close()
+			s.reconnect()
 			return
 		}
 		s.Lock()
@@ -398,7 +408,10 @@ func (s *Session) onEvent(messageType int, message []byte) {
 	// Reconnect
 	// Must immediately disconnect from gateway and reconnect to new gateway.
 	if e.Operation == 7 {
-		// TODO
+		s.log(LogInformational, "Closing and reconnecting in response to Op7")
+		s.Close()
+		s.reconnect()
+		return
 	}
 
 	// Invalid Session
@@ -426,6 +439,14 @@ func (s *Session) onEvent(messageType int, message []byte) {
 		return
 	}
 
+	if e.Operation == 11 {
+		s.Lock()
+		s.LastHeartbeatAck = time.Now().UTC()
+		s.Unlock()
+		s.log(LogInformational, "got heartbeat ACK")
+		return
+	}
+
 	// Do not try to Dispatch a non-Dispatch Message
 	if e.Operation != 0 {
 		// But we probably should be doing something with them.