Swarmkit Notes (15): Cluster Node Storage Code

raft.Node has a memoryStore member (defined in manager/state/raft/raft.go):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    memoryStore         *store.MemoryStore
    ......
}

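This member is very important: in the manager leader, which answers swarmctl commands for the cluster, the store field of Server actually points to the memoryStore in the manager's Node struct: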

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

store.MemoryStore is defined in manager/state/store/memory.go:

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

The in-memory database that actually holds the data comes from the go-memdb project. A store.MemoryStore is initialized with the NewMemoryStore() function:

// NewMemoryStore returns an in-memory store. The argument is an optional
// Proposer which will be used to propagate changes to other members in a
// cluster.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
    memDB, err := memdb.NewMemDB(schema)
    if err != nil {
        // This shouldn't fail
        panic(err)
    }

    return &MemoryStore{
        memDB:    memDB,
        queue:    watch.NewQueue(0),
        proposer: proposer,
    }
}

Here schema is a variable of type *memdb.DBSchema:

schema        = &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{},
    }

Tables are added to schema through the register function (defined in manager/state/store/memory.go):

func register(os ObjectStoreConfig) {
    objectStorers = append(objectStorers, os)
    schema.Tables[os.Name] = os.Table
}

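register is called from the init() functions of the files in the store package (cluster.go, networks.go, nodes.go, services.go, and tasks.go, which line up exactly with the five swarmctl subcommands) to register how each kind of object is handled.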
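The registration pattern can be sketched in isolation. This is a minimal illustration, not Swarmkit code: tableConfig, tables, and registeredTables are hypothetical stand-ins for ObjectStoreConfig and schema.Tables. It relies on the fact that Go initializes package-level variables first and then runs every init() function in the package before main starts.

```go
package main

import (
	"fmt"
	"sort"
)

// tableConfig is a hypothetical, simplified stand-in for ObjectStoreConfig.
type tableConfig struct {
	Name    string
	Indexes []string
}

// tables plays the role of schema.Tables. It is initialized before any
// init function runs, so register can safely write to it.
var tables = map[string]tableConfig{}

// register records one table definition in the package-level registry.
func register(c tableConfig) {
	tables[c.Name] = c
}

// In the real package each init lives in its own file. Go also allows
// several init functions in one file and runs all of them before main.
func init() { register(tableConfig{Name: "service", Indexes: []string{"id", "name"}}) }
func init() { register(tableConfig{Name: "task", Indexes: []string{"id"}}) }

// registeredTables returns the registered table names in sorted order.
func registeredTables() []string {
	names := make([]string, 0, len(tables))
	for name := range tables {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

func main() {
	fmt.Println(registeredTables())
}
```

By the time NewMemoryStore() runs, every table has already been registered, which is why the schema can be passed to memdb.NewMemDB() as is.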

ObjectStoreConfig is defined in manager/state/store/object.go:

// ObjectStoreConfig provides the necessary methods to store a particular object
// type inside MemoryStore.
type ObjectStoreConfig struct {
    Name             string
    Table            *memdb.TableSchema
    Save             func(ReadTx, *api.StoreSnapshot) error
    Restore          func(Tx, *api.StoreSnapshot) error
    ApplyStoreAction func(Tx, *api.StoreAction) error
    NewStoreAction   func(state.Event) (api.StoreAction, error)
}

It describes how one kind of object is stored.

Take services.go as an example:

const tableService = "service"

func init() {
    register(ObjectStoreConfig{
        Name: tableService,
        Table: &memdb.TableSchema{
            Name: tableService,
            Indexes: map[string]*memdb.IndexSchema{
                indexID: {
                    Name:    indexID,
                    Unique:  true,
                    Indexer: serviceIndexerByID{},
                },
                indexName: {
                    Name:    indexName,
                    Unique:  true,
                    Indexer: serviceIndexerByName{},
                },
            },
        },
        Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
            var err error
            snapshot.Services, err = FindServices(tx, All)
            return err
        },
        Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
            services, err := FindServices(tx, All)
            if err != nil {
                return err
            }
            for _, s := range services {
                if err := DeleteService(tx, s.ID); err != nil {
                    return err
                }
            }
            for _, s := range snapshot.Services {
                if err := CreateService(tx, s); err != nil {
                    return err
                }
            }
            return nil
        },
        ApplyStoreAction: func(tx Tx, sa *api.StoreAction) error {
            switch v := sa.Target.(type) {
            case *api.StoreAction_Service:
                obj := v.Service
                switch sa.Action {
                case api.StoreActionKindCreate:
                    return CreateService(tx, obj)
                case api.StoreActionKindUpdate:
                    return UpdateService(tx, obj)
                case api.StoreActionKindRemove:
                    return DeleteService(tx, obj.ID)
                }
            }
            return errUnknownStoreAction
        },
        NewStoreAction: func(c state.Event) (api.StoreAction, error) {
            var sa api.StoreAction
            switch v := c.(type) {
            case state.EventCreateService:
                sa.Action = api.StoreActionKindCreate
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            case state.EventUpdateService:
                sa.Action = api.StoreActionKindUpdate
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            case state.EventDeleteService:
                sa.Action = api.StoreActionKindRemove
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            default:
                return api.StoreAction{}, errUnknownStoreAction
            }
            return sa, nil
        },
    })
}

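NewStoreAction builds the api.StoreAction for the service table from an event; ApplyStoreAction carries out the matching operation (create, update, or delete) for a given action; Save reads all services from the database and stores them in a snapshot; Restore replaces the services in the database with the ones held in the snapshot.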
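How NewStoreAction and ApplyStoreAction fit together can be shown with a small round trip: events produced on one side become actions, and the actions are replayed against another copy of the table. The sketch below is an assumption-laden simplification: service, event, storeAction, and replay are hypothetical types and helpers, and a plain map takes the place of the memdb table.

```go
package main

import (
	"errors"
	"fmt"
)

type actionKind int

const (
	actionCreate actionKind = iota
	actionUpdate
	actionRemove
)

// service, event and storeAction are hypothetical simplifications of
// api.Service, state.Event and api.StoreAction.
type service struct{ ID, Name string }

type event struct {
	Kind    actionKind
	Service service
}

type storeAction struct {
	Kind    actionKind
	Service service
}

var errUnknownStoreAction = errors.New("unknown store action")

// newStoreAction turns a local change event into an action that could be
// sent to other nodes.
func newStoreAction(e event) (storeAction, error) {
	switch e.Kind {
	case actionCreate, actionUpdate, actionRemove:
		return storeAction{Kind: e.Kind, Service: e.Service}, nil
	}
	return storeAction{}, errUnknownStoreAction
}

// applyStoreAction replays one action against a table (here a plain map).
func applyStoreAction(table map[string]service, sa storeAction) error {
	switch sa.Kind {
	case actionCreate, actionUpdate:
		table[sa.Service.ID] = sa.Service
		return nil
	case actionRemove:
		delete(table, sa.Service.ID)
		return nil
	}
	return errUnknownStoreAction
}

// replay converts each event to an action and applies it to table.
func replay(table map[string]service, events []event) error {
	for _, e := range events {
		sa, err := newStoreAction(e)
		if err != nil {
			return err
		}
		if err := applyStoreAction(table, sa); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	follower := map[string]service{}
	events := []event{
		{actionCreate, service{"s1", "web"}},
		{actionUpdate, service{"s1", "web-v2"}},
		{actionCreate, service{"s2", "db"}},
		{actionRemove, service{"s2", "db"}},
	}
	if err := replay(follower, events); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(follower), follower["s1"].Name)
}
```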

Next, look at the function the manager leader uses to create a service (manager/controlapi/service.go):

// CreateService creates and return a Service based on the provided ServiceSpec.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
// - Returns `AlreadyExists` if the ServiceID conflicts.
// - Returns an error if the creation fails.
func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRequest) (*api.CreateServiceResponse, error) {
    ......
    err := s.store.Update(func(tx store.Tx) error {
        return store.CreateService(tx, service)
    })
    if err != nil {
        return nil, err
    }
    ......
}

The s.store.Update() call is the core part (manager/state/store/memory.go):

// Update executes a read/write transaction.
func (s *MemoryStore) Update(cb func(Tx) error) error {
    return s.update(s.proposer, cb)
} 

Now look at the MemoryStore.update() function (manager/state/store/memory.go):

func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
    s.updateLock.Lock()
    memDBTx := s.memDB.Txn(true)

    var curVersion *api.Version

    if proposer != nil {
        curVersion = proposer.GetVersion()
    }

    var tx tx
    tx.init(memDBTx, curVersion)

    err := cb(&tx)

    if err == nil {
        if proposer == nil {
            memDBTx.Commit()
        } else {
            var sa []*api.StoreAction
            sa, err = tx.changelistStoreActions()

            if err == nil {
                if sa != nil {
                    err = proposer.ProposeValue(context.Background(), sa, func() {
                        memDBTx.Commit()
                    })
                } else {
                    memDBTx.Commit()
                }
            }
        }
    }

    if err == nil {
        for _, c := range tx.changelist {
            s.queue.Publish(c)
        }
        if len(tx.changelist) != 0 {
            s.queue.Publish(state.EventCommit{})
        }
    } else {
        memDBTx.Abort()
    }
    s.updateLock.Unlock()
    return err

}

Let's walk through this function step by step:
(1)

memDBTx := s.memDB.Txn(true)

This is standard go-memdb usage; passing true creates a write transaction.

(2)

if proposer != nil {
    curVersion = proposer.GetVersion()
}

proposer is the manager's raft.Node member. Its job is to inform the other follower managers in the cluster of the changes that have been made:

// ProposeValue calls Propose on the raft and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
    _, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
    if err != nil {
        return err
    }
    return nil
}

// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if !n.IsMember() {
        return nil
    }

    status := n.Node.Status()
    return &api.Version{Index: status.Commit}
}

(3)

var tx tx
tx.init(memDBTx, curVersion)

err := cb(&tx)

tx is defined as follows:

// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
    ReadTx
    create(table string, o Object) error
    update(table string, o Object) error
    delete(table, id string) error
}

type tx struct {
    readTx
    curVersion *api.Version
    changelist []state.Event
}

tx implements the read/write transaction.

tx.init() is a plain field-by-field assignment:

func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
    tx.memDBTx = memDBTx
    tx.curVersion = curVersion
    tx.changelist = nil
}

cb is:

func(tx store.Tx) error {
        return store.CreateService(tx, service)
}

The store.CreateService() function:

// CreateService adds a new service to the store.
// Returns ErrExist if the ID is already taken.
func CreateService(tx Tx, s *api.Service) error {
    // Ensure the name is not already in use.
    if tx.lookup(tableService, indexName, strings.ToLower(s.Spec.Annotations.Name)) != nil {
        return ErrNameConflict
    }

    return tx.create(tableService, serviceEntry{s})
}

This code first makes sure the service name is not already in use, and only then creates the service through tx.create():

// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
func (tx *tx) create(table string, o Object) error {
    if tx.lookup(table, indexID, o.ID()) != nil {
        return ErrExist
    }

    copy := o.Copy()
    meta := copy.Meta()
    if err := touchMeta(&meta, tx.curVersion); err != nil {
        return err
    }
    copy.SetMeta(meta)

    err := tx.memDBTx.Insert(table, copy)
    if err == nil {
        tx.changelist = append(tx.changelist, copy.EventCreate())
        o.SetMeta(meta)
    }
    return err
}

This function makes a copy of the Object (here, the serviceEntry struct), stores the copy in the database, and appends a state.EventCreateService to tx.changelist.
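The copy-then-record behaviour of create can be illustrated with a much smaller model. Everything here is hypothetical: service is a flat struct with a Version field standing in for the object metadata, the table is a map, and the changelist holds strings instead of state.Event values.

```go
package main

import (
	"errors"
	"fmt"
)

var errExist = errors.New("object already exists")

// service is a hypothetical flat object; Version stands in for its metadata.
type service struct {
	ID      string
	Name    string
	Version uint64
}

// tx is a hypothetical transaction over a map-backed table.
type tx struct {
	table      map[string]*service
	curVersion uint64
	changelist []string
}

// create stores a private copy of s, so later changes the caller makes to
// its own struct do not leak into the table.
func (t *tx) create(s *service) error {
	if _, ok := t.table[s.ID]; ok {
		return errExist
	}
	cp := *s // a shallow copy is enough for this flat struct
	cp.Version = t.curVersion
	t.table[cp.ID] = &cp
	t.changelist = append(t.changelist, "create:"+cp.ID)
	s.Version = cp.Version // like o.SetMeta(meta): the caller sees the stamped version
	return nil
}

func main() {
	t := &tx{table: map[string]*service{}, curVersion: 7}
	s := &service{ID: "s1", Name: "web"}

	fmt.Println("first create:", t.create(s))
	s.Name = "changed by caller"
	fmt.Println("stored name:", t.table["s1"].Name)
	fmt.Println("second create:", t.create(s))
	fmt.Println("changelist:", t.changelist)
}
```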

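In functions like these that take a callback as a parameter, the callback is what does the real work; the rest of the function only supplies common plumbing, such as obtaining the transaction and committing it.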

(4)

    if err == nil {
        if proposer == nil {
            memDBTx.Commit()
        } else {
            var sa []*api.StoreAction
            sa, err = tx.changelistStoreActions()

            if err == nil {
                if sa != nil {
                    err = proposer.ProposeValue(context.Background(), sa, func() {
                        memDBTx.Commit()
                    })
                } else {
                    memDBTx.Commit()
                }
            }
        }
    }

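This step commits the data to the database. Without a proposer, the transaction is committed directly. With a proposer, the changelist is first converted into store actions; if there are any, they are handed to proposer.ProposeValue() with memDBTx.Commit() as the callback, so the local commit is tied to the raft proposal. If there are none, the transaction is committed directly.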
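The branching in this step can be reduced to a few lines. The sketch below assumes a hypothetical one-method proposer interface and represents store actions as strings; okProposer and failProposer are test doubles invented for the example, not Swarmkit types. It also treats an empty action list like a nil one, which is a simplification.

```go
package main

import (
	"errors"
	"fmt"
)

// proposer is a hypothetical, single-method version of state.Proposer.
type proposer interface {
	// ProposeValue replicates actions and calls commit once they are accepted.
	ProposeValue(actions []string, commit func()) error
}

// okProposer accepts every proposal and records what it was asked to replicate.
type okProposer struct{ log []string }

func (p *okProposer) ProposeValue(actions []string, commit func()) error {
	p.log = append(p.log, actions...)
	commit()
	return nil
}

// failProposer simulates a proposal that is never accepted.
type failProposer struct{}

func (failProposer) ProposeValue([]string, func()) error {
	return errors.New("proposal rejected")
}

// commitVia follows the branch structure of step (4): commit directly when
// there is no proposer or nothing to replicate, otherwise let the proposer
// decide when commit runs.
func commitVia(p proposer, actions []string, commit func()) error {
	if p == nil || len(actions) == 0 {
		commit()
		return nil
	}
	return p.ProposeValue(actions, commit)
}

func main() {
	committed := 0
	commit := func() { committed++ }

	fmt.Println(commitVia(nil, []string{"create s1"}, commit), committed)
	fmt.Println(commitVia(&okProposer{}, []string{"create s2"}, commit), committed)
	fmt.Println(commitVia(failProposer{}, []string{"create s3"}, commit), committed)
}
```

When the proposal fails, commit is never called, which matches the original function falling through to memDBTx.Abort() on error.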

(5)

    if err == nil {
        for _, c := range tx.changelist {
            s.queue.Publish(c)
        }
        if len(tx.changelist) != 0 {
            s.queue.Publish(state.EventCommit{})
        }
    } else {
        memDBTx.Abort()
    }

s.queue.Publish() announces the newly created service to other goroutines (for example m.globalOrchestrator.Run()), and those goroutines carry out the actual work of creating the service.
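The publish and subscribe relationship can be sketched with channels. The queue type below is a hypothetical, heavily simplified stand-in for watch.Queue: events are strings, each watcher gets a buffered channel, and publish blocks if a watcher's buffer is full.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical, much simplified stand-in for watch.Queue.
type queue struct {
	mu       sync.Mutex
	watchers []chan string
}

// watch registers a new subscriber and returns its receive-only channel.
func (q *queue) watch(buffer int) <-chan string {
	ch := make(chan string, buffer)
	q.mu.Lock()
	q.watchers = append(q.watchers, ch)
	q.mu.Unlock()
	return ch
}

// publish fans the event out to every subscriber. In this sketch it blocks
// when a subscriber's buffer is full.
func (q *queue) publish(ev string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, ch := range q.watchers {
		ch <- ev
	}
}

// shutdown closes all subscriber channels so that range loops terminate.
func (q *queue) shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, ch := range q.watchers {
		close(ch)
	}
	q.watchers = nil
}

func main() {
	q := &queue{}
	events := q.watch(8)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // plays the role of an orchestrator goroutine
		defer wg.Done()
		for ev := range events {
			fmt.Println("orchestrator saw:", ev)
		}
	}()

	// Mirrors step (5): one event per change, then a commit marker.
	q.publish("create service s1")
	q.publish("commit")
	q.shutdown()
	wg.Wait()
}
```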

In addition, MemoryStore provides a View function for read transactions:

// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
    lookup(table, index, id string) Object
    get(table, id string) Object
    find(table string, by By, checkType func(By) error, appendResult func(Object)) error
}

type readTx struct {
    memDBTx *memdb.Txn
}

// View executes a read transaction.
func (s *MemoryStore) View(cb func(ReadTx)) {
    memDBTx := s.memDB.Txn(false)

    readTx := readTx{
        memDBTx: memDBTx,
    }
    cb(readTx)
    memDBTx.Commit()
}
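The "consistent view" promised by ReadTx can be imitated with copy-on-write. The sketch below is not how go-memdb is implemented (it builds on immutable radix trees); it is a hypothetical map-based store in which writers publish a fresh map and readers keep whichever map was current when their view began. The callback must treat the snapshot as read-only.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical copy-on-write store. A published map is never
// mutated in place, so readers can use it without holding the lock.
type store struct {
	mu   sync.RWMutex
	data map[string]string
}

// view hands cb the snapshot that was current when the view started.
func (s *store) view(cb func(snapshot map[string]string)) {
	s.mu.RLock()
	snap := s.data
	s.mu.RUnlock()
	cb(snap)
}

// set publishes a new version by copying, so existing snapshots stay intact.
func (s *store) set(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	next := make(map[string]string, len(s.data)+1)
	for key, val := range s.data {
		next[key] = val
	}
	next[k] = v
	s.data = next
}

func main() {
	s := &store{data: map[string]string{"s1": "web"}}

	s.view(func(snap map[string]string) {
		s.set("s2", "db") // a write that lands while the view is open
		fmt.Println("inside view:", len(snap))
	})
	s.view(func(snap map[string]string) {
		fmt.Println("next view:", len(snap))
	})
}
```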
