[go: nahoru, domu]

Skip to content

Commit

Permalink
Use exponential backoff algorithm for internode reconnections (minio#17052)
Browse files Browse the repository at this point in the history
  • Loading branch information
vadmeste authored May 2, 2023
1 parent 1704aba commit 4640b13
Showing 1 changed file with 42 additions and 13 deletions.
55 changes: 42 additions & 13 deletions internal/rest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ type Client struct {
// is online or offline.
HealthCheckFn func() bool

// HealthCheckInterval will be the duration between re-connection attempts
// when a call has failed with a network error.
HealthCheckInterval time.Duration
// HealthCheckRetryUnit will be used to calculate the exponential
// backoff when trying to reconnect to an offline node
HealthCheckReconnectUnit time.Duration

// HealthCheckTimeout determines timeout for each call.
HealthCheckTimeout time.Duration
Expand Down Expand Up @@ -309,14 +309,14 @@ func NewClient(url *url.URL, tr http.RoundTripper, newAuthToken func(aud string)
// Transport is exactly same as Go default in https://golang.org/pkg/net/http/#RoundTripper
// except custom DialContext and TLSClientConfig.
return &Client{
httpClient: &http.Client{Transport: tr},
url: url,
newAuthToken: newAuthToken,
connected: online,
lastConn: time.Now().UnixNano(),
MaxErrResponseSize: 4096,
HealthCheckInterval: 200 * time.Millisecond,
HealthCheckTimeout: time.Second,
httpClient: &http.Client{Transport: tr},
url: url,
newAuthToken: newAuthToken,
connected: online,
lastConn: time.Now().UnixNano(),
MaxErrResponseSize: 4096,
HealthCheckReconnectUnit: 200 * time.Millisecond,
HealthCheckTimeout: time.Second,
}
}

Expand All @@ -337,6 +337,28 @@ func (c *Client) LastError() error {
return c.lastErr
}

// computes the exponential backoff duration according to
// https://www.awsarchitectureblog.com/2015/03/backoff.html
func exponentialBackoffWait(r *rand.Rand, unit, cap time.Duration) func(uint) time.Duration {
if unit > time.Hour {
// Protect against integer overflow
panic("unit cannot exceed one hour")
}
return func(attempt uint) time.Duration {
if attempt > 16 {
// Protect against integer overflow
attempt = 16
}
// sleep = random_between(unit, min(cap, base * 2 ** attempt))
sleep := unit * time.Duration(1<<attempt)
if sleep > cap {
sleep = cap
}
sleep -= time.Duration(r.Float64() * float64(sleep-unit))
return sleep
}
}

// MarkOffline - will mark a client as being offline and spawns
// a goroutine that will attempt to reconnect if HealthCheckFn is set.
// returns true if the node changed state from online to offline
Expand All @@ -347,8 +369,14 @@ func (c *Client) MarkOffline(err error) bool {
// Start goroutine that will attempt to reconnect.
// If server is already trying to reconnect this will have no effect.
if c.HealthCheckFn != nil && atomic.CompareAndSwapInt32(&c.connected, online, offline) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
go func() {
backOff := exponentialBackoffWait(
rand.New(rand.NewSource(time.Now().UnixNano())),
200*time.Millisecond,
30*time.Second,
)

attempt := uint(0)
for {
if atomic.LoadInt32(&c.connected) == closed {
return
Expand All @@ -362,7 +390,8 @@ func (c *Client) MarkOffline(err error) bool {
}
return
}
time.Sleep(time.Duration(r.Float64() * float64(c.HealthCheckInterval)))
attempt++
time.Sleep(backOff(attempt))
}
}()
return true
Expand Down

0 comments on commit 4640b13

Please sign in to comment.