Kafka Connect S3常见参数与异常解释
重要的参数
partition.duration.ms
connector会将数据按时间划分文件夹,比如这个参数设为1800000
,意味着每半小时为一个topic新开一个文件夹。
partitioner.class & path.format
默认值partition.class=TimeBasedPartitioner.class & path.format=YYYY/MM/dd/HH:mm:ss
在这个默认值下,文件路径大概会像这样如下字符串,由时间+topic+partition+offset组成:
/topics/{topicName}+3+0000000000.json
timezone
默认使用UTC+0时间,没有特殊需求尽量不要管动它
timestamp.extractor & timestamp.field
如何获取记录的时间的配置,有以下几种配置:
- Record: 默认值,使用Kafka原本的消息时间
- Wallclock:connector处理到这条消息的系统当前时间
- RecordField:使用Record内某个字段作为时间戳,配合timestamp.field使用。不建议此方法,容易因为没有相关字段直接抛出ConnectorException异常
- 其它:自提供一个TimestampExtractor的实现类,需要给出类的全名。
flush.size
& rotate.interval.ms
最多多少条记录提交一次文件,以及最多多长时间提交一次文件,二者满足其一就会创建一个新的s3文件出来。
s3.part.retries
& s3.retry.backoff.ms
写入s3的失败重试次数,以及每次失败后的等待时长。
其它参数
aws.access.key.id
: s3用户名
aws.secret.access.key
:s3密码
store.url
: 存储的s3地址
s3.bucket.name
: 存储的s3 bucket
tasks.max
:最多几个task
format.class
:数据以什么形式写入s3,有JsonFormat \ ByteArrayFormat \ AvroFormat等,可以通过实现io.confluent.connect.storage.format.Format
接口来自定义想要的格式和内容。