FreeBSD kernel notes (6): device communication and control

On FreeBSD, device communication and control mainly go through the sysctl and ioctl interfaces:

Generally, sysctls are employed to adjust parameters, and ioctls are used for everything else—that’s why ioctls are the catchall of I/O operations.

ioctl is fairly straightforward, so it is not covered here.

To add sysctl support to a kernel module, first call sysctl_ctx_init to initialize a sysctl_ctx_list structure (release it with sysctl_ctx_free when done); then use the SYSCTL_ADD_* family of functions to add the parameters the module exposes. Note that the second argument of the SYSCTL_ADD_* functions specifies which parent node the new parameter belongs to; the position can be given with either of two macros, SYSCTL_STATIC_CHILDREN and SYSCTL_CHILDREN (if SYSCTL_STATIC_CHILDREN is given no argument, a new top-level category is created).

In addition, SYSCTL_ADD_PROC registers a handler function, whose parameters are SYSCTL_HANDLER_ARGS:

#define SYSCTL_HANDLER_ARGS struct sysctl_oid *oidp, void *arg1,    \
    intptr_t arg2, struct sysctl_req *req

arg1 points to the data the sysctl handler operates on, and arg2 specifies the length of that data.
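
To see how these pieces fit together, here is a minimal sketch of my own (not taken from the book): it initializes a context, uses SYSCTL_STATIC_CHILDREN with an empty argument to create a hypothetical top-level category named mydrv, hangs an integer leaf and a PROC node under it via SYSCTL_CHILDREN, and frees everything on unload. All of the mydrv* names are made up, and newer FreeBSD releases may require extra flags such as CTLFLAG_MPSAFE:

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/module.h>
#include <sys/systm.h>
#include <sys/sysctl.h>

static struct sysctl_ctx_list clist;   /* tracks every OID added below */
static int mydrv_debug;                /* plain integer tunable */
static int mydrv_threshold = 10;       /* value exposed through a PROC node */

/* Handler with the SYSCTL_HANDLER_ARGS signature shown above:
 * arg1 points at the data being handled, arg2 is the extra value
 * passed at registration time (0 here). */
static int
mydrv_threshold_sysctl(SYSCTL_HANDLER_ARGS)
{
    int error, val;

    val = *(int *)arg1;
    error = sysctl_handle_int(oidp, &val, 0, req);
    if (error != 0 || req->newptr == NULL)
        return (error);    /* read-only access, or the copy failed */
    if (val < 0)
        return (EINVAL);   /* reject bogus writes */
    *(int *)arg1 = val;
    return (0);
}

static int
mydrv_modevent(module_t mod, int type, void *unused)
{
    struct sysctl_oid *root;

    switch (type) {
    case MOD_LOAD:
        sysctl_ctx_init(&clist);
        /* SYSCTL_STATIC_CHILDREN() with no argument: new top-level category. */
        root = SYSCTL_ADD_NODE(&clist, SYSCTL_STATIC_CHILDREN(/* empty */),
            OID_AUTO, "mydrv", CTLFLAG_RW, 0, "mydrv root");
        if (root == NULL) {
            sysctl_ctx_free(&clist);
            return (ENOMEM);
        }
        SYSCTL_ADD_INT(&clist, SYSCTL_CHILDREN(root), OID_AUTO, "debug",
            CTLFLAG_RW, &mydrv_debug, 0, "debug level");
        SYSCTL_ADD_PROC(&clist, SYSCTL_CHILDREN(root), OID_AUTO, "threshold",
            CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE, &mydrv_threshold, 0,
            mydrv_threshold_sysctl, "I", "threshold, validated by the handler");
        break;
    case MOD_UNLOAD:
        sysctl_ctx_free(&clist);
        break;
    default:
        return (EOPNOTSUPP);
    }
    return (0);
}

static moduledata_t mydrv_mod = { "mydrv", mydrv_modevent, NULL };
DECLARE_MODULE(mydrv, mydrv_mod, SI_SUB_DRIVERS, SI_ORDER_MIDDLE);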

References:
FreeBSD Device Drivers

A brief introduction to freecolor

FreeBSD has no free command, but it does provide a freecolor command for checking how much memory is still available. The freecolor homepage is here, and its logic is fairly simple: if the system provides /proc/meminfo, it reads the information straight from that virtual file system; otherwise it goes through the third-party libstatgrab library. Since FreeBSD does not mount the /proc file system by default, libstatgrab is used.

The libstatgrab code that obtains the memory usage of a FreeBSD system:

#elif defined(FREEBSD) || defined(DFBSD)
    /*returns pages*/
    size = sizeof(total_count);
    if (sysctlbyname("vm.stats.vm.v_page_count", &total_count, &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_page_count");
    }

    /*returns pages*/
    size = sizeof(free_count);
    if (sysctlbyname("vm.stats.vm.v_free_count", &free_count, &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_free_count");
    }

    size = sizeof(inactive_count);
    if (sysctlbyname("vm.stats.vm.v_inactive_count", &inactive_count , &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_inactive_count");
    }

    size = sizeof(cache_count);
    if (sysctlbyname("vm.stats.vm.v_cache_count", &cache_count, &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_cache_count");
    }

    /* Of couse nothing is ever that simple :) And I have inactive pages to
     * deal with too. So I'm going to add them to free memory :)
     */
    mem_stats_buf->cache = (size_t)cache_count;
    mem_stats_buf->cache *= (size_t)sys_page_size;
    mem_stats_buf->total = (size_t)total_count;
    mem_stats_buf->total *= (size_t)sys_page_size;
    mem_stats_buf->free = (size_t)free_count + inactive_count + cache_count;
    mem_stats_buf->free *= (size_t)sys_page_size;
    mem_stats_buf->used = mem_stats_buf->total - mem_stats_buf->free;
#elif defined(WIN32)

As you can see, free_count, inactive_count, and cache_count are all counted as free, that is, as available memory.
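
As a side note, the same calculation can be reproduced from userland with sysctlbyname. The short program below is my own sketch, not part of freecolor or libstatgrab; recent FreeBSD releases have retired vm.stats.vm.v_cache_count, so a missing OID is simply treated as zero here:

/* cc freemem.c -o freemem */
#include <sys/types.h>
#include <sys/sysctl.h>
#include <stdio.h>
#include <unistd.h>

/* Read a page-count sysctl; treat a missing OID as zero. */
static unsigned long long
read_count(const char *name)
{
    u_int val;
    size_t len = sizeof(val);

    if (sysctlbyname(name, &val, &len, NULL, 0) < 0)
        return (0);
    return (val);
}

int
main(void)
{
    long page_size = sysconf(_SC_PAGESIZE);
    unsigned long long pages;

    /* The same definition of "free" as in the libstatgrab code above. */
    pages = read_count("vm.stats.vm.v_free_count") +
        read_count("vm.stats.vm.v_inactive_count") +
        read_count("vm.stats.vm.v_cache_count");

    printf("free: %llu bytes\n", pages * (unsigned long long)page_size);
    return (0);
}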

Swap usage, on the other hand, is obtained through the kvm interface:

#elif defined(ALLBSD)
    /* XXX probably not mt-safe! */
    kvmd = kvm_openfiles(NULL, NULL, NULL, O_RDONLY, NULL);
    if(kvmd == NULL) {
        RETURN_WITH_SET_ERROR("swap", SG_ERROR_KVM_OPENFILES, NULL);
    }

    if ((kvm_getswapinfo(kvmd, &swapinfo, 1,0)) == -1) {
        kvm_close( kvmd );
        RETURN_WITH_SET_ERROR("swap", SG_ERROR_KVM_GETSWAPINFO, NULL);
    }

    swap_stats_buf->total = (long long)swapinfo.ksw_total;
    swap_stats_buf->used = (long long)swapinfo.ksw_used;
    kvm_close( kvmd );

    swap_stats_buf->total *= sys_page_size;
    swap_stats_buf->used *= sys_page_size;
    swap_stats_buf->free = swap_stats_buf->total - swap_stats_buf->used;
#elif defined(WIN32)

Docker notes (15): analysis of the docker swarm feature (2)

The handler functions that the Docker daemon sets up to respond to the docker client's swarm-related commands are located in api/server/router/swarm/cluster.go:

// buildRouter is a router to talk with the build controller
type swarmRouter struct {
    backend Backend
    routes  []router.Route
}

// NewRouter initializes a new build router
func NewRouter(b Backend) router.Router {
    r := &swarmRouter{
        backend: b,
    }
    r.initRoutes()
    return r
}

// Routes returns the available routers to the swarm controller
func (sr *swarmRouter) Routes() []router.Route {
    return sr.routes
}

func (sr *swarmRouter) initRoutes() {
    sr.routes = []router.Route{
        router.NewPostRoute("/swarm/init", sr.initCluster),
        router.NewPostRoute("/swarm/join", sr.joinCluster),
        router.NewPostRoute("/swarm/leave", sr.leaveCluster),
        router.NewGetRoute("/swarm", sr.inspectCluster),
        router.NewPostRoute("/swarm/update", sr.updateCluster),
        router.NewGetRoute("/services", sr.getServices),
        router.NewGetRoute("/services/{id:.*}", sr.getService),
        router.NewPostRoute("/services/create", sr.createService),
        router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
        router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
        router.NewGetRoute("/nodes", sr.getNodes),
        router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
        router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
        router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
        router.NewGetRoute("/tasks", sr.getTasks),
        router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
    }
}

Taking the handling of the “/swarm/init” request as an example, the actual handler lives in daemon/cluster/cluster.go:

// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
    c.Lock()
    if node := c.node; node != nil {
        if !req.ForceNewCluster {
            c.Unlock()
            return "", ErrSwarmExists
        }
        if err := c.stopNode(); err != nil {
            c.Unlock()
            return "", err
        }
    }

    if err := validateAndSanitizeInitRequest(&req); err != nil {
        c.Unlock()
        return "", err
    }

    listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
    if err != nil {
        c.Unlock()
        return "", err
    }

    advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
    if err != nil {
        c.Unlock()
        return "", err
    }

    localAddr := listenHost

    // If the advertise address is not one of the system's
    // addresses, we also require a listen address.
    listenAddrIP := net.ParseIP(listenHost)
    if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
        advertiseIP := net.ParseIP(advertiseHost)
        if advertiseIP == nil {
            // not an IP
            c.Unlock()
            return "", errMustSpecifyListenAddr
        }

        systemIPs := listSystemIPs()

        found := false
        for _, systemIP := range systemIPs {
            if systemIP.Equal(advertiseIP) {
                found = true
                break
            }
        }
        if !found {
            c.Unlock()
            return "", errMustSpecifyListenAddr
        }
        localAddr = advertiseIP.String()
    }

    // todo: check current state existing
    n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
    if err != nil {
        c.Unlock()
        return "", err
    }
    c.Unlock()

    select {
    case <-n.Ready():
        if err := initClusterSpec(n, req.Spec); err != nil {
            return "", err
        }
        go c.reconnectOnFailure(n)
        return n.NodeID(), nil
    case <-n.done:
        c.RLock()
        defer c.RUnlock()
        if !req.ForceNewCluster { // if failure on first attempt don't keep state
            if err := c.clearState(); err != nil {
                return "", err
            }
        }
        return "", c.err
    }
}

Its core is the c.startNewNode function, whose main logic is as follows:

......
n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
    Hostname:           c.config.Name,
    ForceNewCluster:    forceNewCluster,
    ListenControlAPI:   filepath.Join(c.root, controlSocket),
    ListenRemoteAPI:    listenAddr,
    AdvertiseRemoteAPI: advertiseAddr,
    JoinAddr:           joinAddr,
    StateDir:           c.root,
    JoinToken:          joinToken,
    Executor:           container.NewExecutor(c.config.Backend),
    HeartbeatTick:      1,
    ElectionTick:       3,
})
if err != nil {
    return nil, err
}
ctx := context.Background()
if err := n.Start(ctx); err != nil {
    return nil, err
}
......

That is, it calls swarmkit's NewNode to create a node and then starts it. This node takes the manager role.

Docker notes (14): analysis of the docker swarm feature (1)

Docker 1.12 integrates the docker swarm functionality; the client-side code lives under the api/client/swarm directory. Take the code of the docker swarm init command (api/client/swarm/init.go) as an example:

const (
    generatedSecretEntropyBytes = 16
    generatedSecretBase         = 36
    // floor(log(2^128-1, 36)) + 1
    maxGeneratedSecretLength = 25
)

type initOptions struct {
    swarmOptions
    listenAddr NodeAddrOption
    // Not a NodeAddrOption because it has no default port.
    advertiseAddr   string
    forceNewCluster bool
}

func newInitCommand(dockerCli *client.DockerCli) *cobra.Command {
    opts := initOptions{
        listenAddr: NewListenAddrOption(),
    }

    cmd := &cobra.Command{
        Use:   "init [OPTIONS]",
        Short: "Initialize a swarm",
        Args:  cli.NoArgs,
        RunE: func(cmd *cobra.Command, args []string) error {
            return runInit(dockerCli, cmd.Flags(), opts)
        },
    }

    flags := cmd.Flags()
    flags.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])")
    flags.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])")
    flags.BoolVar(&opts.forceNewCluster, "force-new-cluster", false, "Force create a new cluster from current state.")
    addSwarmFlags(flags, &opts.swarmOptions)
    return cmd
}

func runInit(dockerCli *client.DockerCli, flags *pflag.FlagSet, opts initOptions) error {
    client := dockerCli.Client()
    ctx := context.Background()

    req := swarm.InitRequest{
        ListenAddr:      opts.listenAddr.String(),
        AdvertiseAddr:   opts.advertiseAddr,
        ForceNewCluster: opts.forceNewCluster,
        Spec:            opts.swarmOptions.ToSpec(),
    }

    nodeID, err := client.SwarmInit(ctx, req)
    if err != nil {
        if strings.Contains(err.Error(), "could not choose an IP address to advertise") || strings.Contains(err.Error(), "could not find the system's IP address") {
            return errors.New(err.Error() + " - specify one with --advertise-addr")
        }
        return err
    }

    fmt.Fprintf(dockerCli.Out(), "Swarm initialized: current node (%s) is now a manager.\n\n", nodeID)

    if err := printJoinCommand(ctx, dockerCli, nodeID, true, false); err != nil {
        return err
    }

    fmt.Fprint(dockerCli.Out(), "To add a manager to this swarm, run 'docker swarm join-token manager' and follow the instructions.\n\n")
    return nil
}

Here client.DockerCli represents the docker command line client:

// DockerCli represents the docker command line client.
// Instances of the client can be returned from NewDockerCli.
type DockerCli struct {
    // initializing closure
    init func() error

    // configFile has the client configuration file
    configFile *configfile.ConfigFile
    // in holds the input stream and closer (io.ReadCloser) for the client.
    in io.ReadCloser
    // out holds the output stream (io.Writer) for the client.
    out io.Writer
    // err holds the error stream (io.Writer) for the client.
    err io.Writer
    // keyFile holds the key file as a string.
    keyFile string
    // inFd holds the file descriptor of the client's STDIN (if valid).
    inFd uintptr
    // outFd holds file descriptor of the client's STDOUT (if valid).
    outFd uintptr
    // isTerminalIn indicates whether the client's STDIN is a TTY
    isTerminalIn bool
    // isTerminalOut indicates whether the client's STDOUT is a TTY
    isTerminalOut bool
    // client is the http client that performs all API operations
    client client.APIClient
    // state holds the terminal input state
    inState *term.State
    // outState holds the terminal output state
    outState *term.State
}

Its client member is engine-api/client, so the client.SwarmInit call above is implemented in engine-api/client/swarm_init.go:

// SwarmInit initializes the Swarm.
func (cli *Client) SwarmInit(ctx context.Context, req swarm.InitRequest) (string, error) {
    serverResp, err := cli.post(ctx, "/swarm/init", nil, req, nil)
    if err != nil {
        return "", err
    }

    var response string
    err = json.NewDecoder(serverResp.body).Decode(&response)
    ensureReaderClosed(serverResp)
    return response, err
}

Docker notes (13): an introduction to nvidia-docker

Because GPUs are vendor-specific products that require vendor-specific drivers, Docker itself does not support GPUs. Previously, to use a GPU inside Docker you had to install in the container the same GPU driver the host uses and then map the host's GPU devices (e.g. /dev/nvidia0) into the container, so Docker images built this way were not portable.

The nvidia-docker project was created to solve this problem: the Docker image no longer needs to know anything about the underlying GPU; instead, the devices and driver files are mounted when the container is started.

Building and installing nvidia-docker from source (if you need to set a proxy, see this issue):

# go get -d github.com/NVIDIA/nvidia-docker
# cd $GOPATH/src/github.com/NVIDIA/nvidia-docker
# make install

Looking at nvidia-docker's main function:

func main() {
    args := os.Args[1:]
    defer exit()

    assert(LoadEnvironment())

    command, off, err := docker.ParseArgs(args)
    assert(err)

    if command != "create" && command != "run" {
        if command == "version" {
            fmt.Printf("NVIDIA Docker: %s\n\n", Version)
        }
        assert(docker.Docker(args...))
    }

    opt, i, err := docker.ParseArgs(args[off+1:], command)
    assert(err)
    off += i + 1

    if (command == "create" || command == "run") && opt != "" {
        vols, err := VolumesNeeded(opt)
        assert(err)

        if vols != nil {
            var nargs []string
            var err error

            if Host != nil {
                nargs, err = GenerateRemoteArgs(opt, vols)
            } else {
                assert(nvidia.LoadUVM())
                assert(nvidia.Init())
                nargs, err = GenerateLocalArgs(opt, vols)
                nvidia.Shutdown()
            }
            assert(err)
            args = append(args[:off], append(nargs, args[off:]...)...)
        }
    }

    assert(docker.Docker(args...))
}

Except for the create and run commands, all other commands are still handled by the local docker.

In addition, nvidia-docker also offers a plug-in mode (see Internals):

$ curl -s http://localhost:3476/docker/cli --device=/dev/nvidiactl --device=/dev/nvidia-uvm --device=/dev/nvidia3 --device=/dev/nvidia2 --device=/dev/nvidia1 --device=/dev/nvidia0 --volume-driver=nvidia-docker --volume=nvidia_driver_361.48:/usr/local/nvidia:ro
$ docker run -ti --rm `curl -s http://localhost:3476/docker/cli` nvidia/cuda nvidia-smi

With this approach nvidia-docker is not needed at all and docker can be used directly. However, it does not check whether the image is compatible with the nvidia driver.

Yet another option is to use the package, implemented in Go, that Nvidia provides.

References:
Why NVIDIA Docker

Swarmkit notes (11): the manager creates the server that handles swarmctl requests

manager.localserver is the local Unix socket created to receive and handle the command requests sent by swarmctl (the source is in the manager/controlapi directory). The localserver-related code in Manager.Run() is as follows:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

......
api.RegisterControlServer(m.localserver, localProxyControlAPI)

(1) First, look at the definitions of controlapi.Server and controlapi.NewServer():

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

// NewServer creates a Cluster API server.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
    return &Server{
        store:  store,
        raft:   raft,
        rootCA: rootCA,
    }
}

The controlapi.NewServer() function creates the server that responds to the control command requests issued by the swarmctl program.

The store.MemoryStore here is an important structure:

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

watch.Queue is defined as follows:

// Queue is the structure used to publish events and watch for them.
type Queue struct {
    broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
    return q.CallbackWatch(nil)
}
......
// Publish adds an item to the queue.
func (q *Queue) Publish(item events.Event) {
    q.broadcast.Write(item)
}

Simply put, whenever Server.store changes, the update is written to memDB and, at the same time, a message is published to queue, so that the manager goroutines listening on the corresponding channels can receive and handle it.

The following code fills the newly created controlapi.Server variable with the current cluster's information:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())

(2)

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

The code above creates a variable of type raftProxyControlServer:

type raftProxyControlServer struct {
    local        ControlServer
    connSelector *raftpicker.ConnSelector
    cluster      raftpicker.RaftCluster
    ctxMods      []func(context.Context) (context.Context, error)
}

The meaning of localProxyControlAPI is: if the manager that receives the swarmctl request is the leader (swarmctl and that manager are, of course, on the same machine), it handles the request itself; otherwise it forwards the request to the cluster's leader.

(3)

api.RegisterControlServer(m.localserver, localProxyControlAPI)

The code above ties the Unix socket behind localserver to the raftProxyControlServer.

Swarmkit notes (10): how the manager handles session requests

The manager handles session requests through the _Dispatcher_Session_Handler function (./api/dispatcher.pb.go):

func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
    m := new(SessionRequest)
    if err := stream.RecvMsg(m); err != nil {
        return err
    }
    return srv.(DispatcherServer).Session(m, &dispatcherSessionServer{stream})
}

The actual call stack is as follows:

0  0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
   at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1  0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2  0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3  0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4  0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5  0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6  0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7  0x0000000000462bf0 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1998

The code of the Dispatcher.Session() function is as follows:

// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
    ctx := stream.Context()
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return err
    }
    nodeID := nodeInfo.NodeID

    if err := d.isRunningLocked(); err != nil {
        return err
    }

    // register the node.
    sessionID, err := d.register(stream.Context(), nodeID, r.Description)
    if err != nil {
        return err
    }

    fields := logrus.Fields{
        "node.id":      nodeID,
        "node.session": sessionID,
        "method":       "(*Dispatcher).Session",
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log := log.G(ctx).WithFields(fields)

    var nodeObj *api.Node
    nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
        nodeObj = store.GetNode(readTx, nodeID)
        return nil
    }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
        Checks: []state.NodeCheckFunc{state.NodeCheckID}},
    )
    if cancel != nil {
        defer cancel()
    }

    if err != nil {
        log.WithError(err).Error("ViewAndWatch Node failed")
    }

    if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
        return err
    }

    if err := stream.Send(&api.SessionMessage{
        SessionID:            sessionID,
        Node:                 nodeObj,
        Managers:             d.getManagers(),
        NetworkBootstrapKeys: d.networkBootstrapKeys,
    }); err != nil {
        return err
    }

    managerUpdates, mgrCancel := d.mgrQueue.Watch()
    defer mgrCancel()
    keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
    defer keyMgrCancel()

    // disconnectNode is a helper forcibly shutdown connection
    disconnectNode := func() error {
        // force disconnect by shutting down the stream.
        transportStream, ok := transport.StreamFromContext(stream.Context())
        if ok {
            // if we have the transport stream, we can signal a disconnect
            // in the client.
            if err := transportStream.ServerTransport().Close(); err != nil {
                log.WithError(err).Error("session end")
            }
        }

        nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
        if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
            log.WithError(err).Error("failed to remove node")
        }
        // still return an abort if the transport closure was ineffective.
        return grpc.Errorf(codes.Aborted, "node must disconnect")
    }

    for {
        // After each message send, we need to check the nodes sessionID hasn't
        // changed. If it has, we will the stream and make the node
        // re-register.
        node, err := d.nodes.GetWithSession(nodeID, sessionID)
        if err != nil {
            return err
        }

        var mgrs []*api.WeightedPeer

        var disconnect bool

        select {
        case ev := <-managerUpdates:
            mgrs = ev.([]*api.WeightedPeer)
        case ev := <-nodeUpdates:
            nodeObj = ev.(state.EventUpdateNode).Node
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-node.Disconnect:
            disconnect = true
        case <-d.ctx.Done():
            disconnect = true
        case <-keyMgrUpdates:
        }
        if mgrs == nil {
            mgrs = d.getManagers()
        }

        if err := stream.Send(&api.SessionMessage{
            SessionID:            sessionID,
            Node:                 nodeObj,
            Managers:             mgrs,
            NetworkBootstrapKeys: d.networkBootstrapKeys,
        }); err != nil {
            return err
        }
        if disconnect {
            return disconnectNode()
        }
    }
}

This stream handles agent connections. The first half records the connecting agent; the second half notifies the agent to reconnect when cluster information changes, for example when the manager leader changes. The disconnectNode() function does the work required when the connection to an agent node has to be torn down: shutting down the connection, removing the agent node's information, and so on.

FreeBSD kernel notes (5): allocating memory

For allocating memory in FreeBSD kernel programming, refer to these two documents: MALLOC(9) and CONTIGMALLOC(9). Keep the following points in mind:

(1) When using the malloc family of allocation functions in interrupt context, the M_NOWAIT flag must be used;

(2) contigmalloc takes a boundary parameter:

If the given value “boundary” is non-zero, then the set of physical pages cannot cross any physical address boundary that is a multiple of that value.

For example, if boundary is set to 1M, the physical pages actually allocated may lie within 0~1M or 1M~2M, but cannot span 1.9M~2.1M.
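
Below is a small sketch of my own (not from the referenced manual pages) that covers both points: an allocation with M_NOWAIT for a context that must not sleep, and a contigmalloc call with alignment and boundary constraints. The sizes and address limits are made-up values for illustration, and the interface follows CONTIGMALLOC(9) as documented at the time:

#include <sys/param.h>
#include <sys/kernel.h>
#include <sys/systm.h>
#include <sys/malloc.h>

MALLOC_DEFINE(M_MYDRV, "mydrv", "mydrv buffers");

/* (1) In an interrupt handler, or any other context that may not sleep,
 * M_NOWAIT must be used, so the call can fail and return NULL. */
static void *
mydrv_intr_alloc(size_t size)
{
    return (malloc(size, M_MYDRV, M_NOWAIT | M_ZERO));
}

/* (2) A physically contiguous 64 KB buffer below 4 GB, aligned to 4 KB,
 * that must not cross a 1 MB physical boundary: with boundary = 1 MB the
 * pages may fall inside 0~1M or 1M~2M, but never straddle 1.9M~2.1M. */
static void *
mydrv_dma_alloc(void)
{
    return (contigmalloc(64 * 1024, M_MYDRV, M_WAITOK,
        0,              /* low:  lowest acceptable physical address */
        0xffffffffUL,   /* high: highest acceptable physical address */
        4096,           /* alignment of the allocation */
        1024 * 1024));  /* boundary the allocation must not cross */
}

static void
mydrv_dma_free(void *p)
{
    contigfree(p, 64 * 1024, M_MYDRV);
}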

The XOR operator in C

An answer on Stack Overflow explains what the XOR operator does very well:

If you know how XOR works, and you know that ^ is XOR in C, then this should be pretty simple. You should know that XOR will flip bits where 1 is set, bits 2 and 5 of 0b00100100 are set, therefore it will flip those bits.

From an “during the test” standpoint, let’s say you need to prove this to yourself, you really don’t need to know the initial value of star to answer the question, If you know how ^ works then just throw anything in there:

 00100100
^10101010  (star's made up value)
---------
 10001110  (star's new value)

 bit position: | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0  
   |---|---|---|---|---|---|---|---
 star's new v: | 1 | 0 | 0 | 0 | 1 | 1 | 1 | 0
   |---|---|---|---|---|---|---|---
 star's old v: | 1 | 0 | 1 | 0 | 1 | 0 | 1 | 0

To sum up: the second operand controls the first. A 1 bit in the second operand flips the corresponding bit of the first operand, while a 0 bit leaves that bit unchanged.
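
A small sketch that makes this concrete: the mask 0b00100100 (0x24) flips bits 2 and 5 of star, and XORing with the same mask a second time restores the original value:

#include <stdio.h>

int
main(void)
{
    unsigned char star = 0xAA;   /* 10101010, the made-up value above */
    unsigned char mask = 0x24;   /* 00100100: bits 2 and 5 are set    */

    star ^= mask;                /* bits 2 and 5 flip -> 10001110     */
    printf("after first xor:  0x%02X\n", star);    /* prints 0x8E */

    star ^= mask;                /* flipping them again restores 0xAA */
    printf("after second xor: 0x%02X\n", star);    /* prints 0xAA */
    return 0;
}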

Swarmkit notes (9): manager

The Node.runManager() function starts a manager:

func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
    for {
        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }
        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

        n.waitRole(ctx, ca.AgentRole)

        n.Lock()
        n.manager = nil
        n.Unlock()

        select {
        case <-done:
        case <-ctx.Done():
            err = ctx.Err()
            m.Stop(context.Background())
            <-done
        }
        connCancel()

        if err != nil {
            return err
        }
    }
}

(1)

        n.waitRole(ctx, ca.ManagerRole)
        if ctx.Err() != nil {
            return ctx.Err()
        }

runManager() first blocks in waitRole(). Once the node obtains the manager role, execution continues.

(2)

        remoteAddr, _ := n.remotes.Select(n.nodeID)
        m, err := manager.New(&manager.Config{
            ForceNewCluster: n.config.ForceNewCluster,
            ProtoAddr: map[string]string{
                "tcp":  n.config.ListenRemoteAPI,
                "unix": n.config.ListenControlAPI,
            },
            AdvertiseAddr:  n.config.AdvertiseRemoteAPI,
            SecurityConfig: securityConfig,
            ExternalCAs:    n.config.ExternalCAs,
            JoinRaft:       remoteAddr.Addr,
            StateDir:       n.config.StateDir,
            HeartbeatTick:  n.config.HeartbeatTick,
            ElectionTick:   n.config.ElectionTick,
        })
        if err != nil {
            return err
        }
        done := make(chan struct{})
        go func() {
            m.Run(context.Background()) // todo: store error
            close(done)
        }()

        n.Lock()
        n.manager = m
        n.Unlock()

a) remoteAddr, _ := n.remotes.Select(n.nodeID) selects a leader from the cluster's managers (excluding the current node, of course) and assigns it to remoteAddr. If the current node is the first manager in the cluster, remoteAddr is an "empty" value: {NodeID: "", Addr: ""}.
b) When creating the manager with manager.New(), note that n.config.AdvertiseRemoteAPI is always "". manager.New() finally returns a Manager structure:

func New(config *Config) (*Manager, error) {
    ......
    m := &Manager{
        config:      config,
        listeners:   listeners,
        caserver:    ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
        Dispatcher:  dispatcher.New(RaftNode, dispatcherConfig),
        server:      grpc.NewServer(opts...),
        localserver: grpc.NewServer(opts...),
        RaftNode:    RaftNode,
        started:     make(chan struct{}),
        stopped:     make(chan struct{}),
    }

    return m, nil
}

The listeners field contains the two sockets listening on listen-remote-api (tcp) and listen-control-api (unix).

c) m.Run() is the function that actually runs the manager; even its author finds it complex ("This function is *way* too complex."). Its logic can be split into the following parts:
i) If the current manager is elected leader, it performs a pile of initialization: allocating resources for the scheduler, the allocator, and so on, and starting goroutines; if it is not the leader, it does the opposite cleanup work, stopping goroutines and releasing resources.
ii) Next it configures manager.localserver and manager.server, mainly for authentication and proxying; the two then listen on the Unix and TCP sockets in manager.listeners respectively and handle the corresponding traffic.

(3)

        connCtx, connCancel := context.WithCancel(ctx)
        go n.initManagerConnection(connCtx, ready)

Node.initManagerConnection() is implemented as follows:

func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
    opts := []grpc.DialOption{}
    insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
    opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
    // Using listen address instead of advertised address because this is a
    // local connection.
    addr := n.config.ListenControlAPI
    opts = append(opts, grpc.WithDialer(
        func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    conn, err := grpc.Dial(addr, opts...)
    if err != nil {
        return err
    }
    state := grpc.Idle
    for {
        s, err := conn.WaitForStateChange(ctx, state)
        if err != nil {
            n.setControlSocket(nil)
            return err
        }
        if s == grpc.Ready {
            n.setControlSocket(conn)
            if ready != nil {
                close(ready)
                ready = nil
            }
        } else if state == grpc.Shutdown {
            n.setControlSocket(nil)
        }
        state = s
    }
}

Its purpose is to establish a connection to the local listen-control-api (unix) socket, which is used to monitor the node's state.

(4) Add the current node to the remotes watch list as well:

        // this happens only on initial start
        if ready != nil {
            go func(ready chan struct{}) {
                select {
                case <-ready:
                    n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
                case <-connCtx.Done():
                }
            }(ready)
            ready = nil
        }

(5) Block on the following code, waiting for a role change:

n.waitRole(ctx, ca.AgentRole)