The core of the Swarm driver's NewCluster function (cluster/swarm/cluster.go) is as follows:
discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)
go cluster.monitorPendingEngines()
cluster.discovery.Watch is defined as follows:
Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
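It returns two channels. The first carries values of type Entries (type Entries []*Entry) and delivers information about the hosts that belong to the cluster; the second reports whether an error has occurred.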
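To make the two-channel contract concrete, here is a minimal sketch of a polling watcher with the same shape as Watch. It is not Swarm's implementation: the watch function, its fetch and interval parameters, and the Host and Port fields of Entry are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Entry and Entries follow the types quoted above; the fields are assumed.
type Entry struct {
	Host string
	Port string
}

type Entries []*Entry

// watch is a hypothetical stand-in for a discovery backend's Watch method.
// It calls fetch once per interval and reports each result on one of the
// two returned channels until stopCh is closed. A nil stopCh never fires,
// so the watcher then runs for the life of the process.
func watch(fetch func() (Entries, error), interval time.Duration, stopCh <-chan struct{}) (<-chan Entries, <-chan error) {
	ch := make(chan Entries)
	errCh := make(chan error)
	go func() {
		defer close(ch)
		defer close(errCh)
		for {
			entries, err := fetch()
			if err != nil {
				select {
				case errCh <- err:
				case <-stopCh:
					return
				}
			} else {
				select {
				case ch <- entries:
				case <-stopCh:
					return
				}
			}
			// Wait for the next polling round, or stop early.
			select {
			case <-time.After(interval):
			case <-stopCh:
				return
			}
		}
	}()
	return ch, errCh
}

func main() {
	calls := 0
	// A fake backend whose second poll fails.
	fetch := func() (Entries, error) {
		calls++
		if calls == 2 {
			return nil, errors.New("backend unreachable")
		}
		return Entries{{Host: "10.0.0.1", Port: "2375"}}, nil
	}

	stopCh := make(chan struct{})
	ch, errCh := watch(fetch, 10*time.Millisecond, stopCh)
	for i := 0; i < 3; i++ {
		select {
		case entries := <-ch:
			fmt.Printf("got %d entries\n", len(entries))
		case err := <-errCh:
			fmt.Println("discovery error:", err)
		}
	}
	close(stopCh)
}
```

Keeping errors on their own channel lets the consumer log a failed poll and keep listening, instead of treating one failure as the end of the stream.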
Cluster.monitorDiscovery keeps Cluster.engines in sync with the discovery results:
select {
case entries := <-ch:
	added, removed := currentEntries.Diff(entries)
	currentEntries = entries

	// Remove engines first. `addEngine` will refuse to add an engine
	// if there's already an engine with the same ID. If an engine
	// changes address, we have to first remove it then add it back.
	for _, entry := range removed {
		c.removeEngine(entry.String())
	}

	for _, entry := range added {
		c.addEngine(entry.String())
	}
case err := <-errCh:
	log.Errorf("Discovery error: %v", err)
}
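The added/removed computation can be sketched as a plain set difference keyed on the entry's address string. This is only an illustration of the idea, not the discovery package's code: the diff function and the Host and Port fields are assumed names.

```go
package main

import "fmt"

// Entry is an assumed shape for a discovered host.
type Entry struct {
	Host string
	Port string
}

// String renders the entry as host:port.
func (e *Entry) String() string { return e.Host + ":" + e.Port }

type Entries []*Entry

// diff is a hypothetical helper. It returns the entries present in next
// but not in cur (added) and those present in cur but not in next (removed).
func diff(cur, next Entries) (added, removed Entries) {
	curSet := make(map[string]bool, len(cur))
	for _, e := range cur {
		curSet[e.String()] = true
	}
	nextSet := make(map[string]bool, len(next))
	for _, e := range next {
		nextSet[e.String()] = true
		if !curSet[e.String()] {
			added = append(added, e)
		}
	}
	for _, e := range cur {
		if !nextSet[e.String()] {
			removed = append(removed, e)
		}
	}
	return added, removed
}

func main() {
	cur := Entries{
		{Host: "10.0.0.1", Port: "2375"},
		{Host: "10.0.0.2", Port: "2375"},
	}
	// The second host has moved to a new address.
	next := Entries{
		{Host: "10.0.0.1", Port: "2375"},
		{Host: "10.0.0.3", Port: "2375"},
	}

	added, removed := diff(cur, next)
	// Process removals before additions, as the loop above does.
	for _, e := range removed {
		fmt.Println("remove", e)
	}
	for _, e := range added {
		fmt.Println("add", e)
	}
}
```

In this example the moved host appears once in removed (old address) and once in added (new address). That is why the removals have to run first: otherwise the stale engine would still hold the ID when the add is attempted.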
Cluster.monitorPendingEngines, in turn, checks whether the Cluster.engines that are still in the pending state can now be reached:
for _, e := range pEngines {
	if e.TimeToValidate() {
		go c.validatePendingEngine(e)
	}
}
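The sketch below shows one way such a "due for validation" check could work. Everything in it is assumed for illustration: the pendingEngine type, the sweep function, and the linear back-off after each failed attempt are not taken from Swarm. Unlike the loop above, which starts one goroutine per engine, it validates synchronously so the behaviour is easy to follow.

```go
package main

import (
	"fmt"
	"time"
)

// pendingEngine is a hypothetical stand-in for an engine awaiting validation.
type pendingEngine struct {
	addr      string
	failures  int
	nextCheck time.Time
}

// timeToValidate reports whether the engine is due for another attempt.
func (e *pendingEngine) timeToValidate(now time.Time) bool {
	return !now.Before(e.nextCheck)
}

// sweep tries every engine that is due. Engines that validate are returned
// in ready; the rest stay pending, and a failed attempt delays the next one
// by base times the number of failures so far (an assumed policy).
func sweep(pending []*pendingEngine, now time.Time, base time.Duration, validate func(addr string) bool) (ready, still []*pendingEngine) {
	for _, e := range pending {
		if !e.timeToValidate(now) {
			still = append(still, e)
			continue
		}
		if validate(e.addr) {
			ready = append(ready, e)
			continue
		}
		e.failures++
		e.nextCheck = now.Add(base * time.Duration(e.failures))
		still = append(still, e)
	}
	return ready, still
}

// demo runs three sweeps over two engines and records when each one passes.
func demo() []string {
	up := map[string]bool{"10.0.0.1:2375": true} // only the first host answers at first
	validate := func(addr string) bool { return up[addr] }

	start := time.Unix(0, 0)
	base := 30 * time.Second
	pending := []*pendingEngine{{addr: "10.0.0.1:2375"}, {addr: "10.0.0.2:2375"}}

	var events []string
	for i, offset := range []time.Duration{0, 10 * time.Second, 30 * time.Second} {
		if i == 2 {
			up["10.0.0.2:2375"] = true // the second host comes up before the third sweep
		}
		var ready []*pendingEngine
		ready, pending = sweep(pending, start.Add(offset), base, validate)
		for _, e := range ready {
			events = append(events, fmt.Sprintf("sweep %d: %s", i+1, e.addr))
		}
	}
	return events
}

func main() {
	for _, ev := range demo() {
		fmt.Println(ev)
	}
}
```

The second engine fails in the first sweep, is skipped in the second because its next check is not yet due, and passes in the third. Gating attempts on a next-check time keeps an unreachable host from being dialled on every pass of the monitor loop.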