Browse Source

Fix racey heartbeat sequence (#303)

* Fix racey heartbeat sequence

* Change all sequence fields to int64
jonas747 8 years ago
parent
commit
2d50fc197f
4 changed files with 15 additions and 12 deletions
  1. 1 0
      discord.go
  2. 1 1
      events.go
  3. 1 1
      structs.go
  4. 12 10
      wsapi.go

+ 1 - 0
discord.go

@@ -48,6 +48,7 @@ func New(args ...interface{}) (s *Session, err error) {
 		ShardCount:             1,
 		MaxRestRetries:         3,
 		Client:                 &http.Client{Timeout: (20 * time.Second)},
+		sequence:               new(int64),
 	}
 
 	// If no arguments are passed return the empty Session interface.

+ 1 - 1
events.go

@@ -27,7 +27,7 @@ type RateLimit struct {
 // Event provides a basic initial struct for all websocket events.
 type Event struct {
 	Operation int             `json:"op"`
-	Sequence  int             `json:"s"`
+	Sequence  int64           `json:"s"`
 	Type      string          `json:"t"`
 	RawData   json.RawMessage `json:"d"`
 	// Struct contains one of the other types in this file.

+ 1 - 1
structs.go

@@ -92,7 +92,7 @@ type Session struct {
 	ratelimiter *RateLimiter
 
 	// sequence tracks the current gateway api websocket sequence number
-	sequence int
+	sequence *int64
 
 	// stores sessions current Discord Gateway
 	gateway string

+ 12 - 10
wsapi.go

@@ -19,6 +19,7 @@ import (
 	"io"
 	"net/http"
 	"runtime"
+	"sync/atomic"
 	"time"
 
 	"github.com/gorilla/websocket"
@@ -29,7 +30,7 @@ type resumePacket struct {
 	Data struct {
 		Token     string `json:"token"`
 		SessionID string `json:"session_id"`
-		Sequence  int    `json:"seq"`
+		Sequence  int64  `json:"seq"`
 	} `json:"d"`
 }
 
@@ -89,13 +90,14 @@ func (s *Session) Open() (err error) {
 		return
 	}
 
-	if s.sessionID != "" && s.sequence > 0 {
+	sequence := atomic.LoadInt64(s.sequence)
+	if s.sessionID != "" && sequence > 0 {
 
 		p := resumePacket{}
 		p.Op = 6
 		p.Data.Token = s.Token
 		p.Data.SessionID = s.sessionID
-		p.Data.Sequence = s.sequence
+		p.Data.Sequence = sequence
 
 		s.log(LogInformational, "sending resume packet to gateway")
 		err = s.wsConn.WriteJSON(p)
@@ -176,8 +178,8 @@ func (s *Session) listen(wsConn *websocket.Conn, listening <-chan interface{}) {
 }
 
 type heartbeatOp struct {
-	Op   int `json:"op"`
-	Data int `json:"d"`
+	Op   int   `json:"op"`
+	Data int64 `json:"d"`
 }
 
 type helloOp struct {
@@ -200,10 +202,10 @@ func (s *Session) heartbeat(wsConn *websocket.Conn, listening <-chan interface{}
 	ticker := time.NewTicker(i * time.Millisecond)
 
 	for {
-
-		s.log(LogInformational, "sending gateway websocket heartbeat seq %d", s.sequence)
+		sequence := atomic.LoadInt64(s.sequence)
+		s.log(LogInformational, "sending gateway websocket heartbeat seq %d", sequence)
 		s.wsMutex.Lock()
-		err = wsConn.WriteJSON(heartbeatOp{1, s.sequence})
+		err = wsConn.WriteJSON(heartbeatOp{1, sequence})
 		s.wsMutex.Unlock()
 		if err != nil {
 			s.log(LogError, "error sending heartbeat to gateway %s, %s", s.gateway, err)
@@ -370,7 +372,7 @@ func (s *Session) onEvent(messageType int, message []byte) {
 	if e.Operation == 1 {
 		s.log(LogInformational, "sending heartbeat in response to Op1")
 		s.wsMutex.Lock()
-		err = s.wsConn.WriteJSON(heartbeatOp{1, s.sequence})
+		err = s.wsConn.WriteJSON(heartbeatOp{1, atomic.LoadInt64(s.sequence)})
 		s.wsMutex.Unlock()
 		if err != nil {
 			s.log(LogError, "error sending heartbeat in response to Op1")
@@ -420,7 +422,7 @@ func (s *Session) onEvent(messageType int, message []byte) {
 	}
 
 	// Store the message sequence
-	s.sequence = e.Sequence
+	atomic.StoreInt64(s.sequence, e.Sequence)
 
 	// Map event to registered event handlers and pass it along to any registered handlers.
 	if eh, ok := registeredInterfaceProviders[e.Type]; ok {