manager.localserver
是创建的本地Unix socket
,用来等待处理swarmctl
发来的命令请求(源码在manager/controlapi
目录)。Manager.Run()
函数里localserver
相关代码如下:
baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......
proxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs
......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
......
api.RegisterControlServer(m.localserver, localProxyControlAPI)
(1)首先看一下controlapi.Server
和controlapi.NewServer()
的定义:
// Server is the Cluster API gRPC server.
type Server struct {
store *store.MemoryStore
raft *raft.Node
rootCA *ca.RootCA
}
// NewServer creates a Cluster API server.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
return &Server{
store: store,
raft: raft,
rootCA: rootCA,
}
}
controlapi.NewServer()
函数就是用来创建一个响应swarmctl
程序发出的control
命令请求的server
。
其中store.MemoryStore
是一个很重要的结构体:
// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
// updateLock must be held during an update transaction.
updateLock sync.Mutex
memDB *memdb.MemDB
queue *watch.Queue
proposer state.Proposer
}
而watch.Queue
定义如下:
// Queue is the structure used to publish events and watch for them.
type Queue struct {
broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
return q.CallbackWatch(nil)
}
......
// Publish adds an item to the queue.
func (q *Queue) Publish(item events.Event) {
q.broadcast.Write(item)
}
简单地讲,就是当Server.store
发生变化时,把数据更新到memDB
的同时,也要发送消息到queue
里,这样manager
监听在相应channel
的goroutine
就可以收到并处理请求。
下面代码就是把当前cluster
的信息填充到新创建的controlapi.Server
变量里:
baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
(2)
proxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs
......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
上述代码创建出一个raftProxyControlServer
类型的变量:
type raftProxyControlServer struct {
local ControlServer
connSelector *raftpicker.ConnSelector
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
localProxyControlAPI
含义是如果收到swarmctl
请求的manager
是leader
(swarmctl
和manager
当然位于同一台机器上),则会处理请求,否则就转发给这个cluster
的leader
。
(3)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
上述代码则是把localserver
所对应的Unix socket
和raftProxyControlServer
关联起来。