diff --git a/provider/kubernetes/client.go b/provider/kubernetes/client.go index e3cb8254f..885f14b81 100644 --- a/provider/kubernetes/client.go +++ b/provider/kubernetes/client.go @@ -39,7 +39,7 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) { // WatchAll starts the watch of the Provider resources and updates the stores. // The stores can then be accessed via the Get* functions. type Client interface { - WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) + WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) GetIngresses() []*extensionsv1beta1.Ingress GetService(namespace, name string) (*corev1.Service, bool, error) GetSecret(namespace, name string) (*corev1.Secret, bool, error) @@ -47,21 +47,22 @@ type Client interface { } type clientImpl struct { - clientset *kubernetes.Clientset - factories map[string]informers.SharedInformerFactory - isNamespaceAll bool + clientset *kubernetes.Clientset + factories map[string]informers.SharedInformerFactory + ingressLabelSelector labels.Selector + isNamespaceAll bool } -func newClientImpl(clientset *kubernetes.Clientset) Client { +func newClientImpl(clientset *kubernetes.Clientset) *clientImpl { return &clientImpl{ clientset: clientset, factories: make(map[string]informers.SharedInformerFactory), } } -// NewInClusterClient returns a new Provider client that is expected to run +// newInClusterClient returns a new Provider client that is expected to run // inside the cluster. -func NewInClusterClient(endpoint string) (Client, error) { +func newInClusterClient(endpoint string) (*clientImpl, error) { config, err := rest.InClusterConfig() if err != nil { return nil, fmt.Errorf("failed to create in-cluster configuration: %s", err) @@ -74,10 +75,10 @@ func NewInClusterClient(endpoint string) (Client, error) { return createClientFromConfig(config) } -// NewExternalClusterClient returns a new Provider client that may run outside +// newExternalClusterClient returns a new Provider client that may run outside // of the cluster. // The endpoint parameter must not be empty. -func NewExternalClusterClient(endpoint, token, caFilePath string) (Client, error) { +func newExternalClusterClient(endpoint, token, caFilePath string) (*clientImpl, error) { if endpoint == "" { return nil, errors.New("endpoint missing for external cluster client") } @@ -99,7 +100,7 @@ func NewExternalClusterClient(endpoint, token, caFilePath string) (Client, error return createClientFromConfig(config) } -func createClientFromConfig(c *rest.Config) (Client, error) { +func createClientFromConfig(c *rest.Config) (*clientImpl, error) { clientset, err := kubernetes.NewForConfig(c) if err != nil { return nil, err @@ -109,24 +110,17 @@ func createClientFromConfig(c *rest.Config) (Client, error) { } // WatchAll starts namespace-specific controllers for all relevant kinds. -func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) { +func (c *clientImpl) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) { eventCh := make(chan interface{}, 1) - _, err := labels.Parse(labelSelector) - if err != nil { - return nil, err - } - if len(namespaces) == 0 { namespaces = Namespaces{metav1.NamespaceAll} c.isNamespaceAll = true } - eventHandler := newResourceEventHandler(eventCh) + eventHandler := c.newResourceEventHandler(eventCh) for _, ns := range namespaces { - factory := informers.NewFilteredSharedInformerFactory(c.clientset, resyncPeriod, ns, func(opts *metav1.ListOptions) { - opts.LabelSelector = labelSelector - }) + factory := informers.NewFilteredSharedInformerFactory(c.clientset, resyncPeriod, ns, nil) factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler) factory.Core().V1().Services().Informer().AddEventHandler(eventHandler) factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler) @@ -161,7 +155,7 @@ func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopC func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress { var result []*extensionsv1beta1.Ingress for ns, factory := range c.factories { - ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(labels.Everything()) + ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector) if err != nil { log.Errorf("Failed to list ingresses in namespace %s: %s", ns, err) } @@ -215,8 +209,18 @@ func (c *clientImpl) lookupNamespace(ns string) string { return ns } -func newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler { - return &resourceEventHandler{events} +func (c *clientImpl) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler { + return &cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + // Ignore Ingresses that do not match our custom label selector. + if ing, ok := obj.(*extensionsv1beta1.Ingress); ok { + lbls := labels.Set(ing.GetLabels()) + return c.ingressLabelSelector.Matches(lbls) + } + return true + }, + Handler: &resourceEventHandler{events}, + } } // eventHandlerFunc will pass the obj on to the events channel or drop it. diff --git a/provider/kubernetes/client_mock_test.go b/provider/kubernetes/client_mock_test.go index 22b58b8f1..c968e0d56 100644 --- a/provider/kubernetes/client_mock_test.go +++ b/provider/kubernetes/client_mock_test.go @@ -61,6 +61,6 @@ func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, err return nil, false, nil } -func (c clientMock) WatchAll(namespaces Namespaces, labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) { +func (c clientMock) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) { return c.watchChan, nil } diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index 6037af90d..41c944a8a 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -24,6 +24,7 @@ import ( "gopkg.in/yaml.v2" corev1 "k8s.io/api/core/v1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -45,24 +46,37 @@ type Provider struct { DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"` EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"` Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"` - LabelSelector string `description:"Kubernetes api label selector to use" export:"true"` + LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"` IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"` lastConfiguration safe.Safe } -func (p *Provider) newK8sClient() (Client, error) { +func (p *Provider) newK8sClient(ingressLabelSelector string) (Client, error) { + ingLabelSel, err := labels.Parse(ingressLabelSelector) + if err != nil { + return nil, fmt.Errorf("invalid ingress label selector: %q", ingressLabelSelector) + } + log.Infof("ingress label selector is: %q", ingLabelSel) + withEndpoint := "" if p.Endpoint != "" { withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint) } + var cl *clientImpl if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" { log.Infof("Creating in-cluster Provider client%s", withEndpoint) - return NewInClusterClient(p.Endpoint) + cl, err = newInClusterClient(p.Endpoint) + } else { + log.Infof("Creating cluster-external Provider client%s", withEndpoint) + cl, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath) } - log.Infof("Creating cluster-external Provider client%s", withEndpoint) - return NewExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath) + if err == nil { + cl.ingressLabelSelector = ingLabelSel + } + + return cl, err } // Provide allows the k8s provider to provide configurations to traefik @@ -83,7 +97,8 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s return fmt.Errorf("value for IngressClass has to be empty or start with the prefix %q, instead found %q", traefikDefaultIngressClass, p.IngressClass) } - k8sClient, err := p.newK8sClient() + log.Debugf("Using Ingress label selector: %q", p.LabelSelector) + k8sClient, err := p.newK8sClient(p.LabelSelector) if err != nil { return err } @@ -94,8 +109,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s for { stopWatch := make(chan struct{}, 1) defer close(stopWatch) - log.Debugf("Using label selector: '%s'", p.LabelSelector) - eventsChan, err := k8sClient.WatchAll(p.Namespaces, p.LabelSelector, stopWatch) + eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch) if err != nil { log.Errorf("Error watching kubernetes events: %v", err) timer := time.NewTimer(1 * time.Second)