At work, we have a service that provides a Kafka -> WebSocket gateway.

Clients connect to it, provide a topic and partition offsets, and receive a stream of WebSocket messages containing the events for that topic.

I wrote a client for this a couple of years ago, and during development and testing we ran into many of the issues common to distributed systems: disconnections, hanging connections, thundering herds, etc.

These are well-known issues, so I wanted my client library to handle them well while allowing for different strategies when needed.

Inspired by the reconnection policy in the Java Cassandra driver, I implemented this in Go.

This started off with some simple interfaces…

// ReconnectionScheduler should return the time before the next reconnection
// should be made.
type ReconnectionScheduler interface {
	// ScheduleReconnection returns an error to indicate that the client should not reconnect.
	ScheduleReconnection() (time.Duration, error)
}

// ReconnectionPolicy returns a ReconnectionScheduler to be used when attempting
// to reconnect.
type ReconnectionPolicy interface {
	NewScheduler() ReconnectionScheduler
}
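
To illustrate the error return in ScheduleReconnection, here is a minimal sketch of a policy that gives up after a fixed number of attempts. It is not part of the original client: LimitedReconnectionPolicy, limitedScheduler and ErrGiveUp are hypothetical names, and the interfaces are repeated only so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// The two interfaces are repeated from above so this example is self-contained.
type ReconnectionScheduler interface {
	ScheduleReconnection() (time.Duration, error)
}

type ReconnectionPolicy interface {
	NewScheduler() ReconnectionScheduler
}

// ErrGiveUp is a hypothetical sentinel error meaning "stop reconnecting".
var ErrGiveUp = errors.New("reconnection attempts exhausted")

// LimitedReconnectionPolicy (hypothetical) allows at most maxAttempts
// reconnections, each after the same fixed delay.
type LimitedReconnectionPolicy struct {
	delay       time.Duration
	maxAttempts int
}

type limitedScheduler struct {
	delay     time.Duration
	remaining int
}

// ScheduleReconnection returns the delay while attempts remain, and
// ErrGiveUp on every call after that.
func (s *limitedScheduler) ScheduleReconnection() (time.Duration, error) {
	if s.remaining <= 0 {
		return 0, ErrGiveUp
	}
	s.remaining--
	return s.delay, nil
}

// NewScheduler returns a fresh scheduler with its own attempt counter.
func (p LimitedReconnectionPolicy) NewScheduler() ReconnectionScheduler {
	return &limitedScheduler{delay: p.delay, remaining: p.maxAttempts}
}

func main() {
	var policy ReconnectionPolicy = LimitedReconnectionPolicy{delay: time.Second, maxAttempts: 2}
	scheduler := policy.NewScheduler()
	// The third call is expected to report ErrGiveUp.
	for i := 0; i < 3; i++ {
		d, err := scheduler.ScheduleReconnection()
		fmt.Println(d, err)
	}
}
```

Keeping the counter in the scheduler rather than the policy means each call to NewScheduler starts a new series of attempts, so one policy value can be shared between connections.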

These interfaces lend themselves to some simple implementations. First, the tests:

// s converts a number of seconds into a time.Duration.
func s(d int) time.Duration {
	return time.Duration(d) * time.Second
}

var tests = []struct {
	p      ReconnectionPolicy
	values []time.Duration
}{
	{
		NewConstantReconnectionPolicy(1 * time.Second),
		[]time.Duration{s(1), s(1), s(1), s(1), s(1)},
	},
	{
		NewExponentialReconnectionPolicy(s(1), s(32)),
		[]time.Duration{s(1), s(2), s(4), s(8), s(16), s(32), s(32)},
	},
}

This code defines tests for two policies: one that always returns the same delay, and one that returns an exponential backoff capped at a maximum.

func TestPolicies(t *testing.T) {
	for _, test := range tests {
		scheduler := test.p.NewScheduler()
		for i, duration := range test.values {
			backoff, err := scheduler.ScheduleReconnection()
			if err != nil {
				t.Errorf("unexpected error: %s", err)
			}
			if backoff != duration {
				t.Errorf("testing %+v failed, step %d: got %s, wanted %v", test.p, i, backoff, duration)
			}
		}
	}
}

The constant policy is the simpler of the two:

// ConstantReconnectionPolicy reconnects every duration forever.
type ConstantReconnectionPolicy struct {
	duration time.Duration
}

type constantReconnectionScheduler struct {
	duration time.Duration
}

func (s constantReconnectionScheduler) ScheduleReconnection() (time.Duration, error) {
	return s.duration, nil
}

// NewScheduler implements the ReconnectionPolicy interface and returns a new
// constant reconnection scheduler.
func (p ConstantReconnectionPolicy) NewScheduler() ReconnectionScheduler {
	return constantReconnectionScheduler{p.duration}
}

// NewConstantReconnectionPolicy creates a new ConstantReconnectionPolicy with
// the specified duration.
func NewConstantReconnectionPolicy(t time.Duration) *ConstantReconnectionPolicy {
	return &ConstantReconnectionPolicy{t}
}
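
The tests also reference NewExponentialReconnectionPolicy, whose implementation is not shown here. The following is a minimal sketch of how it might look, assuming the delay doubles on each attempt and is capped at the maximum (which matches the values the tests expect). The struct names, field names and parameter names are my assumptions, not the original code.

```go
package main

import (
	"fmt"
	"time"
)

// The two interfaces are repeated from above so this example is self-contained.
type ReconnectionScheduler interface {
	ScheduleReconnection() (time.Duration, error)
}

type ReconnectionPolicy interface {
	NewScheduler() ReconnectionScheduler
}

// ExponentialReconnectionPolicy (sketch) doubles the delay after every
// attempt, starting at baseDelay and never exceeding maxDelay.
type ExponentialReconnectionPolicy struct {
	baseDelay time.Duration
	maxDelay  time.Duration
}

type exponentialReconnectionScheduler struct {
	next     time.Duration
	maxDelay time.Duration
}

// ScheduleReconnection returns the current delay and prepares the next one.
// It never returns an error, so this scheduler retries forever.
func (s *exponentialReconnectionScheduler) ScheduleReconnection() (time.Duration, error) {
	d := s.next
	// Only double while the result stays within maxDelay; this also
	// prevents the duration from overflowing.
	if s.next > s.maxDelay/2 {
		s.next = s.maxDelay
	} else {
		s.next *= 2
	}
	return d, nil
}

// NewScheduler returns a scheduler that starts again from baseDelay.
func (p ExponentialReconnectionPolicy) NewScheduler() ReconnectionScheduler {
	first := p.baseDelay
	if first > p.maxDelay {
		first = p.maxDelay
	}
	return &exponentialReconnectionScheduler{next: first, maxDelay: p.maxDelay}
}

// NewExponentialReconnectionPolicy creates a policy with the given bounds.
func NewExponentialReconnectionPolicy(baseDelay, maxDelay time.Duration) *ExponentialReconnectionPolicy {
	return &ExponentialReconnectionPolicy{baseDelay: baseDelay, maxDelay: maxDelay}
}

func main() {
	scheduler := NewExponentialReconnectionPolicy(time.Second, 32*time.Second).NewScheduler()
	for i := 0; i < 7; i++ {
		d, _ := scheduler.ScheduleReconnection()
		fmt.Println(d)
	}
}
```

Unlike the constant scheduler, this one carries state between calls, so it uses a pointer receiver and NewScheduler hands out a fresh instance each time.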

This makes it fairly easy to use the reconnection strategy in the connection logic.

func (c *Client) connect() error {
	c.state = connecting{}
	// Ask the scheduler how long to wait; an error means we should stop reconnecting.
	reconnectTimeout, err := c.Reconnection.ScheduleReconnection()
	if err != nil {
		return err
	}
	time.Sleep(reconnectTimeout)
	conn, _, err := c.dialer.Dial(c.config.Endpoint, nil)
	if err != nil {
		return err
	}
	// Reads fail if nothing arrives within KeepAliveTimeout; every ping extends the deadline.
	conn.SetReadDeadline(time.Now().Add(c.KeepAliveTimeout))
	pingHandler := conn.PingHandler()
	conn.SetPingHandler(func(s string) error {
		conn.SetReadDeadline(time.Now().Add(c.KeepAliveTimeout))
		// Delegate to the existing handler and propagate its error instead of dropping it.
		return pingHandler(s)
	})
	c.socket = conn
	return nil
}

Note that this means connect always waits for the duration returned by the reconnection scheduler before attempting to connect, even on the first connection.

This could be improved by adding an initial boolean and skipping the wait on the first connection (determined elsewhere), or possibly by detecting previous connections.

The option I prefer is to have the policy return a zero duration for the first attempt, so that the policy stays in control. This would also make it easy to introduce arbitrary delays before connecting.
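
One way to get a zero first duration without touching each policy is a small wrapper. This is only a sketch of the idea: ImmediateFirstPolicy and immediateFirstScheduler are hypothetical names, and constantPolicy is a stand-in for the constant policy shown earlier so that the example compiles on its own.

```go
package main

import (
	"fmt"
	"time"
)

// The two interfaces are repeated from above so this example is self-contained.
type ReconnectionScheduler interface {
	ScheduleReconnection() (time.Duration, error)
}

type ReconnectionPolicy interface {
	NewScheduler() ReconnectionScheduler
}

// constantPolicy is a stand-in for ConstantReconnectionPolicy; it acts as
// both policy and scheduler because it holds no per-connection state.
type constantPolicy struct {
	delay time.Duration
}

func (p constantPolicy) NewScheduler() ReconnectionScheduler { return p }

func (p constantPolicy) ScheduleReconnection() (time.Duration, error) {
	return p.delay, nil
}

// ImmediateFirstPolicy (hypothetical) wraps another policy so that the first
// scheduled delay is zero and later delays come from the wrapped policy.
type ImmediateFirstPolicy struct {
	Inner ReconnectionPolicy
}

type immediateFirstScheduler struct {
	inner   ReconnectionScheduler
	started bool
}

// ScheduleReconnection returns zero on the first call and then defers to the
// wrapped scheduler, including any error it returns.
func (s *immediateFirstScheduler) ScheduleReconnection() (time.Duration, error) {
	if !s.started {
		s.started = true
		return 0, nil
	}
	return s.inner.ScheduleReconnection()
}

// NewScheduler wraps a fresh scheduler from the inner policy.
func (p ImmediateFirstPolicy) NewScheduler() ReconnectionScheduler {
	return &immediateFirstScheduler{inner: p.Inner.NewScheduler()}
}

func main() {
	policy := ImmediateFirstPolicy{Inner: constantPolicy{delay: time.Second}}
	scheduler := policy.NewScheduler()
	for i := 0; i < 3; i++ {
		d, err := scheduler.ScheduleReconnection()
		fmt.Println(d, err)
	}
}
```

With this approach connect stays unchanged: it still sleeps for whatever the scheduler returns, and the first sleep is simply zero.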