hadoop实战-8.stormProject Idea、mav
2019-04-01 本文已影响0人
笨鸡
1.创建maven项目
添加storm依赖 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ctgu.ct</groupId>
<artifactId>storm_t</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
2.创建Spout,Bolt
RandomWordSpout.java
package StormProject;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
public class RandomWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
@Override
public void nextTuple() {
Random random = new Random();
int index = random.nextInt(words.length);
String goodName = words[index];
collector.emit(new Values(goodName));
Utils.sleep(500);
}
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("orignname"));
}
}
UpperBolt.java
package StormProject;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class UpperBolt extends BaseBasicBolt {
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String goodName = tuple.getString(0);
String goodName_upper = goodName.toUpperCase();
basicOutputCollector.emit(new Values(goodName_upper));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("uppername"));
}
}
SuffixBolt
package StormProject;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
public class SuffixBolt extends BaseBasicBolt {
FileWriter fileWriter = null;
@Override
public void prepare(Map stormConf, TopologyContext context) {
try{
fileWriter = new FileWriter("/root/storm_dir/"+ UUID.randomUUID());
}catch(IOException e){
throw new RuntimeException(e);
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
String upper_name = tuple.getString(0);
String suffix_name = upper_name + " IT IS OK";
try{
fileWriter.write(suffix_name);
fileWriter.write("\n");
fileWriter.flush();
}catch(IOException e){
throw new RuntimeException(e);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
TopoMain.java
package StormProject;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
public class TopoMain {
public static void main(String[] args) throws Exception{
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("randomspout", new RandomWordSpout(), 4);
builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
StormTopology topology = builder.createTopology();
Config conf = new Config();
conf.setNumWorkers(4);
conf.setDebug(true);
conf.setNumAckers(0);
StormSubmitter.submitTopology("demotopo", conf, topology);
}
}
3.打包上传并运行
storm_jar.png
浏览器
storm_run.png
tail -f filename
storm_result.png
storm_jar.png
storm_run.png
storm_result.png