storm整合kafka
2018-10-14 本文已影响0人
小猪Harry
maven
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>storm-kafka-mysql</groupId>
<artifactId>storm-kafka-mysql</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm-kafka-mysql</name>
<description />
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>8.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.glassfish.web</groupId>
<artifactId>javax.servlet.jsp.jstl</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.5</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.31</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<!--<plugin>-->
<!--<artifactId>maven-war-plugin</artifactId>-->
<!--<version>2.2</version>-->
<!--<configuration>-->
<!--<version>3.1</version>-->
<!--<failOnMissingWebXml>false</failOnMissingWebXml>-->
<!--</configuration>-->
<!--</plugin>-->
</plugins>
</build>
</project>
MyKafkaTopology
package com.neusoft;
import java.util.Arrays;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
public class MyKafkaTopology {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
String zks = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
String topic = "orderMq";
// String zkRoot = "/opt/modules/app/zookeeper/zkdata"; // default zookeeper root configuration for storm
String id = "wordtest";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, "", id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;
spoutConf.zkServers = Arrays.asList(new String[] {"hadoop01", "hadoop02", "hadoop03"});
spoutConf.zkPort = 2181;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 2); // Kafka我们创建了一个2分区的Topic,这里并行度设置为2
builder.setBolt("print-bolt", new PrintBolt(), 2).shuffleGrouping("kafka-reader");
Config conf = new Config();
String name = MyKafkaTopology.class.getSimpleName();
if (args != null && args.length > 0) {
// Nimbus host name passed from command line
conf.put(Config.NIMBUS_HOST, args[0]);
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, builder.createTopology());
// Thread.sleep(60000);
// cluster.killTopology(name);
// cluster.shutdown();
// StormSubmitter.submitTopology(name, conf, builder.createTopology());
}
}
}
bolt
package com.neusoft;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class PrintBolt extends BaseBasicBolt {
public static final Log log = LogFactory.getLog(PrintBolt.class);
public static final long serialVersionUID = 1L;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
//获取上一个组件所声明的Field
String print = input.getString(0);
log.info("message: " + print);
System.out.println("message is : " + print);
//进行传递给下一个bolt
//collector.emit(new Values(print));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//declarer.declare(new Fields("write"));
}
}