The Node.runManager() function starts a manager:
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }
        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()
        n.Lock()
        n.manager = m
        n.Unlock()
        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)
        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }
        n.waitRole(ctx, ca.AgentRole)
        n.Lock()
        n.manager = nil
        n.Unlock()
        select {
        case <-done:
        case <-ctx.Done():
            err = ctx.Err()
            m.Stop(context.Background())
            <-done
        }
        connCancel()
        if err != nil {
            return err
        }
    }
}
(1)
n.waitRole(ctx, ca.ManagerRole)
if ctx.Err() != nil {
    return ctx.Err()
}
runManager() first blocks in waitRole(); once the node obtains the manager role, execution continues.
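waitRole() itself is not shown here; conceptually it just sleeps until the node's role reaches the requested value. Below is a minimal sketch of that blocking pattern, assuming a role field guarded by a condition variable (the roleWaiter type and its fields are invented for illustration and are not SwarmKit's actual code):

package sketch

import (
    "context"
    "sync"
)

// roleWaiter records the node's current role and lets callers block
// until a desired role is reached (illustrative only).
type roleWaiter struct {
    mu   sync.Mutex
    cond *sync.Cond
    role string
}

func newRoleWaiter(initial string) *roleWaiter {
    w := &roleWaiter{role: initial}
    w.cond = sync.NewCond(&w.mu)
    return w
}

// setRole is called whenever the node's certificate/role changes.
func (w *roleWaiter) setRole(r string) {
    w.mu.Lock()
    w.role = r
    w.mu.Unlock()
    w.cond.Broadcast() // wake up anyone blocked in waitRole
}

// waitRole blocks until the node holds role r or ctx is cancelled.
func (w *roleWaiter) waitRole(ctx context.Context, r string) error {
    // on cancellation, take the lock before broadcasting so the wakeup
    // cannot slip in between the ctx check and cond.Wait below
    stop := context.AfterFunc(ctx, func() {
        w.mu.Lock()
        defer w.mu.Unlock()
        w.cond.Broadcast()
    })
    defer stop()

    w.mu.Lock()
    defer w.mu.Unlock()
    for w.role != r {
        if err := ctx.Err(); err != nil {
            return err
        }
        w.cond.Wait()
    }
    return nil
}

This is also why runManager() re-checks ctx.Err() right after waitRole() returns: the wait can end because of context cancellation rather than an actual role change.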
(2)
remoteAddr, _ := n.remotes.Select(n.nodeID)
m, err := manager.New(&manager.Config{
    ForceNewCluster: n.config.ForceNewCluster,
    ProtoAddr: map[string]string{
        "tcp":  n.config.ListenRemoteAPI,
        "unix": n.config.ListenControlAPI,
    },
    AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
    SecurityConfig: securityConfig,
    ExternalCAs:    n.config.ExternalCAs,
    JoinRaft:       remoteAddr.Addr,
    StateDir:       n.config.StateDir,
    HeartbeatTick:  n.config.HeartbeatTick,
    ElectionTick:   n.config.ElectionTick,
})
if err != nil {
    return err
}
done := make(chan struct{})
go func() {
    m.Run(context.Background()) // todo: store error
    close(done)
}()
n.Lock()
n.manager = m
n.Unlock()
a) remoteAddr, _ := n.remotes.Select(n.nodeID) selects a leader from the managers of the current cluster (excluding the current node, naturally) and assigns it to remoteAddr. If the current node is the first manager in the cluster, remoteAddr is simply an "empty" value: {NodeID: "", Addr: ""}.
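The real selection lives in the remotes/picker package and is weight-based; purely as a simplified illustration (the Peer and remotes types below are stand-ins, and the uniform random choice replaces the real weighted pick), Select could look like this:

package sketch

import (
    "errors"
    "math/rand"
)

// Peer is a stand-in for api.Peer.
type Peer struct {
    NodeID string
    Addr   string
}

// remotes keeps the set of known managers with a weight per peer.
type remotes struct {
    peers map[Peer]int
}

// Select returns one known manager, skipping the given node IDs
// (runManager passes the local node's own ID). If nothing else is
// known -- e.g. this is the first manager in the cluster -- it returns
// the zero Peer {NodeID: "", Addr: ""}, which is why JoinRaft ends up
// empty in that case.
func (r *remotes) Select(excludes ...string) (Peer, error) {
    candidates := make([]Peer, 0, len(r.peers))
    for p := range r.peers {
        excluded := false
        for _, id := range excludes {
            if p.NodeID == id {
                excluded = true
                break
            }
        }
        if !excluded {
            candidates = append(candidates, p)
        }
    }
    if len(candidates) == 0 {
        return Peer{}, errors.New("no known remotes")
    }
    // the real picker chooses proportionally to the weights; a uniform
    // random choice is enough for this sketch
    return candidates[rand.Intn(len(candidates))], nil
}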
b) When creating the manager with manager.New(), note that n.config.AdvertiseRemoteAPI is always "". manager.New() ultimately returns a Manager struct:
func New(config *Config) (*Manager, error) {
    ......
    m := &Manager{
        config:      config,
        listeners:   listeners,
        caserver:    ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
        Dispatcher:  dispatcher.New(RaftNode, dispatcherConfig),
        server:      grpc.NewServer(opts...),
        localserver: grpc.NewServer(opts...),
        RaftNode:    RaftNode,
        started:     make(chan struct{}),
        stopped:     make(chan struct{}),
    }
    return m, nil
}
Here listeners holds the two sockets that listen on listen-remote-api (tcp) and listen-control-api (unix).
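As a rough sketch of how those two listeners could be opened with the standard library (the openListeners helper and its comments are illustrative, not SwarmKit code):

package sketch

import "net"

// openListeners opens one TCP socket for the remote API and one Unix
// socket for the local control API, mirroring the shape of
// manager.listeners (illustrative only).
func openListeners(remoteAPI, controlAPI string) (map[string]net.Listener, error) {
    listeners := make(map[string]net.Listener)

    tcpL, err := net.Listen("tcp", remoteAPI) // the ListenRemoteAPI "addr:port"
    if err != nil {
        return nil, err
    }
    listeners["tcp"] = tcpL

    unixL, err := net.Listen("unix", controlAPI) // the ListenControlAPI socket path
    if err != nil {
        tcpL.Close()
        return nil, err
    }
    listeners["unix"] = unixL

    return listeners, nil
}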
c) m.Run() is the function that actually runs the manager, and even its author considers it complicated ("This function is *way* too complex."). Its logic can be split into the following parts:
i) If the current manager is elected leader, it performs a long list of initialization steps, including allocating resources for the scheduler, the allocator and so on and starting their goroutines; if it is not the leader, it does the corresponding teardown, stopping those goroutines and releasing their resources (see the first sketch after this list).
ii) It then applies a long list of settings to manager.localserver and manager.server, mainly around authentication and proxying; the two servers then listen on the Unix and TCP sockets in manager.listeners respectively and handle the corresponding traffic (also sketched below).
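For i), the usual shape is a loop that watches leadership changes coming out of raft and starts or stops the leader-only subsystems accordingly. A simplified sketch (the LeadershipState channel and the two callbacks are invented for illustration; m.Run()'s real structure is considerably more involved):

package sketch

import "context"

// LeadershipState is an illustrative stand-in for what the raft node
// reports on each leadership change.
type LeadershipState int

const (
    IsFollower LeadershipState = iota
    IsLeader
)

// runLeadershipLoop starts the leader-only components (scheduler,
// allocator, ...) when this manager is elected leader and tears them
// down again when leadership is lost.
func runLeadershipLoop(ctx context.Context, changes <-chan LeadershipState,
    becomeLeader, becomeFollower func(ctx context.Context)) {
    for {
        select {
        case state := <-changes:
            if state == IsLeader {
                becomeLeader(ctx) // allocate resources, start goroutines
            } else {
                becomeFollower(ctx) // stop goroutines, release resources
            }
        case <-ctx.Done():
            return
        }
    }
}

For ii), the essence is two grpc.Server instances, each built with interceptor options and served on its own listener; the interceptor below is only a placeholder for SwarmKit's authentication/proxy wiring:

package sketch

import (
    "context"
    "net"

    "google.golang.org/grpc"
)

// serveAPIs serves the local control API (unix listener) and the
// remote API (tcp listener) on two separate gRPC servers.
func serveAPIs(localLis, remoteLis net.Listener) error {
    intercept := grpc.UnaryInterceptor(func(ctx context.Context, req interface{},
        info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        // real code authenticates the caller and may proxy the request
        // to the raft leader before handling it
        return handler(ctx, req)
    })

    localserver := grpc.NewServer(intercept)
    server := grpc.NewServer(intercept)

    errCh := make(chan error, 2)
    go func() { errCh <- localserver.Serve(localLis) }()
    go func() { errCh <- server.Serve(remoteLis) }()
    return <-errCh // whichever server fails first ends the sketch
}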
(3)
connCtx, connCancel := context.WithCancel(ctx)
go n.initManagerConnection(connCtx, ready)
Node.initManagerConnection() is implemented as follows:
func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{}
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    // Using listen address instead of advertised address because this is a
    // local connection.
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    state := grpc.Idle
    for {
        s, err := conn.WaitForStateChange(ctx, state)
        if err != nil {
            n.setControlSocket(nil)
            return err
        }
        if s == grpc.Ready {
            n.setControlSocket(conn)
            if ready != nil {
                close(ready)
                ready = nil
            }
        } else if state == grpc.Shutdown {
            n.setControlSocket(nil)
        }
        state = s
    }
}
Its job is to establish a connection to the local listen-control-api (unix) socket and use it to monitor the node's state; the first time that connection becomes Ready, the ready channel is closed.
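The code above uses an old grpc-go API (WaitForStateChange returning the new state, plus the grpc.Idle/Ready/Shutdown constants). Purely for illustration, the same "dial the unix control socket and watch its connectivity state" idea written against the current grpc-go connectivity API might look like this (dialControlSocket is a made-up helper, not SwarmKit code):

package sketch

import (
    "context"
    "crypto/tls"

    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/credentials"
)

// dialControlSocket connects to the local control socket over unix and
// closes ready the first time the connection becomes READY.
// socketPath is assumed to be an absolute path.
func dialControlSocket(ctx context.Context, socketPath string, ready chan<- struct{}) error {
    // certificate verification is skipped because this is a local connection
    creds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    conn, err := grpc.Dial("unix://"+socketPath, grpc.WithTransportCredentials(creds))
    if err != nil {
        return err
    }
    defer conn.Close()
    conn.Connect() // grpc.Dial is lazy; start connecting right away

    for {
        state := conn.GetState()
        if state == connectivity.Ready && ready != nil {
            close(ready)
            ready = nil
        }
        // WaitForStateChange returns false once ctx is cancelled
        if !conn.WaitForStateChange(ctx, state) {
            return ctx.Err()
        }
    }
}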
(4) Add the current node itself to the remotes observation list:
// this happens only on initial start
if ready != nil {
    go func(ready chan struct{}) {
        select {
        case <-ready:
            n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
        case <-connCtx.Done():
        }
    }(ready)
    ready = nil
}
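Observe records (or re-weights) a peer in the remotes list so that later Select calls can return it; here the node registers itself under its ListenRemoteAPI address, and ready is then set to nil so this only happens on the initial start. Continuing the illustrative remotes type from the sketch in (2)a (and, again, without modelling how the real picker combines repeated observations):

// Observe records peer p under the given weight so that a later
// Select can return it (illustrative only).
func (r *remotes) Observe(p Peer, weight int) {
    if r.peers == nil {
        r.peers = make(map[Peer]int)
    }
    r.peers[p] += weight
}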
(5) Block on the following line, waiting for the role to change:
n.waitRole(ctx, ca.AgentRole)