第三期:gRPC客户端与服务端连接失败后,是否会有重试机制?

2年前 (2022) 程序员胖胖胖虎阿
370 0 0

grpc 版本1.50

client.go 代码:

func main() {
    flag.Parse()
    // Set up a connection to the server.
    conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)

    // Contact the server and print out its response.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
    if err != nil {
        log.Fatalf("could not greet: %v", err)
    }
    log.Printf("Greeting: %s", r.GetMessage())
}

Dial 源码:

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
    return DialContext(context.Background(), target, opts...)
}

DialContext 源码:

省略次部分代码

// 首先会创建 ClientConn,初始化相关字段
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }

    // 将用户设置的连接参数更新到客户端连接器 ClientConn
    for _, opt := range opts {
        opt.apply(&cc.dopts)
    }

    return cc, nil
}

connect() 方法:

func (ac *addrConn) connect() error {
    ac.mu.Lock()
    // if 校验状态
    if ac.state == connectivity.Shutdown {
        ac.mu.Unlock()
        return errConnClosing
    }
    if ac.state != connectivity.Idle {
        ac.mu.Unlock()
        return nil
    }
    ac.updateConnectivityState(connectivity.Connecting, nil)
    ac.mu.Unlock()
    // 主要看这个方法,重试连接
    ac.resetTransport()
    return nil
}

进入 resetTransport() 源码

func (ac *addrConn) resetTransport() {
    ac.mu.Lock()
    // 判断状态若为 shutdown,则不再连接直接推出
    if ac.state == connectivity.Shutdown {
        ac.mu.Unlock()
        return
    }

    addrs := ac.addrs
    // 连接失败,需要进行重试的
    // Backoff 是需要等待的时间,ac.backoffIdx表示第几次重试
    backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
    // 计算出本次向gRPC服务尝试建立TCP连接的最长时间
    // 若超过这个时间还是连接不上,则主动断开,等待尝试下次连接
    // This will be the duration that dial gets to finish.
    dialDuration := minConnectTimeout
    if ac.dopts.minConnectTimeout != nil {
        dialDuration = ac.dopts.minConnectTimeout()
    }

    if dialDuration < backoffFor {
        // Give dial more time as we keep failing to connect.
        dialDuration = backoffFor
    }
    connectDeadline := time.Now().Add(dialDuration)

    // 更新结构addrConn状态为connecting
    ac.updateConnectivityState(connectivity.Connecting, nil)
    ac.mu.Unlock()

    // 向服务器连接失败后需要做的逻辑
    if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
        ac.cc.resolveNow(resolver.ResolveNowOptions{})
        // After exhausting all addresses, the addrConn enters
        // TRANSIENT_FAILURE.
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {
            ac.mu.Unlock()
            return
        }
        ac.updateConnectivityState(connectivity.TransientFailure, err)

        // Backoff.
        b := ac.resetBackoff
        ac.mu.Unlock()

        // 定时的超时间
        timer := time.NewTimer(backoffFor)
        // 1.timer.C如果连接超时,重试的次数+1,继续下一次重试连接,但需要等待一段时间
        // 2. b,直接关闭,将继续进行重新连接
        // 2. ac.ctx.Done,走这里的话,上下文结束,这里不会再次重试了
        select {
        case <-timer.C:
            ac.mu.Lock()
            ac.backoffIdx++
            ac.mu.Unlock()
        case <-b:
            timer.Stop()
        case <-ac.ctx.Done():
            timer.Stop()
            return
        }

        ac.mu.Lock()
        // 状态 != shutdown就更新为空闲状态
        if ac.state != connectivity.Shutdown {
            ac.updateConnectivityState(connectivity.Idle, err)
        }
        ac.mu.Unlock()
        return
    }
    // 连接成功,重新设置backoff为原始值0
    ac.mu.Lock()
    ac.backoffIdx = 0
    ac.mu.Unlock()
}

如何计算重试连接等待时间?

进入Backoff方法


func (bc Exponential) Backoff(retries int) time.Duration {
    if retries == 0 {
        return bc.Config.BaseDelay
    }
    backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
    for backoff < max && retries > 0 {
        // 幂次方
        backoff *= bc.Config.Multiplier
        retries--
    }
    // 不能超过最大延时时间
    if backoff > max {
        backoff = max
    }
    // Randomize backoff delays so that if a cluster of requests start at
    // the same time, they won't operate in lockstep.
    backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
    if backoff < 0 {
        return 0
    }
    return time.Duration(backoff)
}

总结:

  • 连接失败后,客户端会进行重试连接。
  • 重试次数越多,等待下一次连接时间也会变长,但不能超过MaxDelay值。

更多Go云原生学习资料,收录于Github:https://github.com/metashops/GoFamily

本文由mdnice多平台发布

相关文章

暂无评论

暂无评论...