Swarmkit笔记(8)——agent.session

Agentmanager之间的通信是通过session进行的,下面是agent.session结构体定义:

// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the master is done through session.  Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
    agent     *Agent
    sessionID string
    session   api.Dispatcher_SessionClient
    errs      chan error
    messages  chan *api.SessionMessage
    tasks     chan *api.TasksMessage

    registered chan struct{} // closed registration
    closed     chan struct{}
}

(1)registered channel是用来通知agent已经向manager注册成功了:

func (s *session) run(ctx context.Context, delay time.Duration) {
    time.Sleep(delay) // delay before registering.

    if err := s.start(ctx); err != nil {
        select {
        case s.errs <- err:
        case <-s.closed:
        case <-ctx.Done():
        }
        return
    }

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))

    go runctx(ctx, s.closed, s.errs, s.heartbeat)
    go runctx(ctx, s.closed, s.errs, s.watch)
    go runctx(ctx, s.closed, s.errs, s.listen)

    close(s.registered)
}

session.run函数中,如果session.start()运行没有问题,则会在最后close registered这个channel。而在Agent.Run()中:

func (a *Agent) run(ctx context.Context) {
        .....
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
for {
        select {
            ......
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
            ......
    }
}

一旦registeredclose<-registered这个case则会马上被执行。

(2)当session运行出现错误时,会把error发到errs channel。在Agent.Run()中:

case err := <-session.errs:
        // TODO(stevvooe): This may actually block if a session is closed
        // but no error was sent. Session.close must only be called here
        // for this to work.
        if err != nil {
            log.G(ctx).WithError(err).Error("agent: session failed")
            backoff = initialSessionFailureBackoff + 2*backoff
            if backoff > maxSessionFailureBackoff {
                backoff = maxSessionFailureBackoff
            }
        }

        if err := session.close(); err != nil {
            log.G(ctx).WithError(err).Error("agent: closing session failed")
        }
        sessionq = nil
        // if we're here before <-registered, do nothing for that event
        registered = nil

        // Bounce the connection.
        if a.config.Picker != nil {
            a.config.Picker.Reset()
        }

收到error后,会关闭这个session并做一些扫尾工作。

(3)messages channel用来接收manager发送给agent的消息,并转给Agent.run()函数进行处理:

case msg := <-session.messages:
        if err := a.handleSessionMessage(ctx, msg); err != nil {
            log.G(ctx).WithError(err).Error("session message handler failed")
        }

(4)tasks channel用来接收manager发送给agent的需要在这个node上运行的task信息,同样需要转给Agent.run()函数进行处理:

case msg := <-session.tasks:
        if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
            log.G(ctx).WithError(err).Error("task assignment failed")
        }

(5)closed channelsession.close()函数中被关闭。也就是在case err := <-session.errs:这个分支中才会执行。一旦closed channel被关闭后,会重新建立连接:

case <-session.closed:
        log.G(ctx).Debugf("agent: rebuild session")

        // select a session registration delay from backoff range.
        delay := time.Duration(rand.Int63n(int64(backoff)))
        session = newSession(ctx, a, delay)
        registered = session.registered
        sessionq = a.sessionq  

再看一下session.start()这个函数:

// start begins the session and returns the first SessionMessage.
func (s *session) start(ctx context.Context) error {
    log.G(ctx).Debugf("(*session).start")

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

    errChan := make(chan error, 1)
    var (
        msg*api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)
}

(1)

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

而关于api.NewDispatcherClient()函数和其所返回的类型定义如下:

    type dispatcherClient struct {
        cc *grpc.ClientConn
    }

    func NewDispatcherClient(cc *grpc.ClientConn) DispatcherClient {
        return &dispatcherClient{cc}
    }

s.agent.config.Conn就是之前在Node.runAgent()函数中通过下列代码得到的和manager直接的GRPC连接:

conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))

s.agent.config.Executor.Describe()返回对当前node的描述(类型是:*api.NodeDescription)。
(2)

    errChan := make(chan error, 1)
    var (
        msg*api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

dispatcherClient.Session()代码如下:

func (c *dispatcherClient) Session(ctx context.Context, in *SessionRequest, opts ...grpc.CallOption) (Dispatcher_SessionClient, error) {
    stream, err := grpc.NewClientStream(ctx, &_Dispatcher_serviceDesc.Streams[0], c.cc, "/docker.swarmkit.v1.Dispatcher/Session", opts...)
    if err != nil {
        return nil, err
    }
    x := &dispatcherSessionClient{stream}
    if err := x.ClientStream.SendMsg(in); err != nil {
        return nil, err
    }
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    return x, nil
}

返回的是一个符合Dispatcher_SessionClient interface类型的变量:

type Dispatcher_SessionClient interface {
    Recv() (*SessionMessage, error)
    grpc.ClientStream
}

grpc.NewClientStream()函数返回的是grpc.ClientStream interface,而dispatcherSessionClient定义如下:

type dispatcherSessionClient struct {
    grpc.ClientStream
}  

为了满足Dispatcher_SessionClient interface定义,dispatcherSessionClient结构体还实现了Recv方法:

func (x *dispatcherSessionClient) Recv() (*SessionMessage, error) {
    m := new(SessionMessage)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

x.ClientStream.SendMsg()发送的是SessionRequest,而它仅包含一个NodeDescription

// SessionRequest starts a session.
type SessionRequest struct {
    Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
}

x.ClientStream.CloseSend()表示所有的发送操作已完成。
接下来收到manager的消息后,把err发到errChan

msg, err = stream.Recv()
errChan <- err

(3)

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)

一开始goroutine阻塞在select,一旦收到正确的响应,就会完成session的初始化。然后继续等待manager分配任务。

一旦session.start()成功,就会启动另外3goroutine

go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)

session.heartbeat()会创建一个新的dispatcherClient变量,然后在1秒钟后发送api.HeartbeatRequest请求,manager会返回api.HeartbeatResponse,告诉agent每隔多长时间发送heartbeat,目前默认时间是5秒。

session.watch()也会新创建一个dispatcherTasksClient变量,然后发送api.TasksRequest请求,通知manager自己已经ready。接下来就阻塞在Recv()函数,等待manager发送task请求。

session.listen()复用session.session变量,阻塞在Recv()函数,等待manager发送SessionMessage,然后处理。