2020-02-11 03:26:04 +03:00
package udp
import (
2024-02-07 19:14:07 +03:00
"errors"
2020-02-11 03:26:04 +03:00
"sync"
2022-11-21 20:36:05 +03:00
"github.com/rs/zerolog/log"
2020-02-11 03:26:04 +03:00
)
type server struct {
Handler
weight int
}
2020-05-11 13:06:07 +03:00
// WRRLoadBalancer is a naive RoundRobin load balancer for UDP services.
2020-02-11 03:26:04 +03:00
type WRRLoadBalancer struct {
servers [ ] server
2021-11-08 19:58:12 +03:00
lock sync . Mutex
2020-02-11 03:26:04 +03:00
currentWeight int
index int
}
2020-05-11 13:06:07 +03:00
// NewWRRLoadBalancer creates a new WRRLoadBalancer.
2020-02-11 03:26:04 +03:00
func NewWRRLoadBalancer ( ) * WRRLoadBalancer {
return & WRRLoadBalancer {
index : - 1 ,
}
}
2020-05-11 13:06:07 +03:00
// ServeUDP forwards the connection to the right service.
2020-02-11 03:26:04 +03:00
func ( b * WRRLoadBalancer ) ServeUDP ( conn * Conn ) {
2021-11-08 19:58:12 +03:00
b . lock . Lock ( )
2020-02-11 03:26:04 +03:00
next , err := b . next ( )
2021-11-08 19:58:12 +03:00
b . lock . Unlock ( )
2020-02-11 03:26:04 +03:00
if err != nil {
2022-11-21 20:36:05 +03:00
log . Error ( ) . Err ( err ) . Msg ( "Error during load balancing" )
2020-02-11 03:26:04 +03:00
conn . Close ( )
2021-11-08 19:58:12 +03:00
return
2020-02-11 03:26:04 +03:00
}
2021-11-08 19:58:12 +03:00
2020-02-11 03:26:04 +03:00
next . ServeUDP ( conn )
}
2020-05-11 13:06:07 +03:00
// AddServer appends a handler to the existing list.
2020-02-11 03:26:04 +03:00
func ( b * WRRLoadBalancer ) AddServer ( serverHandler Handler ) {
w := 1
b . AddWeightedServer ( serverHandler , & w )
}
2020-05-11 13:06:07 +03:00
// AddWeightedServer appends a handler to the existing list with a weight.
2020-02-11 03:26:04 +03:00
func ( b * WRRLoadBalancer ) AddWeightedServer ( serverHandler Handler , weight * int ) {
2021-11-08 19:58:12 +03:00
b . lock . Lock ( )
defer b . lock . Unlock ( )
2020-02-11 03:26:04 +03:00
w := 1
if weight != nil {
w = * weight
}
b . servers = append ( b . servers , server { Handler : serverHandler , weight : w } )
}
func ( b * WRRLoadBalancer ) maxWeight ( ) int {
2024-08-28 16:00:06 +03:00
maximum := - 1
2020-02-11 03:26:04 +03:00
for _ , s := range b . servers {
2024-08-28 16:00:06 +03:00
if s . weight > maximum {
maximum = s . weight
2020-02-11 03:26:04 +03:00
}
}
2024-08-28 16:00:06 +03:00
return maximum
2020-02-11 03:26:04 +03:00
}
func ( b * WRRLoadBalancer ) weightGcd ( ) int {
divisor := - 1
for _ , s := range b . servers {
if divisor == - 1 {
divisor = s . weight
} else {
divisor = gcd ( divisor , s . weight )
}
}
return divisor
}
func gcd ( a , b int ) int {
for b != 0 {
a , b = b , a % b
}
return a
}
func ( b * WRRLoadBalancer ) next ( ) ( Handler , error ) {
if len ( b . servers ) == 0 {
2024-02-07 19:14:07 +03:00
return nil , errors . New ( "no servers in the pool" )
2020-02-11 03:26:04 +03:00
}
// The algorithm below may look messy,
// but is actually very simple it calculates the GCD and subtracts it on every iteration,
// what interleaves servers and allows us not to build an iterator every time we readjust weights.
// Maximum weight across all enabled servers
2024-08-28 16:00:06 +03:00
maximum := b . maxWeight ( )
if maximum == 0 {
2024-02-07 19:14:07 +03:00
return nil , errors . New ( "all servers have 0 weight" )
2021-11-08 19:58:12 +03:00
}
// GCD across all enabled servers
gcd := b . weightGcd ( )
2020-02-11 03:26:04 +03:00
for {
b . index = ( b . index + 1 ) % len ( b . servers )
if b . index == 0 {
b . currentWeight -= gcd
if b . currentWeight <= 0 {
2024-08-28 16:00:06 +03:00
b . currentWeight = maximum
2020-02-11 03:26:04 +03:00
}
}
srv := b . servers [ b . index ]
if srv . weight >= b . currentWeight {
return srv , nil
}
}
}