Docker notes (12)——Docker 1.12 integrates Docker Swarm

Docker 1.12 integrates the Docker Swarm functionality. According to the article Docker Swarm Is Dead. Long Live Docker Swarm., Docker 1.12 has the following advantages over the standalone docker swarm:

With swarm mode you create a swarm with the ‘init’ command, and add workers to the cluster with the ‘join’ command. The commands to create and join a swarm literally take a second or two to complete. Mouat said “Comparing getting a Kubernetes or Mesos cluster running, Docker Swarm is a snap”.

Communication between nodes on the swarm is all secured with Transport Layer Security (TLS). For simple setups, Docker 1.12 generates self-signed certificates to use when you create the swarm, or you can provide certificates from your own certificate authority. Those certificates are only used internally by the nodes; any services you publicly expose use your own certs as usual.

The swarm mode implemented in Docker 1.12 is much simpler to set up, and the nodes communicate with each other over TLS.
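Bootstrapping a cluster really is just two commands. A sketch (the manager IP is arbitrary, and <worker-token> stands for the token that init prints):

# On the first host: create the swarm; this host becomes a manager.
docker swarm init --advertise-addr 192.168.99.100

# On each additional host: join the swarm as a worker.
docker swarm join --token <worker-token> 192.168.99.100:2377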


The self-awareness of the swarm is the biggest and most significant change. Every node in the swarm can reach every other node, and is able to route traffic where it needs to go. You no longer need to run your own load balancer and integrate it with a dynamic discovery agent, using tools like Nginx and Interlock.

Now if a node receives a request which it can’t fulfil, because it isn’t running an instance of the container that can process the request, it routes the request on to a node which can fulfil it. This is transparent to the consumer, all they see is the response to their request, they don’t know about any redirections that happened within the swarm.

Docker 1.12's swarm mode has built-in "self-awareness" and load-balancing mechanisms, and can route a request to a node that is able to serve it.
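For example, after creating a service with a published port, every node in the swarm answers on that port, whether or not it runs one of the service's tasks. A sketch (service name, image, and ports are arbitrary):

# Run 2 nginx replicas and publish container port 80 as 8080 swarm-wide.
docker service create --name web --replicas 2 -p 8080:80 nginx

# Any node's IP works; the routing mesh forwards to a node running a task.
curl http://<any-node-ip>:8080/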

By default, the files related to Docker 1.12's swarm mode are stored under the /var/lib/docker/swarm directory.

For a demo of Docker 1.12's swarm mode, see this video.

Update: Docker 1.12 actually uses the swarmkit project to implement the docker swarm cluster functionality (the relevant code lives in the daemon/cluster directory).

Related reading:
- The relation between "docker/swarm" and "docker/swarmkit"
- Comparing Swarm, Swarmkit and Swarm Mode
- Docker 1.12 Swarm Mode – Under the hood

Docker Swarm code analysis notes (17)——event_monitor.go
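In Docker Swarm, every Engine owns an EventsMonitor (see cluster/engine.go and cluster/event_monitor.go):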


type Engine struct {
    // ... other fields elided ...
    eventsMonitor *EventsMonitor
}

//EventsMonitor monitors events
type EventsMonitor struct {
    stopChan chan struct{}
    cli      client.APIClient
    handler  func(msg events.Message) error
}



// ConnectWithClient is exported
func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error {
    e.client = client
    e.apiClient = apiClient
    e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler)

    // Fetch the engine labels.
    if err := e.updateSpecs(); err != nil {
        return err
    }

    e.StartMonitorEvents()

    // Force a state update before returning.
    if err := e.RefreshContainers(true); err != nil {
        return err
    }

    if err := e.RefreshImages(); err != nil {
        return err
    }

    // Do not check error as older daemon doesn't support this call.
    e.RefreshVolumes()
    e.RefreshNetworks()

    return nil
}


// StartMonitorEvents monitors events from the engine
func (e *Engine) StartMonitorEvents() {
    log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events")
    ec := make(chan error)
    e.eventsMonitor.Start(ec)

    go func() {
        if err := <-ec; err != nil {
            if !strings.Contains(err.Error(), "EOF") {
                // failing node reconnect should use back-off strategy
                // ... (back-off wait elided) ...
            }
            e.StartMonitorEvents()
        }
        close(ec)
    }()
}

Engine.StartMonitorEvents reads from the ec channel; whenever an error arrives, it calls Engine.StartMonitorEvents again, effectively looping until event monitoring stays up.
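Here is the same supervisor pattern in isolation, as a minimal runnable sketch (none of these names come from Swarm; startMonitor merely simulates an event stream that keeps failing):

package main

import (
    "errors"
    "fmt"
    "time"
)

// startMonitor pretends to watch an event stream and reports its fate on ec.
func startMonitor(ec chan error) {
    go func() {
        time.Sleep(100 * time.Millisecond)
        ec <- errors.New("connection reset") // simulate a broken stream
    }()
}

// monitorEvents restarts the monitor whenever it fails, just like
// Engine.StartMonitorEvents (bounded here so the demo terminates).
func monitorEvents(attempt int) {
    if attempt > 3 {
        return
    }
    ec := make(chan error)
    startMonitor(ec)

    go func() {
        if err := <-ec; err != nil {
            fmt.Println("monitor failed, restarting:", err)
            monitorEvents(attempt + 1)
        }
        close(ec)
    }()
}

func main() {
    monitorEvents(0)
    time.Sleep(time.Second)
}

In Swarm, EventsMonitor.Start is the function that actually feeds this error channel: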


// Start starts the EventsMonitor
func (em *EventsMonitor) Start(ec chan error) {
    em.stopChan = make(chan struct{})

    responseBody, err := em.cli.Events(context.Background(), types.EventsOptions{})
    if err != nil {
        ec <- err
        return
    }

    resultChan := make(chan decodingResult)

    go func() {
        dec := json.NewDecoder(responseBody)
        for {
            var result decodingResult
            result.err = dec.Decode(&result.msg)
            resultChan <- result
            if result.err == io.EOF {
                break
            }
        }
        close(resultChan)
    }()

    go func() {
        defer responseBody.Close()
        for {
            select {
            case <-em.stopChan:
                ec <- nil
                return
            case result := <-resultChan:
                if result.err != nil {
                    ec <- result.err
                    return
                }
                if err := em.handler(result.msg); err != nil {
                    ec <- err
                    return
                }
            }
        }
    }()
}

The logic boils down to issuing an "HTTP GET /events" request and then waiting for the Docker Engine's responses. Because this request will most likely block indefinitely, any subsequent HTTP exchange has to establish a new HTTP connection. The reason lies here, in the documentation of net/http's Response:

type Response struct {
    // ... other fields elided ...

    // Body represents the response body.
    // The http Client and Transport guarantee that Body is always
    // non-nil, even on responses without a body or responses with
    // a zero-length body. It is the caller's responsibility to
    // close Body. The default HTTP client's Transport does not
    // attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
    // ("keep-alive") unless the Body is read to completion and is
    // closed.
    //
    // The Body is automatically dechunked if the server replied
    // with a "chunked" Transfer-Encoding.
    Body io.ReadCloser

    // ... other fields elided ...
}
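To see the blocking behavior concretely, here is a minimal standalone sketch (not Swarm code) of the same long poll, assuming a daemon listening without TLS on tcp://127.0.0.1:2375:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
)

func main() {
    // "GET /events" never completes on its own; the daemon keeps the
    // response body open and writes one JSON object per event.
    resp, err := http.Get("http://127.0.0.1:2375/events")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    dec := json.NewDecoder(resp.Body)
    for {
        var msg map[string]interface{} // decode each event generically
        if err := dec.Decode(&msg); err != nil {
            if err == io.EOF {
                return // the daemon closed the stream
            }
            panic(err)
        }
        fmt.Println("event:", msg)
    }
}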



// Stop stops the EventsMonitor
func (em *EventsMonitor) Stop() {
    if em.stopChan == nil {
        return
    }
    close(em.stopChan)
}


Docker Swarm code analysis notes (16)——the Node struct

Docker Swarm's scheduler selects the nodes that satisfy the requirements and creates the containers there:

candidates, err := s.selectNodesForContainer(nodes, config, true)


// Node is an abstract type used by the scheduler.
type Node struct {
    ID         string
    IP         string
    Addr       string
    Name       string
    Labels     map[string]string
    Containers cluster.Containers
    Images     []*cluster.Image

    UsedMemory  int64
    UsedCpus    int64
    TotalMemory int64
    TotalCpus   int64

    HealthIndicator int64
}


// listNodes returns all validated engines in the cluster, excluding pendingEngines.
func (c *Cluster) listNodes() []*node.Node {
    c.RLock()
    defer c.RUnlock()

    out := make([]*node.Node, 0, len(c.engines))
    for _, e := range c.engines {
        node := node.NewNode(e)
        for _, pc := range c.pendingContainers {
            if pc.Engine.ID == e.ID && node.Container(pc.Config.SwarmID()) == nil {
                node.AddContainer(pc.ToContainer())
            }
        }
        out = append(out, node)
    }

    return out
}
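Note that listNodes folds each engine's pending containers into the Node snapshot as well, so resources already promised to not-yet-created containers are visible to the scheduler.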


Docker Swarm code analysis notes (15)——scheduler

The scheduler used by the Docker Swarm manage command is built from the filter list and the strategy (cli/manage.go):

sched := scheduler.New(s, fs)

What the scheduler actually does is select the list of nodes (i.e., Docker Engines) that satisfy the requirements of a cluster.ContainerConfig:

// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order or preference.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
    candidates, err := s.selectNodesForContainer(nodes, config, true)

    if err != nil {
        candidates, err = s.selectNodesForContainer(nodes, config, false)
    }
    return candidates, err
}

func (s *Scheduler) selectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig, soft bool) ([]*node.Node, error) {
    accepted, err := filter.ApplyFilters(s.filters, config, nodes, soft)
    if err != nil {
        return nil, err
    }

    if len(accepted) == 0 {
        return nil, errNoNodeAvailable
    }

    return s.strategy.RankAndSort(config, accepted)
}
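Note the two-pass logic: SelectNodesForContainer first tries scheduling with soft constraints honored (soft == true); only if that fails does it retry with the soft constraints dropped, enforcing just the hard ones.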


Docker Swarm code analysis notes (14)——strategy

The flStrategy option of the Docker Swarm manage command specifies the placement strategy used by the scheduler; the variable is defined as follows (cli/flags.go):

flStrategy = cli.StringFlag{
    Name:  "strategy",
    Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]",
    Value: strategy.List()[0],
}



// PlacementStrategy is the interface for a container placement strategy.
type PlacementStrategy interface {
    // Name of the strategy
    Name() string
    // Initialize performs any initial configuration required by the strategy and returns
    // an error if one is encountered.
    // If no initial configuration is needed, this may be a no-op and return a nil error.
    Initialize() error
    // RankAndSort applies the strategy to a list of nodes and ranks them based
    // on the best fit given the container configuration.  It returns a sorted
    // list of nodes (based on their ranks) or an error if there is no
    // available node on which to schedule the container.
    RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}

RankAndSort returns the list of qualifying nodes (i.e., Docker Engines), sorted by how well each node fits the given container configuration.
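Purely as an illustration (this is not part of Swarm), a strategy satisfying this interface can be very small; the hypothetical randomStrategy below does no real ranking and just returns the candidates in a random order:

package mystrategy

import (
    "errors"
    "math/rand"

    "github.com/docker/swarm/cluster"
    "github.com/docker/swarm/scheduler/node"
)

// randomStrategy is a hypothetical PlacementStrategy: it performs no real
// ranking and simply returns the candidate nodes in a random order.
type randomStrategy struct{}

func (randomStrategy) Name() string { return "random" }

// Initialize has nothing to configure for this strategy.
func (randomStrategy) Initialize() error { return nil }

func (randomStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
    if len(nodes) == 0 {
        return nil, errors.New("no node available to schedule the container")
    }
    out := make([]*node.Node, len(nodes))
    for i, j := range rand.Perm(len(nodes)) {
        out[i] = nodes[j]
    }
    return out, nil
}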

Docker Swarm code analysis notes (13)——filter

The filter option of the Docker Swarm manage command specifies the filters the scheduler applies when selecting Docker Engines; the variables are defined as follows (cli/flags.go):

// hack for go vet
flFilterValue = cli.StringSlice(filter.List())
// DefaultFilterNumber is exported
DefaultFilterNumber = len(flFilterValue)

flFilter = cli.StringSliceFlag{
    Name:  "filter, f",
    Usage: "filter to use [" + strings.Join(filter.List(), ", ") + "]",
    Value: &flFilterValue,
}


In cli/manage.go, the filter names are then read back from the command line and the filter list is constructed:

// see
names := c.StringSlice("filter")
if c.IsSet("filter") || c.IsSet("f") {
    names = names[DefaultFilterNumber:]
}
fs, err := filter.New(names)
if err != nil {
    log.Fatal(err)
}


All built-in filters are registered in an init function (the concrete list is elided here):

func init() {
    filters = []Filter{
        // ... the built-in filters are registered here ...
    }
}


// Filter is exported
type Filter interface {
    Name() string

    // Return a subset of nodes that were accepted by the filtering policy.
    Filter(*cluster.ContainerConfig, []*node.Node, bool) ([]*node.Node, error)

    // Return a list of constraints/filters provided
    GetFilters(*cluster.ContainerConfig) ([]string, error)
}

The Filter method returns the subset of nodes (Docker Engines) accepted by the filter, while GetFilters returns a list of strings describing the applied filter conditions.
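Again as an illustration only (this is not one of Swarm's built-in filters), the hypothetical healthyOnlyFilter below satisfies this interface and keeps only nodes whose HealthIndicator is positive:

package myfilter

import (
    "errors"

    "github.com/docker/swarm/cluster"
    "github.com/docker/swarm/scheduler/node"
)

// healthyOnlyFilter is a hypothetical Filter that accepts only nodes
// reporting a positive HealthIndicator.
type healthyOnlyFilter struct{}

func (healthyOnlyFilter) Name() string { return "healthy-only" }

func (healthyOnlyFilter) Filter(config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    out := []*node.Node{}
    for _, n := range nodes {
        if n.HealthIndicator > 0 {
            out = append(out, n)
        }
    }
    if len(out) == 0 {
        return nil, errors.New("no healthy node available")
    }
    return out, nil
}

func (healthyOnlyFilter) GetFilters(config *cluster.ContainerConfig) ([]string, error) {
    return []string{"health indicator > 0"}, nil
}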

In scheduler/filter/filter.go, ApplyFilters performs the actual filtering of the Docker Engines, and listAllFilters returns a description of all the filter conditions:

// ApplyFilters applies a set of filters in batch.
func ApplyFilters(filters []Filter, config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    var (
        err        error
        candidates = nodes
    )

    for _, filter := range filters {
        candidates, err = filter.Filter(config, candidates, soft)
        if err != nil {
            // special case for when no healthy nodes are found
            if filter.Name() == "health" {
                return nil, err
            }
            return nil, fmt.Errorf("Unable to find a node that satisfies the following conditions %s", listAllFilters(filters, config, filter.Name()))
        }
    }
    return candidates, nil
}

// listAllFilters creates a string containing all applied filters
func listAllFilters(filters []Filter, config *cluster.ContainerConfig, lastFilter string) string {
    allFilters := ""
    for _, filter := range filters {
        list, err := filter.GetFilters(config)
        if err == nil && len(list) > 0 {
            allFilters = fmt.Sprintf("%s\n%v", allFilters, list)
        }
        if filter.Name() == lastFilter {
            return allFilters
        }
    }
    return allFilters
}


Docker Swarm code analysis notes (12)——container-related config
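cluster.ContainerConfig (cluster/config.go) wraps the engine-api container configuration that a scheduled container is created from: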


// ContainerConfig is exported
// TODO store affinities and constraints in their own fields
type ContainerConfig struct {
    container.Config
    HostConfig       container.HostConfig
    NetworkingConfig network.NetworkingConfig
}

// OldContainerConfig contains additional fields for backward compatibility
// This should be removed after we stop supporting API versions <= 1.8
type OldContainerConfig struct {
    ContainerConfig
    Memory     int64
    MemorySwap int64
    CPUShares  int64  `json:"CpuShares"`
    CPUSet     string `json:"Cpuset"`
}
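So ContainerConfig embeds the engine-api container.Config and adds the host-level and networking configuration, while OldContainerConfig keeps the resource fields that API clients on versions <= 1.8 still send at the top level of the JSON body.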



Docker Swarm code analysis notes (11)——Engine.Connect

The Engine.Connect() method establishes the connection to a Docker engine:

// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (e *Engine) Connect(config *tls.Config) error {
    host, _, err := net.SplitHostPort(e.Addr)
    if err != nil {
        return err
    }

    addr, err := net.ResolveIPAddr("ip4", host)
    if err != nil {
        return err
    }
    e.IP = addr.IP.String()

    c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout)
    if err != nil {
        return err
    }
    // Use HTTP Client used by dockerclient to create engine-api client
    apiClient, err := engineapi.NewClient("tcp://"+e.Addr, "", c.HTTPClient, nil)
    if err != nil {
        return err
    }

    return e.ConnectWithClient(c, apiClient)
}

Engine.Connect() builds on two other Docker projects: dockerclient and engine-api. It first calls dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout) to create a DockerClient struct:

type DockerClient struct {
    URL           *url.URL
    HTTPClient    *http.Client
    TLSConfig     *tls.Config
    monitorStats  int32
    eventStopChan chan (struct{})
}

It then passes DockerClient.HTTPClient into the engineapi.NewClient() function to serve as engine-api's http.Client, i.e., the transport that carries the HTTP requests. Finally it calls Engine.ConnectWithClient() to connect to the Docker engine and fetch the engine's information.
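A condensed, runnable sketch of that wiring (assuming a daemon without TLS at tcp://127.0.0.1:2375; error handling shortened to panics):

package main

import (
    "fmt"
    "time"

    engineapi "github.com/docker/engine-api/client"
    "github.com/samalba/dockerclient"
)

func main() {
    // Create the samalba/dockerclient client first (nil *tls.Config, no
    // custom TCP tuning function).
    c, err := dockerclient.NewDockerClientTimeout("tcp://127.0.0.1:2375", nil, 30*time.Second, nil)
    if err != nil {
        panic(err)
    }

    // Reuse its *http.Client for the engine-api client so both clients
    // share one transport: same timeout, TLS settings and connection pool.
    apiClient, err := engineapi.NewClient("tcp://127.0.0.1:2375", "", c.HTTPClient, nil)
    if err != nil {
        panic(err)
    }
    fmt.Println("engine-api client ready, version:", apiClient.ClientVersion())
}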


Docker Swarm code analysis notes (10)——Cluster.validatePendingEngine

Cluster.validatePendingEngine is the code that actually connects to a Docker engine:

// validatePendingEngine connects to the engine,
func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
    // Attempt a connection to the engine. Since this is slow, don't get a hold
    // of the lock yet.
    if err := engine.Connect(c.TLSConfig); err != nil {
        log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
        return false
    }

    // The following is critical and fast. Grab a lock.
    c.Lock()
    defer c.Unlock()

    // Only validate engines from pendingEngines list
    if _, exists := c.pendingEngines[engine.Addr]; !exists {
        return false
    }

    // Make sure the engine ID is unique.
    if old, exists := c.engines[engine.ID]; exists {
        if old.Addr != engine.Addr {
            log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
            // Keep this engine in pendingEngines table and show its error.
            // If it's ID duplication from VM clone, user see this message and can fix it.
            // If the engine is rebooted and get new IP from DHCP, previous address will be removed
            // from discovery after a while.
            // In both cases, retry may fix the problem.
        } else {
            log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
            // Remove it from pendingEngines table
            delete(c.pendingEngines, engine.Addr)
        }
        return false
    }

    // Engine validated, move from pendingEngines table to engines table
    delete(c.pendingEngines, engine.Addr)
    // set engine state to healthy, and start refresh loop
    c.engines[engine.ID] = engine

    log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
    return true
}

Cluster.validatePendingEngine always operates on Engines in Cluster.pendingEngines; once the connection to the Docker engine succeeds, it moves the Engine into the Cluster.engines map.


Docker Swarm code analysis notes (9)——Swarm's Cluster, Engine, and addEngine
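The Cluster struct of the standalone Swarm is defined in cluster/swarm/cluster.go: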


// Cluster is exported
type Cluster struct {
    sync.RWMutex

    eventHandlers     *cluster.EventHandlers
    engines           map[string]*cluster.Engine
    pendingEngines    map[string]*cluster.Engine
    scheduler         *scheduler.Scheduler
    discovery         discovery.Backend
    pendingContainers map[string]*pendingContainer

    overcommitRatio float64
    engineOpts      *cluster.EngineOpts
    createRetry     int64
    TLSConfig       *tls.Config
}

The most important piece here is the cluster.Engine struct (cluster/engine.go); each cluster.Engine represents one Docker engine, i.e., one Docker daemon:

// Engine represents a docker engine
type Engine struct {
    sync.RWMutex

    ID      string
    IP      string
    Addr    string
    Name    string
    Cpus    int64
    Memory  int64
    Labels  map[string]string
    Version string

    stopCh          chan struct{}
    refreshDelayer  *delayer
    containers      map[string]*Container
    images          []*Image
    volumes         map[string]*Volume
    client          dockerclient.Client
    apiClient       engineapi.APIClient
    state           engineState
    lastError       string
    updatedAt       time.Time
    overcommitRatio int64
    eventsMonitor   *EventsMonitor
}

The core of building the cluster is the addEngine method, which adds a Docker engine to the cluster:

func (c *Cluster) addEngine(addr string) bool {
    // Check the engine is already registered by address.
    if c.hasEngineByAddr(addr) {
        return false
    }

    engine := cluster.NewEngine(addr, c.overcommitRatio, c.engineOpts)
    if err := engine.RegisterEventHandler(c); err != nil {
        log.Error(err)
    }
    // Add it to pending engine map, indexed by address. This will prevent
    // duplicates from entering
    c.Lock()
    c.pendingEngines[addr] = engine
    c.Unlock()

    // validatePendingEngine will start a thread to validate the engine.
    // If the engine is reachable and valid, it'll be monitored and updated in a loop.
    // If engine is not reachable, pending engines will be examined once in a while
    go c.validatePendingEngine(engine)

    return true
}



// RegisterEventHandler registers an event handler.
func (e *Engine) RegisterEventHandler(h EventHandler) error {
    if e.eventHandler != nil {
        return errors.New("event handler already set")
    }
    e.eventHandler = h
    return nil
}
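Note that addEngine passes the Cluster itself into engine.RegisterEventHandler(c); in other words, Cluster implements the cluster.EventHandler interface, and it is the Cluster that receives and handles every event coming from the engines it manages.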