Agent
和manager
之间的通信是通过session
进行的,下面是agent.session
结构体定义:
// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the master is done through session. Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
tasks chan *api.TasksMessage
registered chan struct{} // closed registration
closed chan struct{}
}
(1)registered channel
是用来通知agent
已经向manager
注册成功了:
func (s *session) run(ctx context.Context, delay time.Duration) {
time.Sleep(delay) // delay before registering.
if err := s.start(ctx); err != nil {
select {
case s.errs <- err:
case <-s.closed:
case <-ctx.Done():
}
return
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
close(s.registered)
}
session.run
函数中,如果session.start()
运行没有问题,则会在最后close registered
这个channel
。而在Agent.Run()
中:
func (a *Agent) run(ctx context.Context) {
.....
session = newSession(ctx, a, backoff) // start the initial session
registered = session.registered
for {
select {
......
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
close(ready)
}
ready = nil
registered = nil // we only care about this once per session
backoff = 0 // reset backoff
sessionq = a.sessionq
......
}
}
一旦registered
被close
,<-registered
这个case
则会马上被执行。
(2)当session
运行出现错误时,会把error
发到errs channel
。在Agent.Run()
中:
case err := <-session.errs:
// TODO(stevvooe): This may actually block if a session is closed
// but no error was sent. Session.close must only be called here
// for this to work.
if err != nil {
log.G(ctx).WithError(err).Error("agent: session failed")
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
}
if err := session.close(); err != nil {
log.G(ctx).WithError(err).Error("agent: closing session failed")
}
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil
// Bounce the connection.
if a.config.Picker != nil {
a.config.Picker.Reset()
}
收到error
后,会关闭这个session
并做一些扫尾工作。
(3)messages channel
用来接收manager
发送给agent
的消息,并转给Agent.run()
函数进行处理:
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
(4)tasks channel
用来接收manager
发送给agent
的需要在这个node
上运行的task
信息,同样需要转给Agent.run()
函数进行处理:
case msg := <-session.tasks:
if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
log.G(ctx).WithError(err).Error("task assignment failed")
}
(5)closed channel
在session.close()
函数中被关闭。也就是在case err := <-session.errs:
这个分支中才会执行。一旦closed channel
被关闭后,会重新建立连接:
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")
// select a session registration delay from backoff range.
delay := time.Duration(rand.Int63n(int64(backoff)))
session = newSession(ctx, a, delay)
registered = session.registered
sessionq = a.sessionq
再看一下session.start()
这个函数:
// start begins the session and returns the first SessionMessage.
func (s *session) start(ctx context.Context) error {
log.G(ctx).Debugf("(*session).start")
client := api.NewDispatcherClient(s.agent.config.Conn)
description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
Errorf("node description unavailable")
return err
}
// Override hostname
if s.agent.config.Hostname != "" {
description.Hostname = s.agent.config.Hostname
}
errChan := make(chan error, 1)
var (
msg*api.SessionMessage
stream api.Dispatcher_SessionClient
)
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function returned. We only cancel
// it in the timeout case to make sure the goroutine completes.
sessionCtx, cancelSession := context.WithCancel(ctx)
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
if err != nil {
errChan <- err
return
}
msg, err = stream.Recv()
errChan <- err
}()
select {
case err := <-errChan:
if err != nil {
return err
}
case <-time.After(dispatcherRPCTimeout):
cancelSession()
return errors.New("session initiation timed out")
}
s.sessionID = msg.SessionID
s.session = stream
return s.handleSessionMessage(ctx, msg)
}
(1)
client := api.NewDispatcherClient(s.agent.config.Conn)
description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
Errorf("node description unavailable")
return err
}
// Override hostname
if s.agent.config.Hostname != "" {
description.Hostname = s.agent.config.Hostname
}
而关于api.NewDispatcherClient()
函数和其所返回的类型定义如下:
type dispatcherClient struct {
cc *grpc.ClientConn
}
func NewDispatcherClient(cc *grpc.ClientConn) DispatcherClient {
return &dispatcherClient{cc}
}
s.agent.config.Conn
就是之前在Node.runAgent()
函数中通过下列代码得到的和manager
直接的GRPC
连接:
conn, err := grpc.Dial(manager.Addr,
grpc.WithPicker(picker),
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
s.agent.config.Executor.Describe()
返回对当前node
的描述(类型是:*api.NodeDescription
)。
(2)
errChan := make(chan error, 1)
var (
msg*api.SessionMessage
stream api.Dispatcher_SessionClient
)
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function returned. We only cancel
// it in the timeout case to make sure the goroutine completes.
sessionCtx, cancelSession := context.WithCancel(ctx)
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
if err != nil {
errChan <- err
return
}
msg, err = stream.Recv()
errChan <- err
}()
而dispatcherClient.Session()
代码如下:
func (c *dispatcherClient) Session(ctx context.Context, in *SessionRequest, opts ...grpc.CallOption) (Dispatcher_SessionClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Dispatcher_serviceDesc.Streams[0], c.cc, "/docker.swarmkit.v1.Dispatcher/Session", opts...)
if err != nil {
return nil, err
}
x := &dispatcherSessionClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
返回的是一个符合Dispatcher_SessionClient interface
类型的变量:
type Dispatcher_SessionClient interface {
Recv() (*SessionMessage, error)
grpc.ClientStream
}
grpc.NewClientStream()
函数返回的是grpc.ClientStream interface
,而dispatcherSessionClient
定义如下:
type dispatcherSessionClient struct {
grpc.ClientStream
}
为了满足Dispatcher_SessionClient interface
定义,dispatcherSessionClient
结构体还实现了Recv
方法:
func (x *dispatcherSessionClient) Recv() (*SessionMessage, error) {
m := new(SessionMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
x.ClientStream.SendMsg()
发送的是SessionRequest
,而它仅包含一个NodeDescription
:
// SessionRequest starts a session.
type SessionRequest struct {
Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
}
x.ClientStream.CloseSend()
表示所有的发送操作已完成。
接下来收到manager
的消息后,把err
发到errChan
:
msg, err = stream.Recv()
errChan <- err
(3)
select {
case err := <-errChan:
if err != nil {
return err
}
case <-time.After(dispatcherRPCTimeout):
cancelSession()
return errors.New("session initiation timed out")
}
s.sessionID = msg.SessionID
s.session = stream
return s.handleSessionMessage(ctx, msg)
一开始goroutine
阻塞在select
,一旦收到正确的响应,就会完成session
的初始化。然后继续等待manager
分配任务。
一旦session.start()
成功,就会启动另外3
个goroutine
:
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
session.heartbeat()
会创建一个新的dispatcherClient
变量,然后在1
秒钟后发送api.HeartbeatRequest
请求,manager
会返回api.HeartbeatResponse
,告诉agent
每隔多长时间发送heartbeat
,目前默认时间是5
秒。
session.watch()
也会新创建一个dispatcherTasksClient
变量,然后发送api.TasksRequest
请求,通知manager
自己已经ready
。接下来就阻塞在Recv()
函数,等待manager
发送task
请求。
session.listen()
复用session.session
变量,阻塞在Recv()
函数,等待manager
发送SessionMessage
,然后处理。