FreeBSD kernel notes (7): Leaving operations undefined in the cdevsw structure

The following excerpt is from FreeBSD Device Drivers:

If a d_foo function is undefined the corresponding operation is unsupported. However, d_open and d_close are unique; when they’re undefined the kernel will automatically define them as follows:
int
nullop(void)
{
    return (0);
}
This ensures that every registered character device can be opened and closed.

In other words, d_open and d_close in the cdevsw structure are never NULL.

/*
 * Character device switch table
 */
struct cdevsw {
    int         d_version;
    u_int           d_flags;
    const char      *d_name;
    d_open_t        *d_open;
    d_fdopen_t      *d_fdopen;
    d_close_t       *d_close;
    d_read_t        *d_read;
    d_write_t       *d_write;
    d_ioctl_t       *d_ioctl;
    d_poll_t        *d_poll;
    d_mmap_t        *d_mmap;
    d_strategy_t        *d_strategy;
    dumper_t        *d_dump;
    d_kqfilter_t        *d_kqfilter;
    d_purge_t       *d_purge;
    d_mmap_single_t     *d_mmap_single;

    int32_t         d_spare0[3];
    void            *d_spare1[3];

    /* These fields should not be messed with by drivers */
    LIST_HEAD(, cdev)   d_devs;
    int         d_spare2;
    union {
        struct cdevsw       *gianttrick;
        SLIST_ENTRY(cdevsw) postfree_list;
    } __d_giant;
};
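
As a minimal sketch of what this means in practice (the "echo" device name and the read handler below are made up for illustration, and the module/make_dev boilerplate is omitted), a driver can fill in only the operations it actually supports; the kernel substitutes nullop() for the missing d_open and d_close, so open(2) and close(2) on the device still succeed:

#include <sys/param.h>
#include <sys/conf.h>
#include <sys/uio.h>

static d_read_t echo_read;

static int
echo_read(struct cdev *dev, struct uio *uio, int ioflag)
{
    /* A real driver would uiomove() data to the caller here. */
    return (0);
}

/*
 * d_open and d_close are deliberately left undefined; every other
 * missing d_foo member (d_write, d_ioctl, ...) simply makes that
 * operation unsupported.
 */
static struct cdevsw echo_cdevsw = {
    .d_version = D_VERSION,
    .d_name    = "echo",
    .d_read    = echo_read,
};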

Swarmkit notes (12): Specifying resource limits when creating a service with swarmctl

When creating a service with swarmctl, you can specify CPU and memory resource limits:

# swarmctl service create --help
Create a service

Usage:
  swarmctl service create [flags]

Flags:
......
  --cpu-limit string            CPU cores limit (e.g. 0.5)
  --cpu-reservation string      number of CPU cores reserved (e.g. 0.5)
......
  --memory-limit string         memory limit (e.g. 512m)
  --memory-reservation string   amount of reserved memory (e.g. 512m)
    ......

The *-reservation flags allocate and hold the corresponding resources for the container, so those resources are guaranteed to be available to it; the *-limit flags cap the resources the container's processes may use. The code that parses these resource flags lives in cmd/swarmctl/service/flagparser/resource.go.
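
For instance, a hypothetical invocation (the service name and image are placeholders; the resource flags are the ones listed above) that caps a service at half a CPU core and 512 MB of memory while reserving a quarter core and 256 MB might look like this:

# swarmctl service create --name redis --image redis:3.0.5 \
    --cpu-limit 0.5 --memory-limit 512m \
    --cpu-reservation 0.25 --memory-reservation 256m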

References:
Docker service Limits and Reservations

Docker notes (16): Assigning CPU resources to a container

The --cpuset-cpus option of docker run pins a container to specific CPU cores. For example:

# docker run -ti --rm --cpuset-cpus=1,6 redis

There is also a --cpu-shares option, a relative weight whose default value is 1024. If two running containers both have a --cpu-shares value of 1024, they receive equal shares of CPU time when they compete for the CPU.
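
A hypothetical way to observe this (the container names and the busybox busy loops are only there to generate load): pin two containers to the same core with different weights and compare their usage in docker stats; the first should get roughly twice the CPU time of the second (1024:512).

# docker run -d --cpuset-cpus=0 --cpu-shares=1024 --name high busybox sh -c 'while :; do :; done'
# docker run -d --cpuset-cpus=0 --cpu-shares=512 --name low busybox sh -c 'while :; do :; done'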

FreeBSD kernel notes (6): Device communication and control

On FreeBSD, device communication and control mainly go through the sysctl and ioctl interfaces:

Generally, sysctls are employed to adjust parameters, and ioctls are used for everything else—that’s why ioctls are the catchall of I/O operations.

ioctl is fairly straightforward and is not covered here.

To add sysctl support to a kernel module, first call sysctl_ctx_init to initialize a sysctl_ctx_list structure (when you are done with it, release it with sysctl_ctx_free); then use the SYSCTL_ADD_* family of functions to register the parameters the module supports. Note that the second argument of the SYSCTL_ADD_* functions specifies which parent node the new parameter hangs under; two macros can be used to specify that location: SYSCTL_STATIC_CHILDREN and SYSCTL_CHILDREN (if SYSCTL_STATIC_CHILDREN is given no argument, the new parameter becomes a new top-level category).

In addition, SYSCTL_ADD_PROC registers a handler function, whose parameter list is SYSCTL_HANDLER_ARGS:

#define SYSCTL_HANDLER_ARGS struct sysctl_oid *oidp, void *arg1,    \
    intptr_t arg2, struct sysctl_req *req

arg1 points to the data that the sysctl handler operates on, and arg2 carries the data's length (it is an integer argument, not a pointer).
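
The following is a minimal sketch of the whole flow described above (the hw.example OID name, the backing variable, and the handler are all invented for illustration): sysctl_ctx_init sets up the context at module load, SYSCTL_ADD_PROC hangs an integer OID under the static hw node, and sysctl_ctx_free tears everything down at unload.

#include <sys/param.h>
#include <sys/errno.h>
#include <sys/kernel.h>
#include <sys/module.h>
#include <sys/sysctl.h>
#include <sys/systm.h>

static struct sysctl_ctx_list clist;
static int example_value;

static int
example_sysctl_handler(SYSCTL_HANDLER_ARGS)
{
    int val = *(int *)arg1;     /* arg1 points at example_value */
    int error;

    error = sysctl_handle_int(oidp, &val, 0, req);
    if (error != 0 || req->newptr == NULL)
        return (error);         /* read request or copyin failure */
    *(int *)arg1 = val;         /* a new value was written */
    return (0);
}

static int
example_modevent(module_t mod, int event, void *arg)
{
    switch (event) {
    case MOD_LOAD:
        sysctl_ctx_init(&clist);
        /* Creates hw.example; SYSCTL_STATIC_CHILDREN(_hw) selects the
         * static "hw" node as the parent. */
        SYSCTL_ADD_PROC(&clist, SYSCTL_STATIC_CHILDREN(_hw), OID_AUTO,
            "example", CTLTYPE_INT | CTLFLAG_RW, &example_value, 0,
            example_sysctl_handler, "I", "example tunable");
        return (0);
    case MOD_UNLOAD:
        return (sysctl_ctx_free(&clist));
    default:
        return (EOPNOTSUPP);
    }
}

static moduledata_t example_mod = {
    "example_sysctl",
    example_modevent,
    NULL
};

DECLARE_MODULE(example_sysctl, example_mod, SI_SUB_DRIVERS, SI_ORDER_MIDDLE);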

References:
FreeBSD Device Drivers

A brief introduction to the freecolor tool

FreeBSD has no free command, but it provides a freecolor command for checking how much memory is still available. The freecolor homepage is here, and its code logic is fairly simple: if the system provides /proc/meminfo, it reads the information directly from that virtual file system; otherwise it goes through the third-party library libstatgrab. Because FreeBSD does not mount the /proc file system by default, libstatgrab is what gets used.

The code with which libstatgrab obtains the memory usage of a FreeBSD system:

#elif defined(FREEBSD) || defined(DFBSD)
    /*returns pages*/
    size = sizeof(total_count);
    if (sysctlbyname("vm.stats.vm.v_page_count", &total_count, &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_page_count");
    }

    /*returns pages*/
    size = sizeof(free_count);
    if (sysctlbyname("vm.stats.vm.v_free_count", &free_count, &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_free_count");
    }

    size = sizeof(inactive_count);
    if (sysctlbyname("vm.stats.vm.v_inactive_count", &inactive_count , &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_inactive_count");
    }

    size = sizeof(cache_count);
    if (sysctlbyname("vm.stats.vm.v_cache_count", &cache_count, &size, NULL, 0) < 0) {
        RETURN_WITH_SET_ERROR_WITH_ERRNO("mem", SG_ERROR_SYSCTLBYNAME, "vm.stats.vm.v_cache_count");
    }

    /* Of couse nothing is ever that simple :) And I have inactive pages to
     * deal with too. So I'm going to add them to free memory :)
     */
    mem_stats_buf->cache = (size_t)cache_count;
    mem_stats_buf->cache *= (size_t)sys_page_size;
    mem_stats_buf->total = (size_t)total_count;
    mem_stats_buf->total *= (size_t)sys_page_size;
    mem_stats_buf->free = (size_t)free_count + inactive_count + cache_count;
    mem_stats_buf->free *= (size_t)sys_page_size;
    mem_stats_buf->used = mem_stats_buf->total - mem_stats_buf->free;
#elif defined(WIN32)

As the code shows, free_count, inactive_count, and cache_count are all counted as free, i.e., available memory.

Swap usage, on the other hand, is obtained through the kvm interface:

#elif defined(ALLBSD)
    /* XXX probably not mt-safe! */
    kvmd = kvm_openfiles(NULL, NULL, NULL, O_RDONLY, NULL);
    if(kvmd == NULL) {
        RETURN_WITH_SET_ERROR("swap", SG_ERROR_KVM_OPENFILES, NULL);
    }

    if ((kvm_getswapinfo(kvmd, &swapinfo, 1,0)) == -1) {
        kvm_close( kvmd );
        RETURN_WITH_SET_ERROR("swap", SG_ERROR_KVM_GETSWAPINFO, NULL);
    }

    swap_stats_buf->total = (long long)swapinfo.ksw_total;
    swap_stats_buf->used = (long long)swapinfo.ksw_used;
    kvm_close( kvmd );

    swap_stats_buf->total *= sys_page_size;
    swap_stats_buf->used *= sys_page_size;
    swap_stats_buf->free = swap_stats_buf->total - swap_stats_buf->used;
#elif defined(WIN32)

Docker notes (15): Code analysis of the docker swarm feature (2)

The handlers with which the Docker daemon responds to swarm-related docker client commands are set up in api/server/router/swarm/cluster.go:

// buildRouter is a router to talk with the build controller
type swarmRouter struct {
    backend Backend
    routes  []router.Route
}

// NewRouter initializes a new build router
func NewRouter(b Backend) router.Router {
    r := &swarmRouter{
        backend: b,
    }
    r.initRoutes()
    return r
}

// Routes returns the available routers to the swarm controller
func (sr *swarmRouter) Routes() []router.Route {
    return sr.routes
}

func (sr *swarmRouter) initRoutes() {
    sr.routes = []router.Route{
        router.NewPostRoute("/swarm/init", sr.initCluster),
        router.NewPostRoute("/swarm/join", sr.joinCluster),
        router.NewPostRoute("/swarm/leave", sr.leaveCluster),
        router.NewGetRoute("/swarm", sr.inspectCluster),
        router.NewPostRoute("/swarm/update", sr.updateCluster),
        router.NewGetRoute("/services", sr.getServices),
        router.NewGetRoute("/services/{id:.*}", sr.getService),
        router.NewPostRoute("/services/create", sr.createService),
        router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
        router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
        router.NewGetRoute("/nodes", sr.getNodes),
        router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
        router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
        router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
        router.NewGetRoute("/tasks", sr.getTasks),
        router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
    }
}

Take handling the "/swarm/init" request as an example; the actual handler lives in daemon/cluster/cluster.go:

// Init initializes new cluster from user provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
    c.Lock()
    if node := c.node; node != nil {
        if !req.ForceNewCluster {
            c.Unlock()
            return "", ErrSwarmExists
        }
        if err := c.stopNode(); err != nil {
            c.Unlock()
            return "", err
        }
    }

    if err := validateAndSanitizeInitRequest(&req); err != nil {
        c.Unlock()
        return "", err
    }

    listenHost, listenPort, err := resolveListenAddr(req.ListenAddr)
    if err != nil {
        c.Unlock()
        return "", err
    }

    advertiseHost, advertisePort, err := c.resolveAdvertiseAddr(req.AdvertiseAddr, listenPort)
    if err != nil {
        c.Unlock()
        return "", err
    }

    localAddr := listenHost

    // If the advertise address is not one of the system's
    // addresses, we also require a listen address.
    listenAddrIP := net.ParseIP(listenHost)
    if listenAddrIP != nil && listenAddrIP.IsUnspecified() {
        advertiseIP := net.ParseIP(advertiseHost)
        if advertiseIP == nil {
            // not an IP
            c.Unlock()
            return "", errMustSpecifyListenAddr
        }

        systemIPs := listSystemIPs()

        found := false
        for _, systemIP := range systemIPs {
            if systemIP.Equal(advertiseIP) {
                found = true
                break
            }
        }
        if !found {
            c.Unlock()
            return "", errMustSpecifyListenAddr
        }
        localAddr = advertiseIP.String()
    }

    // todo: check current state existing
    n, err := c.startNewNode(req.ForceNewCluster, localAddr, "", net.JoinHostPort(listenHost, listenPort), net.JoinHostPort(advertiseHost, advertisePort), "", "")
    if err != nil {
        c.Unlock()
        return "", err
    }
    c.Unlock()

    select {
    case <-n.Ready():
        if err := initClusterSpec(n, req.Spec); err != nil {
            return "", err
        }
        go c.reconnectOnFailure(n)
        return n.NodeID(), nil
    case <-n.done:
        c.RLock()
        defer c.RUnlock()
        if !req.ForceNewCluster { // if failure on first attempt don't keep state
            if err := c.clearState(); err != nil {
                return "", err
            }
        }
        return "", c.err
    }
}

Its core is the c.startNewNode function, whose main logic is as follows:

......
n, err := swarmagent.NewNode(&swarmagent.NodeConfig{
    Hostname:           c.config.Name,
    ForceNewCluster:    forceNewCluster,
    ListenControlAPI:   filepath.Join(c.root, controlSocket),
    ListenRemoteAPI:    listenAddr,
    AdvertiseRemoteAPI: advertiseAddr,
    JoinAddr:           joinAddr,
    StateDir:           c.root,
    JoinToken:          joinToken,
    Executor:           container.NewExecutor(c.config.Backend),
    HeartbeatTick:      1,
    ElectionTick:       3,
})
if err != nil {
    return nil, err
}
ctx := context.Background()
if err := n.Start(ctx); err != nil {
    return nil, err
}
......

That is, it calls swarmkit's NewNode to create a node and then starts it. This node takes on the manager role.

Docker notes (14): Code analysis of the docker swarm feature (1)

Docker 1.12 integrates the docker swarm functionality; the client-side code lives in the api/client/swarm folder. Take the docker swarm init command (api/client/swarm/init.go) as an example:

const (
    generatedSecretEntropyBytes = 16
    generatedSecretBase         = 36
    // floor(log(2^128-1, 36)) + 1
    maxGeneratedSecretLength = 25
)

type initOptions struct {
    swarmOptions
    listenAddr NodeAddrOption
    // Not a NodeAddrOption because it has no default port.
    advertiseAddr   string
    forceNewCluster bool
}

func newInitCommand(dockerCli *client.DockerCli) *cobra.Command {
    opts := initOptions{
        listenAddr: NewListenAddrOption(),
    }

    cmd := &cobra.Command{
        Use:   "init [OPTIONS]",
        Short: "Initialize a swarm",
        Args:  cli.NoArgs,
        RunE: func(cmd *cobra.Command, args []string) error {
            return runInit(dockerCli, cmd.Flags(), opts)
        },
    }

    flags := cmd.Flags()
    flags.Var(&opts.listenAddr, flagListenAddr, "Listen address (format: <ip|interface>[:port])")
    flags.StringVar(&opts.advertiseAddr, flagAdvertiseAddr, "", "Advertised address (format: <ip|interface>[:port])")
    flags.BoolVar(&opts.forceNewCluster, "force-new-cluster", false, "Force create a new cluster from current state.")
    addSwarmFlags(flags, &opts.swarmOptions)
    return cmd
}

func runInit(dockerCli *client.DockerCli, flags *pflag.FlagSet, opts initOptions) error {
    client := dockerCli.Client()
    ctx := context.Background()

    req := swarm.InitRequest{
        ListenAddr:      opts.listenAddr.String(),
        AdvertiseAddr:   opts.advertiseAddr,
        ForceNewCluster: opts.forceNewCluster,
        Spec:            opts.swarmOptions.ToSpec(),
    }

    nodeID, err := client.SwarmInit(ctx, req)
    if err != nil {
        if strings.Contains(err.Error(), "could not choose an IP address to advertise") || strings.Contains(err.Error(), "could not find the system's IP address") {
            return errors.New(err.Error() + " - specify one with --advertise-addr")
        }
        return err
    }

    fmt.Fprintf(dockerCli.Out(), "Swarm initialized: current node (%s) is now a manager.\n\n", nodeID)

    if err := printJoinCommand(ctx, dockerCli, nodeID, true, false); err != nil {
        return err
    }

    fmt.Fprint(dockerCli.Out(), "To add a manager to this swarm, run 'docker swarm join-token manager' and follow the instructions.\n\n")
    return nil
}

Here client.DockerCli represents the docker command line client:

// DockerCli represents the docker command line client.
// Instances of the client can be returned from NewDockerCli.
type DockerCli struct {
    // initializing closure
    init func() error

    // configFile has the client configuration file
    configFile *configfile.ConfigFile
    // in holds the input stream and closer (io.ReadCloser) for the client.
    in io.ReadCloser
    // out holds the output stream (io.Writer) for the client.
    out io.Writer
    // err holds the error stream (io.Writer) for the client.
    err io.Writer
    // keyFile holds the key file as a string.
    keyFile string
    // inFd holds the file descriptor of the client's STDIN (if valid).
    inFd uintptr
    // outFd holds file descriptor of the client's STDOUT (if valid).
    outFd uintptr
    // isTerminalIn indicates whether the client's STDIN is a TTY
    isTerminalIn bool
    // isTerminalOut indicates whether the client's STDOUT is a TTY
    isTerminalOut bool
    // client is the http client that performs all API operations
    client client.APIClient
    // state holds the terminal input state
    inState *term.State
    // outState holds the terminal output state
    outState *term.State
}

Its client member is the engine-api client, so the client.SwarmInit call above is implemented in engine-api/client/swarm_init.go:

// SwarmInit initializes the Swarm.
func (cli *Client) SwarmInit(ctx context.Context, req swarm.InitRequest) (string, error) {
    serverResp, err := cli.post(ctx, "/swarm/init", nil, req, nil)
    if err != nil {
        return "", err
    }

    var response string
    err = json.NewDecoder(serverResp.body).Decode(&response)
    ensureReaderClosed(serverResp)
    return response, err
}
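
As a rough sketch of driving this client API directly (the import paths follow the docker/engine-api layout of that period; the listen address and the reliance on DOCKER_HOST-style environment settings are assumptions for illustration):

package main

import (
    "fmt"
    "log"

    "github.com/docker/engine-api/client"
    "github.com/docker/engine-api/types/swarm"
    "golang.org/x/net/context"
)

func main() {
    // NewEnvClient builds a client from DOCKER_HOST and related variables.
    cli, err := client.NewEnvClient()
    if err != nil {
        log.Fatal(err)
    }

    // Roughly what "docker swarm init --listen-addr 0.0.0.0:2377" ends up sending.
    nodeID, err := cli.SwarmInit(context.Background(), swarm.InitRequest{
        ListenAddr: "0.0.0.0:2377",
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("Swarm initialized, node ID:", nodeID)
}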

Docker notes (13): A brief introduction to nvidia-docker

GPUs are vendor-specific products that require vendor-specific drivers, and Docker itself does not support GPUs. Previously, to use a GPU inside Docker you had to install the same GPU driver the host uses inside the container, and then map the host's GPU devices (for example /dev/nvidia0) into the container. Docker images built this way are therefore not portable.

The nvidia-docker project was created to solve exactly this problem: the Docker image no longer needs to know anything about the underlying GPU, because the devices and driver files are mounted into the container when it is started.

Build and install nvidia-docker from source (if you need to go through a proxy, see this issue):

# go get -d github.com/NVIDIA/nvidia-docker
# cd $GOPATH/src/github.com/NVIDIA/nvidia-docker
# make install

In fact, looking at nvidia-docker's main function:

func main() {
    args := os.Args[1:]
    defer exit()

    assert(LoadEnvironment())

    command, off, err := docker.ParseArgs(args)
    assert(err)

    if command != "create" && command != "run" {
        if command == "version" {
            fmt.Printf("NVIDIA Docker: %s\n\n", Version)
        }
        assert(docker.Docker(args...))
    }

    opt, i, err := docker.ParseArgs(args[off+1:], command)
    assert(err)
    off += i + 1

    if (command == "create" || command == "run") && opt != "" {
        vols, err := VolumesNeeded(opt)
        assert(err)

        if vols != nil {
            var nargs []string
            var err error

            if Host != nil {
                nargs, err = GenerateRemoteArgs(opt, vols)
            } else {
                assert(nvidia.LoadUVM())
                assert(nvidia.Init())
                nargs, err = GenerateLocalArgs(opt, vols)
                nvidia.Shutdown()
            }
            assert(err)
            args = append(args[:off], append(nargs, args[off:]...)...)
        }
    }

    assert(docker.Docker(args...))
}

Except for the create and run commands, everything else is still handled by the local docker.

In addition, nvidia-docker also provides a plug-in mode (see Internals):

$ curl -s http://localhost:3476/docker/cli
--device=/dev/nvidiactl --device=/dev/nvidia-uvm --device=/dev/nvidia3 --device=/dev/nvidia2 --device=/dev/nvidia1 --device=/dev/nvidia0 --volume-driver=nvidia-docker --volume=nvidia_driver_361.48:/usr/local/nvidia:ro
$ docker run -ti --rm `curl -s http://localhost:3476/docker/cli` nvidia/cuda nvidia-smi

With this approach you don't need the nvidia-docker wrapper at all and can use docker directly. However, it does not check whether the image is compatible with the installed NVIDIA driver.

There is also the option of using the Go packages that NVIDIA provides.

References:
Why NVIDIA Docker

Swarmkit notes (11): How the manager creates the server that handles swarmctl requests

manager.localserver listens on a local Unix socket and waits to handle the command requests sent by swarmctl (the source is in the manager/controlapi directory). The localserver-related code in Manager.Run() is as follows:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

......
api.RegisterControlServer(m.localserver, localProxyControlAPI)

(1) First, look at the definitions of controlapi.Server and controlapi.NewServer():

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

// NewServer creates a Cluster API server.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
    return &Server{
        store:  store,
        raft:   raft,
        rootCA: rootCA,
    }
}

The controlapi.NewServer() function creates the server that responds to the control command requests issued by the swarmctl program.

Here store.MemoryStore is a very important structure:

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

watch.Queue is defined as follows:

// Queue is the structure used to publish events and watch for them.
type Queue struct {
    broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
    return q.CallbackWatch(nil)
}
......
// Publish adds an item to the queue.
func (q *Queue) Publish(item events.Event) {
    q.broadcast.Write(item)
}

Simply put, when Server.store changes, the data is written to memDB and, at the same time, an event is published to the queue, so the manager goroutines watching the corresponding channels can receive it and react.
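
The following is a much-simplified sketch of that publish/watch pattern in plain Go (it is not the actual watch.Queue implementation, which wraps docker/go-events; the types, buffer size, and event strings are made up for illustration):

package main

import (
    "fmt"
    "sync"
)

// queue fans every published event out to all current watchers.
type queue struct {
    mu       sync.Mutex
    watchers map[chan string]struct{}
}

func newQueue() *queue {
    return &queue{watchers: make(map[chan string]struct{})}
}

// Watch returns a channel of future events plus a cancel function.
func (q *queue) Watch() (<-chan string, func()) {
    ch := make(chan string, 16)
    q.mu.Lock()
    q.watchers[ch] = struct{}{}
    q.mu.Unlock()
    cancel := func() {
        q.mu.Lock()
        delete(q.watchers, ch)
        q.mu.Unlock()
    }
    return ch, cancel
}

// Publish delivers an event to every watcher.
func (q *queue) Publish(ev string) {
    q.mu.Lock()
    defer q.mu.Unlock()
    for ch := range q.watchers {
        ch <- ev
    }
}

func main() {
    q := newQueue()
    events, cancel := q.Watch()
    defer cancel()

    // Pretend a store update just happened: the data goes to memDB
    // (omitted here), and the corresponding event is published.
    q.Publish("EventUpdateNode: node-1")

    fmt.Println(<-events)
}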

The following code fills the newly created controlapi.Server variable with the current cluster's information:

baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())

(2)

proxyOpts := []grpc.DialOption{
    grpc.WithBackoffMaxDelay(time.Second),
    grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}

cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs

......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)

The code above creates a variable of type raftProxyControlServer:

type raftProxyControlServer struct {
    local        ControlServer
    connSelector *raftpicker.ConnSelector
    cluster      raftpicker.RaftCluster
    ctxMods      []func(context.Context) (context.Context, error)
}

The meaning of localProxyControlAPI is: if the manager that receives the swarmctl request is the leader (swarmctl and that manager are, of course, on the same machine), it handles the request itself; otherwise it forwards the request to the cluster's leader.

(3)

api.RegisterControlServer(m.localserver, localProxyControlAPI)

The code above wires the Unix socket behind localserver to the raftProxyControlServer.

Swarmkit notes (10): How the manager handles session requests

The manager handles session requests through the _Dispatcher_Session_Handler function (./api/dispatcher.pb.go):

func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
    m := new(SessionRequest)
    if err := stream.RecvMsg(m); err != nil {
        return err
    }
    return srv.(DispatcherServer).Session(m, &dispatcherSessionServer{stream})
}

The actual call stack looks like this:

0  0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
   at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1  0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2  0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3  0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
   at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4  0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5  0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6  0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
   at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7  0x0000000000462bf0 in runtime.goexit
   at /usr/local/go/src/runtime/asm_amd64.s:1998

The code of the Dispatcher.Session() function is as follows:

// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
    ctx := stream.Context()
    nodeInfo, err := ca.RemoteNode(ctx)
    if err != nil {
        return err
    }
    nodeID := nodeInfo.NodeID

    if err := d.isRunningLocked(); err != nil {
        return err
    }

    // register the node.
    sessionID, err := d.register(stream.Context(), nodeID, r.Description)
    if err != nil {
        return err
    }

    fields := logrus.Fields{
        "node.id":      nodeID,
        "node.session": sessionID,
        "method":       "(*Dispatcher).Session",
    }
    if nodeInfo.ForwardedBy != nil {
        fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
    }
    log := log.G(ctx).WithFields(fields)

    var nodeObj *api.Node
    nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
        nodeObj = store.GetNode(readTx, nodeID)
        return nil
    }, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
        Checks: []state.NodeCheckFunc{state.NodeCheckID}},
    )
    if cancel != nil {
        defer cancel()
    }

    if err != nil {
        log.WithError(err).Error("ViewAndWatch Node failed")
    }

    if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
        return err
    }

    if err := stream.Send(&api.SessionMessage{
        SessionID:            sessionID,
        Node:                 nodeObj,
        Managers:             d.getManagers(),
        NetworkBootstrapKeys: d.networkBootstrapKeys,
    }); err != nil {
        return err
    }

    managerUpdates, mgrCancel := d.mgrQueue.Watch()
    defer mgrCancel()
    keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
    defer keyMgrCancel()

    // disconnectNode is a helper forcibly shutdown connection
    disconnectNode := func() error {
        // force disconnect by shutting down the stream.
        transportStream, ok := transport.StreamFromContext(stream.Context())
        if ok {
            // if we have the transport stream, we can signal a disconnect
            // in the client.
            if err := transportStream.ServerTransport().Close(); err != nil {
                log.WithError(err).Error("session end")
            }
        }

        nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
        if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
            log.WithError(err).Error("failed to remove node")
        }
        // still return an abort if the transport closure was ineffective.
        return grpc.Errorf(codes.Aborted, "node must disconnect")
    }

    for {
        // After each message send, we need to check the nodes sessionID hasn't
        // changed. If it has, we will the stream and make the node
        // re-register.
        node, err := d.nodes.GetWithSession(nodeID, sessionID)
        if err != nil {
            return err
        }

        var mgrs []*api.WeightedPeer

        var disconnect bool

        select {
        case ev := <-managerUpdates:
            mgrs = ev.([]*api.WeightedPeer)
        case ev := <-nodeUpdates:
            nodeObj = ev.(state.EventUpdateNode).Node
        case <-stream.Context().Done():
            return stream.Context().Err()
        case <-node.Disconnect:
            disconnect = true
        case <-d.ctx.Done():
            disconnect = true
        case <-keyMgrUpdates:
        }
        if mgrs == nil {
            mgrs = d.getManagers()
        }

        if err := stream.Send(&api.SessionMessage{
            SessionID:            sessionID,
            Node:                 nodeObj,
            Managers:             mgrs,
            NetworkBootstrapKeys: d.networkBootstrapKeys,
        }); err != nil {
            return err
        }
        if disconnect {
            return disconnectNode()
        }
    }
}

This stream handles agent connections. The first half records the agent that connects; the second half notifies the agent to reconnect when the cluster information changes, for example when the managers' leader changes. The disconnectNode() function handles the case where the connection to the agent node has to be torn down: shutting down the connection, removing the agent node's information, and so on.