Canal 使用
前言
使用cannal之前,需要保证mysql支持binlog功能,以及保证canal已经配置好了。
启动canal
- 进入
canal
的安装目录中。
[admin@hadoop102 canal]$ ll
总用量 4
drwxrwxr-x. 2 admin admin 76 7月 31 19:18 bin
drwxrwxr-x. 5 admin admin 93 7月 31 19:49 conf
drwxrwxr-x. 2 admin admin 4096 7月 31 19:18 lib
drwxrwxr-x. 2 admin admin 6 11月 26 2018 logs
- 执行bin目录下的
startup.sh
[admin@hadoop102 canal]$ bin/startup.sh
- 查看启动进程中是否有
CanalLauncher
。
[admin@hadoop102 canal]$ jps
2662 Jps
2586 CanalLauncher
- 若启动时出现如下情况
[admin@hadoop102 canal]$ bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh
表示上次启动之后,异常退出导致canal.pid
依旧存在,需要先执行stop.sh
命令。
[admin@hadoop102 canal]$ bin/startup.sh
hadoop102: stopping canal 2586 ...
参考:https://github.com/alibaba/canal/wiki/AdminGuide
停止canal
- 进入canal的安装目录中。
- 进入
canal
的安装目录中。
[admin@hadoop102 canal]$ ll
总用量 4
drwxrwxr-x. 2 admin admin 76 7月 31 19:18 bin
drwxrwxr-x. 5 admin admin 93 7月 31 19:49 conf
drwxrwxr-x. 2 admin admin 4096 7月 31 19:18 lib
drwxrwxr-x. 2 admin admin 6 11月 26 2018 logs
- 执行bin目录下的
startup.sh
[admin@hadoop102 canal]$ bin/startup.sh
日志查看
cd logs 中,
- canal:
这是有关canal
的日志,若启动canal
失败,可以再次看看日志 - example:
这是有关mysql
实例的日志
需求
监听mysql数据库写操作变化,从canal中获取该数据,并将数据推送到
Kafka
中。
案例:比如使用canal
监听一张用户表,每新增一个用户,canal
便会从user
表中获取该用户信息,这样我们就可以直接获取新增的用户信息。
疑问
为什么需要需要使用canal?直接从mysql中取不信吗?答案:不行,获取数据的目的就是用于计算,假设mysql有一万条用户信息,我需要统计男女人数,并且用户数据还是不停累加的。比如平均以每分钟1000个用户量递增。
方案一:每新增一批,就统计一次(一万条数据时统一一次,一万一千条是再统计一次,以此类推)
方案二:先统计一万条数据中的男女人数,新增一千条时再从一千条中再统计一次,结果和上一个结果相加,以此类推。
显然第二种方案效率更高,canal
的出现便更好的帮我们解决了该问题。
创建一个maven 项目
参考:https://github.com/alibaba/canal/wiki/ClientExample
- 导入依赖
<!-- canal 相关的APi -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<!-- kafka 相关的APi -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
-
编写一个
CanalClient
程序
便于从canal
中获取数据 -
步骤
- 创建一个客服端对象
需要使用到CanalConnector
,它时一个接口,所以需要指定它的子类
CanalConnector子类
-SimpleCanalConnector:单点模式
--ClusterCanalConnector:集群模式,创建客户端,自带故障转移,要求canal server必须是一个HA的,如果当前连接的active的 cnanl server 挂了,自动重新连接。
-CanalMQConnector:用于一些中间件的连接接口
--KafkaCanalConnector:用于将数据写入Kafka中
--RocketMQCanalConnector:用于将数据写入RocketMQ中
如何创建连接?canal
提供一个工具类com.alibaba.otter.canal.client.CanalConnectors
,专门用于创建CanalConnector
客户端
// 创建链接
String hostname="hadoop102"; //主机名 参考 canal.properties中的 canal.ip
int port=11111;// 端口号 canal.properties中的 canal.port
InetSocketAddress socketAddress = new InetSocketAddress(hostname,port);
/**
* SocketAddress address, 连接 canal server的主机名和端口号,参考 canal.properties
* String destination,参考 canal.properties 中的 canal.destinations,
* example :是指instance.properties 的目录名就是 example
* String username, 不需要写
* String password 不需要写
* 官方解释: https://github.com/alibaba/canal/wiki/AdminGuide
* canal.user canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启 无
* canal.passwd canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启
*/
CanalConnector connector = CanalConnectors.newSingleConnector(socketAddress, "example", null, null);
- 使用客服端对象连接
canal
,调用connector.connect()
方法即可,没有返回值
// 使用客服端对象连接`canal`
connector.connect();
- 订阅,调用
connector.subscribe("库名.表名");
// 订阅数据表
connector.subscribe("gmallrealtime0323.order_info");
- 拉取数据,调用
connector.get()
Message get(int batchSize):获取数据,自动进行确认,该方法返回的条件:尝试拿batchSize条记录,有多少取多少,不会阻塞等待
Message get(int batchSize, Long timeout, TimeUnit unit):获取数据,自动进行确认,该方法返回的条件:
a. 拿够batchSize条记录或者超过timeout时间
b. 如果timeout=0,则阻塞至拿到batchSize记录才返回
这里使用第一种,拉取一批次数据
// 拉取数据
Message message = connector.get(100);
当前完整代码
public static void main(String[] args) {
// 创建链接
String hostname="hadoop102"; //主机名 参考 canal.properties中的 canal.ip
int port=11111;// 端口号 canal.properties中的 canal.port
InetSocketAddress socketAddress = new InetSocketAddress(hostname,port);
/**
* SocketAddress address, 连接 canal server的主机名和端口号,参考 canal.properties
* String destination,参考 canal.properties 中的 canal.destinations,
* example :是指instance.properties 的目录名就是 example
* String username, 不需要写
* String password 不需要写
* 官方解释: https://github.com/alibaba/canal/wiki/AdminGuide
* canal.user canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启 无
* canal.passwd canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启
*/
CanalConnector connector = CanalConnectors.newSingleConnector(socketAddress, "example", null, null);
// 使用客服端对象连接`canal`
connector.connect();
// 订阅数据表
connector.subscribe("gmallrealtime0323.order_info");
// 拉取数据
Message message = connector.get(100);
System.out.println(message);
}
-
运行结果(消息太大,你忍一下),就不贴出来了。
-
接下里看看如何理解
Message
- Message 代表我们拉取的一批数据。
- Message 的一些属性
private long id;
private List<Entry> entries = new ArrayList(); //Entry 代表一条写操作的sql,造成的数据变化
private boolean raw = true;
private List<ByteString> rawEntries = new ArrayList();
-
Entry :记录写操作的sql,造成的数据变化
所谓写操作就是除了select
语句外的sql语句。如(insert、update、delete)。
比如insert
影响了10行数据,update
影响了10行数据,那么Message
会将该两个操作(insert、update)分别包装到Entry
中,也就是说List<Entry>
中会存有两个Entry
。
查看下面的图
Message:上面介绍过,一次canal从日志中抓取的信息,一个message可以包含多个sql执行的结果。
Entry:上面介绍过,对应一个sql命令,一个sql可能会对多行记录造成影响。
Tablename:执行sql影响到的表
EntryType:表示当前sql是什么类型的lsql语句,insert
语句或update
语句等。注意bigin
(开启事务)、commit
(提交事务)也是属于写操作。
StoreValue:若当前EntryType
为ROWDATA
,表示会影响数据的变化,那么会影响那么数据变化呢?它会采用StoreValue
进行记录。
RowChange:StoreValue
是一个序列化对象,不能直接使用,所以我们需要使用RowChange
进行反序列化,RowChange
对象就表示一条sql
引起的数据变化。
EevntType:用于判断当前sql
是什么类型的。
RowDataList:一条sql
可能会引发多行数据变化的,RowDataList
便存储了引发数据变化的多行。
RowData:每个RowData
就代表一行数据。
column:一行数据有多个列。 -
如何知道当前的
EntryType
是什么呢?canal
定义了没有枚举类
com.alibaba.otter.canal.protocol.CanalEntry.EntryType
他就是说明哪些类型,分别代表什么意思
TRANSACTIONBEGIN : 表示 bigin 开启事务
ROWDATA:普通的写操作(insert、update、delete 统称为 ROWDATA,是会引起数据变化的sql)
TRANSACTIONEND:commit 提交事务
HEARTBEAT 心跳连接请求
GTIDLOG(4, 5);
-
CanalEntry.Header 会记录表名信息
-
ByteString storeValue_:记录当前ROWDATA 类型的sql,引起的数据变化。
ByteString 是一个二进制类型,需要转换成RowChange
类型。 -
剩下的概念就在代码中介绍吧,我们接着
Message
往下写。
当然我们从Message
打印中也能看到以上说的信息
// 有数据 id =1,没数据 id=-1
Message[id=1,entries=[header {
version: 1
logfileName: "mysql-bin.000001"
logfileOffset: 219
serverId: 1
serverenCode: "UTF-8"
executeTime: 1627740174000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 68
}
entryType: TRANSACTIONBEGIN // 开启事务
storeValue: " \005"
, header {
version: 1
logfileName: "mysql-bin.000001"
logfileOffset: 352
serverId: 1
serverenCode: "UTF-8"
executeTime: 1627740174000
sourceType: MYSQL
schemaName: "demo"
tableName: "employees" # 表名
eventLength: 95
eventType: UPDATE
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA // insert、update、delete 语句
storeValue: "\bl\020\002P\000b\235\003\n\032\b\000\020\004\032\002id" // 存储的值,是一个二进制的,太长了,我就清空了‘
, header {
version: 1
logfileName: "mysql-bin.000001"
logfileOffset: 447
serverId: 1
serverenCode: "UTF-8"
executeTime: 1627740174000
sourceType: MYSQL
schemaName: ""
tableName: ""
eventLength: 31
}
完善
整理一下:
- 创建一个maven项目
- 获取
canal server
服务 - 通过``canal server
获取
CanalConnector`对象 - 通过
CanalConnector
成功获取Message
并打印 - 介绍了
Message
的组成架构。
接下来的工作就是从Message
获取我们先要的数据。我们需要一直监控着canal
拉取数据,所以需要不停的请求
// 不停的拉取数据
while (true){
Message message = connector.get(100);
System.out.println(message);
}
若拉取到没有数据时(如下
)。
Message[id=-1,entries=[],raw=false,rawEntries=[]]
Message[id=-1,entries=[],raw=false,rawEntries=[]]
应该暂停一下,等待一会再请求,所以使用Thread.sleep(3000);
休眠3秒钟再继续
// 不停的拉取数据
while (true){
Message message = connector.get(100);
if (message.getId()==-1){
try {
Thread.sleep(3000);
// 终止本次循环,执行下次循环
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(message);
}
接下来:开始解析数据,我们只要ROWDATA
类型的数据,使用 java8 stram
进行过滤。
//解析 只要 ROWDATA 类型的数据
List<CanalEntry.Entry> entryList = message.getEntries()
.stream()
.filter(entry -> entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA))
.collect(Collectors.toList());
接下里我们就应该从ROWDATA
数据类型中获取StoreValue
entryList.stream().forEach(entry -> {
// 获取 storeValue
ByteString storeValue = entry.getStoreValue();
//解析json 并将其转换成 List<JSONObject>
List<JSONObject> list= analysisStoreValue(storeValue);
System.out.println(list);
});
通过StoreValue
将数据转成成json串
。单独封装成了一个方法analysisStoreValue
/**
* 解析 storeValue 封装为 json
* @param storeValue
*/
public static List<JSONObject> analysisStoreValue(ByteString storeValue){
try {
// storeValue 是一个二进制序列化值,所以需要将其反序列化成 RowChange
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 这里只需要获取insert sql的语句数据
if(rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
// 获取表中所有的行
List<CanalEntry.RowData> rows = rowChange.getRowDatasList();
// 通过行获取所有的列
return rows.stream().map(row->{
List<CanalEntry.Column> columns = row.getAfterColumnsList();
// 将每行数据包装成json
JSONObject result=new JSONObject();
columns.forEach(column -> result.put(column.getName(),column.getValue()));
return result;
}).collect(Collectors.toList());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
}
最后输出结果,这就是每个sql
的执行结果(5个insertsql语句)
[{"payment_way":"2","delivery_address":"JYSrezirkbAySRFAOqpA","consignee":"OVnBSI","create_time":"2021-08-01 13:42:20","order_comment":"IPaJfJeuLtNHWObRhhrk","expire_time":"","order_status":"1","out_trade_no":"9397945080","tracking_no":"","total_amount":"498.0","user_id":"1","img_url":"","province_id":"6","consignee_tel":"13792449126","trade_body":"","id":"1","parent_order_id":"","operate_time":""}]
[{"payment_way":"1","delivery_address":"sFXYagEEgSWfKkVXbptY","consignee":"FRuwYB","create_time":"2021-08-01 06:48:37","order_comment":"MRUarJvbSmFsUvwUjnJK","expire_time":"","order_status":"1","out_trade_no":"4961868146","tracking_no":"","total_amount":"438.0","user_id":"2","img_url":"","province_id":"5","consignee_tel":"13728675675","trade_body":"","id":"2","parent_order_id":"","operate_time":""}]
[{"payment_way":"1","delivery_address":"drzwLnhVjjrgHPdXCTNg","consignee":"HahHQg","create_time":"2021-08-01 18:57:58","order_comment":"suWwwQPBnKLAwDCaxjCl","expire_time":"","order_status":"2","out_trade_no":"4720570830","tracking_no":"","total_amount":"251.0","user_id":"1","img_url":"","province_id":"8","consignee_tel":"13164210619","trade_body":"","id":"3","parent_order_id":"","operate_time":"2021-08-01 19:22:34"}]
[{"payment_way":"1","delivery_address":"uLwYDUQvCDjjuwXzFAOt","consignee":"wOFLME","create_time":"2021-08-01 04:06:48","order_comment":"zsrFakbCIJtShgkHJkga","expire_time":"","order_status":"1","out_trade_no":"6377533572","tracking_no":"","total_amount":"578.0","user_id":"1","img_url":"","province_id":"7","consignee_tel":"13657270001","trade_body":"","id":"4","parent_order_id":"","operate_time":""}]
[{"payment_way":"1","delivery_address":"cnhZyvFRqhLhJJqJwflP","consignee":"MtJZdv","create_time":"2021-08-01 20:40:49","order_comment":"txgpgKgzfdbUvrAyUcCD","expire_time":"","order_status":"1","out_trade_no":"2390154344","tracking_no":"","total_amount":"469.0","user_id":"2","img_url":"","province_id":"8","consignee_tel":"13887020238","trade_body":"","id":"5","parent_order_id":"","operate_time":""}]
该数据最终是要推送到Kafka
中,所有,最好的方式就是将json集合合成一个集合中。
// 拆分所有集合
List<JSONObject> result = entryList.stream()
.flatMap(entry -> analysisStoreValue(entry.getStoreValue()).stream())
.filter(e-> e!=null && !e.isEmpty())
.collect(Collectors.toList());
System.out.println(result);
数据结果
[{"payment_way":"1","delivery_address":"PVgWoDbxivHYXQkCgYAI","consignee":"bhFEia","create_time":"2021-08-01 00:19:29","order_comment":"MMygkJALdkNmTIKztsKv","expire_time":"","order_status":"1","out_trade_no":"9606170154","tracking_no":"","total_amount":"576.0","user_id":"1","img_url":"","province_id":"1","consignee_tel":"13567887322","trade_body":"","id":"1","parent_order_id":"","operate_time":""},
{"payment_way":"2","delivery_address":"ZEZjWgPIVCbxkHIsNNzj","consignee":"yqkdMz","create_time":"2021-08-01 13:21:03","order_comment":"yOzghuykDnCbyqfEHvhz","expire_time":"","order_status":"1","out_trade_no":"0773742642","tracking_no":"","total_amount":"170.0","user_id":"2","img_url":"","province_id":"4","consignee_tel":"13265798120","trade_body":"","id":"2","parent_order_id":"","operate_time":""},
{"payment_way":"2","delivery_address":"MJWLPUatUpPfiAcKsKwc","consignee":"QYuemX","create_time":"2021-08-01 19:13:23","order_comment":"VwxWsWEgVkoPhpcsHiUX","expire_time":"","order_status":"1","out_trade_no":"0550616794","tracking_no":"","total_amount":"971.0","user_id":"2","img_url":"","province_id":"1","consignee_tel":"13049382676","trade_body":"","id":"3","parent_order_id":"","operate_time":""},
{"payment_way":"1","delivery_address":"oiWkmEjjtooEdDFuepkf","consignee":"sUwybN","create_time":"2021-08-01 06:41:31","order_comment":"XwqpDVTFvmZlheXBQyWk","expire_time":"","order_status":"2","out_trade_no":"2591907530","tracking_no":"","total_amount":"229.0","user_id":"2","img_url":"","province_id":"1","consignee_tel":"13736224357","trade_body":"","id":"4","parent_order_id":"","operate_time":"2021-08-01 07:18:40"},
{"payment_way":"1","delivery_address":"MzlHFbtPTWdCAWiXrOTe","consignee":"pOaLAx","create_time":"2021-08-01 21:04:55","order_comment":"PLlYhQITtmgTYlmCYmpN","expire_time":"","order_status":"2","out_trade_no":"9277374270","tracking_no":"","total_amount":"542.0","user_id":"2","img_url":"","province_id":"5","consignee_tel":"13603620396","trade_body":"","id":"5","parent_order_id":"","operate_time":"2021-08-01 21:20:07"}]
**当前完整代码 **
public class CanalClient {
public static void main(String[] args) {
// 创建链接
String hostname="hadoop102"; //主机名 参考 canal.properties中的 canal.ip
int port=11111;// 端口号 canal.properties中的 canal.port
InetSocketAddress socketAddress = new InetSocketAddress(hostname,port);
/**
* SocketAddress address, 连接 canal server的主机名和端口号,参考 canal.properties
* String destination,参考 canal.properties 中的 canal.destinations,
* example :是指instance.properties 的目录名就是 example
* String username, 不需要写
* String password 不需要写
* 官方解释: https://github.com/alibaba/canal/wiki/AdminGuide
* canal.user canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启 无
* canal.passwd canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启
*/
CanalConnector connector = CanalConnectors.newSingleConnector(socketAddress, "example", null, null);
// 使用客服端对象连接`canal`
connector.connect();
// 订阅数据表
connector.subscribe("gmallrealtime0323.order_info");
// 不停的拉取数据
while (true){
Message message = connector.get(100);
if (message.getId()==-1){
try {
Thread.sleep(3000);
// 终止本次循环,执行下次循环
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//解析 只要 ROWDATA 类型的数据
List<CanalEntry.Entry> entryList = message.getEntries()
.stream()
.filter(entry -> entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA))
.collect(Collectors.toList());
// 拆分所有集合
List<JSONObject> result = entryList.stream()
.flatMap(entry -> analysisStoreValue(entry.getStoreValue()).stream())
.filter(e-> e!=null && !e.isEmpty())
.collect(Collectors.toList());
System.out.println(result);
}
}
/**
* 解析 storeValue 封装为 json
* @param storeValue
*/
public static List<JSONObject> analysisStoreValue(ByteString storeValue){
try {
// storeValue 是一个二进制序列化值,所以需要将其反序列化成 RowChange
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
// 这里只需要获取insert sql的语句数据
if(rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
// 获取表中所有的行
List<CanalEntry.RowData> rows = rowChange.getRowDatasList();
// 通过行获取所有的列
return rows.stream().map(row->{
List<CanalEntry.Column> columns = row.getAfterColumnsList();
// 将每行数据包装成json
JSONObject result=new JSONObject();
columns.forEach(column -> result.put(column.getName(),column.getValue()));
return result;
}).collect(Collectors.toList());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
List<JSONObject> result = new ArrayList<>();
return result;
}
}
写入Kafka
- 创建topic
bin]$ sh kafka-topics.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --create --partitions 3 --replication-factor 2 --topic gmall-order-info
--bootstrap-server
Kafka集群
--create
表名是这是创建 Kafka
--partitions
分区
--replication
-factor 副本
--topic
topic名称
- 创建一个Produce
/**
* Kafka客户端
* @author admin
* @date 2021/8/1
*/
public class KafkaClient {
private static Producer producer;
static {
producer = getProducer();
}
/**
* 创建生产者
* @return
*/
public static Producer getProducer(){
// 配置,具体要配置什么 参考 org.apache.kafka.clients.producer.ProducerConfig
Properties properties=new Properties();
// kafka集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
// 字符序列化方式
String stringSerializer="org.apache.kafka.common.serialization.StringSerializer";
// key序列化类型
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,stringSerializer);
// value序列化类型
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,stringSerializer);
// 创建
KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);
//返回
return kafkaProducer;
}
/**
* 生产数据
* @param topic 主题
* @param value 参数值
*/
public static Future sendData( String topic, JSONObject value){
ProducerRecord<String,String> record=new ProducerRecord(topic,value.toJSONString());
return producer.send(record);
}
}
- 将数据写入 fafka
/**
* 将数据写入Kafka
* @param results
*/
public static void sendToKafka(List<JSONObject> results){
results.forEach(json->KafkaClient.sendData(CustomConstant.GMALL_ORDER_INFO,json));
}
- 启动消费者监听
gmall-order-info
主题
[admin@hadoop102 bin]$ sh kafka-console-consumer.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic gmall-order-info
- 向
order_info
表新增数据,并查看消费情况
[admin@hadoop102 bin]$ sh kafka-console-consumer.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic gmall-order-info
{"payment_way":"1","delivery_address":"daQbNIdsFVPkJzEwxWpJ","consignee":"kKXIve","create_time":"2021-08-01 22:53:19","order_comment":"DNgrqBLaTtnhYudhAeXy","expire_time":"","order_status":"1","out_trade_no":"5279649775","tracking_no":"","total_amount":"285.0","user_id":"1","img_url":"","province_id":"4","consignee_tel":"13168096420","trade_body":"","id":"1","parent_order_id":"","operate_time":""}
{"payment_way":"1","delivery_address":"vScIgIXOdWANmWXYZdqu","consignee":"owrvbW","create_time":"2021-08-01 08:13:40","order_comment":"ccdmZnqNVmAOuJYYXPjz","expire_time":"","order_status":"2","out_trade_no":"9194355258","tracking_no":"","total_amount":"122.0","user_id":"2","img_url":"","province_id":"9","consignee_tel":"13505537925","trade_body":"","id":"2","parent_order_id":"","operate_time":"2021-08-01 09:12:56"}
{"payment_way":"2","delivery_address":"KJdxaMHafCsiLervaSmH","consignee":"uMDFpI","create_time":"2021-08-01 06:35:35","order_comment":"yxRPzfbPTcFVKNJiOtAy","expire_time":"","order_status":"1","out_trade_no":"8849195680","tracking_no":"","total_amount":"423.0","user_id":"1","img_url":"","province_id":"9","consignee_tel":"13413107389","trade_body":"","id":"3","parent_order_id":"","operate_time":""}
{"payment_way":"2","delivery_address":"fqliirkavbTrbhFHtXJo","consignee":"KqxstS","create_time":"2021-08-01 01:59:45","order_comment":"zJKESzTXhQJYQirmidSe","expire_time":"","order_status":"2","out_trade_no":"8940133587","tracking_no":"","total_amount":"768.0","user_id":"1","img_url":"","province_id":"2","consignee_tel":"13113356625","trade_body":"","id":"4","parent_order_id":"","operate_time":"2021-08-01 02:43:59"}
{"payment_way":"2","delivery_address":"JsMELTjoUEnBUSJBQxRP","consignee":"wdZOXs","create_time":"2021-08-01 18:13:50","order_comment":"AfcUqXPhqgIVHyvJiOuH","expire_time":"","order_status":"1","out_trade_no":"0155110810","tracking_no":"","total_amount":"464.0","user_id":"2","img_url":"","province_id":"2","consignee_tel":"13982863828","trade_body":"","id":"5","parent_order_id":"","operate_time":""}
最后
关于Canal
的实战项目就完成了,也许看起来比较乱,所以大家还是挑重要点看吧。
补充
// 通过行获取所有的列
return rows.stream().map(row->{
List<CanalEntry.Column> columns = row.getAfterColumnsList();
// 将每行数据包装成json
JSONObject result=new JSONObject();
columns.forEach(column -> result.put(column.getName(),column.getValue()));
return result;
}).collect(Collectors.toList());
除了使用row.getAfterColumnsList();
获取List<CanalEntry.Column>
还可以通过row.getBeforeColumnsList()
方式获取
row.getAfterColumnsList() 与 row.getBeforeColumnsList() 有什么区别呢?
row.getAfterColumnsList():表示用于获取数据执行变化后的结果,案例中提到,当前主要用于获取insert
语句的新增数据(如下
)
if(rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
所以无法获取新增前的数据结果,也就无法使用row.getBeforeColumnsList()
的方式。
row.getBeforeColumnsList():主要用于update
、delete
语句,比如获取修改前的数据,就可以用row.getBeforeColumnsList()
,获取修改后的数据则使用row.getAfterColumnsList()
,当然delete
只能使用row.getBeforeColumnsList()
。