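Notes on a failure when canal sent an oversized request to Kafka
Cause of the incident
We had been running canal 1.1.3 and then upgraded to the latest version (1.1.5-SNAPSHOT according to the stack trace below). After the release went live, sending data failed with this error: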
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2020-02-20 22:28:50.292 [pool-5-thread-2] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:150) ~[canal.server-1.1.5-SNAPSHOT.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.5-SNAPSHOT.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.5-SNAPSHOT.jar:na]
    at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.5-SNAPSHOT.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64) ~[kafka-clients-1.1.1.jar:na]
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) ~[kafka-clients-1.1.1.jar:na]
    at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:148) ~[canal.server-1.1.5-SNAPSHOT.jar:na]
    ... 6 common frames omitted
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
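In short, the message being sent exceeded Kafka's maximum message size, which defaults to 1 MB.
We never hit this on the old version because we had patched its code to strip out part of the data, so its messages were smaller: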
{"data":[{"orderType":"0","payTime":"2010-10-27 11:02:28","isAdvance":"0","couponId":"0","payType":"1","id":"117","orderNo":"20101027110228","tradeNo":"120359403459","isHasGif":"0","orderFromUrl":"highso.org.cn","promotionMoney":"0.0","checkReason":"小伙子动起来啊","autoActivate":"0","goodsType":"0","bankId":"wechat","couponMoney":"0.0","status":"4","cancelType":"2","freight":"0.0","remark":"系统自动取消","easyOrderNo":"1231231231","customerId":"12340243","operationId":"100000","isSplitOrder":"0","cancelReason":"7天未付款","onlinePayType":"32","operationName":"超级管理员2","updateTime":"2020-02-24 14:00:48","pageNum":"0","netPayMoney":"360.0","createTime":"2010-10-27 11:02:28","cancelTime":"2019-07-13 01:30:09","buyAgreement":"0","originalMoney":"360.0"}],"pkNames":["id"],"old":[{"operationName":"超级管理员1","updateTime":"2020-02-24 13:58:20"}],"type":"UPDATE","es":1582524048000,"sql":"UPDATE `crm`.`order` SET `operationName` = '超级管理员2' WHERE `id` = 117","database":"crm","id":2,"isDdl":false,"table":"order","ts":1582524048728}
A message from the new version looks like this:
{"data":[{"orderType":"0","payTime":"2010-10-27 11:02:28","isAdvance":"0","couponId":"0","payType":"1","id":"117","orderNo":"20101027110228","tradeNo":"120359403459","isHasGif":"0","orderFromUrl":"highso.org.cn","promotionMoney":"0.0","checkReason":"小伙子动起来啊","autoActivate":"0","goodsType":"0","bankId":"wechat","couponMoney":"0.0","status":"4","cancelType":"2","freight":"0.0","remark":"系统自动取消","easyOrderNo":"1231231231","customerId":"12340243","operationId":"100000","isSplitOrder":"0","cancelReason":"7天未付款","onlinePayType":"32","operationName":"超级管理员1","updateTime":"2020-02-24 13:58:20","pageNum":"0","netPayMoney":"360.0","createTime":"2010-10-27 11:02:28","cancelTime":"2019-07-13 01:30:09","buyAgreement":"0","originalMoney":"360.0"}],"pkNames":["id"],"old":[{"operationName":"超级管理员2","updateTime":"2020-02-24 13:57:04"}],"type":"UPDATE","es":1582523900000,"sql":"","database":"crm","sqlType":{"orderType":4,"payTime":93,"isAdvance":4,"couponId":-5,"instalmentNum":3,"webAgent":12,"payType":4,"id":-5,"cancelRemark":12,"orderNo":12,"tradeNo":12,"isHasGif":4,"childPlace":4,"orderFromUrl":12,"promotionMoney":3,"checkReason":2005,"autoActivate":4,"sendTime":93,"goodsType":5,"bankId":12,"checkTime":93,"couponMoney":3,"orderFrom":4,"serviceOperate":4,"status":4,"cancelType":4,"freight":3,"remark":2005,"easyOrderNo":12,"ipAddr":12,"childOrderPayNum":12,"couponType":4,"customerId":-5,"operationId":-5,"isSplitOrder":4,"cancelReason":2005,"deadline":4,"recoveryTime":93,"refundReason":12,"buyAgreementTime":93,"onlinePayType":4,"operationName":12,"updateTime":93,"pageNum":12,"createBy":-5,"netPayMoney":3,"toPayTime":93,"createTime":93,"cancelTime":93,"activationCode":12,"webFrom":12,"buyAgreement":4,"serviceMoney":3,"originalMoney":3},"mysqlType":{"orderType":"int(1)","payTime":"datetime","isAdvance":"int(10) unsigned","couponId":"bigint(20)","instalmentNum":"decimal(12,2)","webAgent":"varchar(64)","payType":"int(1)","id":"bigint(20)","cancelRemark":"varchar(500)","orderNo":"varchar(50)","tradeNo":"varchar(50)","isHasGif":"int(1)","childPlace":"int(2)","orderFromUrl":"varchar(500)","promotionMoney":"decimal(11,2)","checkReason":"text","autoActivate":"int(11)","sendTime":"datetime","goodsType":"smallint(6)","bankId":"varchar(20)","checkTime":"datetime","couponMoney":"decimal(11,2)","orderFrom":"int(1)","serviceOperate":"int(1)","status":"int(1)","cancelType":"int(1)","freight":"decimal(11,2)","remark":"text","easyOrderNo":"varchar(250)","ipAddr":"varchar(32)","childOrderPayNum":"varchar(5)","couponType":"int(1)","customerId":"bigint(20)","operationId":"bigint(20)","isSplitOrder":"int(1)","cancelReason":"text","deadline":"int(11)","recoveryTime":"datetime","refundReason":"varchar(200)","buyAgreementTime":"datetime","onlinePayType":"int(2)","operationName":"varchar(50)","updateTime":"timestamp","pageNum":"varchar(20)","createBy":"bigint(20)","netPayMoney":"decimal(11,2)","toPayTime":"datetime","createTime":"datetime","cancelTime":"datetime","activationCode":"varchar(100)","webFrom":"varchar(255)","buyAgreement":"int(2)","serviceMoney":"decimal(11,2)","originalMoney":"decimal(11,2)"},"id":2,"isDdl":false,"table":"order","ts":1582523900096}
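The new message is clearly much larger, mostly because of the added sqlType and mysqlType maps. So when we bulk-update data, the message body grows past the limit and the send fails.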
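To get a rough feel for how close a payload is to the limit, one can measure its UTF-8 encoded size. The following is only a minimal sketch: the class and method names are hypothetical, the sample string is a shortened stand-in for a real canal message, and the figure ignores Kafka's own per-record overhead, so treat it as an approximation.

```java
import java.nio.charset.StandardCharsets;

public class MessageSizeCheck {
    // 1 MiB, the default limit discussed in this post (assumed here as the threshold).
    static final int DEFAULT_LIMIT_BYTES = 1024 * 1024;

    // Size of the payload once encoded as UTF-8. Chinese characters take
    // 3 bytes each, so String.length() would under-count.
    static int utf8Size(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length;
    }

    // True when the encoded payload alone is already larger than the limit.
    static boolean exceedsLimit(String payload, int limitBytes) {
        return utf8Size(payload) > limitBytes;
    }

    public static void main(String[] args) {
        // Shortened stand-in for a real canal JSON message.
        String sample = "{\"table\":\"order\",\"type\":\"UPDATE\"}";
        System.out.println(utf8Size(sample) + " bytes, too large: "
                + exceedsLimit(sample, DEFAULT_LIMIT_BYTES));
    }
}
```

A single-row message like the ones above is only a few kilobytes, so the limit is more likely to be hit when one message carries many changed rows, as in a bulk update.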
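Solution
Step 1: change the MQ configuration on the canal server
canal.mq.maxRequestSize = 1048576 (the default is 1 MB; raise it as needed)
Step 2: raise the maximum message size of the topic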
/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic testTopic --config max.message.bytes=5242880
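The two limits above work together: the producer-side limit decides what the client is willing to send, and the topic-side limit decides what the broker accepts. The sketch below assumes that canal.mq.maxRequestSize ends up as the Kafka producer setting max.request.size (this mapping is an assumption, check it against your canal version). The class and helper names are hypothetical, and 5242880 is simply the value used in the command above.

```java
import java.util.Properties;

public class KafkaSizeLimits {
    // Builds producer properties carrying only the size limit.
    // "max.request.size" is the Kafka producer config name.
    static Properties producerProps(int maxRequestSize) {
        Properties props = new Properties();
        props.setProperty("max.request.size", Integer.toString(maxRequestSize));
        return props;
    }

    // True when the producer limit does not exceed the topic's max.message.bytes,
    // so an oversized message is rejected on the client rather than by the broker.
    static boolean fitsTopic(Properties producerProps, int topicMaxMessageBytes) {
        int maxRequestSize = Integer.parseInt(producerProps.getProperty("max.request.size"));
        return maxRequestSize <= topicMaxMessageBytes;
    }

    public static void main(String[] args) {
        int topicMaxMessageBytes = 5242880; // value from the kafka-topics.sh command
        Properties props = producerProps(5242880);
        System.out.println("producer limit fits topic limit: "
                + fitsTopic(props, topicMaxMessageBytes));
    }
}
```

Raising only one side is usually not enough: with the topic still at its default, a larger producer limit just moves the failure to the broker, which is the error shown in the stack trace.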