Docker Swarm Code Analysis Notes (7): Creating the Docker API Server

The last part of Docker Swarm's manage function creates the Docker API server:

server := api.NewServer(hosts, tlsConfig)
if c.Bool("replication") {
    ......
    setupReplication(c, cl, server, discovery, addr, leaderTTL, tlsConfig)
} else {
    server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
    cluster.NewWatchdog(cl)
}

log.Fatal(server.ListenAndServe())

The Server struct is defined in api/server.go:

type Server struct {
    hosts      []string
    tlsConfig  *tls.Config
    dispatcher *dispatcher
}

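Its core method is ListenAndServe(), which starts one goroutine per host to listen for and handle connection requests from Docker clients. The error channel is buffered to len(s.hosts), so no goroutine ever blocks on sending its result, and the method returns as soon as any listener reports a non-nil error.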

func (s *Server) ListenAndServe() error {
    chErrors := make(chan error, len(s.hosts))

    for _, host := range s.hosts {
        protoAddrParts := strings.SplitN(host, "://", 2)
        if len(protoAddrParts) == 1 {
            protoAddrParts = append([]string{"tcp"}, protoAddrParts...)
        }

        go func() {
            log.WithFields(log.Fields{"proto": protoAddrParts[0], "addr": protoAddrParts[1]}).Info("Listening for HTTP")

            var (
                l      net.Listener
                err    error
                server = &http.Server{
                    Addr:    protoAddrParts[1],
                    Handler: s.dispatcher,
                }
            )

            switch protoAddrParts[0] {
            case "unix":
                l, err = newUnixListener(protoAddrParts[1], s.tlsConfig)
            case "tcp":
                l, err = newListener("tcp", protoAddrParts[1], s.tlsConfig)
            default:
                err = fmt.Errorf("unsupported protocol: %q", protoAddrParts[0])
            }
            if err != nil {
                chErrors <- err
            } else {
                chErrors <- server.Serve(l)
            }

        }()
    }

    for i := 0; i < len(s.hosts); i++ {
        err := <-chErrors
        if err != nil {
            return err
        }
    }
    return nil
}
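
The fan-out pattern above can be tried in isolation. The following is a minimal sketch, not swarm's code: splitProtoAddr and serveAll are hypothetical names, and the serve callback stands in for "create a listener and call Serve on it".

```go
package main

import (
	"fmt"
	"strings"
)

// splitProtoAddr mirrors the host parsing in ListenAndServe: a host
// without a "proto://" prefix is treated as a TCP address.
func splitProtoAddr(host string) (proto, addr string) {
	parts := strings.SplitN(host, "://", 2)
	if len(parts) == 1 {
		return "tcp", parts[0]
	}
	return parts[0], parts[1]
}

// serveAll starts one goroutine per host and returns the first non-nil
// error, or nil once every goroutine has reported success.
// serve is a hypothetical stand-in for the real listener setup.
func serveAll(hosts []string, serve func(proto, addr string) error) error {
	chErrors := make(chan error, len(hosts)) // buffered: senders never block
	for _, host := range hosts {
		proto, addr := splitProtoAddr(host) // fresh variables per iteration
		go func() {
			switch proto {
			case "unix", "tcp":
				chErrors <- serve(proto, addr)
			default:
				chErrors <- fmt.Errorf("unsupported protocol: %q", proto)
			}
		}()
	}
	for range hosts {
		if err := <-chErrors; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	hosts := []string{"127.0.0.1:2375", "unix:///var/run/swarm.sock", "udp://127.0.0.1:9"}
	err := serveAll(hosts, func(proto, addr string) error {
		return nil // pretend the listener shut down cleanly
	})
	fmt.Println(err)
}
```

Because only the udp host fails here, the error that comes back is the "unsupported protocol" one, regardless of goroutine scheduling.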

 

Docker Swarm Code Analysis Notes (6): The swarm Driver's NewCluster Function

The core of the swarm driver's NewCluster function is as follows (cluster/swarm/cluster.go):

discoveryCh, errCh := cluster.discovery.Watch(nil)
go cluster.monitorDiscovery(discoveryCh, errCh)
go cluster.monitorPendingEngines()

cluster.discovery.Watch is declared as follows:

Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)

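It returns two channels. The first has element type Entries (type Entries []*Entry) and carries the hosts that belong to the cluster; the second signals whether an error has occurred.

Cluster.monitorDiscovery keeps Cluster.engines in sync with what discovery reports: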

select {
case entries := <-ch:
    added, removed := currentEntries.Diff(entries)
    currentEntries = entries

    // Remove engines first. `addEngine` will refuse to add an engine
    // if there's already an engine with the same ID.  If an engine
    // changes address, we have to first remove it then add it back.
    for _, entry := range removed {
        c.removeEngine(entry.String())
    }

    for _, entry := range added {
        c.addEngine(entry.String())
    }
case err := <-errCh:
    log.Errorf("Discovery error: %v", err)
}

Cluster.monitorPendingEngines checks whether the engines that are still in the pending state can now be reached:

for _, e := range pEngines {
    if e.TimeToValidate() {
        go c.validatePendingEngine(e)
    }
}

 

Docker Swarm Code Analysis Notes (5): cluster/cluster.go

cluster/cluster.go defines the Cluster interface:

// Cluster is exported
type Cluster interface {
    // Create a container
    CreateContainer(config *ContainerConfig, name string, authConfig *types.AuthConfig) (*Container, error)

    // Remove a container
    RemoveContainer(container *Container, force, volumes bool) error

    // Return all images
    Images() Images
    ......
}

Two drivers are currently implemented: mesos (cluster/mesos/cluster.go, still experimental) and swarm (cluster/swarm/cluster.go). To write your own driver, you must implement every function of the Cluster interface above.

 

Docker Swarm Code Analysis Notes (4): swarm join

The definition of Docker Swarm's join command:

    {
        Name:      "join",
        ShortName: "j",
        Usage:     "Join a docker cluster",
        Flags:     []cli.Flag{flJoinAdvertise, flHeartBeat, flTTL, flJoinRandomDelay, flDiscoveryOpt},
        Action:    join,
    },

flHeartBeat defaults to 60s and flTTL defaults to 180s:

flHeartBeat = cli.StringFlag{
    Name:  "heartbeat",
    Value: "60s",
    Usage: "period between each heartbeat",
}
flTTL = cli.StringFlag{
    Name:  "ttl",
    Value: "180s",
    Usage: "set the expiration of an ephemeral node",
}

The core of the join function:

......
for {
    log.WithFields(log.Fields{"addr": addr, "discovery": dflag}).Infof("Registering on the discovery service every %s...", hb)
    if err := d.Register(addr); err != nil {
        log.Error(err)
    }
    time.Sleep(hb)
}
......

The implementation of token.Register:

func (s *Discovery) Register(addr string) error {
    buf := strings.NewReader(addr)

    resp, err := http.Post(fmt.Sprintf("%s/%s/%s?ttl=%d", s.url,
        "clusters", s.token, uint64(s.ttl.Seconds())), "application/json", buf)

    if err != nil {
        return err
    }

    resp.Body.Close()
    return nil
}

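In other words, once every heartbeat interval (for example, 60s) the join command registers the current Docker address (IP:PORT) by POSTing it to https://discovery.hub.docker.com/v1/clusters/TOKEN?ttl=180, where TOKEN is the cluster token and ttl takes its default value.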

 

Docker Swarm Code Analysis Notes (3): swarm create

The code of Docker Swarm's create command is simple:

func create(c *cli.Context) {
    if len(c.Args()) != 0 {
        log.Fatalf("the `create` command takes no arguments. See '%s create --help'.", c.App.Name)
    }
    discovery := &token.Discovery{}
    discovery.Initialize("", 0, 0, nil)
    token, err := discovery.CreateCluster()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(token)
}

The implementation of token.CreateCluster():

// CreateCluster returns a unique cluster token
func (s *Discovery) CreateCluster() (string, error) {
    resp, err := http.Post(fmt.Sprintf("%s/%s", s.url, "clusters"), "", nil)
    if err != nil {
        return "", err
    }

    defer resp.Body.Close()
    token, err := ioutil.ReadAll(resp.Body)
    return string(token), err
}

It simply sends an HTTPS POST request to https://discovery.hub.docker.com/v1/clusters and gets back a unique token, which is then used to create and manage the cluster.
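
The request/response exchange can be exercised locally without touching the hosted service. This is only a sketch: newFakeDiscovery is a hypothetical test server that answers POST /clusters with a fixed token, and createCluster repeats the logic of CreateCluster against an arbitrary base URL.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// createCluster POSTs to <baseURL>/clusters and returns the response
// body as the cluster token, like token.CreateCluster does.
func createCluster(baseURL string) (string, error) {
	resp, err := http.Post(fmt.Sprintf("%s/%s", baseURL, "clusters"), "", nil)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	token, err := io.ReadAll(resp.Body)
	return string(token), err
}

// newFakeDiscovery starts a local server that imitates the token
// endpoint. It is an assumption made for this sketch, not part of swarm.
func newFakeDiscovery(token string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost || r.URL.Path != "/clusters" {
			http.NotFound(w, r)
			return
		}
		fmt.Fprint(w, token)
	}))
}

func main() {
	srv := newFakeDiscovery("0123456789abcdef")
	defer srv.Close()

	token, err := createCluster(srv.URL)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(token)
}
```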

 

Docker Swarm Code Analysis Notes (2): discovery

Docker's discovery package defines the interfaces that a new backend has to implement (discovery.go):

package discovery

import (
    "errors"
    "time"
)

var (
    // ErrNotSupported is returned when a discovery service is not supported.
    ErrNotSupported = errors.New("discovery service not supported")

    // ErrNotImplemented is returned when discovery feature is not implemented
    // by discovery backend.
    ErrNotImplemented = errors.New("not implemented in this discovery service")
)

// Watcher provides watching over a cluster for nodes joining and leaving.
type Watcher interface {
    // Watch the discovery for entry changes.
    // Returns a channel that will receive changes or an error.
    // Providing a non-nil stopCh can be used to stop watching.
    Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
}

// Backend is implemented by discovery backends which manage cluster entries.
type Backend interface {
    // Watcher must be provided by every backend.
    Watcher

    // Initialize the discovery with URIs, a heartbeat, a ttl and optional settings.
    Initialize(string, time.Duration, time.Duration, map[string]string) error

    // Register to the discovery.
    Register(string) error
}

The Watcher interface reports nodes joining or leaving the cluster over the channels it returns. Initialize sets up the backend, and Register adds a node to the cluster.
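
To see what satisfying these interfaces involves, here is a minimal sketch of a backend with a fixed node list. Everything in it is an assumption made for illustration: staticBackend is hypothetical, Entries is simplified to a slice of address strings, and the interfaces are re-declared locally so the file compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// Entries is simplified here to a list of "host:port" strings.
type Entries []string

type Watcher interface {
	Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error)
}

type Backend interface {
	Watcher
	Initialize(string, time.Duration, time.Duration, map[string]string) error
	Register(string) error
}

var errNotImplemented = errors.New("not implemented in this discovery service")

// staticBackend serves a comma-separated node list given at Initialize.
type staticBackend struct {
	entries Entries
}

// Compile-time check that staticBackend satisfies Backend.
var _ Backend = (*staticBackend)(nil)

func (b *staticBackend) Initialize(uris string, _, _ time.Duration, _ map[string]string) error {
	for _, u := range strings.Split(uris, ",") {
		if u = strings.TrimSpace(u); u != "" {
			b.entries = append(b.entries, u)
		}
	}
	if len(b.entries) == 0 {
		return fmt.Errorf("no entries in %q", uris)
	}
	return nil
}

// Register is not meaningful for a fixed list.
func (b *staticBackend) Register(string) error { return errNotImplemented }

// Watch delivers the list once and closes both channels when stopCh
// is closed. With a nil stopCh the channels simply stay open.
func (b *staticBackend) Watch(stopCh <-chan struct{}) (<-chan Entries, <-chan error) {
	ch := make(chan Entries, 1)
	errCh := make(chan error)
	ch <- b.entries
	go func() {
		<-stopCh
		close(ch)
		close(errCh)
	}()
	return ch, errCh
}

func main() {
	b := &staticBackend{}
	if err := b.Initialize("10.0.0.1:2375,10.0.0.2:2375", 0, 0, nil); err != nil {
		fmt.Println("error:", err)
		return
	}
	stop := make(chan struct{})
	entries, _ := b.Watch(stop)
	fmt.Println(<-entries)
	close(stop)
}
```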

 

Docker Swarm Code Analysis Notes (1): main.go

main.go reads as follows:

package main

import (
    _ "github.com/docker/docker/pkg/discovery/file"
    _ "github.com/docker/docker/pkg/discovery/kv"
    _ "github.com/docker/docker/pkg/discovery/nodes"
    _ "github.com/docker/swarm/discovery/token"

    "github.com/docker/swarm/cli"
)

func main() {
    cli.Run()
}

The following four blank imports make sure that the init function of each package gets executed:

_ "github.com/docker/docker/pkg/discovery/file"
_ "github.com/docker/docker/pkg/discovery/kv"
_ "github.com/docker/docker/pkg/discovery/nodes"
_ "github.com/docker/swarm/discovery/token"

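Only the token package is newly implemented by swarm; file, kv and nodes already exist in docker.

The cli package builds the CLI program on top of the github.com/codegangsta/cli project, and cli.Run() executes the specific swarm command (create, manage, and so on).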

Linux Kernel Notes (62): list_head

The doubly linked list is a commonly used data structure in the Linux kernel. It is defined as follows:

struct list_head {
    struct list_head *next, *prev;
};

#define LIST_HEAD_INIT(name) { &(name), &(name) }

#define LIST_HEAD(name) \
    struct list_head name = LIST_HEAD_INIT(name)

static inline void INIT_LIST_HEAD(struct list_head *list)
{
    list->next = list;
    list->prev = list;
}
...

The figure below is taken from plka:

[Figure: Capture]

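As the figure shows, a list needs a head node, and operations such as inserting and deleting elements all go through that head node. Here is an example (list.c):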

struct list_head {
        struct list_head *next, *prev;
};

#define LIST_HEAD_INIT(name) { &(name), &(name) }

#define LIST_HEAD(name) \
        struct list_head name = LIST_HEAD_INIT(name)


int main(void) {
        LIST_HEAD(dev_list);
        return 0;
}

Check the output of the gcc preprocessor:

# gcc -E -P list.c
struct list_head {
 struct list_head *next, *prev;
};
int main(void) {
 struct list_head dev_list = { &(dev_list), &(dev_list) };
 return 0;
}

As you can see, both prev and next of the head node dev_list point to dev_list itself. The following code achieves the same effect:

struct list_head {
    struct list_head *next, *prev;
};

static inline void INIT_LIST_HEAD(struct list_head *list)
{
    list->next = list;
    list->prev = list;
}

int main(void) {
    struct list_head dev_list;
    INIT_LIST_HEAD(&dev_list);
    return 0;
}
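
The insert and delete operations mentioned earlier can be sketched on top of the same self-pointing head. The version below is a Go transliteration written for illustration only: the kernel code is C and embeds list_head inside other structs, while listHead, listAdd, listDel, listEmpty, listLen and describe here are stand-ins named after the kernel's list_add, list_del and list_empty.

```go
package main

import "fmt"

// listHead mirrors struct list_head: a node with next and prev links.
type listHead struct {
	next, prev *listHead
}

// initListHead makes the head point to itself, i.e. an empty list.
func initListHead(l *listHead) {
	l.next = l
	l.prev = l
}

// listAdd inserts n right after head.
func listAdd(n, head *listHead) {
	n.next = head.next
	n.prev = head
	head.next.prev = n
	head.next = n
}

// listDel unlinks e from whatever list it is on.
func listDel(e *listHead) {
	e.prev.next = e.next
	e.next.prev = e.prev
	e.next, e.prev = nil, nil
}

// listEmpty reports whether the head is the only node.
func listEmpty(head *listHead) bool {
	return head.next == head
}

// listLen walks the ring until it comes back to the head.
func listLen(head *listHead) int {
	n := 0
	for p := head.next; p != head; p = p.next {
		n++
	}
	return n
}

// describe summarizes the list state.
func describe(head *listHead) string {
	return fmt.Sprintf("empty=%v len=%d", listEmpty(head), listLen(head))
}

func main() {
	var devList, a, b listHead
	initListHead(&devList)
	fmt.Println(describe(&devList)) // prints "empty=true len=0"

	listAdd(&a, &devList)
	listAdd(&b, &devList)
	fmt.Println(describe(&devList)) // prints "empty=false len=2"

	listDel(&a)
	listDel(&b)
	fmt.Println(describe(&devList)) // prints "empty=true len=0"
}
```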

 

Delve Code Analysis Notes (5): The debug Command

The dlv debug command compiles a package and then debugs it. Its code is as follows:

func debugCmd(cmd *cobra.Command, args []string) {
    status := func() int {
        var pkg string
        dlvArgs, targetArgs := splitArgs(cmd, args)

        if len(dlvArgs) > 0 {
            pkg = args[0]
        }
        err := gobuild(debugname, pkg)
        if err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            return 1
        }
        fp, err := filepath.Abs("./" + debugname)
        if err != nil {
            fmt.Fprintf(os.Stderr, "%v\n", err)
            return 1
        }
        defer os.Remove(fp)

        processArgs := append([]string{"./" + debugname}, targetArgs...)
        return execute(0, processArgs, conf)
    }()
    os.Exit(status)
}

The gobuild function is implemented as follows:

func gobuild(debugname, pkg string) error {
    args := []string{"-gcflags", "-N -l", "-o", debugname}
    if BuildFlags != "" {
        args = append(args, BuildFlags)
    }
    args = append(args, pkg)
    return gocommand("build", args...)
}

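So dlv debug simply builds a temporary executable named debug in the current directory with optimizations and inlining turned off (-gcflags "-N -l"), and then calls execute to debug that file. Because the removal is deferred inside the closure (defer os.Remove(fp)), the file is deleted when the debugging session ends, before os.Exit is called.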
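
The argument list that gobuild assembles can be inspected without running a build. In this sketch buildArgs repeats the logic of gobuild, while commandLine is a hypothetical helper that renders the result as a shell-like string, assuming gocommand simply runs the go tool with the given subcommand.

```go
package main

import (
	"fmt"
	"strings"
)

// buildArgs assembles the same argument list as gobuild.
func buildArgs(debugname, pkg, buildFlags string) []string {
	args := []string{"-gcflags", "-N -l", "-o", debugname}
	if buildFlags != "" {
		args = append(args, buildFlags)
	}
	args = append(args, pkg)
	return args
}

// commandLine renders "go build <args>" for display, quoting any
// argument that is empty or contains a space.
func commandLine(args []string) string {
	parts := []string{"go", "build"}
	for _, a := range args {
		if a == "" || strings.Contains(a, " ") {
			a = fmt.Sprintf("%q", a)
		}
		parts = append(parts, a)
	}
	return strings.Join(parts, " ")
}

func main() {
	// "./cmd/app" is a made-up package path.
	fmt.Println(commandLine(buildArgs("debug", "./cmd/app", "")))
	// prints: go build -gcflags "-N -l" -o debug ./cmd/app
}
```

Note that -N disables optimizations and -l disables inlining, which is what keeps the binary easy to step through.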

 

Delve Code Analysis Notes (4): Building the Command Tree

Delve uses cobra to build its command tree. Start with the root command, dlv:

func New() *cobra.Command {
    ......
    // Main dlv root command.
    RootCommand = &cobra.Command{
        Use:   "dlv",
        Short: "Delve is a debugger for the Go programming language.",
        Long:  dlvCommandLongDesc,
    }

    RootCommand.PersistentFlags().StringVarP(&Addr, "listen", "l", "localhost:0", "Debugging server listen address.")
    RootCommand.PersistentFlags().BoolVarP(&Log, "log", "", false, "Enable debugging server logging.")
    RootCommand.PersistentFlags().BoolVarP(&Headless, "headless", "", false, "Run debug server only, in headless mode.")
    RootCommand.PersistentFlags().BoolVarP(&AcceptMulti, "accept-multiclient", "", false, "Allows a headless server to accept multiple client connections. Note that the server API is not reentrant and clients will have to coordinate")
    RootCommand.PersistentFlags().IntVar(&ApiVersion, "api-version", 1, "Selects API version when headless")
    RootCommand.PersistentFlags().StringVar(&InitFile, "init", "", "Init file, executed by the terminal client.")
    RootCommand.PersistentFlags().StringVar(&BuildFlags, "build-flags", buildFlagsDefault, "Build flags, to be passed to the compiler.")
    ......
}

Because the dlv command does not implement a Run function, running dlv on its own only prints the default output that cobra generates:

# dlv
Delve is a source level debugger for Go programs.

......

Usage:
  dlv [command]

Available Commands:
  version     Prints version.
  ......
Flags:
      --accept-multiclient[=false]: Allows a headless server to accept multiple client connections. Note that the server API is not reentrant and clients will have to coordinate
    ......

In order, these are the Long description, Usage, Available Commands, and so on.

Next, take the trace subcommand as an example of how a subcommand is added to the root command:

......
// 'trace' subcommand.
traceCommand := &cobra.Command{
    Use:   "trace [package] regexp",
    Short: "Compile and begin tracing program.",
    Long:  "Trace program execution. Will set a tracepoint on every function matching the provided regular expression and output information when tracepoint is hit.",
    Run:   traceCmd,
}
traceCommand.Flags().IntVarP(&traceAttachPid, "pid", "p", 0, "Pid to attach to.")
traceCommand.Flags().IntVarP(&traceStackDepth, "stack", "s", 0, "Show stack trace with given depth.")
RootCommand.AddCommand(traceCommand)
......

Cobra provides two kinds of flags:
a) Persistent Flags: apply to the current command and all of its subcommands;
b) Local Flags: apply only to the current command.