Swarmkit笔记(9)——manager

Node.runManager()函数会启动一个manager

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }
        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

        n.waitRole(ctx, ca.AgentRole)

        n.Lock()
        n.manager = nil
        n.Unlock()

        select {
        case <-done:
        case <-ctx.Done():
            err = ctx.Err()
            m.Stop(context.Background())
            <-done
        }
        connCancel()

        if err != nil {
            return err
        }
    }
}

(1)

        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }

首先runManager()函数会阻塞在waitRole()函数。一旦获得manager角色,就会往下执行。

(2)

        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

a)remoteAddr, _ := n.remotes.Select(n.nodeID)作用是从当前clustermanager中(当然需要排除掉当前node)选出一个leader,赋给remoteAddr。如果当前nodecluster中的第一个manager,则remoteAddr就是一个“空的”值:{NodeID: "", Addr: ""}
b)在使用manager.New()函数创建manager时,要注意n.config.AdvertiseRemoteAPI是一直为""的。 manager.New()最后会返回一个Manager结构体:

func New(config *Config) (*Manager, error) {
    ......
    m := &Manager{
        config:      config,
        listeners:   listeners,
        caserver:    ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
        Dispatcher:  dispatcher.New(RaftNode, dispatcherConfig),
        server:      grpc.NewServer(opts...),
        localserver: grpc.NewServer(opts...),
        RaftNode:    RaftNode,
        started:     make(chan struct{}),
        stopped:     make(chan struct{}),
    }

    return m, nil
}

其中的listeners包含监听listen-remote-api(tcp)listen-control-api(unix)的两个socket

c)m.Run()是实际运行manager的函数,连作者自己都觉得复杂(“This function is *way* too complex.”)。可以把这个函数逻辑分成下面几块:
i)如果当前manager被选为leader,就做一大堆初始化的动作,包括为schedulerallocator等分配资源,启动goroutine等等;如果不是leader,就做一大堆收尾工作,停掉goroutine,释放资源。
ii)接下来对manager.localservermanager.server做一大堆设置,主要是authenticationproxy的方面;然后二者分别监听manager.listeners中的UnixTCP socket,处理相应的数据。

(3)

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

其中Node.initManagerConnection()实现如下:

func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{}
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    // Using listen address instead of advertised address because this is a
    // local connection.
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    state := grpc.Idle
    for {
        s, err := conn.WaitForStateChange(ctx, state)
        if err != nil {
            n.setControlSocket(nil)
            return err
        }
        if s == grpc.Ready {
            n.setControlSocket(conn)
            if ready != nil {
                close(ready)
                ready = nil
            }
        } else if state == grpc.Shutdown {
            n.setControlSocket(nil)
        }
        state = s
    }
}

功能就是建立一个同本地listen-control-api(unix) socket的一个连接,用来监控node的状态。

(4)把当前node也加入remotes的监控列表中:

    // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

(5)阻塞在下列代码,等待角色变化:

n.waitRole(ctx, ca.AgentRole)

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注