Swarmkit Notes (14): Manager Role Switching

The Manager struct (defined in manager/manager.go) contains a *raft.Node member:

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
    ......
    RaftNode   *raft.Node
    ......
}

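raft.Node (defined in manager/state/raft/raft.go) in turn contains an *events.Broadcaster member. The node writes to it whenever the manager's role changes (it becomes the leader or a follower), and subscribers receive those messages: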

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    leadershipBroadcast *events.Broadcaster
    ......
}   

The code that sends the role-change message for the current manager is in manager/state/raft/raft.go:

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
    ......
            // If we cease to be the leader, we must cancel
            // any proposals that are currently waiting for
            // a quorum to acknowledge them. It is still
            // possible for these to become committed, but
            // if that happens we will apply them as any
            // follower would.
            if rd.SoftState != nil {
                if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
                    wasLeader = false
                    n.wait.cancelAll()
                    if atomic.LoadUint32(&n.signalledLeadership) == 1 {
                        atomic.StoreUint32(&n.signalledLeadership, 0)
                        n.leadershipBroadcast.Write(IsFollower)
                    }
                } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
                    wasLeader = true
                }
            }

            if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
                // If all the entries in the log have become
                // committed, broadcast our leadership status.
                if n.caughtUp() {
                    atomic.StoreUint32(&n.signalledLeadership, 1)
                    n.leadershipBroadcast.Write(IsLeader)
                }
            }
    ......
}
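
The bookkeeping in the excerpt above can be modeled in isolation. What follows is a minimal sketch, not SwarmKit's code. The tracker type and its step method are hypothetical names. The atomic signalledLeadership flag is replaced by a plain bool, so the sketch is only safe on a single goroutine. rd.SoftState is reduced to a *bool that is nil when no SoftState is present, and the n.wait.cancelAll() call is left out. The LeadershipState definition is an assumption made so that the block compiles on its own.

```go
package main

import "fmt"

// LeadershipState is assumed here; it stands in for the type of the
// IsLeader / IsFollower values written to the broadcaster.
type LeadershipState int

const (
	IsLeader LeadershipState = iota
	IsFollower
)

// tracker is a hypothetical stand-in for the wasLeader local variable and
// the signalledLeadership field used in Node.Run.
type tracker struct {
	wasLeader bool
	signalled bool
}

// step processes one simulated Ready. isLeader is nil when there is no
// SoftState; otherwise it reports whether the raft state is "leader".
// It returns the events that would be written to the broadcaster.
func (t *tracker) step(isLeader *bool, caughtUp bool) []LeadershipState {
	var out []LeadershipState
	if isLeader != nil {
		if t.wasLeader && !*isLeader {
			// Lost leadership: announce it only if leadership had
			// been announced before.
			t.wasLeader = false
			if t.signalled {
				t.signalled = false
				out = append(out, IsFollower)
			}
		} else if !t.wasLeader && *isLeader {
			t.wasLeader = true
		}
	}
	// Leadership is announced only once the log has caught up.
	if t.wasLeader && !t.signalled && caughtUp {
		t.signalled = true
		out = append(out, IsLeader)
	}
	return out
}

func main() {
	tr := &tracker{}
	leader, follower := true, false
	fmt.Println(tr.step(&leader, false))  // elected, log not caught up yet
	fmt.Println(tr.step(nil, true))       // caught up: leadership announced
	fmt.Println(tr.step(&follower, true)) // stepped down: follower announced
}
```

The point the sketch makes is that becoming the raft leader and announcing leadership are two separate moments: IsLeader is only written once caughtUp() holds, and IsFollower is only written if IsLeader was announced earlier.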

The code that receives the message is in the Manager.Run() function (manager/manager.go):

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
    ......
    leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
    defer cancel()

    go m.handleLeadershipEvents(ctx, leadershipCh)

    ......

    go func() {
        err := m.RaftNode.Run(ctx)
        if err != nil {
            log.G(ctx).Error(err)
            m.Stop(ctx)
        }
    }()

    ......
}

The code for Node.SubscribeLeadership() and Manager.handleLeadershipEvents() is fairly simple, so it is not covered in detail here.
