From e222d5cb2fbb8d8c7acab6e48af0b0fd0be6074d Mon Sep 17 00:00:00 2001 From: Romain Date: Mon, 9 Sep 2024 10:08:08 +0200 Subject: [PATCH] Add support for backend protocol selection in HTTP and GRPC routes --- integration/k8s_conformance_test.go | 2 + .../httproute/with_app_protocol_service.yml | 61 +++++++++ .../kubernetes/gateway/fixtures/services.yml | 107 ++++++++++++++++ pkg/provider/kubernetes/gateway/grpcroute.go | 103 ++++++--------- pkg/provider/kubernetes/gateway/httproute.go | 120 +++++++----------- pkg/provider/kubernetes/gateway/kubernetes.go | 80 +++++++++++- .../kubernetes/gateway/kubernetes_test.go | 98 ++++++++++++++ pkg/provider/kubernetes/gateway/tcproute.go | 93 ++++---------- pkg/provider/kubernetes/gateway/tlsroute.go | 50 ++++++-- 9 files changed, 495 insertions(+), 219 deletions(-) create mode 100644 pkg/provider/kubernetes/gateway/fixtures/httproute/with_app_protocol_service.yml diff --git a/integration/k8s_conformance_test.go b/integration/k8s_conformance_test.go index 9316715af..56c1e07f8 100644 --- a/integration/k8s_conformance_test.go +++ b/integration/k8s_conformance_test.go @@ -213,6 +213,8 @@ func (s *K8sConformanceSuite) TestK8sGatewayAPIConformance() { features.SupportHTTPRoutePathRedirect, features.SupportHTTPRouteResponseHeaderModification, features.SupportTLSRoute, + features.SupportHTTPRouteBackendProtocolH2C, + features.SupportHTTPRouteBackendProtocolWebSocket, ), }) require.NoError(s.T(), err) diff --git a/pkg/provider/kubernetes/gateway/fixtures/httproute/with_app_protocol_service.yml b/pkg/provider/kubernetes/gateway/fixtures/httproute/with_app_protocol_service.yml new file mode 100644 index 000000000..c364c8f52 --- /dev/null +++ b/pkg/provider/kubernetes/gateway/fixtures/httproute/with_app_protocol_service.yml @@ -0,0 +1,61 @@ +--- +kind: GatewayClass +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: my-gateway-class +spec: + controllerName: traefik.io/gateway-controller + +--- +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: my-gateway + namespace: default +spec: + gatewayClassName: my-gateway-class + listeners: # Use GatewayClass defaults for listener definition. + - name: http + protocol: HTTP + port: 80 + allowedRoutes: + kinds: + - kind: HTTPRoute + group: gateway.networking.k8s.io + namespaces: + from: Same + +--- +kind: HTTPRoute +apiVersion: gateway.networking.k8s.io/v1 +metadata: + name: http-multi-protocols + namespace: default +spec: + parentRefs: + - name: my-gateway + kind: Gateway + group: gateway.networking.k8s.io + hostnames: + - "foo.com" + rules: + - matches: + - path: + type: Exact + value: /bar + backendRefs: + - name: whoami-h2c + port: 80 + weight: 1 + kind: Service + group: "" + - name: whoami-ws + port: 80 + weight: 1 + kind: Service + group: "" + - name: whoami-wss + port: 80 + weight: 1 + kind: Service + group: "" diff --git a/pkg/provider/kubernetes/gateway/fixtures/services.yml b/pkg/provider/kubernetes/gateway/fixtures/services.yml index 8a4cb3b72..fe7cf9d80 100644 --- a/pkg/provider/kubernetes/gateway/fixtures/services.yml +++ b/pkg/provider/kubernetes/gateway/fixtures/services.yml @@ -7,9 +7,11 @@ metadata: spec: ports: - name: web2 + protocol: TCP port: 8000 targetPort: web2 - name: web + protocol: TCP port: 80 targetPort: web selector: @@ -48,9 +50,11 @@ metadata: spec: ports: - name: web2 + protocol: TCP port: 8000 targetPort: web2 - name: web + protocol: TCP port: 80 targetPort: web selector: @@ -89,6 +93,7 @@ metadata: spec: ports: - name: web + protocol: TCP port: 8080 targetPort: web selector: @@ -317,3 +322,105 @@ status: ingress: - hostname: foo.bar - ip: 1.2.3.4 + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: whoami-h2c + namespace: default + labels: + kubernetes.io/service-name: whoami-h2c + +addressType: IPv4 +ports: + - name: h2c + protocol: TCP + port: 80 +endpoints: + - addresses: + - 10.10.0.13 + conditions: + ready: true + +--- +apiVersion: v1 +kind: Service +metadata: + name: whoami-h2c + namespace: default + +spec: + ports: + - protocol: TCP + port: 80 + name: h2c + appProtocol: kubernetes.io/h2c + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: whoami-ws + namespace: default + labels: + kubernetes.io/service-name: whoami-ws + +addressType: IPv4 +ports: + - name: ws + protocol: TCP + port: 80 +endpoints: + - addresses: + - 10.10.0.14 + conditions: + ready: true + +--- +apiVersion: v1 +kind: Service +metadata: + name: whoami-ws + namespace: default + +spec: + ports: + - protocol: TCP + port: 80 + name: ws + appProtocol: kubernetes.io/ws + +--- +kind: EndpointSlice +apiVersion: discovery.k8s.io/v1 +metadata: + name: whoami-wss + namespace: default + labels: + kubernetes.io/service-name: whoami-wss + +addressType: IPv4 +ports: + - name: wss + protocol: TCP + port: 80 +endpoints: + - addresses: + - 10.10.0.15 + conditions: + ready: true + +--- +apiVersion: v1 +kind: Service +metadata: + name: whoami-wss + namespace: default + +spec: + ports: + - protocol: TCP + port: 80 + name: wss + appProtocol: kubernetes.io/wss diff --git a/pkg/provider/kubernetes/gateway/grpcroute.go b/pkg/provider/kubernetes/gateway/grpcroute.go index 9447943a0..fd72609a5 100644 --- a/pkg/provider/kubernetes/gateway/grpcroute.go +++ b/pkg/provider/kubernetes/gateway/grpcroute.go @@ -2,7 +2,6 @@ package gateway import ( "context" - "errors" "fmt" "net" "strconv" @@ -260,23 +259,16 @@ func (p *Provider) loadGRPCBackendRef(route *gatev1.GRPCRoute, backendRef gatev1 ObservedGeneration: route.Generation, LastTransitionTime: metav1.Now(), Reason: string(gatev1.RouteReasonUnsupportedProtocol), - Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s/%s/%s port is required", group, kind, namespace, backendRef.Name), + Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s/%s/%s: port is required", group, kind, namespace, backendRef.Name), } } portStr := strconv.FormatInt(int64(port), 10) serviceName = provider.Normalize(serviceName + "-" + portStr) - lb, err := p.loadGRPCServers(namespace, backendRef) - if err != nil { - return serviceName, nil, &metav1.Condition{ - Type: string(gatev1.RouteConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: route.Generation, - LastTransitionTime: metav1.Now(), - Reason: string(gatev1.RouteReasonBackendNotFound), - Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err), - } + lb, errCondition := p.loadGRPCServers(namespace, route, backendRef) + if errCondition != nil { + return serviceName, nil, errCondition } return serviceName, &dynamic.Service{LoadBalancer: lb}, nil @@ -319,72 +311,49 @@ func (p *Provider) loadGRPCMiddlewares(conf *dynamic.Configuration, namespace, r return middlewareNames, nil } -func (p *Provider) loadGRPCServers(namespace string, backendRef gatev1.GRPCBackendRef) (*dynamic.ServersLoadBalancer, error) { - if backendRef.Port == nil { - return nil, errors.New("port is required for Kubernetes Service reference") - } - - service, exists, err := p.client.GetService(namespace, string(backendRef.Name)) +func (p *Provider) loadGRPCServers(namespace string, route *gatev1.GRPCRoute, backendRef gatev1.GRPCBackendRef) (*dynamic.ServersLoadBalancer, *metav1.Condition) { + backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef) if err != nil { - return nil, fmt.Errorf("getting service: %w", err) - } - if !exists { - return nil, errors.New("service not found") - } - - var svcPort *corev1.ServicePort - for _, p := range service.Spec.Ports { - if p.Port == int32(*backendRef.Port) { - svcPort = &p - break + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s: %s", namespace, backendRef.Name, err), } } - if svcPort == nil { - return nil, fmt.Errorf("service port %d not found", *backendRef.Port) + + if svcPort.Protocol != corev1.ProtocolTCP { + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonUnsupportedProtocol), + Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s: only TCP protocol is supported", namespace, backendRef.Name), + } } - endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name)) - if err != nil { - return nil, fmt.Errorf("getting endpointslices: %w", err) - } - if len(endpointSlices) == 0 { - return nil, errors.New("endpointslices not found") + if svcPort.AppProtocol != nil && *svcPort.AppProtocol != appProtocolH2C { + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonUnsupportedProtocol), + Message: fmt.Sprintf("Cannot load GRPCBackendRef %s/%s: only kubernetes.io/h2c appProtocol is supported", namespace, backendRef.Name), + } } lb := &dynamic.ServersLoadBalancer{} lb.SetDefaults() - addresses := map[string]struct{}{} - for _, endpointSlice := range endpointSlices { - var port int32 - for _, p := range endpointSlice.Ports { - if svcPort.Name == *p.Name { - port = *p.Port - break - } - } - if port == 0 { - continue - } - - for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { - continue - } - - for _, address := range endpoint.Addresses { - if _, ok := addresses[address]; ok { - continue - } - - addresses[address] = struct{}{} - lb.Servers = append(lb.Servers, dynamic.Server{ - URL: fmt.Sprintf("h2c://%s", net.JoinHostPort(address, strconv.Itoa(int(port)))), - }) - } - } + for _, ba := range backendAddresses { + lb.Servers = append(lb.Servers, dynamic.Server{ + URL: fmt.Sprintf("h2c://%s", net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port)))), + }) } - return lb, nil } diff --git a/pkg/provider/kubernetes/gateway/httproute.go b/pkg/provider/kubernetes/gateway/httproute.go index 685a6fbfd..c18dc48d3 100644 --- a/pkg/provider/kubernetes/gateway/httproute.go +++ b/pkg/provider/kubernetes/gateway/httproute.go @@ -260,23 +260,16 @@ func (p *Provider) loadService(route *gatev1.HTTPRoute, backendRef gatev1.HTTPBa ObservedGeneration: route.Generation, LastTransitionTime: metav1.Now(), Reason: string(gatev1.RouteReasonUnsupportedProtocol), - Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s/%s/%s port is required", group, kind, namespace, backendRef.Name), + Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s/%s/%s: port is required", group, kind, namespace, backendRef.Name), } } portStr := strconv.FormatInt(int64(port), 10) serviceName = provider.Normalize(serviceName + "-" + portStr) - lb, err := p.loadHTTPServers(namespace, backendRef) - if err != nil { - return serviceName, nil, &metav1.Condition{ - Type: string(gatev1.RouteConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: route.Generation, - LastTransitionTime: metav1.Now(), - Reason: string(gatev1.RouteReasonBackendNotFound), - Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err), - } + lb, errCondition := p.loadHTTPServers(namespace, route, backendRef) + if errCondition != nil { + return serviceName, nil, errCondition } return serviceName, &dynamic.Service{LoadBalancer: lb}, nil @@ -372,74 +365,39 @@ func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRe return filterFunc(string(extensionRef.Name), namespace) } -func (p *Provider) loadHTTPServers(namespace string, backendRef gatev1.HTTPBackendRef) (*dynamic.ServersLoadBalancer, error) { - if backendRef.Port == nil { - return nil, errors.New("port is required for Kubernetes Service reference") - } - - service, exists, err := p.client.GetService(namespace, string(backendRef.Name)) +func (p *Provider) loadHTTPServers(namespace string, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef) (*dynamic.ServersLoadBalancer, *metav1.Condition) { + backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef) if err != nil { - return nil, fmt.Errorf("getting service: %w", err) - } - if !exists { - return nil, errors.New("service not found") - } - - var svcPort *corev1.ServicePort - for _, p := range service.Spec.Ports { - if p.Port == int32(*backendRef.Port) { - svcPort = &p - break + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s: %s", namespace, backendRef.Name, err), } } - if svcPort == nil { - return nil, fmt.Errorf("service port %d not found", *backendRef.Port) - } - endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name)) + protocol, err := getProtocol(svcPort) if err != nil { - return nil, fmt.Errorf("getting endpointslices: %w", err) - } - if len(endpointSlices) == 0 { - return nil, errors.New("endpointslices not found") + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonUnsupportedProtocol), + Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s: %s", namespace, backendRef.Name, err), + } } lb := &dynamic.ServersLoadBalancer{} lb.SetDefaults() - protocol := getProtocol(*svcPort) - - addresses := map[string]struct{}{} - for _, endpointSlice := range endpointSlices { - var port int32 - for _, p := range endpointSlice.Ports { - if svcPort.Name == *p.Name { - port = *p.Port - break - } - } - if port == 0 { - continue - } - - for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { - continue - } - - for _, address := range endpoint.Addresses { - if _, ok := addresses[address]; ok { - continue - } - - addresses[address] = struct{}{} - lb.Servers = append(lb.Servers, dynamic.Server{ - URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))), - }) - } - } + for _, ba := range backendAddresses { + lb.Servers = append(lb.Servers, dynamic.Server{ + URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port)))), + }) } - return lb, nil } @@ -702,13 +660,29 @@ func createURLRewrite(filter *gatev1.HTTPURLRewriteFilter, pathMatch gatev1.HTTP }, nil } -func getProtocol(portSpec corev1.ServicePort) string { - protocol := "http" - if portSpec.Port == 443 || strings.HasPrefix(portSpec.Name, "https") { - protocol = "https" +func getProtocol(portSpec corev1.ServicePort) (string, error) { + if portSpec.Protocol != corev1.ProtocolTCP { + return "", errors.New("only TCP protocol is supported") } - return protocol + if portSpec.AppProtocol == nil { + protocol := "http" + if portSpec.Port == 443 || strings.HasPrefix(portSpec.Name, "https") { + protocol = "https" + } + return protocol, nil + } + + switch ap := *portSpec.AppProtocol; ap { + case appProtocolH2C: + return "h2c", nil + case appProtocolWS: + return "http", nil + case appProtocolWSS: + return "https", nil + default: + return "", fmt.Errorf("unsupported application protocol %s", ap) + } } func mergeHTTPConfiguration(from, to *dynamic.Configuration) { diff --git a/pkg/provider/kubernetes/gateway/kubernetes.go b/pkg/provider/kubernetes/gateway/kubernetes.go index b8c878627..f3e50e274 100644 --- a/pkg/provider/kubernetes/gateway/kubernetes.go +++ b/pkg/provider/kubernetes/gateway/kubernetes.go @@ -35,7 +35,8 @@ import ( ) const ( - providerName = "kubernetesgateway" + providerName = "kubernetesgateway" + controllerName = "traefik.io/gateway-controller" groupCore = "core" @@ -48,6 +49,10 @@ const ( kindTCPRoute = "TCPRoute" kindTLSRoute = "TLSRoute" kindService = "Service" + + appProtocolH2C = "kubernetes.io/h2c" + appProtocolWS = "kubernetes.io/ws" + appProtocolWSS = "kubernetes.io/wss" ) // Provider holds configurations of the provider. @@ -854,6 +859,79 @@ func (p *Provider) allowedNamespaces(gatewayNamespace string, routeNamespaces *g return nil, fmt.Errorf("unsupported RouteSelectType: %q", *routeNamespaces.From) } +type backendAddress struct { + Address string + Port int32 +} + +func (p *Provider) getBackendAddresses(namespace string, ref gatev1.BackendRef) ([]backendAddress, corev1.ServicePort, error) { + if ref.Port == nil { + return nil, corev1.ServicePort{}, errors.New("port is required for Kubernetes Service reference") + } + + service, exists, err := p.client.GetService(namespace, string(ref.Name)) + if err != nil { + return nil, corev1.ServicePort{}, fmt.Errorf("getting service: %w", err) + } + if !exists { + return nil, corev1.ServicePort{}, errors.New("service not found") + } + + var svcPort *corev1.ServicePort + for _, p := range service.Spec.Ports { + if p.Port == int32(*ref.Port) { + svcPort = &p + break + } + } + if svcPort == nil { + return nil, corev1.ServicePort{}, fmt.Errorf("service port %d not found", *ref.Port) + } + + endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(ref.Name)) + if err != nil { + return nil, corev1.ServicePort{}, fmt.Errorf("getting endpointslices: %w", err) + } + if len(endpointSlices) == 0 { + return nil, corev1.ServicePort{}, errors.New("endpointslices not found") + } + + uniqAddresses := map[string]struct{}{} + backendServers := make([]backendAddress, 0) + for _, endpointSlice := range endpointSlices { + var port int32 + for _, p := range endpointSlice.Ports { + if svcPort.Name == *p.Name { + port = *p.Port + break + } + } + if port == 0 { + continue + } + + for _, endpoint := range endpointSlice.Endpoints { + if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + continue + } + + for _, address := range endpoint.Addresses { + if _, ok := uniqAddresses[address]; ok { + continue + } + + uniqAddresses[address] = struct{}{} + backendServers = append(backendServers, backendAddress{ + Address: address, + Port: port, + }) + } + } + } + + return backendServers, *svcPort, nil +} + func supportedRouteKinds(protocol gatev1.ProtocolType, experimentalChannel bool) ([]gatev1.RouteGroupKind, []metav1.Condition) { group := gatev1.Group(gatev1.GroupName) diff --git a/pkg/provider/kubernetes/gateway/kubernetes_test.go b/pkg/provider/kubernetes/gateway/kubernetes_test.go index 79750f4c8..c5cdb413b 100644 --- a/pkg/provider/kubernetes/gateway/kubernetes_test.go +++ b/pkg/provider/kubernetes/gateway/kubernetes_test.go @@ -2452,6 +2452,104 @@ func TestLoadHTTPRoutes_backendExtensionRef(t *testing.T) { TLS: &dynamic.TLSConfiguration{}, }, }, + { + desc: "Simple HTTPRoute, with appProtocol service", + paths: []string{"services.yml", "httproute/with_app_protocol_service.yml"}, + groupKindBackendFuncs: map[string]map[string]BuildBackendFunc{ + traefikv1alpha1.GroupName: {"TraefikService": func(name, namespace string) (string, *dynamic.Service, error) { + // func should never be executed in case of cross-provider reference. + return "", nil, errors.New("BOOM") + }}, + }, + entryPoints: map[string]Entrypoint{"web": { + Address: ":80", + }}, + expected: &dynamic.Configuration{ + UDP: &dynamic.UDPConfiguration{ + Routers: map[string]*dynamic.UDPRouter{}, + Services: map[string]*dynamic.UDPService{}, + }, + TCP: &dynamic.TCPConfiguration{ + Routers: map[string]*dynamic.TCPRouter{}, + Middlewares: map[string]*dynamic.TCPMiddleware{}, + Services: map[string]*dynamic.TCPService{}, + ServersTransports: map[string]*dynamic.TCPServersTransport{}, + }, + HTTP: &dynamic.HTTPConfiguration{ + Routers: map[string]*dynamic.Router{ + "default-http-multi-protocols-my-gateway-web-0-1c0cf64bde37d9d0df06": { + EntryPoints: []string{"web"}, + Service: "default-http-multi-protocols-my-gateway-web-0-1c0cf64bde37d9d0df06-wrr", + Rule: "Host(`foo.com`) && Path(`/bar`)", + Priority: 100008, + RuleSyntax: "v3", + }, + }, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{ + "default-http-multi-protocols-my-gateway-web-0-1c0cf64bde37d9d0df06-wrr": { + Weighted: &dynamic.WeightedRoundRobin{ + Services: []dynamic.WRRService{ + { + Name: "default-whoami-h2c-80", + Weight: ptr.To(1), + }, + { + Name: "default-whoami-ws-80", + Weight: ptr.To(1), + }, + { + Name: "default-whoami-wss-80", + Weight: ptr.To(1), + }, + }, + }, + }, + "default-whoami-h2c-80": { + LoadBalancer: &dynamic.ServersLoadBalancer{ + Servers: []dynamic.Server{ + { + URL: "h2c://10.10.0.13:80", + }, + }, + PassHostHeader: ptr.To(true), + ResponseForwarding: &dynamic.ResponseForwarding{ + FlushInterval: ptypes.Duration(100 * time.Millisecond), + }, + }, + }, + "default-whoami-ws-80": { + LoadBalancer: &dynamic.ServersLoadBalancer{ + Servers: []dynamic.Server{ + { + URL: "http://10.10.0.14:80", + }, + }, + PassHostHeader: ptr.To(true), + ResponseForwarding: &dynamic.ResponseForwarding{ + FlushInterval: ptypes.Duration(100 * time.Millisecond), + }, + }, + }, + "default-whoami-wss-80": { + LoadBalancer: &dynamic.ServersLoadBalancer{ + Servers: []dynamic.Server{ + { + URL: "https://10.10.0.15:80", + }, + }, + PassHostHeader: ptr.To(true), + ResponseForwarding: &dynamic.ResponseForwarding{ + FlushInterval: ptypes.Duration(100 * time.Millisecond), + }, + }, + }, + }, + ServersTransports: map[string]*dynamic.ServersTransport{}, + }, + TLS: &dynamic.TLSConfiguration{}, + }, + }, } for _, test := range testCases { diff --git a/pkg/provider/kubernetes/gateway/tcproute.go b/pkg/provider/kubernetes/gateway/tcproute.go index 63b2e214c..689b42b0e 100644 --- a/pkg/provider/kubernetes/gateway/tcproute.go +++ b/pkg/provider/kubernetes/gateway/tcproute.go @@ -2,7 +2,6 @@ package gateway import ( "context" - "errors" "fmt" "net" "strconv" @@ -252,87 +251,45 @@ func (p *Provider) loadTCPService(route *gatev1alpha2.TCPRoute, backendRef gatev portStr := strconv.FormatInt(int64(port), 10) serviceName = provider.Normalize(serviceName + "-" + portStr) - lb, err := p.loadTCPServers(namespace, backendRef) - if err != nil { - return serviceName, nil, &metav1.Condition{ - Type: string(gatev1.RouteConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: route.Generation, - LastTransitionTime: metav1.Now(), - Reason: string(gatev1.RouteReasonBackendNotFound), - Message: fmt.Sprintf("Cannot load TCPRoute BackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err), - } + lb, errCondition := p.loadTCPServers(namespace, route, backendRef) + if errCondition != nil { + return serviceName, nil, errCondition } return serviceName, &dynamic.TCPService{LoadBalancer: lb}, nil } -func (p *Provider) loadTCPServers(namespace string, backendRef gatev1.BackendRef) (*dynamic.TCPServersLoadBalancer, error) { - if backendRef.Port == nil { - return nil, errors.New("port is required for Kubernetes Service reference") - } - - service, exists, err := p.client.GetService(namespace, string(backendRef.Name)) +func (p *Provider) loadTCPServers(namespace string, route *gatev1alpha2.TCPRoute, backendRef gatev1.BackendRef) (*dynamic.TCPServersLoadBalancer, *metav1.Condition) { + backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef) if err != nil { - return nil, fmt.Errorf("getting service: %w", err) - } - if !exists { - return nil, errors.New("service not found") - } - - var svcPort *corev1.ServicePort - for _, p := range service.Spec.Ports { - if p.Port == int32(*backendRef.Port) { - svcPort = &p - break + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.GetGeneration(), + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Cannot load TCPRoute BackendRef %s/%s: %s", namespace, backendRef.Name, err), } } - if svcPort == nil { - return nil, fmt.Errorf("service port %d not found", *backendRef.Port) - } - endpointSlices, err := p.client.ListEndpointSlicesForService(namespace, string(backendRef.Name)) - if err != nil { - return nil, fmt.Errorf("getting endpointslices: %w", err) - } - if len(endpointSlices) == 0 { - return nil, errors.New("endpointslices not found") + if svcPort.Protocol != corev1.ProtocolTCP { + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.GetGeneration(), + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonUnsupportedProtocol), + Message: fmt.Sprintf("Cannot load TCPRoute BackendRef %s/%s: only TCP protocol is supported", namespace, backendRef.Name), + } } lb := &dynamic.TCPServersLoadBalancer{} - addresses := map[string]struct{}{} - for _, endpointSlice := range endpointSlices { - var port int32 - for _, p := range endpointSlice.Ports { - if svcPort.Name == *p.Name { - port = *p.Port - break - } - } - if port == 0 { - continue - } - - for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { - continue - } - - for _, address := range endpoint.Addresses { - if _, ok := addresses[address]; ok { - continue - } - - addresses[address] = struct{}{} - lb.Servers = append(lb.Servers, dynamic.TCPServer{ - // TODO determine whether the servers needs TLS, from the port? - Address: net.JoinHostPort(address, strconv.Itoa(int(port))), - }) - } - } + for _, ba := range backendAddresses { + lb.Servers = append(lb.Servers, dynamic.TCPServer{ + Address: net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port))), + }) } - return lb, nil } diff --git a/pkg/provider/kubernetes/gateway/tlsroute.go b/pkg/provider/kubernetes/gateway/tlsroute.go index adbff1da5..ee8da7fab 100644 --- a/pkg/provider/kubernetes/gateway/tlsroute.go +++ b/pkg/provider/kubernetes/gateway/tlsroute.go @@ -3,6 +3,7 @@ package gateway import ( "context" "fmt" + "net" "regexp" "strconv" "strings" @@ -10,6 +11,7 @@ import ( "github.com/rs/zerolog/log" "github.com/traefik/traefik/v3/pkg/config/dynamic" "github.com/traefik/traefik/v3/pkg/provider" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" @@ -251,21 +253,49 @@ func (p *Provider) loadTLSService(route *gatev1alpha2.TLSRoute, backendRef gatev portStr := strconv.FormatInt(int64(port), 10) serviceName = provider.Normalize(serviceName + "-" + portStr) - lb, err := p.loadTCPServers(namespace, backendRef) - if err != nil { - return serviceName, nil, &metav1.Condition{ - Type: string(gatev1.RouteConditionResolvedRefs), - Status: metav1.ConditionFalse, - ObservedGeneration: route.Generation, - LastTransitionTime: metav1.Now(), - Reason: string(gatev1.RouteReasonBackendNotFound), - Message: fmt.Sprintf("Cannot load TLSRoute BackendRef %s/%s/%s/%s: %s", group, kind, namespace, backendRef.Name, err), - } + lb, errCondition := p.loadTLSServers(namespace, route, backendRef) + if errCondition != nil { + return serviceName, nil, errCondition } return serviceName, &dynamic.TCPService{LoadBalancer: lb}, nil } +func (p *Provider) loadTLSServers(namespace string, route *gatev1alpha2.TLSRoute, backendRef gatev1.BackendRef) (*dynamic.TCPServersLoadBalancer, *metav1.Condition) { + backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef) + if err != nil { + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.GetGeneration(), + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonBackendNotFound), + Message: fmt.Sprintf("Cannot load TLSRoute BackendRef %s/%s: %s", namespace, backendRef.Name, err), + } + } + + if svcPort.Protocol != corev1.ProtocolTCP { + return nil, &metav1.Condition{ + Type: string(gatev1.RouteConditionResolvedRefs), + Status: metav1.ConditionFalse, + ObservedGeneration: route.GetGeneration(), + LastTransitionTime: metav1.Now(), + Reason: string(gatev1.RouteReasonUnsupportedProtocol), + Message: fmt.Sprintf("Cannot load TLSRoute BackendRef %s/%s: only TCP protocol is supported", namespace, backendRef.Name), + } + } + + lb := &dynamic.TCPServersLoadBalancer{} + + for _, ba := range backendAddresses { + lb.Servers = append(lb.Servers, dynamic.TCPServer{ + // TODO determine whether the servers needs TLS, from the port? + Address: net.JoinHostPort(ba.Address, strconv.Itoa(int(ba.Port))), + }) + } + return lb, nil +} + func hostSNIRule(hostnames []gatev1.Hostname) string { rules := make([]string, 0, len(hostnames)) uniqHostnames := map[gatev1.Hostname]struct{}{}