Docker Swarm代码分析笔记(11)——Engine.Connect

Engine.Connect()方法用来完成对Docker engine的连接工作:

// Connect will initialize a connection to the Docker daemon running on the
// host, gather machine specs (memory, cpu, ...) and monitor state changes.
func (e *Engine) Connect(config *tls.Config) error {
    host, _, err := net.SplitHostPort(e.Addr)
    if err != nil {
        return err
    }

    addr, err := net.ResolveIPAddr("ip4", host)
    if err != nil {
        return err
    }
    e.IP = addr.IP.String()

    c, err := dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout)
    if err != nil {
        return err
    }
    // Use HTTP Client used by dockerclient to create engine-api client
    apiClient, err := engineapi.NewClient("tcp://"+e.Addr, "", c.HTTPClient, nil)
    if err != nil {
        return err
    }

    return e.ConnectWithClient(c, apiClient)
}

Engine.Connect()利用了另外两个Docker项目:dockerclientengine-api。首先调用dockerclient.NewDockerClientTimeout("tcp://"+e.Addr, config, time.Duration(requestTimeout), setTCPUserTimeout)生成一个DockerClient结构体:

type DockerClient struct {
    URL           *url.URL
    HTTPClient    *http.Client
    TLSConfig     *tls.Config
    monitorStats  int32
    eventStopChan chan (struct{})
}

再把DockerClient.HTTPClient传入engineapi.NewClient()函数,作为engine-apihttp.Client,作为HTTP请求的传输载体。最后调用Engine.ConnectWithClient()连接Docker engine,获得Docker信息。

 

Docker Swarm代码分析笔记(10)——Cluster.validatePendingEngine

Cluster.validatePendingEngine是实际上用来连接Docker engine的代码:

// validatePendingEngine connects to the engine,
func (c *Cluster) validatePendingEngine(engine *cluster.Engine) bool {
    // Attempt a connection to the engine. Since this is slow, don't get a hold
    // of the lock yet.
    if err := engine.Connect(c.TLSConfig); err != nil {
        log.WithFields(log.Fields{"Addr": engine.Addr}).Debugf("Failed to validate pending node: %s", err)
        return false
    }

    // The following is critical and fast. Grab a lock.
    c.Lock()
    defer c.Unlock()

    // Only validate engines from pendingEngines list
    if _, exists := c.pendingEngines[engine.Addr]; !exists {
        return false
    }

    // Make sure the engine ID is unique.
    if old, exists := c.engines[engine.ID]; exists {
        if old.Addr != engine.Addr {
            log.Errorf("ID duplicated. %s shared by %s and %s", engine.ID, old.Addr, engine.Addr)
            // Keep this engine in pendingEngines table and show its error.
            // If it's ID duplication from VM clone, user see this message and can fix it.
            // If the engine is rebooted and get new IP from DHCP, previous address will be removed
            // from discovery after a while.
            // In both cases, retry may fix the problem.
            engine.HandleIDConflict(old.Addr)
        } else {
            log.Debugf("node %q (name: %q) with address %q is already registered", engine.ID, engine.Name, engine.Addr)
            engine.Disconnect()
            // Remove it from pendingEngines table
            delete(c.pendingEngines, engine.Addr)
        }
        return false
    }

    // Engine validated, move from pendingEngines table to engines table
    delete(c.pendingEngines, engine.Addr)
    // set engine state to healthy, and start refresh loop
    engine.ValidationComplete()
    c.engines[engine.ID] = engine

    log.Infof("Registered Engine %s at %s", engine.Name, engine.Addr)
    return true
}

Cluster.validatePendingEngine永远操作Cluster.pendingEngines里的Engine,如果连接Docker engine成功,就会把这个Engine移到Cluster.engines这个map里。

 

Docker Swarm代码分析笔记(9)——Swarm的Cluster,Engine和addEngine

swarm/cluster.go属于swarm这个package,它定义了swarm这个driverCluster结构体:

// Cluster is exported
type Cluster struct {
    sync.RWMutex

    eventHandlers     *cluster.EventHandlers
    engines           map[string]*cluster.Engine
    pendingEngines    map[string]*cluster.Engine
    scheduler         *scheduler.Scheduler
    discovery         discovery.Backend
    pendingContainers map[string]*pendingContainer

    overcommitRatio float64
    engineOpts      *cluster.EngineOpts
    createRetry     int64
    TLSConfig       *tls.Config
}

其中很重要的是cluster.Engine这个结构体的定义(cluster/engine.go),每个cluster.Engine代表一个Docker engine,即Docker daemon

// Engine represents a docker engine
type Engine struct {
    sync.RWMutex

    ID  string
    IP  string
    Addr string
    Name string
    Cpus int64
    Memory  int64
    Labels  map[string]string
    Version string

    stopCh  chan struct{}
    refreshDelayer  *delayer
    containers  map[string]*Container
    images  []*Image
    networksmap[string]*Network
    volumes map[string]*Volume
    client  dockerclient.Client
    apiClient   engineapi.APIClient
    eventHandlerEventHandler
    state   engineState
    lastError   string
    updatedAt   time.Time
    failureCountint
    overcommitRatio int64
    opts*EngineOpts
    eventsMonitor   *EventsMonitor
}

创建cluster最核心的部分就是addEngine这个方法,即把Docker engine加到cluster中:

func (c *Cluster) addEngine(addr string) bool {
    // Check the engine is already registered by address.
    if c.hasEngineByAddr(addr) {
        return false
    }

    engine := cluster.NewEngine(addr, c.overcommitRatio, c.engineOpts)
    if err := engine.RegisterEventHandler(c); err != nil {
        log.Error(err)
    }
    // Add it to pending engine map, indexed by address. This will prevent
    // duplicates from entering
    c.Lock()
    c.pendingEngines[addr] = engine
    c.Unlock()

    // validatePendingEngine will start a thread to validate the engine.
    // If the engine is reachable and valid, it'll be monitored and updated in a loop.
    // If engine is not reachable, pending engines will be examined once in a while
    go c.validatePendingEngine(engine)

    return true
}

addEngine首先检查当前engine是否已经存在cluster中,如果没有则会分配一个新的engine,同时把它先加到Cluster.pendingEngines中,并启动一个新的goroutinevalidatePendingEngine)去检查它是否是个有效的engine

另外,Engine.RegisterEventHandler实质上就是把ClustereventHandlers成员赋给Engine.eventHandler

// RegisterEventHandler registers an event handler.
func (e *Engine) RegisterEventHandler(h EventHandler) error {
    if e.eventHandler != nil {
        return errors.New("event handler already set")
    }
    e.eventHandler = h
    return nil
}

ClustereventHandlers则是在NewPrimary中赋值的,所以本质上EngineCluster用的都是一套处理函数。

Docker Swarm代码分析笔记(8)——创建Docker API router

上一篇提到的API Server需要设置HTTP处理函数:

......
server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
......

api.Server结构体定义如下:

// Server is a Docker API server.
type Server struct {
    hosts  []string
    tlsConfig  *tls.Config
    dispatcher *dispatcher
}

其中处理HTTP请求的相关方法如下:

// Dispatcher is a meta http.Handler. It acts as an http.Handler and forwards
// requests to another http.Handler that can be changed at runtime.
type dispatcher struct {
    handler http.Handler
}

// SetHandler changes the underlying handler.
func (d *dispatcher) SetHandler(handler http.Handler) {
    d.handler = handler
}

// ServeHTTP forwards requests to the underlying handler.
func (d *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if d.handler == nil {
        httpError(w, "No dispatcher defined", http.StatusInternalServerError)
        return
    }
    d.handler.ServeHTTP(w, r)
}

// SetHandler is used to overwrite the HTTP handler for the API.
// This can be the api router or a reverse proxy.
func (s *Server) SetHandler(handler http.Handler) {
    s.dispatcher.SetHandler(handler)
}

Server.SetHandler所做的就是把handler赋给Server.dispatcher.handlerhandler类型是http.Handler:

type Handler interface {
    ServeHTTP(ResponseWriter, *Request)
}

api.NewPrimary利用了mux这个project

// NewPrimary creates a new API router.
func NewPrimary(cluster cluster.Cluster, tlsConfig *tls.Config, status StatusHandler, debug, enableCors bool) *mux.Router {
    // Register the API events handler in the cluster.
    eventsHandler := newEventsHandler()
    cluster.RegisterEventHandler(eventsHandler)

    context := &context{
        cluster:       cluster,
        eventsHandler: eventsHandler,
        statusHandler: status,
        tlsConfig:     tlsConfig,
    }

    r := mux.NewRouter()
    setupPrimaryRouter(r, context, enableCors)

    if debug {
        profilerSetup(r, "/debug/")
    }

    return r
}

*mux.Router实现了ServeHTTP这个方法,所以符合http.Handler这个interface

// ServeHTTP dispatches the handler registered in the matched route.
//
// When there is a match, the route variables can be retrieved calling
// mux.Vars(request).
func (r *Router) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    ......
}

api.ServerListenAndServe()函数定义了新的http.Server变量,利用上面实现的api.Server.dispatcherServeHTTP方法来响应HTTP请求:

server = &http.Server{
                Addr:    protoAddrParts[1],
                Handler: s.dispatcher,
            }

再看一下setupPrimaryRouter这个函数:

func setupPrimaryRouter(r *mux.Router, context *context, enableCors bool) {
    for method, mappings := range routes {
        for route, fct := range mappings {
            log.WithFields(log.Fields{"method": method, "route": route}).Debug("Registering HTTP route")

            localRoute := route
            localFct := fct

            wrap := func(w http.ResponseWriter, r *http.Request) {
                log.WithFields(log.Fields{"method": r.Method, "uri": r.RequestURI}).Debug("HTTP request received")
                if enableCors {
                    writeCorsHeaders(w, r)
                }
                context.apiVersion = mux.Vars(r)["version"]
                localFct(context, w, r)
            }
            localMethod := method

            r.Path("/v{version:[0-9]+.[0-9]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap)
            r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap)

            if enableCors {
                optionsMethod := "OPTIONS"
                optionsFct := optionsHandler

                wrap := func(w http.ResponseWriter, r *http.Request) {
                    log.WithFields(log.Fields{"method": optionsMethod, "uri": r.RequestURI}).
                        Debug("HTTP request received")
                    if enableCors {
                        writeCorsHeaders(w, r)
                    }
                    context.apiVersion = mux.Vars(r)["version"]
                    optionsFct(context, w, r)
                }

                r.Path("/v{version:[0-9]+.[0-9]+}" + localRoute).
                    Methods(optionsMethod).HandlerFunc(wrap)
                r.Path(localRoute).Methods(optionsMethod).
                    HandlerFunc(wrap)
            }
        }
    }
}

其中routes定义如下:

var routes = map[string]map[string]handler{
    "HEAD": {
        "/containers/{name:.*}/archive": proxyContainer,
    },
    "GET": {
        "/_ping":                          ping,
        "/events":                         getEvents,
        "/info":                           getInfo,
        ......
        }
    ......
}

"HEAD": {
        "/containers/{name:.*}/archive": proxyContainer,
}

为例:
methodlocalMethod"HEAD"
mappings是:

{
        "/containers/{name:.*}/archive": proxyContainer,
}

routelocalRoute"/containers/{name:.*}/archive"
fctlocalFctproxyContainer
所以

r.Path("/v{version:[0-9]+.[0-9]+}" + localRoute).Methods(localMethod).HandlerFunc(wrap)
r.Path(localRoute).Methods(localMethod).HandlerFunc(wrap)

就是为"/v*/containers/{name:.*}/archive""/containers/{name:.*}/archive"这个PATH"HEAD"操作注册了wrap函数,而wrap函数则封装了localFct,也就是proxyContainer函数。 另外,if enableCors部分与上述类似。

 

Docker Swarm代码分析笔记(7)——创建Docker API server

Docker Swarm manage函数的最后部分是创建Docker API server部分:

server := api.NewServer(hosts, tlsConfig)
if c.Bool("replication") {
    ......
    setupReplication(c, cl, server, discovery, addr, leaderTTL, tlsConfig)
} else {
    server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
    cluster.NewWatchdog(cl)
}

log.Fatal(server.ListenAndServe())

Server结构体定义在api/server.go

type Server struct {
    hosts      []string
    tlsConfig  *tls.Config
    dispatcher *dispatcher
}

它的核心方法是ListenAndServe(),即为每个host起一个goroutine监听并处理Docker client的连接请求。

func (s *Server) ListenAndServe() error {
    chErrors := make(chan error, len(s.hosts))

    for _, host := range s.hosts {
        protoAddrParts := strings.SplitN(host, "://", 2)
        if len(protoAddrParts) == 1 {
            protoAddrParts = append([]string{"tcp"}, protoAddrParts...)
        }

        go func() {
            log.WithFields(log.Fields{"proto": protoAddrParts[0], "addr": protoAddrParts[1]}).Info("Listening for HTTP")

            var (
                l      net.Listener
                err    error
                server = &http.Server{
                    Addr:    protoAddrParts[1],
                    Handler: s.dispatcher,
                }
            )

            switch protoAddrParts[0] {
            case "unix":
                l, err = newUnixListener(protoAddrParts[1], s.tlsConfig)
            case "tcp":
                l, err = newListener("tcp", protoAddrParts[1], s.tlsConfig)
            default:
                err = fmt.Errorf("unsupported protocol: %q", protoAddrParts[0])
            }
            if err != nil {
                chErrors <- err
            } else {
                chErrors <- server.Serve(l)
            }

        }()
    }

    for i := 0; i < len(s.hosts); i++ {
        err := <-chErrors
        if err != nil {
            return err
        }
    }
    return nil
}

 

Docker Swarm代码分析笔记(6)——swarm driver的NewCluster函数

Swarm driverNewCluster函数的核心功能如下(cluster/swarm/cluster.go):

discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)
go cluster.monitorPendingEngines()

cluster.discovery.Watch的定义如下:

Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)

返回两个channel:第一个channel类型是Entriestype Entries []*Entry),用来传输cluster所包含的主机信息,第二个channel用来通知是否有错误发生。

Cluster.monitorDiscovery功能是整理Cluster.engines

select {
    case entries := <-ch:
        added, removed := currentEntries.Diff(entries)
        currentEntries = entries

        // Remove engines first. `addEngine` will refuse to add an engine
        // if there's already an engine with the same ID.  If an engine
        // changes address, we have to first remove it then add it back.
        for _, entry := range removed {
            c.removeEngine(entry.String())
        }

        for _, entry := range added {
            c.addEngine(entry.String())
        }
    case err := <-errCh:
        log.Errorf("Discovery error: %v", err)
    }

Cluster.monitorPendingEngines则是验证处于pending状态的Cluster.engines现在是否能够连接上:

for _, e := range pEngines {
        if e.TimeToValidate() {
            go c.validatePendingEngine(e)
        }
    }

 

Docker Swarm代码分析笔记(5)——cluster/cluster.go

cluster\cluster.go文件定义了Cluster interface

// Cluster is exported
type Cluster interface {
    // Create a container
    CreateContainer(config *ContainerConfig, name string, authConfig *types.AuthConfig) (*Container, error)

    // Remove a container
    RemoveContainer(container *Container, force, volumes bool) error

    // Return all images
    Images() Images
    ......
}

目前实现了mesoscluster/mesos/cluster.go,目前仍然处于试验阶段)和swarmcluster/swarm/cluster.go)两种driver。如果你想实现自己的driver,就要实现上面Cluster interface的所有函数。

 

Docker Swarm代码分析笔记(4)——swarm join

Docker Swarmjoin命令的定义:

    {
        Name:      "join",
        ShortName: "j",
        Usage:     "Join a docker cluster",
        Flags:     []cli.Flag{flJoinAdvertise, flHeartBeat, flTTL, flJoinRandomDelay, flDiscoveryOpt},
        Action:    join,
    },

flHeartBeat的默认值是60s,而flTTL默认值是180s:

flHeartBeat = cli.StringFlag{
    Name:  "heartbeat",
    Value: "60s",
    Usage: "period between each heartbeat",
}
flTTL = cli.StringFlag{
    Name:  "ttl",
    Value: "180s",
    Usage: "set the expiration of an ephemeral node",
}

join函数的核心代码:

......
for {
    log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hb)
    if err := d.Register(addr); err != nil {
        log.Error(err)
    }
    time.Sleep(hb)
}
......

token.Register函数实现:

func (s *Discovery) Register(addr string) error {
    buf := strings.NewReader(addr)

    resp, err := http.Post(fmt.Sprintf("%s/%s/%s?ttl=%d", s.url,
        "clusters", s.token, uint64(s.ttl.Seconds())), "application/json", buf)

    if err != nil {
        return err
    }

    resp.Body.Close()
    return nil
}

join命令其实就是每隔heartbeat时间(例如,60s),向https://discovery.hub.docker.com/v1/clusters/token/ttl=180ttl取默认值),注册一下当前Docker的地址(IP:PORT)。

 

Docker Swarm代码分析笔记(3)——swarm create

Docker Swarmcreate命令代码很简单:

func create(c *cli.Context) {
    if len(c.Args()) != 0 {
        log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name)
    }
    discovery := &token.Discovery{}
    discovery.Initialize("", 0, 0, nil)
    token, err := discovery.CreateCluster()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(token)
}

token.CreateCluster()函数的实现:

// CreateCluster returns a unique cluster token
func (s *Discovery) CreateCluster() (string, error) {
    resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil)
    if err != nil {
        return "", err
    }

    defer resp.Body.Close()
    token, err := ioutil.ReadAll(resp.Body)
    return string(token), err
}

其实就是向https://discovery.hub.docker.com/v1/clusters发送一个https post请求,然后得到唯一的一个token,利用这个token来创建和管理cluster

 

Docker Swarm代码分析笔记(2)——discovery

Dockerdiscovery package提供了实现新的backend定义的interfacediscovey.go):

package discovery

import (
    "errors"
    "time"
)

var (
    // ErrNotSupported is returned when a discovery service is not supported.
    ErrNotSupported = errors.New("discovery service not supported")

    // ErrNotImplemented is returned when discovery feature is not implemented
    // by discovery backend.
    ErrNotImplemented = errors.New("not implemented in this discovery service")
)

// Watcher provides watching over a cluster for nodes joining and leaving.
type Watcher interface {
    // Watch the discovery for entry changes.
    // Returns a channel that will receive changes or an error.
    // Providing a non-nil stopCh can be used to stop watching.
    Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
}

// Backend is implemented by discovery backends which manage cluster entries.
type Backend interface {
    // Watcher must be provided by every backend.
    Watcher

    // Initialize the discovery with URIs, a heartbeat, a ttl and optional settings.
    Initialize(string, time.Duration, time.Duration, map[string]string) error

    // Register to the discovery.
    Register(string) error
}

Watcher interface用于node加入或移除出cluster时的回调函数。Initialize用于backend的初始化,而Register用于将node加入cluster