Docker Swarm Code Analysis Notes (15): scheduler

The scheduler used by the Docker Swarm manage command is built from a strategy (s) and a set of filters (fs) (cli/manage.go):

sched := scheduler.New(s, fs)

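The scheduler's real job is to select the list of nodes (Docker Engines) that satisfy the requirements in cluster.ContainerConfig. It first tries with soft set to true and, if that fails, retries with soft set to false: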

// SelectNodesForContainer will return a list of nodes where the container can
// be scheduled, sorted by order or preference.
func (s *Scheduler) SelectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig) ([]*node.Node, error) {
    candidates, err := s.selectNodesForContainer(nodes, config, true)

    if err != nil {
        candidates, err = s.selectNodesForContainer(nodes, config, false)
    }
    return candidates, err
}

func (s *Scheduler) selectNodesForContainer(nodes []*node.Node, config *cluster.ContainerConfig, soft bool) ([]*node.Node, error) {
    accepted, err := filter.ApplyFilters(s.filters, config, nodes, soft)
    if err != nil {
        return nil, err
    }

    if len(accepted) == 0 {
        return nil, errNoNodeAvailable
    }

    return s.strategy.RankAndSort(config, accepted)
}
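The two-pass behaviour of SelectNodesForContainer can be sketched in isolation. The following is a minimal, self-contained illustration and not Swarm's code: Node, Constraint, and the label matching are hypothetical stand-ins, and it assumes that soft=true means soft preferences are enforced while soft=false means they are skipped.

```go
package main

import (
	"errors"
	"fmt"
)

// Node is a hypothetical stand-in for node.Node.
type Node struct {
	Name   string
	Labels map[string]string
}

// Constraint is a hypothetical label requirement. Soft marks a preference
// that may be dropped on the second pass (an assumption for this sketch).
type Constraint struct {
	Key, Value string
	Soft       bool
}

var errNoNodeAvailable = errors.New("no node available")

// selectNodes plays the role of selectNodesForContainer: it keeps the nodes
// that match every constraint. When soft is false, soft constraints are skipped.
func selectNodes(nodes []Node, cs []Constraint, soft bool) ([]Node, error) {
	var accepted []Node
	for _, n := range nodes {
		ok := true
		for _, c := range cs {
			if c.Soft && !soft {
				continue // second pass: ignore preferences
			}
			if n.Labels[c.Key] != c.Value {
				ok = false
				break
			}
		}
		if ok {
			accepted = append(accepted, n)
		}
	}
	if len(accepted) == 0 {
		return nil, errNoNodeAvailable
	}
	return accepted, nil
}

// SelectNodes mirrors the fallback: try with soft constraints enforced,
// and retry without them only if the first attempt returns an error.
func SelectNodes(nodes []Node, cs []Constraint) ([]Node, error) {
	candidates, err := selectNodes(nodes, cs, true)
	if err != nil {
		candidates, err = selectNodes(nodes, cs, false)
	}
	return candidates, err
}

func main() {
	nodes := []Node{
		{Name: "node-1", Labels: map[string]string{"region": "us", "storage": "disk"}},
		{Name: "node-2", Labels: map[string]string{"region": "eu", "storage": "disk"}},
	}
	cs := []Constraint{
		{Key: "region", Value: "us"},
		{Key: "storage", Value: "ssd", Soft: true}, // preferred, not required
	}
	picked, err := SelectNodes(nodes, cs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(picked[0].Name) // node-1
}
```

No node has SSD storage, so the first pass fails. The second pass drops the soft preference and node-1 is accepted on the region constraint alone.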

 

Docker Swarm Code Analysis Notes (14): strategy

The flStrategy option of the Docker Swarm manage command specifies the placement strategy that the scheduler uses. Its variable is defined as follows (cli/flags.go):

flStrategy = cli.StringFlag{
    Name:  "strategy",
    Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]",
    Value: strategy.List()[0],
}

The default value is strategy.List()[0], the first registered strategy, which is SpreadPlacementStrategy.

In the code, a strategy is actually defined as PlacementStrategy, an interface:

// PlacementStrategy is the interface for a container placement strategy.
type PlacementStrategy interface {
    // Name of the strategy
    Name() string
    // Initialize performs any initial configuration required by the strategy and returns
    // an error if one is encountered.
    // If no initial configuration is needed, this may be a no-op and return a nil error.
    Initialize() error
    // RankAndSort applies the strategy to a list of nodes and ranks them based
    // on the best fit given the container configuration.  It returns a sorted
    // list of nodes (based on their ranks) or an error if there is no
    // available node on which to schedule the container.
    RankAndSort(config *cluster.ContainerConfig, nodes []*node.Node) ([]*node.Node, error)
}

RankAndSort returns a list of eligible nodes (that is, Docker Engines), sorted by how well each one fits.
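To make the interface concrete, here is a toy implementation. It is a sketch only: Node, ContainerConfig, and the fewestContainers strategy are hypothetical and are not Swarm's spread strategy. It simply ranks nodes by how many containers they already run.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Node and ContainerConfig are hypothetical stand-ins for node.Node and
// cluster.ContainerConfig.
type Node struct {
	Name       string
	Containers int
}

type ContainerConfig struct{}

// PlacementStrategy has the same shape as the interface quoted above.
type PlacementStrategy interface {
	Name() string
	Initialize() error
	RankAndSort(config *ContainerConfig, nodes []*Node) ([]*Node, error)
}

// fewestContainers is a made-up strategy: the emptiest node ranks first.
type fewestContainers struct{}

func (fewestContainers) Name() string { return "fewest-containers" }

// Initialize is a no-op because this strategy needs no setup.
func (fewestContainers) Initialize() error { return nil }

func (fewestContainers) RankAndSort(_ *ContainerConfig, nodes []*Node) ([]*Node, error) {
	if len(nodes) == 0 {
		return nil, errors.New("no node available")
	}
	// Sort a copy so the caller's slice keeps its order.
	sorted := make([]*Node, len(nodes))
	copy(sorted, nodes)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Containers < sorted[j].Containers
	})
	return sorted, nil
}

func main() {
	var s PlacementStrategy = fewestContainers{}
	nodes := []*Node{
		{Name: "node-1", Containers: 5},
		{Name: "node-2", Containers: 1},
		{Name: "node-3", Containers: 3},
	}
	ranked, err := s.RankAndSort(&ContainerConfig{}, nodes)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, n := range ranked {
		fmt.Println(n.Name, n.Containers)
	}
}
```

The caller would typically schedule the container on the first element of the returned list and keep the rest as fallbacks.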

Docker Swarm Code Analysis Notes (13): filter

The filter option of the Docker Swarm manage command specifies the filters that the scheduler uses when choosing a Docker Engine. Its variables are defined as follows (cli/flags.go):

// hack for go vet
flFilterValue = cli.StringSlice(filter.List())
// DefaultFilterNumber is exported
DefaultFilterNumber = len(flFilterValue)

flFilter = cli.StringSliceFlag{
    Name:  "filter, f",
    Usage: "filter to use [" + strings.Join(filter.List(), ", ") + "]",
    Value: &flFilterValue,
}

The filter values are read as follows (cli/manage.go):

// see https://github.com/codegangsta/cli/issues/160
names := c.StringSlice("filter")
if c.IsSet("filter") || c.IsSet("f") {
    names = names[DefaultFilterNumber:]
}
fs, err := filter.New(names)
if err != nil {
    log.Fatal(err)
}
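The names[DefaultFilterNumber:] slice is easy to misread. The sketch below assumes, as the linked issue suggests, that the cli library appends user-supplied values to the default slice instead of replacing it. effectiveFilters and the filter names other than "health" are hypothetical, and the length guard is an addition that the original code does not have.

```go
package main

import "fmt"

// effectiveFilters mimics the slicing in cli/manage.go. raw holds the default
// values followed by any values the user passed. When the flag was set, the
// first defaultCount entries are the defaults and are dropped.
func effectiveFilters(raw []string, isSet bool, defaultCount int) []string {
	if isSet && len(raw) >= defaultCount {
		return raw[defaultCount:]
	}
	return raw
}

func main() {
	// Illustrative default names; only "health" appears in the quoted source.
	defaults := []string{"health", "port", "constraint"}

	// Flag not set: the defaults are used unchanged.
	fmt.Println(effectiveFilters(defaults, false, len(defaults)))

	// Flag set to "health" only: the library is assumed to have appended it.
	raw := append(append([]string{}, defaults...), "health")
	fmt.Println(effectiveFilters(raw, true, len(defaults)))
}
```

Without the slicing step, a user who asked for one filter would still get every default filter plus a duplicate of the one requested.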

By default, all filters are used (scheduler/filter/filter.go):

func init() {
    filters = []Filter{
        &HealthFilter{},
        &PortFilter{},
        &SlotsFilter{},
        &DependencyFilter{},
        &AffinityFilter{},
        &ConstraintFilter{},
    }
}

Filter is an interface (scheduler/filter/filter.go):

// Filter is exported
type Filter interface {
    Name() string

    // Return a subset of nodes that were accepted by the filtering policy.
    Filter(*cluster.ContainerConfig, []*node.Node, bool) ([]*node.Node, error)

    // Return a list of constraints/filters provided
    GetFilters(*cluster.ContainerConfig) ([]string, error)
}

The Filter method keeps only the Docker Engines that satisfy the filter, while GetFilters returns a list of strings describing the filter conditions.

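In scheduler/filter/filter.go, ApplyFilters runs the filters over the Docker Engines in sequence, and listAllFilters returns the conditions of every filter up to and including the one that failed: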

// ApplyFilters applies a set of filters in batch.
func ApplyFilters(filters []Filter, config *cluster.ContainerConfig, nodes []*node.Node, soft bool) ([]*node.Node, error) {
    var (
        err        error
        candidates = nodes
    )

    for _, filter := range filters {
        candidates, err = filter.Filter(config, candidates, soft)
        if err != nil {
            // special case for when no healthy nodes are found
            if filter.Name() == "health" {
                return nil, err
            }
            return nil, fmt.Errorf("Unable to find a node that satisfies the following conditions %s", listAllFilters(filters, config, filter.Name()))
        }
    }
    return candidates, nil
}

// listAllFilters creates a string containing all applied filters
func listAllFilters(filters []Filter, config *cluster.ContainerConfig, lastFilter string) string {
    allFilters := ""
    for _, filter := range filters {
        list, err := filter.GetFilters(config)
        if err == nil && len(list) > 0 {
            allFilters = fmt.Sprintf("%s\n%v", allFilters, list)
        }
        if filter.Name() == lastFilter {
            return allFilters
        }
    }
    return allFilters
}
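The chaining and the error reporting can be tried out with two toy filters. This is a simplified sketch under stated assumptions: Node, Config, healthFilter, regionFilter, and apply are hypothetical, and apply gathers the condition strings inline instead of calling a separate helper such as listAllFilters.

```go
package main

import (
	"errors"
	"fmt"
)

// Node and Config are hypothetical stand-ins for node.Node and
// cluster.ContainerConfig.
type Node struct {
	Name    string
	Healthy bool
	Region  string
}

type Config struct {
	Region string // desired region; empty means no requirement
}

// Filter has the same shape as the interface quoted earlier.
type Filter interface {
	Name() string
	Filter(*Config, []*Node, bool) ([]*Node, error)
	GetFilters(*Config) ([]string, error)
}

type healthFilter struct{}

func (healthFilter) Name() string { return "health" }
func (healthFilter) Filter(_ *Config, nodes []*Node, _ bool) ([]*Node, error) {
	var out []*Node
	for _, n := range nodes {
		if n.Healthy {
			out = append(out, n)
		}
	}
	if len(out) == 0 {
		return nil, errors.New("no healthy node available")
	}
	return out, nil
}
func (healthFilter) GetFilters(*Config) ([]string, error) { return nil, nil }

type regionFilter struct{}

func (regionFilter) Name() string { return "region" }
func (regionFilter) Filter(c *Config, nodes []*Node, _ bool) ([]*Node, error) {
	if c.Region == "" {
		return nodes, nil
	}
	var out []*Node
	for _, n := range nodes {
		if n.Region == c.Region {
			out = append(out, n)
		}
	}
	if len(out) == 0 {
		return nil, errors.New("no node in region")
	}
	return out, nil
}
func (regionFilter) GetFilters(c *Config) ([]string, error) {
	if c.Region == "" {
		return nil, nil
	}
	return []string{"region==" + c.Region}, nil
}

// apply feeds each filter the survivors of the previous one. A health failure
// is returned as is; any other failure reports the conditions seen so far.
func apply(filters []Filter, c *Config, nodes []*Node) ([]*Node, error) {
	candidates := nodes
	var applied []string
	for _, f := range filters {
		if list, err := f.GetFilters(c); err == nil {
			applied = append(applied, list...)
		}
		var err error
		candidates, err = f.Filter(c, candidates, true)
		if err != nil {
			if f.Name() == "health" {
				return nil, err
			}
			return nil, fmt.Errorf("unable to find a node that satisfies the following conditions %v", applied)
		}
	}
	return candidates, nil
}

func main() {
	nodes := []*Node{
		{Name: "node-1", Healthy: true, Region: "us"},
		{Name: "node-2", Healthy: false, Region: "eu"},
		{Name: "node-3", Healthy: true, Region: "eu"},
	}
	filters := []Filter{healthFilter{}, regionFilter{}}

	if got, err := apply(filters, &Config{Region: "eu"}, nodes); err == nil {
		fmt.Println(got[0].Name) // node-3
	}
	_, err := apply(filters, &Config{Region: "ap"}, nodes)
	fmt.Println(err)
}
```

Because each filter only sees the nodes the previous filter accepted, the order of the filter list matters: here the unhealthy node-2 is removed before the region check runs.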