mirror of
https://github.com/containous/traefik.git
synced 2025-01-05 09:17:52 +03:00
db4a92d877
Co-authored-by: Tom Moulard <tom.moulard@traefik.io>
298 lines
6.8 KiB
Go
298 lines
6.8 KiB
Go
package udp
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
	// maxDatagramSize is the maximum size of a UDP datagram.
	// It is the size of the buffer allocated for every read on the socket.
	maxDatagramSize = 65535

	// closeRetryInterval caps the pause between two checks for remaining
	// sessions while Shutdown is waiting for them to terminate.
	closeRetryInterval = 500 * time.Millisecond
)

// errClosedListener is returned by Accept once the accept channel is closed,
// and by getConn when a new remote shows up after the listener stopped accepting.
var errClosedListener = errors.New("udp: listener closed")
|
|
|
|
// Listener augments a session-oriented Listener over a UDP PacketConn.
|
|
type Listener struct {
|
|
pConn *net.UDPConn
|
|
|
|
mu sync.RWMutex
|
|
conns map[string]*Conn
|
|
// accepting signifies whether the listener is still accepting new sessions.
|
|
// It also serves as a sentinel for Shutdown to be idempotent.
|
|
accepting bool
|
|
|
|
acceptCh chan *Conn // no need for a Once, already indirectly guarded by accepting.
|
|
|
|
// timeout defines how long to wait on an idle session,
|
|
// before releasing its related resources.
|
|
timeout time.Duration
|
|
}
|
|
|
|
// Listen creates a new listener.
|
|
func Listen(network string, laddr *net.UDPAddr, timeout time.Duration) (*Listener, error) {
|
|
if timeout <= 0 {
|
|
return nil, errors.New("timeout should be greater than zero")
|
|
}
|
|
|
|
conn, err := net.ListenUDP(network, laddr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
l := &Listener{
|
|
pConn: conn,
|
|
acceptCh: make(chan *Conn),
|
|
conns: make(map[string]*Conn),
|
|
accepting: true,
|
|
timeout: timeout,
|
|
}
|
|
|
|
go l.readLoop()
|
|
|
|
return l, nil
|
|
}
|
|
|
|
// Accept waits for and returns the next connection to the listener.
|
|
func (l *Listener) Accept() (*Conn, error) {
|
|
c := <-l.acceptCh
|
|
if c == nil {
|
|
// l.acceptCh got closed
|
|
return nil, errClosedListener
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// Addr returns the listener's network address.
|
|
func (l *Listener) Addr() net.Addr {
|
|
return l.pConn.LocalAddr()
|
|
}
|
|
|
|
// Close closes the listener.
|
|
// It is like Shutdown with a zero graceTimeout.
|
|
func (l *Listener) Close() error {
|
|
return l.Shutdown(0)
|
|
}
|
|
|
|
// close should not be called more than once.
|
|
func (l *Listener) close() error {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
err := l.pConn.Close()
|
|
for k, v := range l.conns {
|
|
v.close()
|
|
delete(l.conns, k)
|
|
}
|
|
close(l.acceptCh)
|
|
return err
|
|
}
|
|
|
|
// Shutdown closes the listener.
|
|
// It immediately stops accepting new sessions,
|
|
// and it waits for all existing sessions to terminate,
|
|
// and a maximum of graceTimeout.
|
|
// Then it forces close any session left.
|
|
func (l *Listener) Shutdown(graceTimeout time.Duration) error {
|
|
l.mu.Lock()
|
|
if !l.accepting {
|
|
l.mu.Unlock()
|
|
return nil
|
|
}
|
|
l.accepting = false
|
|
l.mu.Unlock()
|
|
|
|
retryInterval := closeRetryInterval
|
|
if retryInterval > graceTimeout {
|
|
retryInterval = graceTimeout
|
|
}
|
|
start := time.Now()
|
|
end := start.Add(graceTimeout)
|
|
for {
|
|
if time.Now().After(end) {
|
|
break
|
|
}
|
|
|
|
l.mu.RLock()
|
|
if len(l.conns) == 0 {
|
|
l.mu.RUnlock()
|
|
break
|
|
}
|
|
l.mu.RUnlock()
|
|
|
|
time.Sleep(retryInterval)
|
|
}
|
|
return l.close()
|
|
}
|
|
|
|
// readLoop receives all packets from all remotes.
|
|
// If a packet comes from a remote that is already known to us (i.e. a "session"),
|
|
// we find that session, and otherwise we create a new one.
|
|
// We then send the data the session's readLoop.
|
|
func (l *Listener) readLoop() {
|
|
for {
|
|
// Allocating a new buffer for every read avoids
|
|
// overwriting data in c.msgs in case the next packet is received
|
|
// before c.msgs is emptied via Read()
|
|
buf := make([]byte, maxDatagramSize)
|
|
|
|
n, raddr, err := l.pConn.ReadFrom(buf)
|
|
if err != nil {
|
|
return
|
|
}
|
|
conn, err := l.getConn(raddr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case conn.receiveCh <- buf[:n]:
|
|
case <-conn.doneCh:
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// getConn returns the ongoing session with raddr if it exists, or creates a new
|
|
// one otherwise.
|
|
func (l *Listener) getConn(raddr net.Addr) (*Conn, error) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
conn, ok := l.conns[raddr.String()]
|
|
if ok {
|
|
return conn, nil
|
|
}
|
|
|
|
if !l.accepting {
|
|
return nil, errClosedListener
|
|
}
|
|
conn = l.newConn(raddr)
|
|
l.conns[raddr.String()] = conn
|
|
l.acceptCh <- conn
|
|
go conn.readLoop()
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
func (l *Listener) newConn(rAddr net.Addr) *Conn {
|
|
return &Conn{
|
|
listener: l,
|
|
rAddr: rAddr,
|
|
receiveCh: make(chan []byte),
|
|
readCh: make(chan []byte),
|
|
sizeCh: make(chan int),
|
|
doneCh: make(chan struct{}),
|
|
timeout: l.timeout,
|
|
}
|
|
}
|
|
|
|
// Conn represents an on-going session with a client, over UDP packets.
|
|
type Conn struct {
|
|
listener *Listener
|
|
rAddr net.Addr
|
|
|
|
receiveCh chan []byte // to receive the data from the listener's readLoop
|
|
readCh chan []byte // to receive the buffer into which we should Read
|
|
sizeCh chan int // to synchronize with the end of a Read
|
|
msgs [][]byte // to store data from listener, to be consumed by Reads
|
|
|
|
muActivity sync.RWMutex
|
|
lastActivity time.Time // the last time the session saw either read or write activity
|
|
|
|
timeout time.Duration // for timeouts
|
|
doneOnce sync.Once
|
|
doneCh chan struct{}
|
|
}
|
|
|
|
// readLoop waits for data to come from the listener's readLoop.
|
|
// It then waits for a Read operation to be ready to consume said data,
|
|
// that is to say it waits on readCh to receive the slice of bytes that the Read operation wants to read onto.
|
|
// The Read operation receives the signal that the data has been written to the slice of bytes through the sizeCh.
|
|
func (c *Conn) readLoop() {
|
|
ticker := time.NewTicker(c.timeout / 10)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
if len(c.msgs) == 0 {
|
|
select {
|
|
case msg := <-c.receiveCh:
|
|
c.msgs = append(c.msgs, msg)
|
|
case <-ticker.C:
|
|
c.muActivity.RLock()
|
|
deadline := c.lastActivity.Add(c.timeout)
|
|
c.muActivity.RUnlock()
|
|
if time.Now().After(deadline) {
|
|
c.Close()
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
|
|
select {
|
|
case cBuf := <-c.readCh:
|
|
msg := c.msgs[0]
|
|
c.msgs = c.msgs[1:]
|
|
n := copy(cBuf, msg)
|
|
c.sizeCh <- n
|
|
case msg := <-c.receiveCh:
|
|
c.msgs = append(c.msgs, msg)
|
|
case <-ticker.C:
|
|
c.muActivity.RLock()
|
|
deadline := c.lastActivity.Add(c.timeout)
|
|
c.muActivity.RUnlock()
|
|
if time.Now().After(deadline) {
|
|
c.Close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Read reads up to len(p) bytes into p from the connection.
|
|
// Each call corresponds to at most one datagram.
|
|
// If p is smaller than the datagram, the extra bytes will be discarded.
|
|
func (c *Conn) Read(p []byte) (int, error) {
|
|
select {
|
|
case c.readCh <- p:
|
|
n := <-c.sizeCh
|
|
c.muActivity.Lock()
|
|
c.lastActivity = time.Now()
|
|
c.muActivity.Unlock()
|
|
return n, nil
|
|
|
|
case <-c.doneCh:
|
|
return 0, io.EOF
|
|
}
|
|
}
|
|
|
|
// Write writes len(p) bytes from p to the underlying connection.
|
|
// Each call sends at most one datagram.
|
|
// It is an error to send a message larger than the system's max UDP datagram size.
|
|
func (c *Conn) Write(p []byte) (n int, err error) {
|
|
c.muActivity.Lock()
|
|
c.lastActivity = time.Now()
|
|
c.muActivity.Unlock()
|
|
|
|
return c.listener.pConn.WriteTo(p, c.rAddr)
|
|
}
|
|
|
|
func (c *Conn) close() {
|
|
c.doneOnce.Do(func() {
|
|
close(c.doneCh)
|
|
})
|
|
}
|
|
|
|
// Close releases resources related to the Conn.
|
|
func (c *Conn) Close() error {
|
|
c.close()
|
|
|
|
c.listener.mu.Lock()
|
|
defer c.listener.mu.Unlock()
|
|
delete(c.listener.conns, c.rAddr.String())
|
|
return nil
|
|
}
|