Docker daemon
初始化响应docker client swarm
相关命令的处理函数位于api/server/router/swarm/cluster.go
:
// buildRouter is a router to talk with the build controller
type swarmRouter struct {
backend Backend
routes []router.Route
}
// NewRouter initializes a new build router
func NewRouter(b Backend) router.Router {
r := &swarmRouter{
backend: b,
}
r.initRoutes()
return r
}
// Routes returns the available routers to the swarm controller
func (sr *swarmRouter) Routes() []router.Route {
return sr.routes
}
func (sr *swarmRouter) initRoutes() {
sr.routes = []router.Route{
router.NewPostRoute("/swarm/init", sr.initCluster),
router.NewPostRoute("/swarm/join", sr.joinCluster),
router.NewPostRoute("/swarm/leave", sr.leaveCluster),
router.NewGetRoute("/swarm", sr.inspectCluster),
router.NewPostRoute("/swarm/update", sr.updateCluster),
router.NewGetRoute("/services", sr.getServices),
router.NewGetRoute("/services/{id:.*}", sr.getService),
router.NewPostRoute("/services/create", sr.createService),
router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
router.NewGetRoute("/nodes", sr.getNodes),
router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
router.NewGetRoute("/tasks", sr.getTasks),
router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
}
}
以处理“/swarm/init
”请求为例,实际的处理函数位于daemon/cluster/cluster.go
:
// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
c.Lock()
if node := c.node; node != nil {
if !req.ForceNewCluster {
c.Unlock()
return "", ErrSwarmExists
}
if err := c.stopNode(); err != nil {
c.Unlock()
return "", err
}
}
if err := validateAndSanitizeInitRequest(&req); err != nil {
c.Unlock()
return "", err
}
listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
if err != nil {
c.Unlock()
return "", err
}
advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
if err != nil {
c.Unlock()
return "", err
}
localAddr := listenHost
// If the advertise address is not one of the system's
// addresses, we also require a listen address.
listenAddrIP := net.ParseIP(listenHost)
if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
advertiseIP := net.ParseIP(advertiseHost)
if advertiseIP == nil {
// not an IP
c.Unlock()
return "", errMustSpecifyListenAddr
}
systemIPs := listSystemIPs()
found := false
for _, systemIP := range systemIPs {
if systemIP.Equal(advertiseIP) {
found = true
break
}
}
if !found {
c.Unlock()
return "", errMustSpecifyListenAddr
}
localAddr = advertiseIP.String()
}
// todo: check current state existing
n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
if err != nil {
c.Unlock()
return "", err
}
c.Unlock()
select {
case <-n.Ready():
if err := initClusterSpec(n, req.Spec); err != nil {
return "", err
}
go c.reconnectOnFailure(n)
return n.NodeID(), nil
case <-n.done:
c.RLock()
defer c.RUnlock()
if !req.ForceNewCluster { // if failure on first attempt don't keep state
if err := c.clearState(); err != nil {
return "", err
}
}
return "", c.err
}
}
它的核心是c.startNewNode
函数,其主要逻辑如下:
......
n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
Hostname: c.config.Name,
ForceNewCluster: forceNewCluster,
ListenControlAPI: filepath.Join(c.root, controlSocket),
ListenRemoteAPI: listenAddr,
AdvertiseRemoteAPI: advertiseAddr,
JoinAddr: joinAddr,
StateDir: c.root,
JoinToken: joinToken,
Executor: container.NewExecutor(c.config.Backend),
HeartbeatTick: 1,
ElectionTick: 3,
})
if err != nil {
return nil, err
}
ctx := context.Background()
if err := n.Start(ctx); err != nil {
return nil, err
}
......
即调用swarmkit
的NewNode
创建一个node
,接下来便start
这个节点。这个node
是manager
角色。