Go语言中的条件变量

Go语言sync package提供了条件变量(condition variable)类型:

type Cond struct {
        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}
type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()
type Locker

type Locker interface {
        Lock()
        Unlock()
}
A Locker represents an object that can be locked and unlocked.

NewCond()函数输入参数是一个Locker接口类型,即实现了锁功能的变量。Broadcast()函数通知所有等待在condition variablegoroutine,而Signal()函数只会通知其中的一个goroutineWait()会让goroutine阻塞在condition variable,等待条件成立。通常的做法是:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

进入Wait()函数会解锁,离开Wait()函数会重新加锁。由于在“解锁->收到通知->重新加锁”这段逻辑中间有可能另一个同样wait()goroutine抢先一步改变了条件,导致当前goroutine的条件不再成立,所以这块要使用循环检测。参考下例:

package main

import (
    "sync"
    "time"
    "fmt"
)

func main() {
    c := sync.NewCond(&sync.Mutex{})
    var num int

    for i := 1; i <= 2; i++ {
        go func(id int) {
            fmt.Println("Enter Thread ID:", id)
            c.L.Lock()
            for num != 1 {
                fmt.Println("Enter loop: Thread ID:", id)
                c.Wait()
                fmt.Println("Exit loop: Thread ID:", id)
            }
            num++
            c.L.Unlock()
            fmt.Println("Exit Thread ID:", id)
        }(i)
    }

    time.Sleep(time.Second)
    fmt.Println("Sleep 1 second")

    num++
    c.Broadcast()
    time.Sleep(time.Second)
    fmt.Println("Program exit")
}

一次执行结果如下:

Enter Thread ID: 2
Enter loop: Thread ID: 2
Enter Thread ID: 1
Enter loop: Thread ID: 1
Sleep 1 second
Exit loop: Thread ID: 2
Exit Thread ID: 2
Exit loop: Thread ID: 1
Enter loop: Thread ID: 1
Program exit

从上面例子可以看出由于goroutine 2改变了条件,导致goroutine 1重新进入循环,即多个goroutine阻塞在一个condition variable上存在着竞争的关系。

参考资料:
Package sync
Condition Variables

Swarmkit笔记(8)——agent.session

Agentmanager之间的通信是通过session进行的,下面是agent.session结构体定义:

// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the master is done through session.  Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
    agent     *Agent
    sessionID string
    session   api.Dispatcher_SessionClient
    errs      chan error
    messages  chan *api.SessionMessage
    tasks     chan *api.TasksMessage

    registered chan struct{} // closed registration
    closed     chan struct{}
}

(1)registered channel是用来通知agent已经向manager注册成功了:

func (s *session) run(ctx context.Context, delay time.Duration) {
    time.Sleep(delay) // delay before registering.

    if err := s.start(ctx); err != nil {
        select {
        case s.errs <- err:
        case <-s.closed:
        case <-ctx.Done():
        }
        return
    }

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))

    go runctx(ctx, s.closed, s.errs, s.heartbeat)
    go runctx(ctx, s.closed, s.errs, s.watch)
    go runctx(ctx, s.closed, s.errs, s.listen)

    close(s.registered)
}

session.run函数中,如果session.start()运行没有问题,则会在最后close registered这个channel。而在Agent.Run()中:

func (a *Agent) run(ctx context.Context) {
        .....
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
for {
        select {
            ......
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
            ......
    }
}

一旦registeredclose<-registered这个case则会马上被执行。

(2)当session运行出现错误时,会把error发到errs channel。在Agent.Run()中:

case err := <-session.errs:
        // TODO(stevvooe): This may actually block if a session is closed
        // but no error was sent. Session.close must only be called here
        // for this to work.
        if err != nil {
            log.G(ctx).WithError(err).Error("agent: session failed")
            backoff = initialSessionFailureBackoff + 2*backoff
            if backoff > maxSessionFailureBackoff {
                backoff = maxSessionFailureBackoff
            }
        }

        if err := session.close(); err != nil {
            log.G(ctx).WithError(err).Error("agent: closing session failed")
        }
        sessionq = nil
        // if we're here before <-registered, do nothing for that event
        registered = nil

        // Bounce the connection.
        if a.config.Picker != nil {
            a.config.Picker.Reset()
        }

收到error后,会关闭这个session并做一些扫尾工作。

(3)messages channel用来接收manager发送给agent的消息,并转给Agent.run()函数进行处理:

case msg := <-session.messages:
        if err := a.handleSessionMessage(ctx, msg); err != nil {
            log.G(ctx).WithError(err).Error("session message handler failed")
        }

(4)tasks channel用来接收manager发送给agent的需要在这个node上运行的task信息,同样需要转给Agent.run()函数进行处理:

case msg := <-session.tasks:
        if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
            log.G(ctx).WithError(err).Error("task assignment failed")
        }

(5)closed channelsession.close()函数中被关闭。也就是在case err := <-session.errs:这个分支中才会执行。一旦closed channel被关闭后,会重新建立连接:

case <-session.closed:
        log.G(ctx).Debugf("agent: rebuild session")

        // select a session registration delay from backoff range.
        delay := time.Duration(rand.Int63n(int64(backoff)))
        session = newSession(ctx, a, delay)
        registered = session.registered
        sessionq = a.sessionq  

再看一下session.start()这个函数:

// start begins the session and returns the first SessionMessage.
func (s *session) start(ctx context.Context) error {
    log.G(ctx).Debugf("(*session).start")

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

    errChan := make(chan error, 1)
    var (
        msg*api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)
}

(1)

    client := api.NewDispatcherClient(s.agent.config.Conn)

    description, err := s.agent.config.Executor.Describe(ctx)
    if err != nil {
        log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
            Errorf("node description unavailable")
        return err
    }
    // Override hostname
    if s.agent.config.Hostname != "" {
        description.Hostname = s.agent.config.Hostname
    }

而关于api.NewDispatcherClient()函数和其所返回的类型定义如下:

    type dispatcherClient struct {
        cc *grpc.ClientConn
    }

    func NewDispatcherClient(cc *grpc.ClientConn) DispatcherClient {
        return &dispatcherClient{cc}
    }

s.agent.config.Conn就是之前在Node.runAgent()函数中通过下列代码得到的和manager直接的GRPC连接:

conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))

s.agent.config.Executor.Describe()返回对当前node的描述(类型是:*api.NodeDescription)。
(2)

    errChan := make(chan error, 1)
    var (
        msg*api.SessionMessage
        stream api.Dispatcher_SessionClient
    )
    // Note: we don't defer cancellation of this context, because the
    // streaming RPC is used after this function returned. We only cancel
    // it in the timeout case to make sure the goroutine completes.
    sessionCtx, cancelSession := context.WithCancel(ctx)

    // Need to run Session in a goroutine since there's no way to set a
    // timeout for an individual Recv call in a stream.
    go func() {
        stream, err = client.Session(sessionCtx, &api.SessionRequest{
            Description: description,
        })
        if err != nil {
            errChan <- err
            return
        }

        msg, err = stream.Recv()
        errChan <- err
    }()

dispatcherClient.Session()代码如下:

func (c *dispatcherClient) Session(ctx context.Context, in *SessionRequest, opts ...grpc.CallOption) (Dispatcher_SessionClient, error) {
    stream, err := grpc.NewClientStream(ctx, &_Dispatcher_serviceDesc.Streams[0], c.cc, "/docker.swarmkit.v1.Dispatcher/Session", opts...)
    if err != nil {
        return nil, err
    }
    x := &dispatcherSessionClient{stream}
    if err := x.ClientStream.SendMsg(in); err != nil {
        return nil, err
    }
    if err := x.ClientStream.CloseSend(); err != nil {
        return nil, err
    }
    return x, nil
}

返回的是一个符合Dispatcher_SessionClient interface类型的变量:

type Dispatcher_SessionClient interface {
    Recv() (*SessionMessage, error)
    grpc.ClientStream
}

grpc.NewClientStream()函数返回的是grpc.ClientStream interface,而dispatcherSessionClient定义如下:

type dispatcherSessionClient struct {
    grpc.ClientStream
}  

为了满足Dispatcher_SessionClient interface定义,dispatcherSessionClient结构体还实现了Recv方法:

func (x *dispatcherSessionClient) Recv() (*SessionMessage, error) {
    m := new(SessionMessage)
    if err := x.ClientStream.RecvMsg(m); err != nil {
        return nil, err
    }
    return m, nil
}

x.ClientStream.SendMsg()发送的是SessionRequest,而它仅包含一个NodeDescription

// SessionRequest starts a session.
type SessionRequest struct {
    Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
}

x.ClientStream.CloseSend()表示所有的发送操作已完成。
接下来收到manager的消息后,把err发到errChan

msg, err = stream.Recv()
errChan <- err

(3)

    select {
    case err := <-errChan:
        if err != nil {
            return err
        }
    case <-time.After(dispatcherRPCTimeout):
        cancelSession()
        return errors.New("session initiation timed out")
    }

    s.sessionID = msg.SessionID
    s.session = stream

    return s.handleSessionMessage(ctx, msg)

一开始goroutine阻塞在select,一旦收到正确的响应,就会完成session的初始化。然后继续等待manager分配任务。

一旦session.start()成功,就会启动另外3goroutine

go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)

session.heartbeat()会创建一个新的dispatcherClient变量,然后在1秒钟后发送api.HeartbeatRequest请求,manager会返回api.HeartbeatResponse,告诉agent每隔多长时间发送heartbeat,目前默认时间是5秒。

session.watch()也会新创建一个dispatcherTasksClient变量,然后发送api.TasksRequest请求,通知manager自己已经ready。接下来就阻塞在Recv()函数,等待manager发送task请求。

session.listen()复用session.session变量,阻塞在Recv()函数,等待manager发送SessionMessage,然后处理。

Swarmkit笔记(7)——exec.Executor interface

exec.Executor interface定义(位于agent/exec/executor.go):

// Executor provides controllers for tasks.
type Executor interface {
    // Describe returns the underlying node description.
    Describe(ctx context.Context) (*api.NodeDescription, error)

    // Configure uses the node object state to propagate node
    // state to the underlying executor.
    Configure(ctx context.Context, node *api.Node) error

    // Controller provides a controller for the given task.
    Controller(t *api.Task) (Controller, error)

    // SetNetworkBootstrapKeys passes the symmetric keys from the
    // manager to the executor.
    SetNetworkBootstrapKeys([]*api.EncryptionKey) error
}

container Package实现了executor结构体(位于agent/exec/container/executor.go

import engineapi "github.com/docker/engine-api/client"
type executor struct {
    client engineapi.APIClient
}

里面只有一个成员:一个Docker APIClientexecutor结构体实际只实现了下面两个方法:

// Describe returns the underlying node description from the docker client.
func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
    info, err := e.client.Info(ctx)
    if err != nil {
        return nil, err
    }

    plugins := map[api.PluginDescription]struct{}{}
    addPlugins := func(typ string, names []string) {
        for _, name := range names {
            plugins[api.PluginDescription{
                Type: typ,
                Name: name,
            }] = struct{}{}
        }
    }

    addPlugins("Volume", info.Plugins.Volume)
    // Add builtin driver "overlay" (the only builtin multi-host driver) to
    // the plugin list by default.
    addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...))
    addPlugins("Authorization", info.Plugins.Authorization)

    pluginFields := make([]api.PluginDescription, 0, len(plugins))
    for k := range plugins {
        pluginFields = append(pluginFields, k)
    }
    sort.Sort(sortedPlugins(pluginFields))

    // parse []string labels into a map[string]string
    labels := map[string]string{}
    for _, l := range info.Labels {
        stringSlice := strings.SplitN(l, "=", 2)
        // this will take the last value in the list for a given key
        // ideally, one shouldn't assign multiple values to the same key
        if len(stringSlice) > 1 {
            labels[stringSlice[0]] = stringSlice[1]
        }
    }

    description := &api.NodeDescription{
        Hostname: info.Name,
        Platform: &api.Platform{
            Architecture: info.Architecture,
            OS:           info.OSType,
        },
        Engine: &api.EngineDescription{
            EngineVersion: info.ServerVersion,
            Labels:        labels,
            Plugins:       pluginFields,
        },
        Resources: &api.Resources{
            NanoCPUs:    int64(info.NCPU) * 1e9,
            MemoryBytes: info.MemTotal,
        },
    }

    return description, nil
}

// Controller returns a docker container controller.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
    ctlr, err := newController(e.client, t)
    if err != nil {
        return nil, err
    }

    return ctlr, nil
}

Describe()方法返回当前Docker engine的资源配置信息,而Controller()则返回一个container.controller结构体。

其中关于client的初始化位于:

        client, err := engineapi.NewClient(engineAddr, "", nil, nil)
        if err != nil {
            return err
        }

        executor := container.NewExecutor(client)

如果没有对engineAddr做特殊设置,就会使用其默认值:unix:///var/run/docker.sockclient值默认如下:

(dlv) p client
*github.com/docker/swarmkit/vendor/github.com/docker/engine-api/client.Client {
        proto: "unix",
        addr: "/var/run/docker.sock",
        basePath: "",
        transport: (unreadable interface type "*transport.apiTransport" not found for 0xc8202ecbd8: no type entry found, use 'types' for a list of valid types),
        version: "",
        customHTTPHeaders: map[string]string [],}

Swarmkit笔记(6)——Agent运行

Node.runAgent()函数实现如下:

func (n *Node) runAgent(ctx context.Context, db *bolt.DB, creds credentials.TransportAuthenticator, ready chan<- struct{}) error {
    var manager api.Peer
    select {
    case <-ctx.Done():
    case manager = <-n.remotes.WaitSelect(ctx):
    }
    if ctx.Err() != nil {
        return ctx.Err()
    }
    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

    n.Lock()
    n.agent = agent
    n.Unlock()

    defer func() {
        n.Lock()
        n.agent = nil
        n.Unlock()
    }()

    go func() {
        <-agent.Ready()
        close(ready)
    }()

    // todo: manually call stop on context cancellation?

    return agent.Err(context.Background())
}

上面函数解释如下:

(1)case manager = <-n.remotes.WaitSelect(ctx):首先获得manager
(2)接下来调用grpc.Dial()去连接这个manager

    picker := picker.NewPicker(n.remotes, manager.Addr)
    conn, err := grpc.Dial(manager.Addr,
        grpc.WithPicker(picker),
        grpc.WithTransportCredentials(creds),
        grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
    if err != nil {
        return err
    }

(3)生成并运行一个Agent

    agent, err := New(&Config{
        Hostname:         n.config.Hostname,
        Managers:         n.remotes,
        Executor:         n.config.Executor,
        DB:               db,
        Conn:             conn,
        Picker:           picker,
        NotifyRoleChange: n.roleChangeReq,
    })
    if err != nil {
        return err
    }
    if err := agent.Start(ctx); err != nil {
        return err
    }

关于Agent结构体定义:

// Agent implements the primary node functionality for a member of a swarm
// cluster. The primary functionality is to run and report on the status of
// tasks assigned to the node.
type Agent struct {
    config *Config

    // The latest node object state from manager
    // for this node known to the agent.
    node *api.Node

    keys []*api.EncryptionKey

    sessionq chan sessionOperation
    worker   Worker

    started chan struct{}
    ready   chan struct{}
    stopped chan struct{} // requests shutdown
    closed  chan struct{} // only closed in run
    err     error         // read only after closed is closed
}

其中Config结构体定义:

// Config provides values for an Agent.
type Config struct {
    // Hostname the name of host for agent instance.
    Hostname string

    // Managers provides the manager backend used by the agent. It will be
    // updated with managers weights as observed by the agent.
    Managers picker.Remotes

    // Conn specifies the client connection Agent will use.
    Conn *grpc.ClientConn

    // Picker is the picker used by Conn.
    // TODO(aaronl): This is only part of the config to allow resetting the
    // GRPC connection. This should be refactored to address the coupling
    // between Conn and Picker.
    Picker *picker.Picker

    // Executor specifies the executor to use for the agent.
    Executor exec.Executor

    // DB used for task storage. Must be open for the lifetime of the agent.
    DB *bolt.DB

    // NotifyRoleChange channel receives new roles from session messages.
    NotifyRoleChange chan<- api.NodeRole
}

注释都很清楚,不必赘述。

Agent.Start()会调到Agent.Run(),实现如下:

func (a *Agent) run(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    defer close(a.closed) // full shutdown.

    ctx = log.WithLogger(ctx, log.G(ctx).WithField("module", "agent"))

    log.G(ctx).Debugf("(*Agent).run")
    defer log.G(ctx).Debugf("(*Agent).run exited")

    var (
        backoff    time.Duration
        session    = newSession(ctx, a, backoff) // start the initial session
        registered = session.registered
        ready      = a.ready // first session ready
        sessionq   chan sessionOperation
    )

    if err := a.worker.Init(ctx); err != nil {
        log.G(ctx).WithError(err).Error("worker initialization failed")
        a.err = err
        return // fatal?
    }

    // setup a reliable reporter to call back to us.
    reporter := newStatusReporter(ctx, a)
    defer reporter.Close()

    a.worker.Listen(ctx, reporter)

    for {
        select {
        case operation := <-sessionq:
            operation.response <- operation.fn(session)
        case msg := <-session.tasks:
            if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
                log.G(ctx).WithError(err).Error("task assignment failed")
            }
        case msg := <-session.messages:
            if err := a.handleSessionMessage(ctx, msg); err != nil {
                log.G(ctx).WithError(err).Error("session message handler failed")
            }
        case <-registered:
            log.G(ctx).Debugln("agent: registered")
            if ready != nil {
                close(ready)
            }
            ready = nil
            registered = nil // we only care about this once per session
            backoff = 0      // reset backoff
            sessionq = a.sessionq
        case err := <-session.errs:
            // TODO(stevvooe): This may actually block if a session is closed
            // but no error was sent. Session.close must only be called here
            // for this to work.
            if err != nil {
                log.G(ctx).WithError(err).Error("agent: session failed")
                backoff = initialSessionFailureBackoff + 2*backoff
                if backoff > maxSessionFailureBackoff {
                    backoff = maxSessionFailureBackoff
                }
            }

            if err := session.close(); err != nil {
                log.G(ctx).WithError(err).Error("agent: closing session failed")
            }
            sessionq = nil
            // if we're here before <-registered, do nothing for that event
            registered = nil

            // Bounce the connection.
            if a.config.Picker != nil {
                a.config.Picker.Reset()
            }
        case <-session.closed:
            log.G(ctx).Debugf("agent: rebuild session")

            // select a session registration delay from backoff range.
            delay := time.Duration(rand.Int63n(int64(backoff)))
            session = newSession(ctx, a, delay)
            registered = session.registered
            sessionq = a.sessionq
        case <-a.stopped:
            // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump
            // this loop a few times.
            return
        case <-ctx.Done():
            if a.err == nil {
                a.err = ctx.Err()
            }

            return
        }
    }
}

其中重要的是session这个概念,通过“session = newSession(ctx, a, backoff)”这行代码将sessionAgent关联起来。

FreeBSD kernel 笔记(4)——UIO

UIO相关的结构体和函数定义:

 #include <sys/types.h>
 #include <sys/uio.h>

 struct uio {
     struct  iovec *uio_iov;         /* scatter/gather list */
     int     uio_iovcnt;         /* length of scatter/gather list */
     off_t   uio_offset;         /* offset in target object */
     ssize_t uio_resid;          /* remaining bytes to copy */
     enum    uio_seg uio_segflg;     /* address space */
     enum    uio_rw uio_rw;      /* operation */
     struct  thread *uio_td;         /* owner */
 };

 int
 uiomove(void *buf, int howmuch, struct uio *uiop);

 int
 uiomove_nofault(void *buf, int howmuch, struct uio *uiop);

关于uio结构体需要注意的是:如果uio_iovcnt不为1,可以把uio_iov所指向的struct iovec看成一个连接起来的大bufferuio_offset指向这个bufferoffest,而uio_resid表明还有多少字节需要copy。在执行read操作时,uio_offset表明已经填充的buffer大小,而uio_resid表明buffer剩余的空间。可以参考这个程序

uiomoveuiomove_nofault本质上调用的都是uiomove_faultflag函数:

static int
uiomove_faultflag(void *cp, int n, struct uio *uio, int nofault)
{
    struct thread *td;
    struct iovec *iov;
    size_t cnt;
    int error, newflags, save;

    td = curthread;
    error = 0;

    KASSERT(uio->uio_rw == UIO_READ || uio->uio_rw == UIO_WRITE,
    ("uiomove: mode"));
    KASSERT(uio->uio_segflg != UIO_USERSPACE || uio->uio_td == td,
    ("uiomove proc"));
    if (!nofault)
        WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL,
        "Calling uiomove()");

    /* XXX does it make a sense to set TDP_DEADLKTREAT for UIO_SYSSPACE ? */
    newflags = TDP_DEADLKTREAT;
    if (uio->uio_segflg == UIO_USERSPACE && nofault) {
        /*
         * Fail if a non-spurious page fault occurs.
         */
        newflags |= TDP_NOFAULTING | TDP_RESETSPUR;
    }
    save = curthread_pflags_set(newflags);

    while (n > 0 && uio->uio_resid) {
        iov = uio->uio_iov;
        cnt = iov->iov_len;
        if (cnt == 0) {
            uio->uio_iov++;
            uio->uio_iovcnt--;
            continue;
        }
        if (cnt > n)
            cnt = n;

        switch (uio->uio_segflg) {

        case UIO_USERSPACE:
            maybe_yield();
            if (uio->uio_rw == UIO_READ)
                error = copyout(cp, iov->iov_base, cnt);
            else
                error = copyin(iov->iov_base, cp, cnt);
            if (error)
                goto out;
            break;

        case UIO_SYSSPACE:
            if (uio->uio_rw == UIO_READ)
                bcopy(cp, iov->iov_base, cnt);
            else
                bcopy(iov->iov_base, cp, cnt);
            break;
        case UIO_NOCOPY:
            break;
        }
        iov->iov_base = (char *)iov->iov_base + cnt;
        iov->iov_len -= cnt;
        uio->uio_resid -= cnt;
        uio->uio_offset += cnt;
        cp = (char *)cp + cnt;
        n -= cnt;
    }
out:
    curthread_pflags_restore(save);
    return (error);
}

可以看到这个函数会对传入的uio结构体的内容进行修改。

关于uiomove_nofault()函数,参考如下定义:

The function uiomovenofault() requires that the buffer and I/O vectors be accessible without incurring a page fault. The source and destination addresses must be physically mapped for read and write access, respec- tively, and neither the source nor destination addresses may be pageable. Thus, the function uiomovenofault() can be called from contexts where acquiring virtual memory system locks or sleeping are prohibited.

参考资料:
UIO

Swarmkit笔记(5)——Node.Run()函数

Swarmd程序的精髓就是Node.Run()函数。刨除前面一大堆CA验证的相关代码,下面是实际执行manageragent的部分。

......
managerReady := make(chan struct{})
agentReady := make(chan struct{})
var managerErr error
var agentErr error
var wg sync.WaitGroup
wg.Add(2)
go func() {
    managerErr = n.runManager(ctx, securityConfig, managerReady) // store err and loop
    wg.Done()
    cancel()
}()
go func() {
    agentErr = n.runAgent(ctx, db, securityConfig.ClientTLSCreds, agentReady)
    wg.Done()
    cancel()
}()
......

如果node的角色是agent,则runManager goroutine就会阻塞在Node.waitRole()这里:

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        ......
    }
}

因此只有Node.runAgent()这个goroutine可以顺畅执行。

如果node的角色是manager,则runManagerrunAgent这两个goroutine都会运行,即manager本身也是一个agent

Swarmkit笔记(4)——swarmd命令选项

Swarmd程序支持的命令选项:

func init() {
    mainCmd.Flags().BoolP("version", "v", false, "Display the version and exit")
    mainCmd.Flags().StringP("log-level", "l", "info", "Log level (options \"debug\", \"info\", \"warn\", \"error\", \"fatal\", \"panic\")")
    mainCmd.Flags().StringP("state-dir", "d", "./swarmkitstate", "State directory")
    mainCmd.Flags().StringP("join-token", "", "", "Specifies the secret token required to join the cluster")
    mainCmd.Flags().String("engine-addr", "unix:///var/run/docker.sock", "Address of engine instance of agent.")
    mainCmd.Flags().String("hostname", "", "Override reported agent hostname")
    mainCmd.Flags().String("listen-remote-api", "0.0.0.0:4242", "Listen address for remote API")
    mainCmd.Flags().String("listen-control-api", "./swarmkitstate/swarmd.sock", "Listen socket for control API")
    mainCmd.Flags().String("listen-debug", "", "Bind the Go debug server on the provided address")
    mainCmd.Flags().String("join-addr", "", "Join cluster with a node at this address")
    mainCmd.Flags().Bool("force-new-cluster", false, "Force the creation of a new cluster from data directory")
    mainCmd.Flags().Uint32("heartbeat-tick", 1, "Defines the heartbeat interval (in seconds) for raft member health-check")
    mainCmd.Flags().Uint32("election-tick", 3, "Defines the amount of ticks (in seconds) needed without a Leader to trigger a new election")
    mainCmd.Flags().Var(&externalCAOpt, "external-ca", "Specifications of one or more certificate signing endpoints")
}

(1)versionlog-levelhostname最简单,不必细说。
(2)state-dir目录存储远端manager以及CA认证等相关信息:

# ls -alt
total 24
drwx------ 5 root root 4096 Jul 29 02:40 .
drwx------ 4 root root 4096 Jul 29 02:40 raft
-rw------- 1 root root   63 Jul 29 02:40 state.json
drwxr-xr-x 2 root root 4096 Jul 29 02:40 worker
drwxr-xr-x 2 root root 4096 Jul 29 02:40 certificates
drwxr-xr-x 3 root root 4096 Jul 29 02:40 ..

(3)join-tokennode用来加入某个clustertoken,在第一次认证请求时会被用到。
(4)engine-addr指定实际用来执行executorengine位置,默认是使用本机Docker
(5)listen-remote-api指定监听一个tcp port,用来接收和处理其它node的访问请求。
(6)listen-control-api指定一个Unix socket,用来接收和处理swarmctl程序的访问请求。
(7)listen-debug指定监听一个用来debug程序的端口。
(8)join-addr指定要加入的cluster的一个node地址,通过连接这个node来加入这个cluster
(9)其余force-new-clusterheartbeat-tickelection-tickexternal-ca解释的都很清楚,不必赘述。

Swarmkit笔记(3)——swarmd程序框架

agent.Node结构体有4channel,理解它们的作用就可以理解swarmd程序的框架:

// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
    ......
    started              chan struct{}
    stopped              chan struct{}
    ready                chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
    ......
    closed               chan struct{}
    ......
}

swarmd程序的框架(其中executor通过engine-addr得到,代表最终运行task的实体,实际是一个Docker engineapi.APIClient。其它参数都通过命令行直接得到。):

        ......
        // Create a context for our GRPC call
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        ......

        n, err := agent.NewNode(&agent.NodeConfig{
            Hostname:         hostname,
            ForceNewCluster:  forceNewCluster,
            ListenControlAPI: unix,
            ListenRemoteAPI:  addr,
            JoinAddr:         managerAddr,
            StateDir:         stateDir,
            JoinToken:        joinToken,
            ExternalCAs:      externalCAOpt.Value(),
            Executor:         executor,
            HeartbeatTick:    hb,
            ElectionTick:     election,
        })
        if err != nil {
            return err
        }

        if err := n.Start(ctx); err != nil {
            return err
        }

        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        go func() {
            <-c
            n.Stop(ctx)
        }()

        go func() {
            select {
            case <-n.Ready():
            case <-ctx.Done():
            }
            if ctx.Err() == nil {
                logrus.Info("node is ready")
            }
        }()

        return n.Err(ctx)

(1)

if err := n.Start(ctx); err != nil {
    return err
}

看一下Node.Start()函数的实现:

// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
    select {
    case <-n.started:
        select {
        case <-n.closed:
            return n.err
        case <-n.stopped:
            return errAgentStopped
        case <-ctx.Done():
            return ctx.Err()
        default:
            return errAgentStarted
        }
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    close(n.started)
    go n.run(ctx)
    return nil
}

如果执行Node.Start()时没有任何异常发生,就会把Node.started这个channel关掉(close(n.started)),然后启动这个节点初始化过程:go n.run(ctx)

(2)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
    <-c
    n.Stop(ctx)
}()

这段代码的含义是用户按Ctrl+C可以中断程序。Node.Stop()函数实现如下:

// Stop stops node execution
func (n *Node) Stop(ctx context.Context) error {
    select {
    case <-n.started:
        select {
        case <-n.closed:
            return n.err
        case <-n.stopped:
            select {
            case <-n.closed:
                return n.err
            case <-ctx.Done():
                return ctx.Err()
            }
        case <-ctx.Done():
            return ctx.Err()
        default:
            close(n.stopped)
            // recurse and wait for closure
            return n.Stop(ctx)
        }
    case <-ctx.Done():
        return ctx.Err()
    default:
        return errAgentNotStarted
    }
}

由于此时Node.started这个channel已经被关掉,所以会永远执行select的第一个case分支:case <-n.started。然后会根据当时的情况,再决定执行哪个分支。

(3)

go func() {
    select {
        case <-n.Ready():
        case <-ctx.Done():
    }
    if ctx.Err() == nil {
        logrus.Info("node is ready")
    }
}()

Node.Ready()函数会返回Node.ready这个channel

// Ready returns a channel that is closed after node's initialization has
// completes for the first time.
func (n *Node) Ready() <-chan struct{} {
    return n.ready
}

Node初始化完成后,Node.ready这个channel就会被关掉。因此如果一切顺利的话,就会看到“node is ready”的log

(4)

return n.Err(ctx)

Node.Err()函数的实现:

// Err returns the error that caused the node to shutdown or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
    select {
    case <-n.closed:
        return n.err
    case <-ctx.Done():
        return ctx.Err()
    }
}

Node.Err()函数阻塞在这里,等待Node关闭。

 

FreeBSD kernel 笔记(3)——设备名字

在使用下列函数

int makedevs(struct makedevargs *args, struct cdev **cdev, const char *fmt, …);

struct cdev * make_dev(struct cdevsw *cdevsw, int unit, uidt uid, gidt gid, int perms, const char *fmt, …);

为设备创建cdev结构体时,fmt用来指定设备的名字:

The name is the expansion of fmt and following arguments as printf(9) would print it. The name determines its path under /dev or other devfs(5) mount point and may contain slash `/’ char- acters to denote subdirectories.

也就是/dev下面节点的名字。

cdevsw结构体中的d_name指定的是driver的名字:

struct cdevsw {
    ......
    const char      *d_name;
    ......
}

一个driver可以用来操作多个设备。

参考资料:
MAKE_DEV

FreeBSD kernel 笔记(2)——“preparing a device”和“preparing a device for I/O”

preparing (or initializing) a device”通常发生在加载设备驱动模块时,举例如下:

static int hello_modevent(module_t mod __unused, int /* modeventtype_t */ event, void *arg __unused)
{
    ......
    switch (event) 
    {
        case MOD_LOAD:
        {
            make_dev_args_init(&args);
            args.mda_devsw = &hello_cdevsw;
            args.mda_uid = UID_ROOT;
            args.mda_gid = GID_WHEEL;
            args.mda_mode = 0600;
            uprintf("Hello is loaded:%d\n", make_dev_s(&args, &hello_dev, "hello"));
            break;
        }
        ......
    }
    return error;
}

preparing a device for I/O”则是发生在open这个设备时,比如cdevsw结构体的d_open函数:

struct cdevsw {
    ......
    d_open_t        *d_open;  
    ......
}