common模块阅读10: Message

2017-09-27  本文已影响60人  赤子心_d709

说明

这个类就是描述最基本消息的,包含消息体,topic,properties等

类图

继承关系如下


Message继承关系

源码

代码不长,基本属性和一些get,set方法
注意几个点

flag属性:似乎没什么用,只存储后透传

get(set)DelayTimeLevel方法:和 MessageStoreConfig#messageDelayLevel 相关,如1代表1s,2代表5s

get(set)BuyerId方法:基本没用,测试类里面才用

is(set)WaitStoreMsgOK: 表示消息是否在服务器落盘后才返回应答
只有同步刷盘的时候,这个配置才work
参考CommitLog#handleDiskFlush以及handleHA函数的if嵌套条件

get(set)Keys 消息索引用,可以根据key查询消息,和Index部分相关

putUserProperty 和 putProperty的区别: 内部API用的putProperty, 外部尽量用putUserProperty

源码如下

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;//主题
    private int flag;
    private Map<String, String> properties;//属性
    private byte[] body;//内容

    public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);//是否等到日志存储了再返回
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);//KEYS属性
    }

    void putProperty(final String name, final String value) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        this.properties.put(name, value);
    }

    void clearProperty(final String name) {
        if (null != this.properties) {
            this.properties.remove(name);
        }
    }


    //存放用户定义的prop,不能和预留的prop关键字重复
    public void putUserProperty(final String name, final String value) {
        if (MessageConst.STRING_HASH_SET.contains(name)) {
            throw new RuntimeException(String.format(
                "The Property<%s> is used by system, input another please", name));
        }

        if (value == null || value.trim().isEmpty()
            || name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name or value of property can not be null or blank string!"
            );
        }

        this.putProperty(name, value);
    }

    public String getUserProperty(final String name) {
        return this.getProperty(name);
    }

    public String getProperty(final String name) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        return this.properties.get(name);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTags() {
        return this.getProperty(MessageConst.PROPERTY_TAGS);
    }

    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

    public String getKeys() {
        return this.getProperty(MessageConst.PROPERTY_KEYS);
    }

    public void setKeys(Collection<String> keys) {
        StringBuffer sb = new StringBuffer();
        for (String k : keys) {
            sb.append(k);
            sb.append(MessageConst.KEY_SEPARATOR);
        }

        this.setKeys(sb.toString().trim());
    }

    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

    public boolean isWaitStoreMsgOK() {
        String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        if (null == result)
            return true;

        return Boolean.parseBoolean(result);
    }

    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    void setProperties(Map<String, String> properties) {
        this.properties = properties;
    }

    /**
     * buyerId这个是在测试类采用的
     */
    public String getBuyerId() {
        return getProperty(MessageConst.PROPERTY_BUYER_ID);
    }

    public void setBuyerId(String buyerId) {
        putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
    }

    @Override
    public String toString() {
        return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body="
            + (body != null ? body.length : 0) + "]";
    }
}

refer

http://blog.csdn.net/quhongwei_zhanqiu/article/details/39144223 flag没什么用
https://fdx321.github.io/2017/08/22/%E3%80%90RocketMQ%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E3%80%916-%E6%B6%88%E6%81%AF%E5%AD%98%E5%82%A8/ indexFile部分
《开发手册3.2.4》 7.2消息查询

上一篇 下一篇

猜你喜欢

热点阅读