Swarmkit笔记(3)——swarmd程序框架

agent.Node结构体有4channel,理解它们的作用就可以理解swarmd程序的框架:

// Node implements the primary node functionality for a member of a swarm
// cluster. Node handles workloads and may also run as a manager.
type Node struct {
    ......
    started              chan struct{}
    stopped              chan struct{}
    ready                chan struct{} // closed when agent has completed registration and manager(if enabled) is ready to receive control requests
    ......
    closed               chan struct{}
    ......
}

swarmd程序的框架(其中executor通过engine-addr得到,代表最终运行task的实体,实际是一个Docker engineapi.APIClient。其它参数都通过命令行直接得到。):

        ......
        // Create a context for our GRPC call
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        ......

        n, err := agent.NewNode(&agent.NodeConfig{
            Hostname:         hostname,
            ForceNewCluster:  forceNewCluster,
            ListenControlAPI: unix,
            ListenRemoteAPI:  addr,
            JoinAddr:         managerAddr,
            StateDir:         stateDir,
            JoinToken:        joinToken,
            ExternalCAs:      externalCAOpt.Value(),
            Executor:         executor,
            HeartbeatTick:    hb,
            ElectionTick:     election,
        })
        if err != nil {
            return err
        }

        if err := n.Start(ctx); err != nil {
            return err
        }

        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt)
        go func() {
            <-c
            n.Stop(ctx)
        }()

        go func() {
            select {
            case <-n.Ready():
            case <-ctx.Done():
            }
            if ctx.Err() == nil {
                logrus.Info("node is ready")
            }
        }()

        return n.Err(ctx)

(1)

if err := n.Start(ctx); err != nil {
    return err
}

看一下Node.Start()函数的实现:

// Start starts a node instance.
func (n *Node) Start(ctx context.Context) error {
    select {
    case <-n.started:
        select {
        case <-n.closed:
            return n.err
        case <-n.stopped:
            return errAgentStopped
        case <-ctx.Done():
            return ctx.Err()
        default:
            return errAgentStarted
        }
    case <-ctx.Done():
        return ctx.Err()
    default:
    }

    close(n.started)
    go n.run(ctx)
    return nil
}

如果执行Node.Start()时没有任何异常发生,就会把Node.started这个channel关掉(close(n.started)),然后启动这个节点初始化过程:go n.run(ctx)

(2)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
    <-c
    n.Stop(ctx)
}()

这段代码的含义是用户按Ctrl+C可以中断程序。Node.Stop()函数实现如下:

// Stop stops node execution
func (n *Node) Stop(ctx context.Context) error {
    select {
    case <-n.started:
        select {
        case <-n.closed:
            return n.err
        case <-n.stopped:
            select {
            case <-n.closed:
                return n.err
            case <-ctx.Done():
                return ctx.Err()
            }
        case <-ctx.Done():
            return ctx.Err()
        default:
            close(n.stopped)
            // recurse and wait for closure
            return n.Stop(ctx)
        }
    case <-ctx.Done():
        return ctx.Err()
    default:
        return errAgentNotStarted
    }
}

由于此时Node.started这个channel已经被关掉,所以会永远执行select的第一个case分支:case <-n.started。然后会根据当时的情况,再决定执行哪个分支。

(3)

go func() {
    select {
        case <-n.Ready():
        case <-ctx.Done():
    }
    if ctx.Err() == nil {
        logrus.Info("node is ready")
    }
}()

Node.Ready()函数会返回Node.ready这个channel

// Ready returns a channel that is closed after node's initialization has
// completes for the first time.
func (n *Node) Ready() <-chan struct{} {
    return n.ready
}

Node初始化完成后,Node.ready这个channel就会被关掉。因此如果一切顺利的话,就会看到“node is ready”的log

(4)

return n.Err(ctx)

Node.Err()函数的实现:

// Err returns the error that caused the node to shutdown or nil. Err blocks
// until the node has fully shut down.
func (n *Node) Err(ctx context.Context) error {
    select {
    case <-n.closed:
        return n.err
    case <-ctx.Done():
        return ctx.Err()
    }
}

Node.Err()函数阻塞在这里,等待Node关闭。

 

Go语言的context package

Go语言的context package可以把一组用来处理同一请求的函数和goroutine通过context.Context这个类型的变量关联起来,并提供了取消(cancelation)和超时(timeout)机制。个人觉得Sameer Ajmani的这篇文档:Cancelation, Context, and Plumbing写得很清晰,更容易让人理解context package

更新1:下列关于Context的定义选自Go Concurrency Patterns: Context

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

四个函数定义如下:
a)Done函数返回一个只读channel,因此对其操作只有close。而这个channel只在Contextcanceltimeout的情况下才会被close
b)Err则是在Done channelclose后,用来获得close的原因,并返回一个non-nil的值:context canceledcontext deadline exceeded
c)Deadline返回Contextcancel的时间。如果okfalse,则表明没有设置deadline
d)Value返回同key绑定的值,如果没有相应的值,则返回nil

更新2:简单地分析一下源码:

// An emptyCtx is never canceled, has no values, and has no deadline.  It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

var (
    background = new(emptyCtx)
    todo   = new(emptyCtx)
)

context.Background()context.TODO()所返回的其实都是一个指向emptyCtx类型变量的指针。

再看一下WithCancel()函数实现:

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, c)
    return c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) *cancelCtx {
    return &cancelCtx{
        Context: parent,
        done:    make(chan struct{}),
    }
}

cancelCtx定义如下:

// A cancelCtx can be canceled.  When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
    Context

    done chan struct{} // closed by the first cancel call.

    mu       sync.Mutex
    children map[canceler]bool // set to nil by the first cancel call
    err      error             // set to non-nil by the first cancel call
}

每次调用WithCancel函数,新生成的Context会包含“父Context”以及一个新的done channel

propagateCancel()函数实现如下:

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    if parent.Done() == nil {
        return // parent is never canceled
    }
    if p, ok := parentCancelCtx(parent); ok {
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]bool)
            }
            p.children[child] = true
        }
        p.mu.Unlock()
    } else {
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

这个函数就是把新生成的Context加到父Contextchildren成员中,这样可以形成“级联”的cancel操作。

其它参考资料:
package context
Concurrent patterns in Golang: Context
Context and Cancellation of goroutines

Go并发编程模型:pipeline和cancellation

本文是Go Concurrency Patterns: Pipelines and cancellation的读书笔记:

(1)Pipeline定义:

What is a pipeline?

There’s no formal definition of a pipeline in Go; it’s just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines

receive values from upstream via inbound channels
perform some function on that data, usually producing new values
send values downstream via outbound channels

Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.

Go并发编程中,pipeline由一系列stage组成,而每个stage则由一组执行相同功能的goroutine组成,各个stage之间通过channel进行通信。goroutineinbound channels从上游stage接收消息,经过一番处理后,利用outbound channels向下游stage发送消息。每个stage可以包含若干个inboundoutbound channels,但是第一个stage只能有一个outbound channel,最后一个stage只能有一个inbound channel

(2)一个基本的pipeline例子:

package main

import "fmt"

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

执行结果:

4
9

gen()是第一个stage并产生一个outbound channelsq()是第二个stage,从gen()outbound channel接受输入,并产生一个新的outbound channel,把每个数的平方发送到这个outbound channel中。main()是最后一个stage,接收sq()outbound channel的输入,并把结果打印出来。

(3)fan-outfan-in
Fan-outfan-in的定义:

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.

多个函数从同一个channel读取叫fan-out;一个函数从多个channel读取并把处理结果发送到一个channel中,称之为fan-in。举例如下:

package main

import (
    "fmt"
    "sync"
)

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

以下代码称之为fan-out

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)

merge函数的工作即为fan-in

(3)通知上游stage取消发送操作。例子如下:

package main

import (
    "fmt"
    "sync"
)

func gen(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <- done:
                return
            }
        }
    }()
    return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9
}

构建一个done channel,调用close函数关闭done channel,用来通知上游stage停止发送。

(4)Pipeline构建的原则:

Here are the guidelines for pipeline construction:

stages close their outbound channels when all the send operations are done.
stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.

Pipelines unblock senders either by ensuring there’s enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel.

本文是Go Concurrency Patterns: Pipelines and cancellation的读书笔记:

(1)Pipeline定义:

What is a pipeline?

There’s no formal definition of a pipeline in Go; it’s just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines

receive values from upstream via inbound channels
perform some function on that data, usually producing new values
send values downstream via outbound channels

Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.

Go并发编程中,pipeline由一系列stage组成,而每个stage则由一组执行相同功能的goroutine组成,各个stage之间通过channel进行通信。goroutineinbound channels从上游stage接收消息,经过一番处理后,利用outbound channels向下游stage发送消息。每个stage可以包含若干个inboundoutbound channels,但是第一个stage只能有一个outbound channel,最后一个stage只能有一个inbound channel

(2)一个基本的pipeline例子:

package main

import "fmt"

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

执行结果:

4
9

gen()是第一个stage并产生一个outbound channelsq()是第二个stage,从gen()outbound channel接受输入,并产生一个新的outbound channel,把每个数的平方发送到这个outbound channel中。main()是最后一个stage,接收sq()outbound channel的输入,并把结果打印出来。

(3)fan-outfan-in
Fan-outfan-in的定义:

Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.

A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.

多个函数从同一个channel读取叫fan-out;一个函数从多个channel读取并把处理结果发送到一个channel中,称之为fan-in。举例如下:

package main

import (
    "fmt"
    "sync"
)

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

以下代码称之为fan-out

// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)

merge函数的工作即为fan-in

(3)通知上游stage取消发送操作。例子如下:

package main

import (
    "fmt"
    "sync"
)

func gen(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <- done:
                return
            }
        }
    }()
    return out
}

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9
}

构建一个done channel,调用close函数关闭done channel,用来通知上游stage停止发送。

(4)Pipeline构建的原则:

Here are the guidelines for pipeline construction:

stages close their outbound channels when all the send operations are done.
stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.

Pipelines unblock senders either by ensuring there’s enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel.

 

Docker Swarm代码分析笔记(17)——event_monitor.go

Engine结构体有一个eventsMonitor成员:

type Engine struct {
    ......
    eventsMonitor   *EventsMonitor
}

EventsMonitor结构体定义如下:

//EventsMonitor monitors events
type EventsMonitor struct {
    stopChan chan struct{}
    cli      client.APIClient
    handler  func(msg events.Message) error
}

stopChan用来通知停止接受消息;cli是底层连接的client,而handler则是收到event的处理函数。

Engine.ConnectWithClient方法会给eventsMonitor成员赋值:

// ConnectWithClient is exported
func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error {
    e.client = client
    e.apiClient = apiClient
    e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler)

    // Fetch the engine labels.
    if err := e.updateSpecs(); err != nil {
        return err
    }

    e.StartMonitorEvents()

    // Force a state update before returning.
    if err := e.RefreshContainers(true); err != nil {
        return err
    }

    if err := e.RefreshImages(); err != nil {
        return err
    }

    // Do not check error as older daemon does't support this call.
    e.RefreshVolumes()
    e.RefreshNetworks()

    e.emitEvent("engine_connect")

    return nil
}

其中Engine.StartMonitorEvents代码如下:

// StartMonitorEvents monitors events from the engine
func (e *Engine) StartMonitorEvents() {
    log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events")
    ec := make(chan error)
    e.eventsMonitor.Start(ec)

    go func() {
        if err := <-ec; err != nil {
            if !strings.Contains(err.Error(), "EOF") {
                // failing node reconnect should use back-off strategy
                <-e.refreshDelayer.Wait(e.getFailureCount())
            }
            e.StartMonitorEvents()
        }
        close(ec)
    }()
}

Engine.StartMonitorEvents就是如果从ec channel收取消息,如果是错误,就不断地循环启动Engine.StartMonitorEvents

EventsMonitor.Start函数代码如下:

// Start starts the EventsMonitor
func (em *EventsMonitor) Start(ec chan error) {
    em.stopChan = make(chan struct{})

    responseBody, err := em.cli.Events(context.Background(), types.EventsOptions{})
    if err != nil {
        ec <- err
        return
    }

    resultChan := make(chan decodingResult)

    go func() {
        dec := json.NewDecoder(responseBody)
        for {
            var result decodingResult
            result.err = dec.Decode(&result.msg)
            resultChan <- result
            if result.err == io.EOF {
                break
            }
        }
        close(resultChan)
    }()

    go func() {
        defer responseBody.Close()
        for {
            select {
            case <-em.stopChan:
                ec <- nil
                return
            case result := <-resultChan:
                if result.err != nil {
                    ec <- result.err
                    return
                }
                if err := em.handler(result.msg); err != nil {
                    ec <- err
                    return
                }
            }
        }
    }()
}

代码逻辑实际就是发出“HTTP GET /events”请求,然后等待Docker Engine的响应。因为这个HTTP请求很可能会阻塞在这里,因此随后的HTTP消息交互就会重新建立一个HTTP连接。原理在这里

type Response struct {
    ......

    // Body represents the response body.
    //
    // The http Client and Transport guarantee that Body is always
    // non-nil, even on responses without a body or responses with
    // a zero-length body. It is the caller's responsibility to
    // close Body. The default HTTP client's Transport does not
    // attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
    // ("keep-alive") unless the Body is read to completion and is
    // closed.
    //
    // The Body is automatically dechunked if the server replied
    // with a "chunked" Transfer-Encoding.
    Body io.ReadCloser

    ......
}

如果想停止这个EventsMonitor,可以使用Engine.Stop方法:

// Stop stops the EventsMonitor
func (em *EventsMonitor) Stop() {
    if em.stopChan == nil {
        return
    }
    close(em.stopChan)
}

 

Go语言DefaultClient没有设置请求超时

今天看到这篇文章:Don’t use Go’s default HTTP client。总结起来就是直接使用Go语言的http.Posthttp.Get等方法时,底层连接使用的是DefaultClient。而DefaultClient没有设置请求超时:

// DefaultClient is the default Client and is used by Get, Head, and Post.
var DefaultClient = &Client{}

因此,如果服务器端如果一直无响应的话,就会把当前发出请求的goroutine挂死。因此如果要使用DefaultClient,一定要留心这个陷阱。

Docker Swarm代码分析笔记(16)——Node结构体

Docker Swarmscheduler会选择符合要求的node来创建container

candidates, err := s.selectNodesForContainer(nodes, config, true)

node定义在scheduler/node/node.go

// Node is an abstract type used by the scheduler.
type Node struct {
    ID         string
    IP         string
    Addr       string
    Name       string
    Labels     map[string]string
    Containers cluster.Containers
    Images     []*cluster.Image

    UsedMemory  int64
    UsedCpus    int64
    TotalMemory int64
    TotalCpus   int64

    HealthIndicator int64
}

Cluster.listNodes方法实现如下:

// listNodes returns all validated engines in the cluster, excluding pendingEngines.
func (c *Cluster) listNodes() []*node.Node {
    c.RLock()
    defer c.RUnlock()

    out := make([]*node.Node, 0, len(c.engines))
    for _, e := range c.engines {
        node := node.NewNode(e)
        for _, pc := range c.pendingContainers {
            if pc.Engine.ID == e.ID && node.Container(pc.Config.SwarmID()) == nil {
                node.AddContainer(pc.ToContainer())
            }
        }
        out = append(out, node)
    }

    return out
}

其实就是从Cluster.engines构建node列表(因为Cluster.pendingEngines还处在待定状态)。后续scheduler就会从这个node列表中选择合适的node

Docker Swarm代码分析笔记(15)——scheduler

Docker Swarm manage命令的scheduler是通过filterstrategy构建的(cli/manage.go):

sched := scheduler.New(s, fs)

scheduler实际的功能就是选择符合cluster.ContainerConfig要求的nodeDocker Engine)列表:

// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order or preference.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
    candidates, err := s.selectNodesForContainer(nodes, config, true)

    if err != nil {
        candidates, err = s.selectNodesForContainer(nodes, config, false)
    }
    return candidates, err
}

func (s *Scheduler) selectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig, soft bool) ([]*node.Node, error) {
    accepted, err := filter.ApplyFilters(s.filters, config, nodes, soft)
    if err != nil {
        return nil, err
    }

    if len(accepted) == 0 {
        return nil, errNoNodeAvailable
    }

    return s.strategy.RankAndSort(config, accepted)
}

 

Docker Swarm代码分析笔记(14)——strategy

Docker Swarm manage命令的flStrategy选项用来指定scheduler调度的strategy的过滤项,其变量定义如下(cli/flags.go):

flStrategy = cli.StringFlag{
    Name:  "strategy",
    Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]",
    Value: strategy.List()[0],
}

strategy的默认值是SpreadPlacementStrategy

strategy在代码中的实际定义是PlacementStrategy,一个interface

// PlacementStrategy is the interface for a container placement strategy.
type PlacementStrategy interface {
    // Name of the strategy
    Name() string
    // Initialize performs any initial configuration required by the strategy and returns
    // an error if one is encountered.
    // If no initial configuration is needed, this may be a no-op and return a nil error.
    Initialize() error
    // RankAndSort applies the strategy to a list of nodes and ranks them based
    // on the best fit given the container configuration.  It returns a sorted
    // list of nodes (based on their ranks) or an error if there is no
    // available node on which to schedule the container.
    RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}

其中RankAndSort返回一个符合条件node(也就是Docker Engine)列表,列表中元素按匹配度排序。

Docker Swarm代码分析笔记(13)——filter

Docker Swarm manage命令的filter选项用来指定scheduler选择Docker Engine的过滤项,其变量定义如下(cli/flags.go):

// hack for go vet
flFilterValue = cli.StringSlice(filter.List())
// DefaultFilterNumber is exported
DefaultFilterNumber = len(flFilterValue)

flFilter = cli.StringSliceFlag{
    Name:  "filter, f",
    Usage: "filter to use [" + strings.Join(filter.List(), ", ") + "]",
    Value: &flFilterValue,
}

获得filter值的代码如下(cli/manage.go):

// see https://github.com/codegangsta/cli/issues/160
names := c.StringSlice("filter")
if c.IsSet("filter") || c.IsSet("f") {
    names = names[DefaultFilterNumber:]
}
fs, err := filter.New(names)
if err != nil {
    log.Fatal(err)
}

默认情况下,获得所有的filterscheduler/filter/filter.go):

func init() {
    filters = []Filter{
        &HealthFilter{},
        &PortFilter{},
        &SlotsFilter{},
        &DependencyFilter{},
        &AffinityFilter{},
        &ConstraintFilter{},
    }
}

Filter是一个interfacescheduler/filter/filter.go):

// Filter is exported
type Filter interface {
    Name() string

    // Return a subset of nodes that were accepted by the filtering policy.
    Filter(*cluster.ContainerConfig, []*node.Node, bool) ([]*node.Node, error)

    // Return a list of constraints/filters provided
    GetFilters(*cluster.ContainerConfig) ([]string, error)
}

其中Filter方法用来过滤符合条件的Docker Engine,而GetFilters则返回一个描述过滤条件的字符串。

scheduler/filter/filter.go中,ApplyFilters用来过滤Docker EnginelistAllFilters返回所有的过滤条件:

// ApplyFilters applies a set of filters in batch.
func ApplyFilters(filters []Filter, config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    var (
        err        error
        candidates = nodes
    )

    for _, filter := range filters {
        candidates, err = filter.Filter(config, candidates, soft)
        if err != nil {
            // special case for when no healthy nodes are found
            if filter.Name() == "health" {
                return nil, err
            }
            return nil, fmt.Errorf("Unable to find a node that satisfies the following conditions %s", listAllFilters(filters, config, filter.Name()))
        }
    }
    return candidates, nil
}

// listAllFilters creates a string containing all applied filters
func listAllFilters(filters []Filter, config *cluster.ContainerConfig, lastFilter string) string {
    allFilters := ""
    for _, filter := range filters {
        list, err := filter.GetFilters(config)
        if err == nil && len(list) > 0 {
            allFilters = fmt.Sprintf("%s\n%v", allFilters, list)
        }
        if filter.Name() == lastFilter {
            return allFilters
        }
    }
    return allFilters
}

 

Docker Swarm代码分析笔记(12)——container相关的config

创建和更新container时需要涉及到containerconfigcluster\config.go):

// ContainerConfig is exported
// TODO store affinities and constraints in their own fields
type ContainerConfig struct {
    container.Config
    HostConfig       container.HostConfig
    NetworkingConfig network.NetworkingConfig
}

// OldContainerConfig contains additional fields for backward compatibility
// This should be removed after we stop supporting API versions <= 1.8
type OldContainerConfig struct {
    ContainerConfig
    Memory     int64
    MemorySwap int64
    CPUShares  int64  `json:"CpuShares"`
    CPUSet     string `json:"Cpuset"`
}

其中container这个package定义在/vendor/github.com/docker/engine-api/types/container/config.gocontainer.Config包含不依赖于hostcontainerconfig;依赖于hostconfig定义在container.HostConfig;而container网络相关的config保存在network.NetworkingConfig