After modifying a *.proto file, run "make generate" to regenerate the *.pb.go files (see: Two questions about processing "*.proto" files).
Swarmkit Notes (16): Running the "make setup" command
Before running "make" or any "make xxx" target, run "make setup" first. Otherwise "make" fails:
# make
......
lint
/bin/sh: 1: golint: not found
......
"make generate" fails as well:
# make generate
bin/protoc-gen-gogoswarm
generate
mockgen -package exec -destination controller_test.mock.go -source controller.go Controller StatusReporter
agent/exec/controller_test.go:17: running "mockgen": exec: "mockgen": executable file not found in $PATH
protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto
protoc -I.:../../vendor:../../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api/duration,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. duration.proto
protoc -I.:../../vendor:../../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api/timestamp,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. timestamp.proto
protoc -I.:/usr/local --gogoswarm_out=import_path=github.com/docker/swarmkit/protobuf/plugin,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor:. plugin.proto
protoc -I.:../../../../vendor --gogoswarm_out=plugins=deepcopy,import_path=github.com/docker/swarmkit/protobuf/plugin/deepcopy/test:. deepcopy.proto
protoc -I.:../../../../vendor --gogoswarm_out=plugins=grpc+raftproxy,import_path=github.com/docker/swarmkit/protobuf/plugin/raftproxy/test:. service.proto
Makefile:43: recipe for target 'generate' failed
make: *** [generate] Error 1
Swarmkit Notes (15): The cluster node storage code
raft.Node has a memoryStore member (defined in manager/state/raft/raft.go):
// Node represents the Raft Node useful
// configuration.
type Node struct {
......
memoryStore *store.MemoryStore
......
}
This member is important: the store field of the Server that the cluster's manager leader uses to answer swarmctl commands is in fact the memoryStore held by the manager's Node struct:
// Server is the Cluster API gRPC server.
type Server struct {
store *store.MemoryStore
raft *raft.Node
rootCA *ca.RootCA
}
store.MemoryStore is defined in manager/state/store/memory.go:
// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
// updateLock must be held during an update transaction.
updateLock sync.Mutex
memDB *memdb.MemDB
queue *watch.Queue
proposer state.Proposer
}
The memory database that actually holds the data comes from the go-memdb project. A store.MemoryStore is initialized with the NewMemoryStore() function:
// NewMemoryStore returns an in-memory store. The argument is an optional
// Proposer which will be used to propagate changes to other members in a
// cluster.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
memDB, err := memdb.NewMemDB(schema)
if err != nil {
// This shouldn't fail
panic(err)
}
return &MemoryStore{
memDB: memDB,
queue: watch.NewQueue(0),
proposer: proposer,
}
}
Here schema is a variable of type *memdb.DBSchema:
schema = &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{},
}
Tables are added to schema through the register function (defined in manager/state/store/memory.go):
func register(os ObjectStoreConfig) {
objectStorers = append(objectStorers, os)
schema.Tables[os.Name] = os.Table
}
register is called from the init() function of each file in the store package (cluster.go, networks.go, nodes.go, services.go and tasks.go, which map exactly to the five swarmctl subcommands) to register how the corresponding object type is handled.
ObjectStoreConfig is defined in manager/state/store/object.go:
// ObjectStoreConfig provides the necessary methods to store a particular object
// type inside MemoryStore.
type ObjectStoreConfig struct {
Name string
Table *memdb.TableSchema
Save func(ReadTx, *api.StoreSnapshot) error
Restore func(Tx, *api.StoreSnapshot) error
ApplyStoreAction func(Tx, *api.StoreAction) error
NewStoreAction func(state.Event) (api.StoreAction, error)
}
It describes how one kind of object is stored. Take services.go as an example:
const tableService = "service"
func init() {
register(ObjectStoreConfig{
Name: tableService,
Table: &memdb.TableSchema{
Name: tableService,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: serviceIndexerByID{},
},
indexName: {
Name: indexName,
Unique: true,
Indexer: serviceIndexerByName{},
},
},
},
Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
var err error
snapshot.Services, err = FindServices(tx, All)
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
services, err := FindServices(tx, All)
if err != nil {
return err
}
for _, s := range services {
if err := DeleteService(tx, s.ID); err != nil {
return err
}
}
for _, s := range snapshot.Services {
if err := CreateService(tx, s); err != nil {
return err
}
}
return nil
},
ApplyStoreAction: func(tx Tx, sa *api.StoreAction) error {
switch v := sa.Target.(type) {
case *api.StoreAction_Service:
obj := v.Service
switch sa.Action {
case api.StoreActionKindCreate:
return CreateService(tx, obj)
case api.StoreActionKindUpdate:
return UpdateService(tx, obj)
case api.StoreActionKindRemove:
return DeleteService(tx, obj.ID)
}
}
return errUnknownStoreAction
},
NewStoreAction: func(c state.Event) (api.StoreAction, error) {
var sa api.StoreAction
switch v := c.(type) {
case state.EventCreateService:
sa.Action = api.StoreActionKindCreate
sa.Target = &api.StoreAction_Service{
Service: v.Service,
}
case state.EventUpdateService:
sa.Action = api.StoreActionKindUpdate
sa.Target = &api.StoreAction_Service{
Service: v.Service,
}
case state.EventDeleteService:
sa.Action = api.StoreActionKindRemove
sa.Target = &api.StoreAction_Service{
Service: v.Service,
}
default:
return api.StoreAction{}, errUnknownStoreAction
}
return sa, nil
},
})
}
NewStoreAction builds an api.StoreAction targeting the service table; ApplyStoreAction applies the appropriate action (create, update, delete, and so on) to the store; Save reads all services from the database and saves them into a snapshot; Restore replaces the services in the database with the values from a snapshot.
Now look at the function the manager leader uses to create a service (manager/controlapi/service.go):
// CreateService creates and return a Service based on the provided ServiceSpec.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
// - Returns `AlreadyExists` if the ServiceID conflicts.
// - Returns an error if the creation fails.
func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRequest) (*api.CreateServiceResponse, error) {
......
err := s.store.Update(func(tx store.Tx) error {
return store.CreateService(tx, service)
})
if err != nil {
return nil, err
}
......
}
The s.store.Update() call is the core part (manager/state/store/memory.go):
// Update executes a read/write transaction.
func (s *MemoryStore) Update(cb func(Tx) error) error {
return s.update(s.proposer, cb)
}
Next, the MemoryStore.update() function (manager/state/store/memory.go):
func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
s.updateLock.Lock()
memDBTx := s.memDB.Txn(true)
var curVersion *api.Version
if proposer != nil {
curVersion = proposer.GetVersion()
}
var tx tx
tx.init(memDBTx, curVersion)
err := cb(&tx)
if err == nil {
if proposer == nil {
memDBTx.Commit()
} else {
var sa []*api.StoreAction
sa, err = tx.changelistStoreActions()
if err == nil {
if sa != nil {
err = proposer.ProposeValue(context.Background(), sa, func() {
memDBTx.Commit()
})
} else {
memDBTx.Commit()
}
}
}
}
if err == nil {
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
s.queue.Publish(state.EventCommit{})
}
} else {
memDBTx.Abort()
}
s.updateLock.Unlock()
return err
}
Let's go through this function step by step:
(1)
memDBTx := s.memDB.Txn(true)
This is standard go-memdb usage; the true argument creates a write transaction.
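For reference, here is a minimal, standalone go-memdb example (independent of swarmkit) showing the schema / write-transaction / read-transaction cycle that MemoryStore builds on:
package main

import (
    "fmt"

    memdb "github.com/hashicorp/go-memdb"
)

type service struct {
    ID   string
    Name string
}

func main() {
    // One "service" table indexed by ID, mirroring the shape of swarmkit's
    // per-object TableSchema registrations.
    schema := &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{
            "service": {
                Name: "service",
                Indexes: map[string]*memdb.IndexSchema{
                    "id": {
                        Name:    "id",
                        Unique:  true,
                        Indexer: &memdb.StringFieldIndex{Field: "ID"},
                    },
                },
            },
        },
    }

    db, err := memdb.NewMemDB(schema)
    if err != nil {
        panic(err)
    }

    // Txn(true) opens a write transaction.
    txn := db.Txn(true)
    if err := txn.Insert("service", &service{ID: "1", Name: "redis"}); err != nil {
        panic(err)
    }
    txn.Commit()

    // Txn(false) opens a read transaction.
    read := db.Txn(false)
    raw, err := read.First("service", "id", "1")
    if err != nil {
        panic(err)
    }
    fmt.Println(raw.(*service).Name) // redis
}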
(2)
if proposer != nil {
curVersion = proposer.GetVersion()
}
proposer is the manager's raft.Node member; it is used to propagate the changes to the other follower managers in the cluster:
// ProposeValue calls Propose on the raft and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
_, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
if err != nil {
return err
}
return nil
}
// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
n.stopMu.RLock()
defer n.stopMu.RUnlock()
if !n.IsMember() {
return nil
}
status := n.Node.Status()
return &api.Version{Index: status.Commit}
}
(3)
var tx tx
tx.init(memDBTx, curVersion)
err := cb(&tx)
where tx is defined as follows:
// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
ReadTx
create(table string, o Object) error
update(table string, o Object) error
delete(table, id string) error
}
type tx struct {
readTx
curVersion *api.Version
changelist []state.Event
}
tx implements the read/write transaction. tx.init() is just a field-by-field assignment:
func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
tx.memDBTx = memDBTx
tx.curVersion = curVersion
tx.changelist = nil
}
Here cb is:
func(tx store.Tx) error {
return store.CreateService(tx, service)
}
The store.CreateService() function:
// CreateService adds a new service to the store.
// Returns ErrExist if the ID is already taken.
func CreateService(tx Tx, s *api.Service) error {
// Ensure the name is not already in use.
if tx.lookup(tableService, indexName, strings.ToLower(s.Spec.Annotations.Name)) != nil {
return ErrNameConflict
}
return tx.create(tableService, serviceEntry{s})
}
After making sure the service name is not already in use, it creates the service:
// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
func (tx *tx) create(table string, o Object) error {
if tx.lookup(table, indexID, o.ID()) != nil {
return ErrExist
}
copy := o.Copy()
meta := copy.Meta()
if err := touchMeta(&meta, tx.curVersion); err != nil {
return err
}
copy.SetMeta(meta)
err := tx.memDBTx.Insert(table, copy)
if err == nil {
tx.changelist = append(tx.changelist, copy.EventCreate())
o.SetMeta(meta)
}
return err
}
This function stores a copy of the Object (i.e. a serviceEntry struct) in the database and appends a state.EventCreateService to tx.changelist.
Note that in these functions that take a callback as an argument, the callback does the real work; the rest of the function only provides common plumbing such as opening the transaction and committing it.
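This wrap-a-callback-in-a-transaction shape is easy to see in isolation. Below is a tiny self-contained sketch of the same pattern; txnStore and txn are made-up illustration types, not swarmkit code:
package main

import "fmt"

// txn is a made-up stand-in for a write transaction (think memdb.Txn).
type txn struct{ pending map[string]string }

// txnStore is a made-up stand-in for MemoryStore.
type txnStore struct{ committed map[string]string }

// update owns the common plumbing (begin, commit, abort); the callback does
// the real work -- the same shape as MemoryStore.update / MemoryStore.View.
func (s *txnStore) update(cb func(*txn) error) error {
    t := &txn{pending: map[string]string{}} // "begin" a write transaction
    if err := cb(t); err != nil {
        return err // abort: pending changes are simply dropped
    }
    for k, v := range t.pending { // "commit"
        s.committed[k] = v
    }
    return nil
}

func main() {
    s := &txnStore{committed: map[string]string{}}
    // The callback plays the role of store.CreateService in the code above.
    _ = s.update(func(t *txn) error {
        t.pending["service-1"] = "redis"
        return nil
    })
    fmt.Println(s.committed["service-1"]) // redis
}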
(4)
if err == nil {
if proposer == nil {
memDBTx.Commit()
} else {
var sa []*api.StoreAction
sa, err = tx.changelistStoreActions()
if err == nil {
if sa != nil {
err = proposer.ProposeValue(context.Background(), sa, func() {
memDBTx.Commit()
})
} else {
memDBTx.Commit()
}
}
}
}
This commits the data to the database.
(5)
if err == nil {
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
s.queue.Publish(state.EventCommit{})
}
} else {
memDBTx.Abort()
}
s.queue.Publish() broadcasts the "service created" event to other goroutines (for example m.globalOrchestrator.Run()), and those goroutines carry out the actual work of creating the service.
In addition, MemoryStore provides a View function for read transactions:
// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
lookup(table, index, id string) Object
get(table, id string) Object
find(table string, by By, checkType func(By) error, appendResult func(Object)) error
}
type readTx struct {
memDBTx *memdb.Txn
}
// View executes a read transaction.
func (s *MemoryStore) View(cb func(ReadTx)) {
memDBTx := s.memDB.Txn(false)
readTx := readTx{
memDBTx: memDBTx,
}
cb(readTx)
memDBTx.Commit()
}
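A hedged sketch of how a caller reads through View, reusing only NewMemoryStore, FindServices and All from the store package quoted above (this is illustration, not code taken from swarmkit):
package main

import (
    "fmt"

    "github.com/docker/swarmkit/api"
    "github.com/docker/swarmkit/manager/state/store"
)

func listServices(s *store.MemoryStore) ([]*api.Service, error) {
    var (
        services []*api.Service
        err      error
    )
    // View opens a read transaction (memDB.Txn(false)) and hands it to the
    // callback; only reads are possible through a ReadTx.
    s.View(func(tx store.ReadTx) {
        services, err = store.FindServices(tx, store.All)
    })
    return services, err
}

func main() {
    // A nil Proposer gives a standalone store with no raft cluster behind it.
    s := store.NewMemoryStore(nil)
    svcs, err := listServices(s)
    fmt.Println(len(svcs), err)
}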
Swarmkit Notes (14): Manager role switching
The Manager struct (defined in manager/manager.go) contains a *raft.Node member:
// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
......
RaftNode *raft.Node
......
}
raft.Node (defined in manager/state/raft/raft.go) in turn contains an *events.Broadcaster member, which delivers the messages that change the manager's role (to leader or to follower):
// Node represents the Raft Node useful
// configuration.
type Node struct {
......
leadershipBroadcast *events.Broadcaster
......
}
The code that sends the role change for the current manager is in manager/state/raft/raft.go:
// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
......
// If we cease to be the leader, we must cancel
// any proposals that are currently waiting for
// a quorum to acknowledge them. It is still
// possible for these to become committed, but
// if that happens we will apply them as any
// follower would.
if rd.SoftState != nil {
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
n.wait.cancelAll()
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Write(IsFollower)
}
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
wasLeader = true
}
}
if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
atomic.StoreUint32(&n.signalledLeadership, 1)
n.leadershipBroadcast.Write(IsLeader)
}
}
......
}
The receiving side is in the Manager.Run() function (manager/manager.go):
// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
......
leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
defer cancel()
go m.handleLeadershipEvents(ctx, leadershipCh)
......
go func() {
err := m.RaftNode.Run(ctx)
if err != nil {
log.G(ctx).Error(err)
m.Stop(ctx)
}
}()
......
}
Node.SubscribeLeadership() and Manager.handleLeadershipEvents() are straightforward, so I won't go through them here.
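For completeness, the consumer side can be sketched roughly as follows. This is a hedged sketch rather than the real Manager.handleLeadershipEvents; it assumes leadershipCh is the events.Event channel returned by m.RaftNode.SubscribeLeadership() (see Manager.Run above) and that the values sent on it are the raft.IsLeader / raft.IsFollower constants written by leadershipBroadcast:
// Hedged sketch of a leadership-event loop (not the actual
// Manager.handleLeadershipEvents implementation).
func watchLeadership(ctx context.Context, leadershipCh chan events.Event) {
    for {
        select {
        case ev := <-leadershipCh:
            if ev == raft.IsLeader {
                // this manager just became leader: start leader-only
                // subsystems (scheduler, allocator, ...)
            } else {
                // demoted to follower: stop leader-only subsystems
            }
        case <-ctx.Done():
            return
        }
    }
}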
Swarmkit Notes (13): swarmctl sends commands to the swarm cluster through controlClient
Under the hood, swarmctl sends its commands to the swarm cluster through a controlClient, which is defined in api/control.pb.go:
// Client API for Control service
type ControlClient interface {
GetNode(ctx context.Context, in *GetNodeRequest, opts ...grpc.CallOption) (*GetNodeResponse, error)
ListNodes(ctx context.Context, in *ListNodesRequest, opts ...grpc.CallOption) (*ListNodesResponse, error)
......
}
type controlClient struct {
cc *grpc.ClientConn
}
func NewControlClient(cc *grpc.ClientConn) ControlClient {
return &controlClient{cc}
}
func (c *controlClient) GetNode(ctx context.Context, in *GetNodeRequest, opts ...grpc.CallOption) (*GetNodeResponse, error) {
out := new(GetNodeResponse)
err := grpc.Invoke(ctx, "/docker.swarmkit.v1.Control/GetNode", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
......
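A hedged sketch of how a client like swarmctl uses this API: dial the manager's control socket over gRPC, build a ControlClient, and call one of its methods. The socket path below is only an example, and the response field names (Nodes, ID) are the usual generated ones, assumed rather than quoted:
package main

import (
    "fmt"
    "net"
    "time"

    "github.com/docker/swarmkit/api"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
)

func main() {
    // Dial the manager's local control socket; swarmctl takes the path from
    // its --socket flag (the path here is just an example).
    conn, err := grpc.Dial("/var/run/swarmkit/swarm.sock",
        grpc.WithInsecure(),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            return net.DialTimeout("unix", addr, timeout)
        }))
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    client := api.NewControlClient(conn)
    resp, err := client.ListNodes(context.Background(), &api.ListNodesRequest{})
    if err != nil {
        panic(err)
    }
    for _, n := range resp.Nodes {
        fmt.Println(n.ID)
    }
}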
Swarmkit Notes (12): Specifying resource limits when creating a service with swarmctl
When creating a service, swarmctl lets you specify CPU and memory resource limits:
# swarmctl service create --help
Create a service
Usage:
swarmctl service create [flags]
Flags:
......
--cpu-limit string CPU cores limit (e.g. 0.5)
--cpu-reservation string number of CPU cores reserved (e.g. 0.5)
......
--memory-limit string memory limit (e.g. 512m)
--memory-reservation string amount of reserved memory (e.g. 512m)
......
The *-reservation flags allocate and "hold" the given resources for the container, so those resources are guaranteed to be available to it; the *-limit flags cap the resources the container's processes may use. The parsing code lives in cmd/swarmctl/service/flagparser/resource.go.
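To get a feel for what that parsing does, here is a simplified, self-contained sketch (not the actual flagparser code, which uses big.Rat and the go-units library): "0.5" CPUs becomes nano-CPUs and "512m" becomes bytes, the units swarmkit's resource fields are expressed in:
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// nanoCPUs converts a value like "0.5" into nano-CPU units (1 CPU = 1e9).
// Simplified sketch; the real parser uses big.Rat and rejects sub-nano values.
func nanoCPUs(s string) (int64, error) {
    f, err := strconv.ParseFloat(s, 64)
    if err != nil {
        return 0, err
    }
    return int64(f * 1e9), nil
}

// memoryBytes converts "512m"/"1g"-style values into bytes; the real parser
// relies on the go-units library instead of this hand-rolled suffix handling.
func memoryBytes(s string) (int64, error) {
    s = strings.ToLower(strings.TrimSpace(s))
    mult := int64(1)
    switch {
    case strings.HasSuffix(s, "g"):
        mult, s = 1<<30, strings.TrimSuffix(s, "g")
    case strings.HasSuffix(s, "m"):
        mult, s = 1<<20, strings.TrimSuffix(s, "m")
    case strings.HasSuffix(s, "k"):
        mult, s = 1<<10, strings.TrimSuffix(s, "k")
    }
    n, err := strconv.ParseInt(s, 10, 64)
    if err != nil {
        return 0, err
    }
    return n * mult, nil
}

func main() {
    c, _ := nanoCPUs("0.5")
    m, _ := memoryBytes("512m")
    fmt.Println(c, m) // 500000000 536870912
}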
Swarmkit Notes (11): The manager creates the server that handles swarmctl requests
manager.localserver is the server created on a local Unix socket to wait for and handle the command requests sent by swarmctl (the source is in the manager/controlapi directory). The localserver-related code in Manager.Run() is:
baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
......
proxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs
......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
......
api.RegisterControlServer(m.localserver, localProxyControlAPI)
(1) First, the definitions of controlapi.Server and controlapi.NewServer():
// Server is the Cluster API gRPC server.
type Server struct {
store *store.MemoryStore
raft *raft.Node
rootCA *ca.RootCA
}
// NewServer creates a Cluster API server.
func NewServer(store *store.MemoryStore, raft *raft.Node, rootCA *ca.RootCA) *Server {
return &Server{
store: store,
raft: raft,
rootCA: rootCA,
}
}
controlapi.NewServer() creates the server that answers the control commands issued by the swarmctl program. Here store.MemoryStore is an important struct:
// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
// updateLock must be held during an update transaction.
updateLock sync.Mutex
memDB *memdb.MemDB
queue *watch.Queue
proposer state.Proposer
}
watch.Queue is defined as follows:
// Queue is the structure used to publish events and watch for them.
type Queue struct {
broadcast *events.Broadcaster
}
......
// Watch returns a channel which will receive all items published to the
// queue from this point, until cancel is called.
func (q *Queue) Watch() (eventq chan events.Event, cancel func()) {
return q.CallbackWatch(nil)
}
......
// Publish adds an item to the queue.
func (q *Queue) Publish(item events.Event) {
q.broadcast.Write(item)
}
Put simply, whenever Server.store changes, the data is written into memDB and, at the same time, an event is published to the queue, so the manager goroutines listening on the corresponding channels can receive it and handle the request.
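As a hedged sketch (not actual swarmkit code), the consumer side of that queue looks roughly like this, assuming q is the *watch.Queue embedded in MemoryStore and the events are the state.Event values published by MemoryStore.update:
// Hedged sketch of a store-event consumer, the way manager components such
// as the orchestrators react to changes.
func consume(q *watch.Queue) {
    eventq, cancel := q.Watch() // channel of events.Event plus a cancel func
    defer cancel()

    for ev := range eventq {
        switch e := ev.(type) {
        case state.EventCreateService:
            // a new service has been committed to the store
            _ = e.Service
        case state.EventCommit:
            // marks the end of one committed transaction
        }
    }
}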
The following line fills the newly created controlapi.Server with the current cluster's information:
baseControlAPI := controlapi.NewServer(m.RaftNode.MemoryStore(), m.RaftNode, m.config.SecurityConfig.RootCA())
(2)
proxyOpts := []grpc.DialOption{
grpc.WithBackoffMaxDelay(time.Second),
grpc.WithTransportCredentials(m.config.SecurityConfig.ClientTLSCreds),
}
cs := raftpicker.NewConnSelector(m.RaftNode, proxyOpts...)
m.connSelector = cs
......
// localProxyControlAPI is a special kind of proxy. It is only wired up
// to receive requests from a trusted local socket, and these requests
// don't use TLS, therefore the requests it handles locally should
// bypass authorization. When it proxies, it sends them as requests from
// this manager rather than forwarded requests (it has no TLS
// information to put in the metadata map).
forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }
localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, cs, m.RaftNode, forwardAsOwnRequest)
The code above creates a variable of type raftProxyControlServer:
type raftProxyControlServer struct {
local ControlServer
connSelector *raftpicker.ConnSelector
cluster raftpicker.RaftCluster
ctxMods []func(context.Context) (context.Context, error)
}
localProxyControlAPI means: if the manager that receives the swarmctl request (swarmctl and that manager are of course on the same machine) is the leader, it handles the request itself; otherwise it forwards the request to the cluster's leader.
(3)
api.RegisterControlServer(m.localserver, localProxyControlAPI)
This line wires the Unix socket behind localserver up to the raftProxyControlServer.
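Stripped of the Manager plumbing, that wiring is the usual gRPC pattern below (a hedged sketch with an example socket path, not the exact Manager.Run code):
// Hedged sketch: listen on a Unix socket, register the control API
// implementation on a gRPC server, and start serving.
lis, err := net.Listen("unix", "/var/run/swarmkit/swarm.sock") // example path
if err != nil {
    log.G(ctx).WithError(err).Fatal("failed to listen on the control socket")
}
localserver := grpc.NewServer()
api.RegisterControlServer(localserver, localProxyControlAPI)
go localserver.Serve(lis)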
Swarmkit Notes (10): The manager handles session requests
The manager handles session requests through the _Dispatcher_Session_Handler function (./api/dispatcher.pb.go):
func _Dispatcher_Session_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SessionRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DispatcherServer).Session(m, &dispatcherSessionServer{stream})
}
The actual call stack looks like this:
0 0x0000000000b65cbf in github.com/docker/swarmkit/manager/dispatcher.(*Dispatcher).Session
at /go/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go:768
1 0x0000000000782aa5 in github.com/docker/swarmkit/api.(*authenticatedWrapperDispatcherServer).Session
at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:207
2 0x000000000078e505 in github.com/docker/swarmkit/api.(*raftProxyDispatcherServer).Session
at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:1121
3 0x0000000000789c2a in github.com/docker/swarmkit/api._Dispatcher_Session_Handler
at /go/src/github.com/docker/swarmkit/api/dispatcher.pb.go:667
4 0x0000000000909646 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).processStreamingRPC
at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:602
5 0x000000000090b002 in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).handleStream
at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:686
6 0x000000000090fcbe in github.com/docker/swarmkit/vendor/google.golang.org/grpc.(*Server).serveStreams.func1.1
at /go/src/github.com/docker/swarmkit/vendor/google.golang.org/grpc/server.go:348
7 0x0000000000462bf0 in runtime.goexit
at /usr/local/go/src/runtime/asm_amd64.s:1998
The Dispatcher.Session() function:
// Session is a stream which controls agent connection.
// Each message contains list of backup Managers with weights. Also there is
// a special boolean field Disconnect which if true indicates that node should
// reconnect to another Manager immediately.
func (d *Dispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error {
ctx := stream.Context()
nodeInfo, err := ca.RemoteNode(ctx)
if err != nil {
return err
}
nodeID := nodeInfo.NodeID
if err := d.isRunningLocked(); err != nil {
return err
}
// register the node.
sessionID, err := d.register(stream.Context(), nodeID, r.Description)
if err != nil {
return err
}
fields := logrus.Fields{
"node.id": nodeID,
"node.session": sessionID,
"method": "(*Dispatcher).Session",
}
if nodeInfo.ForwardedBy != nil {
fields["forwarder.id"] = nodeInfo.ForwardedBy.NodeID
}
log := log.G(ctx).WithFields(fields)
var nodeObj *api.Node
nodeUpdates, cancel, err := store.ViewAndWatch(d.store, func(readTx store.ReadTx) error {
nodeObj = store.GetNode(readTx, nodeID)
return nil
}, state.EventUpdateNode{Node: &api.Node{ID: nodeID},
Checks: []state.NodeCheckFunc{state.NodeCheckID}},
)
if cancel != nil {
defer cancel()
}
if err != nil {
log.WithError(err).Error("ViewAndWatch Node failed")
}
if _, err = d.nodes.GetWithSession(nodeID, sessionID); err != nil {
return err
}
if err := stream.Send(&api.SessionMessage{
SessionID: sessionID,
Node: nodeObj,
Managers: d.getManagers(),
NetworkBootstrapKeys: d.networkBootstrapKeys,
}); err != nil {
return err
}
managerUpdates, mgrCancel := d.mgrQueue.Watch()
defer mgrCancel()
keyMgrUpdates, keyMgrCancel := d.keyMgrQueue.Watch()
defer keyMgrCancel()
// disconnectNode is a helper forcibly shutdown connection
disconnectNode := func() error {
// force disconnect by shutting down the stream.
transportStream, ok := transport.StreamFromContext(stream.Context())
if ok {
// if we have the transport stream, we can signal a disconnect
// in the client.
if err := transportStream.ServerTransport().Close(); err != nil {
log.WithError(err).Error("session end")
}
}
nodeStatus := api.NodeStatus{State: api.NodeStatus_DISCONNECTED, Message: "node is currently trying to find new manager"}
if err := d.nodeRemove(nodeID, nodeStatus); err != nil {
log.WithError(err).Error("failed to remove node")
}
// still return an abort if the transport closure was ineffective.
return grpc.Errorf(codes.Aborted, "node must disconnect")
}
for {
// After each message send, we need to check the nodes sessionID hasn't
// changed. If it has, we will the stream and make the node
// re-register.
node, err := d.nodes.GetWithSession(nodeID, sessionID)
if err != nil {
return err
}
var mgrs []*api.WeightedPeer
var disconnect bool
select {
case ev := <-managerUpdates:
mgrs = ev.([]*api.WeightedPeer)
case ev := <-nodeUpdates:
nodeObj = ev.(state.EventUpdateNode).Node
case <-stream.Context().Done():
return stream.Context().Err()
case <-node.Disconnect:
disconnect = true
case <-d.ctx.Done():
disconnect = true
case <-keyMgrUpdates:
}
if mgrs == nil {
mgrs = d.getManagers()
}
if err := stream.Send(&api.SessionMessage{
SessionID: sessionID,
Node: nodeObj,
Managers: mgrs,
NetworkBootstrapKeys: d.networkBootstrapKeys,
}); err != nil {
return err
}
if disconnect {
return disconnectNode()
}
}
}
This stream handles the agent connection. The first half records the agent that connected; the second half notifies the agent to reconnect whenever the cluster information changes, for example when the manager leader changes. The disconnectNode() helper handles the case where the connection to the agent node has to be torn down: closing the connection, removing the agent node's record, and so on.
Swarmkit Notes (9): manager
The Node.runManager() function starts a manager:
func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig, ready chan struct{}) error {
for {
n.waitRole(ctx, ca.ManagerRole)
if ctx.Err() != nil {
return ctx.Err()
}
remoteAddr, _ := n.remotes.Select(n.nodeID)
m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
ProtoAddr: map[string]string{
"tcp": n.config.ListenRemoteAPI,
"unix": n.config.ListenControlAPI,
},
AdvertiseAddr: n.config.AdvertiseRemoteAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
JoinRaft: remoteAddr.Addr,
StateDir: n.config.StateDir,
HeartbeatTick: n.config.HeartbeatTick,
ElectionTick: n.config.ElectionTick,
})
if err != nil {
return err
}
done := make(chan struct{})
go func() {
m.Run(context.Background()) // todo: store error
close(done)
}()
n.Lock()
n.manager = m
n.Unlock()
connCtx, connCancel := context.WithCancel(ctx)
go n.initManagerConnection(connCtx, ready)
// this happens only on initial start
if ready != nil {
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)
ready = nil
}
n.waitRole(ctx, ca.AgentRole)
n.Lock()
n.manager = nil
n.Unlock()
select {
case <-done:
case <-ctx.Done():
err = ctx.Err()
m.Stop(context.Background())
<-done
}
connCancel()
if err != nil {
return err
}
}
}
(1)
n.waitRole(ctx, ca.ManagerRole)
if ctx.Err() != nil {
return ctx.Err()
}
runManager() first blocks in waitRole(). Once the node obtains the manager role, execution continues.
(2)
remoteAddr, _ := n.remotes.Select(n.nodeID)
m, err := manager.New(&manager.Config{
ForceNewCluster: n.config.ForceNewCluster,
ProtoAddr: map[string]string{
"tcp": n.config.ListenRemoteAPI,
"unix": n.config.ListenControlAPI,
},
AdvertiseAddr: n.config.AdvertiseRemoteAPI,
SecurityConfig: securityConfig,
ExternalCAs: n.config.ExternalCAs,
JoinRaft: remoteAddr.Addr,
StateDir: n.config.StateDir,
HeartbeatTick: n.config.HeartbeatTick,
ElectionTick: n.config.ElectionTick,
})
if err != nil {
return err
}
done := make(chan struct{})
go func() {
m.Run(context.Background()) // todo: store error
close(done)
}()
n.Lock()
n.manager = m
n.Unlock()
a) remoteAddr, _ := n.remotes.Select(n.nodeID) picks one of the current cluster's managers (excluding the current node itself, of course) and assigns it to remoteAddr. If the current node is the first manager in the cluster, remoteAddr is an "empty" value: {NodeID: "", Addr: ""}.
b) When creating the manager with manager.New(), note that n.config.AdvertiseRemoteAPI is always "". manager.New() finally returns a Manager struct:
func New(config *Config) (*Manager, error) {
......
m := &Manager{
config: config,
listeners: listeners,
caserver: ca.NewServer(RaftNode.MemoryStore(), config.SecurityConfig),
Dispatcher: dispatcher.New(RaftNode, dispatcherConfig),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
RaftNode: RaftNode,
started: make(chan struct{}),
stopped: make(chan struct{}),
}
return m, nil
}
Here listeners holds the two sockets that listen on listen-remote-api (tcp) and listen-control-api (unix).
c) m.Run() is the function that actually runs the manager; even its authors find it complicated ("This function is *way* too complex."). Its logic can be split into the following pieces:
i) If the current manager is elected leader, it performs a pile of initialization: allocating resources for the scheduler, allocator and so on, starting goroutines, etc. If it is not the leader, it does the opposite clean-up: stopping goroutines and releasing resources.
ii) Next it configures manager.localserver and manager.server, mostly authentication and proxying; the two then listen on the Unix and TCP sockets in manager.listeners respectively and handle the corresponding traffic.
(3)
connCtx, connCancel := context.WithCancel(ctx)
go n.initManagerConnection(connCtx, ready)
Node.initManagerConnection() is implemented as follows:
func (n *Node) initManagerConnection(ctx context.Context, ready chan<- struct{}) error {
opts := []grpc.DialOption{}
insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
opts = append(opts, grpc.WithTransportCredentials(insecureCreds))
// Using listen address instead of advertised address because this is a
// local connection.
addr := n.config.ListenControlAPI
opts = append(opts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}))
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return err
}
state := grpc.Idle
for {
s, err := conn.WaitForStateChange(ctx, state)
if err != nil {
n.setControlSocket(nil)
return err
}
if s == grpc.Ready {
n.setControlSocket(conn)
if ready != nil {
close(ready)
ready = nil
}
} else if state == grpc.Shutdown {
n.setControlSocket(nil)
}
state = s
}
}
Its job is to establish a connection to the local listen-control-api (unix) socket, which is used to monitor the node's state.
(4) Add the current node to the remotes watch list as well:
// this happens only on initial start
if ready != nil {
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.nodeID, Addr: n.config.ListenRemoteAPI}, picker.DefaultObservationWeight)
case <-connCtx.Done():
}
}(ready)
ready = nil
}
(5) Block on the following line, waiting for the role to change:
n.waitRole(ctx, ca.AgentRole)
Swarmkit Notes (8): agent.session
The agent and the manager communicate through a session. Here is the definition of the agent.session struct:
// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the master is done through session. Changes that
// flow into the agent, such as task assignment, are called back into the
// agent through errs, messages and tasks.
type session struct {
agent *Agent
sessionID string
session api.Dispatcher_SessionClient
errs chan error
messages chan *api.SessionMessage
tasks chan *api.TasksMessage
registered chan struct{} // closed registration
closed chan struct{}
}
(1) The registered channel is used to signal that the agent has successfully registered with the manager:
func (s *session) run(ctx context.Context, delay time.Duration) {
time.Sleep(delay) // delay before registering.
if err := s.start(ctx); err != nil {
select {
case s.errs <- err:
case <-s.closed:
case <-ctx.Done():
}
return
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
close(s.registered)
}
In session.run, if session.start() succeeds, the registered channel is closed at the end. Meanwhile, in Agent.run():
func (a *Agent) run(ctx context.Context) {
.....
session = newSession(ctx, a, backoff) // start the initial session
registered = session.registered
for {
select {
......
case <-registered:
log.G(ctx).Debugln("agent: registered")
if ready != nil {
close(ready)
}
ready = nil
registered = nil // we only care about this once per session
backoff = 0 // reset backoff
sessionq = a.sessionq
......
}
}
As soon as registered is closed, the <-registered case fires immediately.
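This relies on two Go channel rules that Agent.run leans on: receiving from a closed channel never blocks, and receiving from a nil channel blocks forever. A tiny standalone example:
package main

import (
    "fmt"
    "time"
)

func main() {
    registered := make(chan struct{})
    close(registered)

    // A receive from a closed channel returns immediately, so this case
    // fires right away -- exactly how Agent.run notices the registration.
    select {
    case <-registered:
        fmt.Println("agent: registered")
    }

    // Setting the variable to nil afterwards (as Agent.run does with
    // "registered = nil") disables the case: a nil channel blocks forever.
    registered = nil
    select {
    case <-registered:
        fmt.Println("never reached")
    case <-time.After(10 * time.Millisecond):
        fmt.Println("the nil-channel case never fires")
    }
}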
(2) When the session hits an error while running, the error is sent to the errs channel. In Agent.run():
case err := <-session.errs:
// TODO(stevvooe): This may actually block if a session is closed
// but no error was sent. Session.close must only be called here
// for this to work.
if err != nil {
log.G(ctx).WithError(err).Error("agent: session failed")
backoff = initialSessionFailureBackoff + 2*backoff
if backoff > maxSessionFailureBackoff {
backoff = maxSessionFailureBackoff
}
}
if err := session.close(); err != nil {
log.G(ctx).WithError(err).Error("agent: closing session failed")
}
sessionq = nil
// if we're here before <-registered, do nothing for that event
registered = nil
// Bounce the connection.
if a.config.Picker != nil {
a.config.Picker.Reset()
}
After receiving the error, the agent closes the session and does some clean-up.
(3) The messages channel receives the messages the manager sends to the agent and hands them to Agent.run() for processing:
case msg := <-session.messages:
if err := a.handleSessionMessage(ctx, msg); err != nil {
log.G(ctx).WithError(err).Error("session message handler failed")
}
(4) The tasks channel receives the tasks the manager wants to run on this node; these are likewise handed to Agent.run() for processing:
case msg := <-session.tasks:
if err := a.worker.Assign(ctx, msg.Tasks); err != nil {
log.G(ctx).WithError(err).Error("task assignment failed")
}
(5) The closed channel is closed inside session.close(), which is only called from the case err := <-session.errs: branch above. Once closed is closed, a new session is established:
case <-session.closed:
log.G(ctx).Debugf("agent: rebuild session")
// select a session registration delay from backoff range.
delay := time.Duration(rand.Int63n(int64(backoff)))
session = newSession(ctx, a, delay)
registered = session.registered
sessionq = a.sessionq
Now take a look at session.start():
// start begins the session and returns the first SessionMessage.
func (s *session) start(ctx context.Context) error {
log.G(ctx).Debugf("(*session).start")
client := api.NewDispatcherClient(s.agent.config.Conn)
description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
Errorf("node description unavailable")
return err
}
// Override hostname
if s.agent.config.Hostname != "" {
description.Hostname = s.agent.config.Hostname
}
errChan := make(chan error, 1)
var (
msg    *api.SessionMessage
stream api.Dispatcher_SessionClient
)
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function returned. We only cancel
// it in the timeout case to make sure the goroutine completes.
sessionCtx, cancelSession := context.WithCancel(ctx)
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
if err != nil {
errChan <- err
return
}
msg, err = stream.Recv()
errChan <- err
}()
select {
case err := <-errChan:
if err != nil {
return err
}
case <-time.After(dispatcherRPCTimeout):
cancelSession()
return errors.New("session initiation timed out")
}
s.sessionID = msg.SessionID
s.session = stream
return s.handleSessionMessage(ctx, msg)
}
(1)
client := api.NewDispatcherClient(s.agent.config.Conn)
description, err := s.agent.config.Executor.Describe(ctx)
if err != nil {
log.G(ctx).WithError(err).WithField("executor", s.agent.config.Executor).
Errorf("node description unavailable")
return err
}
// Override hostname
if s.agent.config.Hostname != "" {
description.Hostname = s.agent.config.Hostname
}
The api.NewDispatcherClient() function and the type it returns are defined as follows:
type dispatcherClient struct {
cc *grpc.ClientConn
}
func NewDispatcherClient(cc *grpc.ClientConn) DispatcherClient {
return &dispatcherClient{cc}
}
s.agent.config.Conn is the gRPC connection to the manager that was established earlier in Node.runAgent() by the following code:
conn, err := grpc.Dial(manager.Addr,
grpc.WithPicker(picker),
grpc.WithTransportCredentials(creds),
grpc.WithBackoffMaxDelay(maxSessionFailureBackoff))
s.agent.config.Executor.Describe() returns a description of the current node (of type *api.NodeDescription).
(2)
errChan := make(chan error, 1)
var (
msg    *api.SessionMessage
stream api.Dispatcher_SessionClient
)
// Note: we don't defer cancellation of this context, because the
// streaming RPC is used after this function returned. We only cancel
// it in the timeout case to make sure the goroutine completes.
sessionCtx, cancelSession := context.WithCancel(ctx)
// Need to run Session in a goroutine since there's no way to set a
// timeout for an individual Recv call in a stream.
go func() {
stream, err = client.Session(sessionCtx, &api.SessionRequest{
Description: description,
})
if err != nil {
errChan <- err
return
}
msg, err = stream.Recv()
errChan <- err
}()
dispatcherClient.Session() looks like this:
func (c *dispatcherClient) Session(ctx context.Context, in *SessionRequest, opts ...grpc.CallOption) (Dispatcher_SessionClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Dispatcher_serviceDesc.Streams[0], c.cc, "/docker.swarmkit.v1.Dispatcher/Session", opts...)
if err != nil {
return nil, err
}
x := &dispatcherSessionClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
It returns a value that satisfies the Dispatcher_SessionClient interface:
type Dispatcher_SessionClient interface {
Recv() (*SessionMessage, error)
grpc.ClientStream
}
grpc.NewClientStream() returns a grpc.ClientStream interface, and dispatcherSessionClient is defined as:
type dispatcherSessionClient struct {
grpc.ClientStream
}
To satisfy the Dispatcher_SessionClient interface, dispatcherSessionClient also implements a Recv method:
func (x *dispatcherSessionClient) Recv() (*SessionMessage, error) {
m := new(SessionMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
x.ClientStream.SendMsg() sends a SessionRequest, which contains nothing but a NodeDescription:
// SessionRequest starts a session.
type SessionRequest struct {
Description *NodeDescription `protobuf:"bytes,1,opt,name=description" json:"description,omitempty"`
}
x.ClientStream.CloseSend() signals that all sends are done. After the manager's reply arrives, err is sent to errChan:
msg, err = stream.Recv()
errChan <- err
(3)
select {
case err := <-errChan:
if err != nil {
return err
}
case <-time.After(dispatcherRPCTimeout):
cancelSession()
return errors.New("session initiation timed out")
}
s.sessionID = msg.SessionID
s.session = stream
return s.handleSessionMessage(ctx, msg)
The goroutine first blocks in the select; once a valid response arrives, session initialization completes and the agent goes on waiting for the manager to assign tasks. Once session.start() succeeds, three more goroutines are started:
go runctx(ctx, s.closed, s.errs, s.heartbeat)
go runctx(ctx, s.closed, s.errs, s.watch)
go runctx(ctx, s.closed, s.errs, s.listen)
session.heartbeat() creates a new dispatcherClient, waits 1 second, then sends an api.HeartbeatRequest; the manager replies with an api.HeartbeatResponse telling the agent how often to send heartbeats (currently 5 seconds by default).
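The shape of that loop can be sketched as follows. This is a hedged sketch, not the real session.heartbeat; toDuration stands in for whatever conversion the generated HeartbeatResponse.Period field needs to become a time.Duration:
// heartbeatLoop is a hedged sketch: wait for the timer, send a
// HeartbeatRequest carrying the session ID, then reset the timer to the
// period the manager returned (5s by default). toDuration is a hypothetical
// helper converting the response's Period field to time.Duration.
func heartbeatLoop(ctx context.Context, client api.DispatcherClient, sessionID string) error {
    heartbeat := time.NewTimer(time.Second) // initial 1-second delay
    defer heartbeat.Stop()
    for {
        select {
        case <-heartbeat.C:
            resp, err := client.Heartbeat(ctx, &api.HeartbeatRequest{SessionID: sessionID})
            if err != nil {
                return err
            }
            heartbeat.Reset(toDuration(resp.Period))
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}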
session.watch() also creates a new dispatcherTasksClient, sends an api.TasksRequest to tell the manager that it is ready, and then blocks in Recv(), waiting for the manager to send tasks.
session.listen() reuses the session.session variable, blocks in Recv(), waits for the manager to send a SessionMessage, and then handles it.
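Its receive loop can be sketched like this (a hedged sketch rather than the actual session.listen; errSessionClosed is assumed to be the agent package's "session closed" error value):
// Hedged sketch of session.listen: block in Recv() on the stream created by
// start(), and hand every SessionMessage to Agent.run through s.messages.
func (s *session) listenSketch(ctx context.Context) error {
    for {
        msg, err := s.session.Recv() // the api.Dispatcher_SessionClient stream
        if err != nil {
            return err
        }
        select {
        case s.messages <- msg: // picked up by "case msg := <-session.messages" in Agent.run
        case <-s.closed:
            return errSessionClosed // assumed error value
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}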