refactor: extract packages loadbalancer and retry

This removes in-tree packages in favor of:

* github.com/talos-systems/go-retry
* github.com/talos-systems/go-loadbalancer

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2020-09-02 20:14:25 +03:00 committed by talos-bot
parent 60bd78422b
commit f6ecf000c9
38 changed files with 41 additions and 1790 deletions

View File

@ -13,6 +13,8 @@ import (
"strings"
"time"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/blockdevice"
"github.com/talos-systems/talos/pkg/blockdevice/filesystem/vfat"
@ -21,7 +23,6 @@ import (
"github.com/talos-systems/talos/pkg/blockdevice/table/gpt/partition"
"github.com/talos-systems/talos/pkg/blockdevice/util"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// Manifest represents the instructions for preparing all block devices

View File

@ -9,7 +9,7 @@ import (
"github.com/spf13/cobra"
"github.com/talos-systems/talos/internal/pkg/loadbalancer"
"github.com/talos-systems/go-loadbalancer/loadbalancer"
)
var loadbalancerLaunchCmdFlags struct {

3
go.mod
View File

@ -56,7 +56,9 @@ require (
github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2
github.com/talos-systems/bootkube-plugin v0.0.0-20200729203641-12d463a3e54e
github.com/talos-systems/crypto v0.2.0
github.com/talos-systems/go-loadbalancer v0.1.0
github.com/talos-systems/go-procfs v0.0.0-20200219015357-57c7311fdd45
github.com/talos-systems/go-retry v0.1.0
github.com/talos-systems/go-smbios v0.0.0-20200219201045-94b8c4e489ee
github.com/talos-systems/grpc-proxy v0.2.0
github.com/talos-systems/net v0.1.0
@ -77,7 +79,6 @@ require (
gopkg.in/freddierice/go-losetup.v1 v1.0.0-20170407175016-fc9adea44124
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
inet.af/tcpproxy v0.0.0-20200125044825-b6bb9b5b8252
k8s.io/api v0.19.0
k8s.io/apimachinery v0.19.0
k8s.io/apiserver v0.19.0

4
go.sum
View File

@ -685,8 +685,12 @@ github.com/talos-systems/bootkube-plugin v0.0.0-20200729203641-12d463a3e54e h1:Z
github.com/talos-systems/bootkube-plugin v0.0.0-20200729203641-12d463a3e54e/go.mod h1:AbdJAgHK5rJNDPUN3msPTfQJSR9b4DKb5xNN07uG8/Y=
github.com/talos-systems/crypto v0.2.0 h1:UwT8uhJ0eDlklY0vYwo1+LGoFgiqkPqjQnae6j8UNYE=
github.com/talos-systems/crypto v0.2.0/go.mod h1:KwqG+jANKU1FNQIapmioHQ5fkovY1DJkAqMenjYBGh0=
github.com/talos-systems/go-loadbalancer v0.1.0 h1:MQFONvSjoleU8RrKq1O1Z8CyTCJGd4SLqdAHDlR6o9s=
github.com/talos-systems/go-loadbalancer v0.1.0/go.mod h1:D5Qjfz+29WVjONWECZvOkmaLsBb3f5YeWME0u/5HmIc=
github.com/talos-systems/go-procfs v0.0.0-20200219015357-57c7311fdd45 h1:FND/LgzFHTBdJBOeZVzdO6B47kxQZvSIzb9AMIXYotg=
github.com/talos-systems/go-procfs v0.0.0-20200219015357-57c7311fdd45/go.mod h1:ATyUGFQIW8OnbnmvqefZWVPgL9g+CAmXHfkgny21xX8=
github.com/talos-systems/go-retry v0.1.0 h1:O+OeZR54CQ1+ch99p/81Pqi5GqJH6LIu1MTN/N0vd78=
github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
github.com/talos-systems/go-smbios v0.0.0-20200219201045-94b8c4e489ee h1:9i0ZFsjZ0wY8UUn/tk2MQshLBC0PNFJe3+84AUqzzyw=
github.com/talos-systems/go-smbios v0.0.0-20200219201045-94b8c4e489ee/go.mod h1:HxhrzAoTZ7ed5Z5VvtCvnCIrOxyXDS7V2B5hCetAMW8=
github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1LNzuMEs9Pw=

View File

@ -28,6 +28,8 @@ import (
"golang.org/x/sys/unix"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"github.com/talos-systems/go-retry/retry"
installer "github.com/talos-systems/talos/cmd/installer/pkg/install"
"github.com/talos-systems/talos/internal/app/machined/internal/install"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
@ -54,7 +56,6 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config/configloader"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
"github.com/talos-systems/talos/pkg/sysctl"
"github.com/talos-systems/talos/pkg/version"
)

View File

@ -10,11 +10,11 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/machined/pkg/system"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/events"
"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/retry"
)
type ServiceRunnerSuite struct {

View File

@ -19,6 +19,8 @@ import (
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/events"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/health"
@ -29,7 +31,6 @@ import (
"github.com/talos-systems/talos/pkg/kubernetes"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// APID implements the Service interface. It serves as the concrete type with

View File

@ -19,6 +19,8 @@ import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/events"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner"
@ -28,7 +30,6 @@ import (
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// Bootkube implements the Service interface. It serves as the concrete type with

View File

@ -26,7 +26,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/talos-systems/crypto/x509"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/net"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
@ -43,7 +43,6 @@ import (
"github.com/talos-systems/talos/pkg/conditions"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// Etcd implements the Service interface. It serves as the concrete type with

View File

@ -24,10 +24,10 @@ import (
"google.golang.org/protobuf/proto"
"github.com/talos-systems/go-procfs/procfs"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/networkd/pkg/address"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
const (

View File

@ -15,9 +15,9 @@ import (
"github.com/beevik/ntp"
"github.com/hashicorp/go-multierror"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/timed/pkg/timex"
"github.com/talos-systems/talos/pkg/retry"
)
// NTP contains a server address.

View File

@ -13,9 +13,10 @@ import (
"testing"
"time"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/integration/base"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/retry"
)
type RebootSuite struct {

View File

@ -10,10 +10,11 @@ import (
"context"
"time"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/integration/base"
"github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/retry"
)
// VersionSuite verifies version API.

View File

@ -15,6 +15,7 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
"github.com/talos-systems/talos/pkg/cluster"
@ -24,7 +25,6 @@ import (
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/provision"
"github.com/talos-systems/talos/pkg/provision/access"
"github.com/talos-systems/talos/pkg/retry"
)
// APISuite is a base suite for API tests.

View File

@ -18,12 +18,12 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/pkg/cluster"
"github.com/talos-systems/talos/pkg/cmd"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// CLISuite is a base suite for CLI tests.

View File

@ -19,7 +19,7 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/go-retry/retry"
talosnet "github.com/talos-systems/net"
"github.com/talos-systems/talos/cmd/talosctl/pkg/mgmt/helpers"
@ -36,7 +36,6 @@ import (
"github.com/talos-systems/talos/pkg/provision"
"github.com/talos-systems/talos/pkg/provision/access"
"github.com/talos-systems/talos/pkg/provision/providers/qemu"
"github.com/talos-systems/talos/pkg/retry"
)
type upgradeSpec struct {

View File

@ -9,10 +9,10 @@ import (
"fmt"
"time"
"github.com/talos-systems/talos/pkg/machinery/config"
"github.com/talos-systems/talos/pkg/retry"
"github.com/containerd/containerd"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/pkg/machinery/config"
)
// Pull is a convenience function that wraps the containerd image pull func with

View File

@ -1,92 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package loadbalancer provides simple TCP loadbalancer.
package loadbalancer
import (
"context"
"log"
"net"
"inet.af/tcpproxy"
"github.com/talos-systems/talos/internal/pkg/loadbalancer/upstream"
)
// TCP is a simple loadbalancer for TCP connections across a set of upstreams.
//
// Healthcheck is defined as TCP dial attempt by default.
//
// Zero value of TCP is a valid proxy, use `AddRoute` to install load balancer for
// address.
//
// Usage: call Run() to start lb and wait for shutdown, call Close() to shutdown lb.
type TCP struct {
tcpproxy.Proxy
}
type lbUpstream string
func (upstream lbUpstream) HealthCheck(ctx context.Context) error {
d := net.Dialer{}
c, err := d.DialContext(ctx, "tcp", string(upstream))
if err != nil {
log.Printf("healthcheck failed for %q: %s", string(upstream), err)
return err
}
return c.Close()
}
type lbTarget struct {
list *upstream.List
}
func (target *lbTarget) HandleConn(conn net.Conn) {
upstreamBackend, err := target.list.Pick()
if err != nil {
log.Printf("no upstreams available, closing connection from %s", conn.RemoteAddr())
conn.Close() //nolint: errcheck
return
}
upstreamAddr := upstreamBackend.(lbUpstream) //nolint: errcheck
log.Printf("proxying connection %s -> %s", conn.RemoteAddr(), string(upstreamAddr))
upstreamTarget := tcpproxy.To(string(upstreamAddr))
upstreamTarget.OnDialError = func(src net.Conn, dstDialErr error) {
src.Close() //nolint: errcheck
log.Printf("error dialing upstream %s: %s", string(upstreamAddr), dstDialErr)
target.list.Down(upstreamBackend)
}
upstreamTarget.HandleConn(conn)
}
// AddRoute installs load balancer route from listen address ipAddr to list of upstreams.
//
// TCP automatically does background health checks for the upstreams and picks only healthy
// ones. Healthcheck is simple Dial attempt.
func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstream.ListOption) error {
upstreams := make([]upstream.Backend, len(upstreamAddrs))
for i := range upstreams {
upstreams[i] = lbUpstream(upstreamAddrs[i])
}
list, err := upstream.NewList(upstreams, options...)
if err != nil {
return err
}
t.Proxy.AddRoute(ipPort, &lbTarget{list: list})
return nil
}

View File

@ -1,177 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package loadbalancer_test
import (
"io/ioutil"
"net"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/pkg/loadbalancer"
"github.com/talos-systems/talos/internal/pkg/loadbalancer/upstream"
)
type mockUpstream struct {
identity string
addr string
l net.Listener
}
func (u *mockUpstream) Start() error {
var err error
u.l, err = net.Listen("tcp", "localhost:0")
if err != nil {
return err
}
u.addr = u.l.Addr().String()
go u.serve()
return nil
}
func (u *mockUpstream) serve() {
for {
c, err := u.l.Accept()
if err != nil {
return
}
c.Write([]byte(u.identity)) //nolint: errcheck
c.Close() //nolint: errcheck
}
}
func (u *mockUpstream) Close() {
u.l.Close() //nolint: errcheck
}
func findListenAddress() (string, error) {
u := mockUpstream{}
if err := u.Start(); err != nil {
return "", err
}
u.Close()
return u.addr, nil
}
type TCPSuite struct {
suite.Suite
}
func (suite *TCPSuite) TestBalancer() {
const (
upstreamCount = 5
failingUpstream = 1
)
upstreams := make([]mockUpstream, upstreamCount)
for i := range upstreams {
upstreams[i].identity = strconv.Itoa(i)
suite.Require().NoError(upstreams[i].Start())
}
upstreamAddrs := make([]string, len(upstreams))
for i := range upstreamAddrs {
upstreamAddrs[i] = upstreams[i].addr
}
listenAddr, err := findListenAddress()
suite.Require().NoError(err)
lb := &loadbalancer.TCP{}
suite.Require().NoError(lb.AddRoute(
listenAddr,
upstreamAddrs,
upstream.WithLowHighScores(-3, 3),
upstream.WithInitialScore(1),
upstream.WithScoreDeltas(-1, 1),
upstream.WithHealthcheckInterval(time.Second),
upstream.WithHealthcheckTimeout(100*time.Millisecond),
))
suite.Require().NoError(lb.Start())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
lb.Wait() //nolint: errcheck
}()
for i := 0; i < 2*upstreamCount; i++ {
c, err := net.Dial("tcp", listenAddr)
suite.Require().NoError(err)
id, err := ioutil.ReadAll(c)
suite.Require().NoError(err)
// load balancer should go round-robin across all the upstreams
suite.Assert().Equal([]byte(strconv.Itoa(i%upstreamCount)), id)
suite.Require().NoError(c.Close())
}
// bring down one upstream
upstreams[failingUpstream].Close()
j := 0
failedRequests := 0
for i := 0; i < 10*upstreamCount; i++ {
c, err := net.Dial("tcp", listenAddr)
suite.Require().NoError(err)
id, err := ioutil.ReadAll(c)
suite.Require().NoError(err)
if len(id) == 0 {
// hit failing upstream
suite.Assert().Equal(failingUpstream, j%upstreamCount)
failedRequests++
continue
}
if j%upstreamCount == failingUpstream {
j++
}
// load balancer should go round-robin across all the upstreams
suite.Assert().Equal([]byte(strconv.Itoa(j%upstreamCount)), id)
j++
suite.Require().NoError(c.Close())
}
// worst case: score = 3 (highScore) to go to -1 requires 5 requests
suite.Assert().Less(failedRequests, 5) // no more than 5 requests should fail
suite.Require().NoError(lb.Close())
wg.Wait()
for i := range upstreams {
upstreams[i].Close()
}
}
func TestTCPSuite(t *testing.T) {
suite.Run(t, new(TCPSuite))
}

View File

@ -1,256 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package upstream provides utilities for choosing upstream backends based on score.
package upstream
import (
"context"
"fmt"
"sync"
"time"
)
// Backend is an interface which should be implemented for a Pick entry.
type Backend interface {
HealthCheck(ctx context.Context) error
}
type node struct {
backend Backend
score float64
}
// ListOption allows to configure List.
type ListOption func(*List) error
// WithLowHighScores configures low and high score.
func WithLowHighScores(lowScore, highScore float64) ListOption {
return func(l *List) error {
if l.lowScore > 0 {
return fmt.Errorf("lowScore should be non-positive")
}
if l.highScore < 0 {
return fmt.Errorf("highScore should be non-positive")
}
if l.lowScore > l.highScore {
return fmt.Errorf("lowScore should be less or equal to highScore")
}
l.lowScore, l.highScore = lowScore, highScore
return nil
}
}
// WithScoreDeltas configures fail and success score delta.
func WithScoreDeltas(failScoreDelta, successScoreDelta float64) ListOption {
return func(l *List) error {
if l.failScoreDelta >= 0 {
return fmt.Errorf("failScoreDelta should be negative")
}
if l.successScoreDelta <= 0 {
return fmt.Errorf("successScoreDelta should be positive")
}
l.failScoreDelta, l.successScoreDelta = failScoreDelta, successScoreDelta
return nil
}
}
// WithInitialScore configures initial backend score.
func WithInitialScore(initialScore float64) ListOption {
return func(l *List) error {
l.initialScore = initialScore
return nil
}
}
// WithHealthcheckInterval configures healthcheck interval.
func WithHealthcheckInterval(interval time.Duration) ListOption {
return func(l *List) error {
l.healthcheckInterval = interval
return nil
}
}
// WithHealthcheckTimeout configures healthcheck timeout (for each backend).
func WithHealthcheckTimeout(timeout time.Duration) ListOption {
return func(l *List) error {
l.healthcheckTimeout = timeout
return nil
}
}
// List of upstream Backends with healthchecks and different strategies to pick a node.
//
// List keeps track of Backends with score. Score is updated on health checks, and via external
// interface (e.g. when actual connection fails).
//
// Initial score is set via options (default is +1). Low and high scores defaults are (-3, +3).
// Backend score is limited by low and high scores. Each time healthcheck fails score is adjusted
// by fail delta score, and every successful check updates score by success score delta (defaults are -1/+1).
//
// Backend might be used if its score is not negative.
type List struct {
lowScore, highScore float64
failScoreDelta, successScoreDelta float64
initialScore float64
healthcheckInterval time.Duration
healthcheckTimeout time.Duration
healthWg sync.WaitGroup
healthCtx context.Context
healthCtxCancel context.CancelFunc
// Following fields are protected by mutex
mu sync.Mutex
nodes []node
current int
}
// NewList initializes new list with upstream backends and options and starts health checks.
//
// List should be stopped with `.Shutdown()`.
func NewList(upstreams []Backend, options ...ListOption) (*List, error) {
// initialize with defaults
list := &List{
lowScore: -3.0,
highScore: 3.0,
failScoreDelta: -1.0,
successScoreDelta: 1.0,
initialScore: 1.0,
healthcheckInterval: 1 * time.Second,
healthcheckTimeout: 100 * time.Millisecond,
current: -1,
}
list.healthCtx, list.healthCtxCancel = context.WithCancel(context.Background())
for _, opt := range options {
if err := opt(list); err != nil {
return nil, err
}
}
list.nodes = make([]node, len(upstreams))
for i := range list.nodes {
list.nodes[i].backend = upstreams[i]
list.nodes[i].score = list.initialScore
}
list.healthWg.Add(1)
go list.healthcheck()
return list, nil
}
// Shutdown stops healthchecks.
func (list *List) Shutdown() {
list.healthCtxCancel()
list.healthWg.Wait()
}
// Up increases backend score by success score delta.
func (list *List) Up(upstream Backend) {
list.mu.Lock()
defer list.mu.Unlock()
for i := range list.nodes {
if list.nodes[i].backend == upstream {
list.nodes[i].score += list.successScoreDelta
if list.nodes[i].score > list.highScore {
list.nodes[i].score = list.highScore
}
}
}
}
// Down decreases backend score by fail score delta.
func (list *List) Down(upstream Backend) {
list.mu.Lock()
defer list.mu.Unlock()
for i := range list.nodes {
if list.nodes[i].backend == upstream {
list.nodes[i].score += list.failScoreDelta
if list.nodes[i].score < list.lowScore {
list.nodes[i].score = list.lowScore
}
}
}
}
// Pick returns next backend to be used.
//
// Default policy is to pick healthy (non-negative score) backend in
// round-robin fashion.
func (list *List) Pick() (Backend, error) {
list.mu.Lock()
defer list.mu.Unlock()
for j := 0; j < len(list.nodes); j++ {
i := (list.current + 1 + j) % len(list.nodes)
if list.nodes[i].score >= 0 {
list.current = i
return list.nodes[list.current].backend, nil
}
}
return nil, fmt.Errorf("no upstreams available")
}
func (list *List) healthcheck() {
defer list.healthWg.Done()
ticker := time.NewTicker(list.healthcheckInterval)
defer ticker.Stop()
for {
list.mu.Lock()
nodes := append([]node(nil), list.nodes...)
list.mu.Unlock()
for _, node := range nodes {
select {
case <-list.healthCtx.Done():
return
default:
}
func() {
ctx, ctxCancel := context.WithTimeout(list.healthCtx, list.healthcheckTimeout)
defer ctxCancel()
if err := node.backend.HealthCheck(ctx); err != nil {
list.Down(node.backend)
} else {
list.Up(node.backend)
}
}()
}
select {
case <-ticker.C:
case <-list.healthCtx.Done():
return
}
}
}

View File

@ -1,192 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package upstream_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/pkg/loadbalancer/upstream"
"github.com/talos-systems/talos/pkg/retry"
)
type mockBackend string
func (b mockBackend) HealthCheck(ctx context.Context) error {
switch string(b) {
case "fail":
return errors.New("fail")
case "success":
return nil
default:
<-ctx.Done()
return ctx.Err()
}
}
type ListSuite struct {
suite.Suite
}
func (suite *ListSuite) TestEmpty() {
l, err := upstream.NewList(nil)
suite.Require().NoError(err)
defer l.Shutdown()
backend, err := l.Pick()
suite.Assert().Nil(backend)
suite.Assert().EqualError(err, "no upstreams available")
}
func (suite *ListSuite) TestRoundRobin() {
l, err := upstream.NewList([]upstream.Backend{mockBackend("one"), mockBackend("two"), mockBackend("three")})
suite.Require().NoError(err)
defer l.Shutdown()
backend, err := l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("two"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("three"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
}
func (suite *ListSuite) TestDownUp() {
l, err := upstream.NewList(
[]upstream.Backend{
mockBackend("one"),
mockBackend("two"),
mockBackend("three"),
},
upstream.WithLowHighScores(-3, 3),
upstream.WithInitialScore(1),
upstream.WithScoreDeltas(-1, 1),
upstream.WithHealthcheckInterval(time.Hour),
)
suite.Require().NoError(err)
defer l.Shutdown()
backend, err := l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
l.Down(mockBackend("two")) // score == 0
l.Down(mockBackend("two")) // score == -1
l.Down(mockBackend("three")) // score == 0
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("three"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("three"), backend)
suite.Assert().NoError(err)
l.Down(mockBackend("three")) // score == -1
l.Up(mockBackend("two")) // score == 0
l.Up(mockBackend("two")) // score == 1
l.Up(mockBackend("two")) // score == 2
l.Up(mockBackend("two")) // score == 3
l.Up(mockBackend("two")) // score == 3 (capped at highScore)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("two"), backend)
suite.Assert().NoError(err)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
l.Down(mockBackend("two")) // score == 2
l.Down(mockBackend("two")) // score == 1
l.Down(mockBackend("two")) // score == 0
l.Down(mockBackend("two")) // score == -1
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
l.Down(mockBackend("two")) // score == -2
l.Down(mockBackend("two")) // score == -3
l.Down(mockBackend("two")) // score == -3 (capped at lowScore)
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)
l.Up(mockBackend("two")) // score == -2
l.Up(mockBackend("two")) // score == -1
l.Up(mockBackend("two")) // score == 0
backend, err = l.Pick()
suite.Assert().Equal(mockBackend("two"), backend)
suite.Assert().NoError(err)
}
func (suite *ListSuite) TestHealthcheck() {
l, err := upstream.NewList(
[]upstream.Backend{
mockBackend("success"),
mockBackend("fail"),
mockBackend("timeout"),
},
upstream.WithLowHighScores(-1, 1),
upstream.WithInitialScore(1),
upstream.WithScoreDeltas(-1, 1),
upstream.WithHealthcheckInterval(10*time.Millisecond),
upstream.WithHealthcheckTimeout(time.Millisecond),
)
suite.Require().NoError(err)
defer l.Shutdown()
time.Sleep(20 * time.Millisecond) // let healthchecks run
// when health info converges, "success" should be the only backend left
suite.Require().NoError(retry.Constant(time.Second, retry.WithUnits(time.Millisecond)).Retry(func() error {
for i := 0; i < 10; i++ {
backend, err := l.Pick()
if err != nil {
return retry.UnexpectedError(err)
}
if backend.(mockBackend) != mockBackend("success") {
return retry.ExpectedError(fmt.Errorf("unexpected %v", backend))
}
}
return nil
}))
}
func TestListSuite(t *testing.T) {
suite.Run(t, new(ListSuite))
}

View File

@ -13,6 +13,7 @@ import (
"strings"
"time"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sys/unix"
"github.com/talos-systems/talos/pkg/blockdevice"
@ -20,7 +21,6 @@ import (
gptpartition "github.com/talos-systems/talos/pkg/blockdevice/table/gpt/partition"
"github.com/talos-systems/talos/pkg/blockdevice/util"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// RetryFunc defines the requirements for retrying a mount point operation.

View File

@ -11,11 +11,11 @@ import (
"time"
"unsafe"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sys/unix"
"github.com/talos-systems/talos/pkg/blockdevice/lba"
"github.com/talos-systems/talos/pkg/blockdevice/table"
"github.com/talos-systems/talos/pkg/retry"
)
// InformKernelOfAdd invokes the BLKPG_ADD_PARTITION ioctl.

View File

@ -12,11 +12,11 @@ import (
"time"
"unsafe"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sys/unix"
"github.com/talos-systems/talos/pkg/blockdevice/table"
"github.com/talos-systems/talos/pkg/blockdevice/table/gpt"
"github.com/talos-systems/talos/pkg/retry"
"golang.org/x/sys/unix"
)
// BlockDevice represents a block device.

View File

@ -15,6 +15,9 @@ import (
"path/filepath"
"time"
"github.com/talos-systems/go-retry/retry"
"golang.org/x/sys/unix"
"github.com/talos-systems/talos/pkg/blockdevice"
"github.com/talos-systems/talos/pkg/blockdevice/filesystem"
"github.com/talos-systems/talos/pkg/blockdevice/filesystem/iso9660"
@ -22,9 +25,6 @@ import (
"github.com/talos-systems/talos/pkg/blockdevice/filesystem/xfs"
gptpartition "github.com/talos-systems/talos/pkg/blockdevice/table/gpt/partition"
"github.com/talos-systems/talos/pkg/blockdevice/util"
"github.com/talos-systems/talos/pkg/retry"
"golang.org/x/sys/unix"
)
// ProbedBlockDevice represents a probed block device.

View File

@ -11,9 +11,10 @@ import (
"sort"
"time"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/pkg/machinery/client"
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
"github.com/talos-systems/talos/pkg/retry"
)
// APIBoostrapper bootstraps cluster via Talos API.

View File

@ -12,7 +12,7 @@ import (
"net/url"
"time"
"github.com/talos-systems/talos/pkg/retry"
"github.com/talos-systems/go-retry/retry"
)
const b64 = "base64"

View File

@ -29,9 +29,9 @@ import (
"k8s.io/client-go/tools/clientcmd"
"github.com/talos-systems/crypto/x509"
"github.com/talos-systems/go-retry/retry"
"github.com/talos-systems/talos/pkg/machinery/constants"
"github.com/talos-systems/talos/pkg/retry"
)
// Client represents a set of helper methods for interacting with the

View File

@ -1,57 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package retry
import (
"time"
)
type constantRetryer struct {
retryer
}
// ConstantTicker represents a ticker with a constant algorithm.
type ConstantTicker struct {
ticker
}
// Constant initializes and returns a constant Retryer.
func Constant(duration time.Duration, setters ...Option) Retryer {
opts := NewDefaultOptions(setters...)
return constantRetryer{
retryer: retryer{
duration: duration,
options: opts,
},
}
}
// NewConstantTicker is a ticker that sends the time on a channel using a
// constant algorithm.
func NewConstantTicker(opts *Options) *ConstantTicker {
l := &ConstantTicker{
ticker: ticker{
C: make(chan time.Time, 1),
options: opts,
s: make(chan struct{}, 1),
},
}
return l
}
// Retry implements the Retryer interface.
func (c constantRetryer) Retry(f RetryableFunc) error {
tick := NewConstantTicker(c.options)
defer tick.Stop()
return retry(f, c.duration, tick)
}
// Tick implements the Ticker interface.
func (c ConstantTicker) Tick() time.Duration {
return c.options.Units + c.Jitter()
}

View File

@ -1,172 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//nolint: testpackage
package retry
import (
"fmt"
"testing"
"time"
)
// nolint: scopelint
func Test_constantRetryer_Retry(t *testing.T) {
type fields struct {
retryer retryer
}
type args struct {
f RetryableFunc
}
count := 0
tests := []struct {
name string
fields fields
args args
expectedCount int
wantErr bool
}{
{
name: "test expected number of retries",
fields: fields{
retryer: retryer{
duration: 2500 * time.Millisecond,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 3,
wantErr: true,
},
{
name: "test expected number of retries with units",
fields: fields{
retryer: retryer{
duration: 2250 * time.Millisecond,
options: NewDefaultOptions(WithUnits(500 * time.Millisecond)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 5,
wantErr: true,
},
{
name: "test unexpected error",
fields: fields{
retryer: retryer{
duration: 2 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
return UnexpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 1,
wantErr: true,
},
{
name: "test conditional unexpected error",
fields: fields{
retryer: retryer{
duration: 10 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
if count == 2 {
return UnexpectedError(fmt.Errorf("unexpected"))
}
return ExpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 2,
wantErr: true,
},
{
name: "test conditional no error",
fields: fields{
retryer: retryer{
duration: 10 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
if count == 2 {
return nil
}
return ExpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 2,
wantErr: false,
},
{
name: "no error",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
return nil
},
},
expectedCount: 0,
wantErr: false,
},
{
name: "test timeout",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(WithUnits(10 * time.Second)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 1,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := constantRetryer{
retryer: tt.fields.retryer,
}
count = 0
if err := e.Retry(tt.args.f); (err != nil) != tt.wantErr {
t.Errorf("constantRetryer.Retry() error = %v, wantErr %v", err, tt.wantErr)
}
if count != tt.expectedCount {
t.Errorf("expected count of %d, got %d", tt.expectedCount, count)
}
})
}
}

View File

@ -1,66 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package retry
import (
"math"
"time"
)
type exponentialRetryer struct {
retryer
}
// ExponentialTicker represents a ticker with a truncated exponential algorithm.
// Please see https://en.wikipedia.org/wiki/Exponential_backoff for details on
// the algorithm.
type ExponentialTicker struct {
ticker
c float64
}
// Exponential initializes and returns a truncated exponential Retryer.
func Exponential(duration time.Duration, setters ...Option) Retryer {
opts := NewDefaultOptions(setters...)
return exponentialRetryer{
retryer: retryer{
duration: duration,
options: opts,
},
}
}
// NewExponentialTicker is a ticker that sends the time on a channel using a
// truncated exponential algorithm.
func NewExponentialTicker(opts *Options) *ExponentialTicker {
e := &ExponentialTicker{
ticker: ticker{
C: make(chan time.Time, 1),
options: opts,
s: make(chan struct{}, 1),
},
c: 1.0,
}
return e
}
// Retry implements the Retryer interface.
func (e exponentialRetryer) Retry(f RetryableFunc) error {
tick := NewExponentialTicker(e.options)
defer tick.Stop()
return retry(f, e.duration, tick)
}
// Tick implements the Ticker interface.
func (e *ExponentialTicker) Tick() time.Duration {
d := time.Duration((math.Pow(2, e.c)-1)/2)*e.options.Units + e.Jitter()
e.c++
return d
}

View File

@ -1,172 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//nolint: testpackage
package retry
import (
"fmt"
"testing"
"time"
)
// nolint: scopelint
func Test_exponentialRetryer_Retry(t *testing.T) {
type fields struct {
retryer retryer
}
type args struct {
f RetryableFunc
}
count := 0
tests := []struct {
name string
fields fields
args args
expectedCount int
wantErr bool
}{
{
name: "test expected number of retries",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(WithUnits(100 * time.Millisecond)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 4,
wantErr: true,
},
{
name: "test expected number of retries with units",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(WithUnits(50 * time.Millisecond)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 5,
wantErr: true,
},
{
name: "test unexpected error",
fields: fields{
retryer: retryer{
duration: 2 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
return UnexpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 1,
wantErr: true,
},
{
name: "test conditional unexpected error",
fields: fields{
retryer: retryer{
duration: 10 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
if count == 2 {
return UnexpectedError(fmt.Errorf("unexpected"))
}
return ExpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 2,
wantErr: true,
},
{
name: "test conditional no error",
fields: fields{
retryer: retryer{
duration: 10 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
if count == 2 {
return nil
}
return ExpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 2,
wantErr: false,
},
{
name: "no error",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
return nil
},
},
expectedCount: 0,
wantErr: false,
},
{
name: "test timeout",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(WithUnits(10 * time.Second)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 2,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := exponentialRetryer{
retryer: tt.fields.retryer,
}
count = 0
if err := e.Retry(tt.args.f); (err != nil) != tt.wantErr {
t.Errorf("exponentialRetryer.Retry() error = %v, wantErr %v", err, tt.wantErr)
}
if count != tt.expectedCount {
t.Errorf("expected count of %d, got %d", tt.expectedCount, count)
}
})
}
}

View File

@ -1,63 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package retry
import (
"time"
)
type linearRetryer struct {
retryer
}
// LinearTicker represents a ticker with a linear algorithm.
type LinearTicker struct {
ticker
c int
}
// Linear initializes and returns a linear Retryer.
func Linear(duration time.Duration, setters ...Option) Retryer {
opts := NewDefaultOptions(setters...)
return linearRetryer{
retryer: retryer{
duration: duration,
options: opts,
},
}
}
// NewLinearTicker is a ticker that sends the time on a channel using a
// linear algorithm.
func NewLinearTicker(opts *Options) *LinearTicker {
l := &LinearTicker{
ticker: ticker{
C: make(chan time.Time, 1),
options: opts,
s: make(chan struct{}, 1),
},
c: 1,
}
return l
}
// Retry implements the Retryer interface.
func (l linearRetryer) Retry(f RetryableFunc) error {
tick := NewLinearTicker(l.options)
defer tick.Stop()
return retry(f, l.duration, tick)
}
// Tick implements the Ticker interface.
func (l *LinearTicker) Tick() time.Duration {
d := time.Duration(l.c)*l.options.Units + l.Jitter()
l.c++
return d
}

View File

@ -1,172 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//nolint: testpackage
package retry
import (
"fmt"
"testing"
"time"
)
// nolint: scopelint
func Test_linearRetryer_Retry(t *testing.T) {
type fields struct {
retryer retryer
}
type args struct {
f RetryableFunc
}
count := 0
tests := []struct {
name string
fields fields
args args
expectedCount int
wantErr bool
}{
{
name: "test expected number of retries",
fields: fields{
retryer: retryer{
duration: 1*time.Second + 200*time.Millisecond,
options: NewDefaultOptions(WithUnits(100 * time.Millisecond)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 5,
wantErr: true,
},
{
name: "test expected number of retries with units",
fields: fields{
retryer: retryer{
duration: 2 * time.Second,
options: NewDefaultOptions(WithUnits(50 * time.Millisecond)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 9,
wantErr: true,
},
{
name: "test unexpected error",
fields: fields{
retryer: retryer{
duration: 2 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
return UnexpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 1,
wantErr: true,
},
{
name: "test conditional unexpected error",
fields: fields{
retryer: retryer{
duration: 10 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
if count == 1 {
return UnexpectedError(fmt.Errorf("unexpected"))
}
return ExpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 1,
wantErr: true,
},
{
name: "test conditional no error",
fields: fields{
retryer: retryer{
duration: 10 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
count++
if count == 2 {
return nil
}
return ExpectedError(fmt.Errorf("unexpected"))
},
},
expectedCount: 2,
wantErr: false,
},
{
name: "no error",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(),
},
},
args: args{
f: func() error {
return nil
},
},
expectedCount: 0,
wantErr: false,
},
{
name: "test timeout",
fields: fields{
retryer: retryer{
duration: 1 * time.Second,
options: NewDefaultOptions(WithUnits(10 * time.Second)),
},
},
args: args{
f: func() error {
count++
return ExpectedError(fmt.Errorf("expected"))
},
},
expectedCount: 1,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := linearRetryer{
retryer: tt.fields.retryer,
}
count = 0
if err := l.Retry(tt.args.f); (err != nil) != tt.wantErr {
t.Errorf("linearRetryer.Retry() error = %v, wantErr %v", err, tt.wantErr)
}
if count != tt.expectedCount {
t.Errorf("expected count of %d, got %d", tt.expectedCount, count)
}
})
}
}

View File

@ -1,44 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package retry
import "time"
// Options is the functional options struct.
type Options struct {
Units time.Duration
Jitter time.Duration
}
// Option is the functional option func.
type Option func(*Options)
// WithUnits is a functional option for setting the units of the ticker.
func WithUnits(o time.Duration) Option {
return func(args *Options) {
args.Units = o
}
}
// WithJitter is a functional option for setting the jitter flag.
func WithJitter(o time.Duration) Option {
return func(args *Options) {
args.Jitter = o
}
}
// NewDefaultOptions initializes a Options struct with default values.
func NewDefaultOptions(setters ...Option) *Options {
opts := &Options{
Units: time.Second,
Jitter: time.Duration(0),
}
for _, setter := range setters {
setter(opts)
}
return opts
}

View File

@ -1,52 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//nolint: testpackage
package retry
import (
"reflect"
"testing"
"time"
)
// nolint: scopelint
func TestNewDefaultOptions(t *testing.T) {
type args struct {
setters []Option
}
tests := []struct {
name string
args args
want *Options
}{
{
name: "with options",
args: args{
setters: []Option{WithUnits(time.Millisecond)},
},
want: &Options{
Units: time.Millisecond,
},
},
{
name: "default",
args: args{
setters: []Option{},
},
want: &Options{
Units: time.Second,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewDefaultOptions(tt.args.setters...); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewDefaultOptions() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -1,181 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package retry
import (
"fmt"
"math/rand"
"sync"
"time"
)
// RetryableFunc represents a function that can be retried.
type RetryableFunc func() error
// Retryer defines the requirements for retrying a function.
type Retryer interface {
Retry(RetryableFunc) error
}
// Ticker defines the requirements for providing a clock to the retry logic.
type Ticker interface {
Tick() time.Duration
StopChan() <-chan struct{}
Stop()
}
// ErrorSet represents a set of unique errors.
type ErrorSet struct {
errs []error
mu sync.Mutex
}
func (e *ErrorSet) Error() string {
if len(e.errs) == 0 {
return ""
}
errString := fmt.Sprintf("%d error(s) occurred:", len(e.errs))
for _, err := range e.errs {
errString = fmt.Sprintf("%s\n\t%s", errString, err)
}
return errString
}
// Append adds the error to the set if the error is not already present.
func (e *ErrorSet) Append(err error) error {
e.mu.Lock()
defer e.mu.Unlock()
if e.errs == nil {
e.errs = []error{}
}
ok := false
for _, existingErr := range e.errs {
if err.Error() == existingErr.Error() {
ok = true
break
}
}
if !ok {
e.errs = append(e.errs, err)
}
return e
}
// TimeoutError represents a timeout error.
type TimeoutError struct{}
func (TimeoutError) Error() string {
return "timeout"
}
// IsTimeout reutrns if the provided error is a timeout error.
func IsTimeout(err error) bool {
_, ok := err.(TimeoutError)
return ok
}
type expectedError struct{ error }
type unexpectedError struct{ error }
type retryer struct {
duration time.Duration
options *Options
}
type ticker struct {
C chan time.Time
options *Options
rand *rand.Rand
s chan struct{}
}
func (t ticker) Jitter() time.Duration {
if int(t.options.Jitter) == 0 {
return time.Duration(0)
}
if t.rand == nil {
t.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
return time.Duration(t.rand.Int63n(int64(t.options.Jitter)))
}
func (t ticker) StopChan() <-chan struct{} {
return t.s
}
func (t ticker) Stop() {
t.s <- struct{}{}
}
// ExpectedError error represents an error that is expected by the retrying
// function. This error is ignored.
func ExpectedError(err error) error {
if err == nil {
return nil
}
return expectedError{err}
}
// UnexpectedError error represents an error that is unexpected by the retrying
// function. This error is fatal.
func UnexpectedError(err error) error {
if err == nil {
return nil
}
return unexpectedError{err}
}
func retry(f RetryableFunc, d time.Duration, t Ticker) error {
timer := time.NewTimer(d)
defer timer.Stop()
errs := &ErrorSet{}
// We run the func first to avoid having to wait for the next tick.
if err := f(); err != nil {
if _, ok := err.(unexpectedError); ok {
return errs.Append(err)
}
} else {
return nil
}
for {
select {
case <-timer.C:
return errs.Append(TimeoutError{})
case <-t.StopChan():
return nil
case <-time.After(t.Tick()):
}
if err := f(); err != nil {
switch err.(type) {
case expectedError:
// nolint: errcheck
errs.Append(err)
continue
case unexpectedError:
return errs.Append(err)
}
}
return nil
}
}

View File

@ -1,63 +0,0 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//nolint: testpackage
package retry
import (
"errors"
"testing"
"time"
)
// nolint: scopelint
func Test_retry(t *testing.T) {
type args struct {
f RetryableFunc
d time.Duration
t Ticker
}
tests := []struct {
name string
args args
wantString string
}{
{
name: "expected error string",
args: args{
f: func() error { return ExpectedError(errors.New("test")) },
d: 2 * time.Second,
t: NewConstantTicker(NewDefaultOptions()),
},
wantString: "2 error(s) occurred:\n\ttest\n\ttimeout",
},
{
name: "unexpected error string",
args: args{
f: func() error { return UnexpectedError(errors.New("test")) },
d: 2 * time.Second,
t: NewConstantTicker(NewDefaultOptions()),
},
wantString: "1 error(s) occurred:\n\ttest",
},
{
name: "no error string",
args: args{
f: func() error { return nil },
d: 2 * time.Second,
t: NewConstantTicker(NewDefaultOptions()),
},
wantString: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := retry(tt.args.f, tt.args.d, tt.args.t); err != nil && tt.wantString != err.Error() {
t.Errorf("retry() error = %q\nwant:\n%q", err, tt.wantString)
}
})
}
}