diff --git a/autogen/.placeholder b/autogen/.placeholder deleted file mode 100644 index e69de29bb..000000000 diff --git a/autogen/gentemplates/gen.go b/autogen/gentemplates/gen.go index 057cb3e52..6346a4be9 100644 --- a/autogen/gentemplates/gen.go +++ b/autogen/gentemplates/gen.go @@ -143,6 +143,14 @@ var _templatesConsul_catalogTmpl = []byte(`[backends] expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $service.TraefikLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + + + {{ $loadBalancer := getLoadBalancer $service.TraefikLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] @@ -620,6 +628,12 @@ var _templatesDockerTmpl = []byte(`{{$backendServers := .Servers}} expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] @@ -948,6 +962,12 @@ var _templatesEcsTmpl = []byte(`[backends] expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $firstInstance.SegmentLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $serviceName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $firstInstance.SegmentLabels }} {{if $loadBalancer }} [backends."backend-{{ $serviceName }}".loadBalancer] @@ -1258,6 +1278,11 @@ var _templatesKubernetesTmpl = []byte(`[backends] expression = "{{ $backend.CircuitBreaker.Expression }}" {{end}} + {{if $backend.ResponseForwarding }} + [backends."{{ $backendName }}".responseForwarding] + flushInterval = "{{ $backend.responseForwarding.FlushInterval }}" + {{end}} + [backends."{{ $backendName }}".loadBalancer] method = "{{ $backend.LoadBalancer.Method }}" sticky = {{ $backend.LoadBalancer.Sticky }} @@ -1492,6 +1517,12 @@ var _templatesKvTmpl = []byte(`[backends] [backends."{{ $backendName }}".circuitBreaker] expression = "{{ $circuitBreaker.Expression }}" {{end}} + + {{ $responseForwarding := getResponseForwarding $backend }} + {{if $responseForwarding }} + [backends."{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.flushInterval }}" + {{end}} {{ $loadBalancer := getLoadBalancer $backend }} {{if $loadBalancer }} @@ -1862,6 +1893,12 @@ var _templatesMarathonTmpl = []byte(`{{ $apps := .Applications }} [backends."{{ $backendName }}".circuitBreaker] expression = "{{ $circuitBreaker.Expression }}" {{end}} + + {{ $responseForwarding := getResponseForwarding $app.SegmentLabels }} + {{if $responseForwarding }} + [backends."{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} {{ $loadBalancer := getLoadBalancer $app.SegmentLabels }} {{if $loadBalancer }} @@ -2177,6 +2214,12 @@ var _templatesMesosTmpl = []byte(`[backends] expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $app.TraefikLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $app.TraefikLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] @@ -2545,6 +2588,12 @@ var _templatesRancherTmpl = []byte(`{{ $backendServers := .Backends }} expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] diff --git a/docs/configuration/backends/consulcatalog.md b/docs/configuration/backends/consulcatalog.md index d161eadfc..7dc9248cb 100644 --- a/docs/configuration/backends/consulcatalog.md +++ b/docs/configuration/backends/consulcatalog.md @@ -105,6 +105,7 @@ Additional settings can be defined using Consul Catalog tags. | `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend. ex: `NetworkErrorRatio() > 0.` | +| `.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. | | `.backend.healthcheck.interval=1s` | Defines the health check interval. | | `.backend.healthcheck.port=8080` | Sets a different port for the health check. | diff --git a/docs/configuration/backends/docker.md b/docs/configuration/backends/docker.md index 3515660b7..e8d133ac2 100644 --- a/docs/configuration/backends/docker.md +++ b/docs/configuration/backends/docker.md @@ -213,9 +213,9 @@ Labels can be used on containers to override default behavior. |---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `traefik.docker.network` | Overrides the default docker network to use for connections to the container. [1] | | `traefik.domain` | Sets the default base domain for the frontend rules. For more information, check the [Container Labels section's of the user guide "Let's Encrypt & Docker"](/user-guide/docker-and-lets-encrypt/#container-labels) | -| `traefik.enable=false` | Disables this container in Traefik. | +| `traefik.enable=false` | Disables this container in Traefik. | | `traefik.port=80` | Registers this port. Useful when the container exposes multiples ports. | -| `traefik.tags=foo,bar,myTag` | Adds Traefik tags to the Docker container/service to be used in [constraints](/configuration/commons/#constraints). | +| `traefik.tags=foo,bar,myTag` | Adds Traefik tags to the Docker container/service to be used in [constraints](/configuration/commons/#constraints). | | `traefik.protocol=https` | Overrides the default `http` protocol | | `traefik.weight=10` | Assigns this weight to the container | | `traefik.backend=foo` | Gives the name `foo` to the generated backend for this container. | @@ -225,6 +225,7 @@ Labels can be used on containers to override default behavior. | `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend | +| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. | | `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. | diff --git a/docs/configuration/backends/ecs.md b/docs/configuration/backends/ecs.md index 74effb56b..b4d84319c 100644 --- a/docs/configuration/backends/ecs.md +++ b/docs/configuration/backends/ecs.md @@ -150,6 +150,7 @@ Labels can be used on task containers to override default behaviour: | `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend | +| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) | | `traefik.backend.healthcheck.scheme=http` | Overrides the server URL scheme. | diff --git a/docs/configuration/backends/file.md b/docs/configuration/backends/file.md index 50c0fe496..bd24d9b72 100644 --- a/docs/configuration/backends/file.md +++ b/docs/configuration/backends/file.md @@ -23,6 +23,9 @@ Traefik can be configured with a file. [backends.backend1.circuitBreaker] expression = "NetworkErrorRatio() > 0.5" + + [backends.backend1.responseForwarding] + flushInterval = "10ms" [backends.backend1.loadBalancer] method = "drr" diff --git a/docs/configuration/backends/kubernetes.md b/docs/configuration/backends/kubernetes.md index eb526993e..c38af2bbf 100644 --- a/docs/configuration/backends/kubernetes.md +++ b/docs/configuration/backends/kubernetes.md @@ -277,6 +277,7 @@ The following annotations are applicable on the Service object associated with a | `traefik.backend.loadbalancer.sticky: "true"` | Enable backend sticky sessions (DEPRECATED). | | `traefik.ingress.kubernetes.io/affinity: "true"` | Enable backend sticky sessions. | | `traefik.ingress.kubernetes.io/circuit-breaker-expression: ` | Set the circuit breaker expression for the backend. | +| `traefik.ingress.kubernetes.io/responseforwarding-flushinterval: "10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `traefik.ingress.kubernetes.io/load-balancer-method: drr` | Override the default `wrr` load balancer algorithm. | | `traefik.ingress.kubernetes.io/max-conn-amount: "10"` | Sets the maximum number of simultaneous connections to the backend.
Must be used in conjunction with the label below to take effect. | | `traefik.ingress.kubernetes.io/max-conn-extractor-func: client.ip` | Set the function to be used against the request to determine what to limit maximum connections to the backend by.
Must be used in conjunction with the above label to take effect. | diff --git a/docs/configuration/backends/marathon.md b/docs/configuration/backends/marathon.md index 4b7f41397..ef4b20b99 100644 --- a/docs/configuration/backends/marathon.md +++ b/docs/configuration/backends/marathon.md @@ -208,6 +208,7 @@ The following labels can be defined on Marathon applications. They adjust the be | `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend | +| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) | | `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. | diff --git a/docs/configuration/backends/mesos.md b/docs/configuration/backends/mesos.md index 2c2628390..13568bdc8 100644 --- a/docs/configuration/backends/mesos.md +++ b/docs/configuration/backends/mesos.md @@ -122,6 +122,7 @@ The following labels can be defined on Mesos tasks. They adjust the behavior for | `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend | +| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) | | `traefik.backend.healthcheck.scheme=http` | Overrides the server URL scheme. | diff --git a/docs/configuration/backends/rancher.md b/docs/configuration/backends/rancher.md index f91c4d9b9..9e96c2c2f 100644 --- a/docs/configuration/backends/rancher.md +++ b/docs/configuration/backends/rancher.md @@ -140,8 +140,8 @@ Labels can be used on task containers to override default behavior: | Label | Description | |---------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `traefik.domain` | Sets the default base domain for the frontend rules. | -| `traefik.enable=false` | Disables this container in Traefik. | +| `traefik.domain` | Sets the default base domain for the frontend rules. | +| `traefik.enable=false` | Disables this container in Traefik. | | `traefik.port=80` | Registers this port. Useful when the container exposes multiple ports. | | `traefik.protocol=https` | Overrides the default `http` protocol. | | `traefik.weight=10` | Assigns this weight to the container. | @@ -152,6 +152,7 @@ Labels can be used on task containers to override default behavior: | `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. | | `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend | +| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. | | `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. | | `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. | | `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. | diff --git a/integration/fixtures/grpc/config_with_flush.toml b/integration/fixtures/grpc/config_with_flush.toml new file mode 100644 index 000000000..9615e3dd2 --- /dev/null +++ b/integration/fixtures/grpc/config_with_flush.toml @@ -0,0 +1,31 @@ +defaultEntryPoints = ["https"] + +rootCAs = [ """{{ .CertContent }}""" ] + +[entryPoints] + [entryPoints.https] + address = ":4443" + [entryPoints.https.tls] + [[entryPoints.https.tls.certificates]] + certFile = """{{ .CertContent }}""" + keyFile = """{{ .KeyContent }}""" + + +[api] + +[file] + +[backends] + [backends.backend1] + [backends.backend1.responseForwarding] + flushInterval="1ms" + [backends.backend1.servers.server1] + url = "https://127.0.0.1:{{ .GRPCServerPort }}" + weight = 1 + + +[frontends] + [frontends.frontend1] + backend = "backend1" + [frontends.frontend1.routes.test_1] + rule = "Host:127.0.0.1" diff --git a/integration/grpc_test.go b/integration/grpc_test.go index 8ec9813a0..d26f8a502 100644 --- a/integration/grpc_test.go +++ b/integration/grpc_test.go @@ -356,3 +356,64 @@ func (s *GRPCSuite) TestGRPCBuffer(c *check.C) { }) c.Assert(err, check.IsNil) } + +func (s *GRPCSuite) TestGRPCBufferWithFlushInterval(c *check.C) { + stopStreamExample := make(chan bool) + defer func() { stopStreamExample <- true }() + lis, err := net.Listen("tcp", ":0") + c.Assert(err, check.IsNil) + _, port, err := net.SplitHostPort(lis.Addr().String()) + c.Assert(err, check.IsNil) + + go func() { + err := startGRPCServer(lis, &myserver{ + stopStreamExample: stopStreamExample, + }) + c.Log(err) + c.Assert(err, check.IsNil) + }() + + file := s.adaptFile(c, "fixtures/grpc/config_with_flush.toml", struct { + CertContent string + KeyContent string + GRPCServerPort string + }{ + CertContent: string(LocalhostCert), + KeyContent: string(LocalhostKey), + GRPCServerPort: port, + }) + + defer os.Remove(file) + cmd, display := s.traefikCmd(withConfigFile(file)) + defer display(c) + + err = cmd.Start() + c.Assert(err, check.IsNil) + defer cmd.Process.Kill() + + // wait for Traefik + err = try.GetRequest("http://127.0.0.1:8080/api/providers", 1*time.Second, try.BodyContains("Host:127.0.0.1")) + c.Assert(err, check.IsNil) + var client helloworld.Greeter_StreamExampleClient + client, closer, err := callStreamExampleClientGRPC() + defer closer() + c.Assert(err, check.IsNil) + + received := make(chan bool) + go func() { + tr, err := client.Recv() + c.Assert(err, check.IsNil) + c.Assert(len(tr.Data), check.Equals, 512) + received <- true + }() + + err = try.Do(time.Millisecond*100, func() error { + select { + case <-received: + return nil + default: + return errors.New("failed to receive stream data") + } + }) + c.Assert(err, check.IsNil) +} diff --git a/provider/consulcatalog/config.go b/provider/consulcatalog/config.go index 22cc99651..460b4246f 100644 --- a/provider/consulcatalog/config.go +++ b/provider/consulcatalog/config.go @@ -34,6 +34,7 @@ func (p *Provider) buildConfigurationV2(catalog []catalogUpdate) *types.Configur "getMaxConn": label.GetMaxConn, "getHealthCheck": label.GetHealthCheck, "getBuffering": label.GetBuffering, + "getResponseForwarding": label.GetResponseForwarding, "getServer": p.getServer, // Frontend functions diff --git a/provider/consulcatalog/config_test.go b/provider/consulcatalog/config_test.go index 80d1c5fa3..e3017b982 100644 --- a/provider/consulcatalog/config_test.go +++ b/provider/consulcatalog/config_test.go @@ -405,6 +405,7 @@ func TestProviderBuildConfiguration(t *testing.T) { label.TraefikBackend + "=foobar", label.TraefikBackendCircuitBreakerExpression + "=NetworkErrorRatio() > 0.5", + label.TraefikBackendResponseForwardingFlushInterval + "=10ms", label.TraefikBackendHealthCheckPath + "=/health", label.TraefikBackendHealthCheckScheme + "=http", label.TraefikBackendHealthCheckPort + "=880", @@ -673,6 +674,9 @@ func TestProviderBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Sticky: true, diff --git a/provider/docker/config.go b/provider/docker/config.go index 576a848c1..a77d19181 100644 --- a/provider/docker/config.go +++ b/provider/docker/config.go @@ -33,13 +33,14 @@ func (p *Provider) buildConfigurationV2(containersInspected []dockerData) *types "getDomain": label.GetFuncString(label.TraefikDomain, p.Domain), // Backend functions - "getIPAddress": p.getDeprecatedIPAddress, // TODO: Should we expose getIPPort instead? - "getServers": p.getServers, - "getMaxConn": label.GetMaxConn, - "getHealthCheck": label.GetHealthCheck, - "getBuffering": label.GetBuffering, - "getCircuitBreaker": label.GetCircuitBreaker, - "getLoadBalancer": label.GetLoadBalancer, + "getIPAddress": p.getDeprecatedIPAddress, // TODO: Should we expose getIPPort instead? + "getServers": p.getServers, + "getMaxConn": label.GetMaxConn, + "getHealthCheck": label.GetHealthCheck, + "getBuffering": label.GetBuffering, + "getResponseForwarding": label.GetResponseForwarding, + "getCircuitBreaker": label.GetCircuitBreaker, + "getLoadBalancer": label.GetLoadBalancer, // Frontend functions "getBackendName": getBackendName, diff --git a/provider/docker/config_container_docker_test.go b/provider/docker/config_container_docker_test.go index ea01e7b71..4fd8596ed 100644 --- a/provider/docker/config_container_docker_test.go +++ b/provider/docker/config_container_docker_test.go @@ -434,6 +434,7 @@ func TestDockerBuildConfiguration(t *testing.T) { label.TraefikBackend: "foobar", label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5", + label.TraefikBackendResponseForwardingFlushInterval: "10ms", label.TraefikBackendHealthCheckScheme: "http", label.TraefikBackendHealthCheckPath: "/health", label.TraefikBackendHealthCheckPort: "880", @@ -666,6 +667,9 @@ func TestDockerBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Sticky: true, diff --git a/provider/docker/config_container_swarm_test.go b/provider/docker/config_container_swarm_test.go index 29e862757..e0e868a2e 100644 --- a/provider/docker/config_container_swarm_test.go +++ b/provider/docker/config_container_swarm_test.go @@ -383,6 +383,7 @@ func TestSwarmBuildConfiguration(t *testing.T) { label.TraefikBackend: "foobar", label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5", + label.TraefikBackendResponseForwardingFlushInterval: "10ms", label.TraefikBackendHealthCheckScheme: "http", label.TraefikBackendHealthCheckPath: "/health", label.TraefikBackendHealthCheckPort: "880", @@ -584,6 +585,9 @@ func TestSwarmBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Sticky: true, diff --git a/provider/ecs/config.go b/provider/ecs/config.go index ccc25e8bf..0183f297b 100644 --- a/provider/ecs/config.go +++ b/provider/ecs/config.go @@ -21,14 +21,16 @@ import ( func (p *Provider) buildConfigurationV2(instances []ecsInstance) (*types.Configuration, error) { var ecsFuncMap = template.FuncMap{ // Backend functions - "getHost": getHost, - "getPort": getPort, - "getCircuitBreaker": label.GetCircuitBreaker, - "getLoadBalancer": label.GetLoadBalancer, - "getMaxConn": label.GetMaxConn, - "getHealthCheck": label.GetHealthCheck, - "getBuffering": label.GetBuffering, - "getServers": getServers, + "getHost": getHost, + "getPort": getPort, + "getCircuitBreaker": label.GetCircuitBreaker, + "getLoadBalancer": label.GetLoadBalancer, + "getMaxConn": label.GetMaxConn, + "getHealthCheck": label.GetHealthCheck, + "getBuffering": label.GetBuffering, + "getResponseForwarding": label.GetResponseForwarding, + + "getServers": getServers, // Frontend functions "filterFrontends": filterFrontends, diff --git a/provider/ecs/config_test.go b/provider/ecs/config_test.go index 0a4a169c8..bdb9a3749 100644 --- a/provider/ecs/config_test.go +++ b/provider/ecs/config_test.go @@ -342,6 +342,7 @@ func TestBuildConfiguration(t *testing.T) { label.TraefikBackend: aws.String("foobar"), label.TraefikBackendCircuitBreakerExpression: aws.String("NetworkErrorRatio() > 0.5"), + label.TraefikBackendResponseForwardingFlushInterval: aws.String("10ms"), label.TraefikBackendHealthCheckScheme: aws.String("http"), label.TraefikBackendHealthCheckPath: aws.String("/health"), label.TraefikBackendHealthCheckPort: aws.String("880"), @@ -458,6 +459,9 @@ func TestBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Sticky: true, diff --git a/provider/kubernetes/annotations.go b/provider/kubernetes/annotations.go index c1b4abde3..042b5708e 100644 --- a/provider/kubernetes/annotations.go +++ b/provider/kubernetes/annotations.go @@ -7,42 +7,43 @@ import ( ) const ( - annotationKubernetesIngressClass = "kubernetes.io/ingress.class" - annotationKubernetesAuthRealm = "ingress.kubernetes.io/auth-realm" - annotationKubernetesAuthType = "ingress.kubernetes.io/auth-type" - annotationKubernetesAuthSecret = "ingress.kubernetes.io/auth-secret" - annotationKubernetesAuthHeaderField = "ingress.kubernetes.io/auth-header-field" - annotationKubernetesAuthForwardResponseHeaders = "ingress.kubernetes.io/auth-response-headers" - annotationKubernetesAuthRemoveHeader = "ingress.kubernetes.io/auth-remove-header" - annotationKubernetesAuthForwardURL = "ingress.kubernetes.io/auth-url" - annotationKubernetesAuthForwardTrustHeaders = "ingress.kubernetes.io/auth-trust-headers" - annotationKubernetesAuthForwardTLSSecret = "ingress.kubernetes.io/auth-tls-secret" - annotationKubernetesAuthForwardTLSInsecure = "ingress.kubernetes.io/auth-tls-insecure" - annotationKubernetesRewriteTarget = "ingress.kubernetes.io/rewrite-target" - annotationKubernetesWhiteListSourceRange = "ingress.kubernetes.io/whitelist-source-range" - annotationKubernetesWhiteListUseXForwardedFor = "ingress.kubernetes.io/whitelist-x-forwarded-for" - annotationKubernetesPreserveHost = "ingress.kubernetes.io/preserve-host" - annotationKubernetesPassTLSCert = "ingress.kubernetes.io/pass-tls-cert" // Deprecated - annotationKubernetesPassTLSClientCert = "ingress.kubernetes.io/pass-client-tls-cert" - annotationKubernetesFrontendEntryPoints = "ingress.kubernetes.io/frontend-entry-points" - annotationKubernetesPriority = "ingress.kubernetes.io/priority" - annotationKubernetesCircuitBreakerExpression = "ingress.kubernetes.io/circuit-breaker-expression" - annotationKubernetesLoadBalancerMethod = "ingress.kubernetes.io/load-balancer-method" - annotationKubernetesAffinity = "ingress.kubernetes.io/affinity" - annotationKubernetesSessionCookieName = "ingress.kubernetes.io/session-cookie-name" - annotationKubernetesRuleType = "ingress.kubernetes.io/rule-type" - annotationKubernetesRedirectEntryPoint = "ingress.kubernetes.io/redirect-entry-point" - annotationKubernetesRedirectPermanent = "ingress.kubernetes.io/redirect-permanent" - annotationKubernetesRedirectRegex = "ingress.kubernetes.io/redirect-regex" - annotationKubernetesRedirectReplacement = "ingress.kubernetes.io/redirect-replacement" - annotationKubernetesMaxConnAmount = "ingress.kubernetes.io/max-conn-amount" - annotationKubernetesMaxConnExtractorFunc = "ingress.kubernetes.io/max-conn-extractor-func" - annotationKubernetesRateLimit = "ingress.kubernetes.io/rate-limit" - annotationKubernetesErrorPages = "ingress.kubernetes.io/error-pages" - annotationKubernetesBuffering = "ingress.kubernetes.io/buffering" - annotationKubernetesAppRoot = "ingress.kubernetes.io/app-root" - annotationKubernetesServiceWeights = "ingress.kubernetes.io/service-weights" - annotationKubernetesRequestModifier = "ingress.kubernetes.io/request-modifier" + annotationKubernetesIngressClass = "kubernetes.io/ingress.class" + annotationKubernetesAuthRealm = "ingress.kubernetes.io/auth-realm" + annotationKubernetesAuthType = "ingress.kubernetes.io/auth-type" + annotationKubernetesAuthSecret = "ingress.kubernetes.io/auth-secret" + annotationKubernetesAuthHeaderField = "ingress.kubernetes.io/auth-header-field" + annotationKubernetesAuthForwardResponseHeaders = "ingress.kubernetes.io/auth-response-headers" + annotationKubernetesAuthRemoveHeader = "ingress.kubernetes.io/auth-remove-header" + annotationKubernetesAuthForwardURL = "ingress.kubernetes.io/auth-url" + annotationKubernetesAuthForwardTrustHeaders = "ingress.kubernetes.io/auth-trust-headers" + annotationKubernetesAuthForwardTLSSecret = "ingress.kubernetes.io/auth-tls-secret" + annotationKubernetesAuthForwardTLSInsecure = "ingress.kubernetes.io/auth-tls-insecure" + annotationKubernetesRewriteTarget = "ingress.kubernetes.io/rewrite-target" + annotationKubernetesWhiteListSourceRange = "ingress.kubernetes.io/whitelist-source-range" + annotationKubernetesWhiteListUseXForwardedFor = "ingress.kubernetes.io/whitelist-x-forwarded-for" + annotationKubernetesPreserveHost = "ingress.kubernetes.io/preserve-host" + annotationKubernetesPassTLSCert = "ingress.kubernetes.io/pass-tls-cert" // Deprecated + annotationKubernetesPassTLSClientCert = "ingress.kubernetes.io/pass-client-tls-cert" + annotationKubernetesFrontendEntryPoints = "ingress.kubernetes.io/frontend-entry-points" + annotationKubernetesPriority = "ingress.kubernetes.io/priority" + annotationKubernetesCircuitBreakerExpression = "ingress.kubernetes.io/circuit-breaker-expression" + annotationKubernetesLoadBalancerMethod = "ingress.kubernetes.io/load-balancer-method" + annotationKubernetesAffinity = "ingress.kubernetes.io/affinity" + annotationKubernetesSessionCookieName = "ingress.kubernetes.io/session-cookie-name" + annotationKubernetesRuleType = "ingress.kubernetes.io/rule-type" + annotationKubernetesRedirectEntryPoint = "ingress.kubernetes.io/redirect-entry-point" + annotationKubernetesRedirectPermanent = "ingress.kubernetes.io/redirect-permanent" + annotationKubernetesRedirectRegex = "ingress.kubernetes.io/redirect-regex" + annotationKubernetesRedirectReplacement = "ingress.kubernetes.io/redirect-replacement" + annotationKubernetesMaxConnAmount = "ingress.kubernetes.io/max-conn-amount" + annotationKubernetesMaxConnExtractorFunc = "ingress.kubernetes.io/max-conn-extractor-func" + annotationKubernetesRateLimit = "ingress.kubernetes.io/rate-limit" + annotationKubernetesErrorPages = "ingress.kubernetes.io/error-pages" + annotationKubernetesBuffering = "ingress.kubernetes.io/buffering" + annotationKubernetesResponseForwardingFlushInterval = "ingress.kubernetes.io/responseforwarding-flushinterval" + annotationKubernetesAppRoot = "ingress.kubernetes.io/app-root" + annotationKubernetesServiceWeights = "ingress.kubernetes.io/service-weights" + annotationKubernetesRequestModifier = "ingress.kubernetes.io/request-modifier" annotationKubernetesSSLForceHost = "ingress.kubernetes.io/ssl-force-host" annotationKubernetesSSLRedirect = "ingress.kubernetes.io/ssl-redirect" diff --git a/provider/kubernetes/builder_configuration_test.go b/provider/kubernetes/builder_configuration_test.go index 7ec4a9f94..c350bf663 100644 --- a/provider/kubernetes/builder_configuration_test.go +++ b/provider/kubernetes/builder_configuration_test.go @@ -93,6 +93,13 @@ func circuitBreaker(exp string) func(*types.Backend) { } } +func responseForwarding(interval string) func(*types.Backend) { + return func(b *types.Backend) { + b.ResponseForwarding = &types.ResponseForwarding{} + b.ResponseForwarding.FlushInterval = interval + } +} + func buffering(opts ...func(*types.Buffering)) func(*types.Backend) { return func(b *types.Backend) { if b.Buffering == nil { diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index 0aa07d85e..9d761c60c 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -337,6 +337,7 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) templateObjects.Backends[baseName].LoadBalancer = getLoadBalancer(service) templateObjects.Backends[baseName].MaxConn = getMaxConn(service) templateObjects.Backends[baseName].Buffering = getBuffering(service) + templateObjects.Backends[baseName].ResponseForwarding = getResponseForwarding(service) protocol := label.DefaultProtocol @@ -494,6 +495,7 @@ func (p *Provider) addGlobalBackend(cl Client, i *extensionsv1beta1.Ingress, tem templateObjects.Backends[defaultBackendName].LoadBalancer = getLoadBalancer(service) templateObjects.Backends[defaultBackendName].MaxConn = getMaxConn(service) templateObjects.Backends[defaultBackendName].Buffering = getBuffering(service) + templateObjects.Backends[defaultBackendName].ResponseForwarding = getResponseForwarding(service) endpoints, exists, err := cl.GetEndpoints(service.Namespace, service.Name) if err != nil { @@ -951,6 +953,17 @@ func getWhiteList(i *extensionsv1beta1.Ingress) *types.WhiteList { } } +func getResponseForwarding(service *corev1.Service) *types.ResponseForwarding { + flushIntervalValue := getStringValue(service.Annotations, annotationKubernetesResponseForwardingFlushInterval, "") + if len(flushIntervalValue) == 0 { + return nil + } + + return &types.ResponseForwarding{ + FlushInterval: flushIntervalValue, + } +} + func getBuffering(service *corev1.Service) *types.Buffering { var buffering *types.Buffering diff --git a/provider/kubernetes/kubernetes_test.go b/provider/kubernetes/kubernetes_test.go index d1aba1ad7..e45b0d77b 100644 --- a/provider/kubernetes/kubernetes_test.go +++ b/provider/kubernetes/kubernetes_test.go @@ -908,6 +908,9 @@ func TestServiceAnnotations(t *testing.T) { iRule( iHost("max-conn"), iPaths(onePath(iBackend("service4", intstr.FromInt(804))))), + iRule( + iHost("flush"), + iPaths(onePath(iBackend("service5", intstr.FromInt(805))))), ), ), } @@ -958,6 +961,15 @@ retryexpression: IsNetworkError() && Attempts() <= 2 clusterIP("10.0.0.4"), sPorts(sPort(804, "http"))), ), + buildService( + sName("service5"), + sNamespace("testing"), + sUID("5"), + sAnnotation(annotationKubernetesResponseForwardingFlushInterval, "10ms"), + sSpec( + clusterIP("10.0.0.5"), + sPorts(sPort(80, ""))), + ), } endpoints := []*corev1.Endpoints{ @@ -1005,6 +1017,17 @@ retryexpression: IsNetworkError() && Attempts() <= 2 eAddresses(eAddress("10.4.0.2")), ePorts(ePort(8080, "http"))), ), + buildEndpoint( + eNamespace("testing"), + eName("service5"), + eUID("5"), + subset( + eAddresses(eAddress("10.4.0.1")), + ePorts(ePort(8080, "http"))), + subset( + eAddresses(eAddress("10.4.0.2")), + ePorts(ePort(8080, "http"))), + ), } watchChan := make(chan interface{}) @@ -1028,6 +1051,11 @@ retryexpression: IsNetworkError() && Attempts() <= 2 lbMethod("drr"), circuitBreaker("NetworkErrorRatio() > 0.5"), ), + backend("flush", + servers(), + lbMethod("wrr"), + responseForwarding("10ms"), + ), backend("bar", servers( server("http://10.15.0.1:8080", weight(1)), @@ -1073,6 +1101,10 @@ retryexpression: IsNetworkError() && Attempts() <= 2 passHostHeader(), routes( route("max-conn", "Host:max-conn"))), + frontend("flush", + passHostHeader(), + routes( + route("flush", "Host:flush"))), ), ) diff --git a/provider/kv/keynames.go b/provider/kv/keynames.go index 38876c797..b9c2546a4 100644 --- a/provider/kv/keynames.go +++ b/provider/kv/keynames.go @@ -3,6 +3,7 @@ package kv const ( pathBackends = "/backends/" pathBackendCircuitBreakerExpression = "/circuitbreaker/expression" + pathBackendResponseForwardingFlushInterval = "/responseforwarding/flushinterval" pathBackendHealthCheckScheme = "/healthcheck/scheme" pathBackendHealthCheckPath = "/healthcheck/path" pathBackendHealthCheckPort = "/healthcheck/port" diff --git a/provider/kv/kv_config.go b/provider/kv/kv_config.go index 4883ce108..e06a2b7ba 100644 --- a/provider/kv/kv_config.go +++ b/provider/kv/kv_config.go @@ -59,6 +59,7 @@ func (p *Provider) buildConfiguration() *types.Configuration { // Backend functions "getServers": p.getServers, "getCircuitBreaker": p.getCircuitBreaker, + "getResponseForwarding": p.getResponseForwarding, "getLoadBalancer": p.getLoadBalancer, "getMaxConn": p.getMaxConn, "getHealthCheck": p.getHealthCheck, @@ -269,6 +270,20 @@ func (p *Provider) getLoadBalancer(rootPath string) *types.LoadBalancer { return lb } +func (p *Provider) getResponseForwarding(rootPath string) *types.ResponseForwarding { + if !p.has(rootPath, pathBackendResponseForwardingFlushInterval) { + return nil + } + value := p.get("", rootPath, pathBackendResponseForwardingFlushInterval) + if len(value) == 0 { + return nil + } + + return &types.ResponseForwarding{ + FlushInterval: value, + } +} + func (p *Provider) getCircuitBreaker(rootPath string) *types.CircuitBreaker { if !p.has(rootPath, pathBackendCircuitBreakerExpression) { return nil diff --git a/provider/label/names.go b/provider/label/names.go index 442a2f890..4fb07e688 100644 --- a/provider/label/names.go +++ b/provider/label/names.go @@ -29,6 +29,7 @@ const ( SuffixBackendMaxConnAmount = "backend.maxconn.amount" SuffixBackendMaxConnExtractorFunc = "backend.maxconn.extractorfunc" SuffixBackendBuffering = "backend.buffering" + SuffixBackendResponseForwardingFlushInterval = "backend.responseForwarding.flushInterval" SuffixBackendBufferingMaxRequestBodyBytes = SuffixBackendBuffering + ".maxRequestBodyBytes" SuffixBackendBufferingMemRequestBodyBytes = SuffixBackendBuffering + ".memRequestBodyBytes" SuffixBackendBufferingMaxResponseBodyBytes = SuffixBackendBuffering + ".maxResponseBodyBytes" @@ -131,6 +132,7 @@ const ( TraefikBackendMaxConnAmount = Prefix + SuffixBackendMaxConnAmount TraefikBackendMaxConnExtractorFunc = Prefix + SuffixBackendMaxConnExtractorFunc TraefikBackendBuffering = Prefix + SuffixBackendBuffering + TraefikBackendResponseForwardingFlushInterval = Prefix + SuffixBackendResponseForwardingFlushInterval TraefikBackendBufferingMaxRequestBodyBytes = Prefix + SuffixBackendBufferingMaxRequestBodyBytes TraefikBackendBufferingMemRequestBodyBytes = Prefix + SuffixBackendBufferingMemRequestBodyBytes TraefikBackendBufferingMaxResponseBodyBytes = Prefix + SuffixBackendBufferingMaxResponseBodyBytes diff --git a/provider/label/partial.go b/provider/label/partial.go index a2026743d..d4b7652ad 100644 --- a/provider/label/partial.go +++ b/provider/label/partial.go @@ -354,6 +354,19 @@ func GetHealthCheck(labels map[string]string) *types.HealthCheck { } } +// GetResponseForwarding Create ResponseForwarding from labels +func GetResponseForwarding(labels map[string]string) *types.ResponseForwarding { + if !HasPrefix(labels, TraefikBackendResponseForwardingFlushInterval) { + return nil + } + + value := GetStringValue(labels, TraefikBackendResponseForwardingFlushInterval, "0") + + return &types.ResponseForwarding{ + FlushInterval: value, + } +} + // GetBuffering Create buffering from labels func GetBuffering(labels map[string]string) *types.Buffering { if !HasPrefix(labels, TraefikBackendBuffering) { diff --git a/provider/marathon/config.go b/provider/marathon/config.go index 5e892cb4a..a99b44693 100644 --- a/provider/marathon/config.go +++ b/provider/marathon/config.go @@ -32,13 +32,14 @@ func (p *Provider) buildConfigurationV2(applications *marathon.Applications) *ty "getBackendName": p.getBackendName, // Backend functions - "getPort": getPort, - "getCircuitBreaker": label.GetCircuitBreaker, - "getLoadBalancer": label.GetLoadBalancer, - "getMaxConn": label.GetMaxConn, - "getHealthCheck": label.GetHealthCheck, - "getBuffering": label.GetBuffering, - "getServers": p.getServers, + "getPort": getPort, + "getCircuitBreaker": label.GetCircuitBreaker, + "getLoadBalancer": label.GetLoadBalancer, + "getMaxConn": label.GetMaxConn, + "getHealthCheck": label.GetHealthCheck, + "getBuffering": label.GetBuffering, + "getResponseForwarding": label.GetResponseForwarding, + "getServers": p.getServers, // Frontend functions "getSegmentNameSuffix": getSegmentNameSuffix, diff --git a/provider/marathon/config_test.go b/provider/marathon/config_test.go index 172b88963..c5f09abeb 100644 --- a/provider/marathon/config_test.go +++ b/provider/marathon/config_test.go @@ -357,6 +357,7 @@ func TestBuildConfiguration(t *testing.T) { withLabel(label.TraefikBackend, "foobar"), withLabel(label.TraefikBackendCircuitBreakerExpression, "NetworkErrorRatio() > 0.5"), + withLabel(label.TraefikBackendResponseForwardingFlushInterval, "10ms"), withLabel(label.TraefikBackendHealthCheckScheme, "http"), withLabel(label.TraefikBackendHealthCheckPath, "/health"), withLabel(label.TraefikBackendHealthCheckPort, "880"), @@ -586,6 +587,9 @@ func TestBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Sticky: true, diff --git a/provider/mesos/config.go b/provider/mesos/config.go index 61f415e88..efb7b606e 100644 --- a/provider/mesos/config.go +++ b/provider/mesos/config.go @@ -29,15 +29,16 @@ func (p *Provider) buildConfigurationV2(tasks []state.Task) *types.Configuration "getID": getID, // Backend functions - "getBackendName": getBackendName, - "getCircuitBreaker": label.GetCircuitBreaker, - "getLoadBalancer": label.GetLoadBalancer, - "getMaxConn": label.GetMaxConn, - "getHealthCheck": label.GetHealthCheck, - "getBuffering": label.GetBuffering, - "getServers": p.getServers, - "getHost": p.getHost, - "getServerPort": p.getServerPort, + "getBackendName": getBackendName, + "getCircuitBreaker": label.GetCircuitBreaker, + "getLoadBalancer": label.GetLoadBalancer, + "getMaxConn": label.GetMaxConn, + "getHealthCheck": label.GetHealthCheck, + "getBuffering": label.GetBuffering, + "getResponseForwarding": label.GetResponseForwarding, + "getServers": p.getServers, + "getHost": p.getHost, + "getServerPort": p.getServerPort, // Frontend functions "getSegmentNameSuffix": getSegmentNameSuffix, diff --git a/provider/mesos/config_test.go b/provider/mesos/config_test.go index 9961fd118..bdef2344d 100644 --- a/provider/mesos/config_test.go +++ b/provider/mesos/config_test.go @@ -314,6 +314,7 @@ func TestBuildConfiguration(t *testing.T) { withLabel(label.TraefikBackend, "foobar"), withLabel(label.TraefikBackendCircuitBreakerExpression, "NetworkErrorRatio() > 0.5"), + withLabel(label.TraefikBackendResponseForwardingFlushInterval, "10ms"), withLabel(label.TraefikBackendHealthCheckScheme, "http"), withLabel(label.TraefikBackendHealthCheckPath, "/health"), withLabel(label.TraefikBackendHealthCheckPort, "880"), @@ -546,6 +547,9 @@ func TestBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Stickiness: &types.Stickiness{ diff --git a/provider/rancher/config.go b/provider/rancher/config.go index d6232b47a..aa11c3cf7 100644 --- a/provider/rancher/config.go +++ b/provider/rancher/config.go @@ -20,12 +20,13 @@ func (p *Provider) buildConfigurationV2(services []rancherData) *types.Configura "getDomain": label.GetFuncString(label.TraefikDomain, p.Domain), // Backend functions - "getCircuitBreaker": label.GetCircuitBreaker, - "getLoadBalancer": label.GetLoadBalancer, - "getMaxConn": label.GetMaxConn, - "getHealthCheck": label.GetHealthCheck, - "getBuffering": label.GetBuffering, - "getServers": getServers, + "getCircuitBreaker": label.GetCircuitBreaker, + "getLoadBalancer": label.GetLoadBalancer, + "getMaxConn": label.GetMaxConn, + "getHealthCheck": label.GetHealthCheck, + "getBuffering": label.GetBuffering, + "getResponseForwarding": label.GetResponseForwarding, + "getServers": getServers, // Frontend functions "getBackendName": getBackendName, diff --git a/provider/rancher/config_test.go b/provider/rancher/config_test.go index 1bcd2f2c6..b0fc04583 100644 --- a/provider/rancher/config_test.go +++ b/provider/rancher/config_test.go @@ -41,6 +41,7 @@ func TestProviderBuildConfiguration(t *testing.T) { label.TraefikBackend: "foobar", label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5", + label.TraefikBackendResponseForwardingFlushInterval: "10ms", label.TraefikBackendHealthCheckScheme: "http", label.TraefikBackendHealthCheckPath: "/health", label.TraefikBackendHealthCheckPort: "880", @@ -277,6 +278,9 @@ func TestProviderBuildConfiguration(t *testing.T) { CircuitBreaker: &types.CircuitBreaker{ Expression: "NetworkErrorRatio() > 0.5", }, + ResponseForwarding: &types.ResponseForwarding{ + FlushInterval: "10ms", + }, LoadBalancer: &types.LoadBalancer{ Method: "drr", Sticky: true, diff --git a/server/server_configuration.go b/server/server_configuration.go index 73cc07eab..3dbc7f08e 100644 --- a/server/server_configuration.go +++ b/server/server_configuration.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/containous/flaeg/parse" "github.com/containous/mux" "github.com/containous/traefik/configuration" "github.com/containous/traefik/healthcheck" @@ -163,7 +164,7 @@ func (s *Server) loadFrontendConfig( postConfigs = append(postConfigs, postConfig) } - fwd, err := s.buildForwarder(entryPointName, entryPoint, frontendName, frontend, responseModifier) + fwd, err := s.buildForwarder(entryPointName, entryPoint, frontendName, frontend, responseModifier, backend) if err != nil { return nil, fmt.Errorf("failed to create the forwarder for frontend %s: %v", frontendName, err) } @@ -216,7 +217,7 @@ func (s *Server) loadFrontendConfig( func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration.EntryPoint, frontendName string, frontend *types.Frontend, - responseModifier modifyResponse) (http.Handler, error) { + responseModifier modifyResponse, backend *types.Backend) (http.Handler, error) { roundTripper, err := s.getRoundTripper(entryPointName, frontend.PassTLSCert, entryPoint.TLS) if err != nil { @@ -228,6 +229,14 @@ func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration return nil, fmt.Errorf("error creating rewriter for frontend %s: %v", frontendName, err) } + var flushInterval parse.Duration + if backend.ResponseForwarding != nil { + err := flushInterval.Set(backend.ResponseForwarding.FlushInterval) + if err != nil { + return nil, fmt.Errorf("error creating flush interval for frontend %s: %v", frontendName, err) + } + } + var fwd http.Handler fwd, err = forward.New( forward.Stream(true), @@ -236,6 +245,7 @@ func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration forward.Rewriter(rewriter), forward.ResponseModifier(responseModifier), forward.BufferPool(s.bufferPool), + forward.StreamingFlushInterval(time.Duration(flushInterval)), forward.WebsocketConnectionClosedHook(func(req *http.Request, conn net.Conn) { server := req.Context().Value(http.ServerContextKey).(*http.Server) if server != nil { diff --git a/templates/consul_catalog.tmpl b/templates/consul_catalog.tmpl index 2004bc6a0..d04fbe46b 100644 --- a/templates/consul_catalog.tmpl +++ b/templates/consul_catalog.tmpl @@ -8,6 +8,14 @@ expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $service.TraefikLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + + + {{ $loadBalancer := getLoadBalancer $service.TraefikLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] diff --git a/templates/docker.tmpl b/templates/docker.tmpl index a26345745..bea2990b0 100644 --- a/templates/docker.tmpl +++ b/templates/docker.tmpl @@ -9,6 +9,12 @@ expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] diff --git a/templates/ecs.tmpl b/templates/ecs.tmpl index b80e3afef..6dd57231b 100644 --- a/templates/ecs.tmpl +++ b/templates/ecs.tmpl @@ -8,6 +8,12 @@ expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $firstInstance.SegmentLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $serviceName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $firstInstance.SegmentLabels }} {{if $loadBalancer }} [backends."backend-{{ $serviceName }}".loadBalancer] diff --git a/templates/kubernetes.tmpl b/templates/kubernetes.tmpl index c522ffae5..fef76bc12 100644 --- a/templates/kubernetes.tmpl +++ b/templates/kubernetes.tmpl @@ -8,6 +8,11 @@ expression = "{{ $backend.CircuitBreaker.Expression }}" {{end}} + {{if $backend.ResponseForwarding }} + [backends."{{ $backendName }}".responseForwarding] + flushInterval = "{{ $backend.responseForwarding.FlushInterval }}" + {{end}} + [backends."{{ $backendName }}".loadBalancer] method = "{{ $backend.LoadBalancer.Method }}" sticky = {{ $backend.LoadBalancer.Sticky }} diff --git a/templates/kv.tmpl b/templates/kv.tmpl index 73842e1d2..15062243d 100644 --- a/templates/kv.tmpl +++ b/templates/kv.tmpl @@ -7,6 +7,12 @@ [backends."{{ $backendName }}".circuitBreaker] expression = "{{ $circuitBreaker.Expression }}" {{end}} + + {{ $responseForwarding := getResponseForwarding $backend }} + {{if $responseForwarding }} + [backends."{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.flushInterval }}" + {{end}} {{ $loadBalancer := getLoadBalancer $backend }} {{if $loadBalancer }} diff --git a/templates/marathon.tmpl b/templates/marathon.tmpl index 15c5ce285..7a5038e44 100644 --- a/templates/marathon.tmpl +++ b/templates/marathon.tmpl @@ -10,6 +10,12 @@ [backends."{{ $backendName }}".circuitBreaker] expression = "{{ $circuitBreaker.Expression }}" {{end}} + + {{ $responseForwarding := getResponseForwarding $app.SegmentLabels }} + {{if $responseForwarding }} + [backends."{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} {{ $loadBalancer := getLoadBalancer $app.SegmentLabels }} {{if $loadBalancer }} diff --git a/templates/mesos.tmpl b/templates/mesos.tmpl index 51f30308c..a77ba862d 100644 --- a/templates/mesos.tmpl +++ b/templates/mesos.tmpl @@ -11,6 +11,12 @@ expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $app.TraefikLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $app.TraefikLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] diff --git a/templates/rancher.tmpl b/templates/rancher.tmpl index eb4976870..197761c1d 100644 --- a/templates/rancher.tmpl +++ b/templates/rancher.tmpl @@ -10,6 +10,12 @@ expression = "{{ $circuitBreaker.Expression }}" {{end}} + {{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }} + {{if $responseForwarding }} + [backends."backend-{{ $backendName }}".responseForwarding] + flushInterval = "{{ $responseForwarding.FlushInterval }}" + {{end}} + {{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }} {{if $loadBalancer }} [backends."backend-{{ $backendName }}".loadBalancer] diff --git a/types/types.go b/types/types.go index 99393afb5..52844053f 100644 --- a/types/types.go +++ b/types/types.go @@ -22,12 +22,18 @@ import ( // Backend holds backend configuration. type Backend struct { - Servers map[string]Server `json:"servers,omitempty"` - CircuitBreaker *CircuitBreaker `json:"circuitBreaker,omitempty"` - LoadBalancer *LoadBalancer `json:"loadBalancer,omitempty"` - MaxConn *MaxConn `json:"maxConn,omitempty"` - HealthCheck *HealthCheck `json:"healthCheck,omitempty"` - Buffering *Buffering `json:"buffering,omitempty"` + Servers map[string]Server `json:"servers,omitempty"` + CircuitBreaker *CircuitBreaker `json:"circuitBreaker,omitempty"` + LoadBalancer *LoadBalancer `json:"loadBalancer,omitempty"` + MaxConn *MaxConn `json:"maxConn,omitempty"` + HealthCheck *HealthCheck `json:"healthCheck,omitempty"` + Buffering *Buffering `json:"buffering,omitempty"` + ResponseForwarding *ResponseForwarding `json:"forwardingResponse,omitempty"` +} + +// ResponseForwarding holds configuration for the forward of the response +type ResponseForwarding struct { + FlushInterval string `json:"flushInterval,omitempty"` } // MaxConn holds maximum connection configuration