Distributed Stream Processing: Deploying JStorm

2019-01-10  by 史圣杰

JStorm is Alibaba's rewrite of Storm in Java, and it can be used for stream processing. In this post we use JStorm to read data from Kafka and count word occurrences.

1. Writing the WordCount

The JStorm website documents each of JStorm's components in detail; to integrate with Kafka, only the spout needs to be replaced.

(Figure: processing flow)

Before writing any code, we add the JStorm jar and Storm's Kafka integration as dependencies:

    <dependency>
      <groupId>com.alibaba.jstorm</groupId>
      <artifactId>jstorm-core</artifactId>
      <version>${jstorm.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-nop</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-jdk14</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>jcl-over-slf4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>


    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.9.0.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
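The `${jstorm.version}` placeholder above assumes a matching property in the POM's `<properties>` section; for example (the version number here is only illustrative, use the release your cluster actually runs):

```xml
<properties>
  <!-- illustrative version; match the jstorm-core release on your cluster -->
  <jstorm.version>2.1.1</jstorm.version>
</properties>
```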

1.1 Spout

The Kafka spout needs the Kafka cluster's ZooKeeper address, the broker path, and the topic name. Put these into a SpoutConfig and create a KafkaSpout from it.

private static KafkaSpout kafkaSpout(){
        String brokerZkStr = "106.12.196.74:2181";
        String brokerZkPath = "/brokers";
        String topic = "jianshu-topic-new";
        String zkRoot = "";
        // the consumer id can be any unique name
        String id = "jstormspout";

        ZkHosts zk = new ZkHosts(brokerZkStr,brokerZkPath);
        SpoutConfig spoutConf = new SpoutConfig(zk, topic, zkRoot, id);

        List<String> zkServices = new ArrayList<String>();

        for(String str : zk.brokerZkStr.split(",")){
            zkServices.add(str.split(":")[0]);
        }
        spoutConf.zkServers = zkServices;
        spoutConf.zkPort = 2181;
        spoutConf.forceFromStart = true;
        spoutConf.socketTimeoutMs = 60 * 1000;
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        return new KafkaSpout(spoutConf);
    }
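The loop above derives plain host names from the comma-separated `host:port` entries in `brokerZkStr`. That transformation can be checked on its own in plain Java (the class name and addresses here are illustrative; no Storm dependency):

```java
import java.util.ArrayList;
import java.util.List;

public class ZkHostsCheck {
    // mirrors the loop above: keep only the host part of each "host:port" entry
    static List<String> hosts(String brokerZkStr) {
        List<String> result = new ArrayList<String>();
        for (String str : brokerZkStr.split(",")) {
            result.add(str.split(":")[0]);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(hosts("10.0.0.1:2181,10.0.0.2:2181"));
    }
}
```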

1.2 Split Bolt

The splitting logic is simple: split each sentence on whitespace, then emit each word separately.

public class SplitBolt extends BaseRichBolt {
    OutputCollector outputCollector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0); // or tuple.getStringByField("sentence")
        String[] words = sentence.split("\\s+");
        for (String word : words) {
            outputCollector.emit(new Values(word));
        }
        // ack so the tuple is not replayed when acking is enabled
        outputCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
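The `\s+` pattern treats any run of whitespace as one separator, so tabs and repeated spaces do not produce empty words. A standalone check of that rule (plain Java, no Storm dependency):

```java
public class SplitCheck {
    // same split rule as SplitBolt: any run of whitespace is one separator
    static String[] split(String sentence) {
        return sentence.split("\\s+");
    }

    public static void main(String[] args) {
        for (String word : split("hello  jstorm\tworld")) {
            System.out.println(word);
        }
    }
}
```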

1.3 Count Bolt

Use a map to count how many times each word occurs:

public class TotalCount extends BaseRichBolt {
    OutputCollector outputCollector;
    Map<String, Integer> map;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        this.map = new ConcurrentHashMap<String, Integer>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer count = map.get(word);
        int next = (count == null) ? 1 : count + 1;
        map.put(word, next);
        System.out.println(String.format("[%s] is %d", word, next));
        // ack so the tuple is not replayed when acking is enabled
        outputCollector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // this terminal bolt emits nothing downstream; declared for completeness
        outputFieldsDeclarer.declare(new Fields("count"));
    }
}
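Stripped of the Storm plumbing, the counting step is just a read-increment-write on the map. A minimal plain-Java sketch of that logic (class and method names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CountSketch {
    static final Map<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

    // mirrors TotalCount.execute: read the previous total, write back total + 1
    static int add(String word) {
        Integer previous = counts.get(word);
        int next = (previous == null) ? 1 : previous + 1;
        counts.put(word, next);
        return next;
    }

    public static void main(String[] args) {
        for (String word : "to be or not to be".split("\\s+")) {
            System.out.println(String.format("[%s] is %d", word, add(word)));
        }
    }
}
```

Note that the read-then-write pair is not atomic across threads; it is safe in the bolt only because each bolt task processes its tuples on a single thread.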

1.4 Building the Topology

// build the topology: spout -> split bolt -> count bolt
TopologyBuilder builder = new TopologyBuilder();
// spout
builder.setSpout(KafkaTopology.SEQUENCE_SPOUT_NAME, kafkaSpout(), spoutParal);
// split bolt: shuffleGrouping distributes sentences evenly across tasks
builder.setBolt(KafkaTopology.SPLIT_BOLT_NAME, new SplitBolt(), boltParal)
        .shuffleGrouping(KafkaTopology.SEQUENCE_SPOUT_NAME);
// count bolt: fieldsGrouping routes the same word to the same task
builder.setBolt(KafkaTopology.TOTAL_BOLT_NAME, new TotalCount(), boltParal)
        .fieldsGrouping(KafkaTopology.SPLIT_BOLT_NAME, new Fields("word"));