Swarmkit Notes (11): How the manager creates the server that handles swarmctl requests

Manager.localserver is the gRPC server the manager creates on a local Unix socket; it waits for and handles the command requests sent by swarmctl (the implementation lives in the manager/controlapi directory). The localserver-related code in Manager.Run() looks like this:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

......
api.RegisterControlServer(m.localserver, localProxyControlAPI)
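The excerpt above only registers the control API on m.localserver. The listening side is not shown here, but serving gRPC on a Unix socket generally follows the sketch below; the socket path is illustrative, not the path SwarmKit actually uses:

// Hypothetical sketch: binding a gRPC server to a local Unix socket.
lis, err := net.Listen("unix", "/var/run/swarmkit/swarm.sock")
if err != nil {
    log.Fatal(err)
}
localserver := grpc.NewServer()
api.RegisterControlServer(localserver, localProxyControlAPI)
go localserver.Serve(lis)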

(1) First, look at the definitions of controlapi.Server and controlapi.NewServer():

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

// NewServer creates a Cluster API server.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
    return &Server{
        store:  store,
        raft:   raft,
        rootCA: rootCA,
    }
}

The controlapi.NewServer() function creates the server that responds to the control command requests issued by the swarmctl program.
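For a concrete picture of what such a handler does, the sketch below is modeled on the package's node-lookup handler: it answers a swarmctl request by reading from the manager's store inside a read-only view transaction. It is a simplified illustration, not the verbatim SwarmKit code (it assumes the api, store, grpc, and codes packages are imported):

func (s *Server) GetNode(ctx context.Context, request *api.GetNodeRequest) (*api.GetNodeResponse, error) {
    if request.NodeID == "" {
        return nil, grpc.Errorf(codes.InvalidArgument, "node ID must be provided")
    }

    var node *api.Node
    // Read-only view transaction against the in-memory store.
    s.store.View(func(tx store.ReadTx) {
        node = store.GetNode(tx, request.NodeID)
    })

    if node == nil {
        return nil, grpc.Errorf(codes.NotFound, "node %s not found", request.NodeID)
    }
    return &api.GetNodeResponse{Node: node}, nil
}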

Among its fields, store.MemoryStore is a particularly important structure:

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

watch.Queue is defined as follows:

// Queue is the structure used to publish events and watch for them.
type Queue struct {
    broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
    return q.CallbackWatch(nil)
}
......
// Publish adds an item to the queue.
func (q *Queue) Publish(item events.Event) {
    q.broadcast.Write(item)
}

In short, whenever Server.store changes, the data is written to memDB and, at the same time, an event is published to the queue, so that the manager goroutines watching the corresponding channels can receive the event and react to it.
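As a rough illustration of this mechanism, the snippet below assumes a *store.MemoryStore named memStore obtained elsewhere, and uses the MemoryStore API of this SwarmKit version (WatchQueue(), Update(), and the typed store helpers); it is a sketch, not code taken from the manager:

// Watch the store's event queue; every committed update is also published here.
watchCh, cancel := memStore.WatchQueue().Watch()
defer cancel()

go func() {
    for ev := range watchCh {
        // Events are typed, e.g. state.EventCreateNode, state.EventUpdateNode, ...
        log.Printf("store event: %T", ev)
    }
}()

// An update transaction: the write goes into memDB and, on commit,
// a matching event is published to the queue watched above.
err := memStore.Update(func(tx store.Tx) error {
    return store.CreateNode(tx, &api.Node{ID: "node-1"})
})
if err != nil {
    log.Fatal(err)
}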

The following line fills the newly created controlapi.Server with the current cluster's state:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())

(2)

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

The code above creates a variable of type raftProxyControlServer:

type raftProxyControlServer struct {
    local        ControlServer
    connSelector *raftpicker.ConnSelector
    cluster      raftpicker.RaftCluster
    ctxMods      []func(context.Context) (context.Context, error)
}

The idea behind localProxyControlAPI is: if the manager that receives the swarmctl request is the leader (swarmctl and the manager are, of course, on the same machine), it handles the request itself; otherwise it forwards the request to the cluster's leader.
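Conceptually, every generated method on raftProxyControlServer follows the same pattern: check whether this manager is the raft leader, and either serve the call locally or forward it to the leader. The sketch below is a simplified rendering of that pattern, not the generated code itself:

func (p *raftProxyControlServer) ListNodes(ctx context.Context, r *ListNodesRequest) (*ListNodesResponse, error) {
    if p.cluster.IsLeader() {
        // This manager is the leader: handle the request locally.
        return p.local.ListNodes(ctx, r)
    }

    // Not the leader: apply the context modifiers (forwardAsOwnRequest above),
    // then forward the request to the leader through the raft connection selector.
    for _, mod := range p.ctxMods {
        var err error
        if ctx, err = mod(ctx); err != nil {
            return nil, err
        }
    }
    conn, err := p.connSelector.Conn()
    if err != nil {
        return nil, err
    }
    return NewControlClient(conn).ListNodes(ctx, r)
}

Because forwardAsOwnRequest simply returns the context unchanged, requests proxied from the local socket go out as this manager's own requests rather than as forwarded ones, exactly as the comment in Manager.Run() explains.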

(3)

api.RegisterControlServer(m.localserver, localProxyControlAPI)

This call wires the Unix socket behind localserver up to the raftProxyControlServer.
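To see the other end of that socket, here is a hypothetical client in the style of swarmctl. The socket path and dial options are illustrative, not the exact swarmctl code; note that no TLS is used on the local socket, which is why the proxy above bypasses authorization for local requests.

package main

import (
    "fmt"
    "log"
    "net"
    "time"

    "golang.org/x/net/context"
    "google.golang.org/grpc"

    "github.com/docker/swarmkit/api"
)

func main() {
    // Dial the manager's trusted local Unix socket (path is hypothetical).
    conn, err := grpc.Dial("/var/run/swarmkit/swarm.sock",
        grpc.WithInsecure(),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    // Issue a control request, much like `swarmctl node ls` would.
    client := api.NewControlClient(conn)
    resp, err := client.ListNodes(context.Background(), &api.ListNodesRequest{})
    if err != nil {
        log.Fatal(err)
    }
    for _, n := range resp.Nodes {
        fmt.Println(n.ID)
    }
}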

Swarmkit Notes (10): How the manager handles session requests

The manager handles session requests through the _Dispatcher_Session_Handler function (./api/dispatcher.pb.go):

func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
    m := new(SessionRequest)
    if err := stream.RecvMsg(m); err != nil {
        return err
    }
    return srv.(DispatcherServer).Session(m, &dispatcherSessionServer{stream})
}

The actual call stack looks like this:

0  0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
   at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1  0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2  0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3  0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4  0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5  0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6  0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7  0x0000000000462bf0 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1998

The code of Dispatcher.Session() is as follows:

// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
    ctx := stream.Context()
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return err
    }
    nodeID := nodeInfo.NodeID

    if err := d.isRunningLocked(); err != nil {
        return err
    }

    // register the node.
    sessionID, err := d.register(stream.Context(), nodeID, r.Description)
    if err != nil {
        return err
    }

    fields := logrus.Fields{
        "node.id":      nodeID,
        "node.session": sessionID,
        "method":       "(*Dispatcher).Session",
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log := log.G(ctx).WithFields(fields)

    var nodeObj *api.Node
    nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
        nodeObj = store.GetNode(readTx, nodeID)
        return nil
    }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
        Checks: []state.NodeCheckFunc{state.NodeCheckID}},
    )
    if cancel != nil {
        defer cancel()
    }

    if err != nil {
        log.WithError(err).Error("ViewAndWatch Node failed")
    }

    if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
        return err
    }

    if err := stream.Send(&api.SessionMessage{
        SessionID:            sessionID,
        Node:                 nodeObj,
        Managers:             d.getManagers(),
        NetworkBootstrapKeys: d.networkBootstrapKeys,
    }); err != nil {
        return err
    }

    managerUpdates, mgrCancel := d.mgrQueue.Watch()
    defer mgrCancel()
    keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
    defer keyMgrCancel()

    // disconnectNode is a helper that forcibly shuts down the connection
    disconnectNode := func() error {
        // force disconnect by shutting down the stream.
        transportStream, ok := transport.StreamFromContext(stream.Context())
        if ok {
            // if we have the transport stream, we can signal a disconnect
            // in the client.
            if err := transportStream.ServerTransport().Close(); err != nil {
                log.WithError(err).Error("session end")
            }
        }

        nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
        if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
            log.WithError(err).Error("failed to remove node")
        }
        // still return an abort if the transport closure was ineffective.
        return grpc.Errorf(codes.Aborted, "node must disconnect")
    }

    for {
        // After each message send, we need to check the nodes sessionID hasn't
        // changed. If it has, we will shut down the stream and make the node
        // re-register.
        node, err := d.nodes.GetWithSession(nodeID, sessionID)
        if err != nil {
            return err
        }

        var mgrs []*api.WeightedPeer

        var disconnect bool

        select {
        case ev := <-managerUpdates:
            mgrs = ev.([]*api.WeightedPeer)
        case ev := <-nodeUpdates:
            nodeObj = ev.(state.EventUpdateNode).Node
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-node.Disconnect:
            disconnect = true
        case <-d.ctx.Done():
            disconnect = true
        case <-keyMgrUpdates:
        }
        if mgrs == nil {
            mgrs = d.getManagers()
        }

        if err := stream.Send(&api.SessionMessage{
            SessionID:            sessionID,
            Node:                 nodeObj,
            Managers:             mgrs,
            NetworkBootstrapKeys: d.networkBootstrapKeys,
        }); err != nil {
            return err
        }
        if disconnect {
            return disconnectNode()
        }
    }
}

This stream handles agent connections. The first half registers the connecting agent; the second half notifies the agent whenever the cluster state changes, for example when the manager leader changes, so that the agent can reconnect. The disconnectNode() helper covers the case where the connection with an agent node has to be torn down: it closes the connection, removes the agent node's record, and so on.
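From the agent's point of view, consuming this stream looks roughly like the loop below. This is a hedged sketch of a Dispatcher client, not the actual agent code in SwarmKit; conn, ctx, and nodeDescription are assumed to exist.

// Open a session stream against a manager's Dispatcher service.
client := api.NewDispatcherClient(conn)
stream, err := client.Session(ctx, &api.SessionRequest{
    Description: nodeDescription, // *api.NodeDescription for this node (assumed)
})
if err != nil {
    log.Fatal(err)
}

for {
    msg, err := stream.Recv()
    if err != nil {
        // The manager may close the stream to force a reconnect
        // (see disconnectNode above); pick another manager and retry.
        break
    }
    // Each message carries the session ID, this node's object, the current
    // list of weighted managers, and the network bootstrap keys.
    log.Printf("session %s: %d manager(s) known", msg.SessionID, len(msg.Managers))
}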