Having analyzed how a gRPC connection is created, used, and destroyed in golang源码分析:grpc 链接池(2), let's look at how the extension points gRPC leaves us — resolver, balancer, and picker — plug into the connection pool.
In short: each ClientConn owns multiple SubConns. The ClientConn obtains those SubConns through name resolution (resolver) and load-balances across them (balancer); on every client request, the Pick method supplied by the picker selects one SubConn from the pool to carry the RPC. Both resolver and balancer are abstractions: the built-in resolvers include dns, manual, and passthrough, and the built-in balancers include roundrobin and grpclb. You can also use the plugin-style Register pattern and have your module register itself in its own init() function.
1. resolver
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	Close()
}
When we call Dial to obtain a connection pool, the first step is to obtain a resolver: the target is parsed to extract its scheme, and the scheme is used to look up the matching resolver in a global registry. One caveat: the grpc version imported when defining a custom resolver must match the grpc version used when dialing, otherwise the resolver will not be found and gRPC silently falls back to the default passthrough resolver — a lesson learned the hard way. The resolver lookup is defined in google.golang.org/grpc@v1.50.1/clientconn.go:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	resolverBuilder, err := cc.parseTargetAndFindResolver()
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	cc.resolverWrapper = rWrapper
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
	rb = cc.getResolver(parsedTarget.Scheme)
func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
	return resolver.Get(scheme)
This mirrors the registration we do ourselves:
func init() {
resolver.Register(&mockResolverBuilder{})
}
Of course, inside ClientConn the resolver is used through a decorator layer, google.golang.org/grpc@v1.50.1/resolver_conn_wrapper.go, which invokes the builder's Build method:
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
	ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
So when is our resolver's ResolveNow method called? When a connection is being created:
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	ac.cc.resolveNow(resolver.ResolveNowOptions{})
func (ac *addrConn) resetTransport() {
	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
It calls the corresponding resolverWrapper method on a goroutine:
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
	r := cc.resolverWrapper
	go r.resolveNow(o)
The implementation lives in google.golang.org/grpc@v1.50.1/resolver_conn_wrapper.go:
func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
	ccr.resolverMu.Lock()
	if !ccr.done.HasFired() {
		ccr.resolver.ResolveNow(o)
When the resolver state is updated, UpdateState is called:
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
	if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
		return balancer.ErrBadResolverState
	}
google.golang.org/grpc@v1.50.1/clientconn.go
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
	bw := cc.balancerWrapper
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
2. balancer
A balancer is registered the same way as a resolver; the difference is on the consumption side: it is selected not by the target's scheme but via the grpc.WithDefaultServiceConfig dial option:
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, mybalancer.Name))
google.golang.org/grpc@v1.50.1/balancer/balancer.go — the balancer's core methods are UpdateClientConnState and UpdateSubConnState. The former receives a ClientConnState, which carries the ResolverState, i.e. the resolver's output:
type Balancer interface {
	// UpdateClientConnState is called by gRPC when the state of the ClientConn
	// changes. If the error returned is ErrBadResolverState, the ClientConn
	// will begin calling ResolveNow on the active name resolver with
	// exponential backoff until a subsequent call to UpdateClientConnState
	// returns a nil error. Any other errors are currently ignored.
	UpdateClientConnState(ClientConnState) error
	// ResolverError is called by gRPC when the name resolver reports an error.
	ResolverError(error)
	// UpdateSubConnState is called by gRPC when the state of a SubConn
	// changes.
	UpdateSubConnState(SubConn, SubConnState)
	// Close closes the balancer. The balancer is not required to call
	// ClientConn.RemoveSubConn for its existing SubConns.
	Close()
}
type ClientConnState struct {
	ResolverState resolver.State
	// The parsed load balancing configuration returned by the builder's
	// ParseConfig method, if implemented.
	BalancerConfig serviceconfig.LoadBalancingConfig
}
These methods, too, sit behind a decorator, google.golang.org/grpc@v1.50.1/balancer_conn_wrappers.go. It starts a watcher goroutine that dispatches each connection-state change to the matching handler; when an event is generated it is not handled synchronously but is first sent into a channel:
func (ccb *ccBalancerWrapper) watcher() {
	for {
		select {
		case u := <-ccb.updateCh.Get():
			switch update := u.(type) {
			case *ccStateUpdate:
				ccb.handleClientConnStateChange(update.ccs)
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
	ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
	select {
	case res = <-ccb.resultCh.Get():
		ccb.resultCh.Load()
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
	ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
Ultimately the event is handed to the balancer's UpdateClientConnState. baseBalancer implements this interface: it walks the address list in ResolverState.Addresses and dials each one, establishing the pool's initial SubConns, and then regenerates the picker.
google.golang.org/grpc@v1.50.1/balancer/base/balancer.go
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	for _, a := range s.ResolverState.Addresses {
		addrsSet.Set(a, nil)
		if _, ok := b.subConns.Get(a); !ok {
			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
			sc.Connect()
	b.regeneratePicker()
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
baseBalancer implements the other method in the same spirit, except that it handles the SubConns already in the pool:
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
		b.state == connectivity.TransientFailure {
		b.regeneratePicker()
	}
	b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
The UpdateState they both call lives in google.golang.org/grpc@v1.50.1/balancer_conn_wrappers.go and swaps in the new picker:
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
	ccb.cc.blockingpicker.updatePicker(s.Picker)
	ccb.cc.csMgr.updateState(s.ConnectivityState)
3. picker
Defining our own picker follows the same registration idea; we need to implement a builder:
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
Its parameter type is defined in google.golang.org/grpc@v1.50.1/balancer/base/base.go:
type PickerBuildInfo struct {
	// ReadySCs is a map from all ready SubConns to the Addresses used to
	// create them.
	ReadySCs map[balancer.SubConn]SubConnInfo
}
Note the map here: it contains the connections that are already established. A picker implementation only needs to supply its own selection algorithm, choosing a suitable SubConn from this map for Invoke to use. When is this map built? Let's look at how the picker is instantiated:
google.golang.org/grpc@v1.50.1/balancer/base/balancer.go
func (b *baseBalancer) regeneratePicker() {
	readySCs := make(map[balancer.SubConn]SubConnInfo)
	for _, addr := range b.subConns.Keys() {
		sci, _ := b.subConns.Get(addr)
		sc := sci.(balancer.SubConn)
		if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
			readySCs[sc] = SubConnInfo{Address: addr}
		}
	}
	b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
As you can see, it fetches each SubConn by address, keeps only the ones in Ready state, puts them into the map, and finally initializes the picker with that map. The addresses themselves are exactly what the resolver produced; they travel in:
// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
	ResolverState resolver.State
	// The parsed load balancing configuration returned by the builder's
	// ParseConfig method, if implemented.
	BalancerConfig serviceconfig.LoadBalancingConfig
}
google.golang.org/grpc@v1.50.1/internal/balancer/gracefulswitch/gracefulswitch.go
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
	return balToUpdate.UpdateClientConnState(state)
When Invoke runs, the decorator's pick method is called first:
google.golang.org/grpc@v1.50.1/clientconn.go
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
		Ctx:            ctx,
		FullMethodName: method,
	})
}
google.golang.org/grpc@v1.50.1/picker_wrapper.go
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	pickResult, err := p.Pick(info)