2016-02-08 21:57:32 +01:00
package provider
import (
log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
2016-04-25 16:56:06 +02:00
"io"
2016-02-08 21:57:32 +01:00
"io/ioutil"
"os"
2016-04-25 16:56:06 +02:00
"strings"
2016-02-08 21:57:32 +01:00
"text/template"
"time"
)
// File paths read by createClient when building the Kubernetes API client.
const (
// serviceAccountToken is the path of the service account token file.
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
// serviceAccountCACert is the path of the service account CA certificate file.
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)
// Kubernetes holds configurations of the Kubernetes provider.
type Kubernetes struct {
2016-05-10 07:43:24 -04:00
BaseProvider ` mapstructure:",squash" `
Endpoint string
disablePassHostHeaders bool
Namespaces [ ] string
2016-02-08 21:57:32 +01:00
}
2016-04-20 13:26:51 +02:00
func ( provider * Kubernetes ) createClient ( ) ( k8s . Client , error ) {
2016-02-08 21:57:32 +01:00
var token string
tokenBytes , err := ioutil . ReadFile ( serviceAccountToken )
if err == nil {
token = string ( tokenBytes )
log . Debugf ( "Kubernetes token: %s" , token )
} else {
2016-04-25 16:56:06 +02:00
log . Errorf ( "Kubernetes load token error: %s" , err )
2016-02-08 21:57:32 +01:00
}
caCert , err := ioutil . ReadFile ( serviceAccountCACert )
if err == nil {
log . Debugf ( "Kubernetes CA cert: %s" , serviceAccountCACert )
} else {
2016-04-25 16:56:06 +02:00
log . Errorf ( "Kubernetes load token error: %s" , err )
2016-02-08 21:57:32 +01:00
}
kubernetesHost := os . Getenv ( "KUBERNETES_SERVICE_HOST" )
kubernetesPort := os . Getenv ( "KUBERNETES_SERVICE_PORT_HTTPS" )
if len ( kubernetesPort ) > 0 && len ( kubernetesHost ) > 0 {
provider . Endpoint = "https://" + kubernetesHost + ":" + kubernetesPort
}
log . Debugf ( "Kubernetes endpoint: %s" , provider . Endpoint )
2016-04-19 19:23:08 +02:00
return k8s . NewClient ( provider . Endpoint , caCert , token )
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func ( provider * Kubernetes ) Provide ( configurationChan chan <- types . ConfigMessage , pool * safe . Pool ) error {
k8sClient , err := provider . createClient ( )
2016-02-08 21:57:32 +01:00
if err != nil {
return err
}
2016-04-25 16:56:06 +02:00
backOff := backoff . NewExponentialBackOff ( )
2016-02-08 21:57:32 +01:00
pool . Go ( func ( stop chan bool ) {
stopWatch := make ( chan bool )
2016-04-25 16:56:06 +02:00
defer close ( stopWatch )
2016-02-08 21:57:32 +01:00
operation := func ( ) error {
select {
case <- stop :
return nil
default :
}
for {
2016-04-25 16:56:06 +02:00
eventsChan , errEventsChan , err := k8sClient . WatchAll ( stopWatch )
if err != nil {
log . Errorf ( "Error watching kubernetes events: %v" , err )
2016-02-08 21:57:32 +01:00
return err
2016-04-25 16:56:06 +02:00
}
Watch :
for {
select {
case <- stop :
stopWatch <- true
return nil
case err := <- errEventsChan :
if strings . Contains ( err . Error ( ) , io . EOF . Error ( ) ) {
// edge case, kubernetes long-polling disconnection
break Watch
}
2016-04-19 19:23:08 +02:00
return err
2016-04-25 16:56:06 +02:00
case event := <- eventsChan :
log . Debugf ( "Received event from kubenetes %+v" , event )
templateObjects , err := provider . loadIngresses ( k8sClient )
if err != nil {
return err
}
configurationChan <- types . ConfigMessage {
ProviderName : "kubernetes" ,
Configuration : provider . loadConfig ( * templateObjects ) ,
}
2016-02-08 21:57:32 +01:00
}
}
}
}
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Kubernetes connection error %+v, retrying in %s" , err , time )
}
2016-04-25 16:56:06 +02:00
err := backoff . RetryNotify ( operation , backOff , notify )
2016-02-08 21:57:32 +01:00
if err != nil {
log . Fatalf ( "Cannot connect to Kubernetes server %+v" , err )
}
} )
2016-04-25 16:56:06 +02:00
templateObjects , err := provider . loadIngresses ( k8sClient )
if err != nil {
return err
}
configurationChan <- types . ConfigMessage {
ProviderName : "kubernetes" ,
Configuration : provider . loadConfig ( * templateObjects ) ,
}
2016-02-08 21:57:32 +01:00
return nil
}
2016-04-20 13:26:51 +02:00
func ( provider * Kubernetes ) loadIngresses ( k8sClient k8s . Client ) ( * types . Configuration , error ) {
2016-04-19 19:23:08 +02:00
ingresses , err := k8sClient . GetIngresses ( func ( ingress k8s . Ingress ) bool {
2016-04-28 01:23:55 +01:00
if len ( provider . Namespaces ) == 0 {
return true
}
for _ , n := range provider . Namespaces {
if ingress . ObjectMeta . Namespace == n {
return true
}
}
return false
2016-04-19 19:23:08 +02:00
} )
if err != nil {
log . Errorf ( "Error retrieving ingresses: %+v" , err )
return nil , err
}
templateObjects := types . Configuration {
map [ string ] * types . Backend { } ,
map [ string ] * types . Frontend { } ,
}
2016-05-10 07:43:24 -04:00
PassHostHeader := provider . getPassHostHeader ( )
2016-04-19 19:23:08 +02:00
for _ , i := range ingresses {
for _ , r := range i . Spec . Rules {
for _ , pa := range r . HTTP . Paths {
if _ , exists := templateObjects . Backends [ r . Host + pa . Path ] ; ! exists {
templateObjects . Backends [ r . Host + pa . Path ] = & types . Backend {
Servers : make ( map [ string ] types . Server ) ,
}
}
if _ , exists := templateObjects . Frontends [ r . Host + pa . Path ] ; ! exists {
templateObjects . Frontends [ r . Host + pa . Path ] = & types . Frontend {
2016-05-10 07:43:24 -04:00
Backend : r . Host + pa . Path ,
PassHostHeader : PassHostHeader ,
Routes : make ( map [ string ] types . Route ) ,
2016-04-19 19:23:08 +02:00
}
}
if _ , exists := templateObjects . Frontends [ r . Host + pa . Path ] . Routes [ r . Host ] ; ! exists {
templateObjects . Frontends [ r . Host + pa . Path ] . Routes [ r . Host ] = types . Route {
Rule : "Host:" + r . Host ,
}
}
if len ( pa . Path ) > 0 {
templateObjects . Frontends [ r . Host + pa . Path ] . Routes [ pa . Path ] = types . Route {
2016-05-05 19:57:35 +02:00
Rule : "PathPrefix:" + pa . Path ,
2016-04-19 19:23:08 +02:00
}
}
2016-04-25 16:56:06 +02:00
services , err := k8sClient . GetServices ( func ( service k8s . Service ) bool {
2016-04-19 19:23:08 +02:00
return service . Name == pa . Backend . ServiceName
} )
if err != nil {
log . Errorf ( "Error retrieving services: %v" , err )
continue
}
2016-04-25 16:56:06 +02:00
if len ( services ) == 0 {
// no backends found, delete frontend...
delete ( templateObjects . Frontends , r . Host + pa . Path )
log . Errorf ( "Error retrieving services %s" , pa . Backend . ServiceName )
}
2016-04-19 19:23:08 +02:00
for _ , service := range services {
2016-04-26 22:26:25 +02:00
protocol := "http"
2016-04-19 19:23:08 +02:00
for _ , port := range service . Spec . Ports {
if port . Port == pa . Backend . ServicePort . IntValue ( ) {
2016-04-26 22:26:25 +02:00
if port . Port == 443 {
protocol = "https"
}
2016-04-20 13:26:51 +02:00
templateObjects . Backends [ r . Host + pa . Path ] . Servers [ string ( service . UID ) ] = types . Server {
URL : protocol + "://" + service . Spec . ClusterIP + ":" + pa . Backend . ServicePort . String ( ) ,
Weight : 1 ,
}
2016-04-19 19:23:08 +02:00
break
}
}
}
}
}
}
return & templateObjects , nil
}
2016-05-10 07:43:24 -04:00
// getPassHostHeader returns the PassHostHeader value to use for generated
// frontends: true unless disablePassHostHeaders is set on the provider.
func (provider *Kubernetes) getPassHostHeader() bool {
	return !provider.disablePassHostHeaders
}
2016-02-08 21:57:32 +01:00
// loadConfig passes templateObjects to provider.getConfiguration together
// with the "templates/kubernetes.tmpl" template path and an empty function
// map. An error from getConfiguration is logged, not returned; the
// configuration value it produced is returned as-is in either case.
func (provider *Kubernetes) loadConfig(templateObjects types.Configuration) *types.Configuration {
	funcs := template.FuncMap{}
	rendered, renderErr := provider.getConfiguration("templates/kubernetes.tmpl", funcs, templateObjects)
	if renderErr != nil {
		log.Error(renderErr)
	}
	return rendered
}