两个系统大数据量对接手记

2019-10-02  本文已影响0人  董二弯

在前几天做了一个需求:外围系统下发业务数据到我方系统做业务处理。当时对方负责人说最多每次只有6万数据量,他们分1000条数据一个包传输到我方系统。

实现方式

提供实时的rest 接口

这种方式是写好处理程序,暴露出去,提供给外围系统rest接口。完成开发后交付测试。然后对方不按约定出牌,2000条一个包,一共发了50几个包,最终结果就是把我方系统测试机跑挂了。所以说最重要的是保证自己系统的健壮性,需求往往是变化的。这种实时方式只适合少量的数据,太大的数据会对系统产生影响。于是我修改模式,换为了第二种方式。

消息队列中间存储

整体流程

代码实现

客户为我提供了rocketmq集群的队列,我直接用,省去了我在服务器安装等操作。这里直接上主要代码。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * rocketmq消息队列生产者
 *
 */
@Component
public class Producer {
   // 定义group
    private static final String GROUP = "";

    private static String namesrvAddr;

    private static DefaultMQProducer producer = new DefaultMQProducer(GROUP);
    private static int initialState = 0;

    private Producer() {

    }

    public static DefaultMQProducer getDefaultMQProducer() {
        if (producer == null) {
            producer = new DefaultMQProducer(GROUP);
        }
        if (initialState == 0) {
            producer.setNamesrvAddr(getNamesrvAddr());
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
                return null;
            }

            initialState = 1;
        }
        return producer;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    @Value("${rocketmq.namesrvAddr}")
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * rocketmq消息队列消费者
 *
 */
@Component
public class Consumer {
   // 要和生产者的group相同
    private static final String GROUP = "";
    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
    private static int initialState = 0;
    private static String namesrvAddr;

    private Consumer() {

    }

    public static DefaultMQPushConsumer getDefaultMQPushConsumer() {
        if (consumer == null) {
            consumer = new DefaultMQPushConsumer(GROUP);
        }

        if (initialState == 0) {
            consumer.setNamesrvAddr(getNamesrvAddr());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 设置并发数量
            consumer.setConsumeThreadMin(5);
            consumer.setConsumeThreadMax(10);
            initialState = 1;
        }

        return consumer;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    @Value("${rocketmq.namesrvAddr}")
    public void setNamesrvAddr(String namesrvAddr) {
        Consumer.namesrvAddr = namesrvAddr;
    }
}
private void sendMsg(List newList) {
        // 获取消息生产者
        DefaultMQProducer producer = Producer.getDefaultMQProducer();
        try {
            Message msg = new Message(
                    TOPIC,
                    TAG,
                    JSON.toJSONString(newList).getBytes());
            SendResult sendResult = producer.send(msg);
            LOGGER.info("sendResult:{}", sendResult);
        } catch (MQClientException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        } catch (RemotingException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        } catch (MQBrokerException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        } catch (InterruptedException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        }
    }

接收到数据,分500一次,调用封装的发送方法即可存到消息队列。

import com.alibaba.fastjson.JSONArray;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * 消费开启执行
 *
 */
@Component
public class ConsumerInit implements CommandLineRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInit.class);
   //和生产者保持一致
    private static final String TAG = "";
    //和控制台配置保持一致
    private static final String TOPIC = "";
    @Autowired
    private FixedAssetsService fixedAssetsService;

    @Override
    public void run(String... strings) throws Exception {
        receiveMsg();
    }

    private void receiveMsg() {

        // 获取消息生产者
        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

        // 订阅主体
        try {
            consumer.subscribe(TOPIC, TAG);
            //MessageListenerConcurrently 并行消费
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默认msgs里唯独一条消息,能够通过设置consumeMessageBatchMaxSize參数来批量接收消息
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals(TOPIC)) {
                        if (msg.getTags() != null && msg.getTags().equals(TAG)) {
                            String message = null;
                            try {
                                message = new String(msg.getBody(),"UTF-8");
                            } catch (UnsupportedEncodingException e) {
                                LOGGER.info("message转换失败");
                            }
                            List<TransferSapResultVO> list = JSONArray.parseArray(message, TransferSapResultVO.class);
                            // 消费消息
                            fixedAssetsService.consumerMqMessage(list);
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer对象在使用之前必须要调用start初始化。初始化一次就可以<br>
             */
            consumer.start();

            LOGGER.info("Consumer Started.");
        } catch (MQClientException e) {
            LOGGER.info("消费消息错误" + e.toString());
        }
    }
}

这种方式在性能,速度等方面都非常良好。但是好景不长,说推过来的10几万数据少了3条,让我检查为什么少。我第一反应是难道是丢包了? 如果是丢包,我500条一个包,只能少500的整数倍啊。然后百度发现rocketmq的安全性是可以保证的,几乎不会出现丢包的情况。然后我问对方技术是不是根本就没有传这三条数据,但对方就是咬死说传了。为了验证我说重新在传输一次,结果和第一次一样,还是丢了这3条。我自己感觉是对方肯定没有传,但这种消息队列的方式的缺点就暴露了出来,缺少监控。于是又切换到第三种方式。

中间表模式

整体流程

这种方式有了监控的功能,如果说少了数据,直接在中间表中查,看是否外围系统推送了数据。

总结

当数据量较少时,可以用第一种实时的方式,比较方便。
数据量大且不需要监控功能用第二种方式较好。
数据量大且需要监控功能用第三种方式。
这次需求自己相当于开发了3次。用了3种模式。 积累了经验。以后在做这种需求时,两个问题, 数据量大不大? 是否需要监控?

上一篇 下一篇

猜你喜欢

热点阅读