什么是“lambda”

下列文字摘自What is This “Lambda” You Speak Of?

lambda means “anonymous function,” and calculus means “a formal system”

Therefore, the term lambda calculus refers to “a formal way to think about functions.”

That same Wikipedia link states this:

“Lambda calculus provides a theoretical framework for describing functions and their evaluation. Although it is a mathematical abstraction rather than a programming language, it forms the basis of almost all functional programming languages today.”

When I first started learning about functional programming, I found these terms to be a little intimidating, but as with most FP terms, they’re just uncommon words for talking about “functions and their evaluation.”

什么是“functional programming”

下列文字摘自What is “Functional Programming”?

In this lesson, I defined functional programming like this:

Functional programming is a way of writing software applications using only pure functions and immutable values.

To support that, I also defined pure function like this:

The output of a pure function depends only on (a) its input parameters and (b) its internal algorithm.
A pure function has no side effects, meaning that it does not read anything from the outside world or write anything to the outside world.
As a result of those first two statements, if a pure function is called with an input parameter x an infinite number of times, it will always return the same result y.

I noted that higher-order functions (HOFs) are a terrific FP language feature, and also stated that recursion is a by-product of the definition of FP.

I also briefly discussed some of the benefits of immutable values (and FP in general):

The best FP code is like algebra
Pure functions and immutable values are easier to reason about
Without much support (yet), I stated that immutable values make parallel/concurrent programming easier

 

Swarmkit笔记(16)——运行“make setup”命令

在运行“make”或“make xxx”命令之前,要先运行“make setup”命令。否则,“make”会失败:

# make
......
 lint
/bin/sh: 1: golint: not found
......

make generate”也会失败:

# make generate
 bin/protoc-gen-gogoswarm
 generate
mockgen -package exec -destination controller_test.mock.go -source controller.go Controller StatusReporter
agent/exec/controller_test.go:17: running "mockgen": exec: "mockgen": executable file not found in $PATH
protoc -I.:../protobuf:../vendor:../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mtimestamp/timestamp.proto=github.com/docker/swarmkit/api/timestamp,Mduration/duration.proto=github.com/docker/swarmkit/api/duration,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mplugin/plugin.proto=github.com/docker/swarmkit/protobuf/plugin:. types.proto specs.proto objects.proto control.proto dispatcher.proto ca.proto snapshot.proto raft.proto health.proto resource.proto
protoc -I.:../../vendor:../../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api/duration,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. duration.proto
protoc -I.:../../vendor:../../vendor/github.com/gogo/protobuf --gogoswarm_out=plugins=grpc+deepcopy+raftproxy+authenticatedwrapper,import_path=github.com/docker/swarmkit/api/timestamp,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. timestamp.proto
protoc -I.:/usr/local --gogoswarm_out=import_path=github.com/docker/swarmkit/protobuf/plugin,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor:. plugin.proto
protoc -I.:../../../../vendor --gogoswarm_out=plugins=deepcopy,import_path=github.com/docker/swarmkit/protobuf/plugin/deepcopy/test:. deepcopy.proto
protoc -I.:../../../../vendor --gogoswarm_out=plugins=grpc+raftproxy,import_path=github.com/docker/swarmkit/protobuf/plugin/raftproxy/test:. service.proto
Makefile:43: recipe for target 'generate' failed
make: *** [generate] Error 1

参考Why does “make generate” fail?

Go语言中的nil

本文是Francesc CampoyGoConf上做的Understanding Nil演讲的笔记。

(1)nil没有type

(2)在Go语言中,未显示初始化的变量拥有其类型的zero value。共有6种类型变量的zero valuenilpointerslicemapchannelfunctioninterface。具体含义如下:

类型 nil值含义
pointer 指向nothing
slice slice变量中的3个成员值:buf为nil(没有backing array),len和cap都是0
map,channel,function 一个nil pointer,指向nothing
interface interface包含”type, value”,一个nil interface必须二者都为nil:”nil, nil”

对于interface来说,正因为需要<type, value>这个元组中两个值都为nilinterface值才是nil。所以需要注意下面两点:

a)Do not declare concrete error vars:

func do() error {
    var err *doError // nil of type *doError
    return err // error (*doError, nil)
}
func main() {
    err := do() // error (*doError, nil)
    fmt.Println(err == nil) // false
}

b)Do not return concrete error types:

func do() *doError {
    return nil // nil of type *doError
}
func main() {
    err := do() // nil of type *doError
    fmt.Println(err == nil) // true
}

func do() *doError {
    return nil // nil of type *doError
}
func wrapDo() error { // error (*doError, nil)
    return do() // nil of type *doError
}
func main() {
    err := wrapDo() // error (*doError, nil)
    fmt.Println(err == nil) // false
}

(3)nil一些有用的使用场景:

类型 nil值使用场景
pointer methods can be called on nil receivers
slice perfectly valid zero values
map perfect as read-only values(往nil map添加成员会导致panic)
channel essential for some concurrency patterns
function needed for completeness
interface the most used signal in Go (err != nil)

 

Swarmkit笔记(15)——cluster node存储相关的代码

raft.Node有一个memoryStore成员(定义在manager/state/raft/raft.go):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    memoryStore         *store.MemoryStore
    ......
}

它非常重要,因为cluster中用来响应swarmctl命令的manager leader中的store成员其实就是指向managerNode结构体中的memoryStore

// Server is the Cluster API gRPC server.
type Server struct {
    store  *store.MemoryStore
    raft   *raft.Node
    rootCA *ca.RootCA
}

store.MemoryStore定义在manager/state/store/memory.go

// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
    // updateLock must be held during an update transaction.
    updateLock sync.Mutex

    memDB *memdb.MemDB
    queue *watch.Queue

    proposer state.Proposer
}

其中实际用来存储的memory database部分使用的是go-memdb项目。初始化store.MemoryStore使用NewMemoryStore()函数:

// NewMemoryStore returns an in-memory store. The argument is an optional
// Proposer which will be used to propagate changes to other members in a
// cluster.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
    memDB, err := memdb.NewMemDB(schema)
    if err != nil {
        // This shouldn't fail
        panic(err)
    }

    return &MemoryStore{
        memDB:    memDB,
        queue:    watch.NewQueue(0),
        proposer: proposer,
    }
}

其中schema是一个*memdb.DBSchema类型的变量:

schema        = &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{},
    }

schema添加成员使用的是register函数(定义在manager/state/store/memory.go):

func register(os ObjectStoreConfig) {
    objectStorers = append(objectStorers, os)
    schema.Tables[os.Name] = os.Table
}

register函数在store package里各个文件(分别是cluster.gonetworks.gonodes.goservices.gotasks.go,正好对应swarmctl5个子命令。)的init()函数中使用,用来注册如何处理相应的object

ObjectStoreConfig定义在manager/state/store/object.go

// ObjectStoreConfig provides the necessary methods to store a particular object
// type inside MemoryStore.
type ObjectStoreConfig struct {
    Name             string
    Table            *memdb.TableSchema
    Save             func(ReadTx, *api.StoreSnapshot) error
    Restore          func(Tx, *api.StoreSnapshot) error
    ApplyStoreAction func(Tx, *api.StoreAction) error
    NewStoreAction   func(state.Event) (api.StoreAction, error)
}

它定义了如何存储一个object

services.go为例:

const tableService = "service"

func init() {
    register(ObjectStoreConfig{
        Name: tableService,
        Table: &memdb.TableSchema{
            Name: tableService,
            Indexes: map[string]*memdb.IndexSchema{
                indexID: {
                    Name:    indexID,
                    Unique:  true,
                    Indexer: serviceIndexerByID{},
                },
                indexName: {
                    Name:    indexName,
                    Unique:  true,
                    Indexer: serviceIndexerByName{},
                },
            },
        },
        Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error {
            var err error
            snapshot.Services, err = FindServices(tx, All)
            return err
        },
        Restore: func(tx Tx, snapshot *api.StoreSnapshot) error {
            services, err := FindServices(tx, All)
            if err != nil {
                return err
            }
            for _, s := range services {
                if err := DeleteService(tx, s.ID); err != nil {
                    return err
                }
            }
            for _, s := range snapshot.Services {
                if err := CreateService(tx, s); err != nil {
                    return err
                }
            }
            return nil
        },
        ApplyStoreAction: func(tx Tx, sa *api.StoreAction) error {
            switch v := sa.Target.(type) {
            case *api.StoreAction_Service:
                obj := v.Service
                switch sa.Action {
                case api.StoreActionKindCreate:
                    return CreateService(tx, obj)
                case api.StoreActionKindUpdate:
                    return UpdateService(tx, obj)
                case api.StoreActionKindRemove:
                    return DeleteService(tx, obj.ID)
                }
            }
            return errUnknownStoreAction
        },
        NewStoreAction: func(c state.Event) (api.StoreAction, error) {
            var sa api.StoreAction
            switch v := c.(type) {
            case state.EventCreateService:
                sa.Action = api.StoreActionKindCreate
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            case state.EventUpdateService:
                sa.Action = api.StoreActionKindUpdate
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            case state.EventDeleteService:
                sa.Action = api.StoreActionKindRemove
                sa.Target = &api.StoreAction_Service{
                    Service: v.Service,
                }
            default:
                return api.StoreAction{}, errUnknownStoreAction
            }
            return sa, nil
        },
    })
}

NewStoreAction是创建针对service这张tableapi.StoreAction;而ApplyStoreAction则是根据具体情况,使用相应的actioncreateupdate还是delete,等等);Save是从数据库读取所有的service并保存到一个snapshot中;Restore则是用snapshot中的值更新数据库中相应的service

再看一下manager leader用来创建service的函数(manager\controlapi\service.go):

// CreateService creates and return a Service based on the provided ServiceSpec.
// - Returns `InvalidArgument` if the ServiceSpec is malformed.
// - Returns `Unimplemented` if the ServiceSpec references unimplemented features.
// - Returns `AlreadyExists` if the ServiceID conflicts.
// - Returns an error if the creation fails.
func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRequest) (*api.CreateServiceResponse, error) {
    ......
    err := s.store.Update(func(tx store.Tx) error {
        return store.CreateService(tx, service)
    })
    if err != nil {
        return nil, err
    }
    ......
}

s.store.Update()函数是核心部分(manager/state/store/memory.go):

// Update executes a read/write transaction.
func (s *MemoryStore) Update(cb func(Tx) error) error {
    return s.update(s.proposer, cb)
} 

再看一下MemoryStore.update()函数(manager/state/store/memory.go):

func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
    s.updateLock.Lock()
    memDBTx := s.memDB.Txn(true)

    var curVersion *api.Version

    if proposer != nil {
        curVersion = proposer.GetVersion()
    }

    var tx tx
    tx.init(memDBTx, curVersion)

    err := cb(&tx)

    if err == nil {
        if proposer == nil {
            memDBTx.Commit()
        } else {
            var sa []*api.StoreAction
            sa, err = tx.changelistStoreActions()

            if err == nil {
                if sa != nil {
                    err = proposer.ProposeValue(context.Background(), sa, func() {
                        memDBTx.Commit()
                    })
                } else {
                    memDBTx.Commit()
                }
            }
        }
    }

    if err == nil {
        for _, c := range tx.changelist {
            s.queue.Publish(c)
        }
        if len(tx.changelist) != 0 {
            s.queue.Publish(state.EventCommit{})
        }
    } else {
        memDBTx.Abort()
    }
    s.updateLock.Unlock()
    return err

}

分析一下上面这个函数:
(1)

memDBTx := s.memDB.Txn(true)

这是go-memdb的用法,true表明创建一个write transaction

(2)

if proposer != nil {
    curVersion = proposer.GetVersion()
}

proposermanagerraft.Node成员,其功能是用来通知cluster中其它follower manager所发生的变化:

// ProposeValue calls Propose on the raft and waits
// on the commit log action before returning a result
func (n *Node) ProposeValue(ctx context.Context, storeAction []*api.StoreAction, cb func()) error {
    _, err := n.processInternalRaftRequest(ctx, &api.InternalRaftRequest{Action: storeAction}, cb)
    if err != nil {
        return err
    }
    return nil
}

// GetVersion returns the sequence information for the current raft round.
func (n *Node) GetVersion() *api.Version {
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()

    if !n.IsMember() {
        return nil
    }

    status := n.Node.Status()
    return &api.Version{Index: status.Commit}
}

(3)

var tx tx
tx.init(memDBTx, curVersion)

err := cb(&tx)

其中tx定义如下:

// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
    ReadTx
    create(table string, o Object) error
    update(table string, o Object) error
    delete(table, id string) error
}

type tx struct {
    readTx
    curVersion *api.Version
    changelist []state.Event
}

tx用来实现read/write transaction

tx.init()就是一个“一对一”的赋值:

func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
    tx.memDBTx = memDBTx
    tx.curVersion = curVersion
    tx.changelist = nil
}

cb就是:

func(tx store.Tx) error {
        return store.CreateService(tx, service)
}

store.CreateService()函数:

// CreateService adds a new service to the store.
// Returns ErrExist if the ID is already taken.
func CreateService(tx Tx, s *api.Service) error {
    // Ensure the name is not already in use.
    if tx.lookup(tableService, indexName, strings.ToLower(s.Spec.Annotations.Name)) != nil {
        return ErrNameConflict
    }

    return tx.create(tableService, serviceEntry{s})
}

以上代码确定service name没有重复后,再创建service

// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
func (tx *tx) create(table string, o Object) error {
    if tx.lookup(table, indexID, o.ID()) != nil {
        return ErrExist
    }

    copy := o.Copy()
    meta := copy.Meta()
    if err := touchMeta(&meta, tx.curVersion); err != nil {
        return err
    }
    copy.SetMeta(meta)

    err := tx.memDBTx.Insert(table, copy)
    if err == nil {
        tx.changelist = append(tx.changelist, copy.EventCreate())
        o.SetMeta(meta)
    }
    return err
}

上面这个函数会创建一个Object副本(也就是serviceEntry结构体)存放到数据库里,并把一个state.EventCreateService加到tx.changelist中。

其实这些有callbak作为参数的函数,真正用来做事就是callback,函数的其它部分仅仅是提供了一些common的功能。比如:获得transaction和commit。

(4)

    if err == nil {
        if proposer == nil {
            memDBTx.Commit()
        } else {
            var sa []*api.StoreAction
            sa, err = tx.changelistStoreActions()

            if err == nil {
                if sa != nil {
                    err = proposer.ProposeValue(context.Background(), sa, func() {
                        memDBTx.Commit()
                    })
                } else {
                    memDBTx.Commit()
                }
            }
        }
    }

把数据commit到数据库。

(5)

    if err == nil {
        for _, c := range tx.changelist {
            s.queue.Publish(c)
        }
        if len(tx.changelist) != 0 {
            s.queue.Publish(state.EventCommit{})
        }
    } else {
        memDBTx.Abort()
    }

s.queue.Publish()函数把创建service这个消息通知到其它的goroutine(例如m.globalOrchestrator.Run()),这些goroutine会做具体的创建service操作。

此外,MemoryStore还提供了View函数,用来完成read transaction

// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
    lookup(table, index, id string) Object
    get(table, id string) Object
    find(table string, by By, checkType func(By) error, appendResult func(Object)) error
}

type readTx struct {
    memDBTx *memdb.Txn
}

// View executes a read transaction.
func (s *MemoryStore) View(cb func(ReadTx)) {
    memDBTx := s.memDB.Txn(false)

    readTx := readTx{
        memDBTx: memDBTx,
    }
    cb(readTx)
    memDBTx.Commit()
}

RESTful Web 服务简介

本文是RESTful Web Services: A Tutorial的笔记:

REST stands for Representational State Transfer, which is an architectural style for networked hypermedia applications, it is primarily used to build Web services that are lightweight, maintainable, and scalable. A service based on REST is called a RESTful service. REST is not dependent on any protocol, but almost every RESTful service uses HTTP as its underlying protocol.

尽管REST本身不依赖任何协议,但是事实上RESTful服务基本都使用HTTP协议。

RESTful服务本身的特性:

Every system uses resources. These resources can be pictures, video files, Web pages, business information, or anything that can be represented in a computer-based system. The purpose of a service is to provide a window to its clients so that they can access these resources. Service architects and developers want this service to be easy to implement, maintainable, extensible, and scalable. A RESTful design promises that and more. In general, RESTful services should have following properties and features, which I’ll describe in detail:

Representations
Messages
URIs
Uniform interface
Stateless
Links between resources
Caching

具体来说:
Representations:使用JSONXML
MessagesURIsUniform interface:使用HTTP协议;
Stateless:很重要,不同的request之间不能有依赖。

Swarmkit笔记(14)——manager切换角色

Manager结构体(定义在manager/manager.go)包含一个*raft.Node成员:

// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
    ......
    RaftNode   *raft.Node
    ......
}

raft.Node(定义在manager/state/raft/raft.go)则包含一个*events.Broadcaster成员,用来接收改变manager role的消息(变成leader还是follower):

// Node represents the Raft Node useful
// configuration.
type Node struct {
    ......
    leadershipBroadcast *events.Broadcaster
    ......
}   

发送改变当前manager role的代码位于manager/state/raft/raft.go

// Run is the main loop for a Raft node, it goes along the state machine,
// acting on the messages received from other Raft nodes in the cluster.
//
// Before running the main loop, it first starts the raft node based on saved
// cluster state. If no saved state exists, it starts a single-node cluster.
func (n *Node) Run(ctx context.Context) error {
    ......
            // If we cease to be the leader, we must cancel
            // any proposals that are currently waiting for
            // a quorum to acknowledge them. It is still
            // possible for these to become committed, but
            // if that happens we will apply them as any
            // follower would.
            if rd.SoftState != nil {
                if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
                    wasLeader = false
                    n.wait.cancelAll()
                    if atomic.LoadUint32(&n.signalledLeadership) == 1 {
                        atomic.StoreUint32(&n.signalledLeadership, 0)
                        n.leadershipBroadcast.Write(IsFollower)
                    }
                } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
                    wasLeader = true
                }
            }

            if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
                // If all the entries in the log have become
                // committed, broadcast our leadership status.
                if n.caughtUp() {
                    atomic.StoreUint32(&n.signalledLeadership, 1)
                    n.leadershipBroadcast.Write(IsLeader)
                }
            }
    ......
}

接收消息的代码在Manager.Run()函数(manager/manager.go):

// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
func (m *Manager) Run(parent context.Context) error {
    ......
    leadershipCh, cancel := m.RaftNode.SubscribeLeadership()
    defer cancel()

    go m.handleLeadershipEvents(ctx, leadershipCh)

    ......

    go func() {
            err := m.RaftNode.Run(ctx)
            if err != nil {
                log.G(ctx).Error(err)
                m.Stop(ctx)
            }
        }()

    ......
}

Node.SubscribeLeadership()Manager.handleLeadershipEvents()代码比较简单,不再赘述。