2016-02-08 21:57:32 +01:00
package provider
import (
2016-05-18 15:45:48 +02:00
"fmt"
2016-08-18 14:20:11 +02:00
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
2016-02-08 21:57:32 +01:00
"io/ioutil"
"os"
2016-06-22 18:31:14 +02:00
"reflect"
2016-05-18 17:30:42 +01:00
"strconv"
2016-04-25 16:56:06 +02:00
"strings"
2016-02-08 21:57:32 +01:00
"text/template"
"time"
2016-09-12 21:06:21 +02:00
2016-09-19 19:08:39 +02:00
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
2016-02-08 21:57:32 +01:00
)
const (
// serviceAccountToken is the path of the file createClient reads the
// Kubernetes service account token from.
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
// serviceAccountCACert is the path of the file createClient reads the
// CA certificate from.
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
2016-07-12 01:25:01 -04:00
// defaultKubeEndpoint is the API endpoint createClient falls back to when no
// endpoint is configured and none can be derived from the environment.
defaultKubeEndpoint = "http://127.0.0.1:8080"
2016-02-08 21:57:32 +01:00
)
2016-05-03 16:52:14 +02:00
// Namespaces holds kubernetes namespaces.
type Namespaces []string

// Set parses str as a list of namespaces separated by ',' or ';' and appends
// every entry to ns. Whitespace around an entry is trimmed and blank entries
// are skipped, so "default, kube-system" yields "default" and "kube-system"
// rather than an entry with a leading space that could never match a real
// namespace. It always returns nil.
func (ns *Namespaces) Set(str string) error {
	isSeparator := func(c rune) bool {
		return c == ',' || c == ';'
	}
	for _, field := range strings.FieldsFunc(str, isSeparator) {
		if name := strings.TrimSpace(field); name != "" {
			*ns = append(*ns, name)
		}
	}
	return nil
}

// Get returns the current list of namespaces as a Namespaces value.
func (ns *Namespaces) Get() interface{} { return *ns }

// String returns the list formatted with fmt's %v verb, e.g. "[a b]".
func (ns *Namespaces) String() string { return fmt.Sprintf("%v", *ns) }

// SetValue replaces the content of ns with val.
// val must hold a Namespaces value; any other dynamic type makes the type
// assertion panic.
func (ns *Namespaces) SetValue(val interface{}) {
	*ns = val.(Namespaces)
}
2016-08-16 19:13:18 +02:00
// Compile-time assertion that *Kubernetes implements the Provider interface.
var _ Provider = ( * Kubernetes ) ( nil )
2016-02-08 21:57:32 +01:00
// Kubernetes holds configurations of the Kubernetes provider.
type Kubernetes struct {
2016-06-24 09:58:42 +02:00
// BaseProvider is embedded; presumably it supplies the Constraints field and
// the getConfiguration method used by this provider — confirm against its
// declaration.
BaseProvider ` mapstructure:",squash" `
2016-05-03 16:52:14 +02:00
// Endpoint is the Kubernetes API server endpoint. When empty, createClient
// fills it in from the environment or with defaultKubeEndpoint.
Endpoint string ` description:"Kubernetes server endpoint" `
// DisablePassHostHeaders, when true, makes getPassHostHeader return false so
// generated frontends get PassHostHeader set to false.
DisablePassHostHeaders bool ` description:"Kubernetes disable PassHost Headers" `
// Namespaces restricts loadIngresses to ingresses in the listed namespaces;
// an empty list accepts every namespace.
Namespaces Namespaces ` description:"Kubernetes namespaces" `
2016-07-12 01:25:01 -04:00
// LabelSelector is passed to the client's WatchAll and GetIngresses calls.
LabelSelector string ` description:"Kubernetes api label selector to use" `
2016-06-22 18:31:14 +02:00
// lastConfiguration stores the template objects most recently published by
// Provide, so an unchanged result can be skipped.
lastConfiguration safe . Safe
2016-02-08 21:57:32 +01:00
}
2016-04-20 13:26:51 +02:00
func ( provider * Kubernetes ) createClient ( ) ( k8s . Client , error ) {
2016-02-08 21:57:32 +01:00
var token string
tokenBytes , err := ioutil . ReadFile ( serviceAccountToken )
if err == nil {
token = string ( tokenBytes )
log . Debugf ( "Kubernetes token: %s" , token )
} else {
2016-04-25 16:56:06 +02:00
log . Errorf ( "Kubernetes load token error: %s" , err )
2016-02-08 21:57:32 +01:00
}
caCert , err := ioutil . ReadFile ( serviceAccountCACert )
if err == nil {
log . Debugf ( "Kubernetes CA cert: %s" , serviceAccountCACert )
} else {
2016-04-25 16:56:06 +02:00
log . Errorf ( "Kubernetes load token error: %s" , err )
2016-02-08 21:57:32 +01:00
}
kubernetesHost := os . Getenv ( "KUBERNETES_SERVICE_HOST" )
kubernetesPort := os . Getenv ( "KUBERNETES_SERVICE_PORT_HTTPS" )
2016-07-11 15:39:20 -04:00
// Prioritize user provided kubernetes endpoint since kube container runtime will almost always have it
if provider . Endpoint == "" && len ( kubernetesPort ) > 0 && len ( kubernetesHost ) > 0 {
log . Debugf ( "Using environment provided kubernetes endpoint" )
2016-02-08 21:57:32 +01:00
provider . Endpoint = "https://" + kubernetesHost + ":" + kubernetesPort
}
2016-07-11 15:39:20 -04:00
if provider . Endpoint == "" {
log . Debugf ( "Using default kubernetes api endpoint" )
provider . Endpoint = defaultKubeEndpoint
}
2016-02-08 21:57:32 +01:00
log . Debugf ( "Kubernetes endpoint: %s" , provider . Endpoint )
2016-04-19 19:23:08 +02:00
return k8s . NewClient ( provider . Endpoint , caCert , token )
}
// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
2016-11-09 19:27:04 +01:00
func ( provider * Kubernetes ) Provide ( configurationChan chan <- types . ConfigMessage , pool * safe . Pool , constraints types . Constraints ) error {
2016-04-19 19:23:08 +02:00
k8sClient , err := provider . createClient ( )
2016-02-08 21:57:32 +01:00
if err != nil {
return err
}
2016-05-30 15:05:58 +02:00
provider . Constraints = append ( provider . Constraints , constraints ... )
2016-02-08 21:57:32 +01:00
pool . Go ( func ( stop chan bool ) {
operation := func ( ) error {
for {
2016-05-19 20:09:01 +02:00
stopWatch := make ( chan bool , 5 )
defer close ( stopWatch )
2016-08-19 11:09:54 +02:00
log . Debugf ( "Using label selector: '%s'" , provider . LabelSelector )
2016-07-12 01:25:01 -04:00
eventsChan , errEventsChan , err := k8sClient . WatchAll ( provider . LabelSelector , stopWatch )
2016-04-25 16:56:06 +02:00
if err != nil {
log . Errorf ( "Error watching kubernetes events: %v" , err )
2016-05-19 20:09:01 +02:00
timer := time . NewTimer ( 1 * time . Second )
select {
case <- timer . C :
return err
case <- stop :
return nil
}
2016-04-25 16:56:06 +02:00
}
for {
select {
case <- stop :
stopWatch <- true
return nil
2016-08-19 11:09:54 +02:00
case err , _ := <- errEventsChan :
2016-05-19 20:09:01 +02:00
stopWatch <- true
2016-04-19 19:23:08 +02:00
return err
2016-04-25 16:56:06 +02:00
case event := <- eventsChan :
2016-05-19 20:09:01 +02:00
log . Debugf ( "Received event from kubernetes %+v" , event )
2016-04-25 16:56:06 +02:00
templateObjects , err := provider . loadIngresses ( k8sClient )
if err != nil {
return err
}
2016-06-22 18:31:14 +02:00
if reflect . DeepEqual ( provider . lastConfiguration . Get ( ) , templateObjects ) {
log . Debugf ( "Skipping event from kubernetes %+v" , event )
} else {
provider . lastConfiguration . Set ( templateObjects )
configurationChan <- types . ConfigMessage {
ProviderName : "kubernetes" ,
Configuration : provider . loadConfig ( * templateObjects ) ,
}
2016-04-25 16:56:06 +02:00
}
2016-02-08 21:57:32 +01:00
}
}
}
}
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Kubernetes connection error %+v, retrying in %s" , err , time )
}
2016-09-19 19:08:39 +02:00
err := backoff . RetryNotify ( operation , job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , notify )
2016-02-08 21:57:32 +01:00
if err != nil {
2016-08-19 10:36:54 +02:00
log . Errorf ( "Cannot connect to Kubernetes server %+v" , err )
2016-02-08 21:57:32 +01:00
}
} )
2016-04-25 16:56:06 +02:00
templateObjects , err := provider . loadIngresses ( k8sClient )
if err != nil {
return err
}
2016-06-22 18:31:14 +02:00
if reflect . DeepEqual ( provider . lastConfiguration . Get ( ) , templateObjects ) {
log . Debugf ( "Skipping configuration from kubernetes %+v" , templateObjects )
} else {
provider . lastConfiguration . Set ( templateObjects )
configurationChan <- types . ConfigMessage {
ProviderName : "kubernetes" ,
Configuration : provider . loadConfig ( * templateObjects ) ,
}
2016-04-25 16:56:06 +02:00
}
2016-02-08 21:57:32 +01:00
return nil
}
2016-04-20 13:26:51 +02:00
func ( provider * Kubernetes ) loadIngresses ( k8sClient k8s . Client ) ( * types . Configuration , error ) {
2016-07-12 01:25:01 -04:00
ingresses , err := k8sClient . GetIngresses ( provider . LabelSelector , func ( ingress k8s . Ingress ) bool {
2016-04-28 01:23:55 +01:00
if len ( provider . Namespaces ) == 0 {
return true
}
for _ , n := range provider . Namespaces {
if ingress . ObjectMeta . Namespace == n {
return true
}
}
return false
2016-04-19 19:23:08 +02:00
} )
if err != nil {
log . Errorf ( "Error retrieving ingresses: %+v" , err )
return nil , err
}
templateObjects := types . Configuration {
map [ string ] * types . Backend { } ,
map [ string ] * types . Frontend { } ,
}
2016-05-10 07:43:24 -04:00
PassHostHeader := provider . getPassHostHeader ( )
2016-04-19 19:23:08 +02:00
for _ , i := range ingresses {
for _ , r := range i . Spec . Rules {
for _ , pa := range r . HTTP . Paths {
if _ , exists := templateObjects . Backends [ r . Host + pa . Path ] ; ! exists {
templateObjects . Backends [ r . Host + pa . Path ] = & types . Backend {
Servers : make ( map [ string ] types . Server ) ,
}
}
if _ , exists := templateObjects . Frontends [ r . Host + pa . Path ] ; ! exists {
templateObjects . Frontends [ r . Host + pa . Path ] = & types . Frontend {
2016-05-10 07:43:24 -04:00
Backend : r . Host + pa . Path ,
PassHostHeader : PassHostHeader ,
Routes : make ( map [ string ] types . Route ) ,
2016-08-02 16:48:53 -07:00
Priority : len ( pa . Path ) ,
2016-04-19 19:23:08 +02:00
}
}
2016-05-25 13:16:19 +01:00
if len ( r . Host ) > 0 {
if _ , exists := templateObjects . Frontends [ r . Host + pa . Path ] . Routes [ r . Host ] ; ! exists {
templateObjects . Frontends [ r . Host + pa . Path ] . Routes [ r . Host ] = types . Route {
Rule : "Host:" + r . Host ,
}
2016-04-19 19:23:08 +02:00
}
}
if len ( pa . Path ) > 0 {
2016-05-17 16:22:37 +03:00
ruleType := i . Annotations [ "traefik.frontend.rule.type" ]
2016-05-17 13:50:06 +03:00
switch strings . ToLower ( ruleType ) {
case "pathprefixstrip" :
ruleType = "PathPrefixStrip"
case "pathstrip" :
ruleType = "PathStrip"
case "path" :
ruleType = "Path"
case "pathprefix" :
ruleType = "PathPrefix"
2016-09-12 21:06:21 +02:00
case "" :
ruleType = "PathPrefix"
2016-05-17 13:50:06 +03:00
default :
2016-09-12 21:06:21 +02:00
log . Warnf ( "Unknown RuleType %s for %s/%s, falling back to PathPrefix" , ruleType , i . ObjectMeta . Namespace , i . ObjectMeta . Name )
2016-05-17 13:50:06 +03:00
ruleType = "PathPrefix"
}
templateObjects . Frontends [ r . Host + pa . Path ] . Routes [ pa . Path ] = types . Route {
Rule : ruleType + ":" + pa . Path ,
2016-04-19 19:23:08 +02:00
}
}
2016-05-26 00:53:51 +01:00
service , err := k8sClient . GetService ( pa . Backend . ServiceName , i . ObjectMeta . Namespace )
2016-04-19 19:23:08 +02:00
if err != nil {
2016-05-19 20:09:01 +02:00
log . Warnf ( "Error retrieving services: %v" , err )
2016-04-25 16:56:06 +02:00
delete ( templateObjects . Frontends , r . Host + pa . Path )
2016-05-19 20:09:01 +02:00
log . Warnf ( "Error retrieving services %s" , pa . Backend . ServiceName )
2016-05-26 00:53:51 +01:00
continue
2016-04-25 16:56:06 +02:00
}
2016-05-26 00:53:51 +01:00
protocol := "http"
for _ , port := range service . Spec . Ports {
if equalPorts ( port , pa . Backend . ServicePort ) {
if port . Port == 443 {
protocol = "https"
}
endpoints , err := k8sClient . GetEndpoints ( service . ObjectMeta . Name , service . ObjectMeta . Namespace )
if err != nil {
log . Errorf ( "Error retrieving endpoints: %v" , err )
continue
}
if len ( endpoints . Subsets ) == 0 {
log . Warnf ( "Endpoints not found for %s/%s, falling back to Service ClusterIP" , service . ObjectMeta . Namespace , service . ObjectMeta . Name )
templateObjects . Backends [ r . Host + pa . Path ] . Servers [ string ( service . UID ) ] = types . Server {
URL : protocol + "://" + service . Spec . ClusterIP + ":" + strconv . Itoa ( port . Port ) ,
Weight : 1 ,
2016-05-20 17:34:57 +01:00
}
2016-05-26 00:53:51 +01:00
} else {
for _ , subset := range endpoints . Subsets {
for _ , address := range subset . Addresses {
url := protocol + "://" + address . IP + ":" + strconv . Itoa ( endpointPortNumber ( port , subset . Ports ) )
2016-08-04 18:55:41 +01:00
name := url
if address . TargetRef != nil && address . TargetRef . Name != "" {
name = address . TargetRef . Name
}
templateObjects . Backends [ r . Host + pa . Path ] . Servers [ name ] = types . Server {
2016-05-26 00:53:51 +01:00
URL : url ,
Weight : 1 ,
2016-05-20 17:34:57 +01:00
}
}
2016-04-20 13:26:51 +02:00
}
2016-04-19 19:23:08 +02:00
}
2016-05-26 00:53:51 +01:00
break
2016-04-19 19:23:08 +02:00
}
}
}
}
}
return & templateObjects , nil
}
2016-05-20 17:34:57 +01:00
// endpointPortNumber picks the port number to use for an endpoint subset.
//
// With no endpoint ports the service port number is returned. Otherwise the
// first endpoint port is the default (the name is optional when there is only
// one port) and it is overridden by any endpoint port whose name equals the
// service port name; when several match, the last one wins.
func endpointPortNumber(servicePort k8s.ServicePort, endpointPorts []k8s.EndpointPort) int {
	if len(endpointPorts) == 0 {
		return servicePort.Port
	}
	selected := endpointPorts[0]
	for idx := range endpointPorts {
		if endpointPorts[idx].Name == servicePort.Name {
			selected = endpointPorts[idx]
		}
	}
	return int(selected.Port)
}
2016-05-18 17:30:42 +01:00
// equalPorts reports whether ingressPort designates servicePort, either by
// number or, when the service port is named, by name.
func equalPorts(servicePort k8s.ServicePort, ingressPort k8s.IntOrString) bool {
	sameNumber := servicePort.Port == ingressPort.IntValue()
	return sameNumber || (servicePort.Name != "" && servicePort.Name == ingressPort.String())
}
2016-05-10 07:43:24 -04:00
func ( provider * Kubernetes ) getPassHostHeader ( ) bool {
2016-05-03 16:52:14 +02:00
if provider . DisablePassHostHeaders {
2016-05-10 07:43:24 -04:00
return false
}
return true
}
2016-02-08 21:57:32 +01:00
// loadConfig passes templateObjects to provider.getConfiguration together
// with the "templates/kubernetes.tmpl" template path and an empty FuncMap.
// An error from getConfiguration is logged only; whatever configuration it
// returned is handed back to the caller as is.
func (provider *Kubernetes) loadConfig(templateObjects types.Configuration) *types.Configuration {
	noFuncs := template.FuncMap{}
	result, err := provider.getConfiguration("templates/kubernetes.tmpl", noFuncs, templateObjects)
	if err != nil {
		log.Error(err)
	}
	return result
}