Swarmkit Notes (9): manager

The Node.runManager() function starts a manager:

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }
        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

        n.waitRole(ctx, ca.AgentRole)

        n.Lock()
        n.manager = nil
        n.Unlock()

        select {
        case <-done:
        case <-ctx.Done():
            err = ctx.Err()
            m.Stop(context.Background())
            <-done
        }
        connCancel()

        if err != nil {
            return err
        }
    }
}

(1)

        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }

First, runManager() blocks in waitRole(). Once the node obtains the manager role, execution proceeds.
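waitRole() itself is built on a condition variable that gets broadcast whenever the node's role changes (condition variables are discussed at the end of this note). Below is a minimal sketch of the idea; roleWaiter, cond and role are illustrative names, not swarmkit's actual fields:

// roleWaiter is an illustrative stand-in for the role-tracking part of Node.
type roleWaiter struct {
    cond *sync.Cond // guards role; broadcast on every role change
    role string
}

// waitRole blocks until the tracked role equals the wanted role or the
// context is cancelled, mirroring the n.waitRole(ctx, ...) calls above.
func (w *roleWaiter) waitRole(ctx context.Context, role string) {
    w.cond.L.Lock()
    defer w.cond.L.Unlock()

    // Wake the waiter on cancellation so the loop below can observe ctx.Err().
    done := make(chan struct{})
    defer close(done)
    go func() {
        select {
        case <-ctx.Done():
            w.cond.Broadcast()
        case <-done:
        }
    }()

    for w.role != role {
        if ctx.Err() != nil {
            return
        }
        w.cond.Wait()
    }
}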

(2)

        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

a) remoteAddr, _ := n.remotes.Select(n.nodeID) selects a leader from the cluster's current managers (excluding the current node, of course) and assigns it to remoteAddr. If the current node is the first manager in the cluster, remoteAddr is an "empty" value: {NodeID: "", Addr: ""}.
b) When creating the manager with manager.New(), note that n.config.AdvertiseRemoteAPI is always "". manager.New() ultimately returns a Manager struct:

func New(config *Config) (*Manager, error) {
    ......
    m := &Manager{
        config:      config,
        listeners:   listeners,
        caserver:    ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
        Dispatcher:  dispatcher.New(RaftNode, dispatcherConfig),
        server:      grpc.NewServer(opts...),
        localserver: grpc.NewServer(opts...),
        RaftNode:    RaftNode,
        started:     make(chan struct{}),
        stopped:     make(chan struct{}),
    }

    return m, nil
}

The listeners field holds the two sockets listening on listen-remote-api (tcp) and listen-control-api (unix).
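For reference, creating two such listeners takes only the standard net package; a minimal runnable sketch (the address and socket path below are made up):

package main

import (
    "fmt"
    "net"
)

func main() {
    // TCP socket in the role of listen-remote-api; the address is illustrative.
    tcpL, err := net.Listen("tcp", "0.0.0.0:4242")
    if err != nil {
        panic(err)
    }
    defer tcpL.Close()

    // Unix socket in the role of listen-control-api; the path is illustrative.
    unixL, err := net.Listen("unix", "/tmp/swarmd.sock")
    if err != nil {
        panic(err)
    }
    defer unixL.Close()

    listeners := map[string]net.Listener{"tcp": tcpL, "unix": unixL}
    fmt.Println(len(listeners), "listeners ready")
}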

c) m.Run() is the function that actually runs the manager, and even its author finds it complicated ("This function is *way* too complex."). Its logic can be split into the following pieces:
i) If the current manager is elected leader, it does a pile of initialization: allocating resources for the scheduler, the allocator and so on, and starting their goroutines; if it is not the leader, it does the reverse teardown: stopping those goroutines and releasing their resources.
ii) Next, manager.localserver and manager.server receive a pile of configuration, mostly around authentication and proxying; the two then serve the unix and tcp sockets in manager.listeners respectively and handle the corresponding traffic (see the sketch below).
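That serving step boils down to the standard gRPC pattern: Serve() blocks on its listener, so each server gets its own goroutine. A hedged sketch under that assumption (serveBoth is a made-up helper using google.golang.org/grpc, ignoring swarmkit's interceptors and proxying):

// serveBoth shows the shape of manager.Run()'s serving step: each gRPC
// server blocks in Serve() on its own listener, so each runs in a goroutine.
func serveBoth(server, localserver *grpc.Server, listeners map[string]net.Listener) <-chan error {
    errCh := make(chan error, 2) // buffered so neither goroutine blocks on send
    go func() { errCh <- server.Serve(listeners["tcp"]) }()
    go func() { errCh <- localserver.Serve(listeners["unix"]) }()
    return errCh
}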

(3)

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

Node.initManagerConnection() is implemented as follows:

func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{}
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    // Using listen address instead of advertised address because this is a
    // local connection.
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    state := grpc.Idle
    for {
        s, err := conn.WaitForStateChange(ctx, state)
        if err != nil {
            n.setControlSocket(nil)
            return err
        }
        if s == grpc.Ready {
            n.setControlSocket(conn)
            if ready != nil {
                close(ready)
                ready = nil
            }
        } else if state == grpc.Shutdown {
            n.setControlSocket(nil)
        }
        state = s
    }
}

Its job is to establish a connection to the local listen-control-api (unix) socket, which is then used to monitor the node's state.
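The setControlSocket() calls above are presumably just a lock-protected setter plus a notification to anyone waiting for the connection; a hypothetical sketch (conn and connCond are assumed field names, not necessarily swarmkit's actual ones):

// Hypothetical sketch of setControlSocket; conn and connCond are assumed names.
func (n *Node) setControlSocket(conn *grpc.ClientConn) {
    n.Lock()
    n.conn = conn
    n.connCond.Broadcast() // wake goroutines waiting for the control connection
    n.Unlock()
}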

(4) Add the current node itself to the remotes watch list (an illustrative sketch of such a list follows the snippet):

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }
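Observe() records a peer together with a weight that the picker later uses when choosing a remote; picker.DefaultObservationWeight is just the default positive weight. An illustrative sketch of such a weighted list (Peer and Remotes below are simplified stand-ins for swarmkit's api.Peer and picker.Remotes):

// Peer mirrors the shape of api.Peer for this sketch.
type Peer struct {
    NodeID string
    Addr   string
}

// Remotes is a simplified weighted peer list in the spirit of n.remotes.
type Remotes struct {
    mu      sync.Mutex
    weights map[Peer]int
}

// Observe registers (or re-weights) a peer, as done for the local node
// once the manager is ready.
func (r *Remotes) Observe(p Peer, weight int) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.weights[p] = weight
}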

(5) Block on the following line, waiting for the role to change again:

n.waitRole(ctx, ca.AgentRole)


Condition Variables in Go

Go's sync package provides a condition variable type:

type Cond struct {
        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}

func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()

type Locker interface {
        Lock()
        Unlock()
}

A Locker represents an object that can be locked and unlocked.

NewCond() takes a Locker interface value, i.e., something that implements locking. Broadcast() notifies all goroutines waiting on the condition variable, while Signal() notifies only one of them. Wait() blocks the calling goroutine on the condition variable until the condition holds. The usual pattern is:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

Wait() unlocks L on entry and re-locks it before returning. In the window of "unlock -> get notified -> re-lock", another goroutine that was also waiting may get in first and change the condition, so that it no longer holds for the current goroutine; this is why the check must be done in a loop. Consider the following example:

package main

import (
    "sync"
    "time"
    "fmt"
)

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var num int

    for i := 1; i <= 2; i++ {
        go func(id int) {
            fmt.Println("Enter Thread ID:", id)
            c.L.Lock()
            for num != 1 {
                fmt.Println("Enter loop: Thread ID:", id)
                c.Wait()
                fmt.Println("Exit loop: Thread ID:", id)
            }
            num++
            c.L.Unlock()
            fmt.Println("Exit Thread ID:", id)
        }(i)
    }

    time.Sleep(time.Second)
    fmt.Println("Sleep 1 second")

    c.L.Lock()
    num++ // change the condition while holding the lock to avoid a data race
    c.L.Unlock()
    c.Broadcast()
    time.Sleep(time.Second)
    fmt.Println("Program exit")
}

One run produced the following output:

Enter Thread ID: 2
Enter loop: Thread ID: 2
Enter Thread ID: 1
Enter loop: Thread ID: 1
Sleep 1 second
Exit loop: Thread ID: 2
Exit Thread ID: 2
Exit loop: Thread ID: 1
Enter loop: Thread ID: 1
Program exit

The output shows that goroutine 2 changed the condition and sent goroutine 1 back into the loop: multiple goroutines blocked on the same condition variable compete with one another.
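To make the Broadcast()/Signal() distinction concrete, the following variant parks three goroutines on one condition variable: Signal() releases exactly one of them, and the later Broadcast() releases the rest.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    c := sync.NewCond(&sync.Mutex{})
    done := false

    for i := 1; i <= 3; i++ {
        go func(id int) {
            c.L.Lock()
            for !done {
                c.Wait()
            }
            c.L.Unlock()
            fmt.Println("woken:", id)
        }(i)
    }

    time.Sleep(time.Second)
    c.L.Lock()
    done = true // make the condition true under the lock
    c.L.Unlock()

    c.Signal() // wakes exactly one waiting goroutine
    time.Sleep(time.Second)
    c.Broadcast() // wakes all remaining waiters
    time.Sleep(time.Second)
}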

References:
Package sync
Condition Variables