Amazing Arch分布式&高可用Java技术升华

高并发 IM 系统架构优化实践

2019-01-16  本文已影响72人  java高并发

功能介绍

表格存储新推出的 主键列递增 功能可以有效地处理上述场景的需求。具体做法为在创建表时,声明主键中的某一列为自增列,在写入一行新数据的时候,应用无需为自增列填入真实值,只需填入一个占位符,表格存储系统在接收到这一行数据后会自动为自增列生成一个值,并且保证在相同的分区键范围内,后生成的值比先生成的值大.

主键列自增功能具有以下几个特性:

介绍了表格存储的主键列自增功能后,下面通过具体的场景介绍下如何使用。

场景

我们继续文章开头的例子,通过构建一个IM聊天工具,演示主键列自增功能的作用和使用方法。

功能

我们要做的IM聊天软件需要支持下列功能:

现有架构

第一步,确定消息模型

第二步,确定后台架构

第三步,确定存储系统

存储系统,我们选择了阿里云的 表格存储 ,主要是因为下列原因:

第四步,确定表结构

确定的表格存储的表结构如下:

主键顺序 主键名称 主键值 说明
1 partition_key md5(receive_id)前4位 分区键,保证数据均匀分布
2 receive_id receive_id 接收方的用户ID
3 message_id message_id 消息ID

到此,我们已经设计出了一个完整的聊天系统,虽然这个系统已经可以运行,且能处理大并发,性能也不差,但是还是存在一些挑战。

挑战

针对上述两个问题,问题2可以通过增加机器的方式解决,但是问题1没法通过增加机器解决,增加机器只能缓解问题,却没法彻底解决。那有没有办法可以彻底解决掉上述两个问题?

新架构

上面两个问题的复杂度主要是由于需要消息严格递增引起的,如果使用了表格存储的主键列自增功能,那么上层的应用层就会简单的多。

使用了表格存储主键列自增功能后的新架构如下:

实现

有了上面的架构图后,现在可以开始实现了,这里选用JAVA SDK,目前4.2.0版本已经支持主键列自增功能,4.2.0版本Java SDK文档和下载地址

第一步,建表

按照之前的设计,表结构如下:

主键顺序 主键名称 主键值 说明
1 partition_key hash(receive_id)前4位 分区键,保证数据均匀分布,可以使用md5作为hash函数
2 receive_id receive_id 接收方的用户ID
3 message_id message_id 消息ID

第三列PK是message_id,这一列是主键自增列,建表时指定message_id列的属性为AUTO_INCREMENT,且类型为INTEGER。


private static void createTable(SyncClient client) {
        TableMeta tableMeta = new TableMeta(“message_table”);

        // 第一列为分区建
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING));

        // 第二列为接收方ID
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING));

        // 第三列为消息ID,自动自增列,类型为INTEGER,属性为PKO_AUTO_INCREMENT
        tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT));

        int timeToLive = -1;  // 永不过期,也可以设置数据有效期,过期了会自动删除
        int maxVersions = 1;  // 只保存一个版本,目前支持多版本

        TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);

        CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);

        client.createTable(request);
    }

通过上述方式就创建了一个第三列PK为自动自增的表。

第二步,写数据

写数据目前支持PutRow和BatchWriteRow两种方式,这两种接口都支持主键列自增功能,写数据时,第三列message_id是主键自增列,这一列不需要填值,只需要填入占位符即可。


    private static void putRow(SyncClient client, String receive_id) {
        // 构造主键
        PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();

        // 第一列的值为 hash(receive_id)前4位
        primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4)));

        // 第二列的值为接收方ID
        primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id));

        // 第三列是消息ID,主键递增列,这个值是TableStore产生的,用户在这里不需要填入真实值,只需要一个占位符:AUTO_INCREMENT 即可。
        primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT);
        PrimaryKey primaryKey = primaryKeyBuilder.build();

        RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey);

        // 这里设置返回类型为RT_PK,意思是在返回结果中包含PK列的值。如果不设置ReturnType,默认不返回。
        rowPutChange.setReturnType(ReturnType.RT_PK);

        //加入属性列,消息内容
        rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content)));

        //写数据到TableStore
        PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange));

        // 打印出返回的PK列
        Row returnRow = response.getRow();
        if (returnRow != null) {
            System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString());
        }

        // 打印出消耗的CU
        CapacityUnit  cu = response.getConsumedCapacity().getCapacityUnit();
        System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit());
        System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit());
    }

第三步,读数据

读消息的时候,需要通过GetRange接口读取最近的消息,message_id这一列PK的起始位置是上一条消息的message_id+1, 结束位置是INF_MAX,这样每次都可以读出最新的消息,然后发送给客户端


    private static void getRange(SyncClient client, String receive_id, String lastMessageId) {
        RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(“message_table”);

        // 设置起始主键
        PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();

        // 第一列的值为 hash(receive_id)前4位
        primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4)));

        // 第二列的值为接收方ID
        primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id));

        // 第三列的值为消息ID,起始于上一条消息
        primaryKeyBuilder.addPrimaryKeyColumn(“message_id”, PrimaryKeyValue.fromLong(lastMessageId + 1));
        rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

        // 设置结束主键
        primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();

        // 第一列的值为 hash(receive_id)前4位
        primaryKeyBuilder.addPrimaryKeyColumn(“partition_key”, PrimaryKeyValue.fromString(hash(receive_id).substring(4)));

        // 第二列的值为接收方ID
        primaryKeyBuilder.addPrimaryKeyColumn(“receive_id”, PrimaryKeyValue.fromString(receive_id));

        // 第三列的值为消息ID
        primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX);
        rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());

        rangeRowQueryCriteria.setMaxVersions(1);

        System.out.println("GetRange的结果为:");
        while (true) {
            GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
            for (Row row : getRangeResponse.getRows()) {
                System.out.println(row);
            }

            // 若nextStartPrimaryKey不为null, 则继续读取.
            if (getRangeResponse.getNextStartPrimaryKey() != null) {
              rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
            } else {
                break;
            }
        }
    }

上面演示了表格存储及其主键列自增功能在聊天系统中的应用,在其他场景中也有很大的价值,期待大家一起去探索。


上一篇下一篇

猜你喜欢

热点阅读