2020-02-11 03:26:04 +03:00
package udp
import (
"errors"
"io"
"net"
"sync"
"time"
)
2021-11-09 17:12:07 +03:00
// maxDatagramSize is the maximum size of a UDP datagram.
const maxDatagramSize = 65535
2020-02-11 03:26:04 +03:00
const closeRetryInterval = 500 * time . Millisecond
var errClosedListener = errors . New ( "udp: listener closed" )
// Listener augments a session-oriented Listener over a UDP PacketConn.
type Listener struct {
pConn * net . UDPConn
mu sync . RWMutex
conns map [ string ] * Conn
// accepting signifies whether the listener is still accepting new sessions.
// It also serves as a sentinel for Shutdown to be idempotent.
accepting bool
acceptCh chan * Conn // no need for a Once, already indirectly guarded by accepting.
2021-01-07 19:16:03 +03:00
// timeout defines how long to wait on an idle session,
// before releasing its related resources.
timeout time . Duration
2020-02-11 03:26:04 +03:00
}
// Listen creates a new listener.
2021-01-07 19:16:03 +03:00
func Listen ( network string , laddr * net . UDPAddr , timeout time . Duration ) ( * Listener , error ) {
if timeout <= 0 {
return nil , errors . New ( "timeout should be greater than zero" )
}
2020-02-11 03:26:04 +03:00
conn , err := net . ListenUDP ( network , laddr )
if err != nil {
return nil , err
}
l := & Listener {
pConn : conn ,
acceptCh : make ( chan * Conn ) ,
conns : make ( map [ string ] * Conn ) ,
accepting : true ,
2021-01-07 19:16:03 +03:00
timeout : timeout ,
2020-02-11 03:26:04 +03:00
}
go l . readLoop ( )
return l , nil
}
// Accept waits for and returns the next connection to the listener.
func ( l * Listener ) Accept ( ) ( * Conn , error ) {
c := <- l . acceptCh
if c == nil {
// l.acceptCh got closed
return nil , errClosedListener
}
return c , nil
}
// Addr returns the listener's network address.
func ( l * Listener ) Addr ( ) net . Addr {
return l . pConn . LocalAddr ( )
}
// Close closes the listener.
// It is like Shutdown with a zero graceTimeout.
func ( l * Listener ) Close ( ) error {
return l . Shutdown ( 0 )
}
// close should not be called more than once.
func ( l * Listener ) close ( ) error {
l . mu . Lock ( )
defer l . mu . Unlock ( )
err := l . pConn . Close ( )
for k , v := range l . conns {
v . close ( )
delete ( l . conns , k )
}
close ( l . acceptCh )
return err
}
// Shutdown closes the listener.
// It immediately stops accepting new sessions,
// and it waits for all existing sessions to terminate,
// and a maximum of graceTimeout.
// Then it forces close any session left.
func ( l * Listener ) Shutdown ( graceTimeout time . Duration ) error {
l . mu . Lock ( )
if ! l . accepting {
l . mu . Unlock ( )
return nil
}
l . accepting = false
l . mu . Unlock ( )
retryInterval := closeRetryInterval
if retryInterval > graceTimeout {
retryInterval = graceTimeout
}
start := time . Now ( )
end := start . Add ( graceTimeout )
for {
if time . Now ( ) . After ( end ) {
break
}
l . mu . RLock ( )
if len ( l . conns ) == 0 {
l . mu . RUnlock ( )
break
}
l . mu . RUnlock ( )
time . Sleep ( retryInterval )
}
return l . close ( )
}
// readLoop receives all packets from all remotes.
// If a packet comes from a remote that is already known to us (i.e. a "session"),
// we find that session, and otherwise we create a new one.
// We then send the data the session's readLoop.
func ( l * Listener ) readLoop ( ) {
for {
2020-06-08 19:12:04 +03:00
// Allocating a new buffer for every read avoids
// overwriting data in c.msgs in case the next packet is received
// before c.msgs is emptied via Read()
2021-11-09 17:12:07 +03:00
buf := make ( [ ] byte , maxDatagramSize )
2020-02-11 03:26:04 +03:00
n , raddr , err := l . pConn . ReadFrom ( buf )
if err != nil {
return
}
conn , err := l . getConn ( raddr )
if err != nil {
continue
}
2021-11-09 17:12:07 +03:00
2020-02-11 03:26:04 +03:00
select {
case conn . receiveCh <- buf [ : n ] :
case <- conn . doneCh :
continue
}
}
}
// getConn returns the ongoing session with raddr if it exists, or creates a new
// one otherwise.
func ( l * Listener ) getConn ( raddr net . Addr ) ( * Conn , error ) {
l . mu . Lock ( )
defer l . mu . Unlock ( )
conn , ok := l . conns [ raddr . String ( ) ]
if ok {
return conn , nil
}
if ! l . accepting {
return nil , errClosedListener
}
conn = l . newConn ( raddr )
l . conns [ raddr . String ( ) ] = conn
l . acceptCh <- conn
go conn . readLoop ( )
return conn , nil
}
func ( l * Listener ) newConn ( rAddr net . Addr ) * Conn {
return & Conn {
listener : l ,
rAddr : rAddr ,
receiveCh : make ( chan [ ] byte ) ,
readCh : make ( chan [ ] byte ) ,
sizeCh : make ( chan int ) ,
doneCh : make ( chan struct { } ) ,
2021-01-07 19:16:03 +03:00
timeout : l . timeout ,
2020-02-11 03:26:04 +03:00
}
}
// Conn represents an on-going session with a client, over UDP packets.
type Conn struct {
listener * Listener
rAddr net . Addr
receiveCh chan [ ] byte // to receive the data from the listener's readLoop
readCh chan [ ] byte // to receive the buffer into which we should Read
sizeCh chan int // to synchronize with the end of a Read
msgs [ ] [ ] byte // to store data from listener, to be consumed by Reads
2020-03-18 16:50:06 +03:00
muActivity sync . RWMutex
lastActivity time . Time // the last time the session saw either read or write activity
2020-06-04 12:04:04 +03:00
timeout time . Duration // for timeouts
2020-02-11 03:26:04 +03:00
doneOnce sync . Once
doneCh chan struct { }
}
// readLoop waits for data to come from the listener's readLoop.
// It then waits for a Read operation to be ready to consume said data,
// that is to say it waits on readCh to receive the slice of bytes that the Read operation wants to read onto.
// The Read operation receives the signal that the data has been written to the slice of bytes through the sizeCh.
func ( c * Conn ) readLoop ( ) {
2021-01-07 19:16:03 +03:00
ticker := time . NewTicker ( c . timeout / 10 )
2020-06-04 12:04:04 +03:00
defer ticker . Stop ( )
2020-02-11 03:26:04 +03:00
for {
if len ( c . msgs ) == 0 {
select {
case msg := <- c . receiveCh :
c . msgs = append ( c . msgs , msg )
2020-06-04 12:04:04 +03:00
case <- ticker . C :
2020-03-18 16:50:06 +03:00
c . muActivity . RLock ( )
2021-01-07 19:16:03 +03:00
deadline := c . lastActivity . Add ( c . timeout )
2020-03-18 16:50:06 +03:00
c . muActivity . RUnlock ( )
if time . Now ( ) . After ( deadline ) {
c . Close ( )
return
}
continue
2020-02-11 03:26:04 +03:00
}
}
select {
case cBuf := <- c . readCh :
msg := c . msgs [ 0 ]
c . msgs = c . msgs [ 1 : ]
n := copy ( cBuf , msg )
c . sizeCh <- n
case msg := <- c . receiveCh :
c . msgs = append ( c . msgs , msg )
2020-06-04 12:04:04 +03:00
case <- ticker . C :
2020-03-18 16:50:06 +03:00
c . muActivity . RLock ( )
2021-01-07 19:16:03 +03:00
deadline := c . lastActivity . Add ( c . timeout )
2020-03-18 16:50:06 +03:00
c . muActivity . RUnlock ( )
if time . Now ( ) . After ( deadline ) {
c . Close ( )
return
}
2020-02-11 03:26:04 +03:00
}
}
}
2021-11-09 17:12:07 +03:00
// Read reads up to len(p) bytes into p from the connection.
// Each call corresponds to at most one datagram.
// If p is smaller than the datagram, the extra bytes will be discarded.
2020-02-11 03:26:04 +03:00
func ( c * Conn ) Read ( p [ ] byte ) ( int , error ) {
select {
case c . readCh <- p :
n := <- c . sizeCh
2020-03-18 16:50:06 +03:00
c . muActivity . Lock ( )
c . lastActivity = time . Now ( )
c . muActivity . Unlock ( )
2020-02-11 03:26:04 +03:00
return n , nil
2021-11-09 17:12:07 +03:00
2020-02-11 03:26:04 +03:00
case <- c . doneCh :
return 0 , io . EOF
}
}
2021-11-09 17:12:07 +03:00
// Write writes len(p) bytes from p to the underlying connection.
// Each call sends at most one datagram.
// It is an error to send a message larger than the system's max UDP datagram size.
2020-02-11 03:26:04 +03:00
func ( c * Conn ) Write ( p [ ] byte ) ( n int , err error ) {
2020-03-18 16:50:06 +03:00
c . muActivity . Lock ( )
c . lastActivity = time . Now ( )
c . muActivity . Unlock ( )
2021-11-09 17:12:07 +03:00
return c . listener . pConn . WriteTo ( p , c . rAddr )
2020-02-11 03:26:04 +03:00
}
func ( c * Conn ) close ( ) {
c . doneOnce . Do ( func ( ) {
close ( c . doneCh )
} )
}
// Close releases resources related to the Conn.
func ( c * Conn ) Close ( ) error {
c . close ( )
c . listener . mu . Lock ( )
defer c . listener . mu . Unlock ( )
delete ( c . listener . conns , c . rAddr . String ( ) )
return nil
}