RocketMQ 单topic禁写
2019-12-02 本文已影响0人
Cc_7397
topic 配置对应实体
package com.alibaba.rocketmq.common;
import com.alibaba.rocketmq.common.constant.PermName;
/**
 * Entity describing the configuration of a single topic.
 * Only the fields are shown in this excerpt; accessors are not visible here.
 */
public class TopicConfig {
// Default number of read queues.
public static int DefaultReadQueueNums = 16;
// Default number of write queues.
public static int DefaultWriteQueueNums = 16;
// Separator string (a single space); its use is not visible in this excerpt.
private static final String SEPARATOR = " ";
// Name of the topic this configuration belongs to.
private String topicName;
// Number of read queues configured for the topic.
private int readQueueNums;
// Number of write queues configured for the topic.
private int writeQueueNums;
/**
* Permission value of the topic:
* 6: both read and write are allowed
* 4: write is disabled
* 2: read is disabled
*/
private int perm; // read/write permission
// Filter type of the topic (TopicFilterType is declared elsewhere).
private TopicFilterType topicFilterType;
// System flag of the topic; semantics are not visible in this excerpt.
private int topicSysFlag;
// Presumably marks the topic as ordered - TODO confirm against the library source.
private boolean order;
}
topic 的 perm 默认为 6(可读可写)。禁写就是把 broker 中该 topic 对应的 TopicConfig 的 perm 设置为 4(只读、不可写),broker 随后会把更新后的配置上报到 nameserver。
api
查询和修改topic配置的rpc请求code为
RequestCode.UPDATE_AND_CREATE_TOPIC = 17 //更新TopicConfig
RequestCode.GET_ALL_TOPIC_CONFIG = 21 //拉取TopicConfig (全量)
调用
仅使用netty客户端即可
/**
 * Small command-line tool that disables writes on a single topic of one broker.
 *
 * <p>It pulls the full topic configuration table from the broker, changes the
 * {@code perm} of the target topic, and pushes the modified configuration back
 * through the update-or-create request. Only the Netty remoting client is used.
 */
public class TopicConfig {
    /** Perm value that disables writes (6 restores read+write, 2 disables reads). */
    private static final int PERM_WRITE_DISABLED = 4;

    /** Timeout, in milliseconds, used when pulling the full topic configuration. */
    private static final long FETCH_TIMEOUT_MILLIS = 3000L;

    private final static NettyClientConfig clientConfig = new NettyClientConfig();

    /** Netty remoting client shared by all requests of this tool. */
    private final static NettyRemotingClient remotingClient = new NettyRemotingClient(clientConfig);

    /**
     * Disables writes on the hard-coded topic of the hard-coded broker.
     *
     * @param args unused
     * @throws IllegalStateException if the broker does not know the topic
     */
    public static void main(String[] args) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
        remotingClient.start();
        try {
            String topic = "quickStart"; // topic name
            String brokerAddr = "XX.XX.X.XXX:10910"; // broker address

            TopicConfigSerializeWrapper allTopicConfig = getAllTopicConfig(brokerAddr);
            com.alibaba.rocketmq.common.TopicConfig config = allTopicConfig.getTopicConfigTable().get(topic);
            if (config == null) {
                // Fail with a clear message instead of a bare NullPointerException below.
                throw new IllegalStateException("topic [" + topic + "] not found on broker " + brokerAddr);
            }

            config.setPerm(PERM_WRITE_DISABLED);
            createAndUpdateTopic(brokerAddr, MixAll.DEFAULT_TOPIC, config, clientConfig.getConnectTimeoutMillis());
            System.out.println("done");
        } finally {
            // Always release the client, including when a request fails.
            remotingClient.shutdown();
        }
    }

    /**
     * Sends an update-or-create request carrying the given topic configuration.
     *
     * @param addr          broker address to send the request to
     * @param defaultTopic  default topic placed in the request header
     * @param topicConfig   configuration whose fields are copied into the request header
     * @param timeoutMillis timeout of the synchronous call, in milliseconds
     * @throws RemotingException    if the remoting call fails
     * @throws MQBrokerException    declared by the signature; not thrown directly by this body
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws MQClientException    if the broker answers with a non-success response code
     */
    public static void createAndUpdateTopic(final String addr, final String defaultTopic, final com.alibaba.rocketmq.common.TopicConfig topicConfig,
                                            final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException,
            MQClientException {
        CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
        requestHeader.setTopic(topicConfig.getTopicName());
        requestHeader.setDefaultTopic(defaultTopic);
        requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
        requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
        requestHeader.setPerm(topicConfig.getPerm());
        requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
        requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
        requestHeader.setOrder(topicConfig.isOrder());

        RemotingCommand request =
                RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
        RemotingCommand response = remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;

        if (response.getCode() == ResponseCode.SUCCESS) {
            return;
        }
        throw new MQClientException(response.getCode(), response.getRemark());
    }

    /**
     * Pulls the full topic configuration table from a broker.
     *
     * @param addr broker address to query
     * @return the decoded topic configuration wrapper
     * @throws RemotingConnectException     declared by the signature; presumably raised when the broker cannot be reached
     * @throws RemotingSendRequestException declared by the signature; presumably raised when the request cannot be sent
     * @throws RemotingTimeoutException     declared by the signature; presumably raised when no response arrives in time
     * @throws InterruptedException         if the calling thread is interrupted while waiting
     * @throws MQBrokerException            if the broker answers with a non-success response code
     */
    public static TopicConfigSerializeWrapper getAllTopicConfig(String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, (CommandCustomHeader) null);
        RemotingCommand response = remotingClient.invokeSync(addr, request, FETCH_TIMEOUT_MILLIS);
        assert response != null;

        // Same success constant as createAndUpdateTopic, replacing the magic number 0.
        if (response.getCode() == ResponseCode.SUCCESS) {
            return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
        }
        throw new MQBrokerException(response.getCode(), response.getRemark());
    }
}
上面的代码需要用到 broker 的类 TopicConfigSerializeWrapper,因此需要引入以下依赖:
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-broker</artifactId>
<version>${rocketmq.version}</version>
</dependency>