Docker Swarm代码分析笔记(17)——event_monitor.go

Engine结构体有一个eventsMonitor成员:

// Engine (excerpt; the other fields are elided as "......").
type Engine struct {
    ......
    // eventsMonitor streams the daemon's events; it is assigned in
    // ConnectWithClient.
    eventsMonitor   *EventsMonitor
}

EventsMonitor结构体定义如下:

// EventsMonitor monitors the event stream of a Docker engine.
type EventsMonitor struct {
    // stopChan is created by Start and closed by Stop to end monitoring.
    stopChan chan struct{}
    // cli is the API client used to issue the events request.
    cli      client.APIClient
    // handler is called with every decoded event; a non-nil error ends
    // monitoring and is reported on Start's error channel.
    handler  func(msg events.Message) error
}

stopChan用来通知停止接收事件;cli是与Docker Engine通信的底层API client;handler则是收到每个event后调用的处理函数。

Engine.ConnectWithClient方法会给eventsMonitor成员赋值:

// ConnectWithClient connects the engine using already-built clients.
//
// It stores client and apiClient on the engine, creates the events monitor
// (bound to e.handler), fetches the engine specs, starts event monitoring and
// forces a refresh of containers and images before returning; an error from
// any of those steps is returned immediately. Volumes and networks are
// refreshed best-effort, and an "engine_connect" event is emitted on success.
func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error {
    e.client, e.apiClient = client, apiClient
    e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler)

    // The engine labels are fetched as part of the specs.
    if err := e.updateSpecs(); err != nil {
        return err
    }

    e.StartMonitorEvents()

    // Force a state update before reporting the engine as connected.
    if err := e.RefreshContainers(true); err != nil {
        return err
    }
    if err := e.RefreshImages(); err != nil {
        return err
    }

    // Errors are deliberately ignored: older daemons don't support these calls.
    e.RefreshVolumes()
    e.RefreshNetworks()

    e.emitEvent("engine_connect")
    return nil
}

其中Engine.StartMonitorEvents代码如下:

// StartMonitorEvents monitors events from the engine.
//
// It starts the EventsMonitor and spawns a goroutine that waits for the
// monitor's result on ec. A nil result means monitoring was stopped and
// nothing more happens. A non-nil error restarts monitoring; if the error
// text does not contain "EOF", the restart first waits on refreshDelayer
// (a back-off based on the engine's failure count).
func (e *Engine) StartMonitorEvents() {
    log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events")
    // ec has room for the single result EventsMonitor.Start sends. When the
    // initial events request fails, Start sends that error synchronously,
    // before returning. With an unbuffered channel the send would block
    // forever, because the receiving goroutine below is not running yet.
    ec := make(chan error, 1)
    e.eventsMonitor.Start(ec)

    go func() {
        if err := <-ec; err != nil {
            if !strings.Contains(err.Error(), "EOF") {
                // failing node reconnect should use back-off strategy
                <-e.refreshDelayer.Wait(e.getFailureCount())
            }
            e.StartMonitorEvents()
        }
        close(ec)
    }()
}

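Engine.StartMonitorEvents启动EventsMonitor后,在一个goroutine中从ec channel接收监听结果。如果收到的是错误,就再次调用Engine.StartMonitorEvents重新开始监听,从而在出错后不断重连;对于不含"EOF"的错误,重连前会先按back-off策略等待。如果收到的是nil(即监听被Stop正常停止),则不再重启。最后关闭ec。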

EventsMonitor.Start函数代码如下:

// Start starts the EventsMonitor.
//
// It issues the events request. On success it starts two goroutines:
//   - one decodes events.Message values from the response stream;
//   - the other passes each decoded message to em.handler. It stops when
//     Stop is called, when decoding fails (including io.EOF when the daemon
//     closes the stream), or when the handler returns an error.
//
// Exactly one value is sent on ec per call: nil after Stop, otherwise the
// error that ended monitoring. If the initial Events request fails, that
// error is sent synchronously before Start returns, so ec must be buffered
// or already have a concurrent receiver.
func (em *EventsMonitor) Start(ec chan error) {
    em.stopChan = make(chan struct{})
    stop := em.stopChan

    responseBody, err := em.cli.Events(context.Background(), types.EventsOptions{})
    if err != nil {
        ec <- err
        return
    }

    resultChan := make(chan decodingResult)
    // done is closed when the consumer goroutine exits, so the decoder never
    // blocks forever on a send that nobody will receive.
    done := make(chan struct{})

    go func() {
        defer close(resultChan)
        dec := json.NewDecoder(responseBody)
        for {
            var result decodingResult
            result.err = dec.Decode(&result.msg)
            select {
            case resultChan <- result:
            case <-done:
                return
            }
            // Stop after the first error (io.EOF or otherwise): the consumer
            // gives up on the first error it receives, so decoding further
            // would only block.
            if result.err != nil {
                return
            }
        }
    }()

    go func() {
        defer close(done)
        defer responseBody.Close()
        for {
            select {
            case <-stop:
                ec <- nil
                return
            case result := <-resultChan:
                if result.err != nil {
                    ec <- result.err
                    return
                }
                if err := em.handler(result.msg); err != nil {
                    ec <- err
                    return
                }
            }
        }
    }()
}

这段代码的逻辑是:先发出HTTP GET /events请求;然后由一个goroutine持续解码Docker Engine以流的形式返回的事件,另一个goroutine把每个事件交给handler处理。这个响应体会长时间保持打开、不会被读完,所以它占用的HTTP连接无法被复用,之后的其他HTTP请求会另外建立新的HTTP连接。原理见net/http中Response.Body的文档注释:

// Excerpt of net/http.Response (other fields elided as "......"), quoted for
// its Body documentation on connection reuse.
type Response struct {
    ......

    // Body represents the response body.
    //
    // The http Client and Transport guarantee that Body is always
    // non-nil, even on responses without a body or responses with
    // a zero-length body. It is the caller's responsibility to
    // close Body. The default HTTP client's Transport does not
    // attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
    // ("keep-alive") unless the Body is read to completion and is
    // closed.
    //
    // The Body is automatically dechunked if the server replied
    // with a "chunked" Transfer-Encoding.
    Body io.ReadCloser

    ......
}

如果想停止这个EventsMonitor,可以调用EventsMonitor.Stop方法。它会关闭stopChan,Start中的处理goroutine收到信号后向ec发送nil,并关闭响应体:

// Stop stops the EventsMonitor by closing stopChan. The consumer goroutine
// started by Start then sends nil on its error channel and closes the
// response body.
//
// Calling Stop before Start is a no-op. Calling it again after the monitor
// was already stopped is also a no-op, rather than a double-close panic.
//
// NOTE(review): this only guards sequential repeated calls; concurrent Stop
// calls (or Stop racing with Start) are still unsynchronized.
func (em *EventsMonitor) Stop() {
    if em.stopChan == nil {
        return
    }
    select {
    case <-em.stopChan:
        // Already closed by an earlier Stop.
    default:
        close(em.stopChan)
    }
}

 

Docker Swarm代码分析笔记(16)——Node结构体

Docker Swarm的scheduler会选择符合要求的node来创建container:

// The scheduler first tries with soft=true (soft constraints honoured).
candidates, err := s.selectNodesForContainer(nodes, config, true)

node定义在scheduler/node/node.go

// Node is an abstract type used by the scheduler.
// Cluster.listNodes builds one Node per validated engine via
// node.NewNode(engine) and adds that engine's pending containers to it.
type Node struct {
    ID         string
    IP         string
    Addr       string
    Name       string
    Labels     map[string]string
    Containers cluster.Containers
    Images     []*cluster.Image

    // Resource usage and capacity.
    // NOTE(review): units (bytes / CPU count) are not visible here — confirm
    // in node.NewNode.
    UsedMemory  int64
    UsedCpus    int64
    TotalMemory int64
    TotalCpus   int64

    // NOTE(review): scale/semantics of HealthIndicator are not visible here.
    HealthIndicator int64
}

Cluster.listNodes方法实现如下:

// listNodes returns all validated engines in the cluster as scheduler nodes,
// excluding pendingEngines.
//
// Each node is built from an engine in c.engines. It is then augmented with
// every pending container assigned to that engine that the node does not
// already report, matched by the container's SwarmID. The cluster read lock
// is held for the whole walk.
func (c *Cluster) listNodes() []*node.Node {
    c.RLock()
    defer c.RUnlock()

    nodes := make([]*node.Node, 0, len(c.engines))
    for _, eng := range c.engines {
        n := node.NewNode(eng)
        for _, pending := range c.pendingContainers {
            if pending.Engine.ID != eng.ID {
                continue
            }
            if n.Container(pending.Config.SwarmID()) != nil {
                continue
            }
            n.AddContainer(pending.ToContainer())
        }
        nodes = append(nodes, n)
    }
    return nodes
}

其实就是从Cluster.engines构建node列表(因为Cluster.pendingEngines还处在待定状态)。后续scheduler就会从这个node列表中选择合适的node

Docker Swarm代码分析笔记(15)——scheduler

Docker Swarm manage命令的scheduler是由filter和strategy构建的(cli/manage.go):

// fs is the filter list built by filter.New(names); s is presumably the
// placement strategy chosen with --strategy (confirm in cli/manage.go).
sched := scheduler.New(s, fs)

scheduler的实际功能就是选出符合cluster.ContainerConfig要求的node(即Docker Engine)列表:

// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order of preference.
//
// The first attempt honours soft constraints (soft=true). Only if that fails
// is the selection retried with soft=false, and that second result (nodes or
// error) is returned as is.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
    if preferred, err := s.selectNodesForContainer(nodes, config, true); err == nil {
        return preferred, nil
    }
    return s.selectNodesForContainer(nodes, config, false)
}

// selectNodesForContainer runs the scheduler's filters over nodes and ranks
// the survivors with the scheduler's strategy. It returns errNoNodeAvailable
// when the filters succeed but accept no node.
func (s *Scheduler) selectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig, soft bool) ([]*node.Node, error) {
    accepted, err := filter.ApplyFilters(s.filters, config, nodes, soft)
    switch {
    case err != nil:
        return nil, err
    case len(accepted) == 0:
        return nil, errNoNodeAvailable
    }
    return s.strategy.RankAndSort(config, accepted)
}

 

Docker Swarm代码分析笔记(14)——strategy

Docker Swarm manage命令的--strategy选项(对应变量flStrategy)用来指定scheduler使用的放置策略(placement strategy),其变量定义如下(cli/flags.go):

// flStrategy is the --strategy flag. Its usage text lists every registered
// strategy name, and its default is the first entry of strategy.List().
flStrategy = cli.StringFlag{
    Name:  "strategy",
    Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]",
    Value: strategy.List()[0],
}

strategy的默认值是strategy.List()[0],即列表中的第一个strategy——SpreadPlacementStrategy(spread)。

strategy在代码中的实际定义是PlacementStrategy,一个interface

// PlacementStrategy is the interface for a container placement strategy.
// The scheduler calls RankAndSort on the nodes that passed all filters.
type PlacementStrategy interface {
    // Name of the strategy
    Name() string
    // Initialize performs any initial configuration required by the strategy and returns
    // an error if one is encountered.
    // If no initial configuration is needed, this may be a no-op and return a nil error.
    Initialize() error
    // RankAndSort applies the strategy to a list of nodes and ranks them based
    // on the best fit given the container configuration.  It returns a sorted
    // list of nodes (based on their ranks) or an error if there is no
    // available node on which to schedule the container.
    RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}

其中RankAndSort根据container的配置对node(也就是Docker Engine)进行打分,返回按匹配程度排好序的node列表;如果没有可用于调度的node,则返回错误。

Docker Swarm代码分析笔记(13)——filter

Docker Swarm manage命令的filter选项用来指定scheduler选择Docker Engine的过滤项,其变量定义如下(cli/flags.go):

// hack for go vet
// flFilterValue holds the default filter names in a variable so that its
// address can be used as the flag's Value below.
flFilterValue = cli.StringSlice(filter.List())
// DefaultFilterNumber is the number of default filter names; cli/manage.go
// skips that many leading entries when the flag was set explicitly.
DefaultFilterNumber = len(flFilterValue)

// flFilter is the --filter/-f flag; by default it holds every filter name.
flFilter = cli.StringSliceFlag{
    Name:  "filter, f",
    Usage: "filter to use [" + strings.Join(filter.List(), ", ") + "]",
    Value: &flFilterValue,
}

获得filter值的代码如下(cli/manage.go):

// see https://github.com/codegangsta/cli/issues/160
// When the flag is set explicitly, the parsed slice presumably still starts
// with the DefaultFilterNumber default names (the cli library appends user
// values after the default, per the issue above). Drop them so only the
// user-specified filters remain.
names := c.StringSlice("filter")
if c.IsSet("filter") || c.IsSet("f") {
    names = names[DefaultFilterNumber:]
}
// Build the Filter implementations for the selected names; failure is fatal.
fs, err := filter.New(names)
if err != nil {
    log.Fatal(err)
}

默认情况下(即没有在命令行中指定--filter/-f时),会使用所有内置的filter,其定义如下(scheduler/filter/filter.go):

// init registers the built-in filters, in this order: health, port, slots,
// dependency, affinity and constraint.
func init() {
    filters = []Filter{
        new(HealthFilter),
        new(PortFilter),
        new(SlotsFilter),
        new(DependencyFilter),
        new(AffinityFilter),
        new(ConstraintFilter),
    }
}

Filter是一个interface(scheduler/filter/filter.go):

// Filter is implemented by every node filter; ApplyFilters runs a list of
// them in sequence, feeding each the nodes accepted by the previous one.
type Filter interface {
    Name() string

    // Return a subset of nodes that were accepted by the filtering policy.
    Filter(*cluster.ContainerConfig, []*node.Node, bool) ([]*node.Node, error)

    // Return a list of constraints/filters provided
    GetFilters(*cluster.ContainerConfig) ([]string, error)
}

其中Filter方法用来筛选出符合条件的node(即Docker Engine),而GetFilters则返回一个字符串列表([]string),描述该filter针对给定container配置的过滤条件。

scheduler/filter/filter.go中,ApplyFilters用来过滤Docker EnginelistAllFilters返回所有的过滤条件:

// ApplyFilters applies a set of filters in batch.
//
// The filters run in order, each receiving the nodes accepted by the previous
// one. If the "health" filter fails, its error is returned unchanged (no
// healthy nodes). If any other filter fails, the error lists the conditions of
// every filter up to and including the failing one.
func ApplyFilters(filters []Filter, config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    candidates := nodes
    for _, f := range filters {
        accepted, err := f.Filter(config, candidates, soft)
        if err == nil {
            candidates = accepted
            continue
        }
        if f.Name() == "health" {
            return nil, err
        }
        return nil, fmt.Errorf("Unable to find a node that satisfies the following conditions %s", listAllFilters(filters, config, f.Name()))
    }
    return candidates, nil
}

// listAllFilters describes the filters applied up to and including the one
// named lastFilter, or all of them if no filter has that name. For each such
// filter whose GetFilters succeeds with a non-empty list, a newline followed
// by the list in %v form is appended to the result.
func listAllFilters(filters []Filter, config *cluster.ContainerConfig, lastFilter string) string {
    var description string
    for _, f := range filters {
        if conds, err := f.GetFilters(config); err == nil && len(conds) > 0 {
            description += fmt.Sprintf("\n%v", conds)
        }
        if f.Name() == lastFilter {
            break
        }
    }
    return description
}

 

Docker Swarm代码分析笔记(12)——container相关的config

创建和更新container时需要用到container的配置(cluster/config.go):

// ContainerConfig bundles the configuration used to create a container: the
// embedded host-independent container.Config plus the host and networking
// configuration.
// TODO store affinities and constraints in their own fields
type ContainerConfig struct {
    container.Config
    HostConfig       container.HostConfig
    NetworkingConfig network.NetworkingConfig
}

// OldContainerConfig contains additional fields for backward compatibility
// This should be removed after we stop supporting API versions <= 1.8
type OldContainerConfig struct {
    ContainerConfig
    Memory     int64
    MemorySwap int64
    // The JSON tags keep the legacy field names "CpuShares" and "Cpuset".
    CPUShares  int64  `json:"CpuShares"`
    CPUSet     string `json:"Cpuset"`
}

其中container这个package位于/vendor/github.com/docker/engine-api/types/container/(Config定义在config.go中)。container.Config包含与host无关的container配置;与host相关的配置定义在container.HostConfig中;而container网络相关的配置保存在network.NetworkingConfig中。

 

Docker Swarm代码分析笔记(11)——Engine.Connect

Engine.Connect()方法用来完成对Docker engine的连接工作:

// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
//
// The host part of e.Addr is resolved to an IPv4 address, stored in e.IP.
// A dockerclient client is created for "tcp://"+e.Addr with the given TLS
// config. Its HTTP client is handed to engine-api, so both clients share the
// same *http.Client. The rest of the work is delegated to ConnectWithClient.
func (e *Engine) Connect(config *tls.Config) error {
    host, _, err := net.SplitHostPort(e.Addr)
    if err != nil {
        return err
    }
    ip, err := net.ResolveIPAddr("ip4", host)
    if err != nil {
        return err
    }
    e.IP = ip.IP.String()

    endpoint := "tcp://" + e.Addr
    dc, err := dockerclient.NewDockerClientTimeout(endpoint, config, time.Duration(requestTimeout), setTCPUserTimeout)
    if err != nil {
        return err
    }
    ac, err := engineapi.NewClient(endpoint, "", dc.HTTPClient, nil)
    if err != nil {
        return err
    }
    return e.ConnectWithClient(dc, ac)
}

Engine.Connect()利用了另外两个Docker项目:dockerclient和engine-api。它首先把e.Addr中的主机名解析为IPv4地址并保存到Engine.IP,然后调用dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout)生成一个DockerClient结构体:

// DockerClient (from the dockerclient project) talks to a Docker daemon.
// Engine.Connect reuses its HTTPClient for the engine-api client.
type DockerClient struct {
    URL           *url.URL
    HTTPClient    *http.Client
    TLSConfig     *tls.Config
    // NOTE(review): monitorStats/eventStopChan are presumably used by
    // dockerclient's own stats/event monitoring — not visible here.
    monitorStats  int32
    eventStopChan chan (struct{})
}

再把DockerClient.HTTPClient传入engineapi.NewClient()函数,作为engine-api的http.Client,即HTTP请求的传输载体,这样两个client共用同一个http.Client。最后调用Engine.ConnectWithClient()连接Docker engine,获取Docker信息。

 

Docker Swarm代码分析笔记(10)——Cluster.validatePendingEngine

Cluster.validatePendingEngine是实际上用来连接Docker engine的代码:

// validatePendingEngine connects to a pending engine and, on success, moves it
// from c.pendingEngines to c.engines. It returns true only when the engine was
// registered.
//
// It returns false in these cases:
//   - The connection fails. The engine is left pending.
//   - The engine is no longer in pendingEngines.
//   - Another engine with a different address already uses the same ID. The
//     engine is left pending and its conflict is recorded.
//   - The same engine (same ID and address) is already registered. The
//     duplicate is disconnected and removed from pendingEngines.
func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
    // Attempt a connection to the engine. Since this is slow, don't get a hold
    // of the lock yet.
    if err := engine.Connect(c.TLSConfig); err != nil {
        log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
        return false
    }

    // The following is critical and fast. Grab a lock.
    c.Lock()
    defer c.Unlock()

    // Only validate engines from pendingEngines list
    // NOTE(review): this path returns without engine.Disconnect() even though
    // Connect succeeded (ConnectWithClient starts event monitoring). Confirm
    // that whoever removed the engine from pendingEngines also disconnects it;
    // otherwise its connection and monitor goroutine may leak.
    if _, exists := c.pendingEngines[engine.Addr]; !exists {
        return false
    }

    // Make sure the engine ID is unique.
    if old, exists := c.engines[engine.ID]; exists {
        if old.Addr != engine.Addr {
            log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
            // Keep this engine in pendingEngines table and show its error.
            // If it's ID duplication from VM clone, user see this message and can fix it.
            // If the engine is rebooted and get new IP from DHCP, previous address will be removed
            // from discovery after a while.
            // In both cases, retry may fix the problem.
            engine.HandleIDConflict(old.Addr)
        } else {
            log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
            engine.Disconnect()
            // Remove it from pendingEngines table
            delete(c.pendingEngines, engine.Addr)
        }
        return false
    }

    // Engine validated, move from pendingEngines table to engines table
    delete(c.pendingEngines, engine.Addr)
    // set engine state to healthy, and start refresh loop
    engine.ValidationComplete()
    c.engines[engine.ID] = engine

    log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
    return true
}

Cluster.validatePendingEngine只处理仍在Cluster.pendingEngines中的Engine。如果连接Docker engine成功,并且其ID没有与已注册的engine冲突,就把这个Engine从Cluster.pendingEngines移到Cluster.engines这个map里。

 

Docker Swarm代码分析笔记(9)——Swarm的Cluster,Engine和addEngine

swarm/cluster.go属于swarm这个package,它定义了swarm这个driver的Cluster结构体:

// Cluster is the swarm driver's cluster: the set of engines it schedules on.
type Cluster struct {
    // Guards the maps below (taken in addEngine, listNodes and
    // validatePendingEngine).
    sync.RWMutex

    eventHandlers     *cluster.EventHandlers
    // engines holds validated engines, keyed by engine ID.
    engines           map[string]*cluster.Engine
    // pendingEngines holds engines awaiting validation, keyed by address.
    pendingEngines    map[string]*cluster.Engine
    scheduler         *scheduler.Scheduler
    discovery         discovery.Backend
    // Containers being created but not yet reported by their engine; listNodes
    // adds them to the scheduler nodes. NOTE(review): key semantics not visible.
    pendingContainers map[string]*pendingContainer

    overcommitRatio float64
    engineOpts      *cluster.EngineOpts
    createRetry     int64
    // TLSConfig is passed to Engine.Connect.
    TLSConfig       *tls.Config
}

其中很重要的是cluster.Engine这个结构体的定义(cluster/engine.go),每个cluster.Engine代表一个Docker engine,即Docker daemon

// Engine represents a docker engine, i.e. one Docker daemon in the cluster.
type Engine struct {
    sync.RWMutex

    ID      string
    IP      string
    Addr    string
    Name    string
    Cpus    int64
    Memory  int64
    Labels  map[string]string
    Version string

    stopCh chan struct{}
    // refreshDelayer provides the back-off used before reconnecting.
    refreshDelayer  *delayer
    containers      map[string]*Container
    images          []*Image
    networks        map[string]*Network
    volumes         map[string]*Volume
    client          dockerclient.Client
    apiClient       engineapi.APIClient
    // eventHandler receives the engine's events; set once by
    // RegisterEventHandler.
    eventHandler    EventHandler
    state           engineState
    lastError       string
    updatedAt       time.Time
    failureCount    int
    overcommitRatio int64
    opts            *EngineOpts
    // eventsMonitor is created in ConnectWithClient.
    eventsMonitor   *EventsMonitor
}

创建cluster最核心的部分就是addEngine这个方法,即把Docker engine加到cluster中:

// addEngine registers the engine at addr with the cluster unless an engine
// with that address is already known, and reports whether one was added.
//
// The new engine gets the cluster as its event handler and is placed in
// pendingEngines, keyed by address so duplicates cannot enter. It is then
// validated in the background by validatePendingEngine. Reachable engines
// are monitored and refreshed in a loop; unreachable ones stay pending and
// are examined again later.
func (c *Cluster) addEngine(addr string) bool {
    // Skip engines already registered under this address.
    if c.hasEngineByAddr(addr) {
        return false
    }

    eng := cluster.NewEngine(addr, c.overcommitRatio, c.engineOpts)
    if err := eng.RegisterEventHandler(c); err != nil {
        log.Error(err)
    }

    func() {
        c.Lock()
        defer c.Unlock()
        c.pendingEngines[addr] = eng
    }()

    go c.validatePendingEngine(eng)
    return true
}

addEngine首先检查该地址的engine是否已经存在于cluster中。如果不存在,就创建一个新的Engine,先把它加到Cluster.pendingEngines中,再启动一个新的goroutine(validatePendingEngine)去检查它是否是有效的engine。

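另外,Engine.RegisterEventHandler只是把传入的EventHandler保存到Engine.eventHandler(如果已经设置过则返回错误)。addEngine中传入的是Cluster本身(c),也就是说Engine产生的事件交给Cluster处理,Cluster再把事件分发给自己的eventHandlers(分发逻辑不在本文摘录范围内):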

// RegisterEventHandler registers an event handler.
// Only one handler may be set. A second call returns an error and keeps the
// handler that is already installed.
func (e *Engine) RegisterEventHandler(h EventHandler) error {
    if e.eventHandler == nil {
        e.eventHandler = h
        return nil
    }
    return errors.New("event handler already set")
}

Cluster的eventHandlers是在NewPrimary中通过cluster.RegisterEventHandler(eventsHandler)注册的,所以本质上Engine和Cluster的事件最终都交给同一套处理函数处理。

Docker Swarm代码分析笔记(8)——创建Docker API router

上一篇提到的API Server需要设置HTTP处理函数:

......
// Install the router built by api.NewPrimary as the API server's HTTP handler.
server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
......

api.Server结构体定义如下:

// Server is a Docker API server.
type Server struct {
    // NOTE(review): presumably "proto://addr" strings; ListenAndServe uses
    // protoAddrParts[1] as the listen address — confirm.
    hosts  []string
    tlsConfig  *tls.Config
    // dispatcher is the http.Handler given to every http.Server; its target
    // handler is replaced via SetHandler.
    dispatcher *dispatcher
}

其中处理HTTP请求的相关方法如下:

// dispatcher is a meta http.Handler. It acts as an http.Handler and forwards
// requests to another http.Handler that can be changed at runtime.
//
// mu guards handler, because SetHandler may run while net/http is calling
// ServeHTTP concurrently on other goroutines. The zero value is ready to use.
type dispatcher struct {
    mu      sync.RWMutex
    handler http.Handler
}

// SetHandler changes the underlying handler. It is safe to call while
// requests are being served.
func (d *dispatcher) SetHandler(handler http.Handler) {
    d.mu.Lock()
    d.handler = handler
    d.mu.Unlock()
}

// ServeHTTP forwards requests to the underlying handler. It replies with
// 500 "No dispatcher defined" when no handler has been set yet.
func (d *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Read the handler under the lock, but release the lock before serving
    // so a long-running request never blocks SetHandler.
    d.mu.RLock()
    handler := d.handler
    d.mu.RUnlock()

    if handler == nil {
        httpError(w, "No dispatcher defined", http.StatusInternalServerError)
        return
    }
    handler.ServeHTTP(w, r)
}

// SetHandler is used to overwrite the HTTP handler for the API, for example
// with the API router or a reverse proxy. It forwards the new handler to the
// server's dispatcher, which every http.Server created by ListenAndServe
// uses as its Handler.
func (s *Server) SetHandler(handler http.Handler) {
    d := s.dispatcher
    d.SetHandler(handler)
}

Server.SetHandler所做的就是把handler赋给Server.dispatcher.handler,handler的类型是http.Handler:

// Handler is net/http's handler interface: any type with this ServeHTTP
// method can serve HTTP requests.
type Handler interface {
    ServeHTTP(ResponseWriter, *Request)
}

api.NewPrimary利用了mux(github.com/gorilla/mux)这个项目来构建路由:

// NewPrimary creates a new API router.
//
// It registers a fresh API events handler with the cluster and builds a mux
// router whose routes share one context (cluster, events handler, status
// handler and TLS config). CORS handling is enabled when enableCors is set,
// and profiler endpoints are mounted under "/debug/" when debug is set.
func NewPrimary(cluster cluster.Cluster, tlsConfig *tls.Config, status StatusHandler, debug, enableCors bool) *mux.Router {
    evHandler := newEventsHandler()
    cluster.RegisterEventHandler(evHandler)

    apiContext := &context{
        cluster:       cluster,
        eventsHandler: evHandler,
        statusHandler: status,
        tlsConfig:     tlsConfig,
    }

    router := mux.NewRouter()
    setupPrimaryRouter(router, apiContext, enableCors)
    if debug {
        profilerSetup(router, "/debug/")
    }
    return router
}

*mux.Router实现了ServeHTTP这个方法,所以符合http.Handler这个interface

// ServeHTTP dispatches the handler registered in the matched route.
//
// When there is a match, the route variables can be retrieved calling
// mux.Vars(request).
func (r *Router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    // (body elided in this excerpt from mux)
    ......
}

api.Server的ListenAndServe()函数创建了新的http.Server变量,并把上面的api.Server.dispatcher作为它的Handler,由dispatcher的ServeHTTP方法来响应HTTP请求:

// Every request is served by s.dispatcher. protoAddrParts[1] is presumably
// the address part of a "proto://addr" host — confirm in ListenAndServe.
server = &http.Server{
                Addr:    protoAddrParts[1],
                Handler: s.dispatcher,
            }

再看一下setupPrimaryRouter这个函数:

// setupPrimaryRouter registers every handler in routes on r.
//
// Each (method, path) pair is registered twice:
//   - under a "/v<major>.<minor>" prefix, where the version is exposed to the
//     handler as apiVersion;
//   - without a prefix, where apiVersion is "".
//
// When enableCors is true, every response also gets CORS headers, and an
// OPTIONS handler (optionsHandler) is registered for both forms of each path.
func setupPrimaryRouter(r *mux.Router, context *context, enableCors bool) {
    // The dot is escaped so that only "<major>.<minor>" version segments
    // match; an unescaped "." would also accept e.g. "/v1x24".
    const versionPrefix = `/v{version:[0-9]+\.[0-9]+}`

    // wrap adapts fct to an http.HandlerFunc that logs the request, adds CORS
    // headers when enabled and calls fct with a per-request context.
    wrap := func(fct handler) http.HandlerFunc {
        return func(w http.ResponseWriter, req *http.Request) {
            log.WithFields(log.Fields{"method": req.Method, "uri": req.RequestURI}).Debug("HTTP request received")
            if enableCors {
                writeCorsHeaders(w, req)
            }
            // net/http serves requests concurrently. Writing apiVersion into
            // the shared context would be a data race, and a request could
            // see another request's API version. Work on a shallow copy.
            reqContext := *context
            reqContext.apiVersion = mux.Vars(req)["version"]
            fct(&reqContext, w, req)
        }
    }

    // register attaches h to both the versioned and unversioned form of path.
    register := func(method, path string, h http.HandlerFunc) {
        r.Path(versionPrefix + path).Methods(method).HandlerFunc(h)
        r.Path(path).Methods(method).HandlerFunc(h)
    }

    for method, mappings := range routes {
        for route, fct := range mappings {
            log.WithFields(log.Fields{"method": method, "route": route}).Debug("Registering HTTP route")

            register(method, route, wrap(fct))
            if enableCors {
                register("OPTIONS", route, wrap(optionsHandler))
            }
        }
    }
}

其中routes定义如下:

// routes maps HTTP method -> path template -> handler; setupPrimaryRouter
// registers every entry (excerpt, other entries elided as "......").
var routes = map[string]map[string]handler{
    "HEAD": {
        "/containers/{name:.*}/archive": proxyContainer,
    },
    "GET": {
        "/_ping":                          ping,
        "/events":                         getEvents,
        "/info":                           getInfo,
        ......
        }
    ......
}

// Example entry taken from routes:
"HEAD": {
        "/containers/{name:.*}/archive": proxyContainer,
}

以上面这个"HEAD"条目为例:
method和localMethod是"HEAD";
mappings是:

// mappings for method "HEAD":
{
        "/containers/{name:.*}/archive": proxyContainer,
}

route和localRoute是"/containers/{name:.*}/archive";
fct和localFct是proxyContainer;
所以

// Register wrap for localMethod on the versioned and the unversioned path.
r.Path("/v{version:[0-9]+.[0-9]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap)
r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap)

就是为"/v{version}/containers/{name:.*}/archive"(例如"/v1.24/containers/{name:.*}/archive")和"/containers/{name:.*}/archive"这两个路径的"HEAD"请求注册了wrap函数,而wrap函数封装了localFct,也就是proxyContainer函数。另外,if enableCors部分与上述类似:它为同样的两个路径注册OPTIONS请求的处理函数,内部调用的是optionsHandler。