代码分享 MOSN 源码解析 - HTTP 系能力

rootsongjc for 云原生代理 MOSN 开源团队 · 2020年03月27日 · 82 次阅读

本文的目的是分析 MOSN 源码中的 HTTP 系能力,内容基于 MOSN 0.9.0。

概述

HTTP 是互联网界最常用的一种协议之一,MOSN 也提供了对其强大的支持。

MOSN HTTP 报文组成

img

上图是图解 HTTP 中关于 HTTP 报文报文的介绍。MOSN 对于 HTTP 报文的处理并没有使用 go 官网 net/http中的结构也没有独立设计一套相关结构 而是复用了业界开源的fasthttp 的结构。

type stream struct {
    str.BaseStream

    id               uint64
    readDisableCount int32
    ctx              context.Context
    // 请求报文
    request  *fasthttp.Request
    // 响应报文
    response *fasthttp.Response

    receiver types.StreamReceiveListener
}

MOSN HTTP 处理流程

流程图

上图是 HTTP 请求在 MOSN 中的流动过程,下面将具体讲解。

流程注册

func init() {
    str.Register(protocol.HTTP1, &streamConnFactory{})
}

pkg/stream/http 包加载过程中将包含 HTTP 对于 MOSN 上下游连接的处理逻辑的结构体值注册到统一的 stream 处理工厂。

捕获请求

捕获请求

由上图可知,MOSN 捕捉到一个请求之后会开启一个 goroutine 读取连接中的数据,也就是 serverStreamConnection.serve 函数。

func (conn *serverStreamConnection) serve() {
    for {
        // 1. pre alloc stream-level ctx with bufferCtx
        ctx := conn.contextManager.Get()
        // 通过sync.Pool 实现实现内存复用
        buffers := httpBuffersByContext(ctx)
        request := &buffers.serverRequest

        // 2. blocking read using fasthttp.Request.Read
        err := request.ReadLimitBody(conn.br, defaultMaxRequestBodySize)
        if err == nil {
            // 3. 'Expect: 100-continue' request handling.
            // See http://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html for details.
            if request.MayContinue() {
                // Send 'HTTP/1.1 100 Continue' response.
                conn.conn.Write(buffer.NewIoBufferBytes(strResponseContinue))

                // read request body
                err = request.ContinueReadBody(conn.br, defaultMaxRequestBodySize)

                // remove 'Expect' header, so it would not be sent to the upstream
                request.Header.Del("Expect")
            }
        }
        // 读取错误的处理
        if err != nil {
            // "read timeout with nothing read" is the error of returned by fasthttp v1.2.0
            // if connection closed with nothing read.
            if err != errConnClose && err != io.EOF && err.Error() != "read timeout with nothing read" {
                // write error response
                conn.conn.Write(buffer.NewIoBufferBytes(strErrorResponse))

                // close connection with flush
                conn.conn.Close(api.FlushWrite, api.LocalClose)
            }
            return
        }

        // 数据读取结束
        id := protocol.GenerateID()
        s := &buffers.serverStream

        // 4. request processing
        s.stream = stream{
            id:       id,
            ctx:      mosnctx.WithValue(ctx, types.ContextKeyStreamID, id),
            request:  request,
            response: &buffers.serverResponse,
        }
        s.connection = conn
        s.responseDoneChan = make(chan bool, 1)
        s.header = mosnhttp.RequestHeader{&s.request.Header, nil}

        var span types.Span
        if trace.IsEnabled() {
            tracer := trace.Tracer(protocol.HTTP1)
            if tracer != nil {
                span = tracer.Start(ctx, s.header, time.Now())
            }
        }
        // 上下文 中注入链式追踪信息
        s.stream.ctx = s.connection.contextManager.InjectTrace(ctx, span)

        if log.Proxy.GetLogLevel() >= log.DEBUG {
            log.Proxy.Debugf(s.stream.ctx, "[stream] [http] new stream detect, requestId = %v", s.stream.id)
        }

        s.receiver = conn.serverStreamConnListener.NewStreamDetect(s.stream.ctx, s, span)

        conn.mutex.Lock()
        conn.stream = s
        conn.mutex.Unlock()

        if atomic.LoadInt32(&s.readDisableCount) <= 0 {
            s.handleRequest()
        }

        // 5. wait for proxy done
        select {
        case <-s.responseDoneChan:
        case <-conn.connClosed:
            return
        }

        conn.contextManager.Next()
    }
}

由以上代码可以得知,因为不能判断连接是长连接还是短连接,所以 MOSN 调用方不主动关闭连接,MOSN 也不会主动关闭,除非出现错误。

转发请求

转发请求

由上图可知,MOSN 在捕获请求之后,开启一个 goroutine 创建对 upstream 的连接,并且通过这个连接和 upstream 进行数据交互,与此同时开启另外一个 goroutine 对这个连接进行监控用来处理连接返回数据。其主要处理逻辑在 downStream.receive 中。

for i := 0; i <= int(types.End-types.InitPhase); i++ {
        fmt.Println("yu",i,s.downstreamReqTrailers == nil,phase,int(types.End-types.InitPhase),types.WaitNofity)
        switch phase {
        // init phase
        case types.InitPhase:
            phase++

            // downstream filter before route
        case types.DownFilter:
            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
            }
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)

            if p, err := s.processError(id); err != nil {
                return p
            }
            phase++

            // match route
        case types.MatchRoute:
            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
            }
            s.matchRoute()
            if p, err := s.processError(id); err != nil {
                return p
            }
            phase++

            // downstream filter after route
        case types.DownFilterAfterRoute:
            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
            }
            s.runReceiveFilters(phase, s.downstreamReqHeaders, s.downstreamReqDataBuf, s.downstreamReqTrailers)

            if p, err := s.processError(id); err != nil {
                return p
            }
            phase++

            // downstream receive header
        case types.DownRecvHeader:
            if s.downstreamReqHeaders != nil {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.receiveHeaders(s.downstreamReqDataBuf == nil && s.downstreamReqTrailers == nil)

                if p, err := s.processError(id); err != nil {
                    return p
                }
            }
            phase++

            // downstream receive data
        case types.DownRecvData:
            if s.downstreamReqDataBuf != nil {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.downstreamReqDataBuf.Count(1)

                s.receiveData(s.downstreamReqTrailers == nil)

                if p, err := s.processError(id); err != nil {
                    return p
                }
            }
            phase++

            // downstream receive trailer
        case types.DownRecvTrailer:
            if s.downstreamReqTrailers != nil {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.receiveTrailers()

                if p, err := s.processError(id); err != nil {
                    return p
                }
            }
            phase++

            // downstream oneway
        case types.Oneway:
            if s.oneway {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.cleanStream()

                // downstreamCleaned has set, return types.End
                if p, err := s.processError(id); err != nil {
                    return p
                }
            }

            // no oneway, skip types.Retry
            phase = types.WaitNofity

            // retry request
        case types.Retry:
            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
            }

            if s.downstreamReqDataBuf != nil {
                s.downstreamReqDataBuf.Count(1)
            }
            s.doRetry()
            if p, err := s.processError(id); err != nil {
                return p
            }
            phase++

            // wait for upstreamRequest or reset
        case types.WaitNofity:
            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
            }
            if p, err := s.waitNotify(id); err != nil {
                return p
            }

            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] OnReceive send downstream response %+v", s.downstreamRespHeaders)
            }

            phase++

            // upstream filter
        case types.UpFilter:
            if log.Proxy.GetLogLevel() >= log.DEBUG {
                log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
            }
            s.runAppendFilters(phase, s.downstreamRespHeaders, s.downstreamRespDataBuf, s.downstreamRespTrailers)

            if p, err := s.processError(id); err != nil {
                return p
            }

            // maybe direct response
            if s.upstreamRequest == nil {
                fakeUpstreamRequest := &upstreamRequest{
                    downStream: s,
                }

                s.upstreamRequest = fakeUpstreamRequest
            }

            phase++

            // upstream receive header
        case types.UpRecvHeader:
            // send downstream response
            if s.downstreamRespHeaders != nil {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.upstreamRequest.receiveHeaders(s.downstreamRespDataBuf == nil && s.downstreamRespTrailers == nil)

                if p, err := s.processError(id); err != nil {
                    return p
                }
            }
            phase++

            // upstream receive data
        case types.UpRecvData:
            if s.downstreamRespDataBuf != nil {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.upstreamRequest.receiveData(s.downstreamRespTrailers == nil)

                if p, err := s.processError(id); err != nil {
                    return p
                }
            }
            phase++

            // upstream receive triler
        case types.UpRecvTrailer:
            if s.downstreamRespTrailers != nil {
                if log.Proxy.GetLogLevel() >= log.DEBUG {
                    log.Proxy.Debugf(s.context, "[proxy] [downstream] enter phase %d, proxyId = %d  ", phase, id)
                }
                s.upstreamRequest.receiveTrailers()

                if p, err := s.processError(id); err != nil {
                    return p
                }
            }
            phase++

            // process end
        case types.End:
            return types.End

        default:
            log.Proxy.Errorf(s.context, "[proxy] [downstream] unexpected phase: %d", phase)
            return types.End
        }
    }

    log.Proxy.Errorf(s.context, "[proxy] [downstream] unexpected phase cycle time")
    return types.End
}

这个函数是处理转发逻辑的核心函数规定在处理的各个阶段需要做的事情。比如在case types.UpRecvHeader的时候进行连接创建以及对下游数据的初步读取。

其他

连接复用

在 MOSN 处理 HTTP 请求的过程中我们可以很明显的看到,针对可能频繁使用的连接,MOSN 实现了一套复用机制。

type connPool struct {
    MaxConn int

    host types.Host

    statReport bool

    clientMux        sync.Mutex
    availableClients []*activeClient // available clients
    totalClientCount uint64          // total clients
}
func (p *connPool) getAvailableClient(ctx context.Context) (*activeClient, types.PoolFailureReason) {
    p.clientMux.Lock()
    defer p.clientMux.Unlock()

    n := len(p.availableClients)
    // no available client
    if n == 0 {
        // max conns is 0 means no limit
        maxConns := p.host.ClusterInfo().ResourceManager().Connections().Max()
        if maxConns == 0 || p.totalClientCount < maxConns {
            ac, reason := newActiveClient(ctx, p)
            if ac != nil && reason == "" {
                p.totalClientCount++
            }
            return ac, reason
        } else {
            p.host.HostStats().UpstreamRequestPendingOverflow.Inc(1)
            p.host.ClusterInfo().Stats().UpstreamRequestPendingOverflow.Inc(1)
            return nil, types.Overflow
        }
    } else {
        n--
        c := p.availableClients[n]
        p.availableClients[n] = nil
        p.availableClients = p.availableClients[:n]
        return c, ""
    }
}

由上述代码可知,MOSN 维护了一个有效长连接的栈,当栈中还有有效连接则从栈顶取出有效的长连接,如果不存在则新建一个 tcp 长连接。MOSN 通过这种方式维护了连接池来实现高效的连接复用。

内存复用

HTTP 的处理过程中会频繁的申请空间来解析 HTTP 报文,为了减少频繁的内存申请,常规的做法是内存复用,MOSN 也不例外,其基于 sync.pool 设计了内存复用模块。

func httpBuffersByContext(context context.Context) *httpBuffers {
    ctx := buffer.PoolContext(context)
    return ctx.Find(&ins, nil).(*httpBuffers)
}

总结

本文简单的分析了 MOSN 中对于 HTTP 请求的处理过程,其中优化方式主要如下:

  1. tcp 黏包:使用 fasthttp 的 request 和 response 来解析报文。
  2. 实现连接复用:连接池。
  3. 实现内存复用:sync.pool。

原文地址:https://mosn.io/zh/blog/code/mosn-http/ 作者:陈爱祥

更多原创文章干货分享,请关注公众号
  • 加微信实战群请加微信(注明:实战群):gocnio
暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册