FreeBSD Kernel Notes (5): Memory Allocation

For allocating memory in FreeBSD kernel programming, refer to these two documents: MALLOC(9) and CONTIGMALLOC(9). A few points to note:

(1) When calling the malloc family of allocation functions in interrupt context, the M_NOWAIT flag must be used;

(2) contigmalloc takes a boundary parameter:

If the given value “boundary” is non-zero, then the set of physical pages cannot cross any physical address boundary that is a multiple of that value.

For example, if boundary is set to 1MB, the physical pages actually allocated may lie within 0~1MB or 1MB~2MB, but may not straddle 1.9MB~2.1MB. (A small usage sketch of both allocators follows below.)
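Below is a minimal sketch, not taken from any real driver, of how these two allocators might be called; the malloc type M_MYBUF and all sizes are made up for illustration:

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/malloc.h>

MALLOC_DEFINE(M_MYBUF, "mybuf", "example buffers");

/* Called from an interrupt handler: we must not sleep, so pass M_NOWAIT
 * and be prepared for a NULL return. */
static void *
alloc_in_intr(size_t len)
{
    return (malloc(len, M_MYBUF, M_NOWAIT | M_ZERO));
}

/* Allocate 64KB of physically contiguous memory below 4GB, page-aligned.
 * boundary = 1MB: the pages may lie in 0~1MB or 1MB~2MB, but may not
 * straddle a multiple of 1MB (e.g. 1.9MB~2.1MB). */
static void *
alloc_contig_dma(void)
{
    return (contigmalloc(64 * 1024, M_MYBUF, M_WAITOK, 0, 0xffffffff,
        PAGE_SIZE, 1024 * 1024));
}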

The XOR Operator in C

An answer on Stack Overflow explains what the XOR operator does very well:

If you know how XOR works, and you know that ^ is XOR in C, then this should be pretty simple. You should know that XOR will flip bits where 1 is set, bits 2 and 5 of 0b00100100 are set, therefore it will flip those bits.

From an “during the test” standpoint, let’s say you need to prove this to yourself, you really don’t need to know the initial value of star to answer the question, If you know how ^ works then just throw anything in there:

 00100100
^10101010  (star's made up value)
---------
 10001110  (star's new value)

 bit position: | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0
               |---|---|---|---|---|---|---|---
 star's new v: | 1 | 0 | 0 | 0 | 1 | 1 | 1 | 0
               |---|---|---|---|---|---|---|---
 star's old v: | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0

To summarize: the second operand controls the first. A 1 bit in the second operand flips the corresponding bit of the first operand, while a 0 bit leaves that bit unchanged.
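A quick illustration of my own (not part of the quoted answer), using the same made-up value of star:

#include <stdio.h>

int main(void)
{
    unsigned char star = 0xaa;  /* 0b10101010, the made-up value above */

    star ^= 0x24;               /* 0b00100100: flips bits 2 and 5 */
    printf("%#x\n", star);      /* prints 0x8e, i.e. 0b10001110 */

    star ^= 0x24;               /* XORing with the same mask flips them back */
    printf("%#x\n", star);      /* prints 0xaa again */
    return 0;
}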

Swarmkit Notes (9): manager

The Node.runManager() function starts a manager:

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }
        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

        n.waitRole(ctx, ca.AgentRole)

        n.Lock()
        n.manager = nil
        n.Unlock()

        select {
        case <-done:
        case <-ctx.Done():
            err = ctx.Err()
            m.Stop(context.Background())
            <-done
        }
        connCancel()

        if err != nil {
            return err
        }
    }
}

(1)

        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }

runManager() first blocks in waitRole(). Once the node obtains the manager role, execution continues.

(2)

        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

a) remoteAddr, _ := n.remotes.Select(n.nodeID) selects a leader from the current cluster's managers (excluding the current node, of course) and assigns it to remoteAddr. If the current node is the first manager in the cluster, remoteAddr is an "empty" value: {NodeID: "", Addr: ""}.
b) When creating the manager with manager.New(), note that n.config.AdvertiseRemoteAPI is always "". manager.New() eventually returns a Manager struct:

func New(config *Config) (*Manager, error) {
    ......
    m := &Manager{
        config:      config,
        listeners:   listeners,
        caserver:    ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
        Dispatcher:  dispatcher.New(RaftNode, dispatcherConfig),
        server:      grpc.NewServer(opts...),
        localserver: grpc.NewServer(opts...),
        RaftNode:    RaftNode,
        started:     make(chan struct{}),
        stopped:     make(chan struct{}),
    }

    return m, nil
}

The listeners field holds the two sockets that listen on listen-remote-api (tcp) and listen-control-api (unix).

c) m.Run() is the function that actually runs the manager; even its author finds it complicated ("This function is *way* too complex."). Its logic can be split into the following parts:
i) If the current manager is elected leader, it does a pile of initialization work, such as allocating resources for the scheduler, the allocator, and so on, and starting goroutines; if it is not the leader, it does the corresponding teardown: stopping goroutines and releasing resources.
ii) It then applies a series of settings to manager.localserver and manager.server, mainly around authentication and proxying; the two servers then listen on the Unix and TCP sockets in manager.listeners respectively and handle the corresponding traffic.

(3)

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

Node.initManagerConnection() is implemented as follows:

func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{}
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    // Using listen address instead of advertised address because this is a
    // local connection.
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    state := grpc.Idle
    for {
        s, err := conn.WaitForStateChange(ctx, state)
        if err != nil {
            n.setControlSocket(nil)
            return err
        }
        if s == grpc.Ready {
            n.setControlSocket(conn)
            if ready != nil {
                close(ready)
                ready = nil
            }
        } else if state == grpc.Shutdown {
            n.setControlSocket(nil)
        }
        state = s
    }
}

Its job is to establish a connection to the local listen-control-api (unix) socket, which is used to monitor the node's state.

(4) Add the current node to the watch list of remotes as well:

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

(5) Block at the following code, waiting for a role change:

n.waitRole(ctx, ca.AgentRole)


Condition Variables in Go

Go's sync package provides a condition variable type:

type Cond struct {
        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()
type Locker

type Locker interface {
        Lock()
        Unlock()
}
A Locker represents an object that can be locked and unlocked.

NewCond() takes a value of the Locker interface type, i.e. something that implements locking. Broadcast() wakes up all goroutines waiting on the condition variable, while Signal() wakes up only one of them. Wait() blocks the calling goroutine on the condition variable until the condition holds. The usual pattern is:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

Wait() releases the lock on entry and re-acquires it before returning. Between "unlock -> receive notification -> re-lock", another goroutine that was also in Wait() may get there first and change the condition, so that the condition no longer holds for the current goroutine; that is why the check must be done in a loop. See the following example:

package main

import (
    "sync"
    "time"
    "fmt"
)

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var num int

    for i := 1; i <= 2; i++ {
        go func(id int) {
            fmt.Println("Enter Thread ID:", id)
            c.L.Lock()
            for num != 1 {
                fmt.Println("Enter loop: Thread ID:", id)
                c.Wait()
                fmt.Println("Exit loop: Thread ID:", id)
            }
            num++
            c.L.Unlock()
            fmt.Println("Exit Thread ID:", id)
        }(i)
    }

    time.Sleep(time.Second)
    fmt.Println("Sleep 1 second")

    num++
    c.Broadcast()
    time.Sleep(time.Second)
    fmt.Println("Program exit")
}

One run produces the following output:

Enter Thread ID: 2
Enter loop: Thread ID: 2
Enter Thread ID: 1
Enter loop: Thread ID: 1
Sleep 1 second
Exit loop: Thread ID: 2
Exit Thread ID: 2
Exit loop: Thread ID: 1
Enter loop: Thread ID: 1
Program exit

The example shows that goroutine 2 changed the condition, causing goroutine 1 to re-enter the loop; in other words, multiple goroutines blocked on one condition variable are in competition with each other.

References:
Package sync
Condition Variables

Swarmkit Notes (8): agent.session

Communication between an agent and a manager happens through a session. Here is the definition of the agent.session struct:

// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the master is done through session.  Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
    agent     *Agent
    sessionID string
    session   api.Dispatcher_SessionClient
    errs      chan error
    messages  chan *api.SessionMessage
    tasks     chan *api.TasksMessage

    registered chan struct{} // closed registration
    closed     chan struct{}
}

(1) The registered channel is used to signal that the agent has successfully registered with the manager:

func (s *session) run(ctx context.Context, delay time.Duration) {
    time.Sleep(delay) // delay before registering.

    if err := s.start(ctx); err != nil {
        select {
        case s.errs <- err:
        case <-s.closed:
        case <-ctx.Done():
        }
        return
    }

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))

    go runctx(ctx, s.closed, s.errs, s.heartbeat)
    go runctx(ctx, s.closed, s.errs, s.watch)
    go runctx(ctx, s.closed, s.errs, s.listen)

    close(s.registered)
}

In session.run, if session.start() completes without error, the registered channel is closed at the end. Meanwhile, in Agent.run():

func (a *Agent) run(ctx context.Context) {
    ......
    session    = newSession(ctx, a, backoff) // start the initial session
    registered = session.registered
    for {
        select {
        ......
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
        ......
        }
    }
}

Once registered has been closed, the <-registered case fires immediately.

(2) When the session runs into an error, the error is sent to the errs channel. In Agent.run():

case err := <-session.errs:
        // TODO(stevvooe): This may actually block if a session is closed
        // but no error was sent. Session.close must only be called here
        // for this to work.
        if err != nil {
            log.G(ctx).WithError(err).Error("agent: session failed")
            backoff = initialSessionFailureBackoff + 2*backoff
            if backoff > maxSessionFailureBackoff {
                backoff = maxSessionFailureBackoff
            }
        }

        if err := session.close(); err != nil {
            log.G(ctx).WithError(err).Error("agent: closing session failed")
        }
        sessionq = nil
        // if we're here before <-registered, do nothing for that event
        registered = nil

        // Bounce the connection.
        if a.config.Picker != nil {
            a.config.Picker.Reset()
        }

After the error is received, the session is closed and some cleanup is done.

(3) The messages channel receives messages sent by the manager to the agent and hands them over to Agent.run() for processing:

case msg := <-session.messages:
        if err := a.handleSessionMessage(ctx, msg); err != nil {
            log.G(ctx).WithError(err).Error("session message handler failed")
        }

(4) The tasks channel receives the tasks that the manager wants to run on this node; they are likewise handed over to Agent.run() for processing:

case msg := <-session.tasks:
        if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
            log.G(ctx).WithError(err).Error("task assignment failed")
        }

(5) The closed channel is closed in session.close(), which is only called from the case err := <-session.errs: branch. Once the closed channel has been closed, a new session is built:

case <-session.closed:
        log.G(ctx).Debugf("agent: rebuild session")

        // select a session registration delay from backoff range.
        delay := time.Duration(rand.Int63n(int64(backoff)))
        session = newSession(ctx, a, delay)
        registered = session.registered
        sessionq = a.sessionq  

Now take a look at the session.start() function:

// start begins the session and returns the first SessionMessage.
func (s *session) start(ctx context.Context) error {
    log.G(ctx).Debugf("(*session).start")

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

    errChan := make(chan error, 1)
    var (
        msg    *api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)
}

(1)

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

The api.NewDispatcherClient() function and the type it returns are defined as follows:

    type dispatcherClient struct {
        cc *grpc.ClientConn
    }

    func NewDispatcherClient(cc *grpc.ClientConn) DispatcherClient {
        return &dispatcherClient{cc}
    }

s.agent.config.Conn is the gRPC connection to the manager obtained earlier in Node.runAgent() by the following code:

conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))

s.agent.config.Executor.Describe() returns a description of the current node (of type *api.NodeDescription).
(2)

    errChan := make(chan error, 1)
    var (
        msg    *api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

dispatcherClient.Session() is implemented as follows:

func (c *dispatcherClient) Session(ctx context.Context, in *SessionRequest, opts ...grpc.CallOption) (Dispatcher_SessionClient, error) {
    stream, err := grpc.NewClientStream(ctx, &_Dispatcher_serviceDesc.Streams[0], c.cc, "/docker.swarmkit.v1.Dispatcher/Session", opts...)
    if err != nil {
        return nil, err
    }
    x := &dispatcherSessionClient{stream}
    if err := x.ClientStream.SendMsg(in); err != nil {
        return nil, err
    }
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    return x, nil
}

It returns a value that satisfies the Dispatcher_SessionClient interface:

type Dispatcher_SessionClient interface {
    Recv() (*SessionMessage, error)
    grpc.ClientStream
}

grpc.NewClientStream() returns a grpc.ClientStream interface, and dispatcherSessionClient is defined as follows:

type dispatcherSessionClient struct {
    grpc.ClientStream
}  

To satisfy the Dispatcher_SessionClient interface, the dispatcherSessionClient struct also implements a Recv method:

func (x *dispatcherSessionClient) Recv() (*SessionMessage, error) {
    m := new(SessionMessage)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

What x.ClientStream.SendMsg() sends is a SessionRequest, which contains nothing but a NodeDescription:

// SessionRequest starts a session.
type SessionRequest struct {
    Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
}

x.ClientStream.CloseSend() indicates that all send operations are done.
Then, once the manager's message is received, err is sent to errChan:

msg, err = stream.Recv()
errChan <- err

(3)

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)

The goroutine initially blocks in the select; once a valid response arrives, the session initialization is complete, and the agent then waits for the manager to assign tasks.

Once session.start() succeeds, three more goroutines are started:

go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)

session.heartbeat() creates a new dispatcherClient, then sends an api.HeartbeatRequest one second later; the manager replies with an api.HeartbeatResponse telling the agent how often to send heartbeats (the current default is 5 seconds).

session.watch() also creates a new dispatcherTasksClient, then sends an api.TasksRequest to tell the manager that it is ready. After that it blocks in Recv(), waiting for the manager to send tasks.

session.listen() reuses the session.session variable, blocking in Recv() to wait for the manager to send a SessionMessage and then handling it.

Swarmkit Notes (7): The exec.Executor Interface

The exec.Executor interface is defined in agent/exec/executor.go:

// Executor provides controllers for tasks.
type Executor interface {
    // Describe returns the underlying node description.
    Describe(ctx context.Context) (*api.NodeDescription, error)

    // Configure uses the node object state to propagate node
    // state to the underlying executor.
    Configure(ctx context.Context, node *api.Node) error

    // Controller provides a controller for the given task.
    Controller(t *api.Task) (Controller, error)

    // SetNetworkBootstrapKeys passes the symmetric keys from the
    // manager to the executor.
    SetNetworkBootstrapKeys([]*api.EncryptionKey) error
}

The container package implements the executor struct (in agent/exec/container/executor.go):

import engineapi "github.com/docker/engine-api/client"
type executor struct {
    client engineapi.APIClient
}

It has only one member: a Docker APIClient. The executor struct really only implements the following two methods:

// Describe returns the underlying node description from the docker client.
func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    info, err := e.client.Info(ctx)
    if err != nil {
        return nil, err
    }

    plugins := map[api.PluginDescription]struct{}{}
    addPlugins := func(typ string, names []string) {
        for _, name := range names {
            plugins[api.PluginDescription{
                Type: typ,
                Name: name,
            }] = struct{}{}
        }
    }

    addPlugins("Volume", info.Plugins.Volume)
    // Add builtin driver "overlay" (the only builtin multi-host driver) to
    // the plugin list by default.
    addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...))
    addPlugins("Authorization", info.Plugins.Authorization)

    pluginFields := make([]api.PluginDescription, 0, len(plugins))
    for k := range plugins {
        pluginFields = append(pluginFields, k)
    }
    sort.Sort(sortedPlugins(pluginFields))

    // parse []string labels into a map[string]string
    labels := map[string]string{}
    for _, l := range info.Labels {
        stringSlice := strings.SplitN(l, "=", 2)
        // this will take the last value in the list for a given key
        // ideally, one shouldn't assign multiple values to the same key
        if len(stringSlice) > 1 {
            labels[stringSlice[0]] = stringSlice[1]
        }
    }

    description := &api.NodeDescription{
        Hostname: info.Name,
        Platform: &api.Platform{
            Architecture: info.Architecture,
            OS:           info.OSType,
        },
        Engine: &api.EngineDescription{
            EngineVersion: info.ServerVersion,
            Labels:        labels,
            Plugins:       pluginFields,
        },
        Resources: &api.Resources{
            NanoCPUs:    int64(info.NCPU) * 1e9,
            MemoryBytes: info.MemTotal,
        },
    }

    return description, nil
}

// Controller returns a docker container controller.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
    ctlr, err := newController(e.client, t)
    if err != nil {
        return nil, err
    }

    return ctlr, nil
}

Describe() returns the resources and configuration of the current Docker engine, while Controller() returns a container.controller struct.

The client is initialized here:

        client, err := engineapi.NewClient(engineAddr, "", nil, nil)
        if err != nil {
            return err
        }

        executor := container.NewExecutor(client)

If engineAddr is not explicitly set, its default value unix:///var/run/docker.sock is used. The default client value looks like this:

(dlv) p client
*github.com/docker/swarmkit/vendor/github.com/docker/engine-api/client.Client {
        proto: "unix",
        addr: "/var/run/docker.sock",
        basePath: "",
        transport: (unreadable interface type "*transport.apiTransport" not found for 0xc8202ecbd8: no type entry found, use 'types' for a list of valid types),
        version: "",
        customHTTPHeaders: map[string]string [],}

Swarmkit Notes (6): Running the Agent

The Node.runAgent() function is implemented as follows:

func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
    var manager api.Peer
    select {
    case <-ctx.Done():
    case manager = <-n.remotes.WaitSelect(ctx):
    }
    if ctx.Err() != nil {
        return ctx.Err()
    }
    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

    n.Lock()
    n.agent = agent
    n.Unlock()

    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    go func() {
        <-agent.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?

    return agent.Err(context.Background())
}

The function above works as follows:

(1) case manager = <-n.remotes.WaitSelect(ctx): first obtains a manager.
(2) Then grpc.Dial() is called to connect to this manager:

    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

(3) Create and run an Agent:

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

The Agent struct is defined as:

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
    config *Config

    // The latest node object state from manager
    // for this node known to the agent.
    node *api.Node

    keys []*api.EncryptionKey

    sessionq chan sessionOperation
    worker   Worker

    started chan struct{}
    ready   chan struct{}
    stopped chan struct{} // requests shutdown
    closed  chan struct{} // only closed in run
    err     error         // read only after closed is closed
}

The Config struct is defined as:

// Config provides values for an Agent.
type Config struct {
    // Hostname the name of host for agent instance.
    Hostname string

    // Managers provides the manager backend used by the agent. It will be
    // updated with managers weights as observed by the agent.
    Managers picker.Remotes

    // Conn specifies the client connection Agent will use.
    Conn *grpc.ClientConn

    // Picker is the picker used by Conn.
    // TODO(aaronl): This is only part of the config to allow resetting the
    // GRPC connection. This should be refactored to address the coupling
    // between Conn and Picker.
    Picker *picker.Picker

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // DB used for task storage. Must be open for the lifetime of the agent.
    DB *bolt.DB

    // NotifyRoleChange channel receives new roles from session messages.
    NotifyRoleChange chan<- api.NodeRole
}

The comments are clear enough, so no further elaboration is needed.

Agent.Start() ends up calling Agent.run(), which is implemented as follows:

func (a *Agent) run(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    defer close(a.closed) // full shutdown.

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))

    log.G(ctx).Debugf("(*Agent).run")
    defer log.G(ctx).Debugf("(*Agent).run exited")

    var (
        backoff    time.Duration
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
        ready      = a.ready // first session ready
        sessionq   chan sessionOperation
    )

    if err := a.worker.Init(ctx); err != nil {
        log.G(ctx).WithError(err).Error("worker initialization failed")
        a.err = err
        return // fatal?
    }

    // setup a reliable reporter to call back to us.
    reporter := newStatusReporter(ctx, a)
    defer reporter.Close()

    a.worker.Listen(ctx, reporter)

    for {
        select {
        case operation := <-sessionq:
            operation.response <- operation.fn(session)
        case msg := <-session.tasks:
            if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
                log.G(ctx).WithError(err).Error("task assignment failed")
            }
        case msg := <-session.messages:
            if err := a.handleSessionMessage(ctx, msg); err != nil {
                log.G(ctx).WithError(err).Error("session message handler failed")
            }
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
        case err := <-session.errs:
            // TODO(stevvooe): This may actually block if a session is closed
            // but no error was sent. Session.close must only be called here
            // for this to work.
            if err != nil {
                log.G(ctx).WithError(err).Error("agent: session failed")
                backoff = initialSessionFailureBackoff + 2*backoff
                if backoff > maxSessionFailureBackoff {
                    backoff = maxSessionFailureBackoff
                }
            }

            if err := session.close(); err != nil {
                log.G(ctx).WithError(err).Error("agent: closing session failed")
            }
            sessionq = nil
            // if we're here before <-registered, do nothing for that event
            registered = nil

            // Bounce the connection.
            if a.config.Picker != nil {
                a.config.Picker.Reset()
            }
        case <-session.closed:
            log.G(ctx).Debugf("agent: rebuild session")

            // select a session registration delay from backoff range.
            delay := time.Duration(rand.Int63n(int64(backoff)))
            session = newSession(ctx, a, delay)
            registered = session.registered
            sessionq = a.sessionq
        case <-a.stopped:
            // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
            // this loop a few times.
            return
        case <-ctx.Done():
            if a.err == nil {
                a.err = ctx.Err()
            }

            return
        }
    }
}

The important concept here is the session; the line "session = newSession(ctx, a, backoff)" ties a session to the Agent.

FreeBSD Kernel Notes (4): UIO

The UIO-related struct and function definitions:

 #include <sys/types.h>
 #include <sys/uio.h>

 struct uio {
     struct  iovec *uio_iov;         /* scatter/gather list */
     int     uio_iovcnt;         /* length of scatter/gather list */
     off_t   uio_offset;         /* offset in target object */
     ssize_t uio_resid;          /* remaining bytes to copy */
     enum    uio_seg uio_segflg;     /* address space */
     enum    uio_rw uio_rw;      /* operation */
     struct  thread *uio_td;         /* owner */
 };

 int
 uiomove(void *buf, int howmuch, struct uio *uiop);

 int
 uiomove_nofault(void *buf, int howmuch, struct uio *uiop);

Note the following about the uio struct: if uio_iovcnt is not 1, the struct iovec array pointed to by uio_iov can be viewed as one large concatenated buffer; uio_offset is the offset into that buffer, and uio_resid indicates how many bytes remain to be copied. During a read operation, uio_offset indicates how much of the buffer has already been filled, and uio_resid indicates how much space is left. See this program for reference.
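As a minimal sketch (assuming a hypothetical character device whose read routine, echo_read, simply returns a fixed message to userland), a d_read handler typically hands its data to uiomove() and lets it advance uio_offset and shrink uio_resid:

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/conf.h>
#include <sys/uio.h>

static char msg[] = "hello from the kernel\n";

static int
echo_read(struct cdev *dev, struct uio *uio, int ioflag)
{
    size_t len = sizeof(msg) - 1;
    off_t off = uio->uio_offset;

    /* uio_offset tells us how much the caller has already read. */
    if (off >= (off_t)len)
        return (0);             /* EOF */

    /* Copy at most uio_resid bytes; uiomove() advances uio_offset and
     * decreases uio_resid as it fills the caller's iovecs. */
    return (uiomove(msg + off, MIN(len - off, (size_t)uio->uio_resid), uio));
}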

Both uiomove and uiomove_nofault essentially call the uiomove_faultflag function:

static int
uiomove_faultflag(void *cp, int n, struct uio *uio, int nofault)
{
    struct thread *td;
    struct iovec *iov;
    size_t cnt;
    int error, newflags, save;

    td = curthread;
    error = 0;

    KASSERT(uio->uio_rw == UIO_READ || uio->uio_rw == UIO_WRITE,
    ("uiomove: mode"));
    KASSERT(uio->uio_segflg != UIO_USERSPACE || uio->uio_td == td,
    ("uiomove proc"));
    if (!nofault)
        WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
        "Calling uiomove()");

    /* XXX does it make a sense to set TDP_DEADLKTREAT for UIO_SYSSPACE ? */
    newflags = TDP_DEADLKTREAT;
    if (uio->uio_segflg == UIO_USERSPACE && nofault) {
        /*
         * Fail if a non-spurious page fault occurs.
         */
        newflags |= TDP_NOFAULTING | TDP_RESETSPUR;
    }
    save = curthread_pflags_set(newflags);

    while (n > 0 && uio->uio_resid) {
        iov = uio->uio_iov;
        cnt = iov->iov_len;
        if (cnt == 0) {
            uio->uio_iov++;
            uio->uio_iovcnt--;
            continue;
        }
        if (cnt > n)
            cnt = n;

        switch (uio->uio_segflg) {

        case UIO_USERSPACE:
            maybe_yield();
            if (uio->uio_rw == UIO_READ)
                error = copyout(cp, iov->iov_base, cnt);
            else
                error = copyin(iov->iov_base, cp, cnt);
            if (error)
                goto out;
            break;

        case UIO_SYSSPACE:
            if (uio->uio_rw == UIO_READ)
                bcopy(cp, iov->iov_base, cnt);
            else
                bcopy(iov->iov_base, cp, cnt);
            break;
        case UIO_NOCOPY:
            break;
        }
        iov->iov_base = (char *)iov->iov_base + cnt;
        iov->iov_len -= cnt;
        uio->uio_resid -= cnt;
        uio->uio_offset += cnt;
        cp = (char *)cp + cnt;
        n -= cnt;
    }
out:
    curthread_pflags_restore(save);
    return (error);
}

Note that this function modifies the contents of the uio struct passed to it.

For uiomove_nofault(), refer to the following description:

The function uiomove_nofault() requires that the buffer and I/O vectors be accessible without incurring a page fault. The source and destination addresses must be physically mapped for read and write access, respectively, and neither the source nor destination addresses may be pageable. Thus, the function uiomove_nofault() can be called from contexts where acquiring virtual memory system locks or sleeping are prohibited.

References:
UIO

Swarmkit Notes (5): The Node.Run() Function

The essence of the swarmd program is the Node.Run() function. Setting aside the large amount of CA-certification code that precedes it, below is the part that actually runs the manager and the agent.

......
managerReady := make(chan struct{})
agentReady := make(chan struct{})
var managerErr error
var agentErr error
var wg sync.WaitGroup
wg.Add(2)
go func() {
    managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
    wg.Done()
    cancel()
}()
go func() {
    agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
    wg.Done()
    cancel()
}()
......

If the node's role is agent, the runManager goroutine blocks in Node.waitRole():

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        ......
    }
}

So only the Node.runAgent() goroutine can make progress.

If the node's role is manager, both the runManager and runAgent goroutines run; in other words, a manager is itself also an agent.

Swarmkit Notes (4): swarmd Command Options

The command-line options supported by the swarmd program:

func init() {
    mainCmd.Flags().BoolP("version", "v", false, "Display the version and exit")
    mainCmd.Flags().StringP("log-level", "l", "info", "Log level (options \"debug\", \"info\", \"warn\", \"error\", \"fatal\", \"panic\")")
    mainCmd.Flags().StringP("state-dir", "d", "./swarmkitstate", "State directory")
    mainCmd.Flags().StringP("join-token", "", "", "Specifies the secret token required to join the cluster")
    mainCmd.Flags().String("engine-addr", "unix:///var/run/docker.sock", "Address of engine instance of agent.")
    mainCmd.Flags().String("hostname", "", "Override reported agent hostname")
    mainCmd.Flags().String("listen-remote-api", "0.0.0.0:4242", "Listen address for remote API")
    mainCmd.Flags().String("listen-control-api", "./swarmkitstate/swarmd.sock", "Listen socket for control API")
    mainCmd.Flags().String("listen-debug", "", "Bind the Go debug server on the provided address")
    mainCmd.Flags().String("join-addr", "", "Join cluster with a node at this address")
    mainCmd.Flags().Bool("force-new-cluster", false, "Force the creation of a new cluster from data directory")
    mainCmd.Flags().Uint32("heartbeat-tick", 1, "Defines the heartbeat interval (in seconds) for raft member health-check")
    mainCmd.Flags().Uint32("election-tick", 3, "Defines the amount of ticks (in seconds) needed without a Leader to trigger a new election")
    mainCmd.Flags().Var(&externalCAOpt, "external-ca", "Specifications of one or more certificate signing endpoints")
}

(1) version, log-level, and hostname are the simplest and need no explanation.
(2) The state-dir directory stores information such as remote managers and CA certificates:

# ls -alt
total 24
drwx------ 5 root root 4096 Jul 29 02:40 .
drwx------ 4 root root 4096 Jul 29 02:40 raft
-rw------- 1 root root   63 Jul 29 02:40 state.json
drwxr-xr-x 2 root root 4096 Jul 29 02:40 worker
drwxr-xr-x 2 root root 4096 Jul 29 02:40 certificates
drwxr-xr-x 3 root root 4096 Jul 29 02:40 ..

(3) join-token is the token a node uses to join a cluster; it is used in the first certification request.
(4) engine-addr specifies the location of the engine that the executor actually drives; by default the local Docker engine is used.
(5) listen-remote-api specifies a TCP port to listen on for receiving and handling requests from other nodes.
(6) listen-control-api specifies a Unix socket for receiving and handling requests from the swarmctl program.
(7) listen-debug specifies an address on which to bind the Go debug server.
(8) join-addr specifies the address of a node in the cluster to join; the cluster is joined by connecting to that node.
(9) The remaining options, force-new-cluster, heartbeat-tick, election-tick, and external-ca, are explained clearly enough in their help text and need no elaboration.