raft.Node
有一个memoryStore
成员(定义在manager/state/raft/raft.go
):
// Node represents the Raft Node useful
// configuration.
type Node struct {
......
// memoryStore points to the in-memory store (*store.MemoryStore) kept by
// this node. The "......" markers stand for fields elided from this excerpt.
memoryStore *store.MemoryStore
......
}
它非常重要,因为cluster中用来响应swarmctl命令的manager leader,其Server结构体中的store成员其实就是指向manager中Node结构体中的memoryStore:
// Server is the Cluster API gRPC server.
type Server struct {
// store is the in-memory store that handlers such as CreateService modify
// through store.Update.
store *store.MemoryStore
// raft is a raft node handle.
// NOTE(review): how Server uses it is not visible in this excerpt.
raft *raft.Node
// rootCA is a root CA handle.
// NOTE(review): how Server uses it is not visible in this excerpt.
rootCA *ca.RootCA
}
store.MemoryStore
定义在manager/state/store/memory.go
:
// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
// updateLock must be held during an update transaction.
updateLock sync.Mutex
// memDB is the go-memdb database that read and write transactions are
// opened on.
memDB *memdb.MemDB
// queue is where update publishes the events of a successful transaction,
// followed by an EventCommit when the transaction changed anything.
queue *watch.Queue
// proposer is optional. When it is non-nil, update asks it to propose the
// transaction's store actions and commits locally from the proposal
// callback.
proposer state.Proposer
}
其中实际用来存储的memory database
部分使用的是go-memdb项目。初始化store.MemoryStore
使用NewMemoryStore()
函数:
// NewMemoryStore builds an in-memory store on top of a fresh go-memdb
// database created from the package-level schema. proposer is optional; when
// set, it is used to propagate changes to the other members of the cluster.
//
// The function panics if the database cannot be created from the schema.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
	db, err := memdb.NewMemDB(schema)
	if err != nil {
		// This shouldn't fail
		panic(err)
	}

	store := new(MemoryStore)
	store.memDB = db
	store.queue = watch.NewQueue(0)
	store.proposer = proposer
	return store
}
其中schema
是一个*memdb.DBSchema
类型的变量:
// schema is the go-memdb schema passed to memdb.NewMemDB. It starts with an
// empty table map; register adds one table per object type.
schema = &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{},
}
往schema
添加成员使用的是register
函数(定义在manager/state/store/memory.go
):
// register makes an object type known to the store: its table schema is added
// to the shared memdb schema under os.Name, and the config itself is appended
// to objectStorers.
func register(os ObjectStoreConfig) {
	schema.Tables[os.Name] = os.Table
	objectStorers = append(objectStorers, os)
}
register
函数在store package
里各个文件(分别是cluster.go
,networks.go
,nodes.go
,services.go
和tasks.go
,正好对应swarmctl
的5
个子命令。)的init()
函数中使用,用来注册如何处理相应的object
。
ObjectStoreConfig
定义在manager/state/store/object.go
:
// ObjectStoreConfig provides the necessary methods to store a particular object
// type inside MemoryStore.
type ObjectStoreConfig struct {
// Name is the key under which register files Table in the memdb schema.
Name string
// Table is the go-memdb table schema (name and indexes) for the object type.
Table *memdb.TableSchema
// Save copies the objects visible through the read transaction into the
// snapshot.
Save func(ReadTx, *api.StoreSnapshot) error
// Restore makes the stored objects match the snapshot. The service
// implementation deletes every existing object and recreates the
// snapshot's.
Restore func(Tx, *api.StoreSnapshot) error
// ApplyStoreAction performs the create/update/remove described by the
// store action inside the given transaction.
ApplyStoreAction func(Tx, *api.StoreAction) error
// NewStoreAction converts a state event into the equivalent store action.
NewStoreAction func(state.Event) (api.StoreAction, error)
}
它定义了如何存储一个object
。
以services.go
为例:
const tableService = "service"
func init() {
register(ObjectStoreConfig{
Name: tableService,
Table: &memdb.TableSchema{
Name: tableService,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: serviceIndexerByID{},
},
indexName: {
Name: indexName,
Unique: true,
Indexer: serviceIndexerByName{},
},
},
},
Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
var err error
snapshot.Services, err = FindServices(tx, All)
return err
},
Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
services, err := FindServices(tx, All)
if err != nil {
return err
}
for _, s := range services {
if err := DeleteService(tx, s.ID); err != nil {
return err
}
}
for _, s := range snapshot.Services {
if err := CreateService(tx, s); err != nil {
return err
}
}
return nil
},
ApplyStoreAction: func(tx Tx, sa *api.StoreAction) error {
switch v := sa.Target.(type) {
case *api.StoreAction_Service:
obj := v.Service
switch sa.Action {
case api.StoreActionKindCreate:
return CreateService(tx, obj)
case api.StoreActionKindUpdate:
return UpdateService(tx, obj)
case api.StoreActionKindRemove:
return DeleteService(tx, obj.ID)
}
}
return errUnknownStoreAction
},
NewStoreAction: func(c state.Event) (api.StoreAction, error) {
var sa api.StoreAction
switch v := c.(type) {
case state.EventCreateService:
sa.Action = api.StoreActionKindCreate
sa.Target = &api.StoreAction_Service{
Service: v.Service,
}
case state.EventUpdateService:
sa.Action = api.StoreActionKindUpdate
sa.Target = &api.StoreAction_Service{
Service: v.Service,
}
case state.EventDeleteService:
sa.Action = api.StoreActionKindRemove
sa.Target = &api.StoreAction_Service{
Service: v.Service,
}
default:
return api.StoreAction{}, errUnknownStoreAction
}
return sa, nil
},
})
}
NewStoreAction
是创建针对service
这张table
的api.StoreAction
;而ApplyStoreAction
则是根据具体情况,使用相应的action
(create
,update
还是delete
,等等);Save
是从数据库读取所有的service
并保存到一个snapshot
中;Restore
则是用snapshot
中的值更新数据库中相应的service
。
再看一下manager leader
用来创建service
的函数(manager/controlapi/service.go
):
// CreateService creates and return a Service based on the provided ServiceSpec.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
// - Returns `AlreadyExists` if the ServiceID conflicts.
// - Returns an error if the creation fails.
func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRequest) (*api.CreateServiceResponse, error) {
......
// Create the service inside a store update transaction;
// store.CreateService runs as the transaction callback and its error, if
// any, is returned to the caller unchanged.
err := s.store.Update(func(tx store.Tx) error {
return store.CreateService(tx, service)
})
if err != nil {
return nil, err
}
......
}
s.store.Update()
函数是核心部分(manager/state/store/memory.go
):
// Update executes a read/write transaction.
// It delegates to update with the store's own proposer and returns whatever
// error update reports.
func (s *MemoryStore) Update(cb func(Tx) error) error {
return s.update(s.proposer, cb)
}
再看一下MemoryStore.update()
函数(manager/state/store/memory.go
):
// update runs cb inside a write transaction on the in-memory database.
//
// If cb succeeds, the transaction is committed: directly when proposer is nil
// or when the transaction produced no store actions, otherwise from the
// callback handed to proposer.ProposeValue. After a successful commit every
// event in the transaction's changelist is published on the store's queue,
// followed by a single EventCommit when the changelist is non-empty. If cb,
// the store-action conversion or the proposal fails, the transaction is
// aborted and the error is returned.
//
// updateLock is held for the whole call, including event publication.
func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
	// Release the lock on every exit path. With a trailing explicit Unlock, a
	// panic in cb or in the proposer that is recovered further up the stack
	// would leave the store locked forever.
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	memDBTx := s.memDB.Txn(true)
	// Safety net for the same panic case: make sure the write transaction is
	// always finished. go-memdb documents Abort as a no-op on a transaction
	// that has already been committed or aborted, so this does not interfere
	// with the normal paths below.
	defer memDBTx.Abort()

	var curVersion *api.Version
	if proposer != nil {
		curVersion = proposer.GetVersion()
	}

	var tx tx
	tx.init(memDBTx, curVersion)

	err := cb(&tx)

	if err == nil {
		if proposer == nil {
			memDBTx.Commit()
		} else {
			var sa []*api.StoreAction
			sa, err = tx.changelistStoreActions()

			if err == nil {
				if sa != nil {
					// Commit locally only from the proposal callback.
					err = proposer.ProposeValue(context.Background(), sa, func() {
						memDBTx.Commit()
					})
				} else {
					memDBTx.Commit()
				}
			}
		}
	}

	if err == nil {
		for _, c := range tx.changelist {
			s.queue.Publish(c)
		}
		if len(tx.changelist) != 0 {
			s.queue.Publish(state.EventCommit{})
		}
	} else {
		memDBTx.Abort()
	}
	return err
}
分析一下上面这个函数:
(1)
memDBTx := s.memDB.Txn(true)
这是go-memdb的用法,true
表明创建一个write transaction
。
(2)
if proposer != nil {
curVersion = proposer.GetVersion()
}
proposer
是manager
中raft.Node
成员,其功能是用来通知cluster
中其它follower manager
所发生的变化:
// ProposeValue wraps the store actions in an InternalRaftRequest, submits it
// through processInternalRaftRequest together with cb, and returns the error
// from that call (nil on success). The request's result value is discarded.
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
	req := &api.InternalRaftRequest{Action: storeAction}
	_, err := n.processInternalRaftRequest(ctx, req, cb)
	return err
}
// GetVersion reports the sequence information for the current raft round.
// It returns nil when the node is not a member; otherwise the returned
// Version carries the commit index taken from the raft status. stopMu is
// read-locked for the duration of the call.
func (n *Node) GetVersion() *api.Version {
	n.stopMu.RLock()
	defer n.stopMu.RUnlock()

	if n.IsMember() {
		return &api.Version{Index: n.Node.Status().Commit}
	}
	return nil
}
(3)
var tx tx
tx.init(memDBTx, curVersion)
err := cb(&tx)
其中tx
定义如下:
// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
// ReadTx contributes the read-only operations.
ReadTx
// create adds a new object to the given table.
create(table string, o Object) error
// update and delete are the remaining write operations on a table.
// NOTE(review): their implementations are not part of this excerpt.
update(table string, o Object) error
delete(table, id string) error
}
// tx is the concrete read/write transaction handed to update callbacks.
type tx struct {
// readTx carries the underlying memdb transaction.
readTx
// curVersion is the version passed to touchMeta when an object is created.
// It may be nil.
curVersion *api.Version
// changelist accumulates one event per successful write; the events are
// published once the transaction has been committed.
changelist []state.Event
}
tx
用来实现read/write transaction
。
tx.init()
就是一个“一对一”的赋值:
// init prepares the transaction for use: it starts with an empty changelist,
// records the version to use for new objects, and attaches the underlying
// memdb transaction.
func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
	tx.changelist = nil
	tx.curVersion = curVersion
	tx.readTx.memDBTx = memDBTx
}
cb
就是:
func(tx store.Tx) error {
return store.CreateService(tx, service)
}
store.CreateService()
函数:
// CreateService adds a new service to the store.
// Returns ErrNameConflict if a lookup of the lower-cased service name in the
// name index finds an existing entry, and ErrExist if the ID is already taken.
func CreateService(tx Tx, s *api.Service) error {
	name := strings.ToLower(s.Spec.Annotations.Name)
	if existing := tx.lookup(tableService, indexName, name); existing != nil {
		return ErrNameConflict
	}
	return tx.create(tableService, serviceEntry{s})
}
以上代码确定service name
没有重复后,再创建service
:
// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
//
// The store keeps a copy of o, not o itself. The copy's metadata is
// refreshed by touchMeta with the transaction's current version before
// insertion. On success a create event is appended to the changelist and the
// refreshed metadata is written back to o.
func (tx *tx) create(table string, o Object) error {
	if tx.lookup(table, indexID, o.ID()) != nil {
		return ErrExist
	}

	stored := o.Copy()
	meta := stored.Meta()
	if err := touchMeta(&meta, tx.curVersion); err != nil {
		return err
	}
	stored.SetMeta(meta)

	if err := tx.memDBTx.Insert(table, stored); err != nil {
		return err
	}

	tx.changelist = append(tx.changelist, stored.EventCreate())
	o.SetMeta(meta)
	return nil
}
上面这个函数会创建一个Object
副本(也就是serviceEntry
结构体)存放到数据库里,并把一个state.EventCreateService
加到tx.changelist
中。
其实这些以callback作为参数的函数,真正用来做事的就是callback,函数的其它部分仅仅是提供了一些common的功能。比如:获得transaction和commit。
(4)
if err == nil {
if proposer == nil {
memDBTx.Commit()
} else {
var sa []*api.StoreAction
sa, err = tx.changelistStoreActions()
if err == nil {
if sa != nil {
err = proposer.ProposeValue(context.Background(), sa, func() {
memDBTx.Commit()
})
} else {
memDBTx.Commit()
}
}
}
}
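把数据commit到数据库:如果没有proposer,或者这次transaction没有产生任何StoreAction,就直接commit;否则先调用proposer.ProposeValue()把这些StoreAction提交给raft,并在传入的回调函数中commit。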
(5)
if err == nil {
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
s.queue.Publish(state.EventCommit{})
}
} else {
memDBTx.Abort()
}
s.queue.Publish()
函数把创建service
这个消息通知到其它的goroutine
(例如m.globalOrchestrator.Run()
),这些goroutine
会做具体的创建service
操作。
此外,MemoryStore
还提供了View
函数,用来完成read transaction
:
// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
// lookup returns the object found in table through the named index for id.
// Callers treat a nil result as "no such object".
lookup(table, index, id string) Object
// get and find are further read operations.
// NOTE(review): their implementations are not part of this excerpt.
get(table, id string) Object
find(table string, by By, checkType func(By) error, appendResult func(Object)) error
}
// readTx is the concrete read transaction; it wraps a go-memdb transaction.
type readTx struct {
// memDBTx is the underlying go-memdb transaction.
memDBTx *memdb.Txn
}
// View executes a read transaction: it opens a read-only memdb transaction,
// passes it to cb wrapped in a readTx, and finishes the transaction with
// Commit once cb returns.
func (s *MemoryStore) View(cb func(ReadTx)) {
	memDBTx := s.memDB.Txn(false)
	cb(readTx{memDBTx: memDBTx})
	memDBTx.Commit()
}