Go语言中连接多个字符串

本文参考自How to efficiently concatenate strings in Go?Efficient String Concatenation in Go

(1)直接使用“+=”:

package main

import (
    "fmt"
    "strconv"
)

func main() {
    s := ""
    for i := 0; i <= 9; i++ {
        s += strconv.Itoa(i)
    }
    fmt.Println(s)
}

因为字符串类型在Go中是不可改变的,因此每次操作实际都要新分配字符串,所以在字符串比较多的时候效率不高。

(2)使用strings.Join()函数:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

func main() {
    var s []string
    for i := 0; i <= 9; i++ {
        s = append(s, strconv.Itoa(i))
    }
    fmt.Println(strings.Join(s, ""))
}

这种方式需要花费构建slice的时间。

(3)使用bytes.Buffer

package main

import (
    "bytes"
    "fmt"
    "strconv"
)

func main() {
    var buffer bytes.Buffer
    for i := 0; i <= 9; i++ {
        buffer.WriteString(strconv.Itoa(i))
    }
    fmt.Println(buffer.String())
}

这种方式在字符串比较多时效率最高。

Go语言的http.FileServer

看一下下面这段代码:

package main

import (
    "net/http"
)

func main() {
    // To serve a directory on disk (/tmp) under an alternate URL
    // path (/tmpfiles/), use StripPrefix to modify the request
    // URL's path before the FileServer sees it:
    http.Handle("/tmpfiles/", http.StripPrefix("/tmpfiles/", http.FileServer(http.Dir("/tmp"))))
}

(1)http.Dir("/tmp")是利用本地tmp目录实现一个文件系统;
(2)http.FileServer(http.Dir("/tmp"))返回一个Handler,其用来处理访问本地"/tmp"文件夹的HTTP请求;
(3)http.StripPrefix("/tmpfiles/", http.FileServer(http.Dir("/tmp")))返回一个新的Handler,这个Handler用来处理HTTP请求(移除前缀是"/tmpfiles/"后的URL)。
总结一下,当要访问http://localhost/tmpfiles/a文件,实际访问的是本地/tmp/a文件。

Go语言中的nil

本文是Francesc CampoyGoConf上做的Understanding Nil演讲的笔记。

(1)nil没有type

(2)在Go语言中,未显示初始化的变量拥有其类型的zero value。共有6种类型变量的zero valuenilpointerslicemapchannelfunctioninterface。具体含义如下:

类型 nil值含义
pointer 指向nothing
slice slice变量中的3个成员值:buf为nil(没有backing array),len和cap都是0
map,channel,function 一个nil pointer,指向nothing
interface interface包含”type, value”,一个nil interface必须二者都为nil:”nil, nil”

对于interface来说,正因为需要<type, value>这个元组中两个值都为nilinterface值才是nil。所以需要注意下面两点:

a)Do not declare concrete error vars:

func do() error {
    var err *doError // nil of type *doError
    return err // error (*doError, nil)
}
func main() {
    err := do() // error (*doError, nil)
    fmt.Println(err == nil) // false
}

b)Do not return concrete error types:

func do() *doError {
    return nil // nil of type *doError
}
func main() {
    err := do() // nil of type *doError
    fmt.Println(err == nil) // true
}

func do() *doError {
    return nil // nil of type *doError
}
func wrapDo() error { // error (*doError, nil)
    return do() // nil of type *doError
}
func main() {
    err := wrapDo() // error (*doError, nil)
    fmt.Println(err == nil) // false
}

(3)nil一些有用的使用场景:

类型 nil值使用场景
pointer methods can be called on nil receivers
slice perfectly valid zero values
map perfect as read-only values(往nil map添加成员会导致panic)
channel essential for some concurrency patterns
function needed for completeness
interface the most used signal in Go (err != nil)

 

Swarmkit笔记(15)——cluster node存储相关的代码

raft.Node有一个memoryStore成员(定义在manager/state/raft/raft.go):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    memoryStore         *store.MemoryStore
    ......
}

它非常重要,因为cluster中用来响应swarmctl命令的manager leader中的store成员其实就是指向managerNode结构体中的memoryStore

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

store.MemoryStore定义在manager/state/store/memory.go

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

其中实际用来存储的memory database部分使用的是go-memdb项目。初始化store.MemoryStore使用NewMemoryStore()函数:

// NewMemoryStore returns an in-memory store. The argument is an optional
// Proposer which will be used to propagate changes to other members in a
// cluster.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
    memDB, err := memdb.NewMemDB(schema)
    if err != nil {
        // This shouldn't fail
        panic(err)
    }

    return &MemoryStore{
        memDB:    memDB,
        queue:    watch.NewQueue(0),
        proposer: proposer,
    }
}

其中schema是一个*memdb.DBSchema类型的变量:

schema        = &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{},
    }

schema添加成员使用的是register函数(定义在manager/state/store/memory.go):

func register(os ObjectStoreConfig) {
    objectStorers = append(objectStorers, os)
    schema.Tables[os.Name] = os.Table
}

register函数在store package里各个文件(分别是cluster.gonetworks.gonodes.goservices.gotasks.go,正好对应swarmctl5个子命令。)的init()函数中使用,用来注册如何处理相应的object

ObjectStoreConfig定义在manager/state/store/object.go

// ObjectStoreConfig provides the necessary methods to store a particular object
// type inside MemoryStore.
type ObjectStoreConfig struct {
    Name             string
    Table            *memdb.TableSchema
    Save             func(ReadTx, *api.StoreSnapshot) error
    Restore          func(Tx, *api.StoreSnapshot) error
    ApplyStoreAction func(Tx, *api.StoreAction) error
    NewStoreAction   func(state.Event) (api.StoreAction, error)
}

它定义了如何存储一个object

services.go为例:

const tableService = "service"

func init() {
    register(ObjectStoreConfig{
        Name: tableService,
        Table: &memdb.TableSchema{
            Name: tableService,
            Indexes: map[string]*memdb.IndexSchema{
                indexID: {
                    Name:    indexID,
                    Unique:  true,
                    Indexer: serviceIndexerByID{},
                },
                indexName: {
                    Name:    indexName,
                    Unique:  true,
                    Indexer: serviceIndexerByName{},
                },
            },
        },
        Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
            var err error
            snapshot.Services, err = FindServices(tx, All)
            return err
        },
        Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
            services, err := FindServices(tx, All)
            if err != nil {
                return err
            }
            for _, s := range services {
                if err := DeleteService(tx, s.ID); err != nil {
                    return err
                }
            }
            for _, s := range snapshot.Services {
                if err := CreateService(tx, s); err != nil {
                    return err
                }
            }
            return nil
        },
        ApplyStoreAction: func(tx Tx, sa *api.StoreAction) error {
            switch v := sa.Target.(type) {
            case *api.StoreAction_Service:
                obj := v.Service
                switch sa.Action {
                case api.StoreActionKindCreate:
                    return CreateService(tx, obj)
                case api.StoreActionKindUpdate:
                    return UpdateService(tx, obj)
                case api.StoreActionKindRemove:
                    return DeleteService(tx, obj.ID)
                }
            }
            return errUnknownStoreAction
        },
        NewStoreAction: func(c state.Event) (api.StoreAction, error) {
            var sa api.StoreAction
            switch v := c.(type) {
            case state.EventCreateService:
                sa.Action = api.StoreActionKindCreate
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            case state.EventUpdateService:
                sa.Action = api.StoreActionKindUpdate
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            case state.EventDeleteService:
                sa.Action = api.StoreActionKindRemove
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            default:
                return api.StoreAction{}, errUnknownStoreAction
            }
            return sa, nil
        },
    })
}

NewStoreAction是创建针对service这张tableapi.StoreAction;而ApplyStoreAction则是根据具体情况,使用相应的actioncreateupdate还是delete,等等);Save是从数据库读取所有的service并保存到一个snapshot中;Restore则是用snapshot中的值更新数据库中相应的service

再看一下manager leader用来创建service的函数(manager\controlapi\service.go):

// CreateService creates and return a Service based on the provided ServiceSpec.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
// - Returns `AlreadyExists` if the ServiceID conflicts.
// - Returns an error if the creation fails.
func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRequest) (*api.CreateServiceResponse, error) {
    ......
    err := s.store.Update(func(tx store.Tx) error {
        return store.CreateService(tx, service)
    })
    if err != nil {
        return nil, err
    }
    ......
}

s.store.Update()函数是核心部分(manager/state/store/memory.go):

// Update executes a read/write transaction.
func (s *MemoryStore) Update(cb func(Tx) error) error {
    return s.update(s.proposer, cb)
} 

再看一下MemoryStore.update()函数(manager/state/store/memory.go):

func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
    s.updateLock.Lock()
    memDBTx := s.memDB.Txn(true)

    var curVersion *api.Version

    if proposer != nil {
        curVersion = proposer.GetVersion()
    }

    var tx tx
    tx.init(memDBTx, curVersion)

    err := cb(&tx)

    if err == nil {
        if proposer == nil {
            memDBTx.Commit()
        } else {
            var sa []*api.StoreAction
            sa, err = tx.changelistStoreActions()

            if err == nil {
                if sa != nil {
                    err = proposer.ProposeValue(context.Background(), sa, func() {
                        memDBTx.Commit()
                    })
                } else {
                    memDBTx.Commit()
                }
            }
        }
    }

    if err == nil {
        for _, c := range tx.changelist {
            s.queue.Publish(c)
        }
        if len(tx.changelist) != 0 {
            s.queue.Publish(state.EventCommit{})
        }
    } else {
        memDBTx.Abort()
    }
    s.updateLock.Unlock()
    return err

}

分析一下上面这个函数:
(1)

memDBTx := s.memDB.Txn(true)

这是go-memdb的用法,true表明创建一个write transaction

(2)

if proposer != nil {
    curVersion = proposer.GetVersion()
}

proposermanagerraft.Node成员,其功能是用来通知cluster中其它follower manager所发生的变化:

// ProposeValue calls Propose on the raft and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
    _, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
    if err != nil {
        return err
    }
    return nil
}

// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if !n.IsMember() {
        return nil
    }

    status := n.Node.Status()
    return &api.Version{Index: status.Commit}
}

(3)

var tx tx
tx.init(memDBTx, curVersion)

err := cb(&tx)

其中tx定义如下:

// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
    ReadTx
    create(table string, o Object) error
    update(table string, o Object) error
    delete(table, id string) error
}

type tx struct {
    readTx
    curVersion *api.Version
    changelist []state.Event
}

tx用来实现read/write transaction

tx.init()就是一个“一对一”的赋值:

func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
    tx.memDBTx = memDBTx
    tx.curVersion = curVersion
    tx.changelist = nil
}

cb就是:

func(tx store.Tx) error {
        return store.CreateService(tx, service)
}

store.CreateService()函数:

// CreateService adds a new service to the store.
// Returns ErrExist if the ID is already taken.
func CreateService(tx Tx, s *api.Service) error {
    // Ensure the name is not already in use.
    if tx.lookup(tableService, indexName, strings.ToLower(s.Spec.Annotations.Name)) != nil {
        return ErrNameConflict
    }

    return tx.create(tableService, serviceEntry{s})
}

以上代码确定service name没有重复后,再创建service

// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
func (tx *tx) create(table string, o Object) error {
    if tx.lookup(table, indexID, o.ID()) != nil {
        return ErrExist
    }

    copy := o.Copy()
    meta := copy.Meta()
    if err := touchMeta(&meta, tx.curVersion); err != nil {
        return err
    }
    copy.SetMeta(meta)

    err := tx.memDBTx.Insert(table, copy)
    if err == nil {
        tx.changelist = append(tx.changelist, copy.EventCreate())
        o.SetMeta(meta)
    }
    return err
}

上面这个函数会创建一个Object副本(也就是serviceEntry结构体)存放到数据库里,并把一个state.EventCreateService加到tx.changelist中。

其实这些有callbak作为参数的函数,真正用来做事就是callback,函数的其它部分仅仅是提供了一些common的功能。比如:获得transaction和commit。

(4)

    if err == nil {
        if proposer == nil {
            memDBTx.Commit()
        } else {
            var sa []*api.StoreAction
            sa, err = tx.changelistStoreActions()

            if err == nil {
                if sa != nil {
                    err = proposer.ProposeValue(context.Background(), sa, func() {
                        memDBTx.Commit()
                    })
                } else {
                    memDBTx.Commit()
                }
            }
        }
    }

把数据commit到数据库。

(5)

    if err == nil {
        for _, c := range tx.changelist {
            s.queue.Publish(c)
        }
        if len(tx.changelist) != 0 {
            s.queue.Publish(state.EventCommit{})
        }
    } else {
        memDBTx.Abort()
    }

s.queue.Publish()函数把创建service这个消息通知到其它的goroutine(例如m.globalOrchestrator.Run()),这些goroutine会做具体的创建service操作。

此外,MemoryStore还提供了View函数,用来完成read transaction

// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
    lookup(table, index, id string) Object
    get(table, id string) Object
    find(table string, by By, checkType func(By) error, appendResult func(Object)) error
}

type readTx struct {
    memDBTx *memdb.Txn
}

// View executes a read transaction.
func (s *MemoryStore) View(cb func(ReadTx)) {
    memDBTx := s.memDB.Txn(false)

    readTx := readTx{
        memDBTx: memDBTx,
    }
    cb(readTx)
    memDBTx.Commit()
}

Swarmkit笔记(14)——manager切换角色

Manager结构体(定义在manager/manager.go)包含一个*raft.Node成员:

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
    ......
    RaftNode   *raft.Node
    ......
}

raft.Node(定义在manager/state/raft/raft.go)则包含一个*events.Broadcaster成员,用来接收改变manager role的消息(变成leader还是follower):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    leadershipBroadcast *events.Broadcaster
    ......
}   

发送改变当前manager role的代码位于manager/state/raft/raft.go

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
    ......
            // If we cease to be the leader, we must cancel
            // any proposals that are currently waiting for
            // a quorum to acknowledge them. It is still
            // possible for these to become committed, but
            // if that happens we will apply them as any
            // follower would.
            if rd.SoftState != nil {
                if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
                    wasLeader = false
                    n.wait.cancelAll()
                    if atomic.LoadUint32(&n.signalledLeadership) == 1 {
                        atomic.StoreUint32(&n.signalledLeadership, 0)
                        n.leadershipBroadcast.Write(IsFollower)
                    }
                } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
                    wasLeader = true
                }
            }

            if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
                // If all the entries in the log have become
                // committed, broadcast our leadership status.
                if n.caughtUp() {
                    atomic.StoreUint32(&n.signalledLeadership, 1)
                    n.leadershipBroadcast.Write(IsLeader)
                }
            }
    ......
}

接收消息的代码在Manager.Run()函数(manager/manager.go):

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
    ......
    leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
    defer cancel()

    go m.handleLeadershipEvents(ctx, leadershipCh)

    ......

    go func() {
            err := m.RaftNode.Run(ctx)
            if err != nil {
                log.G(ctx).Error(err)
                m.Stop(ctx)
            }
        }()

    ......
}

Node.SubscribeLeadership()Manager.handleLeadershipEvents()代码比较简单,不再赘述。

go-events package简介

go-events实现了一种处理event的机制,其核心概念是Sink(定义在event.go):

// Event marks items that can be sent as events.
type Event interface{}

// Sink accepts and sends events.
type Sink interface {
    // Write an event to the Sink. If no error is returned, the caller will
    // assume that all events have been committed to the sink. If an error is
    // received, the caller may retry sending the event.
    Write(event Event) error

    // Close the sink, possibly waiting for pending events to flush.
    Close() error
}

可以把Sink想象成一个“池子”,它提供了2个方法:Write往“池子”里发消息,Close是不用时关闭这个“池子”。 其它几个文件其实都是围绕Sink做文章,构造出各种功能。举个例子:

package main

import (
    "fmt"
    "github.com/docker/go-events"
    "time"
)

type eventRecv struct {
    name string
}

func (e *eventRecv)Write(event events.Event) error {
    fmt.Printf("%s receives %d\n", e.name, event.(int))
    return nil
}

func (e *eventRecv)Close() error {
    return nil
}

func createEventRecv(name string) *eventRecv {
    return &eventRecv{name}
}

func main() {
    e1 := createEventRecv("Foo")
    e2 := createEventRecv("Bar")

    bc := events.NewBroadcaster(e1, e2)
    bc.Write(1)
    bc.Write(2)
    time.Sleep(time.Second)
}

执行结果如下:

Foo receives 1
Bar receives 1
Foo receives 2
Bar receives 2

NewBroadcaster作用是把一个event发送到多个Sink

再看一个使用NewQueue的例子:

package main

import (
    "fmt"
    "github.com/docker/go-events"
    "time"
)

type eventRecv struct {
    name string
}

func (e *eventRecv)Write(event events.Event) error {
    fmt.Printf("%s receives %d\n", e.name, event)
    return nil
}

func (e *eventRecv)Close() error {
    return nil
}

func createEventRecv(name string) *eventRecv {
    return &eventRecv{name}
}

func main() {
    q := events.NewQueue(createEventRecv("Foo"))
    q.Write(1)
    q.Write(2)
    time.Sleep(time.Second)
}

执行结果如下:

Foo receives 1
Foo receives 2

Swarmkit笔记(13)——swarmctl通过controlClient向swarm cluster发命令

swarmctl实质上是通过controlClientswarm cluster发命令。controlClient定义在api/control.pb.go

// Client API for Control service

type ControlClient interface {
    GetNode(ctx context.Context, in *GetNodeRequest, opts ...grpc.CallOption) (*GetNodeResponse, error)
    ListNodes(ctx context.Context, in *ListNodesRequest, opts ...grpc.CallOption) (*ListNodesResponse, error)
    ......
}

type controlClient struct {
    cc *grpc.ClientConn
}

func NewControlClient(cc *grpc.ClientConn) ControlClient {
    return &controlClient{cc}
}

func (c *controlClient) GetNode(ctx context.Context, in *GetNodeRequest, opts ...grpc.CallOption) (*GetNodeResponse, error) {
    out := new(GetNodeResponse)
    err := grpc.Invoke(ctx, "/docker.swarmkit.v1.Control/GetNode", in, out, c.cc, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

......

 

Swarmkit笔记(12)——swarmctl创建service时指定资源限制

swarmctl创建service时可以指定CPUmemory资源限制:

# swarmctl service create --help
Create a service

Usage:
  swarmctl service create [flags]

Flags:
......
  --cpu-limit string            CPU cores limit (e.g. 0.5)
  --cpu-reservation string      number of CPU cores reserved (e.g. 0.5)
......
  --memory-limit string         memory limit (e.g. 512m)
  --memory-reservation string   amount of reserved memory (e.g. 512m)
    ......

*-reservation的作用是为container分配并“占住”相应的资源,所以这些资源对container一定是可用的;*-limit是限制container进程所使用的资源。解析资源的代码位于cmd/swarmctl/service/flagparser/resource.go文件。

参考资料:
Docker service Limits and Reservations

docker笔记(15)——docker swarm功能代码分析(2)

Docker daemon初始化响应docker client swarm相关命令的处理函数位于api/server/router/swarm/cluster.go

// buildRouter is a router to talk with the build controller
type swarmRouter struct {
    backend Backend
    routes  []router.Route
}

// NewRouter initializes a new build router
func NewRouter(b Backend) router.Router {
    r := &swarmRouter{
        backend: b,
    }
    r.initRoutes()
    return r
}

// Routes returns the available routers to the swarm controller
func (sr *swarmRouter) Routes() []router.Route {
    return sr.routes
}

func (sr *swarmRouter) initRoutes() {
    sr.routes = []router.Route{
        router.NewPostRoute("/swarm/init", sr.initCluster),
        router.NewPostRoute("/swarm/join", sr.joinCluster),
        router.NewPostRoute("/swarm/leave", sr.leaveCluster),
        router.NewGetRoute("/swarm", sr.inspectCluster),
        router.NewPostRoute("/swarm/update", sr.updateCluster),
        router.NewGetRoute("/services", sr.getServices),
        router.NewGetRoute("/services/{id:.*}", sr.getService),
        router.NewPostRoute("/services/create", sr.createService),
        router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
        router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
        router.NewGetRoute("/nodes", sr.getNodes),
        router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
        router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
        router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
        router.NewGetRoute("/tasks", sr.getTasks),
        router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
    }
}

以处理“/swarm/init”请求为例,实际的处理函数位于daemon/cluster/cluster.go

// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
    c.Lock()
    if node := c.node; node != nil {
        if !req.ForceNewCluster {
            c.Unlock()
            return "", ErrSwarmExists
        }
        if err := c.stopNode(); err != nil {
            c.Unlock()
            return "", err
        }
    }

    if err := validateAndSanitizeInitRequest(&req); err != nil {
        c.Unlock()
        return "", err
    }

    listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
    if err != nil {
        c.Unlock()
        return "", err
    }

    advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
    if err != nil {
        c.Unlock()
        return "", err
    }

    localAddr := listenHost

    // If the advertise address is not one of the system's
    // addresses, we also require a listen address.
    listenAddrIP := net.ParseIP(listenHost)
    if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
        advertiseIP := net.ParseIP(advertiseHost)
        if advertiseIP == nil {
            // not an IP
            c.Unlock()
            return "", errMustSpecifyListenAddr
        }

        systemIPs := listSystemIPs()

        found := false
        for _, systemIP := range systemIPs {
            if systemIP.Equal(advertiseIP) {
                found = true
                break
            }
        }
        if !found {
            c.Unlock()
            return "", errMustSpecifyListenAddr
        }
        localAddr = advertiseIP.String()
    }

    // todo: check current state existing
    n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
    if err != nil {
        c.Unlock()
        return "", err
    }
    c.Unlock()

    select {
    case <-n.Ready():
        if err := initClusterSpec(n, req.Spec); err != nil {
            return "", err
        }
        go c.reconnectOnFailure(n)
        return n.NodeID(), nil
    case <-n.done:
        c.RLock()
        defer c.RUnlock()
        if !req.ForceNewCluster { // if failure on first attempt don't keep state
            if err := c.clearState(); err != nil {
                return "", err
            }
        }
        return "", c.err
    }
}

它的核心是c.startNewNode函数,其主要逻辑如下:

......
n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
    Hostname:           c.config.Name,
    ForceNewCluster:    forceNewCluster,
    ListenControlAPI:   filepath.Join(c.root, controlSocket),
    ListenRemoteAPI:    listenAddr,
    AdvertiseRemoteAPI: advertiseAddr,
    JoinAddr:           joinAddr,
    StateDir:           c.root,
    JoinToken:          joinToken,
    Executor:           container.NewExecutor(c.config.Backend),
    HeartbeatTick:      1,
    ElectionTick:       3,
})
if err != nil {
    return nil, err
}
ctx := context.Background()
if err := n.Start(ctx); err != nil {
    return nil, err
}
......

即调用swarmkitNewNode创建一个node,接下来便start这个节点。这个nodemanager角色。