Swarmkit笔记(11)——manager创建处理swarmctl请求的server

manager.localserver是创建的本地Unix socket,用来等待处理swarmctl发来的命令请求(源码在manager/controlapi目录)。Manager.Run()函数里localserver相关代码如下:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

......
api.RegisterControlServer(m.localserver, localProxyControlAPI)

(1)首先看一下controlapi.Servercontrolapi.NewServer()的定义:

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

// NewServer creates a Cluster API server.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
    return &Server{
        store:  store,
        raft:   raft,
        rootCA: rootCA,
    }
}

controlapi.NewServer()函数就是用来创建一个响应swarmctl程序发出的control命令请求的server

其中store.MemoryStore是一个很重要的结构体:

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

watch.Queue定义如下:

// Queue is the structure used to publish events and watch for them.
type Queue struct {
    broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
    return q.CallbackWatch(nil)
}
......
// Publish adds an item to the queue.
func (q *Queue) Publish(item events.Event) {
    q.broadcast.Write(item)
}

简单地讲,就是当Server.store发生变化时,把数据更新到memDB的同时,也要发送消息到queue里,这样manager监听在相应channelgoroutine就可以收到并处理请求。

下面代码就是把当前cluster的信息填充到新创建的controlapi.Server变量里:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())

(2)

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

上述代码创建出一个raftProxyControlServer类型的变量:

type raftProxyControlServer struct {
    local        ControlServer
    connSelector *raftpicker.ConnSelector
    cluster      raftpicker.RaftCluster
    ctxMods      []func(context.Context) (context.Context, error)
}

localProxyControlAPI含义是如果收到swarmctl请求的managerleaderswarmctlmanager当然位于同一台机器上),则会处理请求,否则就转发给这个clusterleader

(3)

api.RegisterControlServer(m.localserver, localProxyControlAPI)

上述代码则是把localserver所对应的Unix socketraftProxyControlServer关联起来。

发表评论

邮箱地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.