The Engine struct has an eventsMonitor member:
type Engine struct {
	......
	eventsMonitor *EventsMonitor
}
The EventsMonitor struct is defined as follows:
// EventsMonitor monitors events
type EventsMonitor struct {
	stopChan chan struct{}
	cli      client.APIClient
	handler  func(msg events.Message) error
}
stopChan signals the monitor to stop receiving messages; cli is the underlying client connection; and handler is the function that processes each received event.
The Engine.ConnectWithClient method assigns the eventsMonitor member:
// ConnectWithClient is exported
func (e *Engine) ConnectWithClient(client dockerclient.Client, apiClient engineapi.APIClient) error {
	e.client = client
	e.apiClient = apiClient
	e.eventsMonitor = NewEventsMonitor(e.apiClient, e.handler)

	// Fetch the engine labels.
	if err := e.updateSpecs(); err != nil {
		return err
	}

	e.StartMonitorEvents()

	// Force a state update before returning.
	if err := e.RefreshContainers(true); err != nil {
		return err
	}

	if err := e.RefreshImages(); err != nil {
		return err
	}

	// Do not check error as older daemon doesn't support this call.
	e.RefreshVolumes()
	e.RefreshNetworks()

	e.emitEvent("engine_connect")

	return nil
}
The code of Engine.StartMonitorEvents is as follows:
// StartMonitorEvents monitors events from the engine
func (e *Engine) StartMonitorEvents() {
	log.WithFields(log.Fields{"name": e.Name, "id": e.ID}).Debug("Start monitoring events")
	ec := make(chan error)
	e.eventsMonitor.Start(ec)

	go func() {
		if err := <-ec; err != nil {
			if !strings.Contains(err.Error(), "EOF") {
				// failing node reconnect should use back-off strategy
				<-e.refreshDelayer.Wait(e.getFailureCount())
			}
			e.StartMonitorEvents()
		}
		close(ec)
	}()
}
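Engine.StartMonitorEvents starts the monitor and then waits for a message on the ec channel. If the message is an error, it calls Engine.StartMonitorEvents again (after a back-off delay, unless the error text contains "EOF"), so monitoring keeps being restarted for as long as errors occur.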
The code of the EventsMonitor.Start function is as follows:
// Start starts the EventsMonitor
func (em *EventsMonitor) Start(ec chan error) {
	em.stopChan = make(chan struct{})

	responseBody, err := em.cli.Events(context.Background(), types.EventsOptions{})
	if err != nil {
		ec <- err
		return
	}

	resultChan := make(chan decodingResult)

	go func() {
		dec := json.NewDecoder(responseBody)
		for {
			var result decodingResult
			result.err = dec.Decode(&result.msg)
			resultChan <- result
			if result.err == io.EOF {
				break
			}
		}
		close(resultChan)
	}()

	go func() {
		defer responseBody.Close()
		for {
			select {
			case <-em.stopChan:
				ec <- nil
				return
			case result := <-resultChan:
				if result.err != nil {
					ec <- result.err
					return
				}
				if err := em.handler(result.msg); err != nil {
					ec <- err
					return
				}
			}
		}
	}()
}
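The logic is essentially to issue an "HTTP GET /events" request and then wait for the Docker Engine's response. Because this HTTP request will most likely block here, holding its connection while the event stream stays open, subsequent HTTP exchanges are carried over a newly established HTTP connection. The reason is given in the documentation of the Body field of net/http's Response: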
type Response struct {
	......
	// Body represents the response body.
	//
	// The http Client and Transport guarantee that Body is always
	// non-nil, even on responses without a body or responses with
	// a zero-length body. It is the caller's responsibility to
	// close Body. The default HTTP client's Transport does not
	// attempt to reuse HTTP/1.0 or HTTP/1.1 TCP connections
	// ("keep-alive") unless the Body is read to completion and is
	// closed.
	//
	// The Body is automatically dechunked if the server replied
	// with a "chunked" Transfer-Encoding.
	Body io.ReadCloser
	......
}
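The connection behavior can be observed with the standard library alone. The sketch below uses a local test server in place of a Docker Engine; the /events and /info handlers and the get and demo helpers are hypothetical stand-ins. It records, via net/http/httptrace, whether each request was served over a reused connection.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
)

// get issues a GET request and reports whether the transport served it
// over a connection taken from its idle pool.
func get(c *http.Client, url string) (*http.Response, bool, error) {
	reused := false
	trace := &httptrace.ClientTrace{
		GotConn: func(info httptrace.GotConnInfo) { reused = info.Reused },
	}
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, false, err
	}
	req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
	resp, err := c.Do(req)
	return resp, reused, err
}

// demo returns whether a connection was reused (1) while an event stream
// was still open and (2) after an ordinary response was drained and closed.
func demo() (bool, bool, error) {
	mux := http.NewServeMux()
	// Stand-in for the engine's event stream: send one message, then
	// keep the response open until the client disconnects.
	mux.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, `{"status":"start"}`)
		w.(http.Flusher).Flush()
		<-r.Context().Done()
	})
	mux.HandleFunc("/info", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()
	client := srv.Client()

	stream, _, err := get(client, srv.URL+"/events")
	if err != nil {
		return false, false, err
	}
	defer stream.Body.Close() // runs before srv.Close and ends the stream

	// The stream still occupies its connection, so this request needs a new one.
	first, whileStreaming, err := get(client, srv.URL+"/info")
	if err != nil {
		return false, false, err
	}
	io.Copy(io.Discard, first.Body) // read to completion ...
	first.Body.Close()              // ... and close, so the connection can be reused

	second, afterDrain, err := get(client, srv.URL+"/info")
	if err != nil {
		return false, false, err
	}
	second.Body.Close()
	return whileStreaming, afterDrain, nil
}

func main() {
	whileStreaming, afterDrain, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println("reused while stream open:", whileStreaming)
	fmt.Println("reused after drain and close:", afterDrain)
}
```

Under these assumptions the first flag should be false and the second true: the open stream pins its connection, while a body that is read to completion and closed returns its connection to the idle pool.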
To stop this EventsMonitor, use the EventsMonitor.Stop method:
// Stop stops the EventsMonitor
func (em *EventsMonitor) Stop() {
	if em.stopChan == nil {
		return
	}
	close(em.stopChan)
}
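Closing a channel is a convenient way to broadcast a stop signal, but closing the same channel twice panics in Go, and the nil check above does not guard against a second call. A minimal sketch of an idempotent variant follows, assuming sync.Once is acceptable here; the stopper type and its methods are hypothetical and not part of Swarm.

```go
package main

import (
	"fmt"
	"sync"
)

// stopper is a hypothetical helper that makes "close to signal stop"
// safe to call more than once.
type stopper struct {
	once sync.Once
	ch   chan struct{}
}

func newStopper() *stopper {
	return &stopper{ch: make(chan struct{})}
}

// Stop closes the channel exactly once; later calls are no-ops.
func (s *stopper) Stop() {
	s.once.Do(func() { close(s.ch) })
}

// Done returns a channel that is closed once Stop has been called.
func (s *stopper) Done() <-chan struct{} {
	return s.ch
}

// isStopped reports, without blocking, whether Stop has been called.
func isStopped(s *stopper) bool {
	select {
	case <-s.Done():
		return true
	default:
		return false
	}
}

func main() {
	s := newStopper()
	fmt.Println(isStopped(s)) // prints false
	s.Stop()
	s.Stop() // the second call does not panic
	fmt.Println(isStopped(s)) // prints true
}
```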