Docker Swarm代码分析笔记(9)——Swarm的Cluster,Engine和addEngine

swarm/cluster.go属于swarm这个package,它定义了swarm这个driverCluster结构体:

// Cluster is exported
type Cluster struct {
    sync.RWMutex

    eventHandlers     *cluster.EventHandlers
    engines           map[string]*cluster.Engine
    pendingEngines    map[string]*cluster.Engine
    scheduler         *scheduler.Scheduler
    discovery         discovery.Backend
    pendingContainers map[string]*pendingContainer

    overcommitRatio float64
    engineOpts      *cluster.EngineOpts
    createRetry     int64
    TLSConfig       *tls.Config
}

其中很重要的是cluster.Engine这个结构体的定义(cluster/engine.go),每个cluster.Engine代表一个Docker engine,即Docker daemon

// Engine represents a docker engine
type Engine struct {
    sync.RWMutex

    ID  string
    IP  string
    Addr string
    Name string
    Cpus int64
    Memory  int64
    Labels  map[string]string
    Version string

    stopCh  chan struct{}
    refreshDelayer  *delayer
    containers  map[string]*Container
    images  []*Image
    networksmap[string]*Network
    volumes map[string]*Volume
    client  dockerclient.Client
    apiClient   engineapi.APIClient
    eventHandlerEventHandler
    state   engineState
    lastError   string
    updatedAt   time.Time
    failureCountint
    overcommitRatio int64
    opts*EngineOpts
    eventsMonitor   *EventsMonitor
}

创建cluster最核心的部分就是addEngine这个方法,即把Docker engine加到cluster中:

func (c *Cluster) addEngine(addr string) bool {
    // Check the engine is already registered by address.
    if c.hasEngineByAddr(addr) {
        return false
    }

    engine := cluster.NewEngine(addr, c.overcommitRatio, c.engineOpts)
    if err := engine.RegisterEventHandler(c); err != nil {
        log.Error(err)
    }
    // Add it to pending engine map, indexed by address. This will prevent
    // duplicates from entering
    c.Lock()
    c.pendingEngines[addr] = engine
    c.Unlock()

    // validatePendingEngine will start a thread to validate the engine.
    // If the engine is reachable and valid, it'll be monitored and updated in a loop.
    // If engine is not reachable, pending engines will be examined once in a while
    go c.validatePendingEngine(engine)

    return true
}

addEngine首先检查当前engine是否已经存在cluster中,如果没有则会分配一个新的engine,同时把它先加到Cluster.pendingEngines中,并启动一个新的goroutinevalidatePendingEngine)去检查它是否是个有效的engine

另外,Engine.RegisterEventHandler实质上就是把ClustereventHandlers成员赋给Engine.eventHandler

// RegisterEventHandler registers an event handler.
func (e *Engine) RegisterEventHandler(h EventHandler) error {
    if e.eventHandler != nil {
        return errors.New("event handler already set")
    }
    e.eventHandler = h
    return nil
}

ClustereventHandlers则是在NewPrimary中赋值的,所以本质上EngineCluster用的都是一套处理函数。