程序员

Storm trident之三聚合运算之persistentAg

2017-03-23  本文已影响0人  SamHxm

Storm trident聚合运算之persistentAggregate

persistentAggregate可以看成是对源源不断发送过来数据流做一个总的聚合,每个批次的聚合值只是一个中间状态,通过与trident新提出的state概念结合,实现中间状态的持久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>。聚合运算完成后将运算结果emit给后续的Bolt处理

比如:

TridentTopology topology = new TridentTopology();

topology.newStream("filter", new WordSpout()).parallelismHint(1).shuffle()
    .each(new Fields("field1", "field2"), new WordFilter()).parallelismHint(4).shuffle()
    .each(new Fields("field1", "field2"), new WordFunction(), new Fields("field3")).parallelismHint(4).shuffle()
    .persistentAggregate(new MyState.MyFactory(), new Fields("field3"), new Doing(), new Fields("field4")).newValuesStream().shuffle()
    .each(new Fields("field4"), new CountFilter(), new Fields("field5")).parallelismHint(4);

具体介绍一下persistentAggregate的定义。

public TridentState persistentAggregate(StateFactory stateFactory, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
        return persistentAggregate(new StateSpec(stateFactory), inputFields, agg, functionFields);
}

重点看看persistentAggregate方法所需的参数。

StateFactory stateFactory,要求一个实现org.apache.storm.trident.state.StateFactory接口的对象。该接口比较简单,只有一个方法

State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions);

Map conf:启动topology时的配置信息。
IMetricsContext metrics:监控指标统计对象
int partitionIndex:分区下标
int numPartitions:分区数

返回类型要求一个实现org.apache.storm.trident.state.State接口对象。该接口有2个方法

public interface State {
    void beginCommit(Long txid); // can be null for things like partitionPersist occuring off a DRPC stream
    void commit(Long txid);
}

主要用来支持事务性。

persistentAggregate方法的第二个参数很简单,一个org.apache.storm.tuple.Fields对象,用来指定前一步操作emit出的Fields。

persistentAggregate方法的第三个参数用来指定具体执行聚合运算的对象,也就要求实现CombinerAggregator<T>或者ReducerAggregator<T>的对象。

import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class Doing implements CombinerAggregator<Long>{


    @Override
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 * val2;
    }

    @Override
    public Long zero() {
        return 0L;
    }

}

init:每条tuple调用1次,对tuple做预处理。

combine:每条tuple调用1次,和之前的聚合值(val1)做运算。如果是第一条tuple则和zero返回的值做运算。

zero:当没有数据流时的处理逻辑。

在每个batch结束时将最后运算得到的结果emi出去t。

persistentAggregate方法的第四个参数指定聚合运算完成后,emit给下一个bolt的Fields。

注意:实现State接口的对象还要实现Snapshottable接口。

package org.apache.storm.trident.state.snapshot;

import org.apache.storm.trident.state.ValueUpdater;


// used by Stream#persistentAggregate
public interface Snapshottable<T> extends ReadOnlySnapshottable<T> {
    T update(ValueUpdater updater);
    void set(T o);
}

通过调用update方法的参数ValueUpdater updater,最终会调用CombinerAggregator接口实现对象的combine方法。

import java.util.Map;

import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.ValueUpdater;
import org.apache.storm.trident.state.snapshot.Snapshottable;

public class MyState implements State,Snapshottable<Long> {
    
    private Map<String,Long> result = Maps.newConcurrentMap();
    
    @Override
    public void beginCommit(Long txid) {
    }

    @Override
    public void commit(Long txid) {
    }
    
    @Override
    public Long get() {
        return result.get("key");
    }

    @Override
    public Long update(ValueUpdater updater) {
        Long l =  (Long) updater.update(get());
        set(l);
        return l;
    }

    @Override
    public void set(Long o) {
        result.put("key", o);
    }
    
    public static class MyFactory implements StateFactory {

        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new MyState();
        }
    }
}

假如spout发射3个batch,每个batch4条数据,则调用顺序如下:

第一个batch
CombinerAggregator.zero //当前数据流中没有数据,执行zero方法
CombinerAggregator.init //对当前batch中第一条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第一条数据,执行combine方法,完成聚合运算。用之前步骤中zero的结果与前一步init的结果进行计算
CombinerAggregator.init //对当前batch中第二条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第二条数据执行聚合运算。用前一步init的结果与前一次combine的结果进行计算。
CombinerAggregator.init //对当前batch中第三条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第三条数据执行聚合运算。用前一步init的结果与前一次combine的结果进行计算。
CombinerAggregator.init //对当前batch中第四条数据执行初始化操作
CombinerAggregator.combine //对当前batch中第四条数据执行聚合运算。用前一步init的结果与前一次combine的结果进行计算。
CombinerAggregator.zero //当前数据流中没有数据,执行zero方法
CombinerAggregator.combine //用前一步zero方法的结果与前一次combine的结果进行计算。
State.update //回调State接口实现对象,将前一batch聚合结果与当前batch聚合结果进行计算,需要在State接口实现对象的update方法中保存最近全局聚合运算结果。

第二个batch
CombinerAggregator.zero
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.zero
CombinerAggregator.combine
State.update

第三个batch
CombinerAggregator.zero
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.init
CombinerAggregator.combine
CombinerAggregator.zero
CombinerAggregator.combine
State.update

更多的batch或每个batch有更多的数据时,规则相同

上一篇 下一篇

猜你喜欢

热点阅读