RabbitMQ+Flume+Kafka的使用

2018-07-05  本文已影响0人  PandaEyes聊大数据

背景

如果你的平台使用RabbitMQ,并且短时间不想换Kafka的话,可以考虑使用以下方式去把数据对接到大数据平台,只要对接到kafka,后面用什么技术,由你选择。

RabbitMQ

RabbitMQ的使用这边不多介绍,只要RabbitMQ上有可用的Queue存在就行
或者写一个java的生产者
maven项目配置

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.6</version>
        </dependency>

Java代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQ_Producer {
    private final static String QUEUE_NAME  = "rk_queue_test";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 获取到连接以及mq通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        factory.setVirtualHost("xxx");
        factory.setHost("192.168.70.xxx");
        factory.setPort(5672);
        Connection conn = factory.newConnection();
        // 创建一个频道
        Channel channel = conn.createChannel();
        // 指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        int prefetchCount = 1;

        //每个消费者发送确认信号之前,消息队列不发送下一个消息过来,一次只处理一个消息
        //限制发给同一个消费者不得超过1条消息
        channel.basicQos(prefetchCount);

        // 发送的消息
        for (int i = 0; i < 50; i++) {
            String message = "." + i;
            // 往队列中发出一条消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
            Thread.sleep(i * 10);
        }
        // 关闭频道和连接
        channel.close();
        conn.close();
    }
}

Flume

Flume自带是没有对接RabbitMQ的,你需要自行写一个对接的代码,当然已经有大神早就写好了,我们拿来用就可以了。rabbitmq-flume-plugin点击下载代码,打包成jar,复制到Flume的lib目录下。然后编写到conf 目录下编写 rabbit-flume-kafka.properties 配置文件。

vim conf/rabbit-flume-kafka.properties

rabbit-flume-kafka.properties的内容,如下:

a1.channels = ch-1
a1.sources = src-1
a1.channels.ch-1.type=memory

a1.sources.src-1.channels = ch-1
a1.sources.src-1.type = com.aweber.flume.source.rabbitmq.RabbitMQSource
a1.sources.src-1.host = 192.168.70.xx #RabbitMQ的IP
a1.sources.src-1.port = 5672
a1.sources.src-1.virtual-host = vh
a1.sources.src-1.username = xxxx  #RabbitMQ的用户
a1.sources.src-1.password = xxxxxx   #RabbitMQ的密码
a1.sources.src-1.queue = rk_queue_test
a1.sources.src-1.prefetchCount = 10

a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = ch-1
a1.sinks.k1.topic=rfk_out
a1.sinks.k1.brokerList=192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667
a1.sinks.k1.requiredAcks=1
a1.sinks.k1.batchSize=20

flume运行命令

/usr/hdp/2.6.3.0-235/flume/bin/flume-ng agent --conf conf --conf-file conf/rabbit-flume-kafka.properties --name a1 -Dflume.root.logger=INFO,console

Kafka

这边运行一个Kafka的消费者,消费一下数据就可以了

/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.70.xxx:6667,192.168.70.xxx:6667,192.168.70.xxx:6667 --topic rfk_out --from-beginning

如果消费到数据说明搭建成功,这边验证信息就不写了,如果在使用的时候有什么问题请在评论上提出。

上一篇下一篇

猜你喜欢

热点阅读