docker笔记(14)——docker swarm功能代码分析(1)

Docker 1.12集成了docker swarm功能,其client的相关代码位于api/client/swarm文件夹下。以docker swarm init命令的代码(api/client/swarm/init.go)为例:

// NOTE(review): none of these constants is referenced in the visible code;
// the descriptions below are inferred from the names and the formula comment.
const (
    // Presumably the number of random bytes in a generated secret
    // (16 bytes = 128 bits, matching the 2^128 in the formula below).
    generatedSecretEntropyBytes = 16
    // Presumably the numeric base used to render the secret as text.
    generatedSecretBase         = 36
    // floor(log(2^128-1, 36)) + 1
    maxGeneratedSecretLength = 25
)

// initOptions collects the flag values of the "swarm init" command.
type initOptions struct {
    // swarmOptions is embedded; its flags are registered by addSwarmFlags
    // and it is converted to the request Spec with ToSpec().
    swarmOptions
    // listenAddr is bound to the flagListenAddr flag
    // (format: <ip|interface>[:port]).
    listenAddr NodeAddrOption
    // Not a NodeAddrOption because it has no default port.
    advertiseAddr   string
    // forceNewCluster is bound to the --force-new-cluster flag.
    forceNewCluster bool
}

// newInitCommand builds the cobra command for "swarm init". It registers the
// listen address, advertise address, --force-new-cluster and shared swarm
// flags, and runs runInit with the parsed options.
func newInitCommand(dockerCli *client.DockerCli) *cobra.Command {
    var opts initOptions
    opts.listenAddr = NewListenAddrOption()

    initCmd := &cobra.Command{
        Use:   "init [OPTIONS]",
        Short: "Initialize a swarm",
        Args:  cli.NoArgs,
    }
    // The closure captures the opts variable itself, so it observes the
    // values written through the flag bindings registered below.
    initCmd.RunE = func(c *cobra.Command, _ []string) error {
        return runInit(dockerCli, c.Flags(), opts)
    }

    fs := initCmd.Flags()
    fs.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])")
    fs.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])")
    fs.BoolVar(&opts.forceNewCluster, "force-new-cluster", false, "Force create a new cluster from current state.")
    addSwarmFlags(fs, &opts.swarmOptions)

    return initCmd
}

// runInit sends the swarm init request built from opts to the daemon and
// prints the result. When the daemon reports that it could not pick an
// address to advertise, the returned error is extended with a hint to pass
// --advertise-addr. On success it prints the new node ID, the join command
// and a pointer to 'docker swarm join-token manager'.
func runInit(dockerCli *client.DockerCli, flags *pflag.FlagSet, opts initOptions) error {
    client := dockerCli.Client()
    ctx := context.Background()

    nodeID, err := client.SwarmInit(ctx, swarm.InitRequest{
        ListenAddr:      opts.listenAddr.String(),
        AdvertiseAddr:   opts.advertiseAddr,
        ForceNewCluster: opts.forceNewCluster,
        Spec:            opts.swarmOptions.ToSpec(),
    })
    if err != nil {
        msg := err.Error()
        needsAdvertiseAddr := strings.Contains(msg, "could not choose an IP address to advertise") ||
            strings.Contains(msg, "could not find the system's IP address")
        if !needsAdvertiseAddr {
            return err
        }
        return errors.New(msg + " - specify one with --advertise-addr")
    }

    fmt.Fprintf(dockerCli.Out(), "Swarm initialized: current node (%s) is now a manager.\n\n", nodeID)

    err = printJoinCommand(ctx, dockerCli, nodeID, true, false)
    if err != nil {
        return err
    }

    fmt.Fprint(dockerCli.Out(), "To add a manager to this swarm, run 'docker swarm join-token manager' and follow the instructions.\n\n")
    return nil
}

其中client.DockerCli代表docker command line client

// DockerCli represents the docker command line client.
// Instances of the client can be returned from NewDockerCli.
type DockerCli struct {
    // initializing closure
    init func() error

    // configFile has the client configuration file
    configFile *configfile.ConfigFile
    // in holds the input stream and closer (io.ReadCloser) for the client.
    in io.ReadCloser
    // out holds the output stream (io.Writer) for the client.
    out io.Writer
    // err holds the error stream (io.Writer) for the client.
    err io.Writer
    // keyFile holds the key file as a string.
    keyFile string
    // inFd holds the file descriptor of the client's STDIN (if valid).
    inFd uintptr
    // outFd holds file descriptor of the client's STDOUT (if valid).
    outFd uintptr
    // isTerminalIn indicates whether the client's STDIN is a TTY
    isTerminalIn bool
    // isTerminalOut indicates whether the client's STDOUT is a TTY
    isTerminalOut bool
    // client is the http client that performs all API operations
    client client.APIClient
    // inState holds the terminal input state
    inState *term.State
    // outState holds the terminal output state
    outState *term.State
}

其中的client成员便是engine-api/client,所以上述client.SwarmInit的代码位于engine-api/client/swarm_init.go

// SwarmInit initializes the Swarm by POSTing req to /swarm/init. The JSON
// string in the response body is returned together with any decode error;
// a failed POST yields an empty string and the request error.
func (cli *Client) SwarmInit(ctx context.Context, req swarm.InitRequest) (string, error) {
    resp, err := cli.post(ctx, "/swarm/init", nil, req, nil)
    if err != nil {
        return "", err
    }
    // Release the response once decoding has finished.
    defer ensureReaderClosed(resp)

    var decoded string
    decodeErr := json.NewDecoder(resp.body).Decode(&decoded)
    return decoded, decodeErr
}

Swarmkit笔记(11)——manager创建处理swarmctl请求的server

manager.localserver是创建的本地Unix socket,用来等待处理swarmctl发来的命令请求(源码在manager/controlapi目录)。Manager.Run()函数里localserver相关代码如下:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

......
api.RegisterControlServer(m.localserver, localProxyControlAPI)

(1)首先看一下controlapi.Server和controlapi.NewServer()的定义:

// Server is the Cluster API gRPC server.
type Server struct {
    // store is the in-memory store handed to NewServer.
    store  *store.MemoryStore
    // raft is the raft node handed to NewServer.
    raft   *raft.Node
    // rootCA is the root CA handed to NewServer.
    rootCA *ca.RootCA
}

// NewServer creates a Cluster API server backed by the given store, raft
// node and root CA.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
    srv := new(Server)
    srv.store = store
    srv.raft = raft
    srv.rootCA = rootCA
    return srv
}

controlapi.NewServer()函数就是用来创建一个响应swarmctl程序发出的control命令请求的server

其中store.MemoryStore是一个很重要的结构体:

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    // memDB is the in-memory database holding the stored data.
    memDB *memdb.MemDB
    // queue is the watch queue on which events are published for watchers.
    queue *watch.Queue

    // NOTE(review): proposer is not used in the visible code; presumably it
    // proposes store changes to the rest of the cluster — confirm.
    proposer state.Proposer
}

watch.Queue定义如下:

// Queue is the structure used to publish events and watch for them.
type Queue struct {
    // broadcast is the broadcaster that Publish writes every item to.
    broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called. It is CallbackWatch with a
// nil argument.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
    eventq, cancel = q.CallbackWatch(nil)
    return eventq, cancel
}
......
// Publish adds an item to the queue by writing it to the queue's
// broadcaster.
func (q *Queue) Publish(item events.Event) {
    q.broadcast.Write(item)
}

简单地讲,就是当Server.store发生变化时,把数据更新到memDB的同时,也要发送消息到queue里,这样manager中监听在相应channel上的goroutine就可以收到并处理请求。

下面代码就是把当前cluster的信息填充到新创建的controlapi.Server变量里:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())

(2)

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

上述代码创建出一个raftProxyControlServer类型的变量:

// raftProxyControlServer wraps a ControlServer together with what is needed
// to reach other raft members. It is built by NewRaftProxyControlServer.
type raftProxyControlServer struct {
    // local is the wrapped ControlServer (baseControlAPI at the call site in
    // Manager.Run).
    local        ControlServer
    // connSelector is the raftpicker connection selector; presumably it
    // supplies the connection used when a request is proxied — confirm.
    connSelector *raftpicker.ConnSelector
    // cluster is the raft cluster view (the manager's RaftNode at the call
    // site in Manager.Run).
    cluster      raftpicker.RaftCluster
    // ctxMods are context modifiers; Manager.Run passes forwardAsOwnRequest,
    // which returns the context unchanged. Presumably they are applied
    // before a request is proxied — confirm.
    ctxMods      []func(context.Context) (context.Context, error)
}

localProxyControlAPI含义是如果收到swarmctl请求的manager是leader(swarmctl和manager当然位于同一台机器上),则会处理请求,否则就转发给这个cluster的leader。

(3)

api.RegisterControlServer(m.localserver, localProxyControlAPI)

上述代码则是把localserver所对应的Unix socket和raftProxyControlServer关联起来。

Swarmkit笔记(10)——manager处理session请求

Manager处理session请求是通过_Dispatcher_Session_Handler这个函数(./api/dispatcher.pb.go):

// _Dispatcher_Session_Handler is the gRPC stream handler for the Dispatcher
// Session RPC. It reads the single SessionRequest from the stream and hands
// it, with the stream wrapped as a dispatcherSessionServer, to the
// DispatcherServer implementation in srv.
func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
    req := &SessionRequest{}
    err := stream.RecvMsg(req)
    if err != nil {
        return err
    }
    dispatcher := srv.(DispatcherServer)
    return dispatcher.Session(req, &dispatcherSessionServer{stream})
}

实际函数调用栈如下:

0  0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
   at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1  0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2  0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3  0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4  0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5  0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6  0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7  0x0000000000462bf0 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1998

Dispatcher.Session()函数代码如下:

// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
//
// The handler registers the calling node, sends an initial SessionMessage and
// then loops: each manager-list, node-object or key-manager update triggers
// another SessionMessage. It returns when the stream context is done, when
// GetWithSession or a send fails, or after forcing the node to disconnect.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
    ctx := stream.Context()
    // Identify the calling node from the stream context.
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return err
    }
    nodeID := nodeInfo.NodeID

    if err := d.isRunningLocked(); err != nil {
        return err
    }

    // register the node.
    sessionID, err := d.register(stream.Context(), nodeID, r.Description)
    if err != nil {
        return err
    }

    // Logger fields shared by every message logged for this session.
    fields := logrus.Fields{
        "node.id":      nodeID,
        "node.session": sessionID,
        "method":       "(*Dispatcher).Session",
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log := log.G(ctx).WithFields(fields)

    // Read the current node object and subscribe to update events for this
    // node ID; nodeUpdates is consumed by the select loop below.
    var nodeObj *api.Node
    nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
        nodeObj = store.GetNode(readTx, nodeID)
        return nil
    }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
        Checks: []state.NodeCheckFunc{state.NodeCheckID}},
    )
    if cancel != nil {
        defer cancel()
    }

    // NOTE(review): a ViewAndWatch failure is only logged; the session
    // carries on with whatever nodeObj and nodeUpdates were returned.
    // Confirm this is intended.
    if err != nil {
        log.WithError(err).Error("ViewAndWatch Node failed")
    }

    if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
        return err
    }

    // Initial message: tells the agent its session ID, its node object, the
    // manager list and the network bootstrap keys.
    if err := stream.Send(&api.SessionMessage{
        SessionID:            sessionID,
        Node:                 nodeObj,
        Managers:             d.getManagers(),
        NetworkBootstrapKeys: d.networkBootstrapKeys,
    }); err != nil {
        return err
    }

    managerUpdates, mgrCancel := d.mgrQueue.Watch()
    defer mgrCancel()
    keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
    defer keyMgrCancel()

    // disconnectNode is a helper forcibly shutdown connection
    disconnectNode := func() error {
        // force disconnect by shutting down the stream.
        transportStream, ok := transport.StreamFromContext(stream.Context())
        if ok {
            // if we have the transport stream, we can signal a disconnect
            // in the client.
            if err := transportStream.ServerTransport().Close(); err != nil {
                log.WithError(err).Error("session end")
            }
        }

        nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
        if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
            log.WithError(err).Error("failed to remove node")
        }
        // still return an abort if the transport closure was ineffective.
        return grpc.Errorf(codes.Aborted, "node must disconnect")
    }

    for {
        // After each message send, we need to check the nodes sessionID hasn't
        // changed. If it has, we will shut down the stream and make the node
        // re-register.
        node, err := d.nodes.GetWithSession(nodeID, sessionID)
        if err != nil {
            return err
        }

        var mgrs []*api.WeightedPeer

        var disconnect bool

        // Block until something changes that the agent must be told about.
        select {
        case ev := <-managerUpdates:
            mgrs = ev.([]*api.WeightedPeer)
        case ev := <-nodeUpdates:
            nodeObj = ev.(state.EventUpdateNode).Node
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-node.Disconnect:
            disconnect = true
        case <-d.ctx.Done():
            disconnect = true
        case <-keyMgrUpdates:
            // Nothing to record here; the message sent below reads
            // d.networkBootstrapKeys directly.
        }
        // No manager update in this round: send the current manager list.
        if mgrs == nil {
            mgrs = d.getManagers()
        }

        if err := stream.Send(&api.SessionMessage{
            SessionID:            sessionID,
            Node:                 nodeObj,
            Managers:             mgrs,
            NetworkBootstrapKeys: d.networkBootstrapKeys,
        }); err != nil {
            return err
        }
        // The message above is still sent before the connection is torn down.
        if disconnect {
            return disconnectNode()
        }
    }
}

这个stream是处理agent连接的。前半部分是把连接的agent记录下来;后半部分是如果cluster信息发生变化,比如manager的leader发生变化,需要通知agent重新连接。disconnectNode()函数则是需要同agent node断开连接时的处理:包括断开连接,agent node信息删除,等等。

Swarmkit笔记(9)——manager

Node.runManager()函数会启动一个manager:

// runManager runs a manager on this node for as long as the node holds the
// manager role. Each loop iteration waits for the manager role, creates and
// starts a manager, starts the local control connection, waits for the agent
// role, and then waits for the manager goroutine to finish. It returns when
// ctx is cancelled or when manager.New fails.
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }
        // NOTE(review): the error from Select is discarded, so remoteAddr
        // may be the zero value and JoinRaft empty.
        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        // done is closed when m.Run returns. The manager runs on a
        // background context rather than ctx; it is stopped explicitly with
        // m.Stop below.
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

        // connCtx bounds the control connection goroutine and the ready
        // watcher started below; connCancel runs at the end of the iteration.
        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            // Later iterations must not reuse the channel.
            ready = nil
        }

        n.waitRole(ctx, ca.AgentRole)

        n.Lock()
        n.manager = nil
        n.Unlock()

        // Wait for the manager goroutine to exit; on cancellation stop the
        // manager first and remember ctx's error so the loop terminates.
        select {
        case <-done:
        case <-ctx.Done():
            err = ctx.Err()
            m.Stop(context.Background())
            <-done
        }
        connCancel()

        if err != nil {
            return err
        }
    }
}

(1)

        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }

首先runManager()函数会阻塞在waitRole()函数。一旦获得manager角色,就会往下执行。

(2)

        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

a)remoteAddr, _ := n.remotes.Select(n.nodeID)作用是从当前cluster的manager中(当然需要排除掉当前node)选出一个leader,赋给remoteAddr。如果当前node是cluster中的第一个manager,则remoteAddr就是一个“空的”值:{NodeID: "", Addr: ""}。
b)在使用manager.New()函数创建manager时,要注意n.config.AdvertiseRemoteAPI是一直为""的。 manager.New()最后会返回一个Manager结构体:

func New(config *Config) (*Manager, error) {
    ......
    m := &Manager{
        config:      config,
        listeners:   listeners,
        caserver:    ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
        Dispatcher:  dispatcher.New(RaftNode, dispatcherConfig),
        server:      grpc.NewServer(opts...),
        localserver: grpc.NewServer(opts...),
        RaftNode:    RaftNode,
        started:     make(chan struct{}),
        stopped:     make(chan struct{}),
    }

    return m, nil
}

其中的listeners包含监听listen-remote-api(tcp)和listen-control-api(unix)的两个socket。

c)m.Run()是实际运行manager的函数,连作者自己都觉得复杂(“This function is *way* too complex.”)。可以把这个函数逻辑分成下面几块:
i)如果当前manager被选为leader,就做一大堆初始化的动作,包括为scheduler,allocator等分配资源,启动goroutine等等;如果不是leader,就做一大堆收尾工作,停掉goroutine,释放资源。
ii)接下来对manager.localserver和manager.server做一大堆设置,主要是authentication和proxy的方面;然后二者分别监听manager.listeners中的Unix和TCP socket,处理相应的数据。

(3)

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

其中Node.initManagerConnection()实现如下:

// initManagerConnection dials the local control API (n.config.ListenControlAPI)
// over a unix socket and then follows the connection's state changes until
// WaitForStateChange fails. Each time the connection becomes Ready it is
// handed to setControlSocket, and ready (if non-nil) is closed on the first
// such transition. It returns the dial error, or the WaitForStateChange error
// after clearing the control socket.
func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{}
    // TLS is used without verifying the peer certificate.
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    // Using listen address instead of advertised address because this is a
    // local connection.
    addr := n.config.ListenControlAPI
    // The dialer's addr parameter shadows the outer addr; it is whatever
    // address gRPC passes to the dialer.
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    // state is the last state seen; WaitForStateChange is asked to report
    // the next state that differs from it.
    state := grpc.Idle
    for {
        s, err := conn.WaitForStateChange(ctx, state)
        if err != nil {
            n.setControlSocket(nil)
            return err
        }
        if s == grpc.Ready {
            n.setControlSocket(conn)
            // Signal readiness once only.
            if ready != nil {
                close(ready)
                ready = nil
            }
        } else if state == grpc.Shutdown {
            // NOTE(review): this tests the previous state, not the new state
            // s. If the intent is to clear the socket when the connection
            // enters Shutdown, the condition should probably be
            // s == grpc.Shutdown — confirm.
            n.setControlSocket(nil)
        }
        state = s
    }
}

功能就是建立一个同本地listen-control-api(unix) socket的一个连接,用来监控node的状态。

(4)把当前node也加入remotes的监控列表中:

    // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

(5)阻塞在下列代码,等待角色变化:

n.waitRole(ctx, ca.AgentRole)

 

Go语言中的条件变量

Go语言sync package提供了条件变量(condition variable)类型:

type Cond struct {
        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()
type Locker

type Locker interface {
        Lock()
        Unlock()
}
A Locker represents an object that can be locked and unlocked.

NewCond()函数输入参数是一个Locker接口类型,即实现了锁功能的变量。Broadcast()函数通知所有等待在condition variable上的goroutine,而Signal()函数只会通知其中的一个goroutine。Wait()会让goroutine阻塞在condition variable,等待条件成立。通常的做法是:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

进入Wait()函数会解锁,离开Wait()函数会重新加锁。由于在“解锁->收到通知->重新加锁”这段逻辑中间有可能另一个同样wait()的goroutine抢先一步改变了条件,导致当前goroutine的条件不再成立,所以这块要使用循环检测。参考下例:

package main

import (
    "sync"
    "time"
    "fmt"
)

// main demonstrates two goroutines waiting on the same condition variable.
// Both wait for num == 1. After the broadcast, the first goroutine to wake
// increments num again, so the second one finds the condition false and goes
// back to waiting — which is why Wait must be called in a loop.
func main() {
    c := sync.NewCond(&sync.Mutex{})
    var num int

    for i := 1; i <= 2; i++ {
        go func(id int) {
            fmt.Println("Enter Thread ID:", id)
            c.L.Lock()
            // Re-check the condition after every wake-up: another waiter
            // may have changed num between the broadcast and this goroutine
            // re-acquiring the lock.
            for num != 1 {
                fmt.Println("Enter loop: Thread ID:", id)
                c.Wait()
                fmt.Println("Exit loop: Thread ID:", id)
            }
            num++
            c.L.Unlock()
            fmt.Println("Exit Thread ID:", id)
        }(i)
    }

    // Give both goroutines time to reach Wait before changing the condition.
    time.Sleep(time.Second)
    fmt.Println("Sleep 1 second")

    // The condition may only be changed while holding c.L; an unlocked
    // write here is a data race with the waiters' reads of num.
    c.L.Lock()
    num++
    c.L.Unlock()
    c.Broadcast()
    time.Sleep(time.Second)
    fmt.Println("Program exit")
}

一次执行结果如下:

Enter Thread ID: 2
Enter loop: Thread ID: 2
Enter Thread ID: 1
Enter loop: Thread ID: 1
Sleep 1 second
Exit loop: Thread ID: 2
Exit Thread ID: 2
Exit loop: Thread ID: 1
Enter loop: Thread ID: 1
Program exit

从上面例子可以看出由于goroutine 2改变了条件,导致goroutine 1重新进入循环,即多个goroutine阻塞在一个condition variable上存在着竞争的关系。

参考资料:
Package sync
Condition Variables

Swarmkit笔记(8)——agent.session

Agent和manager之间的通信是通过session进行的,下面是agent.session结构体定义:

// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the master is done through session.  Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
    // agent is the owning agent; start reads its config (Conn, Executor,
    // Hostname).
    agent     *Agent
    // sessionID is taken from the first SessionMessage received in start.
    sessionID string
    // session is the Session stream opened in start.
    session   api.Dispatcher_SessionClient
    // errs carries session failures to the agent's run loop.
    errs      chan error
    // messages carries SessionMessages to the agent's run loop.
    messages  chan *api.SessionMessage
    // tasks carries task assignments to the agent's run loop.
    tasks     chan *api.TasksMessage

    registered chan struct{} // closed by run once start has succeeded
    // closed is selected on by run and passed to runctx, so a blocked error
    // send can be abandoned once it is closed.
    closed     chan struct{}
}

(1)registered channel是用来通知agent已经向manager注册成功了:

// run sleeps for delay and then starts the session. If start fails, the error
// is offered on s.errs unless the session is closed or ctx is done first. If
// it succeeds, the heartbeat, watch and listen loops are started with a
// logger tagged with the session ID, and s.registered is closed.
func (s *session) run(ctx context.Context, delay time.Duration) {
    time.Sleep(delay) // delay before registering.

    startErr := s.start(ctx)
    if startErr == nil {
        sessionLogger := log.G(ctx).WithField("session.id", s.sessionID)
        ctx = log.WithLogger(ctx, sessionLogger)

        go runctx(ctx, s.closed, s.errs, s.heartbeat)
        go runctx(ctx, s.closed, s.errs, s.watch)
        go runctx(ctx, s.closed, s.errs, s.listen)

        close(s.registered)
        return
    }

    // Report the failure, but do not block if nobody will ever read it.
    select {
    case s.errs <- startErr:
    case <-s.closed:
    case <-ctx.Done():
    }
}

session.run函数中,如果session.start()运行没有问题,则会在最后close registered这个channel。而在Agent.run()中:

func (a *Agent) run(ctx context.Context) {
        .....
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
for {
        select {
            ......
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
            ......
    }
}

一旦registered被close,<-registered这个case则会马上被执行。

(2)当session运行出现错误时,会把error发到errs channel。在Agent.run()中:

case err := <-session.errs:
        // TODO(stevvooe): This may actually block if a session is closed
        // but no error was sent. Session.close must only be called here
        // for this to work.
        if err != nil {
            log.G(ctx).WithError(err).Error("agent: session failed")
            backoff = initialSessionFailureBackoff + 2*backoff
            if backoff > maxSessionFailureBackoff {
                backoff = maxSessionFailureBackoff
            }
        }

        if err := session.close(); err != nil {
            log.G(ctx).WithError(err).Error("agent: closing session failed")
        }
        sessionq = nil
        // if we're here before <-registered, do nothing for that event
        registered = nil

        // Bounce the connection.
        if a.config.Picker != nil {
            a.config.Picker.Reset()
        }

收到error后,会关闭这个session并做一些扫尾工作。

(3)messages channel用来接收manager发送给agent的消息,并转给Agent.run()函数进行处理:

case msg := <-session.messages:
        if err := a.handleSessionMessage(ctx, msg); err != nil {
            log.G(ctx).WithError(err).Error("session message handler failed")
        }

(4)tasks channel用来接收manager发送给agent的需要在这个node上运行的task信息,同样需要转给Agent.run()函数进行处理:

case msg := <-session.tasks:
        if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
            log.G(ctx).WithError(err).Error("task assignment failed")
        }

(5)closed channel在session.close()函数中被关闭。也就是在case err := <-session.errs:这个分支中才会执行。一旦closed channel被关闭后,会重新建立连接:

case <-session.closed:
        log.G(ctx).Debugf("agent: rebuild session")

        // select a session registration delay from backoff range.
        delay := time.Duration(rand.Int63n(int64(backoff)))
        session = newSession(ctx, a, delay)
        registered = session.registered
        sessionq = a.sessionq  

再看一下session.start()这个函数:

// start begins the session and returns the first SessionMessage.
//
// It describes the node through the executor, opens the Session stream and
// waits up to dispatcherRPCTimeout for the first message. On success it
// records the session ID and stream on s and passes the message to
// handleSessionMessage, whose error it returns. It also returns an error if
// the node description is unavailable, if opening the stream or the first
// Recv fails, or if the timeout expires.
func (s *session) start(ctx context.Context) error {
    log.G(ctx).Debugf("(*session).start")

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

    // Buffered so the goroutine below can always deliver its result and
    // exit, even when nobody receives because the timeout fired first.
    errChan := make(chan error, 1)
    // msg and stream are written by the goroutine and read here only after
    // a value has been received from errChan.
    var (
        msg*api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)
}

(1)

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

而关于api.NewDispatcherClient()函数和其所返回的类型定义如下:

    // dispatcherClient is the DispatcherClient returned by
    // NewDispatcherClient.
    type dispatcherClient struct {
        // cc is the gRPC client connection passed to NewDispatcherClient.
        cc *grpc.ClientConn
    }

    // NewDispatcherClient returns a DispatcherClient that uses cc as its
    // connection.
    func NewDispatcherClient(cc *grpc.ClientConn) DispatcherClient {
        client := new(dispatcherClient)
        client.cc = cc
        return client
    }

s.agent.config.Conn就是之前在Node.runAgent()函数中通过下列代码得到的和manager直接的GRPC连接:

conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))

s.agent.config.Executor.Describe()返回对当前node的描述(类型是:*api.NodeDescription)。
(2)

    errChan := make(chan error, 1)
    var (
        msg*api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

dispatcherClient.Session()代码如下:

// Session opens the Dispatcher Session stream on the client's connection,
// sends the single request message in, closes the send side and returns the
// stream wrapped as a Dispatcher_SessionClient. Any error from opening the
// stream, sending or closing is returned with a nil client.
func (c *dispatcherClient) Session(ctx context.Context, in *SessionRequest, opts ...grpc.CallOption) (Dispatcher_SessionClient, error) {
    desc := &_Dispatcher_serviceDesc.Streams[0]
    clientStream, err := grpc.NewClientStream(ctx, desc, c.cc, "/docker.swarmkit.v1.Dispatcher/Session", opts...)
    if err != nil {
        return nil, err
    }

    x := &dispatcherSessionClient{clientStream}
    err = x.ClientStream.SendMsg(in)
    if err != nil {
        return nil, err
    }
    err = x.ClientStream.CloseSend()
    if err != nil {
        return nil, err
    }
    return x, nil
}

返回的是一个符合Dispatcher_SessionClient interface类型的变量:

// Dispatcher_SessionClient is the client side of the Session stream: a
// grpc.ClientStream extended with a typed Recv.
type Dispatcher_SessionClient interface {
    // Recv returns the next SessionMessage from the stream, or an error.
    Recv() (*SessionMessage, error)
    grpc.ClientStream
}

grpc.NewClientStream()函数返回的是grpc.ClientStream interface,而dispatcherSessionClient定义如下:

// dispatcherSessionClient embeds a grpc.ClientStream; together with its Recv
// method it satisfies Dispatcher_SessionClient.
type dispatcherSessionClient struct {
    grpc.ClientStream
}  

为了满足Dispatcher_SessionClient interface定义,dispatcherSessionClient结构体还实现了Recv方法:

// Recv reads the next message from the underlying stream into a fresh
// SessionMessage. It returns nil and the error if RecvMsg fails.
func (x *dispatcherSessionClient) Recv() (*SessionMessage, error) {
    msg := &SessionMessage{}
    err := x.ClientStream.RecvMsg(msg)
    if err != nil {
        return nil, err
    }
    return msg, nil
}

x.ClientStream.SendMsg()发送的是SessionRequest,而它仅包含一个NodeDescription

// SessionRequest starts a session.
type SessionRequest struct {
    // Description is the description of the node opening the session; it is
    // the only field of the request (protobuf field 1).
    Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
}

x.ClientStream.CloseSend()表示所有的发送操作已完成。
接下来收到manager的消息后,把err发到errChan:

msg, err = stream.Recv()
errChan <- err

(3)

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)

一开始goroutine阻塞在select,一旦收到正确的响应,就会完成session的初始化。然后继续等待manager分配任务。

一旦session.start()成功,就会启动另外3个goroutine:

go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)

session.heartbeat()会创建一个新的dispatcherClient变量,然后在1秒钟后发送api.HeartbeatRequest请求,manager会返回api.HeartbeatResponse,告诉agent每隔多长时间发送heartbeat,目前默认时间是5秒。

session.watch()也会新创建一个dispatcherTasksClient变量,然后发送api.TasksRequest请求,通知manager自己已经ready。接下来就阻塞在Recv()函数,等待manager发送task请求。

session.listen()复用session.session变量,阻塞在Recv()函数,等待manager发送SessionMessage,然后处理。

Swarmkit笔记(7)——exec.Executor interface

exec.Executor interface定义(位于agent/exec/executor.go):

// Executor provides controllers for tasks.
//
// Besides handing out per-task controllers, an Executor describes the node it
// runs on and accepts node state and network bootstrap keys.
type Executor interface {
    // Describe returns the underlying node description.
    Describe(ctx context.Context) (*api.NodeDescription, error)

    // Configure uses the node object state to propagate node
    // state to the underlying executor.
    Configure(ctx context.Context, node *api.Node) error

    // Controller provides a controller for the given task.
    Controller(t *api.Task) (Controller, error)

    // SetNetworkBootstrapKeys passes the symmetric keys from the
    // manager to the executor.
    SetNetworkBootstrapKeys([]*api.EncryptionKey) error
}

container Package实现了executor结构体(位于agent/exec/container/executor.go):

import engineapi "github.com/docker/engine-api/client"
// executor is the container package's executor. It carries only the Docker
// engine API client, which Describe queries for engine info and Controller
// passes on to newController.
type executor struct {
    client engineapi.APIClient
}

里面只有一个成员:一个Docker APIClient。executor结构体实际只实现了下面两个方法:

// Describe builds an api.NodeDescription from the Docker engine's Info
// response: hostname, platform, engine version, engine labels, the sorted
// plugin list and the CPU/memory resources. An error from the Info call is
// returned unchanged with a nil description.
func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    info, err := e.client.Info(ctx)
    if err != nil {
        return nil, err
    }

    // Plugins are gathered in a set so repeated (type, name) pairs collapse
    // into a single entry.
    seen := make(map[api.PluginDescription]struct{})
    register := func(kind string, names []string) {
        for i := range names {
            seen[api.PluginDescription{Type: kind, Name: names[i]}] = struct{}{}
        }
    }

    register("Volume", info.Plugins.Volume)
    // The builtin "overlay" driver (the only builtin multi-host driver) is
    // always reported, ahead of whatever network plugins the engine lists.
    networkNames := append([]string{"overlay"}, info.Plugins.Network...)
    register("Network", networkNames)
    register("Authorization", info.Plugins.Authorization)

    pluginList := make([]api.PluginDescription, 0, len(seen))
    for desc := range seen {
        pluginList = append(pluginList, desc)
    }
    sort.Sort(sortedPlugins(pluginList))

    // Engine labels arrive as "key=value" strings. Entries without "=" are
    // skipped, and when a key repeats the last value in the list wins.
    engineLabels := make(map[string]string)
    for _, raw := range info.Labels {
        kv := strings.SplitN(raw, "=", 2)
        if len(kv) < 2 {
            continue
        }
        engineLabels[kv[0]] = kv[1]
    }

    platform := &api.Platform{
        Architecture: info.Architecture,
        OS:           info.OSType,
    }
    engine := &api.EngineDescription{
        EngineVersion: info.ServerVersion,
        Labels:        engineLabels,
        Plugins:       pluginList,
    }
    resources := &api.Resources{
        NanoCPUs:    int64(info.NCPU) * 1e9,
        MemoryBytes: info.MemTotal,
    }

    return &api.NodeDescription{
        Hostname:  info.Name,
        Platform:  platform,
        Engine:    engine,
        Resources: resources,
    }, nil
}

// Controller creates a docker container controller for task t using the
// executor's engine client. On failure the error from newController is
// returned together with a nil Controller.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
    taskCtlr, err := newController(e.client, t)
    if err == nil {
        return taskCtlr, nil
    }

    // Return a literal nil so the interface result is nil on failure.
    return nil, err
}

Describe()方法返回当前Docker engine的资源配置信息,而Controller()则返回一个container.controller结构体。

其中关于client的初始化位于:

        client, err := engineapi.NewClient(engineAddr, "", nil, nil)
        if err != nil {
            return err
        }

        executor := container.NewExecutor(client)

如果没有对engineAddr做特殊设置,就会使用其默认值:unix:///var/run/docker.sock。client值默认如下:

(dlv) p client
*github.com/docker/swarmkit/vendor/github.com/docker/engine-api/client.Client {
        proto: "unix",
        addr: "/var/run/docker.sock",
        basePath: "",
        transport: (unreadable interface type "*transport.apiTransport" not found for 0xc8202ecbd8: no type entry found, use 'types' for a list of valid types),
        version: "",
        customHTTPHeaders: map[string]string [],}

Swarmkit笔记(6)——Agent运行

Node.runAgent()函数实现如下:

// runAgent connects to a manager and runs the agent until it terminates.
//
// It waits for n.remotes to yield a manager (or for ctx to be cancelled),
// dials that manager over gRPC with the supplied transport credentials,
// creates and starts an Agent, publishes it in n.agent for the duration of
// the call, and closes ready once agent.Ready() fires.
//
// The returned error is ctx.Err() if ctx ends before a manager is selected,
// the error from dialing, creating or starting the agent, or otherwise
// whatever agent.Err reports.
func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
    var manager api.Peer
    select {
    case <-ctx.Done():
    case manager = <-n.remotes.WaitSelect(ctx):
    }
    if ctx.Err() != nil {
        return ctx.Err()
    }
    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        // No agent exists to use the connection; release it so a failed
        // attempt does not leak the dialed connection. Best-effort: the
        // original error is what the caller needs.
        conn.Close()
        return err
    }
    if err := agent.Start(ctx); err != nil {
        // The agent did not start, so nothing else will use conn.
        conn.Close()
        return err
    }

    n.Lock()
    n.agent = agent
    n.Unlock()

    // Clear the published agent again when this function returns.
    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    // Relay the agent's readiness to the caller-supplied channel.
    go func() {
        <-agent.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?

    return agent.Err(context.Background())
}

上面函数解释如下:

(1)case manager = <-n.remotes.WaitSelect(ctx):首先获得manager
(2)接下来调用grpc.Dial()去连接这个manager

    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

(3)生成并运行一个Agent

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

关于Agent结构体定义:

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
    // config holds the agent's configuration (connection, picker, executor, ...).
    config *Config

    // The latest node object state from manager
    // for this node known to the agent.
    node *api.Node

    // NOTE(review): presumably the network bootstrap keys received from the
    // manager — confirm against the session message handler.
    keys []*api.EncryptionKey

    // sessionq carries operations that run executes against the current
    // session; worker is initialized, listened to and handed task
    // assignments from run.
    sessionq chan sessionOperation
    worker   Worker

    // NOTE(review): started is presumably closed once the agent has been
    // started — confirm in Start. ready is closed by run when the first
    // session registers.
    started chan struct{}
    ready   chan struct{}
    stopped chan struct{} // requests shutdown
    closed  chan struct{} // only closed in run
    err     error         // read only after closed is closed
}

其中Config结构体定义:

// Config provides values for an Agent. It bundles the manager set, the gRPC
// connection and its picker, the executor, the task database and the
// role-change notification channel.
type Config struct {
    // Hostname is the name of the host for the agent instance.
    Hostname string

    // Managers provides the manager backend used by the agent. It will be
    // updated with managers weights as observed by the agent.
    Managers picker.Remotes

    // Conn specifies the client connection Agent will use.
    Conn *grpc.ClientConn

    // Picker is the picker used by Conn.
    // TODO(aaronl): This is only part of the config to allow resetting the
    // GRPC connection. This should be refactored to address the coupling
    // between Conn and Picker.
    Picker *picker.Picker

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // DB used for task storage. Must be open for the lifetime of the agent.
    DB *bolt.DB

    // NotifyRoleChange channel receives new roles from session messages.
    NotifyRoleChange chan<- api.NodeRole
}

注释都很清楚,不必赘述。

Agent.Start()会调到Agent.run(),实现如下:

// run is the agent's main event loop.
//
// It creates the initial session, initializes the worker, attaches a status
// reporter, and then multiplexes session operations, task assignments,
// session messages, registration, session errors and session rebuilds until
// the agent is stopped or ctx is done. a.closed is closed on exit. a.err is
// set when worker initialization fails or when ctx ends.
//
// After a failed session the next one is delayed by a random duration drawn
// from [0, backoff). backoff grows on every session error, is capped at
// maxSessionFailureBackoff, and is reset to 0 once a session registers.
func (a *Agent) run(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    defer close(a.closed) // full shutdown.

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))

    log.G(ctx).Debugf("(*Agent).run")
    defer log.G(ctx).Debugf("(*Agent).run exited")

    var (
        backoff    time.Duration
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
        ready      = a.ready // first session ready
        sessionq   chan sessionOperation
    )

    if err := a.worker.Init(ctx); err != nil {
        log.G(ctx).WithError(err).Error("worker initialization failed")
        a.err = err
        return // fatal?
    }

    // setup a reliable reporter to call back to us.
    reporter := newStatusReporter(ctx, a)
    defer reporter.Close()

    a.worker.Listen(ctx, reporter)

    for {
        select {
        case operation := <-sessionq:
            // sessionq is nil (never ready) until the session registers.
            operation.response <- operation.fn(session)
        case msg := <-session.tasks:
            if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
                log.G(ctx).WithError(err).Error("task assignment failed")
            }
        case msg := <-session.messages:
            if err := a.handleSessionMessage(ctx, msg); err != nil {
                log.G(ctx).WithError(err).Error("session message handler failed")
            }
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
        case err := <-session.errs:
            // TODO(stevvooe): This may actually block if a session is closed
            // but no error was sent. Session.close must only be called here
            // for this to work.
            if err != nil {
                log.G(ctx).WithError(err).Error("agent: session failed")
                backoff = initialSessionFailureBackoff + 2*backoff
                if backoff > maxSessionFailureBackoff {
                    backoff = maxSessionFailureBackoff
                }
            }

            if err := session.close(); err != nil {
                log.G(ctx).WithError(err).Error("agent: closing session failed")
            }
            sessionq = nil
            // if we're here before <-registered, do nothing for that event
            registered = nil

            // Bounce the connection.
            if a.config.Picker != nil {
                a.config.Picker.Reset()
            }
        case <-session.closed:
            log.G(ctx).Debugf("agent: rebuild session")

            // select a session registration delay from backoff range.
            // rand.Int63n panics when its argument is <= 0, and backoff is 0
            // when a registered session ends without an error, so only
            // randomize over a non-empty range and otherwise retry at once.
            var delay time.Duration
            if backoff > 0 {
                delay = time.Duration(rand.Int63n(int64(backoff)))
            }
            session = newSession(ctx, a, delay)
            registered = session.registered
            sessionq = a.sessionq
        case <-a.stopped:
            // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
            // this loop a few times.
            return
        case <-ctx.Done():
            if a.err == nil {
                a.err = ctx.Err()
            }

            return
        }
    }
}

其中重要的是session这个概念,通过“session = newSession(ctx, a, backoff)”这行代码将session和Agent关联起来。

Swarmkit笔记(5)——Node.Run()函数

Swarmd程序的精髓就是Node.Run()函数。刨除前面一大堆CA验证的相关代码,下面是实际执行manager和agent的部分。

......
managerReady := make(chan struct{})
agentReady := make(chan struct{})
var managerErr error
var agentErr error
var wg sync.WaitGroup
wg.Add(2)
go func() {
    managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
    wg.Done()
    cancel()
}()
go func() {
    agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
    wg.Done()
    cancel()
}()
......

如果node的角色是agent,则runManager goroutine就会阻塞在Node.waitRole()这里:

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        ......
    }
}

因此只有Node.runAgent()这个goroutine可以顺畅执行。

如果node的角色是manager,则runManager和runAgent这两个goroutine都会运行,即manager本身也是一个agent。

Swarmkit笔记(4)——swarmd命令选项

Swarmd程序支持的命令选项:

// init registers swarmd's command-line flags on mainCmd: general options,
// node identity and join settings, listen addresses, and raft tuning.
func init() {
    flags := mainCmd.Flags()

    // General options.
    flags.BoolP("version", "v", false, "Display the version and exit")
    flags.StringP("log-level", "l", "info", "Log level (options \"debug\", \"info\", \"warn\", \"error\", \"fatal\", \"panic\")")
    flags.StringP("state-dir", "d", "./swarmkitstate", "State directory")

    // Node identity and engine.
    flags.StringP("join-token", "", "", "Specifies the secret token required to join the cluster")
    flags.String("engine-addr", "unix:///var/run/docker.sock", "Address of engine instance of agent.")
    flags.String("hostname", "", "Override reported agent hostname")

    // Listen addresses.
    flags.String("listen-remote-api", "0.0.0.0:4242", "Listen address for remote API")
    flags.String("listen-control-api", "./swarmkitstate/swarmd.sock", "Listen socket for control API")
    flags.String("listen-debug", "", "Bind the Go debug server on the provided address")

    // Cluster membership and raft tuning.
    flags.String("join-addr", "", "Join cluster with a node at this address")
    flags.Bool("force-new-cluster", false, "Force the creation of a new cluster from data directory")
    flags.Uint32("heartbeat-tick", 1, "Defines the heartbeat interval (in seconds) for raft member health-check")
    flags.Uint32("election-tick", 3, "Defines the amount of ticks (in seconds) needed without a Leader to trigger a new election")
    flags.Var(&externalCAOpt, "external-ca", "Specifications of one or more certificate signing endpoints")
}

(1)version、log-level、hostname最简单,不必细说。
(2)state-dir目录存储远端manager以及CA认证等相关信息:

# ls -alt
total 24
drwx------ 5 root root 4096 Jul 29 02:40 .
drwx------ 4 root root 4096 Jul 29 02:40 raft
-rw------- 1 root root   63 Jul 29 02:40 state.json
drwxr-xr-x 2 root root 4096 Jul 29 02:40 worker
drwxr-xr-x 2 root root 4096 Jul 29 02:40 certificates
drwxr-xr-x 3 root root 4096 Jul 29 02:40 ..

(3)join-token是node用来加入某个cluster的token,在第一次认证请求时会被用到。
(4)engine-addr指定实际用来执行executor的engine位置,默认是使用本机Docker。
(5)listen-remote-api指定监听一个tcp port,用来接收和处理其它node的访问请求。
(6)listen-control-api指定一个Unix socket,用来接收和处理swarmctl程序的访问请求。
(7)listen-debug指定监听一个用来debug程序的端口。
(8)join-addr指定要加入的cluster的一个node地址,通过连接这个node来加入这个cluster。
(9)其余force-new-cluster、heartbeat-tick、election-tick、external-ca解释的都很清楚,不必赘述。