storm

Storm坑集锦

2016-10-26  本文已影响132人  Magia

KafkaSpout

1.poutConfig继承KafkaConfig,可以通过SpoutConfig设置kafkaSpout基本属性
spoutConfig.forceFromStart可以设置不从kafka初始位置消费,以免重复消费数据。
2.Config.TOPOLOGY_MAX_SPOUT_PENDING配置可以动态对kafka消费进行限流。

EsBolt

1)向Es发数据时发生了NullPointerException:
at org.codehaus.jackson.util.TextBuffer.findBuffer(TextBuffer.java:207)
refer

2)用户自定义esIndex

builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("bolt1");
builder.setBolt("testBolt", new EsBolt("{esIndex}/" + "test", conf), 2).shuffleGrouping("bolt2");

在Bolt1中定义esIndex

public class TimeBasedIndexNameBuilder {
    public static String build(String indexPrefix, Date collectTime) {
        return indexPrefix + "_" + new SimpleDateFormat("yyyy-MM-dd").format(collectTime);
    }
}
String esIndex = TimeBasedIndexNameBuilder.build("agentX", new Date());

如上,EsBolt就会传入agentX_2016-10-26/test,在ES服务器上生成index:agentX_2016-10-26,type:test

bolt继承多spout

现有bolt需要接受来自spout1和spout2的数据流,可通过getSourceComponent来判断数据流来自哪个spout,然后做进一步处理。

//spou1
builder.setSpout("spout1", new Spout1(spoutConfig), 2);
//spou2
builder.setSpout("spout2", new Spout2(), 2);
builder.setBolt("bolt1", new Bolt1(), 2).allGrouping("spout1").shuffleGrouping("spout2");

Bolt1部分代码如下:

@Override
public void execute(Tuple input) {
    //判断数据流来自Spout1
    if(input.getSourceComponent().equals("spout1")) {
        ...
    } else {
        ...
    }
}
上一篇 下一篇

猜你喜欢

热点阅读