42.Storm drpc及kafka

2020-02-27  本文已影响0人  文茶君

DRPC (Distributed RPC) remote procedure call
分布式远程过程调用

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:
为了充分利用Storm的计算能力实现高密度的并行实时计算。
(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。


图片1.png

定义DRPC拓扑:
方法1:
通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现



方法2:
直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults


图片3.png
运行模式:
1、本地模式
图片4.png
运行模式:
2、远程模式(集群模式)

修改配置文件conf/storm.yaml
drpc.servers:
- "node1“

启动DRPC Server
bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑


图片5.png

kafka

kafka是消息队列

一、Kafka简介

Kafka是一个分布式的消息队列系统(Message Queue)。

官网:<u>https://kafka.apache.org/</u>

图片1.png
kafka集群有多个Broker服务器组成,每个类型的消息被定义为topic。
同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的Broker上。
消息生产者producer和消费者consumer可以在多个Broker上生产/消费topic

概念理解:
Topics and Logs:
Topic即为每条发布到Kafka集群的消息都有一个类别,topic在Kafka中可以由多个消费者订阅、消费。
每个topic包含一个或多个partition(分区),partition数量可以在创建topic时指定,每个分区日志中记录了该分区的数据以及索引信息。如下图:


图片2.png

Kafka只保证一个分区内的消息有序,不能保证一个主题的不同分区之间的消息有序。如果你想要保证所有的消息都绝对有序可以只为一个主题分配一个分区。
分区会给每个消息记录分配一个顺序ID号(偏移量), 能够唯一地标识该分区中的每个记录。Kafka集群保留所有发布的记录,不管这个记录有没有被消费过,Kafka提供相应策略通过配置从而对旧数据处理。


图片3.png
实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即、消费者可以通过修改偏移量读取任何位置的数据。

Ø Distribution -- 分布式

Ø Producers -- 生产者

指定topic来发送消息到Kafka Broker

Ø Consumers -- 消费者

根据topic消费相应的消息

二、Kafka集群部署

集群规划

Zookeeper集群共三台服务器,分别为:node06、node07、node08。

Kafka集群共三台服务器,分别为:node06、node07、node08。

1、Zookeeper集群准备

kafka是一个分布式消息队列,需要依赖ZooKeeper,请先安装好zk集群。

Zookeeper集群安装步骤略。

2、安装Kafka

下载压缩包(官网地址:<u>http://kafka.apache.org/downloads.html</u>

解压:

tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sxt

mv kafka_2.10-0.9.0.1/ kafka

修改配置文件:config/server.properties


图片4.png
图片5.png

核心配置参数说明:
broker.id: broker集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
注:
当前Kafka集群共三台节点,分别为:node1、node2、node3。对应的broker.id分别为0、1、2。
zookeeper.connect: zk集群地址列表

将当前node1服务器上的Kafka目录同步到其他node2、node3服务器上:
scp -r /opt/kafka/ node2:/opt
scp -r /opt/kafka/ node3:/opt

修改node2、node3上Kafka配置文件中的broker.id(分别在node2、3服务器上执行以下命令修改broker.id)
sed -i -e 's/broker.id=./broker.id=1/' /opt/kafka/config/server.properties
sed -i -e 's/broker.id=.
/broker.id=2/' /opt/kafka/config/server.properties

3、启动Kafka集群
A、启动Zookeeper集群。
B、启动Kafka集群。
分别在三台服务器上执行以下命令启动:
bin/kafka-server-start.sh config/server.properties

4、测试
创建话题
(kafka-topics.sh --help查看帮助手册)
创建topic:
bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 3 --topic test

(参数说明:
--replication-factor:指定每个分区的复制因子个数,默认1个
--partitions:指定当前创建的kafka分区数量,默认为1个
--topic:指定新建topic的名称)

查看topic列表:
bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

查看“test”topic描述:
bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --describe --topic test


图片6.png

创建生产者:
bin/kafka-console-producer.sh --broker-list node06:9092,node07:9092,node08:9092 --topic test

创建消费者:
bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic test

注:
查看帮助手册:
bin/kafka-console-consumer.sh help

三、Flume & Kafka
1、Flume安装
Flume安装流程:
解压jar包
mv conf/flume-env.sh.template flume-env.sh
vi flume-env.sh java环境变量
./bin flume-ng version
/conf/下 创建配置文件fk.conf内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node06
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node06:9092,node07:9092,node08:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、Flume****+****Kafka

启动zk集群

A、启动Kafka集群。

bin/kafka-server-start.sh config/server.properties

B、配置Flume集群,并启动Flume集群。

bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

3、测试

Ø 分别启动Zookeeper、Kafka、Flume集群。

zkServer.sh start

bin/kafka-server-start.sh config/server.properties

bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

Ø 创建topic:(不用)

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1--topic testflume

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1--topic LogError

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1--topic LogError

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic LogError

Ø 启动消费者:

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic testflume

启动生产者

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1 --topic mylog_cmcc

查看topic列表:

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

启动消费者

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic mylog_cmcc

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --topic mylog_cmcc

Ø 运行“RpcClientDemo”代码,通过rpc请求发送数据到Flume集群。

Flume中source类型为AVRO类型,此时通过Java发送rpc请求,测试数据是否传入Kafka。

其中,Java发送Rpc请求Flume代码示例如下:

(参考Flume官方文档:<u>http://flume.apache.org/FlumeDeveloperGuide.html</u>

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

/**
 * Flume官网案例
 * http://flume.apache.org/FlumeDeveloperGuide.html 
 * @author root
 */
public class RpcClientDemo {
    
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("node1", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
            System.out.println("发送数据:" + sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the
        // above lin

四、Storm****&****Kafka

官网地址:

<u>http://storm.apache.org/about/integrates.html</u>

五、flume+kafka+spout

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1 --topic LogError

六。kafka安装实战


传输并解压



修改配置文件



修改zk配置

传输文件到其他节点
5.png

修改broker的id分别为1,2,3


6.png 7.png

开启zk




开启后输入数据。产生生产者


11.png
另一边是消费者,接受数据,可以看到数据是分区有序,因为刚开始,数据一窝蜂进入

之后数据有序,生产者输入,消费者产生相应输出

上一篇下一篇

猜你喜欢

热点阅读