From 23870105565460aa70d11c7bc87e0108bfc8466e Mon Sep 17 00:00:00 2001 From: SALLEYRON Julien Date: Tue, 10 Apr 2018 17:24:04 +0200 Subject: [PATCH] Disable closeNotify when method GET for http pipelining --- Gopkg.lock | 2 +- .../github.com/vulcand/oxy/buffer/buffer.go | 7 +-- .../vulcand/oxy/cbreaker/cbreaker.go | 4 +- vendor/github.com/vulcand/oxy/forward/fwd.go | 19 +++++-- .../vulcand/oxy/roundrobin/rebalancer.go | 4 +- .../github.com/vulcand/oxy/roundrobin/rr.go | 50 +++++++++-------- .../github.com/vulcand/oxy/utils/netutils.go | 53 +++++++++++++++---- 7 files changed, 94 insertions(+), 45 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 1781467f7..443c24d8d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1215,7 +1215,7 @@ "roundrobin", "utils" ] - revision = "dacf34285ce530b272e9fe04d2f45f52e6374e36" + revision = "6956548a7fa4272adeadf828455109c53933ea86" [[projects]] name = "github.com/vulcand/predicate" diff --git a/vendor/github.com/vulcand/oxy/buffer/buffer.go b/vendor/github.com/vulcand/oxy/buffer/buffer.go index 735443d9f..6224fb8c7 100644 --- a/vendor/github.com/vulcand/oxy/buffer/buffer.go +++ b/vendor/github.com/vulcand/oxy/buffer/buffer.go @@ -349,9 +349,10 @@ func (b *bufferWriter) expectBody(r *http.Request) bool { if (b.code >= 100 && b.code < 200) || b.code == 204 || b.code == 304 { return false } - if b.header.Get("Content-Length") == "" && b.header.Get("Transfer-Encoding") == "" { - return false - } + // refer to https://github.com/vulcand/oxy/issues/113 + // if b.header.Get("Content-Length") == "" && b.header.Get("Transfer-Encoding") == "" { + // return false + // } if b.header.Get("Content-Length") == "0" { return false } diff --git a/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go b/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go index bdf1e9b58..e97f69d56 100644 --- a/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go +++ b/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go @@ -156,12 +156,12 @@ func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Reque func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) { start := c.clock.UtcNow() - p := &utils.ProxyWriter{W: w} + p := utils.NewSimpleProxyWriter(w) c.next.ServeHTTP(p, req) latency := c.clock.UtcNow().Sub(start) - c.metrics.Record(p.Code, latency) + c.metrics.Record(p.StatusCode(), latency) // Note that this call is less expensive than it looks -- checkCondition only performs the real check // periodically. Because of that we can afford to call it here on every single response. diff --git a/vendor/github.com/vulcand/oxy/forward/fwd.go b/vendor/github.com/vulcand/oxy/forward/fwd.go index 789909e51..cf8f6eaef 100644 --- a/vendor/github.com/vulcand/oxy/forward/fwd.go +++ b/vendor/github.com/vulcand/oxy/forward/fwd.go @@ -221,7 +221,9 @@ func New(setters ...optSetter) (*Forwarder, error) { } if f.tlsClientConfig == nil { - f.tlsClientConfig = f.httpForwarder.roundTripper.(*http.Transport).TLSClientConfig + if ht, ok := f.httpForwarder.roundTripper.(*http.Transport); ok { + f.tlsClientConfig = ht.TLSClientConfig + } } f.httpForwarder.roundTripper = ErrorHandlingRoundTripper{ @@ -444,9 +446,16 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request") } - pw := &utils.ProxyWriter{ - W: w, + var pw utils.ProxyWriter + + // Disable closeNotify when method GET for http pipelining + // Waiting for https://github.com/golang/go/issues/23921 + if inReq.Method == http.MethodGet { + pw = utils.NewProxyWriterWithoutCloseNotify(w) + } else { + pw = utils.NewSimpleProxyWriter(w) } + start := time.Now().UTC() outReq := new(http.Request) @@ -464,14 +473,14 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct if inReq.TLS != nil { f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v", - inReq.URL, pw.Code, pw.Length, time.Now().UTC().Sub(start), + inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start), inReq.TLS.Version, inReq.TLS.DidResume, inReq.TLS.CipherSuite, inReq.TLS.ServerName) } else { f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v", - inReq.URL, pw.Code, pw.Length, time.Now().UTC().Sub(start)) + inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start)) } } diff --git a/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go b/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go index ac170b797..9e574e8d8 100644 --- a/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go +++ b/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go @@ -148,7 +148,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer logEntry.Debug("vulcand/oxy/roundrobin/rebalancer: competed ServeHttp on request") } - pw := &utils.ProxyWriter{W: w} + pw := utils.NewSimpleProxyWriter(w) start := rb.clock.UtcNow() // make shallow copy of request before changing anything to avoid side effects @@ -194,7 +194,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { rb.next.Next().ServeHTTP(pw, &newReq) - rb.recordMetrics(newReq.URL, pw.Code, rb.clock.UtcNow().Sub(start)) + rb.recordMetrics(newReq.URL, pw.StatusCode(), rb.clock.UtcNow().Sub(start)) rb.adjustWeights() } diff --git a/vendor/github.com/vulcand/oxy/roundrobin/rr.go b/vendor/github.com/vulcand/oxy/roundrobin/rr.go index 91c3692e8..137da6272 100644 --- a/vendor/github.com/vulcand/oxy/roundrobin/rr.go +++ b/vendor/github.com/vulcand/oxy/roundrobin/rr.go @@ -185,43 +185,43 @@ func (r *RoundRobin) RemoveServer(u *url.URL) error { return nil } -func (rr *RoundRobin) Servers() []*url.URL { - rr.mutex.Lock() - defer rr.mutex.Unlock() +func (r *RoundRobin) Servers() []*url.URL { + r.mutex.Lock() + defer r.mutex.Unlock() - out := make([]*url.URL, len(rr.servers)) - for i, srv := range rr.servers { + out := make([]*url.URL, len(r.servers)) + for i, srv := range r.servers { out[i] = srv.url } return out } -func (rr *RoundRobin) ServerWeight(u *url.URL) (int, bool) { - rr.mutex.Lock() - defer rr.mutex.Unlock() +func (r *RoundRobin) ServerWeight(u *url.URL) (int, bool) { + r.mutex.Lock() + defer r.mutex.Unlock() - if s, _ := rr.findServerByURL(u); s != nil { + if s, _ := r.findServerByURL(u); s != nil { return s.weight, true } return -1, false } // In case if server is already present in the load balancer, returns error -func (rr *RoundRobin) UpsertServer(u *url.URL, options ...ServerOption) error { - rr.mutex.Lock() - defer rr.mutex.Unlock() +func (r *RoundRobin) UpsertServer(u *url.URL, options ...ServerOption) error { + r.mutex.Lock() + defer r.mutex.Unlock() if u == nil { return fmt.Errorf("server URL can't be nil") } - if s, _ := rr.findServerByURL(u); s != nil { + if s, _ := r.findServerByURL(u); s != nil { for _, o := range options { if err := o(s); err != nil { return err } } - rr.resetState() + r.resetState() return nil } @@ -236,8 +236,8 @@ func (rr *RoundRobin) UpsertServer(u *url.URL, options ...ServerOption) error { srv.weight = defaultWeight } - rr.servers = append(rr.servers, srv) - rr.resetState() + r.servers = append(r.servers, srv) + r.resetState() return nil } @@ -262,9 +262,9 @@ func (r *RoundRobin) findServerByURL(u *url.URL) (*server, int) { return nil, -1 } -func (rr *RoundRobin) maxWeight() int { +func (r *RoundRobin) maxWeight() int { max := -1 - for _, s := range rr.servers { + for _, s := range r.servers { if s.weight > max { max = s.weight } @@ -272,9 +272,9 @@ func (rr *RoundRobin) maxWeight() int { return max } -func (rr *RoundRobin) weightGcd() int { +func (r *RoundRobin) weightGcd() int { divisor := -1 - for _, s := range rr.servers { + for _, s := range r.servers { if divisor == -1 { divisor = s.weight } else { @@ -304,7 +304,15 @@ type server struct { weight int } -const defaultWeight = 1 +var defaultWeight = 1 + +func SetDefaultWeight(weight int) error { + if weight < 0 { + return fmt.Errorf("default weight should be >= 0") + } + defaultWeight = weight + return nil +} func sameURL(a, b *url.URL) bool { return a.Path == b.Path && a.Host == b.Host && a.Scheme == b.Scheme diff --git a/vendor/github.com/vulcand/oxy/utils/netutils.go b/vendor/github.com/vulcand/oxy/utils/netutils.go index afdfbcfaf..e6e6eb6a4 100644 --- a/vendor/github.com/vulcand/oxy/utils/netutils.go +++ b/vendor/github.com/vulcand/oxy/utils/netutils.go @@ -12,16 +12,43 @@ import ( log "github.com/sirupsen/logrus" ) -// ProxyWriter helps to capture response headers and status code +type ProxyWriter interface { + http.ResponseWriter + GetLength() int64 + StatusCode() int + GetWriter() http.ResponseWriter +} + +// ProxyWriterWithoutCloseNotify helps to capture response headers and status code // from the ServeHTTP. It can be safely passed to ServeHTTP handler, // wrapping the real response writer. -type ProxyWriter struct { +type ProxyWriterWithoutCloseNotify struct { W http.ResponseWriter Code int Length int64 } -func (p *ProxyWriter) StatusCode() int { +func NewProxyWriterWithoutCloseNotify(writer http.ResponseWriter) *ProxyWriterWithoutCloseNotify { + return &ProxyWriterWithoutCloseNotify{ + W: writer, + } +} + +func NewSimpleProxyWriter(writer http.ResponseWriter) *SimpleProxyWriter { + return &SimpleProxyWriter{ + ProxyWriterWithoutCloseNotify: NewProxyWriterWithoutCloseNotify(writer), + } +} + +type SimpleProxyWriter struct { + *ProxyWriterWithoutCloseNotify +} + +func (p *ProxyWriterWithoutCloseNotify) GetWriter() http.ResponseWriter { + return p.W +} + +func (p *ProxyWriterWithoutCloseNotify) StatusCode() int { if p.Code == 0 { // per contract standard lib will set this to http.StatusOK if not set // by user, here we avoid the confusion by mirroring this logic @@ -30,35 +57,39 @@ func (p *ProxyWriter) StatusCode() int { return p.Code } -func (p *ProxyWriter) Header() http.Header { +func (p *ProxyWriterWithoutCloseNotify) Header() http.Header { return p.W.Header() } -func (p *ProxyWriter) Write(buf []byte) (int, error) { +func (p *ProxyWriterWithoutCloseNotify) Write(buf []byte) (int, error) { p.Length = p.Length + int64(len(buf)) return p.W.Write(buf) } -func (p *ProxyWriter) WriteHeader(code int) { +func (p *ProxyWriterWithoutCloseNotify) WriteHeader(code int) { p.Code = code p.W.WriteHeader(code) } -func (p *ProxyWriter) Flush() { +func (p *ProxyWriterWithoutCloseNotify) Flush() { if f, ok := p.W.(http.Flusher); ok { f.Flush() } } -func (p *ProxyWriter) CloseNotify() <-chan bool { - if cn, ok := p.W.(http.CloseNotifier); ok { +func (p *ProxyWriterWithoutCloseNotify) GetLength() int64 { + return p.Length +} + +func (p *SimpleProxyWriter) CloseNotify() <-chan bool { + if cn, ok := p.GetWriter().(http.CloseNotifier); ok { return cn.CloseNotify() } - log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.W)) + log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.GetWriter())) return make(<-chan bool) } -func (p *ProxyWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { +func (p *ProxyWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) { if hi, ok := p.W.(http.Hijacker); ok { return hi.Hijack() }