Swarmkit Notes (7): The exec.Executor interface

The exec.Executor interface is defined in agent/exec/executor.go:

// Executor provides controllers for tasks.
type Executor interface {
    // Describe returns the underlying node description.
    Describe(ctx context.Context) (*api.NodeDescription, error)

    // Configure uses the node object state to propagate node
    // state to the underlying executor.
    Configure(ctx context.Context, node *api.Node) error

    // Controller provides a controller for the given task.
    Controller(t *api.Task) (Controller, error)

    // SetNetworkBootstrapKeys passes the symmetric keys from the
    // manager to the executor.
    SetNetworkBootstrapKeys([]*api.EncryptionKey) error
}
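
Any type that implements these four methods can act as an executor backend. As a minimal sketch (not part of swarmkit), a do-nothing implementation could look like the following; the noopExecutor type and its behavior are invented for illustration:

package noop

import (
    "errors"

    "github.com/docker/swarmkit/agent/exec"
    "github.com/docker/swarmkit/api"
    "golang.org/x/net/context"
)

type noopExecutor struct{}

// Describe reports a fixed description instead of querying a real engine.
func (e *noopExecutor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    return &api.NodeDescription{Hostname: "noop-node"}, nil
}

// Configure ignores the node state; a real executor would propagate it.
func (e *noopExecutor) Configure(ctx context.Context, node *api.Node) error {
    return nil
}

// Controller would normally return a per-task controller; this backend runs nothing.
func (e *noopExecutor) Controller(t *api.Task) (exec.Controller, error) {
    return nil, errors.New("noop executor runs no tasks")
}

// SetNetworkBootstrapKeys discards the keys sent down by the managers.
func (e *noopExecutor) SetNetworkBootstrapKeys(keys []*api.EncryptionKey) error {
    return nil
}

// Compile-time check that noopExecutor satisfies exec.Executor.
var _ exec.Executor = (*noopExecutor)(nil)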

The container package implements the executor struct (in agent/exec/container/executor.go):

import engineapi "github.com/docker/engine-api/client"
type executor struct {
    client engineapi.APIClient
}

It has only one member: a Docker APIClient. The two substantive methods that executor implements are shown below:

// Describe returns the underlying node description from the docker client.
func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    info, err := e.client.Info(ctx)
    if err != nil {
        return nil, err
    }

    plugins := map[api.PluginDescription]struct{}{}
    addPlugins := func(typ string, names []string) {
        for _, name := range names {
            plugins[api.PluginDescription{
                Type: typ,
                Name: name,
            }] = struct{}{}
        }
    }

    addPlugins("Volume", info.Plugins.Volume)
    // Add builtin driver "overlay" (the only builtin multi-host driver) to
    // the plugin list by default.
    addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...))
    addPlugins("Authorization", info.Plugins.Authorization)

    pluginFields := make([]api.PluginDescription, 0, len(plugins))
    for k := range plugins {
        pluginFields = append(pluginFields, k)
    }
    sort.Sort(sortedPlugins(pluginFields))

    // parse []string labels into a map[string]string
    labels := map[string]string{}
    for _, l := range info.Labels {
        stringSlice := strings.SplitN(l, "=", 2)
        // this will take the last value in the list for a given key
        // ideally, one shouldn't assign multiple values to the same key
        if len(stringSlice) > 1 {
            labels[stringSlice[0]] = stringSlice[1]
        }
    }

    description := &api.NodeDescription{
        Hostname: info.Name,
        Platform: &api.Platform{
            Architecture: info.Architecture,
            OS:           info.OSType,
        },
        Engine: &api.EngineDescription{
            EngineVersion: info.ServerVersion,
            Labels:        labels,
            Plugins:       pluginFields,
        },
        Resources: &api.Resources{
            NanoCPUs:    int64(info.NCPU) * 1e9,
            MemoryBytes: info.MemTotal,
        },
    }

    return description, nil
}

// Controller returns a docker container controller.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
    ctlr, err := newController(e.client, t)
    if err != nil {
        return nil, err
    }

    return ctlr, nil
}

The Describe() method returns resource and configuration information about the current Docker engine, while Controller() returns a container.controller struct.
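
Note the label handling in Describe(): each engine label is a single "key=value" string, split at the first '=', and a later duplicate key silently overwrites an earlier one. A standalone sketch of the same logic (the sample labels are invented):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Engine labels arrive as a []string of "key=value" entries.
    raw := []string{"storage=ssd", "zone=us-east", "zone=us-west", "bogus"}

    labels := map[string]string{}
    for _, l := range raw {
        kv := strings.SplitN(l, "=", 2) // split at the first '=' only
        if len(kv) > 1 {
            labels[kv[0]] = kv[1] // the last value wins for a duplicate key
        }
    }

    fmt.Println(labels) // map[storage:ssd zone:us-west]; "bogus" is dropped
}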

The client is initialized in the following code:

        client, err := engineapi.NewClient(engineAddr, "", nil, nil)
        if err != nil {
            return err
        }

        executor := container.NewExecutor(client)

If engineAddr is left unset, its default value unix:///var/run/docker.sock is used, and the resulting client looks like this:

(dlv) p client
*github.com/docker/swarmkit/vendor/github.com/docker/engine-api/client.Client {
        proto: "unix",
        addr: "/var/run/docker.sock",
        basePath: "",
        transport: (unreadable interface type "*transport.apiTransport" not found for 0xc8202ecbd8: no type entry found, use 'types' for a list of valid types),
        version: "",
        customHTTPHeaders: map[string]string [],}
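
The same constructor works for remote engines, too. A minimal sketch (the TCP address is made up; swarmd would receive it through --engine-addr):

package main

import (
    "fmt"
    "log"

    engineapi "github.com/docker/engine-api/client"
    "golang.org/x/net/context"
)

func main() {
    // Hypothetical remote engine address instead of the default Unix socket.
    client, err := engineapi.NewClient("tcp://10.0.0.5:2375", "", nil, nil)
    if err != nil {
        log.Fatal(err)
    }

    // The executor's Describe() boils down to this Info call.
    info, err := client.Info(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(info.Name, info.ServerVersion)
}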

Swarmkit Notes (6): Running the Agent

The Node.runAgent() function is implemented as follows:

func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
    var manager api.Peer
    select {
    case <-ctx.Done():
    case manager = <-n.remotes.WaitSelect(ctx):
    }
    if ctx.Err() != nil {
        return ctx.Err()
    }
    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

    n.Lock()
    n.agent = agent
    n.Unlock()

    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    go func() {
        <-agent.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?

    return agent.Err(context.Background())
}

The function above works as follows:

(1) case manager = <-n.remotes.WaitSelect(ctx): first, obtain a manager.
(2) Next, call grpc.Dial() to connect to that manager:

    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

(3) Create and start an Agent:

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

The Agent struct is defined as:

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
    config *Config

    // The latest node object state from manager
    // for this node known to the agent.
    node *api.Node

    keys []*api.EncryptionKey

    sessionq chan sessionOperation
    worker   Worker

    started chan struct{}
    ready   chan struct{}
    stopped chan struct{} // requests shutdown
    closed  chan struct{} // only closed in run
    err     error         // read only after closed is closed
}
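
The sessionq channel is how other goroutines get safe access to the current session: a caller sends a closure plus a response channel, and Agent.run (shown below) executes the closure against whichever session is live at the time. A minimal sketch of this request/response pattern, with hypothetical stand-ins for swarmkit's types:

package main

import "fmt"

// Hypothetical stand-ins for swarmkit's session and sessionOperation.
type session struct{ id int }

type sessionOperation struct {
    fn       func(*session) error
    response chan error
}

func main() {
    sessionq := make(chan sessionOperation)
    done := make(chan struct{})

    // Only the owner goroutine (Agent.run in swarmkit) touches the session;
    // everyone else submits work through sessionq.
    go func() {
        s := &session{id: 1}
        for op := range sessionq {
            op.response <- op.fn(s)
        }
        close(done)
    }()

    // A caller submits an operation and waits for its result.
    resp := make(chan error, 1)
    sessionq <- sessionOperation{
        fn: func(s *session) error {
            fmt.Println("running against session", s.id)
            return nil
        },
        response: resp,
    }
    fmt.Println("operation error:", <-resp)

    close(sessionq)
    <-done
}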

And the Config struct:

// Config provides values for an Agent.
type Config struct {
    // Hostname the name of host for agent instance.
    Hostname string

    // Managers provides the manager backend used by the agent. It will be
    // updated with managers weights as observed by the agent.
    Managers picker.Remotes

    // Conn specifies the client connection Agent will use.
    Conn *grpc.ClientConn

    // Picker is the picker used by Conn.
    // TODO(aaronl): This is only part of the config to allow resetting the
    // GRPC connection. This should be refactored to address the coupling
    // between Conn and Picker.
    Picker *picker.Picker

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // DB used for task storage. Must be open for the lifetime of the agent.
    DB *bolt.DB

    // NotifyRoleChange channel receives new roles from session messages.
    NotifyRoleChange chan<- api.NodeRole
}

The comments are clear enough that no further explanation is needed.

Agent.Start() ends up invoking Agent.run(), which is implemented as follows:

func (a *Agent) run(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    defer close(a.closed) // full shutdown.

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))

    log.G(ctx).Debugf("(*Agent).run")
    defer log.G(ctx).Debugf("(*Agent).run exited")

    var (
        backoff    time.Duration
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
        ready      = a.ready // first session ready
        sessionq   chan sessionOperation
    )

    if err := a.worker.Init(ctx); err != nil {
        log.G(ctx).WithError(err).Error("worker initialization failed")
        a.err = err
        return // fatal?
    }

    // setup a reliable reporter to call back to us.
    reporter := newStatusReporter(ctx, a)
    defer reporter.Close()

    a.worker.Listen(ctx, reporter)

    for {
        select {
        case operation := <-sessionq:
            operation.response <- operation.fn(session)
        case msg := <-session.tasks:
            if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
                log.G(ctx).WithError(err).Error("task assignment failed")
            }
        case msg := <-session.messages:
            if err := a.handleSessionMessage(ctx, msg); err != nil {
                log.G(ctx).WithError(err).Error("session message handler failed")
            }
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
        case err := <-session.errs:
            // TODO(stevvooe): This may actually block if a session is closed
            // but no error was sent. Session.close must only be called here
            // for this to work.
            if err != nil {
                log.G(ctx).WithError(err).Error("agent: session failed")
                backoff = initialSessionFailureBackoff + 2*backoff
                if backoff > maxSessionFailureBackoff {
                    backoff = maxSessionFailureBackoff
                }
            }

            if err := session.close(); err != nil {
                log.G(ctx).WithError(err).Error("agent: closing session failed")
            }
            sessionq = nil
            // if we're here before <-registered, do nothing for that event
            registered = nil

            // Bounce the connection.
            if a.config.Picker != nil {
                a.config.Picker.Reset()
            }
        case <-session.closed:
            log.G(ctx).Debugf("agent: rebuild session")

            // select a session registration delay from backoff range.
            delay := time.Duration(rand.Int63n(int64(backoff)))
            session = newSession(ctx, a, delay)
            registered = session.registered
            sessionq = a.sessionq
        case <-a.stopped:
            // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
            // this loop a few times.
            return
        case <-ctx.Done():
            if a.err == nil {
                a.err = ctx.Err()
            }

            return
        }
    }
}

The important concept here is the session: the line "session = newSession(ctx, a, backoff)" is what ties a session to the Agent.
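
The session error branch above grows the reconnect backoff exponentially and caps it, and the session.closed branch then waits a random delay drawn from [0, backoff) before building a new session. A standalone sketch of that arithmetic (the two constants are invented; swarmkit defines its own initialSessionFailureBackoff and maxSessionFailureBackoff):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    // Hypothetical values; swarmkit defines its own constants.
    const (
        initialSessionFailureBackoff = 100 * time.Millisecond
        maxSessionFailureBackoff     = 8 * time.Second
    )

    var backoff time.Duration
    for i := 1; i <= 8; i++ {
        // The same update rule as Agent.run's session error branch.
        backoff = initialSessionFailureBackoff + 2*backoff
        if backoff > maxSessionFailureBackoff {
            backoff = maxSessionFailureBackoff
        }
        // The next session registration is delayed by a random duration in [0, backoff).
        delay := time.Duration(rand.Int63n(int64(backoff)))
        fmt.Printf("failure %d: backoff=%v chosen delay=%v\n", i, backoff, delay)
    }
}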

Swarmkit Notes (5): The Node.Run() function

The heart of the swarmd program is the Node.Run() function. Setting aside the sizable block of CA-certification code at the start, the part that actually runs the manager and the agent looks like this:

......
managerReady := make(chan struct{})
agentReady := make(chan struct{})
var managerErr error
var agentErr error
var wg sync.WaitGroup
wg.Add(2)
go func() {
    managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
    wg.Done()
    cancel()
}()
go func() {
    agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
    wg.Done()
    cancel()
}()
......

If the node's role is agent, the runManager goroutine blocks in Node.waitRole():

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        ......
    }
}

Therefore only the Node.runAgent() goroutine makes progress.

If the node's role is manager, both the runManager and runAgent goroutines run; that is, a manager is itself also an agent.

Swarmkit Notes (4): swarmd command-line options

The command-line options supported by swarmd:

func init() {
    mainCmd.Flags().BoolP("version", "v", false, "Display the version and exit")
    mainCmd.Flags().StringP("log-level", "l", "info", "Log level (options \"debug\", \"info\", \"warn\", \"error\", \"fatal\", \"panic\")")
    mainCmd.Flags().StringP("state-dir", "d", "./swarmkitstate", "State directory")
    mainCmd.Flags().StringP("join-token", "", "", "Specifies the secret token required to join the cluster")
    mainCmd.Flags().String("engine-addr", "unix:///var/run/docker.sock", "Address of engine instance of agent.")
    mainCmd.Flags().String("hostname", "", "Override reported agent hostname")
    mainCmd.Flags().String("listen-remote-api", "0.0.0.0:4242", "Listen address for remote API")
    mainCmd.Flags().String("listen-control-api", "./swarmkitstate/swarmd.sock", "Listen socket for control API")
    mainCmd.Flags().String("listen-debug", "", "Bind the Go debug server on the provided address")
    mainCmd.Flags().String("join-addr", "", "Join cluster with a node at this address")
    mainCmd.Flags().Bool("force-new-cluster", false, "Force the creation of a new cluster from data directory")
    mainCmd.Flags().Uint32("heartbeat-tick", 1, "Defines the heartbeat interval (in seconds) for raft member health-check")
    mainCmd.Flags().Uint32("election-tick", 3, "Defines the amount of ticks (in seconds) needed without a Leader to trigger a new election")
    mainCmd.Flags().Var(&externalCAOpt, "external-ca", "Specifications of one or more certificate signing endpoints")
}

(1) version, log-level, and hostname are the simplest options and need no explanation.
(2) The state-dir directory stores state such as remote-manager information and CA certificates:

# ls -alt
total 24
drwx------ 5 root root 4096 Jul 29 02:40 .
drwx------ 4 root root 4096 Jul 29 02:40 raft
-rw------- 1 root root   63 Jul 29 02:40 state.json
drwxr-xr-x 2 root root 4096 Jul 29 02:40 worker
drwxr-xr-x 2 root root 4096 Jul 29 02:40 certificates
drwxr-xr-x 3 root root 4096 Jul 29 02:40 ..

(3) join-token is the token a node uses to join a cluster; it is presented with the node's first certification request.
(4) engine-addr specifies the location of the engine that backs the executor; by default the local Docker daemon is used.
(5) listen-remote-api specifies a TCP port on which to accept and handle requests from other nodes.
(6) listen-control-api specifies a Unix socket on which to accept and handle requests from the swarmctl program.
(7) listen-debug specifies an address on which to bind the Go debug server.
(8) join-addr specifies the address of a node in an existing cluster; the new node joins the cluster by connecting to it.
(9) The remaining options force-new-cluster, heartbeat-tick, election-tick, and external-ca are explained clearly enough by their help text. A sample invocation is shown below.
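
Putting a few of these flags together, starting a small two-node setup on one machine might look like this (the state directories, socket paths, ports, and hostnames are invented for illustration):

# Hypothetical first node; the remote API defaults to 0.0.0.0:4242.
swarmd -d /tmp/node-1 --listen-control-api /tmp/node-1/swarmd.sock --hostname node-1

# Hypothetical second node on the same host: it needs its own state dir,
# control socket, and remote API port, and joins via the first node.
swarmd -d /tmp/node-2 --listen-control-api /tmp/node-2/swarmd.sock \
    --listen-remote-api 0.0.0.0:4243 --hostname node-2 --join-addr 127.0.0.1:4242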

Swarmkit Notes (3): The swarmd program skeleton

The agent.Node struct has four channels; understanding what they do is the key to understanding the structure of swarmd:

// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
    ......
    started              chan struct{}
    stopped              chan struct{}
    ready                chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
    ......
    closed               chan struct{}
    ......
}

The skeleton of swarmd is shown below. executor is built from engine-addr and represents the entity that ultimately runs tasks; in practice it is an engineapi.APIClient talking to a Docker engine. The remaining parameters come straight from the command-line flags:

        ......
        // Create a context for our GRPC call
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        ......

        n, err := agent.NewNode(&agent.NodeConfig{
            Hostname:         hostname,
            ForceNewCluster:  forceNewCluster,
            ListenControlAPI: unix,
            ListenRemoteAPI:  addr,
            JoinAddr:         managerAddr,
            StateDir:         stateDir,
            JoinToken:        joinToken,
            ExternalCAs:      externalCAOpt.Value(),
            Executor:         executor,
            HeartbeatTick:    hb,
            ElectionTick:     election,
        })
        if err != nil {
            return err
        }

        if err := n.Start(ctx); err != nil {
            return err
        }

        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        go func() {
            <-c
            n.Stop(ctx)
        }()

        go func() {
            select {
            case <-n.Ready():
            case <-ctx.Done():
            }
            if ctx.Err() == nil {
                logrus.Info("node is ready")
            }
        }()

        return n.Err(ctx)

(1)

if err := n.Start(ctx); err != nil {
    return err
}

Here is the implementation of Node.Start():

// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
    select {
    case <-n.started:
        select {
        case <-n.closed:
            return n.err
        case <-n.stopped:
            return errAgentStopped
        case <-ctx.Done():
            return ctx.Err()
        default:
            return errAgentStarted
        }
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    close(n.started)
    go n.run(ctx)
    return nil
}

If nothing goes wrong in Node.Start(), it closes the Node.started channel (close(n.started)) and then launches the node's initialization with go n.run(ctx).

(2)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
    <-c
    n.Stop(ctx)
}()

The purpose of this code is to let the user interrupt the program with Ctrl+C. Node.Stop() is implemented as follows:

// Stop stops node execution
func (n *Node) Stop(ctx context.Context) error {
    select {
    case <-n.started:
        select {
        case <-n.closed:
            return n.err
        case <-n.stopped:
            select {
            case <-n.closed:
                return n.err
            case <-ctx.Done():
                return ctx.Err()
            }
        case <-ctx.Done():
            return ctx.Err()
        default:
            close(n.stopped)
            // recurse and wait for closure
            return n.Stop(ctx)
        }
    case <-ctx.Done():
        return ctx.Err()
    default:
        return errAgentNotStarted
    }
}

Because the Node.started channel has already been closed by this point, the outer select always takes its first branch, case <-n.started; the nested select then decides which branch to run based on the node's state at that moment.
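
This works because of a Go property worth spelling out: a receive from a closed channel never blocks and succeeds immediately, so a closed channel behaves like a permanent latch inside select. A minimal demonstration:

package main

import "fmt"

func main() {
    started := make(chan struct{})

    // Before close: the receive would block, so select falls to default.
    select {
    case <-started:
        fmt.Println("started")
    default:
        fmt.Println("not started yet")
    }

    close(started)

    // After close: the receive always succeeds immediately.
    select {
    case <-started:
        fmt.Println("started")
    default:
        fmt.Println("not started yet")
    }
}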

(3)

go func() {
    select {
    case <-n.Ready():
    case <-ctx.Done():
    }
    if ctx.Err() == nil {
        logrus.Info("node is ready")
    }
}()

Node.Ready() returns the Node.ready channel:

// Ready returns a channel that is closed after node's initialization has
// completed for the first time.
func (n *Node) Ready() <-chan struct{} {
    return n.ready
}

Once the Node has finished initializing, the Node.ready channel is closed, so if everything goes smoothly you will see the "node is ready" log line.

(4)

return n.Err(ctx)

Node.Err() is implemented as:

// Err returns the error that caused the node to shutdown or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
    select {
    case <-n.closed:
        return n.err
    case <-ctx.Done():
        return ctx.Err()
    }
}

Node.Err() blocks here until the Node has shut down.


Swarmkit Notes (1): Overview

Swarmkit is a project recently open-sourced by Docker for creating and managing clusters. By default it runs tasks in Docker containers, but it is not limited to them.

Nodes in a cluster come in two kinds: managers and workers. Manager nodes accept user commands and manage the cluster, while worker nodes execute tasks through an executor (the default executor runs Docker containers). Tasks can be grouped into services. In addition, the managers form a group via the Raft protocol and elect a leader; only the leader handles requests, and the other members simply forward requests to the leader.

Swarmkit provides two executables: swarmd and swarmctl. swarmd is deployed on every node of the cluster; the instances communicate with each other and form the cluster. swarmctl is used to issue commands to the cluster as a whole. The following diagram illustrates Swarmkit's internals (image source: https://pbs.twimg.com/media/Ckb8EMLVAAQrxYH.jpg):

[Figure: Swarmkit internal architecture]

References:
docker-swarmkit

Nomenclature

Swarmkit Internal