diff --git a/integration/fixtures/retry/simple.toml b/integration/fixtures/retry/simple.toml new file mode 100644 index 000000000..571f0ecb9 --- /dev/null +++ b/integration/fixtures/retry/simple.toml @@ -0,0 +1,25 @@ +defaultEntryPoints = ["http"] + +logLevel = "DEBUG" + +[entryPoints] + [entryPoints.http] + address = ":8000" + +[api] + +[retry] + +[file] +[backends] + [backends.backend1] + [backends.backend1.servers.server1] + url = "http://{{.WhoamiEndpoint}}:8080" # not valid + [backends.backend1.servers.server2] + url = "http://{{.WhoamiEndpoint}}:80" + +[frontends] + [frontends.frontend1] + backend = "backend1" + [frontends.frontend1.routes.test_1] + rule = "PathPrefix:/" diff --git a/integration/integration_test.go b/integration/integration_test.go index cf5ed5329..2e147c934 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -54,6 +54,7 @@ func init() { check.Suite(&MarathonSuite{}) check.Suite(&MesosSuite{}) check.Suite(&RateLimitSuite{}) + check.Suite(&RetrySuite{}) check.Suite(&SimpleSuite{}) check.Suite(&TimeoutSuite{}) check.Suite(&TracingSuite{}) diff --git a/integration/resources/compose/retry.yml b/integration/resources/compose/retry.yml new file mode 100644 index 000000000..88f530b86 --- /dev/null +++ b/integration/resources/compose/retry.yml @@ -0,0 +1,2 @@ +whoami: + image: emilevauge/whoami diff --git a/integration/retry_test.go b/integration/retry_test.go new file mode 100644 index 000000000..1def53435 --- /dev/null +++ b/integration/retry_test.go @@ -0,0 +1,40 @@ +package integration + +import ( + "net/http" + "os" + "time" + + "github.com/containous/traefik/integration/try" + "github.com/go-check/check" + checker "github.com/vdemeester/shakers" +) + +type RetrySuite struct{ BaseSuite } + +func (s *RetrySuite) SetUpSuite(c *check.C) { + s.createComposeProject(c, "retry") + s.composeProject.Start(c) +} + +func (s *RetrySuite) TestRetry(c *check.C) { + whoamiEndpoint := s.composeProject.Container(c, "whoami").NetworkSettings.IPAddress + file := s.adaptFile(c, "fixtures/retry/simple.toml", struct { + WhoamiEndpoint string + }{whoamiEndpoint}) + defer os.Remove(file) + + cmd, display := s.traefikCmd(withConfigFile(file)) + defer display(c) + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + err = try.GetRequest("http://127.0.0.1:8080/api/providers", 60*time.Second, try.BodyContains("PathPrefix:/")) + c.Assert(err, checker.IsNil) + + // This simulates a DialTimeout when connecting to the backend server. + response, err := http.Get("http://127.0.0.1:8000/") + c.Assert(err, checker.IsNil) + c.Assert(response.StatusCode, checker.Equals, http.StatusOK) +} diff --git a/middlewares/error_pages.go b/middlewares/error_pages.go index 9aba4c343..0b6a79fbd 100644 --- a/middlewares/error_pages.go +++ b/middlewares/error_pages.go @@ -1,6 +1,9 @@ package middlewares import ( + "bufio" + "bytes" + "net" "net/http" "strconv" "strings" @@ -11,6 +14,9 @@ import ( "github.com/vulcand/oxy/utils" ) +// Compile time validation that the response recorder implements http interfaces correctly. +var _ Stateful = &errorPagesResponseRecorderWithCloseNotify{} + //ErrorPagesHandler is a middleware that provides the custom error pages type ErrorPagesHandler struct { HTTPCodeRanges [][2]int @@ -52,7 +58,7 @@ func NewErrorPagesHandler(errorPage *types.ErrorPage, backendURL string) (*Error } func (ep *ErrorPagesHandler) ServeHTTP(w http.ResponseWriter, req *http.Request, next http.HandlerFunc) { - recorder := newRetryResponseRecorder(w) + recorder := newErrorPagesResponseRecorder(w) next.ServeHTTP(recorder, req) @@ -75,3 +81,108 @@ func (ep *ErrorPagesHandler) ServeHTTP(w http.ResponseWriter, req *http.Request, utils.CopyHeaders(w.Header(), recorder.Header()) w.Write(recorder.GetBody().Bytes()) } + +type errorPagesResponseRecorder interface { + http.ResponseWriter + http.Flusher + GetCode() int + GetBody() *bytes.Buffer + IsStreamingResponseStarted() bool +} + +// newErrorPagesResponseRecorder returns an initialized responseRecorder. +func newErrorPagesResponseRecorder(rw http.ResponseWriter) errorPagesResponseRecorder { + recorder := &errorPagesResponseRecorderWithoutCloseNotify{ + HeaderMap: make(http.Header), + Body: new(bytes.Buffer), + Code: http.StatusOK, + responseWriter: rw, + } + if _, ok := rw.(http.CloseNotifier); ok { + return &errorPagesResponseRecorderWithCloseNotify{recorder} + } + return recorder +} + +// errorPagesResponseRecorderWithoutCloseNotify is an implementation of http.ResponseWriter that +// records its mutations for later inspection. +type errorPagesResponseRecorderWithoutCloseNotify struct { + Code int // the HTTP response code from WriteHeader + HeaderMap http.Header // the HTTP response headers + Body *bytes.Buffer // if non-nil, the bytes.Buffer to append written data to + + responseWriter http.ResponseWriter + err error + streamingResponseStarted bool +} + +type errorPagesResponseRecorderWithCloseNotify struct { + *errorPagesResponseRecorderWithoutCloseNotify +} + +// CloseNotify returns a channel that receives at most a +// single value (true) when the client connection has gone +// away. +func (rw *errorPagesResponseRecorderWithCloseNotify) CloseNotify() <-chan bool { + return rw.responseWriter.(http.CloseNotifier).CloseNotify() +} + +// Header returns the response headers. +func (rw *errorPagesResponseRecorderWithoutCloseNotify) Header() http.Header { + m := rw.HeaderMap + if m == nil { + m = make(http.Header) + rw.HeaderMap = m + } + return m +} + +func (rw *errorPagesResponseRecorderWithoutCloseNotify) GetCode() int { + return rw.Code +} + +func (rw *errorPagesResponseRecorderWithoutCloseNotify) GetBody() *bytes.Buffer { + return rw.Body +} + +func (rw *errorPagesResponseRecorderWithoutCloseNotify) IsStreamingResponseStarted() bool { + return rw.streamingResponseStarted +} + +// Write always succeeds and writes to rw.Body, if not nil. +func (rw *errorPagesResponseRecorderWithoutCloseNotify) Write(buf []byte) (int, error) { + if rw.err != nil { + return 0, rw.err + } + return rw.Body.Write(buf) +} + +// WriteHeader sets rw.Code. +func (rw *errorPagesResponseRecorderWithoutCloseNotify) WriteHeader(code int) { + rw.Code = code +} + +// Hijack hijacks the connection +func (rw *errorPagesResponseRecorderWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return rw.responseWriter.(http.Hijacker).Hijack() +} + +// Flush sends any buffered data to the client. +func (rw *errorPagesResponseRecorderWithoutCloseNotify) Flush() { + if !rw.streamingResponseStarted { + utils.CopyHeaders(rw.responseWriter.Header(), rw.Header()) + rw.responseWriter.WriteHeader(rw.Code) + rw.streamingResponseStarted = true + } + + _, err := rw.responseWriter.Write(rw.Body.Bytes()) + if err != nil { + log.Errorf("Error writing response in responseRecorder: %s", err) + rw.err = err + } + rw.Body.Reset() + flusher, ok := rw.responseWriter.(http.Flusher) + if ok { + flusher.Flush() + } +} diff --git a/middlewares/error_pages_test.go b/middlewares/error_pages_test.go index c90093baf..0c6c25b74 100644 --- a/middlewares/error_pages_test.go +++ b/middlewares/error_pages_test.go @@ -152,3 +152,51 @@ func TestErrorPageSingleCode(t *testing.T) { assert.Contains(t, recorder.Body.String(), "503 Test Server") assert.NotContains(t, recorder.Body.String(), "oops", "Should not return the oops page") } + +func TestNewErrorPagesResponseRecorder(t *testing.T) { + testCases := []struct { + desc string + rw http.ResponseWriter + expected http.ResponseWriter + }{ + { + desc: "Without Close Notify", + rw: httptest.NewRecorder(), + expected: &errorPagesResponseRecorderWithoutCloseNotify{}, + }, + { + desc: "With Close Notify", + rw: &mockRWCloseNotify{}, + expected: &errorPagesResponseRecorderWithCloseNotify{}, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + rec := newErrorPagesResponseRecorder(test.rw) + + assert.IsType(t, rec, test.expected) + }) + } +} + +type mockRWCloseNotify struct{} + +func (m *mockRWCloseNotify) CloseNotify() <-chan bool { + panic("implement me") +} + +func (m *mockRWCloseNotify) Header() http.Header { + panic("implement me") +} + +func (m *mockRWCloseNotify) Write([]byte) (int, error) { + panic("implement me") +} + +func (m *mockRWCloseNotify) WriteHeader(int) { + panic("implement me") +} diff --git a/middlewares/retry.go b/middlewares/retry.go index 9b57ed0a6..c1c6b26d5 100644 --- a/middlewares/retry.go +++ b/middlewares/retry.go @@ -2,20 +2,16 @@ package middlewares import ( "bufio" - "bytes" "context" "io/ioutil" "net" "net/http" "github.com/containous/traefik/log" - "github.com/vulcand/oxy/utils" ) -// Compile time validation responseRecorder implements http interfaces correctly. -var ( - _ Stateful = &retryResponseRecorderWithCloseNotify{} -) +// Compile time validation that the response writer implements http interfaces correctly. +var _ Stateful = &retryResponseWriterWithCloseNotify{} // Retry is a middleware that retries requests type Retry struct { @@ -41,30 +37,20 @@ func (retry *Retry) ServeHTTP(rw http.ResponseWriter, r *http.Request) { defer body.Close() r.Body = ioutil.NopCloser(body) } + attempts := 1 for { netErrorOccurred := false // We pass in a pointer to netErrorOccurred so that we can set it to true on network errors // when proxying the HTTP requests to the backends. This happens in the custom RecordingErrorHandler. newCtx := context.WithValue(r.Context(), defaultNetErrCtxKey, &netErrorOccurred) + retryResponseWriter := newRetryResponseWriter(rw, attempts >= retry.attempts, &netErrorOccurred) - recorder := newRetryResponseRecorder(rw) - - retry.next.ServeHTTP(recorder, r.WithContext(newCtx)) - - // It's a stream request and the body gets already sent to the client. - // Therefore we should not send the response a second time. - if recorder.IsStreamingResponseStarted() { - recorder.Flush() + retry.next.ServeHTTP(retryResponseWriter, r.WithContext(newCtx)) + if !retryResponseWriter.ShouldRetry() { break } - if !netErrorOccurred || attempts >= retry.attempts { - utils.CopyHeaders(rw.Header(), recorder.Header()) - rw.WriteHeader(recorder.GetCode()) - rw.Write(recorder.GetBody().Bytes()) - break - } attempts++ log.Debugf("New attempt %d for request: %v", attempts, r.URL) retry.listener.Retried(r, attempts) @@ -114,107 +100,69 @@ func (l RetryListeners) Retried(req *http.Request, attempt int) { } } -type retryResponseRecorder interface { +type retryResponseWriter interface { http.ResponseWriter http.Flusher - GetCode() int - GetBody() *bytes.Buffer - IsStreamingResponseStarted() bool + ShouldRetry() bool } -// newRetryResponseRecorder returns an initialized retryResponseRecorder. -func newRetryResponseRecorder(rw http.ResponseWriter) retryResponseRecorder { - recorder := &retryResponseRecorderWithoutCloseNotify{ - HeaderMap: make(http.Header), - Body: new(bytes.Buffer), - Code: http.StatusOK, - responseWriter: rw, +func newRetryResponseWriter(rw http.ResponseWriter, attemptsExhausted bool, netErrorOccured *bool) retryResponseWriter { + responseWriter := &retryResponseWriterWithoutCloseNotify{ + responseWriter: rw, + attemptsExhausted: attemptsExhausted, + netErrorOccured: netErrorOccured, } if _, ok := rw.(http.CloseNotifier); ok { - return &retryResponseRecorderWithCloseNotify{recorder} + return &retryResponseWriterWithCloseNotify{responseWriter} } - return recorder + return responseWriter } -// retryResponseRecorderWithoutCloseNotify is an implementation of http.ResponseWriter that -// records its mutations for later inspection. -type retryResponseRecorderWithoutCloseNotify struct { - Code int // the HTTP response code from WriteHeader - HeaderMap http.Header // the HTTP response headers - Body *bytes.Buffer // if non-nil, the bytes.Buffer to append written data to - - responseWriter http.ResponseWriter - err error - streamingResponseStarted bool +type retryResponseWriterWithoutCloseNotify struct { + responseWriter http.ResponseWriter + attemptsExhausted bool + netErrorOccured *bool } -type retryResponseRecorderWithCloseNotify struct { - *retryResponseRecorderWithoutCloseNotify +func (rr *retryResponseWriterWithoutCloseNotify) ShouldRetry() bool { + return *rr.netErrorOccured == true && !rr.attemptsExhausted } -// CloseNotify returns a channel that receives at most a -// single value (true) when the client connection has gone -// away. -func (rw *retryResponseRecorderWithCloseNotify) CloseNotify() <-chan bool { - return rw.responseWriter.(http.CloseNotifier).CloseNotify() -} - -// Header returns the response headers. -func (rw *retryResponseRecorderWithoutCloseNotify) Header() http.Header { - m := rw.HeaderMap - if m == nil { - m = make(http.Header) - rw.HeaderMap = m +func (rr *retryResponseWriterWithoutCloseNotify) Header() http.Header { + if rr.ShouldRetry() { + return make(http.Header) } - return m + return rr.responseWriter.Header() } -func (rw *retryResponseRecorderWithoutCloseNotify) GetCode() int { - return rw.Code -} - -func (rw *retryResponseRecorderWithoutCloseNotify) GetBody() *bytes.Buffer { - return rw.Body -} - -func (rw *retryResponseRecorderWithoutCloseNotify) IsStreamingResponseStarted() bool { - return rw.streamingResponseStarted -} - -// Write always succeeds and writes to rw.Body, if not nil. -func (rw *retryResponseRecorderWithoutCloseNotify) Write(buf []byte) (int, error) { - if rw.err != nil { - return 0, rw.err +func (rr *retryResponseWriterWithoutCloseNotify) Write(buf []byte) (int, error) { + if rr.ShouldRetry() { + return 0, nil } - return rw.Body.Write(buf) + return rr.responseWriter.Write(buf) } -// WriteHeader sets rw.Code. -func (rw *retryResponseRecorderWithoutCloseNotify) WriteHeader(code int) { - rw.Code = code -} - -// Hijack hijacks the connection -func (rw *retryResponseRecorderWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return rw.responseWriter.(http.Hijacker).Hijack() -} - -// Flush sends any buffered data to the client. -func (rw *retryResponseRecorderWithoutCloseNotify) Flush() { - if !rw.streamingResponseStarted { - utils.CopyHeaders(rw.responseWriter.Header(), rw.Header()) - rw.responseWriter.WriteHeader(rw.Code) - rw.streamingResponseStarted = true +func (rr *retryResponseWriterWithoutCloseNotify) WriteHeader(code int) { + if rr.ShouldRetry() { + return } + rr.responseWriter.WriteHeader(code) +} - _, err := rw.responseWriter.Write(rw.Body.Bytes()) - if err != nil { - log.Errorf("Error writing response in retryResponseRecorder: %s", err) - rw.err = err - } - rw.Body.Reset() - flusher, ok := rw.responseWriter.(http.Flusher) - if ok { +func (rr *retryResponseWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return rr.responseWriter.(http.Hijacker).Hijack() +} + +func (rr *retryResponseWriterWithoutCloseNotify) Flush() { + if flusher, ok := rr.responseWriter.(http.Flusher); ok { flusher.Flush() } } + +type retryResponseWriterWithCloseNotify struct { + *retryResponseWriterWithoutCloseNotify +} + +func (rr *retryResponseWriterWithCloseNotify) CloseNotify() <-chan bool { + return rr.responseWriter.(http.CloseNotifier).CloseNotify() +} diff --git a/middlewares/retry_test.go b/middlewares/retry_test.go index a8ab483fd..4f74efba7 100644 --- a/middlewares/retry_test.go +++ b/middlewares/retry_test.go @@ -7,8 +7,6 @@ import ( "net/http" "net/http/httptest" "testing" - - "github.com/stretchr/testify/assert" ) func TestRetry(t *testing.T) { @@ -154,51 +152,3 @@ func TestRetryWithFlush(t *testing.T) { t.Errorf("Wrong body %q want %q", responseRecorder.Body.String(), "FULL DATA") } } - -func TestNewRetryResponseRecorder(t *testing.T) { - testCases := []struct { - desc string - rw http.ResponseWriter - expected http.ResponseWriter - }{ - { - desc: "Without Close Notify", - rw: httptest.NewRecorder(), - expected: &retryResponseRecorderWithoutCloseNotify{}, - }, - { - desc: "With Close Notify", - rw: &mockRWCloseNotify{}, - expected: &retryResponseRecorderWithCloseNotify{}, - }, - } - - for _, test := range testCases { - test := test - t.Run(test.desc, func(t *testing.T) { - t.Parallel() - - rec := newRetryResponseRecorder(test.rw) - - assert.IsType(t, rec, test.expected) - }) - } -} - -type mockRWCloseNotify struct{} - -func (m *mockRWCloseNotify) CloseNotify() <-chan bool { - panic("implement me") -} - -func (m *mockRWCloseNotify) Header() http.Header { - panic("implement me") -} - -func (m *mockRWCloseNotify) Write([]byte) (int, error) { - panic("implement me") -} - -func (m *mockRWCloseNotify) WriteHeader(int) { - panic("implement me") -}