Storm集群安装&WordCount示例&项目打包发布
2020-03-23 本文已影响0人
TZX_0710
一、集群安装
链接:https://pan.baidu.com/s/1SUfYk0xd9pzdEkFApIj7Ng
提取码:gp9y
#下载安装包,之后进行解压
tar -zxvf apache-storm-2.1.0.tar.gz
#配置环境变量
export STORM_HOME=/usr/local/apache-storm-2.1.0
export PATH=${PATH}:${STORM_HOME}/bin
#环境变量生效
source /etc/profile
#修改${STORM_HOME}/conf/storm.yaml
#zookeeper集群的主机列表
storm.zookeeper.servers:
- "node01"
- "node02"
- "node03"
#Nimbus的节点列表
nimbus.seeds: ["node01", "node02"]
#Nimbus和Supervisor需要本地磁盘上来存储少量状态(如jar包,配置文件等)
storm.local.dir: "/home/storm"
#workers进程的端口,每个worker进程会使用一个端口来接收信息
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
#安装包分发
scp -r apache-storm-2.1.0/ root@node02:/usr/local/
scp -r apache-storm-2.1.0/ root@node03:/usr/local/
#配置环境分发
scp -r /etc/profile root@node02:/etc/profile
scp -r /etc/profile root@node03:/etc/profile
二、集群启动
#node01
nohup storm nimbus >nimbus.log 2>&1 &
nohup storm ui >ui.log 2>&1 &
#node02启动
nohup storm nimbus >nimbus.log 2>&1 &
nohup storm supervisor >supervisor.log 2>&1 &
#启动node03
nohup storm supervisor >supervisor.log 2>&1 &
#在所有节点启动logviewer 为了方便查看日志
nohup bin/storm logviewer >/dev/null 2>&1 &
node01
node02
Storm客户端测试用例编写
本地测试
导入pom文件
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>{version}</version>
</dependency>
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.*;
public class StormWordCount extends BaseRichSpout {
private List<String>list=Arrays.asList( "Spark","Hadoop","Hbase","Storm","Filnk","Hive" );
private SpoutOutputCollector spoutOutputCollector;
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector=spoutOutputCollector;
}
public void nextTuple() {
//模拟产生数据
String lineData = productData();
spoutOutputCollector.emit( new Values( lineData ) );
Utils.sleep(1000);
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare( new Fields( "line" ) );
}
//模拟数据
private String productData(){
Collections.shuffle(list);
Random random=new Random( );
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join( list.toArray(),"\t",0,endIndex );
}
}
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split("\t");
for (String word : words) {
collector.emit(new Values(word));
}
}
}
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class CountBolt extends BaseRichBolt {
private Map<String,Integer> counts=new HashMap( );
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
// 输出
System.out.print("当前实时统计结果:");
counts.forEach((key, value) -> System.out.print(key + ":" + value + "; "));
System.out.println();
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
// Local-mode driver: wires spout -> split bolt -> count bolt and runs
// the topology inside an in-process LocalCluster, so no Storm
// installation is needed — just run this main method.
public class LocalWordCountApp {
public static void main(String[] args) {
TopologyBuilder builder=new TopologyBuilder();
// Source of the simulated lines.
builder.setSpout( "StormWordCount",new StormWordCount());
// Route StormWordCount's output to SplitBolt.
builder.setBolt( "SplitBolt",new SplitBolt() ).shuffleGrouping( "StormWordCount" );
// Route SplitBolt's output to CountBolt.
builder.setBolt( "CountBolt",new CountBolt() ).shuffleGrouping( "SplitBolt" );
// Flow: StormWordCount emits test data -> SplitBolt tokenizes each
// line -> CountBolt aggregates and prints the running word counts.
// NOTE(review): the cluster is never shut down, so the demo runs until
// the JVM is killed — presumably intentional for a tutorial; confirm.
LocalCluster cluster=new LocalCluster( );
cluster.submitTopology( "LocalWordCountApp",
new Config(),
builder.createTopology() );
}
}
LocalCluster测试结果
1.打包上传
//修改LocalWordCountApp的LocalCluster的提交方式
package example;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Cluster-mode driver: builds the same spout -> split -> count topology
 * as the local demo, but submits it to a running Storm cluster through
 * StormSubmitter (requires a deployed cluster and a packaged jar).
 */
public class ClusterWordCountApp {

    public static void main(String[] args) {
        // Wire the pipeline: spout feeds the splitter, splitter feeds the counter.
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("example.StormWordCount", new StormWordCount());
        topology.setBolt("example.SplitBolt", new SplitBolt()).shuffleGrouping("example.StormWordCount");
        topology.setBolt("example.CountBolt", new CountBolt()).shuffleGrouping("example.SplitBolt");

        // Submit to the cluster; these are the three checked exceptions
        // StormSubmitter.submitTopology can raise.
        try {
            StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), topology.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }
    }
}
打包成jar 上传到服务器
#提交到集群
storm jar /usr/local/storm-1.0-SNAPSHOT.jar example.ClusterWordCountApp wordCount
#
#查看所有的Topology
storm list
#停止topology --格式 storm kill TopologyName -w [wait-time-secs]
storm kill LocalWordCountApp -w 3
成功提交到集群
storm list查看所有的Topology
Storm UI
Topology详情