Trident join

2018-07-11  本文已影响0人  正居明阳

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**

*/
public class StreamJoinMain {

public static StormTopology buildTopology() {
FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1"), 3, new Values("a", "1"),
new Values("b", "2"), new Values("a", "3"), new Values("a", "4"));
spout1.setCycle(true);//Spout是否循环发送

FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("key", "value2"), 3, new Values("a", "1"),
        new Values("b", "2"), new Values("a", "3"), new Values("a", "5"), new Values("a", "6"));
spout2.setCycle(true);//Spout是否循环发送

TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream("spout1", spout1).parallelismHint(2);
Stream stream2 = topology.newStream("spout2", spout2).parallelismHint(2);

topology.join(stream1, new Fields("key"), stream2, new Fields("key"), new Fields("key", "value1", "value2"))
        .peek(new Consumer() {
          public void accept(TridentTuple input) {
            System.out.println(input.toString());
            try {
              Thread.sleep(2000);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        });

return topology.build();

}
public static void main(String[] args) {
Config conf = new Config();
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology());
}
}

上一篇 下一篇

猜你喜欢

热点阅读