Big Data Learning

Storm Cluster Installation, WordCount Example, and Project Packaging & Deployment

2020-03-23  TZX_0710

1. Cluster Installation

Link: https://pan.baidu.com/s/1SUfYk0xd9pzdEkFApIj7Ng
Extraction code: gp9y

# Download the package, then extract it
tar -zxvf apache-storm-2.1.0.tar.gz
# Configure the environment variables (append to /etc/profile)
export STORM_HOME=/usr/local/apache-storm-2.1.0
export PATH=${PATH}:${STORM_HOME}/bin
# Apply the environment variables
source /etc/profile
# Edit ${STORM_HOME}/conf/storm.yaml
# Host list of the ZooKeeper cluster
storm.zookeeper.servers:
    - "node01"
    - "node02"
    - "node03"
# List of Nimbus nodes
nimbus.seeds: ["node01", "node02"]
# Nimbus and the Supervisors need local disk to store a small amount of state (jars, config files, etc.)
storm.local.dir: "/home/storm"
# Ports for worker processes; each worker uses one port to receive messages
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
# Distribute the installation package
scp -r apache-storm-2.1.0/ root@node02:/usr/local/
scp -r apache-storm-2.1.0/ root@node03:/usr/local/
# Distribute the environment configuration
scp /etc/profile root@node02:/etc/profile
scp /etc/profile root@node03:/etc/profile
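
A quick sanity check that the installation and PATH changes took effect (storm version ships with the Storm CLI):

# Run on node01, node02, and node03; each should report 2.1.0
storm version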

2. Starting the Cluster

# node01
nohup storm nimbus >nimbus.log 2>&1 &
nohup storm ui >ui.log 2>&1 &
# node02
nohup storm nimbus >nimbus.log 2>&1 &
nohup storm supervisor >supervisor.log 2>&1 &
# node03
nohup storm supervisor >supervisor.log 2>&1 &
# Start logviewer on every node to make log inspection easier
nohup storm logviewer >/dev/null 2>&1 &
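
To confirm the daemons are running, jps (bundled with the JDK) can be run on each node; the exact process names vary slightly across Storm versions. Once everything is up, the Storm UI is reachable on port 8080 by default (configurable via ui.port in storm.yaml):

# node01: Nimbus + UI, node02: Nimbus + Supervisor, node03: Supervisor
jps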
[Screenshots: process lists on node01 and node02]

Writing Storm Client Test Cases

Local Testing

Add the storm-core dependency to the project's pom.xml:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>{version}</version>
</dependency>
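
One packaging note for later: when the jar is submitted to a real cluster, storm-core is already on the cluster's classpath, so it is commonly given provided scope to keep it out of the packaged jar (a sketch; {version} stands for the actual version, e.g. the 2.1.0 installed above):

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>{version}</version>
    <scope>provided</scope>
</dependency>

With provided scope the code still compiles and, in most IDEs, can still run in local mode (the IDE run configuration can include provided dependencies).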
The spout that produces the test data:

package example;

import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.*;

public class StormWordCount extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "Hbase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        // Cache the collector; nextTuple uses it to emit tuples
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // Produce a simulated line of data and emit it, once per second
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Downstream bolts fetch this tuple by the field name "line"
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    // Mock data: shuffle the word list and join a random-length prefix with tabs
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
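
BaseRichSpout also has empty ack and fail hooks; for reliable delivery a spout would emit each tuple with a message ID and override them. A sketch of the pattern, which the demo spout above deliberately skips (UUID comes from the java.util.* import already present):

    // Emit with a message ID so Storm can track the tuple through the topology
    spoutOutputCollector.emit(new Values(lineData), UUID.randomUUID().toString());

    @Override
    public void ack(Object msgId) {
        // The tuple tree rooted at msgId was fully processed
    }

    @Override
    public void fail(Object msgId) {
        // Processing failed or timed out; a real spout would replay the data for msgId
    }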

The bolt that splits each incoming line into words:

package example;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Each emitted tuple carries a single field named "word"
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector=collector;

    }

    public void execute(Tuple input) {
        // Split the incoming line on tabs and emit one tuple per word
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }
}
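
BaseRichBolt leaves acknowledgement to the implementor; the execute above neither anchors its output nor acks the input, which is fine for a demo but forfeits Storm's reliability tracking. A reliable variant would look like this sketch (anchoring each emit to the input tuple, then acking it):

    public void execute(Tuple input) {
        for (String word : input.getStringByField("line").split("\t")) {
            // Anchor the emitted tuple to the input so failures propagate back to the spout
            collector.emit(input, new Values(word));
        }
        // Mark the input tuple as fully processed
        collector.ack(input);
    }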

The bolt that maintains the running word counts:

package example;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Terminal bolt: it never emits, so the collector is not kept
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Print the running totals
        System.out.print("Current real-time counts: ");
        counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
        System.out.println();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // No downstream consumers, so no output fields are declared
    }
}
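
For local runs, a final summary can be printed when the topology shuts down by overriding cleanup(), which Storm invokes on LocalCluster shutdown (a sketch; cleanup is not guaranteed to be called on a real cluster, so it should not be relied on there):

    @Override
    public void cleanup() {
        // Dump the final tallies once the local topology is torn down
        System.out.println("Final counts: " + counts);
    }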

The topology driver that wires the spout and bolts together for local testing:

package example;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class LocalWordCountApp {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("StormWordCount", new StormWordCount());
        // Send StormWordCount's output to SplitBolt
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("StormWordCount");
        // Send SplitBolt's output to CountBolt
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");
        // Local-cluster test: no Storm installation is needed, just run this main method.
        // Flow: StormWordCount runs first and produces the test data, which goes to SplitBolt;
        // SplitBolt splits each line into words;
        // the words are then sent on to CountBolt, which keeps the running counts.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountApp",
                new Config(),
                builder.createTopology());
    }
}
[Screenshot: LocalCluster test output]
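
One caveat about the wiring above: CountBolt is attached with shuffleGrouping, so raising its parallelism would scatter occurrences of the same word across tasks and split the counts. Storm's fieldsGrouping routes tuples by a field's value, keeping each word on one task (a sketch; it requires importing org.apache.storm.tuple.Fields):

        // Route all tuples with the same "word" value to the same CountBolt task
        builder.setBolt("CountBolt", new CountBolt(), 2)
               .fieldsGrouping("SplitBolt", new Fields("word"));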

1. Package and Upload

For cluster deployment, change the LocalCluster submission in LocalWordCountApp to a StormSubmitter submission:
package example;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

public class ClusterWordCountApp {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("example.StormWordCount", new StormWordCount());
        // Send StormWordCount's output to SplitBolt
        builder.setBolt("example.SplitBolt", new SplitBolt()).shuffleGrouping("example.StormWordCount");
        // Send SplitBolt's output to CountBolt
        builder.setBolt("example.CountBolt", new CountBolt()).shuffleGrouping("example.SplitBolt");
        // For local testing the topology was submitted through LocalCluster;
        // here StormSubmitter submits it to the running cluster instead.
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp",  new Config(), builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }

    }
}
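
When submitting to a real cluster, the worker count and per-component parallelism are usually set explicitly rather than left at the defaults (a sketch; the numbers are illustrative only):

        Config conf = new Config();
        conf.setNumWorkers(2); // two worker JVMs for this topology
        // Run SplitBolt with four executors
        builder.setBolt("example.SplitBolt", new SplitBolt(), 4)
               .shuffleGrouping("example.StormWordCount");
        StormSubmitter.submitTopology("ClusterWordCountApp", conf, builder.createTopology());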

Package the project as a jar and upload it to the server.

# Submit to the cluster
storm jar /usr/local/storm-1.0-SNAPSHOT.jar example.ClusterWordCountApp wordCount
# List all topologies
storm list
# Stop a topology -- format: storm kill TopologyName -w <wait-time-secs>
storm kill ClusterWordCountApp -w 3
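
The CLI can also pause and resume a running topology without killing it (same topology name as above):

# Pause the spouts; in-flight tuples finish processing
storm deactivate ClusterWordCountApp
# Resume emitting
storm activate ClusterWordCountApp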
[Screenshots: successful submission, storm list output, the Storm UI, and topology details]