`Manager` 结构体(定义在 `manager/manager.go`)包含一个 `*raft.Node` 成员:
// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
......
// RaftNode is the manager's Raft node. Manager.Run subscribes to its
// leadership events and starts its main loop.
RaftNode *raft.Node
......
}
而 `raft.Node`(定义在 `manager/state/raft/raft.go`)则包含一个 `*events.Broadcaster` 成员,用来接收改变 `manager role` 的消息(变成 `leader` 还是 `follower`):
// Node represents the Raft Node useful
// configuration.
type Node struct {
......
// leadershipBroadcast is the broadcaster to which Run writes IsLeader or
// IsFollower whenever this node's leadership status changes.
leadershipBroadcast *events.Broadcaster
......
}
发送改变当前 `manager role` 的消息的代码位于 `manager/state/raft/raft.go`:
// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
//
// The excerpt below shows how leadership changes are detected and announced
// through n.leadershipBroadcast.
func (n *Node) Run(ctx context.Context) error {
......
// If we cease to be the leader, we must cancel
// any proposals that are currently waiting for
// a quorum to acknowledge them. It is still
// possible for these to become committed, but
// if that happens we will apply them as any
// follower would.
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
n.wait.cancelAll()
// Only announce IsFollower if leadership had actually been
// announced before (signalledLeadership == 1); the flag is reset
// so that a later leadership gain can be announced again.
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Write(IsFollower)
}
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
// Only record the transition here; the IsLeader broadcast is
// deferred to the caughtUp check below.
wasLeader = true
}
}
// signalledLeadership guards the broadcast: IsLeader is written once and
// not again until the flag is reset to 0 on losing leadership above.
if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
atomic.StoreUint32(&n.signalledLeadership, 1)
n.leadershipBroadcast.Write(IsLeader)
}
}
......
}
接收消息的代码在 `Manager.Run()` 函数(`manager/manager.go`):
// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
......
// Subscribe to the Raft node's leadership events and handle them in a
// separate goroutine. cancel is deferred so it runs when Run returns
// (presumably releasing the subscription — confirm in SubscribeLeadership).
leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
defer cancel()
go m.handleLeadershipEvents(ctx, leadershipCh)
......
// Run the Raft node's main loop in the background. If it returns an error,
// log it and stop the manager.
go func() {
err := m.RaftNode.Run(ctx)
if err != nil {
log.G(ctx).Error(err)
m.Stop(ctx)
}
}()
......
}
`Node.SubscribeLeadership()` 和 `Manager.handleLeadershipEvents()` 代码比较简单,不再赘述。