Docker notes (12)——Docker 1.12 integrates Docker Swarm functionality

Docker 1.12 integrates Docker Swarm functionality. According to the article Docker Swarm Is Dead. Long Live Docker Swarm., Docker 1.12 has the following advantages over the standalone Docker Swarm:
(1)

With swarm mode you create a swarm with the ‘init’ command, and add workers to the cluster with the ‘join’ command. The commands to create and join a swarm literally take a second or two to complete. Mouat said “Comparing getting a Kubernetes or Mesos cluster running, Docker Swarm is a snap”.

Communication between nodes on the swarm is all secured with Transport Layer Security (TLS). For simple setups, Docker 1.12 generates self-signed certificates to use when you create the swarm, or you can provide certificates from your own certificate authority. Those certificates are only used internally by the nodes; any services you publicly expose use your own certs as usual.

The swarm mode implemented in Docker 1.12 is much simpler to set up, and the nodes communicate with each other over TLS.

(2)

The self-awareness of the swarm is the biggest and most significant change. Every node in the swarm can reach every other node, and is able to route traffic where it needs to go. You no longer need to run your own load balancer and integrate it with a dynamic discovery agent, using tools like Nginx and Interlock.

Now if a node receives a request which it can’t fulfil, because it isn’t running an instance of the container that can process the request, it routes the request on to a node which can fulfil it. This is transparent to the consumer, all they see is the response to their request, they don’t know about any redirections that happened within the swarm.

The Docker 1.12 swarm mode comes with built-in “self-awareness” and load-balancing mechanisms, and can route a request on to a node that is able to fulfil it.

By default, the files related to the Docker 1.12 swarm mode are stored under the /var/lib/docker/swarm directory.

For a demo of the Docker 1.12 swarm mode, see this video.

Update: Docker 1.12 actually uses the swarmkit project to implement the swarm cluster functionality (the related code lives in the daemon/cluster directory).

References:
The relation between “docker/swarm” and “docker/swarmkit”
Comparing Swarm, Swarmkit and Swarm Mode
Docker 1.12 Swarm Mode – Under the hood

Docker Swarm code analysis notes (17)——event_monitor.go

The Engine struct has an eventsMonitor member:

type Engine struct {
    ......
    eventsMonitor   *EventsMonitor
}

The EventsMonitor struct is defined as follows:

//EventsMonitor monitors events
type EventsMonitor struct {
    stopChan chan struct{}
    cli      client.APIClient
    handler  func(msg events.Message) error
}

stopChan is used to signal that the monitor should stop receiving messages; cli is the client of the underlying connection; and handler is the function that processes each received event.
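
Engine.ConnectWithClient (shown next) builds this struct through NewEventsMonitor. A minimal sketch of that constructor, assuming it does nothing beyond wiring the client and the handler together (stopChan is created later by Start):

// NewEventsMonitor returns an EventsMonitor bound to an API client and an
// event handler. A sketch of the assumed shape; the real constructor lives
// in cluster/event_monitor.go.
func NewEventsMonitor(cli client.APIClient, handler func(msg events.Message) error) *EventsMonitor {
    return &EventsMonitor{
        cli:     cli,
        handler: handler,
    }
}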

The Engine.ConnectWithClient method assigns the eventsMonitor member:

// ConnectWithClient is exported
func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error {
    e.client = client
    e.apiClient = apiClient
    e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler)

    // Fetch the engine labels.
    if err := e.updateSpecs(); err != nil {
        return err
    }

    e.StartMonitorEvents()

    // Force a state update before returning.
    if err := e.RefreshContainers(true); err != nil {
        return err
    }

    if err := e.RefreshImages(); err != nil {
        return err
    }

    // Do not check error as older daemon doesn't support this call.
    e.RefreshVolumes()
    e.RefreshNetworks()

    e.emitEvent("engine_connect")

    return nil
}

The code of Engine.StartMonitorEvents is as follows:

// StartMonitorEvents monitors events from the engine
func (e *Engine) StartMonitorEvents() {
    log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events")
    ec := make(chan error)
    e.eventsMonitor.Start(ec)

    go func() {
        if err := <-ec; err != nil {
            if !strings.Contains(err.Error(), "EOF") {
                // failing node reconnect should use back-off strategy
                <-e.refreshDelayer.Wait(e.getFailureCount())
            }
            e.StartMonitorEvents()
        }
        close(ec)
    }()
}

Engine.StartMonitorEvents starts the monitor and then waits on the ec channel in a goroutine: whenever an error arrives, it waits according to a back-off strategy (except for EOF errors) and then calls Engine.StartMonitorEvents again, so monitoring is restarted in a loop.

The code of the EventsMonitor.Start function is as follows:

// Start starts the EventsMonitor
func (em *EventsMonitor) Start(ec chan error) {
    em.stopChan = make(chan struct{})

    responseBody, err := em.cli.Events(context.Background(), types.EventsOptions{})
    if err != nil {
        ec <- err
        return
    }

    resultChan := make(chan decodingResult)

    go func() {
        dec := json.NewDecoder(responseBody)
        for {
            var result decodingResult
            result.err = dec.Decode(&result.msg)
            resultChan <- result
            if result.err == io.EOF {
                break
            }
        }
        close(resultChan)
    }()

    go func() {
        defer responseBody.Close()
        for {
            select {
            case <-em.stopChan:
                ec <- nil
                return
            case result := <-resultChan:
                if result.err != nil {
                    ec <- result.err
                    return
                }
                if err := em.handler(result.msg); err != nil {
                    ec <- err
                    return
                }
            }
        }
    }()
}

The logic of this code is essentially to issue an “HTTP GET /events” request and then wait for the Docker Engine's responses. Because this HTTP request will very likely block here, any subsequent HTTP exchange has to establish a new HTTP connection. The reason lies in the contract of net/http's Response.Body:

type Response struct {
    ......

    // Body represents the response body.
    //
    // The http Client and Transport guarantee that Body is always
    // non-nil, even on responses without a body or responses with
    // a zero-length body. It is the caller's responsibility to
    // close Body. The default HTTP client's Transport does not
    // attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
    // ("keep-alive") unless the Body is read to completion and is
    // closed.
    //
    // The Body is automatically dechunked if the server replied
    // with a "chunked" Transfer-Encoding.
    Body io.ReadCloser

    ......
}

To stop the EventsMonitor, use the EventsMonitor.Stop method:

// Stop stops the EventsMonitor
func (em *EventsMonitor) Stop() {
    if em.stopChan == nil {
        return
    }
    close(em.stopChan)
}
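
Putting Start and Stop together, a hypothetical usage sketch (the apiClient variable and the logging handler are invented for illustration; assumes the standard library "log" and the engine-api "types/events" package):

em := NewEventsMonitor(apiClient, func(msg events.Message) error {
    // React to each event streamed back from "GET /events".
    log.Printf("event: type=%s action=%s id=%s", msg.Type, msg.Action, msg.ID)
    return nil
})

ec := make(chan error)
em.Start(ec)

// ... later: closing stopChan makes Start's goroutine send nil on ec.
em.Stop()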


Docker Swarm code analysis notes (16)——the Node struct

The Docker Swarm scheduler selects the nodes that satisfy the requirements for creating a container:

candidates, err := s.selectNodesForContainer(nodes, config, true)

Node is defined in scheduler/node/node.go:

// Node is an abstract type used by the scheduler.
type Node struct {
    ID         string
    IP         string
    Addr       string
    Name       string
    Labels     map[string]string
    Containers cluster.Containers
    Images     []*cluster.Image

    UsedMemory  int64
    UsedCpus    int64
    TotalMemory int64
    TotalCpus   int64

    HealthIndicator int64
}

The Cluster.listNodes method is implemented as follows:

// listNodes returns all validated engines in the cluster, excluding pendingEngines.
func (c *Cluster) listNodes() []*node.Node {
    c.RLock()
    defer c.RUnlock()

    out := make([]*node.Node, 0, len(c.engines))
    for _, e := range c.engines {
        node := node.NewNode(e)
        for _, pc := range c.pendingContainers {
            if pc.Engine.ID == e.ID && node.Container(pc.Config.SwarmID()) == nil {
                node.AddContainer(pc.ToContainer())
            }
        }
        out = append(out, node)
    }

    return out
}

It simply builds the node list from Cluster.engines (Cluster.pendingEngines is excluded because those engines are still pending validation). The scheduler will later pick suitable nodes from this list.
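
node.NewNode, used above, copies the scheduling-relevant state out of a cluster.Engine. A sketch of its assumed shape (the real code is in scheduler/node/node.go):

// NewNode builds a scheduler node from a snapshot of an engine's state.
func NewNode(e *cluster.Engine) *Node {
    return &Node{
        ID:              e.ID,
        IP:              e.IP,
        Addr:            e.Addr,
        Name:            e.Name,
        Labels:          e.Labels,
        Containers:      e.Containers(),
        Images:          e.Images(),
        UsedMemory:      e.UsedMemory(),
        UsedCpus:        e.UsedCpus(),
        TotalMemory:     e.TotalMemory(),
        TotalCpus:       e.TotalCpus(),
        HealthIndicator: e.HealthIndicator(),
    }
}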

Docker Swarm code analysis notes (15)——scheduler

The scheduler of the Docker Swarm manage command is built from filters and a strategy (cli/manage.go):

sched := scheduler.New(s, fs)

What the scheduler actually does is select the list of nodes (Docker Engines) that satisfy the requirements of a cluster.ContainerConfig:

// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order or preference.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
    candidates, err := s.selectNodesForContainer(nodes, config, true)

    if err != nil {
        candidates, err = s.selectNodesForContainer(nodes, config, false)
    }
    return candidates, err
}

func (s *Scheduler) selectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig, soft bool) ([]*node.Node, error) {
    accepted, err := filter.ApplyFilters(s.filters, config, nodes, soft)
    if err != nil {
        return nil, err
    }

    if len(accepted) == 0 {
        return nil, errNoNodeAvailable
    }

    return s.strategy.RankAndSort(config, accepted)
}
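
The scheduler.New call from cli/manage.go presumably just stores the strategy and the filters on the Scheduler; a sketch under that assumption (the real type lives in scheduler/scheduler.go):

// Scheduler combines one placement strategy with a list of filters.
type Scheduler struct {
    strategy strategy.PlacementStrategy
    filters  []filter.Filter
}

// New creates a Scheduler from a strategy and a set of filters.
func New(strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler {
    return &Scheduler{
        strategy: strategy,
        filters:  filters,
    }
}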


Docker Swarm code analysis notes (14)——strategy

The flStrategy option of the Docker Swarm manage command specifies the placement strategy the scheduler uses; the variable is defined as follows (cli/flags.go):

flStrategy = cli.StringFlag{
    Name:  "strategy",
    Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]",
    Value: strategy.List()[0],
}

The default value of strategy is the first entry of strategy.List(), which corresponds to SpreadPlacementStrategy.

In the code, a strategy is actually defined as PlacementStrategy, an interface:

// PlacementStrategy is the interface for a container placement strategy.
type PlacementStrategy interface {
    // Name of the strategy
    Name() string
    // Initialize performs any initial configuration required by the strategy and returns
    // an error if one is encountered.
    // If no initial configuration is needed, this may be a no-op and return a nil error.
    Initialize() error
    // RankAndSort applies the strategy to a list of nodes and ranks them based
    // on the best fit given the container configuration.  It returns a sorted
    // list of nodes (based on their ranks) or an error if there is no
    // available node on which to schedule the container.
    RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}

RankAndSort returns a list of qualifying nodes (that is, Docker Engines), with the elements sorted by how well they match the container configuration.
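
To make the interface concrete, here is a toy strategy (not part of the Swarm source) that ranks nodes with the fewest containers first. Assumed imports: "errors", "sort", and the Swarm cluster and node packages.

// fewestContainersStrategy is an illustrative PlacementStrategy.
type fewestContainersStrategy struct{}

func (s *fewestContainersStrategy) Name() string { return "fewest-containers" }

func (s *fewestContainersStrategy) Initialize() error { return nil }

// RankAndSort orders the candidate nodes by ascending container count.
func (s *fewestContainersStrategy) RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error) {
    if len(nodes) == 0 {
        return nil, errors.New("no node available to schedule the container")
    }
    sorted := append([]*node.Node{}, nodes...)
    sort.Slice(sorted, func(i, j int) bool {
        return len(sorted[i].Containers) < len(sorted[j].Containers)
    })
    return sorted, nil
}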

Docker Swarm code analysis notes (13)——filter

The filter option of the Docker Swarm manage command specifies the filters the scheduler uses when selecting Docker Engines; the variables are defined as follows (cli/flags.go):

// hack for go vet
flFilterValue = cli.StringSlice(filter.List())
// DefaultFilterNumber is exported
DefaultFilterNumber = len(flFilterValue)

flFilter = cli.StringSliceFlag{
    Name:  "filter, f",
    Usage: "filter to use [" + strings.Join(filter.List(), ", ") + "]",
    Value: &flFilterValue,
}

The code that obtains the filter values is as follows (cli/manage.go):

// see https://github.com/codegangsta/cli/issues/160
names := c.StringSlice("filter")
if c.IsSet("filter") || c.IsSet("f") {
    names = names[DefaultFilterNumber:]
}
fs, err := filter.New(names)
if err != nil {
    log.Fatal(err)
}
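
filter.New itself is not shown above; presumably it maps the requested names onto the registered filters in the package-level filters slice shown below. A hedged sketch (the real code is in scheduler/filter/filter.go):

// New returns the filters matching the given names, in order.
func New(names []string) ([]Filter, error) {
    selected := []Filter{}
    for _, name := range names {
        found := false
        for _, f := range filters {
            if f.Name() == name {
                selected = append(selected, f)
                found = true
                break
            }
        }
        if !found {
            return nil, fmt.Errorf("unsupported filter: %s", name)
        }
    }
    return selected, nil
}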

By default, all of the registered filters are used (scheduler/filter/filter.go):

func init() {
    filters = []Filter{
        &HealthFilter{},
        &PortFilter{},
        &SlotsFilter{},
        &DependencyFilter{},
        &AffinityFilter{},
        &ConstraintFilter{},
    }
}

Filter is an interface (scheduler/filter/filter.go):

// Filter is exported
type Filter interface {
    Name() string

    // Return a subset of nodes that were accepted by the filtering policy.
    Filter(*cluster.ContainerConfig, []*node.Node, bool) ([]*node.Node, error)

    // Return a list of constraints/filters provided
    GetFilters(*cluster.ContainerConfig) ([]string, error)
}

The Filter method filters out the Docker Engines that satisfy the conditions, while GetFilters returns a list of strings describing the applied filter conditions.
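
As an illustration of the interface (this type is not in the Swarm source), a toy filter that only accepts nodes with a non-empty name; assumed import: "fmt".

// namedNodeFilter is an illustrative Filter.
type namedNodeFilter struct{}

func (f *namedNodeFilter) Name() string { return "named-node" }

// Filter keeps only the candidate nodes that have a name.
func (f *namedNodeFilter) Filter(config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    out := []*node.Node{}
    for _, n := range nodes {
        if n.Name != "" {
            out = append(out, n)
        }
    }
    if len(out) == 0 {
        return nil, fmt.Errorf("no named node available")
    }
    return out, nil
}

// GetFilters describes the condition this filter applies.
func (f *namedNodeFilter) GetFilters(config *cluster.ContainerConfig) ([]string, error) {
    return []string{"node name must be non-empty"}, nil
}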

In scheduler/filter/filter.go, ApplyFilters applies a set of filters to the Docker Engines, and listAllFilters returns all of the applied filter conditions:

// ApplyFilters applies a set of filters in batch.
func ApplyFilters(filters []Filter, config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    var (
        err        error
        candidates = nodes
    )

    for _, filter := range filters {
        candidates, err = filter.Filter(config, candidates, soft)
        if err != nil {
            // special case for when no healthy nodes are found
            if filter.Name() == "health" {
                return nil, err
            }
            return nil, fmt.Errorf("Unable to find a node that satisfies the following conditions %s", listAllFilters(filters, config, filter.Name()))
        }
    }
    return candidates, nil
}

// listAllFilters creates a string containing all applied filters
func listAllFilters(filters []Filter, config *cluster.ContainerConfig, lastFilter string) string {
    allFilters := ""
    for _, filter := range filters {
        list, err := filter.GetFilters(config)
        if err == nil && len(list) > 0 {
            allFilters = fmt.Sprintf("%s\n%v", allFilters, list)
        }
        if filter.Name() == lastFilter {
            return allFilters
        }
    }
    return allFilters
}


Docker Swarm code analysis notes (12)——container-related config

Creating and updating a container involves the container's config (cluster/config.go):

// ContainerConfig is exported
// TODO store affinities and constraints in their own fields
type ContainerConfig struct {
    container.Config
    HostConfig       container.HostConfig
    NetworkingConfig network.NetworkingConfig
}

// OldContainerConfig contains additional fields for backward compatibility
// This should be removed after we stop supporting API versions <= 1.8
type OldContainerConfig struct {
    ContainerConfig
    Memory     int64
    MemorySwap int64
    CPUShares  int64  `json:"CpuShares"`
    CPUSet     string `json:"Cpuset"`
}

The container package here is defined in /vendor/github.com/docker/engine-api/types/container/config.go. container.Config holds the container configuration that does not depend on the host; the host-dependent configuration is defined in container.HostConfig; and the container's network-related configuration is kept in network.NetworkingConfig.
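
A minimal sketch (values invented for illustration) of how the three engine-api types combine into one cluster.ContainerConfig; assumed imports are the engine-api "types/container" and "types/network" packages:

config := cluster.ContainerConfig{
    // Host-independent settings.
    Config: container.Config{Image: "nginx:latest"},
    // Host-dependent settings such as resource limits and binds.
    HostConfig: container.HostConfig{
        Resources: container.Resources{Memory: 512 * 1024 * 1024},
    },
    // Per-network endpoint settings.
    NetworkingConfig: network.NetworkingConfig{},
}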


Docker Swarm code analysis notes (11)——Engine.Connect

The Engine.Connect() method establishes the connection to a Docker engine:

// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (e *Engine) Connect(config *tls.Config) error {
    host, _, err := net.SplitHostPort(e.Addr)
    if err != nil {
        return err
    }

    addr, err := net.ResolveIPAddr("ip4", host)
    if err != nil {
        return err
    }
    e.IP = addr.IP.String()

    c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout)
    if err != nil {
        return err
    }
    // Use HTTP Client used by dockerclient to create engine-api client
    apiClient, err := engineapi.NewClient("tcp://"+e.Addr, "", c.HTTPClient, nil)
    if err != nil {
        return err
    }

    return e.ConnectWithClient(c, apiClient)
}

Engine.Connect() makes use of two other Docker projects: dockerclient and engine-api. It first calls dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout) to create a DockerClient struct:

type DockerClient struct {
    URL           *url.URL
    HTTPClient    *http.Client
    TLSConfig     *tls.Config
    monitorStats  int32
    eventStopChan chan (struct{})
}

It then passes DockerClient.HTTPClient into the engineapi.NewClient() function as engine-api's http.Client, the transport that carries the HTTP requests. Finally, it calls Engine.ConnectWithClient() to connect to the Docker engine and obtain its information.


Docker Swarm code analysis notes (10)——Cluster.validatePendingEngine

Cluster.validatePendingEngine is the code that actually connects to a Docker engine:

// validatePendingEngine connects to the engine,
func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
    // Attempt a connection to the engine. Since this is slow, don't get a hold
    // of the lock yet.
    if err := engine.Connect(c.TLSConfig); err != nil {
        log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
        return false
    }

    // The following is critical and fast. Grab a lock.
    c.Lock()
    defer c.Unlock()

    // Only validate engines from pendingEngines list
    if _, exists := c.pendingEngines[engine.Addr]; !exists {
        return false
    }

    // Make sure the engine ID is unique.
    if old, exists := c.engines[engine.ID]; exists {
        if old.Addr != engine.Addr {
            log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
            // Keep this engine in pendingEngines table and show its error.
            // If it's ID duplication from VM clone, user see this message and can fix it.
            // If the engine is rebooted and get new IP from DHCP, previous address will be removed
            // from discovery after a while.
            // In both cases, retry may fix the problem.
            engine.HandleIDConflict(old.Addr)
        } else {
            log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
            engine.Disconnect()
            // Remove it from pendingEngines table
            delete(c.pendingEngines, engine.Addr)
        }
        return false
    }

    // Engine validated, move from pendingEngines table to engines table
    delete(c.pendingEngines, engine.Addr)
    // set engine state to healthy, and start refresh loop
    engine.ValidationComplete()
    c.engines[engine.ID] = engine

    log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
    return true
}

Cluster.validatePendingEngine always operates on the Engines in Cluster.pendingEngines; if the connection to the Docker engine succeeds (and its ID is unique), the Engine is moved into the Cluster.engines map.


Docker Swarm code analysis notes (9)——Swarm's Cluster, Engine, and addEngine

swarm/cluster.go belongs to the swarm package; it defines the Cluster struct of the swarm driver:

// Cluster is exported
type Cluster struct {
    sync.RWMutex

    eventHandlers     *cluster.EventHandlers
    engines           map[string]*cluster.Engine
    pendingEngines    map[string]*cluster.Engine
    scheduler         *scheduler.Scheduler
    discovery         discovery.Backend
    pendingContainers map[string]*pendingContainer

    overcommitRatio float64
    engineOpts      *cluster.EngineOpts
    createRetry     int64
    TLSConfig       *tls.Config
}

The most important part is the definition of the cluster.Engine struct (cluster/engine.go); each cluster.Engine represents one Docker engine, i.e. one Docker daemon:

// Engine represents a docker engine
type Engine struct {
    sync.RWMutex

    ID      string
    IP      string
    Addr    string
    Name    string
    Cpus    int64
    Memory  int64
    Labels  map[string]string
    Version string

    stopCh          chan struct{}
    refreshDelayer  *delayer
    containers      map[string]*Container
    images          []*Image
    networks        map[string]*Network
    volumes         map[string]*Volume
    client          dockerclient.Client
    apiClient       engineapi.APIClient
    eventHandler    EventHandler
    state           engineState
    lastError       string
    updatedAt       time.Time
    failureCount    int
    overcommitRatio int64
    opts            *EngineOpts
    eventsMonitor   *EventsMonitor
}

The core of building the cluster is the addEngine method, which adds a Docker engine to the cluster:

func (c *Cluster) addEngine(addr string) bool {
    // Check the engine is already registered by address.
    if c.hasEngineByAddr(addr) {
        return false
    }

    engine := cluster.NewEngine(addr, c.overcommitRatio, c.engineOpts)
    if err := engine.RegisterEventHandler(c); err != nil {
        log.Error(err)
    }
    // Add it to pending engine map, indexed by address. This will prevent
    // duplicates from entering
    c.Lock()
    c.pendingEngines[addr] = engine
    c.Unlock()

    // validatePendingEngine will start a thread to validate the engine.
    // If the engine is reachable and valid, it'll be monitored and updated in a loop.
    // If engine is not reachable, pending engines will be examined once in a while
    go c.validatePendingEngine(engine)

    return true
}

addEngine first checks whether the engine is already registered in the cluster. If not, it allocates a new engine, adds it to Cluster.pendingEngines first, and starts a new goroutine (validatePendingEngine) to check whether it is a valid engine.

In addition, Engine.RegisterEventHandler essentially assigns the Cluster's eventHandlers member to Engine.eventHandler:

// RegisterEventHandler registers an event handler.
func (e *Engine) RegisterEventHandler(h EventHandler) error {
    if e.eventHandler != nil {
        return errors.New("event handler already set")
    }
    e.eventHandler = h
    return nil
}

ClustereventHandlers则是在NewPrimary中赋值的,所以本质上EngineCluster用的都是一套处理函数。