Swarmkit笔记(10)——manager处理session请求

Manager处理session请求是通过_Dispatcher_Session_Handler这个函数(./api/dispatcher.pb.go):

// _Dispatcher_Session_Handler is the server-side gRPC glue for the
// Dispatcher.Session streaming RPC: it reads the initial SessionRequest
// off the stream, then delegates to the registered DispatcherServer
// implementation, wrapping the raw stream for typed sends.
func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
    req := new(SessionRequest)
    err := stream.RecvMsg(req)
    if err != nil {
        return err
    }
    return srv.(DispatcherServer).Session(req, &dispatcherSessionServer{stream})
}

实际函数调用栈如下:

0  0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
   at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1  0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2  0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3  0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4  0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5  0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6  0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7  0x0000000000462bf0 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1998

Dispatcher.Session()函数代码如下:

// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
    ctx := stream.Context()
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return err
    }
    nodeID := nodeInfo.NodeID

    if err := d.isRunningLocked(); err != nil {
        return err
    }

    // register the node.
    sessionID, err := d.register(stream.Context(), nodeID, r.Description)
    if err != nil {
        return err
    }

    fields := logrus.Fields{
        "node.id":      nodeID,
        "node.session": sessionID,
        "method":       "(*Dispatcher).Session",
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log := log.G(ctx).WithFields(fields)

    var nodeObj *api.Node
    nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
        nodeObj = store.GetNode(readTx, nodeID)
        return nil
    }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
        Checks: []state.NodeCheckFunc{state.NodeCheckID}},
    )
    if cancel != nil {
        defer cancel()
    }

    if err != nil {
        log.WithError(err).Error("ViewAndWatch Node failed")
    }

    if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
        return err
    }

    if err := stream.Send(&api.SessionMessage{
        SessionID:            sessionID,
        Node:                 nodeObj,
        Managers:             d.getManagers(),
        NetworkBootstrapKeys: d.networkBootstrapKeys,
    }); err != nil {
        return err
    }

    managerUpdates, mgrCancel := d.mgrQueue.Watch()
    defer mgrCancel()
    keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
    defer keyMgrCancel()

    // disconnectNode is a helper forcibly shutdown connection
    disconnectNode := func() error {
        // force disconnect by shutting down the stream.
        transportStream, ok := transport.StreamFromContext(stream.Context())
        if ok {
            // if we have the transport stream, we can signal a disconnect
            // in the client.
            if err := transportStream.ServerTransport().Close(); err != nil {
                log.WithError(err).Error("session end")
            }
        }

        nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
        if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
            log.WithError(err).Error("failed to remove node")
        }
        // still return an abort if the transport closure was ineffective.
        return grpc.Errorf(codes.Aborted, "node must disconnect")
    }

    for {
        // After each message send, we need to check the nodes sessionID hasn't
        // changed. If it has, we will the stream and make the node
        // re-register.
        node, err := d.nodes.GetWithSession(nodeID, sessionID)
        if err != nil {
            return err
        }

        var mgrs []*api.WeightedPeer

        var disconnect bool

        select {
        case ev := <-managerUpdates:
            mgrs = ev.([]*api.WeightedPeer)
        case ev := <-nodeUpdates:
            nodeObj = ev.(state.EventUpdateNode).Node
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-node.Disconnect:
            disconnect = true
        case <-d.ctx.Done():
            disconnect = true
        case <-keyMgrUpdates:
        }
        if mgrs == nil {
            mgrs = d.getManagers()
        }

        if err := stream.Send(&api.SessionMessage{
            SessionID:            sessionID,
            Node:                 nodeObj,
            Managers:             mgrs,
            NetworkBootstrapKeys: d.networkBootstrapKeys,
        }); err != nil {
            return err
        }
        if disconnect {
            return disconnectNode()
        }
    }
}

这个stream是处理agent连接的。前半部分是把连接的agent记录下来;后半部分是如果cluster信息发生变化,比如manager leader发生变化,需要通知agent重新连接。disconnectNode()函数则是需要同agent node断开连接时的处理:包括断开连接,agent node信息删除,等等。

发表评论

电子邮件地址不会被公开。 必填项已用*标注