Swarmkit笔记(14)——manager切换角色

Manager结构体(定义在manager/manager.go)包含一个*raft.Node成员:

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
    ......
    RaftNode   *raft.Node
    ......
}

raft.Node(定义在manager/state/raft/raft.go)则包含一个*events.Broadcaster成员,用来接收改变manager role的消息(变成leader还是follower):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    leadershipBroadcast *events.Broadcaster
    ......
}   

发送改变当前manager role的代码位于manager/state/raft/raft.go

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
    ......
            // If we cease to be the leader, we must cancel
            // any proposals that are currently waiting for
            // a quorum to acknowledge them. It is still
            // possible for these to become committed, but
            // if that happens we will apply them as any
            // follower would.
            if rd.SoftState != nil {
                if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
                    wasLeader = false
                    n.wait.cancelAll()
                    if atomic.LoadUint32(&n.signalledLeadership) == 1 {
                        atomic.StoreUint32(&n.signalledLeadership, 0)
                        n.leadershipBroadcast.Write(IsFollower)
                    }
                } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
                    wasLeader = true
                }
            }

            if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
                // If all the entries in the log have become
                // committed, broadcast our leadership status.
                if n.caughtUp() {
                    atomic.StoreUint32(&n.signalledLeadership, 1)
                    n.leadershipBroadcast.Write(IsLeader)
                }
            }
    ......
}

接收消息的代码在Manager.Run()函数(manager/manager.go):

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
    ......
    leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
    defer cancel()

    go m.handleLeadershipEvents(ctx, leadershipCh)

    ......

    go func() {
            err := m.RaftNode.Run(ctx)
            if err != nil {
                log.G(ctx).Error(err)
                m.Stop(ctx)
            }
        }()

    ......
}

Node.SubscribeLeadership()Manager.handleLeadershipEvents()代码比较简单,不再赘述。

FreeBSD kernel 笔记(11)——condition variables

线程同步除了使用mutex,还可以使用conditional variables(下列内容摘自FreeBSD Device Drivers):

Condition variables synchronize the execution of two or more threads based upon the value of an object. In contrast, locks synchronize threads by controlling their access to objects.

Condition variables are used in conjunction with locks to “block” threads until a condition is true. It works like this: A thread first acquires the foo lock. Then it examines the condition. If the condition is false, it sleeps on the bar condition variable. While asleep on bar , threads relinquish foo . A thread that causes the condition to be true wakes up the threads sleeping on bar . Threads woken up in this manner reacquire foo before proceeding.

此外,使用conditional variables必然涉及到lock,以下是关于lock的规则(下列内容摘自FreeBSD Kernel Developer’s Manual):

The lock argument is a pointer to either mutex(9), rwlock(9), or sx(9) lock. A mutex(9) argument must be initialized with MTX_DEF and not MTX_SPIN. A thread must hold lock before calling cvwait(), cvwaitsig(), cvwaitunlock(), cvtimedwait(), or cvtimedwaitsig(). When a thread waits on a condition, lock is atomically released before the thread is blocked, then reacquired before the function call returns. In addition, the thread will fully drop the Giant mutex (even if recursed) while the it is suspended and will reacquire the Giant mutex before the function returns. The cvwaitunlock() function does not reacquire the lock before returning. Note that the Giant mutex may be specified as lock. However, Giant may not be used as lock for the cvwaitunlock() function. All waiters must pass the same lock in con- junction with cvp.

简而言之,即线程在调用cv_wait()等系列函数检查condition变成true时,它必须已经获得lock。在cv_wait()中,线程会先释放lock,然后阻塞在这里等待condition变成true,在从cv_wait()返回后,又重新获得lock。要注意,cv_wait_unlock()函数返回是不会重新获得lock