七叶笔记 » golang编程 » Docker源码分析之容器日志处理与log-driver实现

Docker源码分析之容器日志处理与log-driver实现

概要

本文将从 docker (1.12.6)源码的角度分析docker daemon怎么将容器的日志收集出来并通过配置的log-driver发送出去,并结合示例介绍了好雨云帮中实现的一个zmq-loger。阅读本文,你也可以实现适合自己业务场景的log-driver。

阅读准备

本文适合能够阅读和编写golang代码的同学。
(1) 首先你需要认知以下几个关键词:

  • stdout:
    标准输出,进程写数据的流。
  • stderr:
    错误输出,进程写错误数据的流。
  • 子进程:
    由一个进程(父进程)创建的进程,集成父进程大部分属性,同时可以被父进程守护和管理。
    (2)

    你需要知道关于进程产生日志的形式


    进程产生日志有两类输出方式,一类是写入到文件中。另一类是直接写到stdout或者stderr,例如php的 echo python的 print golang的 fmt.Println("") 等等。
    (3)

    是否知道docker-daemon与运行中container的关系?

    一个container就是一个特殊的进程,它是由docker daemon创建并启动,因此container是docker daemon的子进程。由docker daemon守护和管理。因此container的stdout能够被docker daemon获取到。基于此理论,我们来分析docker daemon相关代码。

    docker-daemon关于日志源码分析

    container实例源码

     # /container/container.go:62
     Type  CommonContainer struct{
     Stream Config  *stream.Config
     ...
    }
    # /container/stream/streams.go:26
    type Config struct {
    sync.WaitGroup
    stdout *broadcaster.Unbuffered
    stderr *broadcaster.Unbuffered
    stdin io.ReadCloser
    stdinPipe io.WriteCloser
    }  

    找到如上所示对应的代码,显示了每一个container实例都有几个属性stdout,stderr,stdin,以及管道stdinPipe。这里说下stdinPipe,当容器使用-i参数启动时标准输入将被运行,daemon将能够使用此管道向容器内写入标准输入。

    我们试想以上图例,如果是你,你怎么实现日志收集转发?

     # /container/container. go  :312  func  (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Logger, error) {
    c, err := logger.GetLogDriver(cfg.Type) if  err !=   nil   { return   nil , fmt.Errorf( "Failed to get logging factory: %v" , err)
    }
    ctx := logger.Context{
    Config: cfg.Config,
    ContainerID: container.ID,
    ContainerName: container.Name,
    ContainerEntrypoint: container.Path,
    ContainerArgs: container.Args,
    ContainerImageID: container.ImageID.String(),
    ContainerImageName: container.Config.Image,
    ContainerCreated: container.Created,
    ContainerEnv: container.Config.Env,
    ContainerLabels: container.Config.Labels,
    DaemonName:  "docker" ,
    } // Set logging file for "json-logger"  if  cfg.Type == jsonfilelog.Name {
    ctx.LogPath, err = container.GetRootResourcePath(fmt.Sprintf( "%s-json.log" , container.ID)) if  err !=  nil  { return   nil , err
    }
    } return  c(ctx)
    }
    #/container/container. go  :978  func  (container *Container) startLogging() error { if  container.HostConfig.LogConfig.Type ==  "none"  { return   nil   // do not start logging routines }
    l, err := container.StartLogger(container.HostConfig.LogConfig) if  err !=  nil  { return  fmt.Errorf( "Failed to initialize logging driver: %v" , err)
    }
    copier := logger.NewCopier(  map  [ string ]io.Reader{ "stdout" : container.StdoutPipe(),  "stderr" : container.StderrPipe()}, l)
    container.LogCopier = copier
    copier.Run()
    container.LogDriver = l // set LogPath field only for json-file logdriver  if  jl, ok := l.(*jsonfilelog.JSONFileLogger); ok {
    container.LogPath = jl.LogPath()
    } return   nil }  

    第一个方法是为container查找log-driver。首先根据容器配置的log-driver类别调用: logger.GetLogDriver(cfg.Type) 返回一个方法类型:

     /daemon/logger/ factory .go:9
    type Creator func(Context) (Logger, error)  

    实质就是从工厂类注册的logdriver插件去查找,具体源码下文分析。获取到c方法后构建调用参数具体就是容器的一些信息。然后使用调用c方法返回driver。driver是个接口类型,我们看看有哪些方法:

     # /daemon/logger/logger. go  :61  type  Logger  interface  {
    Log(* Message ) error
    Name()  string  Close () error
    }  

    很简单的三个方法,也很容易理解, Log() 发送日志消息到driver, Close() 进行关闭操作(根据不同实现)。
    也就是说我们自己实现一个logdriver,只需要实现如上三个方法,然后注册到logger工厂类中即可。下面我们来看 /daemon/logger/factory.go
    第二个方法就是处理日志了,获取到日志driver,在创建一个 Copier ,顾名思义就是复制日志,分别从stdout 和stderr复制到logger driver。下面看看具体关键实现:

     #/daemon/logger/copir. go  :41  func  (c *Copier) copySrc(name  string , src io.Reader) { defer  c.copyJobs.Done()
    reader := bufio.NewReader(src) for  { select  { case  <-c.closed: return  default :
    line, err := reader.ReadBytes( '\n' )
    line = bytes.TrimSuffix(line, [] byte { '\n' }) // ReadBytes can return full or partial output even when it failed.  // e.g. it can return a full entry and EOF.  if  err ==  nil  ||  len (line) > 0  { if  logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr !=  nil  {
    logrus.Errorf( "Failed to log msg %q for logger %s: %s" , line, c.dst.Name(), logErr)
    }
    } if  err !=  nil  { if  err != io.EOF {
    logrus.Errorf( "Error scanning log stream: %s" , err)
    } return }
    }
    }
    }  

    每读取一行数据,构建一个消息,调用logdriver的log方法发送到driver处理。

    日志driver注册器

    位于 /daemon/logger/factory.go 的源码实现即时日志driver的注册器,其中几个重要的方法(上文已经提到一个):

     # /daemon/logger/factory. go  :21  func  (lf *logdriverFactory) register(name  string , c Creator) error { if  lf.driverRegistered(name) { return  fmt.Errorf( "logger: log driver named '%s' is already registered" , name)
    }
    lf.m. Lock ()
    lf.registry[name] = c
    lf.m.Unlock() return   nil }
    # /daemon/logger/factory. go  :39  func  (lf *logdriverFactory) registerLogOptValidator(name  string , l LogOptValidator) error {
    lf.m.Lock() defer  lf.m.Unlock() if  _, ok := lf.optValidator[name]; ok { return  fmt.Errorf( "logger: log validator named '%s' is already registered" , name)
    }
    lf.optValidator[name] = l return   nil }  

    看起来很简单,就是将一个 Creator 方法类型添加到一个map结构中,将 LogOptValidator 添加到另一个map这里注意加锁的操作。

     #/daemon/logger/factory.go:13
    type LogOptValidator func(cfg map[string]string) error  

    这个主要是验证driver的参数 ,dockerd和docker启动参数中有: --log-opt

    好雨云帮自己实现一个基于zmq的log-driver

    上文已经完整分析了docker daemon管理logdriver和处理日志的整个流程。相信你已经比较明白了。下面我们以zmq-driver为例讲讲我们怎么实现自己的driver。直接接收容器的日志。
    上文我们已经谈了一个log-driver需要实现的几个方法。
    我们可以看看位于 /daemon/logger 目录下的已有的driver的实现,例如 fluentd , awslogs 等。
    下面我们来分析zmq-driver具体的代码:

      //定义一个struct,这里包含一个zmq套接字  type  ZmqLogger  struct  {
    writer *zmq.Socket
    containerId  string tenantId  string serviceId  string felock sync.Mutex
    } //定义init方法调用logger注册器的方法注册当前driver  //和参数验证方法。  func  init() { if  err := logger.RegisterLogDriver(name, New); err !=  nil  {
    logrus.Fatal(err)
    } if  err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err !=  nil  {
    logrus.Fatal(err)
    }
    } //实现一个上文提到的Creator方法注册logdriver.  //这里新建一个zmq套接字构建一个实例  func  New(ctx logger.Context) (logger.Logger, error) {
    zmqaddress := ctx.Config[zmqAddress]
    puber, err := zmq.NewSocket(zmq.PUB) if  err !=  nil  { return   nil , err
    } var  (
    env =  make ( map [ string ] string )
    tenantId  string serviceId  string ) for  _, pair :=  range  ctx.ContainerEnv {
    p := strings.SplitN(pair,  "=" , 2 ) //logrus.Errorf("ContainerEnv pair: %s", pair)  if   len (p) == 2  {
    key := p [0 ]
    value := p [1 ]
    env[key] = value
    }
    }
    tenantId = env[ "TENANT_ID" ]
    serviceId = env[ "SERVICE_ID" ] if  tenantId ==  ""  {
    tenantId =  "default" } if  serviceId ==  ""  {
    serviceId =  "default" }
    puber.Connect(zmqaddress) return  &ZmqLogger{
    writer: puber,
    containerId: ctx.ID(),
    tenantId: tenantId,
    serviceId: serviceId,
    felock: sync.Mutex{},
    },  nil } //实现Log方法,这里使用zmq socket发送日志消息  //这里必须注意,zmq socket是线程不安全的,我们知道  //本方法可能被两个线程(复制stdout和肤质stderr)调用//必须使用锁保证线程安全。否则会发生错误。  func  (s *ZmqLogger) Log(msg *logger.Message) error {
    s.felock.Lock() defer  s.felock.Unlock()
    s.writer.Send(s.tenantId, zmq.SNDMORE)
    s.writer.Send(s.serviceId, zmq.SNDMORE) if  msg.Source ==  "stderr"  {
    s.writer.Send(s.containerId+ ": " + string (msg.Line), zmq.DONTWAIT)
    }  else  {
    s.writer.Send(s.containerId+ ": " + string (msg.Line), zmq.DONTWAIT)
    } return   nil } //实现Close方法,这里用来关闭zmq socket。  //同样注意线程安全,调用此方法的是容器关闭协程。  func  (s *ZmqLogger) Close() error {
    s.felock.Lock() defer  s.felock.Unlock() if  s.writer !=  nil  { return  s.writer.Close()
    } return   nil } func  (s *ZmqLogger) Name()  string  { return  name
    } //验证参数的方法,我们使用参数传入zmq pub的地址。  func  ValidateLogOpt(cfg  map [ string ] string ) error { for  key :=  range  cfg { switch  key { case  zmqAddress: default : return  fmt.Errorf( "unknown log opt '%s' for %s log driver" , key, name)
    }
    } if  cfg[zmqAddress] ==  ""  { return  fmt.Errorf( "must specify a value for log opt '%s'" , zmqAddress)
    } return   nil }  

    总结

    多研究源码可以方便我们理解docker的工作原理。今天我们分析了日志部分。希望读者对这部分功能能够理解得更清晰。

相关文章