refactor: use retry package in ntpd

This moves to using the retry package for retrying NTP queries. It also
adds some additional logging that is useful when NTP queries fail.

Signed-off-by: Andrew Rynhard <andrew@andrewrynhard.com>
This commit is contained in:
Andrew Rynhard 2019-11-02 04:00:26 +00:00
parent e9296bed6e
commit eb75d1fb47
2 changed files with 31 additions and 57 deletions

View File

@ -7,12 +7,13 @@ package ntp
import (
"fmt"
"log"
"math/rand"
"syscall"
"time"
"github.com/beevik/ntp"
"github.com/hashicorp/go-multierror"
"github.com/talos-systems/talos/pkg/retry"
)
// NTP contains a server address
@ -20,11 +21,10 @@ type NTP struct {
Server string
MinPoll time.Duration
MaxPoll time.Duration
Retry int
}
// NewNTPClient instantiates a new ntp client for the
// specified server
// specified server.
func NewNTPClient(opts ...Option) (*NTP, error) {
ntp := defaultOptions()
@ -38,32 +38,12 @@ func NewNTPClient(opts ...Option) (*NTP, error) {
// Daemon runs the control loop for query and set time
// We dont ever want the daemon to stop, so we only log
// errors
// errors.
func (n *NTP) Daemon() (err error) {
// Do an initial hard set of time to ensure clock skew isnt too far off
var resp *ntp.Response
if resp, err = n.Query(); err != nil {
log.Printf("error querying %s for time, %s", n.Server, err)
return err
}
if err = setTime(resp.Time); err != nil {
return err
}
var randSleep time.Duration
for {
// Set some variance with how frequently we poll ntp servers.
// This is based on rand(MaxPoll) + MinPoll so we wait at least
// MinPoll.
randSleep = time.Duration(rand.Intn(int(n.MaxPoll.Seconds()))) * time.Second
time.Sleep(randSleep + n.MinPoll)
var resp *ntp.Response
if resp, err = n.Query(); err != nil {
// As long as we set initial time, we'll treat
// subsequent errors as nonfatal
log.Printf("error querying %s for time, %s", n.Server, err)
continue
}
@ -76,31 +56,34 @@ func (n *NTP) Daemon() (err error) {
}
// Query polls the ntp server and verifies a successful response.
func (n *NTP) Query() (*ntp.Response, error) {
for i := 0; i < n.Retry; i++ {
resp, err := ntp.Query(n.Server)
func (n *NTP) Query() (resp *ntp.Response, err error) {
err = retry.Constant(n.MaxPoll, retry.WithUnits(n.MinPoll), retry.WithJitter(250*time.Millisecond)).Retry(func() error {
resp, err = ntp.Query(n.Server)
if err != nil {
time.Sleep(time.Duration(i) * n.MinPoll)
continue
log.Printf("query error: %v", err)
return retry.ExpectedError(err)
}
if err := resp.Validate(); err != nil {
time.Sleep(time.Duration(i) * n.MinPoll)
continue
if err = resp.Validate(); err != nil {
return retry.UnexpectedError(err)
}
return resp, nil
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to query NTP server: %w", err)
}
return nil, fmt.Errorf("failed to get a response back from ntp server after %d retries", n.Retry)
return resp, nil
}
// GetTime returns the current system time
// GetTime returns the current system time.
func (n *NTP) GetTime() time.Time {
return time.Now()
}
// SetTime sets the system time based on the query response
// SetTime sets the system time based on the query response.
func setTime(adjustedTime time.Time) error {
log.Printf("setting time to %s", adjustedTime)
@ -109,7 +92,7 @@ func setTime(adjustedTime time.Time) error {
return syscall.Settimeofday(&timeval)
}
// adjustTime adds an offset to the current time
// adjustTime adds an offset to the current time.
func adjustTime(offset time.Duration) error {
return setTime(time.Now().Add(offset))
}

View File

@ -13,10 +13,10 @@ import (
type Option func(*NTP) error
const (
// MaxPoll is the 'recommended' interval for querying a time server
MaxPoll = 1024
// MinPoll is the minimum time allowed for a client to query a time server
MinPoll = 4
// MaxAllowablePoll is the 'recommended' interval for querying a time server
MaxAllowablePoll = 1024
// MinAllowablePoll is the minimum time allowed for a client to query a time server
MinAllowablePoll = 4
)
func defaultOptions() *NTP {
@ -24,9 +24,8 @@ func defaultOptions() *NTP {
// http://www.ntp.org/ntpfaq/NTP-s-algo.htm#AEN2082
return &NTP{
Server: "pool.ntp.org",
MaxPoll: MaxPoll * time.Second,
MaxPoll: MaxAllowablePoll * time.Second,
MinPoll: 64 * time.Second,
Retry: 3,
}
}
@ -41,9 +40,9 @@ func WithServer(o string) Option {
// WithMaxPoll configures the ntp client MaxPoll interval
func WithMaxPoll(o int) Option {
return func(n *NTP) (err error) {
// TODO add in constraints around min/max values from ntp doc
if o > MaxPoll {
return fmt.Errorf("MaxPoll(%d) is larger than maximum allowed value(%d)", o, MaxPoll)
// TODO: Add in constraints around min/max values from NTP doc.
if o > MaxAllowablePoll {
return fmt.Errorf("MaxPoll(%d) is larger than maximum allowed value(%d)", o, MaxAllowablePoll)
}
n.MaxPoll = time.Duration(o) * time.Second
return err
@ -53,18 +52,10 @@ func WithMaxPoll(o int) Option {
// WithMinPoll configures the ntp client MinPoll interval
func WithMinPoll(o int) Option {
return func(n *NTP) (err error) {
if o < MinPoll {
return fmt.Errorf("MinPoll(%d) is smaller than minimum allowed value(%d)", o, MinPoll)
if o < MinAllowablePoll {
return fmt.Errorf("MinPoll(%d) is smaller than minimum allowed value(%d)", o, MinAllowablePoll)
}
n.MinPoll = time.Duration(o) * time.Second
return err
}
}
// WithRetry configures the ntp client maximum number of retries
func WithRetry(o int) Option {
return func(n *NTP) (err error) {
n.Retry = o
return err
}
}