Swarmkit笔记(6)——Agent运行

Node.runAgent()函数实现如下:

func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
    var manager api.Peer
    select {
    case <-ctx.Done():
    case manager = <-n.remotes.WaitSelect(ctx):
    }
    if ctx.Err() != nil {
        return ctx.Err()
    }
    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

    n.Lock()
    n.agent = agent
    n.Unlock()

    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    go func() {
        <-agent.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?

    return agent.Err(context.Background())
}

上面函数解释如下:

(1)case manager = <-n.remotes.WaitSelect(ctx):首先获得manager
(2)接下来调用grpc.Dial()去连接这个manager

    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

(3)生成并运行一个Agent

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

关于Agent结构体定义:

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
    config *Config

    // The latest node object state from manager
    // for this node known to the agent.
    node *api.Node

    keys []*api.EncryptionKey

    sessionq chan sessionOperation
    worker   Worker

    started chan struct{}
    ready   chan struct{}
    stopped chan struct{} // requests shutdown
    closed  chan struct{} // only closed in run
    err     error         // read only after closed is closed
}

其中Config结构体定义:

// Config provides values for an Agent.
type Config struct {
    // Hostname the name of host for agent instance.
    Hostname string

    // Managers provides the manager backend used by the agent. It will be
    // updated with managers weights as observed by the agent.
    Managers picker.Remotes

    // Conn specifies the client connection Agent will use.
    Conn *grpc.ClientConn

    // Picker is the picker used by Conn.
    // TODO(aaronl): This is only part of the config to allow resetting the
    // GRPC connection. This should be refactored to address the coupling
    // between Conn and Picker.
    Picker *picker.Picker

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // DB used for task storage. Must be open for the lifetime of the agent.
    DB *bolt.DB

    // NotifyRoleChange channel receives new roles from session messages.
    NotifyRoleChange chan<- api.NodeRole
}

注释都很清楚,不必赘述。

Agent.Start()会调到Agent.Run(),实现如下:

func (a *Agent) run(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    defer close(a.closed) // full shutdown.

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))

    log.G(ctx).Debugf("(*Agent).run")
    defer log.G(ctx).Debugf("(*Agent).run exited")

    var (
        backoff    time.Duration
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
        ready      = a.ready // first session ready
        sessionq   chan sessionOperation
    )

    if err := a.worker.Init(ctx); err != nil {
        log.G(ctx).WithError(err).Error("worker initialization failed")
        a.err = err
        return // fatal?
    }

    // setup a reliable reporter to call back to us.
    reporter := newStatusReporter(ctx, a)
    defer reporter.Close()

    a.worker.Listen(ctx, reporter)

    for {
        select {
        case operation := <-sessionq:
            operation.response <- operation.fn(session)
        case msg := <-session.tasks:
            if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
                log.G(ctx).WithError(err).Error("task assignment failed")
            }
        case msg := <-session.messages:
            if err := a.handleSessionMessage(ctx, msg); err != nil {
                log.G(ctx).WithError(err).Error("session message handler failed")
            }
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
        case err := <-session.errs:
            // TODO(stevvooe): This may actually block if a session is closed
            // but no error was sent. Session.close must only be called here
            // for this to work.
            if err != nil {
                log.G(ctx).WithError(err).Error("agent: session failed")
                backoff = initialSessionFailureBackoff + 2*backoff
                if backoff > maxSessionFailureBackoff {
                    backoff = maxSessionFailureBackoff
                }
            }

            if err := session.close(); err != nil {
                log.G(ctx).WithError(err).Error("agent: closing session failed")
            }
            sessionq = nil
            // if we're here before <-registered, do nothing for that event
            registered = nil

            // Bounce the connection.
            if a.config.Picker != nil {
                a.config.Picker.Reset()
            }
        case <-session.closed:
            log.G(ctx).Debugf("agent: rebuild session")

            // select a session registration delay from backoff range.
            delay := time.Duration(rand.Int63n(int64(backoff)))
            session = newSession(ctx, a, delay)
            registered = session.registered
            sessionq = a.sessionq
        case <-a.stopped:
            // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
            // this loop a few times.
            return
        case <-ctx.Done():
            if a.err == nil {
                a.err = ctx.Err()
            }

            return
        }
    }
}

其中重要的是session这个概念,通过“session = newSession(ctx, a, backoff)”这行代码将sessionAgent关联起来。

发表评论

邮箱地址不会被公开。 必填项已用*标注

This site uses Akismet to reduce spam. Learn how your comment data is processed.