storm批处理事物原理

2017-08-27  本文已影响0人  shuaidong

title: storm批处理事物原理
date: 2017-08-20
categoties:


storm批处理事物原理

对于容错机制,storm通过一个系统级别的组件acker,结合xor校验机制判断一个tuple是否发送成功,进而spout可以重发该tuple,保证一个tuple在出错的情况下至少被重发一次。
但是在需要精确统计tuple的数量如销售额场景时,希望每个tuple被且仅被处理一次,storm 0.7.0引入了transactional topology,它保证每个tuple被且仅被处理一次,这样我们就可以实现一种非常准确,且高度容错方式来实现计数类应用。逐个处理单个tuple,增加很多开销,如写库,输出结果频率过高。
事物处理单个tuple效率比较低,因此storm中引入了batch处理。事物可确保该批次要么全部处理成功,如果有处理失败的则全部不计,storm会对失败的批次重新发送,且确保每个batch有且仅被处理一次。

事物机制原理:

对于只处理一次的需要,从原理上来讲,需要在发送tuple的时候带上事物id:txid,在需要事物处理的时候,根据该txid是否以前已经处理成功来决定是否进行处理,当然需要把txid和处理结果一起做保存。并且需要保障顺序性,在当前请求txid提交之前,所有比自己低txid请求都已经提交。

在事物batch处理中,一批tuple赋予一个txid,为了提高batch之间处理的并行度,storm采用pipeline(管道)处理模型,这样多个事物可以并行执行,但是commit的是按严格顺序的。

storm事物处理中,把一个batch的计算分成两个阶段processing和commit阶段:

processing阶段:多个batch可以并行计算;
commiting阶段:batch之间强制按照顺序进行提交。

事物topo:

processing阶段:多个batch可以并行计算,比如说bolt2是普通的batchbolt(实现了IBatchBolt),那么多个batch在bolt2的task之间可以并行执行。

commiting阶段:batch之间强制按照顺序进行提交,比如bolt3实现IBatchBolt并且标记需要事物处理(实现了ICommitter接口,或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么storm认为可以提交batch的时候调用finishbatch,在finishBatch做txid的比较以及状态保存工作。

使用transactional topologles的时候,storm会为你做下面的事情:

  1. 管理状态:storm把所有实现transactional topologies所必须的状态保存在zookeeper里面,包括当前transaction id以及每个batch的一些元数据;

  2. 协调事务:storm帮你管理所有事情,如帮你决定在任何一个时间点是该processing还是该committing。

  3. 错误检测:storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring(emit的时候发生的动作)。

  4. 内置的批处理api:storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple,storm同时也会自动清理每个transaction所产生的中间数据。

事物性的spout需要实现ITransactionalSpout,这个接口包含两个内部类接口类Coordinator和Emmiter。在topology运行的时候,事务性的spout内部包含一个子topology。
这里面有两种类型的tuple,一种是事务性的tuple,一种是batch中的tuple;
coordinator开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到“batch emit”流
emitter以all grouping(广播)的方式订阅coordinator的“batch emit”流,负责为每个batch实际发射tuple,发送的tuple都必须以transactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
coordinator只有一个,emmitter根据并行度可以有多个实例</br>
<font color="#4590a3" size = "3px">transactionAttempt包含两个值:一个transaction id,一个attempt id。
transaction id的作用就是我们上面说的对每个batch中的tuple是唯一的,而不管这个batch replay多少次都是一样的。
attemp id是对于每个batch唯一的一个id,但对于同一个batch,它replay之后的attempt id和replay之前的就不一样了。
我们可以把attempt id理解成replay-times,storm利用这个id来区别一个batch发送的tuple的不同版本。
metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放到zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。
</font>

事务bolt:

BaseTransactionalBolt:

处理batch在一起的tuples,对于每一个tuple调用execute方法,而整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成committer,则只能在commit阶段调用finishBatch方法。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何直到batch的processing完成了,也就是bolt是否接受处理了batch里面所有的tuple:在bolt内部,有一个CoordinatedBolt的模型。

CoordinateBolt:

每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我给哪些task发送信息(同样根据grouping信息)。

等所有的tuple都发送完成之后,coordinatorBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理完了所有的tuple。

下游CoordinateBolt会重复上面的步骤,通知其下游。

![](/Users/shuaidong/Documents/Yu Writer Libraries/Default/storm/pic/coordinateBolt.png)


案例

事务一般只适合做汇总型的统计

定义元数据

import java.io.Serializable;

public class MyMdata implements Serializable {

    //必须实现序列化接口
    private static final long serialVersionUID = -2894782092366251691L;
    private long beginPoint;//事务开始位置
    private long  num;//bitch的tuple个数

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public long getBeginPoint() {
        return beginPoint;
    }

    public void setBeginPoint(long beginPoint) {
        this.beginPoint = beginPoint;
    }

    public long getNum() {
        return num;
    }

    public void setNum(long num) {
        this.num = num;
    }
}

定义事务spout

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.tuple.Fields;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class MyTxSpout implements ITransactionalSpout<MyMdata> {



    Map<Long,String> dbMap = new HashMap<>();
    public MyTxSpout() {
        Random _rand = new Random();
        String[] hosts={"www.haoyidao.com"};
        String[] session_id =  { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

        StringBuffer stringBuffer = new StringBuffer();
        for (long i=0;i<100;i++){
            dbMap.put(i,hosts[0] + "\t" + session_id[_rand.nextInt(5)]+"\t"+ _rand.nextInt(8));

        }
    }

    //getCoordinator方法,告诉Storm用来协调生成批次的类
    @Override
    public Coordinator<MyMdata> getCoordinator(Map map, TopologyContext topologyContext) {
        return new MyCoordinator();
    }

    //getEmitter,负责读取批次并把它们分发到拓扑中的数据流组
    @Override
    public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
        return new MyEmitter(dbMap);
    }

    //定义类型
  //  最后,就像之前做过的,需要声明要分发的域。
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("tx","log"));

    }

    //获得配置文件
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

定义协调者告诉storm如何协调生成批次,即实现Coordinator,代码如下

import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.utils.Utils;

import java.math.BigInteger;

public class MyCoordinator implements ITransactionalSpout.Coordinator<MyMdata> {

    private static int BATCH_NUM = 10;
    //bigInteger为事务txid   myMdata上一个元数据
    @Override
    public MyMdata initializeTransaction(BigInteger bigInteger, MyMdata myMdata) {
        long beginPoint;
        if (myMdata==null){
            beginPoint = 0;
        }else {
            beginPoint = myMdata.getBeginPoint()+myMdata.getNum();
        }
        MyMdata myMdata1 = new MyMdata();
        myMdata1.setBeginPoint(beginPoint);
        myMdata1.setNum(BATCH_NUM);
        System.out.println("启动一个事务:"+myMdata1.toString());


        return myMdata1;
    }

    @Override
    public boolean isReady() {
        //没启动之后让其等待2秒在执行下一个事务
       // Utils.sleep(2000);
        System.out.println("myready start 执行。。。");
        return true;
    }

    @Override
    public void close() {

    }
}

实现实际执行批次的类emitter,将事务中元数据提交到topology中

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Values;

import java.math.BigInteger;
import java.util.Map;

public class MyEmitter implements ITransactionalSpout.Emitter<MyMdata> {

    //TransactionAttempt  事务的标示,同一个批次被strom重发了,txid是相同的,但是TransactionAttempt是不同的

    private Map<Long,String> dbMap;

    public MyEmitter(Map map) {
        this.dbMap =map;
    }

    @Override
    public void emitBatch(TransactionAttempt transactionAttempt, MyMdata myMdata, BatchOutputCollector batchOutputCollector) {

        long beginPoint = myMdata.getBeginPoint();
        long num = myMdata.getNum();
        for (long i=beginPoint;i<beginPoint+num;i++){

            if (dbMap.get(i)==null){
               continue;
            }

            batchOutputCollector.emit(new Values(transactionAttempt,dbMap.get(i)));
        }
    }

    @Override
    public void cleanupBefore(BigInteger bigInteger) {

    }

    @Override
    public void close() {

    }

}

定义blot,实现对每一个批次的处理、统计

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Iterator;
import java.util.Map;

public class MytransactionBlot extends BaseTransactionalBolt{

    private int count = 0;

    BatchOutputCollector batchOutputCollector;
    TransactionAttempt transactionAttempt;

    //初始化
    @Override
    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {


        this.batchOutputCollector = batchOutputCollector;
        this.transactionAttempt = transactionAttempt;
    }

    //从emit接到每一行处理,同一个批次处理完成后交给finishBatch
    @Override
    public void execute(Tuple tuple) {
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        System.out.println("MytransactionBlot TransactionAttempt id:"+tx.getAttemptId()+"  txid:"+tx.getTransactionId().toString());

        String log = String.valueOf(tuple.getValue(1));
        Iterator iterator = tuple.getFields().iterator();
        while (iterator.hasNext()){
            System.out.println("filds字段值:"+iterator.next());
        }
        if (log!=null&&log.length()>0){
            count++;
        }

    }

    //同一个批次处理完成后做一个处理
    @Override
    public void finishBatch() {

      //  System.out.println("prepare method getAttemptId"+transactionAttempt.getAttemptId()+"txid:"+transactionAttempt.getTransactionId().toString());

        batchOutputCollector.emit(new Values(transactionAttempt,count));

}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("tx","count"));
    }
}


定义一个统一统计的类commit,对上一级多线程进行统计,在构建topo时这个bolt必须为单线程

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;

import java.io.Serializable;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;


public class MyCommitter extends BaseTransactionalBolt implements ICommitter,Serializable {

    private static final long serialVersionUID = 1136043849412072523L;

    public static Map<String,DbValue> dbValueMap = new HashMap<>();

    public static final String GLOBAL_KEY = "GLOBAL_KEY";

    int sum = 0;
    TransactionAttempt transactionAttempt;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {

        this.transactionAttempt = transactionAttempt;
        System.out.println("MyTransactionBolt prepare getAttemptId:"+transactionAttempt.getAttemptId()+"getTransactionId:"+transactionAttempt.getTransactionId());


    }

    //execute会从emit中得到每一行进行处理,同一个批次处理完成交给finishBatch
    @Override
    public void execute(Tuple tuple) {
        TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
        String log = tuple.getString(1);
        System.out.println("execut bolt TransactionAttempt id:"+tx.getAttemptId()+"TransactionId:"+tx.getTransactionId());

        if (log!=null&&log.length()>0){
            sum++;
        }
        // sum += tuple.getInteger(1);



    }

    @Override
    public void finishBatch() {
        DbValue value = dbValueMap.get(GLOBAL_KEY);
        if (value == null|| !value.txid.equals(transactionAttempt.getTransactionId())){
            //更新数据库
            DbValue newValue = new DbValue();
            newValue.txid = this.transactionAttempt.getTransactionId();
            if (value ==null){
                newValue.count = sum;
            }else {
                newValue.count = value.count+sum;
            }
            dbValueMap.put(GLOBAL_KEY,newValue);
        }else {

        }

        System.out.println("total===============:"+dbValueMap.get(GLOBAL_KEY).count);

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    public static class DbValue{
        BigInteger txid;
        int count = 0;

    }
}

定义main方法,进行提交到storm中

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.transactional.TransactionalTopologyBuilder;

import java.util.HashMap;
import java.util.Map;

public class MyTopo {
    public static void main(String[] args) {
        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutId",new MyTxSpout(),1);
        builder.setBolt("bolt1",new MytransactionBlot(),3).shuffleGrouping("spoutId");
        builder.setBolt("committer",new MyCommitter(),1).shuffleGrouping("bolt1");


        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS,4);

       // System.setProperty("storm.jar","/Users/shuaidong/Downloads/StromLearning/transaction/target/Demo03.jar");

        if (args!=null&&args.length>0){
            try {
                StormSubmitter.submitTopology(args[0],conf,builder.buildTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }else {
            try {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("myTxToPo",conf,builder.buildTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

上一篇下一篇

猜你喜欢

热点阅读