storm Trident编程的分组策略

2017-04-26  本文已影响38人  先生_吕

Trident编程中的=数据分组策略演示

代码

public class StrategyTopology {
    
    public static class WriteFunction extends BaseFunction {
        private static final Log log = LogFactory.getLog(WriteBolt.class);
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // 获取上一个组件所声明的Filed
            String text = tuple.getStringByField("sub");
            //打印结果
            System.out.println(text);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    public static StormTopology buildTopology(){
        TridentTopology topology = new TridentTopology();
        //设定数据源
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(
                new Fields("sub"), //声明输出的域字段为“sub”
                4,                  //设置逼出来大小为4
                //设置数据源内容
                new Values("java"),
                new Values("python"),
                new Values("php"),
                new Values("c++"),
                new Values("ruby")
                );
        //指定是否循环
        spout.setCycle(true);
        //指定输入源spout
        Stream inputStream = topology.newStream("spout", spout);
        /**
         * 要实现sqout - bolt的模式 在trident里使用each来完成
         * each方法参数:
         *      1,输入源参数名称
         *      2,需要流转执行的function对象(就是bolt):new WriteFunction(),此function要求自己编写类
         *      3,指定function对象里的输出参数名称,没有则不在继续流向
         * */
        inputStream.
        //随机分组:shuffle
        shuffle().
        //分区分组:partitionBy
        //partitionBy(new Fields("sub")).
        //全局分组:global
        //global().
        //广播分组:broadcast
        //broadcast().
        each(new Fields("sub"), new WriteFunction(),new Fields()).parallelismHint(4);//parallelismHint设置并行度
        return topology.build();
    }

    
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(20);
        if(args.length == 0){
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("trident-filter", conf, buildTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }else{
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读