Go source code analysis: the gRPC connection pool (3): resolver, balancer, and picker

December 09, 2023
测试

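Having analyzed how gRPC connections are created, used, and destroyed in "Go source code analysis: the gRPC connection pool (2)", we now look at how the extension interfaces gRPC exposes to us (resolver, balancer, and picker) plug into the gRPC connection pool.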

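In short: each ClientConn owns multiple SubConns. The ClientConn obtains them through name resolution (resolver) and load-balances across them (balancer); on every client request, the Pick method provided by the picker selects one SubConn from the connection pool to carry the request. Both resolver and balancer are abstractions: the built-in resolvers include dns, manual, and passthrough, and the built-in balancers include roundrobin and grpclb. Custom implementations can also be plugged in through the Register pattern, by registering themselves in their own package's init() function.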

1. resolver

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}

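When we call Dial to obtain the connection pool, the first step is to obtain the resolver: the target is parsed to extract its scheme, and the scheme is then looked up in the global registry to find the matching resolver builder. Note that the grpc version imported by a custom resolver must match the grpc version used to dial; otherwise the resolver is not found and the default passthrough is used instead, a pitfall learned the hard way. The lookup is defined in google.golang.org/grpc@v1.50.1/clientconn.go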

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	// ...
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	cc.resolverWrapper = rWrapper
	// ...
}
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
	// ...
	rb = cc.getResolver(parsedTarget.Scheme)
	// ...
}
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	// ...
	return resolver.Get(scheme)
}

This mirrors the registration step:

func init() {
  resolver.Register(&mockResolverBuilder{})
 }
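The register-then-look-up flow above can be modeled without importing grpc. The following is a minimal stdlib-only sketch, not grpc's code: Builder, Register, Get, findBuilder, namedBuilder, and defaultScheme are simplified stand-ins introduced here for illustration, and the fallback to "passthrough" follows the behavior described in the text.

```go
package main

import "fmt"

// Builder is a simplified stand-in for resolver.Builder: it only reports
// the scheme it handles.
type Builder interface {
	Scheme() string
}

// registry models a package-level map that Register writes to and Get reads.
var registry = map[string]Builder{}

// Register stores b under its scheme; a later registration for the same
// scheme replaces the earlier one.
func Register(b Builder) { registry[b.Scheme()] = b }

// Get returns the builder registered for scheme, or nil if there is none.
func Get(scheme string) Builder { return registry[scheme] }

// namedBuilder is a hypothetical builder whose scheme is its own value.
type namedBuilder string

func (n namedBuilder) Scheme() string { return string(n) }

// defaultScheme is the fallback this sketch uses when the lookup fails.
const defaultScheme = "passthrough"

// findBuilder looks up scheme and falls back to defaultScheme when missing.
func findBuilder(scheme string) Builder {
	if b := Get(scheme); b != nil {
		return b
	}
	return Get(defaultScheme)
}

func init() {
	// Registration happens at package initialization, before main runs.
	Register(namedBuilder("passthrough"))
	Register(namedBuilder("mock"))
}

func main() {
	fmt.Println(findBuilder("mock").Scheme())
	fmt.Println(findBuilder("unknown").Scheme())
}
```

Because the registry is package-level state, a builder registered into one copy of the package is invisible to a lookup made through another copy, which is one plausible reading of the version pitfall mentioned earlier.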

Inside ClientConn, the resolver is always used through a decorator defined in google.golang.org/grpc@v1.50.1/resolver_conn_wrapper.go, which calls the builder's Build method:

func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
  ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)

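So when is the ResolveNow method of our resolver called? While connections are being established: both createTransport and resetTransport call cc.resolveNow, the latter when tryAllAddrs returns an error: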

func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	// ... (abridged)
	ac.cc.resolveNow(resolver.ResolveNowOptions{})
}
func (ac *addrConn) resetTransport() {
	// ... (abridged)
	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
	}
}

cc.resolveNow calls the corresponding resolverWrapper method in a new goroutine:

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
  r := cc.resolverWrapper
  go r.resolveNow(o)

The implementation is in google.golang.org/grpc@v1.50.1/resolver_conn_wrapper.go

func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
	ccr.resolverMu.Lock()
	if !ccr.done.HasFired() {
		ccr.resolver.ResolveNow(o)
	}
	ccr.resolverMu.Unlock()
}
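The pattern here, a hint delivered from a goroutine and dropped once the wrapper is closed, can be sketched with the standard library alone. This is a simplified model under stated assumptions: resolverWrapper, its closed flag, the calls counter, and hintAll are hypothetical names, and a plain bool guarded by the mutex stands in for the done event.

```go
package main

import (
	"fmt"
	"sync"
)

// resolverWrapper is a simplified stand-in for the wrapper around a resolver.
type resolverWrapper struct {
	mu     sync.Mutex
	closed bool
	calls  int // number of hints actually forwarded to the resolver
}

// resolveNow forwards the hint only while the wrapper is still open.
func (w *resolverWrapper) resolveNow() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.closed {
		w.calls++
	}
}

// close marks the wrapper as done; later hints are ignored.
func (w *resolverWrapper) close() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.closed = true
}

// hintAll fires n hints concurrently, one goroutine each, and waits for all
// of them to finish so the caller can inspect the counter safely.
func hintAll(w *resolverWrapper, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			w.resolveNow()
		}()
	}
	wg.Wait()
}

func main() {
	w := &resolverWrapper{}
	hintAll(w, 3) // forwarded
	w.close()
	hintAll(w, 3) // ignored after close
	fmt.Println(w.calls)
}
```

The mutex is what makes concurrent hints safe, which matters because the interface comment states that ResolveNow may be called multiple times concurrently.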

When the resolver has a new state to report, UpdateState is called:

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
}

google.golang.org/grpc@v1.50.1/clientconn.go

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
  cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
  bw := cc.balancerWrapper
  uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

2. balancer

A balancer is registered the same way as a resolver, but it is not selected by the target's scheme; it is selected through the grpc.WithDefaultServiceConfig option:

 grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name))
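Building the service config string with a JSON encoder instead of fmt.Sprintf avoids quoting mistakes if the balancer name ever contains special characters. The sketch below is an illustration only: lbServiceConfig and the policy name "my_random" are hypothetical, and the lowerCamelCase key "loadBalancingPolicy" is assumed to be the service config spelling of the key written as "LoadBalancingPolicy" above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lbServiceConfig returns a JSON document that names the load-balancing
// policy to use. The key spelling is an assumption of this sketch.
func lbServiceConfig(policy string) (string, error) {
	b, err := json.Marshal(map[string]string{"loadBalancingPolicy": policy})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	cfg, err := lbServiceConfig("my_random")
	if err != nil {
		panic(err)
	}
	// The resulting string is what would be handed to
	// grpc.WithDefaultServiceConfig when dialing.
	fmt.Println(cfg)
}
```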

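google.golang.org/grpc@v1.50.1/balancer/balancer.go: the core methods of the Balancer interface are UpdateClientConnState and UpdateSubConnState. UpdateClientConnState receives a ClientConnState, which holds the ResolverState, that is, the result of name resolution.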

type Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.  If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error.  Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}

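This interface is also wrapped by a decorator, in google.golang.org/grpc@v1.50.1/balancer_conn_wrappers.go. The wrapper starts a watcher that calls the matching handler whenever the connection state changes. Events are not handled synchronously when they are produced; they are first sent to a channel: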

func (ccb *ccBalancerWrapper) watcher() {
	for {
		select {
		case u := <-ccb.updateCh.Get():
			switch update := u.(type) {
			case *ccStateUpdate:
				ccb.handleClientConnStateChange(update.ccs)
			// ...
			}
		// ...
		}
	}
}

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
	select {
	case res = <-ccb.resultCh.Get():
		ccb.resultCh.Load()
	// ...
	}
	// ...
}

func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
	// ...
	ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
}
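The hand-off above, where the caller puts an update on one channel and waits for the result on another while a single watcher goroutine does the work, can be modeled as follows. This is a stdlib-only sketch, not grpc's implementation: it uses plain unbuffered channels where the excerpt uses Get/Put/Load queues, and ccState, balancerWrapper, rejectEmpty, and errNoAddrs are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ccState is a simplified stand-in for balancer.ClientConnState.
type ccState struct{ addrs []string }

// balancerWrapper serializes updates through a single watcher goroutine.
type balancerWrapper struct {
	updateCh chan ccState
	resultCh chan error
	handle   func(ccState) error // plays the role of the wrapped balancer
}

func newBalancerWrapper(handle func(ccState) error) *balancerWrapper {
	w := &balancerWrapper{
		updateCh: make(chan ccState),
		resultCh: make(chan error),
		handle:   handle,
	}
	go w.watcher()
	return w
}

// watcher handles one update at a time and reports each result back.
func (w *balancerWrapper) watcher() {
	for s := range w.updateCh {
		w.resultCh <- w.handle(s)
	}
}

// updateClientConnState enqueues the update and blocks until it is handled.
func (w *balancerWrapper) updateClientConnState(s ccState) error {
	w.updateCh <- s
	return <-w.resultCh
}

// close stops the watcher goroutine.
func (w *balancerWrapper) close() { close(w.updateCh) }

var errNoAddrs = errors.New("no addresses")

// rejectEmpty is a hypothetical handler that fails on an empty address list.
func rejectEmpty(s ccState) error {
	if len(s.addrs) == 0 {
		return errNoAddrs
	}
	return nil
}

func main() {
	w := newBalancerWrapper(rejectEmpty)
	defer w.close()
	fmt.Println(w.updateClientConnState(ccState{addrs: []string{"10.0.0.1:50051"}}))
	fmt.Println(w.updateClientConnState(ccState{}))
}
```

Routing every event through one goroutine means the balancer never sees two callbacks at the same time, so a balancer implementation does not need its own locking for these updates.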

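The event finally reaches the balancer's UpdateClientConnState. baseBalancer implements this method: it iterates over the address list in ResolverState.Addresses and starts a connection for each new address, which creates the initial sub-connections of the pool, and then generates the picker.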

google.golang.org/grpc@v1.50.1/balancer/base/balancer.go

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	// ...
	for _, a := range s.ResolverState.Addresses {
		addrsSet.Set(a, nil)
		if _, ok := b.subConns.Get(a); !ok {
			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
			// ...
			sc.Connect()
		}
	}
	// ...
	b.regeneratePicker()
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	// ...
}
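The address bookkeeping in this method can be sketched as a set difference: addresses seen for the first time get a new sub-connection, and addresses already known are left alone. The model below is stdlib-only and illustrative: miniBalancer, subConn, and update are hypothetical names, and the removal of addresses that disappeared from the list is an assumption of this sketch that the excerpt above does not show.

```go
package main

import (
	"fmt"
	"sort"
)

// subConn is a simplified stand-in for a sub-connection.
type subConn struct {
	addr       string
	connecting bool
}

// miniBalancer keeps one sub-connection per resolved address.
type miniBalancer struct {
	subConns map[string]*subConn
}

// update reconciles the pool with the latest address list and reports which
// addresses gained or lost a sub-connection.
func (b *miniBalancer) update(addrs []string) (created, removed []string) {
	seen := make(map[string]bool, len(addrs))
	for _, a := range addrs {
		seen[a] = true
		if _, ok := b.subConns[a]; !ok {
			// New address: create the sub-connection and start connecting.
			b.subConns[a] = &subConn{addr: a, connecting: true}
			created = append(created, a)
		}
	}
	for a := range b.subConns {
		if !seen[a] {
			// Address no longer resolved: drop its sub-connection.
			delete(b.subConns, a)
			removed = append(removed, a)
		}
	}
	sort.Strings(removed) // map iteration order is random; sort for stable output
	return created, removed
}

func main() {
	b := &miniBalancer{subConns: map[string]*subConn{}}
	created, removed := b.update([]string{"10.0.0.1:50051", "10.0.0.2:50051"})
	fmt.Println(created, removed)
	created, removed = b.update([]string{"10.0.0.2:50051", "10.0.0.3:50051"})
	fmt.Println(created, removed)
}
```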

baseBalancer also implements the other method. The idea is the same, except that it handles state changes of the sub-connections in the pool:

func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	// ...
	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
		b.state == connectivity.TransientFailure {
		b.regeneratePicker()
	}
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}

The UpdateState they both call is in google.golang.org/grpc@v1.50.1/balancer_conn_wrappers.go; it updates the picker:

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
  ccb.cc.blockingpicker.updatePicker(s.Picker)
  ccb.cc.csMgr.updateState(s.ConnectivityState)

3. picker

A custom picker is registered along the same lines: we need to implement a builder:

func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {

Its parameter type is defined in google.golang.org/grpc@v1.50.1/balancer/base/base.go

type PickerBuildInfo struct {
// ReadySCs is a map from all ready SubConns to the Addresses used to
// create them.
ReadySCs map[balancer.SubConn]SubConnInfo
}

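Note the map here: it contains the connections that are already established. A picker implementation only has to define its own selection algorithm and choose a suitable connection from this map for Invoke to use. When is this map built? Let's look at how the picker is instantiated.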

google.golang.org/grpc@v1.50.1/balancer/base/balancer.go

func (b *baseBalancer) regeneratePicker() {
	// ...
	readySCs := make(map[balancer.SubConn]SubConnInfo)
	for _, addr := range b.subConns.Keys() {
		sci, _ := b.subConns.Get(addr)
		sc := sci.(balancer.SubConn)
		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
			readySCs[sc] = SubConnInfo{Address: addr}
		}
	}
	b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}

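As shown, it looks up each connection by address, keeps the ones in the Ready state, puts them into the map, and finally uses the map to build the picker. The addresses themselves come from the resolver's result, which is stored in: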

// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancing configuration returned by the builder's
// ParseConfig method, if implemented.
BalancerConfig serviceconfig.LoadBalancingConfig
}

google.golang.org/grpc@v1.50.1/internal/balancer/gracefulswitch/gracefulswitch.go

func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
return balToUpdate.UpdateClientConnState(state)
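Filtering the ready sub-connections and building a picker from them can be modeled in a few lines. The sketch below is stdlib-only and illustrative: state, picker, randomPicker, errPicker, buildPicker, and errNoSubConnAvailable are hypothetical names, addresses stand in for sub-connections, and returning an error picker when nothing is ready is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// state is a simplified connectivity state.
type state int

const (
	idle state = iota
	connecting
	ready
)

var errNoSubConnAvailable = errors.New("no ready sub-connection")

// picker chooses one sub-connection (here: an address) per request.
type picker interface {
	pick() (string, error)
}

// randomPicker picks uniformly at random among the ready addresses.
type randomPicker struct{ ready []string }

func (p *randomPicker) pick() (string, error) {
	return p.ready[rand.Intn(len(p.ready))], nil
}

// errPicker always fails; it is used when nothing is ready.
type errPicker struct{ err error }

func (p errPicker) pick() (string, error) { return "", p.err }

// buildPicker keeps only the ready entries and builds a picker over them.
func buildPicker(states map[string]state) picker {
	var readyAddrs []string
	for addr, st := range states {
		if st == ready {
			readyAddrs = append(readyAddrs, addr)
		}
	}
	if len(readyAddrs) == 0 {
		return errPicker{err: errNoSubConnAvailable}
	}
	return &randomPicker{ready: readyAddrs}
}

func main() {
	p := buildPicker(map[string]state{
		"10.0.0.1:50051": ready,
		"10.0.0.2:50051": connecting,
		"10.0.0.3:50051": idle,
	})
	fmt.Println(p.pick())
}
```

Because the picker is rebuilt from a snapshot of the ready set, the per-request pick path needs no locking; the cost is that a new picker must be generated every time a sub-connection enters or leaves the Ready state.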

On Invoke, the decorator's pick method is called first:

google.golang.org/grpc@v1.50.1/clientconn.go

func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx:            ctx,
FullMethodName: method,
})
}

google.golang.org/grpc@v1.50.1/picker_wrapper.go

func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
   pickResult, err := p.Pick(info)
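The field name blockingpicker suggests that pick waits until a usable picker has been installed. A minimal stdlib-only model of that idea is sketched below. It is an assumption-laden illustration rather than grpc's code: pickerWrapper here holds a plain function, the blockingCh field is closed and replaced on every update to wake waiters, and tryPick is a hypothetical helper that bounds the wait with a timeout.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// picker is a simplified stand-in for balancer.Picker.
type picker func() (string, error)

// pickerWrapper blocks callers until a picker has been installed.
type pickerWrapper struct {
	mu         sync.Mutex
	picker     picker
	blockingCh chan struct{} // closed whenever a new picker is installed
}

func newPickerWrapper() *pickerWrapper {
	return &pickerWrapper{blockingCh: make(chan struct{})}
}

// updatePicker installs p and wakes every caller blocked in pick.
func (pw *pickerWrapper) updatePicker(p picker) {
	pw.mu.Lock()
	defer pw.mu.Unlock()
	pw.picker = p
	close(pw.blockingCh)
	pw.blockingCh = make(chan struct{})
}

// pick waits for a picker to exist, then delegates to it. It gives up when
// ctx is done.
func (pw *pickerWrapper) pick(ctx context.Context) (string, error) {
	for {
		pw.mu.Lock()
		p, ch := pw.picker, pw.blockingCh
		pw.mu.Unlock()
		if p != nil {
			return p()
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-ch: // a picker was installed; loop and try again
		}
	}
}

// tryPick calls pick with a timeout of ms milliseconds.
func tryPick(pw *pickerWrapper, ms int) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	return pw.pick(ctx)
}

func main() {
	pw := newPickerWrapper()
	go func() {
		time.Sleep(10 * time.Millisecond)
		pw.updatePicker(func() (string, error) { return "10.0.0.1:50051", nil })
	}()
	fmt.Println(tryPick(pw, 1000))
}
```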
