配置项

使用通道服务功能消费数据时,您可以根据需要配置客户端tunnel client、数据消费worker以及日志。本文介绍通道服务的配置项。

tunnel client

初始化tunnel client时,您可以通过NewTunnelClientWithConfig接口自定义客户端配置,如果未指定config进行接口初始化或者confignil,则默认配置会使用DefaultTunnelConfig。

DefaultTunnelConfig的配置信息如下:

var DefaultTunnelConfig = &TunnelConfig{
      //最大指数退避重试时间。
      MaxRetryElapsedTime: 75 * time.Second,
      //HTTP请求超时时间。
      RequestTimeout:      60 * time.Second,
      //http.DefaultTransport。
      Transport:           http.DefaultTransport,
}

数据消费worker

TunnelWorkerConfig中包含了数据消费worker需要的配置,其中ProcessorFactory为必填项,其余参数如果不填写将使用默认值,通常使用默认值即可。

TunnelWorkerConfig的配置信息如下:

type TunnelWorkerConfig struct {
   //worker同Tunnel服务的心跳超时时间,通常使用默认值即可。
   HeartbeatTimeout  time.Duration
   //worker发送心跳的频率,通常使用默认值即可。
   HeartbeatInterval time.Duration
   //tunnel下消费连接建立接口,通常使用默认值即可。
   ChannelDialer     ChannelDialer

   //消费连接上具体处理器产生接口,通常使用callback函数初始化SimpleProcessFactory即可。
   ProcessorFactory ChannelProcessorFactory

   //zap日志配置,默认值为DefaultLogConfig。
   LogConfig      *zap.Config
   //zap日志轮转配置,默认值为DefaultSyncer。
   LogWriteSyncer zapcore.WriteSyncer
}

其中ProcessorFactory为用户注册消费callback函数以及其他信息的接口,建议使用SDK中自带SimpleProcessorFactory实现。

SimpleProcessorFactory的配置信息如下:

type SimpleProcessFactory struct {
   //用户自定义信息,会传递到ProcessFunc和ShutdownFunc中的ChannelContext参数中。
   CustomValue interface{}

   //Worker记录checkpoint的间隔,CpInterval<=0时会使用DefaultCheckpointInterval。
   CpInterval time.Duration

   //worker数据处理的同步调用callback,ProcessFunc返回error时worker会用本批数据退避重试ProcessFunc。
   ProcessFunc  func(channelCtx *ChannelContext, records []*Record) error
   //worker退出时的同步调用callback。
   ShutdownFunc func(channelCtx *ChannelContext)

   //日志配置,Logger为nil时会使用DefaultLogConfig初始化logger。
   Logger *zap.Logger
}

日志

默认日志配置和日志轮转配置示例如下:

默认日志配置

//DefaultLogConfig是TunnelWorkerConfig和SimpleProcessFactory使用的默认日志配置。
var DefaultLogConfig = zap.Config{
   Level:       zap.NewAtomicLevelAt(zap.InfoLevel),
   Development: false,
   Sampling: &zap.SamplingConfig{
      Initial:    100,
      Thereafter: 100,
   },
   Encoding: "json",
   EncoderConfig: zapcore.EncoderConfig{
      TimeKey:        "ts",
      LevelKey:       "level",
      NameKey:        "logger",
      CallerKey:      "caller",
      MessageKey:     "msg",
      StacktraceKey:  "stacktrace",
      LineEnding:     zapcore.DefaultLineEnding,
      EncodeLevel:    zapcore.LowercaseLevelEncoder,
      EncodeTime:     zapcore.ISO8601TimeEncoder,
      EncodeDuration: zapcore.SecondsDurationEncoder,
      EncodeCaller:   zapcore.ShortCallerEncoder,
   },
}

日志轮转配置

//DefaultSyncer是TunnelWorkerConfig和SimpleProcessFactory使用的默认日志轮转配置
var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{
   //日志文件路径。
   Filename:   "tunnelClient.log",
   //最大日志文件大小。
   MaxSize:    512, //MB
   //压缩轮转的日志文件数。
   MaxBackups: 5,
   //轮转日志文件保留的最大天数。
   MaxAge:     30, //days
   //是否压缩轮转日志文件。
   Compress:   true,
})