// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
    RaftNode   *raft.Node

raft.Node(定义在manager/state/raft/raft.go)则包含一个*events.Broadcaster成员,用来接收改变manager role的消息(变成leader还是follower):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    leadershipBroadcast *events.Broadcaster

发送改变当前manager role的代码位于manager/state/raft/raft.go

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
            // If we cease to be the leader, we must cancel
            // any proposals that are currently waiting for
            // a quorum to acknowledge them. It is still
            // possible for these to become committed, but
            // if that happens we will apply them as any
            // follower would.
            if rd.SoftState != nil {
                if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
                    wasLeader = false
                    if atomic.LoadUint32(&n.signalledLeadership) == 1 {
                        atomic.StoreUint32(&n.signalledLeadership, 0)
                } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
                    wasLeader = true

            if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
                // If all the entries in the log have become
                // committed, broadcast our leadership status.
                if n.caughtUp() {
                    atomic.StoreUint32(&n.signalledLeadership, 1)


// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
    leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
    defer cancel()

    go m.handleLeadershipEvents(ctx, leadershipCh)


    go func() {
            err := m.RaftNode.Run(ctx)
            if err != nil {



FreeBSD kernel 笔记(11)——condition variables

线程同步除了使用mutex,还可以使用conditional variables(下列内容摘自FreeBSD Device Drivers):

Condition variables synchronize the execution of two or more threads based upon the value of an object. In contrast, locks synchronize threads by controlling their access to objects.

Condition variables are used in conjunction with locks to “block” threads until a condition is true. It works like this: A thread first acquires the foo lock. Then it examines the condition. If the condition is false, it sleeps on the bar condition variable. While asleep on bar , threads relinquish foo . A thread that causes the condition to be true wakes up the threads sleeping on bar . Threads woken up in this manner reacquire foo before proceeding.

此外,使用conditional variables必然涉及到lock,以下是关于lock的规则(下列内容摘自FreeBSD Kernel Developer’s Manual):

The lock argument is a pointer to either mutex(9), rwlock(9), or sx(9) lock. A mutex(9) argument must be initialized with MTX_DEF and not MTX_SPIN. A thread must hold lock before calling cvwait(), cvwaitsig(), cvwaitunlock(), cvtimedwait(), or cvtimedwaitsig(). When a thread waits on a condition, lock is atomically released before the thread is blocked, then reacquired before the function call returns. In addition, the thread will fully drop the Giant mutex (even if recursed) while the it is suspended and will reacquire the Giant mutex before the function returns. The cvwaitunlock() function does not reacquire the lock before returning. Note that the Giant mutex may be specified as lock. However, Giant may not be used as lock for the cvwaitunlock() function. All waiters must pass the same lock in con- junction with cvp.