程序员大数据

Storm集群使用DRPC功能Version1.0.1

2019-01-18  本文已影响36人  木木与呆呆

在Storm集群上开启DRPC功能,
基于Storm的1.0.1版本,
并且执行简单的例子测试。

1.DRPC概念

DRPC就是分布式远程过程调用。
Storm里面引入DRPC主要是利用storm的实时计算能力,
来并行化CPU intensive的计算。
DRPC的Storm topology以函数的参数流作为输入,
而把这些函数调用的返回值作为topology的输出流。
DRPC其实不能算是Storm本身的一个特性,
它是通过组合Storm的原语spout,bolt,topology而成的一种模式(pattern)。

2.DRPC工作机制

Distributed RPC是由一个DPRC Server协调的,
Storm自带了一个称作LinearDRPCTopologyBuilder的topology builder,
它把实现DRPC的几乎所有步骤都自动化了。

DRPC服务器协调机制:

  1. 接收一个RPC请求;
  2. 发送请求到Storm topology;
  3. 从Storm topology接收结果;
  4. 把结果发回给等待的客户端。

从客户端的角度来看一个DRPC调用,
跟一个普通的RPC调用没有任何区别。
下面是客户端代码展示了如何调用RPC的
exclaimation方法,方法的参数是hello:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("exclaimation", "hello");

DRPC的工作流大致是这样的:


客户端给DRPC服务器发送要执行的方法的名字,
以及这个方法的参数。
实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。
每个函数调用被DRPC服务器标记了一个唯一的id。
这个topology然后计算结果,
在topology的最后一个叫做ReturnResults的bolt会连接到DRPC服务器,
并且把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。
DRPC服务器用那个唯一id来跟等待的客户端匹配上,
唤醒这个客户端并且把结果发送给它。

3.配置DPRC Server

修改storm.yaml文件,增加drpc的配置:

drpc.servers:  
    - "zdh-237"
drpc.childopts: "-Xmx1024m"

其他参数drpc.port, drpc.http.port等使用默认值即可,
参考默认值如下:

drpc.port:
 3772
drpc.worker.threads:
 64
drpc.max_buffer_size:
 1048576
drpc.queue.size:
 128
drpc.invocations.port:
 3773
drpc.invocations.threads:
 64
drpc.request.timeout.secs:
 600
drpc.childopts:
"-Xmx768m"
drpc.http.port:
 3774
drpc.https.port:
 -1
drpc.https.keystore.password:
""
drpc.https.keystore.type:
"JKS"
drpc.http.creds.plugin:
 backtype.storm.security.auth.DefaultHttpCredentialsPlugin
drpc.authorizer.acl.filename:
"drpc-auth-acl.yaml"
drpc.authorizer.acl.strict:
false

4.启动DPRC Server

使用如下命令启动DRPC进程:
storm drpc > drpc.log 2>&1 &

5.使用本地集群测试

由于命令无入参即没有topo名字,
无对外端口无法提供客户端调用,
在BasicDRPCTopology中启动后调用本地集群,
仅作为测试场景使用。

进入Storm目录,提交处理drpc的topo:

cd /home/stormna/apache-storm-1.0.1/examples/storm-starter/
storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology

在输出的日志中可以看到如下结果,
结果是单词后面被添加了感叹号!

8043 [Thread-26-bolt2-executor[6 6]] INFO  o.a.s.l.ThriftAccessLogger - Request ID: 3 access from:  principal:  operation: result
Result for "hello": hello!
8054 [Thread-26-bolt2-executor[6 6]] INFO  o.a.s.l.ThriftAccessLogger - Request ID: 3 access from:  principal:  operation: result
Result for "goodbye": goodbye!

6.使用真实集群测试

基于真实集群的DRPC可以提供给外部客户端调用,
先提交处理drpc的topo,指定topo名称为exclamationDrpc:

cd /home/stormna/apache-storm-1.0.1/examples/storm-starter
storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.BasicDRPCTopology exclamationDrpc

7.客户端调用

在BasicDRPCTopology中提供的drpc方法名为exclamation,
效果返回结果是在单词后面被添加的感叹号。
使用下面写客户端代码进行调用测试。

7.1.适配storm-core-0.9.6.jar的客户端代码

注意exclamation名称不要拼错,
否则客户端会一直没有响应:

public class DRPCClientTest096 {

    public static void main(String[] args) throws Exception {
        String drpcHost = "10.43.159.237";
        int drpcPort = 3772;
        
        DRPCClient client = new DRPCClient(drpcHost, drpcPort);
        String input="hello1";
        String result = client.execute("exclamation",input );
        System.out.println("input:"+input+", result:"+result);
    }
}

执行DRPCClientTest096类中的main方法,
调用drpc的exclamation函数,传入参数hello1:
控制台输出结果:

input:hello1, result:hello1!

7.2.适配storm-core-1.0.1.jar的客户端代码

注意调用需要配置Storm参数,
和上面的有点区别的。

public class DRPCClientTest101 {
    public static void main(String[] args) throws Exception {
        String drpcHost = "10.43.159.237";
        int drpcPort = 3772;

        Properties pro = new Properties();
        // InputStream inStream = new FileInputStream("stormclient.conf");
        // 读取storm-core-1.0.1.jar里面 的defaults.yaml配置文件
        InputStream inStream = ClassLoader.getSystemResourceAsStream("defaults.yaml");

        pro.load(inStream);
        inStream.close();
        //由于Properties加载的值带了引号,需要重新设置一下,或者手动去掉前后的引号
        pro.setProperty("storm.thrift.transport", "org.apache.storm.security.auth.SimpleTransportPlugin");

        DRPCClient client = new DRPCClient(pro, drpcHost, drpcPort);
        String input = "hello2";
        String result = client.execute("exclamation", input);
        System.out.println("input:" + input + ", result:" + result);
    }
}

执行DRPCClientTest101类中的main方法,
调用drpc的exclamation函数,传入参数hello2:
控制台输出结果:

input:hello2, result:hello2!

8.参考文章

StormDRPC 概念以及简单例子测试
storm DRPC问题
Storm高级原语(二) — DRPC
storm自带例子详解 (二)——BasicDRPCTopology

上一篇 下一篇

猜你喜欢

热点阅读