golang源码分析:grpc 链接池(1)

December 09, 2023
测试
测试
测试
测试
12 分钟阅读

开始源码分享之前,我们先问自己几个问题:

1,grpc client和server之间是长链接还是短链接?

2,我们通过grpc.Dial拿到的*ClientConn对应的是一个连接么?

3,grpc.Dial 拿到的连接应该什么时候释放?

4,同一个*ClientConn,多次rpc请求,中间连接会断开么?

5,如果一个连接不断开,心跳机制是在哪里实现的?需要我们自己实现吗?

上面几个问题,看着都觉得这么简单还用问?但是仔细一想,内心还是有些不太确定。因为没有分析过源码!下面我们带着问题来进行研究。我们生成一段代码,启动一个server

syntax = "proto3";

option go_package = "grpc/conn/hello";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc SayHello1 (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
protoc --go_out=./ --go_opt=paths=import --go-grpc_out=./  --go-grpc_opt=paths=import grpc/conn/helloworld.proto

注意下,这里的最终生成路径是指定的out路径和package里的路径拼接而成的。然后我们启动我们的server

package main

import (
  helloworld "learn/grpc/conn/hello"
  "log"
  "net"

  "google.golang.org/grpc"
)

func main() {
  srv := grpc.NewServer()
  helloworld.RegisterGreeterServer(srv, &helloworld.UnimplementedGreeterServer{})
  listener, err := net.Listen("tcp", ":12345")
  if err != nil {
    log.Fatalf("failed to listen: %v", err)
  }

  err = srv.Serve(listener)
  if err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
}

为了验证我们的疑问,我们分别启动三个客户端,第一个客户端每次发完一个请求,断开连接重连;第二个客户端在一个连接上发多个请求;第三个客户端发完一个请求后sleep 20s再发下一个请求,看看连接是否会断开。三个客户端的代码分别如下:

package main

import (
  "context"
  "fmt"
  "log"

  helloworld "learn/grpc/conn/hello"

  "google.golang.org/grpc"
)

func main() {
  for i := 0; i < 2; i++ {
    conn, err := grpc.Dial("127.0.0.1:12345", grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
      log.Fatalf("did not connect: %v", err)
    }
    client := helloworld.NewGreeterClient(conn)

    resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    resp, err = client.SayHello1(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    if err != nil {
      log.Println("could not greet", err)
      //  log.Fatal("hhhhhh") //exit status 1
    }
    conn.Close()
  }
}
package main

import (
  "context"
  "fmt"
  "log"

  helloworld "learn/grpc/conn/hello"

  "google.golang.org/grpc"
)

func main() {
  conn, err := grpc.Dial("127.0.0.1:12345", grpc.WithInsecure(), grpc.WithBlock())
  defer conn.Close()
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  client := helloworld.NewGreeterClient(conn)
  for i := 0; i < 2; i++ {
    resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    resp, err = client.SayHello1(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    if err != nil {
      log.Println("could not greet", err)
      //  log.Fatal("hhhhhh") //exit status 1
    }
  }
}
package main

import (
  "context"
  "fmt"
  "log"
  "time"

  helloworld "learn/grpc/conn/hello"

  "google.golang.org/grpc"
  "google.golang.org/grpc/keepalive"
)

func main() {
  conn, err := grpc.Dial("127.0.0.1:12345", grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
    Time:                time.Duration(time.Second), //最小10s
    Timeout:             time.Duration(time.Second),
    PermitWithoutStream: false,
  }))
  defer conn.Close()
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  client := helloworld.NewGreeterClient(conn)
  for i := 0; i < 2; i++ {
    resp, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    resp, err = client.SayHello1(context.Background(), &helloworld.HelloRequest{Name: "hello world!"})
    fmt.Println(i+1, "个", resp, err)
    if err != nil {
      log.Println("could not greet", err)
      //  log.Fatal("hhhhhh") //exit status 1
    }
    time.Sleep(20 * time.Second)
  }
}

然后我们用wireshark抓包测试下,看下网络情况,首先到https://www.wireshark.org/#download 下载,并安装,默认情况下,没法抓127.0.0.1的包的,启动的时候需要选择Loopback lo0;然后在抓包的时候过滤我们的服务的端口:

tcp and ip.addr==127.0.0.1 and tcp.port==12345

对应的我们可以得到3个tcp连接发包记录:

对于每次发完请求都断开连接的情形,我们可以看到两次三次握手和四次挥手的记录。

对于发请求过程中不主动close ClientConn的场景,对应的只有一次三次握手和四次挥手的记录,说明grpc在发多个请求的时候并不是发完一个请求就断开连接了,而是保持了底层的http2长链接,因此我们在使用grpc的时候需要注意两个问题:A,如果链接能复用,尽量不要一个请求处理完就断开重连,这样每次都要连接的代价比较大。B,打开的连接记得要关闭,不要不断建立新连接不断开,否则有泄漏风险。

如果链接不释放会怎么样呢?从第三个场景的抓包我们可以看到,如果一个连接长时间没有请求发送,连接并不会断开,而是发送keep-alive请求,图中我们可以看到,超过时间阈值没有请求的时候,客户端和服务端分别发送了一个keep-alive的包,对端也发送了一个响应的心跳,说明连接并不会断开,tcp层会有保活机制。

那么我们对于dial,我们拿到的是一个连接么,答案是否定的,对应的应该是一个连接池,grpc的SubConn对应的才是连接池中的一个连接。http层有心跳保活机制吗?答案是有的,具体是在哪里实现的,我们在下一篇中结合源码详细介绍。

继续阅读

更多来自我们博客的帖子

如何安装 BuddyPress
由 测试 December 17, 2023
经过差不多一年的开发,BuddyPress 这个基于 WordPress Mu 的 SNS 插件正式版终于发布了。BuddyPress...
阅读更多
Filter如何工作
由 测试 December 17, 2023
在 web.xml...
阅读更多
如何理解CGAffineTransform
由 测试 December 17, 2023
CGAffineTransform A structure for holding an affine transformation matrix. ...
阅读更多