JStorm: Near-Real-Time Big Data Streaming

2023-08-01  后来丶_a24d

Overview

  1. Real-time data processing: JStorm processes and analyzes large data streams in real time, for real-time analytics, live dashboards, real-time monitoring, and similar scenarios
  2. Real-time risk control: JStorm can analyze large volumes of data in real time, e.g. detecting anomalous behavior or predicting fraud as it happens
  3. Online recommendation: JStorm computes user behavior and interests in real time, e.g. recommending products or content on the fly
  4. Real-time monitoring and alerting: JStorm collects and analyzes system metrics, logs, and other data in real time for monitoring and alerting
  5. Real-time big-data analytics: JStorm processes large datasets quickly for real-time analytics and machine learning

Architecture

Business view

(Figure: overall topology structure, Topology拓扑整体.png)

Overall

Worker internals

(Figure: worker internals, work相关.png)

Communication

storm.messaging.transport = com.alibaba.jstorm.message.netty.NettyContext

Scheduling: the default strategy is used to schedule the different topologies

topology.scheduler.strategy = backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy
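
For reference, a minimal sketch of setting these two keys programmatically before submission; the topology name and the builder here are placeholders, not part of the original example:

Config conf = new Config();
conf.put("storm.messaging.transport",
        "com.alibaba.jstorm.message.netty.NettyContext");
conf.put("topology.scheduler.strategy",
        "backtype.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());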

Practice

Splitting and merging data streams

Splitting. The simplest form is two bolts consuming the same stream:
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
                new SequenceSpout(), spoutParal);

builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
                        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
                
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
                        .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
// The second form splits one stream into two named streams via a SplitRecord bolt.
// When building the topology:
builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
                        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
                
builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
                SequenceTopologyDef.SPLIT_BOLT_NAME,  // --- name of the upstream component
                SequenceTopologyDef.TRADE_STREAM_ID); // --- subscribe to this stream of that component
        
builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
                .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- name of the upstream component
                        SequenceTopologyDef.CUSTOMER_STREAM_ID);      // --- subscribe to this stream of that component

// When emitting messages:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    tpsCounter.count();

    Long tupleId = tuple.getLong(0);
    Object obj = tuple.getValue(1);

    if (obj instanceof TradeCustomer) {
        TradeCustomer tradeCustomer = (TradeCustomer) obj;

        Pair trade = tradeCustomer.getTrade();
        Pair customer = tradeCustomer.getCustomer();

        collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
                new Values(tupleId, trade));

        collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
                new Values(tupleId, customer));
    } else if (obj != null) {
        LOG.info("Unknown type " + obj.getClass().getName());
    } else {
        LOG.info("Null tuple value");
    }
}
// Declare the output stream formats:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
  declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
 }
// When receiving, check which stream the tuple came from:
 if (input.getSourceStreamId().equals(SequenceTopologyDef.CUSTOMER_STREAM_ID)) {
    customer = pair;
    customerTuple = input;
    
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        customerMap.put(tupleId, input);
        return;
    }
    
    trade = (Pair) tradeTuple.getValue(1);
    
}
Merging
// When building the topology:
 builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
                        .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)
                        .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);
// On the receiving side, checking the source component identifies where each tuple came from:
if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME) ) {
    customer = pair;
    customerTuple = input;
    
    tradeTuple = tradeMap.get(tupleId);
    if (tradeTuple == null) {
        customerMap.put(tupleId, input);
        return;
    }
    
    trade = (Pair) tradeTuple.getValue(1);
    
} else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
    trade = pair;
    tradeTuple = input;
    
    customerTuple = customerMap.get(tupleId);
    if (customerTuple == null) {
        tradeMap.put(tupleId, input);
        return;
    }
    
    customer = (Pair) customerTuple.getValue(1);
} 

Transactions

Example
How the Kafka example works:
  1. A batch of data is pulled from Kafka in one go and emitted record by record. Once every record has been processed correctly, a commit stream is triggered. The commit stream is strictly ordered and usually performs the flush or the database write. If the commit stream also returns successfully, the spout updates the meta store or the Kafka offset; if any step fails, the offset is not updated, so the batch is eventually consumed again.
  2. In effect, a batch is treated as one atomic tuple; only the intermediate computation may run concurrently.
Design points:
  1. Provide strong ordering: a tuple is not processed until the previous one has been fully processed; in other words, a synchronous model. Each tuple is assigned a monotonically increasing transaction ID (strong ordering). If every bolt records the last transaction ID it handled, processing is guaranteed exactly once even under failure replay.
  2. If one tuple at a time is too slow, process a batch of tuples at a time. The batch becomes the transaction unit: a batch must finish before the next one starts, and each batch is assigned a transaction ID.
  3. Not every step of a computation needs strong ordering, so a job is split into two phases (see the sketch after this list):
    3.1 processing phase: may run concurrently
    3.2 commit phase: must be strictly ordered, so at any moment only one batch is being committed. If either phase fails, the whole batch is replayed.
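
A minimal sketch of the "record the last transaction ID" idea from design point 1, assuming results and the ID are persisted atomically; the class and method names are illustrative, not JStorm API:

public class CommittingBolt {
    private long lastCommittedTxId = -1; // in practice persisted together with the results
    private long batchSum = 0;

    // processing phase: may run concurrently across batches
    public void processTuple(long value) {
        batchSum += value;
    }

    // commit phase: invoked in strict transaction-ID order
    public void commit(long txId) {
        if (txId <= lastCommittedTxId) {
            return; // replayed batch that was already applied: skipping keeps exactly-once semantics
        }
        flushToStore(batchSum, txId); // write results and txId in one atomic store operation
        lastCommittedTxId = txId;
        batchSum = 0;
    }

    private void flushToStore(long sum, long txId) { /* e.g. a single DB transaction */ }
}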

Trident

Example
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);
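
In the standard Trident word-count tutorial, the persisted wordCounts state can then be queried on demand through a DRPC stream:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));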

Ack Mechanism

  1. For every message the spout emits, if the spout receives the acker's ack within the timeout, the tuple is considered successfully processed by the downstream bolts
  2. If no ack arrives for a tuple within the timeout, the fail action is triggered and the tuple is considered failed
  3. A fail response tuple from the acker likewise counts as a failure and triggers the fail action
topology.max.spout.pending caps the number of un-acked tuples each spout task may have in flight
Using acks
Ack example:
public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        // Create a TopologyBuilder
        TopologyBuilder builder = new TopologyBuilder();

        // Set up the spout
        builder.setSpout("spout", new WordSpout(), 2);

        // Set up the bolts
        builder.setBolt("split", new WordSplitBolt(), 2)
                .shuffleGrouping("spout");

        builder.setBolt("count", new WordCountBolt(), 2)
                .fieldsGrouping("split", new Fields("word"));

        // Create a local cluster
        LocalCluster cluster = new LocalCluster();

        // Submit the topology to the local cluster
        Config config = new Config();
        config.setDebug(true);
        // Enable reliable mode: at least one acker executor
        config.setNumAckers(1);
        // config.setMaxSpoutPending(10000);

        cluster.submitTopology("word-count", config, builder.createTopology());
    }
}

public class WordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Pause randomly for 1 to 5 seconds
        Utils.sleep((int) (Math.random() * 4000 + 1000));
        // Emit a tuple tagged with a message ID so it can be acked or failed
        String msgId = UUID.randomUUID().toString();
        System.out.println("msgId: " + msgId);
        collector.emit(new Values("hello world" + UUID.randomUUID()), msgId);
    }
    @Override
    public void ack(Object msgId) {
        // Successfully processed; no resend needed
        System.out.println("success: " + msgId);
    }

    @Override
    public void fail(Object msgId) {
        // The message failed. One option is to keep an in-memory map of pending
        // payloads keyed by msgId and re-emit it here (see the sketch after this listing)
        System.out.println("fail msg " + msgId);
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

public class WordSplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        String[] words = word.split(" ");

        // Emit each word to WordCountBolt. The emit must anchor the input tuple
        // (collector.emit(tuple, new Values(w))) so the ack chain covers the downstream bolt
        for (String w : words) {
            collector.emit(tuple, new Values(w));
        }
        System.out.println("split: " + word + " msgId: " + tuple.getMessageId().toString());
        // Ack the input tuple to confirm it was processed successfully
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");

        // Update the word count
        if (counts.containsKey(word)) {
            counts.put(word, counts.get(word) + 1);
        } else {
            counts.put(word, 1);
        }
        System.out.println("count: " + word + ": " + counts.get(word));
        // Normally this would be collector.ack(tuple); calling fail here simulates a processing failure
        collector.fail(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // do nothing
    }

    @Override
    public void cleanup() {

    }
}
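
A minimal sketch of the replay strategy hinted at in WordSpout.fail(): cache every in-flight payload by msgId, drop it on ack, re-emit it on fail. It uses the same Storm classes as the example above; the spout name is made up:

public class ReplayableWordSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // payloads of tuples that were emitted but not yet acked
    private final Map<String, Values> pending = new ConcurrentHashMap<>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String msgId = UUID.randomUUID().toString();
        Values values = new Values("hello " + msgId);
        pending.put(msgId, values);      // remember until acked
        collector.emit(values, msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);           // fully processed, safe to forget
    }

    @Override
    public void fail(Object msgId) {
        Values values = pending.get(msgId);
        if (values != null) {
            collector.emit(values, msgId); // replay with the same msgId
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}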

Dynamic Scaling of JStorm Tasks

- Task dimension: dynamically adjust the parallelism of spouts, bolts and ackers
- Worker dimension: dynamically adjust the number of workers
Command line
USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]
e.g. jstorm rebalance SequenceTest conf.yaml

Parameters:
-r: reschedule all tasks
TopologyName: name of the topology
DelayTime: delay before the adjustment starts
NewConfig: new settings for spouts, bolts, ackers and workers (currently YAML files only)

topology.workers: 4
topology.acker.executors: 4

topology.spout.parallelism:
  SequenceSpout : 2
topology.bolt.parallelism:
  Total : 8

API
backtype.storm.command.rebalance:
- public static void submitRebalance(String topologyName, RebalanceOptions options)
- public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf)
    Note: conf can carry a cluster's zk settings, enabling remote submission to that cluster
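
A hedged sketch of calling it; RebalanceOptions is the Thrift struct from backtype.storm.generated, and the setter names may differ slightly between versions:

RebalanceOptions options = new RebalanceOptions();
options.set_num_workers(4); // new worker count
options.set_wait_secs(30);  // delay before the rebalance starts

Map conf = Utils.readStormConfig();
// conf can carry the target cluster's zk settings for a remote submission
backtype.storm.command.rebalance.submitRebalance("SequenceTest", options, conf);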
Use cases

JStorm supports dynamically updating configuration files

public interface IDynamicComponent extends Serializable {
    public void update(Map conf); // conf: the new configuration
}

A spout or bolt must implement this interface for its configuration to be updated at runtime. How the new configuration is applied to a component is decided by the user-implemented update(), which is called back when the dynamic-update command is submitted. Updates happen at component granularity: every component has its own update(), and a component that does not need dynamic configuration updates simply does not implement the interface.
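
A minimal sketch of a bolt implementing the interface; the "qps.limit" key is a made-up example, not a JStorm setting:

public class RateLimitBolt extends BaseRichBolt implements IDynamicComponent {
    private volatile long qpsLimit = 1000L;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void update(Map conf) {
        // called back when the dynamic-update command is submitted
        Object v = conf.get("qps.limit"); // hypothetical key, for illustration only
        if (v != null) {
            qpsLimit = Long.parseLong(String.valueOf(v));
        }
    }

    @Override
    public void execute(Tuple tuple) {
        // ... apply qpsLimit in the processing logic ...
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}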

Command line
USAGE: jstorm update_topology TopologyName -conf configPath
e.g. jstorm update_topology SequenceTest -conf conf.yaml
Parameters:
TopologyName: name of the topology
configPath: configuration file name (currently YAML only)

topology.max.spout.pending: 10000
send.sleep.second: 100
kryo.enable: false
fall.back.on.java.serialization: true

API
backtype.storm.command.update_topology:
-     private static void updateTopology(String topologyName, String pathJar,
         String pathConf)
    Note: pathJar is used to update the jar as well; pass null if there is no jar to update

Development Notes

  1. Size spout parallelism at roughly 500 QPS per task; for fully in-memory tasks, assume about 2000 QPS per task (for example, a 10,000 msg/s stream needs about 10000/500 = 20 spout tasks but only about 10000/2000 = 5 in-memory bolt tasks)
  2. For tasks that write to external systems, size parallelism by what the external system can absorb
  1. Do not pull too frequently: below 100 ms, MetaQ/Kafka consumers spin on empty fetches
  2. A fetch block of 1 MB or 2 MB is recommended; larger blocks add GC pressure, smaller ones reduce throughput
  1. Async mode (storm.messaging.netty.sync.mode: false) performs better but can cause spout fails; suited to running without ackers
  2. Sync mode (storm.messaging.netty.sync.mode: true) only sends the next message after the receiver has consumed one; suited to running with ackers

Operations Notes

nohup jstorm supervisor >/dev/null 2>&1 &   # run the supervisor in the background
ln -s jstorm-0.9.4 jstorm-current           # a version symlink makes upgrades easy

Flow Control (Backpressure)

Enabling and releasing backpressure
Usage:
## Backpressure master switch
topology.backpressure.enable: true
## High watermark -- above this queue usage a task is considered blocked
topology.backpressure.water.mark.high: 0.8
## Low watermark -- below this queue usage the block is considered released
topology.backpressure.water.mark.low: 0.05
## Trigger ratio -- backpressure fires when blocked tasks / component parallelism exceeds this value
topology.backpressure.coordinator.trigger.ratio: 0.1

## Backpressure sampling interval, in ms
topology.backpressure.check.interval: 1000
## Sample count and ratio: within 4 consecutive samples, more than 4 * 0.75 blocked samples
## count as truly blocked, and more than 4 * 0.75 unblocked samples count as truly released
topology.backpressure.trigger.sample.rate: 0.75
topology.backpressure.trigger.sample.number: 4

Grouping Types

fieldsGrouping: hash on the chosen fields, so equal values always reach the same task
globalGrouping: all tuples go to a single task
shuffleGrouping: random, evenly distributed
localOrShuffleGrouping: prefer tasks in the same worker, otherwise shuffle
localFirstGrouping: prefer the same worker, then the same node, then the cluster (JStorm extension)
noneGrouping: currently equivalent to shuffle
allGrouping: broadcast to every task
directGrouping: the sender chooses the exact target task
customGrouping: user-defined routing (see the sketch below)
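
As a sketch of customGrouping, here is a hand-rolled fieldsGrouping that routes by the hash of the first field; CustomStreamGrouping is the interface from Storm 0.9.x:

public class ModHashGrouping implements CustomStreamGrouping {
    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks; // task ids of the consuming component
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // mask the sign bit so the index is always non-negative
        int idx = (values.get(0).hashCode() & Integer.MAX_VALUE) % targetTasks.size();
        return Collections.singletonList(targetTasks.get(idx));
    }
}

// usage: builder.setBolt("count", new WordCountBolt(), 2)
//               .customGrouping("split", new ModHashGrouping());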

IBasicBolt vs IRichBolt

With IRichBolt, if you want tuples to be processed reliably, you must explicitly anchor each emit to the input tuple and ack or fail it yourself, e.g. by calling

emit(String streamId, Tuple anchor, List<Object> tuple)
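
A side-by-side sketch of the difference, assuming a streamId field: with IRichBolt you anchor the emit and ack yourself, while IBasicBolt's BasicOutputCollector anchors automatically and the framework acks when execute returns (or fails on a FailedException):

// IRichBolt style: manual reliability
public void execute(Tuple input) {
    collector.emit(streamId, input, new Values(input.getValue(0))); // anchored to input
    collector.ack(input);                                           // explicit ack
}

// IBasicBolt style: automatic reliability
public void execute(Tuple input, BasicOutputCollector collector) {
    collector.emit(streamId, new Values(input.getValue(0))); // anchored and acked for you
}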

Performance Tuning

Technology selection
Increase parallelism
Make task assignment more even
Tuning when using MetaQ or Kafka

Custom Scheduling Interfaces (0.9.5 and later)

Set the default memory size per worker
       ConfigExtension.setMemSizePerWorker(Map conf, long memSize)

Set the cgroup CPU weight per worker
       ConfigExtension.setCpuSlotNumPerWorker(Map conf, int slotNum)

Set whether to reuse the previous assignment
       ConfigExtension.setUseOldAssignment(Map conf, boolean useOld)

Force a component's tasks to run on different nodes
       ConfigExtension.setTaskOnDifferentNode(Map componentConf, boolean isIsolate)

Note that componentConf is the component's own configuration; it must be added to the spout's or bolt's configuration via addConfigurations.

Custom worker assignment
      WorkerAssignment worker = new WorkerAssignment();
      worker.addComponent(String componentName, Integer num); // add task(s) of this component to this worker
      worker.setHostName(String hostName); // pin this worker to a specific host
      worker.setJvm(String jvm);           // JVM options for this worker
      worker.setMem(long mem);             // memory size of this worker
      worker.setCpu(int slotNum);          // CPU weight
      ConfigExtension.setUserDefineAssignment(Map conf, List<WorkerAssignment> userDefines)

Note: not every field of a worker needs to be set; as long as the given values are valid, a partially specified worker still takes effect.

Forcing a topology onto specific supervisors

In practice, some machines host local services (for example a local DB). To improve performance, all tasks of a topology can be forced onto those machines:

conf.put(Config.ISOLATION_SCHEDULER_MACHINES, isolationHosts); // isolationHosts is a List<String>

where conf is the topology's configuration.

Scheduling details
Default scheduling algorithm

JStorm state management

Production Configuration

storm.messaging.transport: com.alibaba.jstorm.message.netty.NettyContext
storm.messaging.netty.authentication: false
topology.backpressure.water.mark.high: 0.8

DRPC and Netty


Utility Classes Used in Practice

Sliding window

public class WindowContainer {

    private static final Map<String, String> markMap = Maps.newConcurrentMap();

    public static <T> TimeWindow<T> getTimeWindow(Map map, String key, boolean isTime, int interval, TimeUnit unit) {
        TimeWindow<T> window;
        if (map.containsKey(key)) {
            window = (TimeWindow<T>) map.get(key);
        } else {
            window = makeWindow(key, isTime, interval, unit);
            map.put(key, window);
        }
        return window;
    }

    public static <T> TimeWindow<T> getResilienceTimeWindow(Map map, String key, boolean isTime, int interval, TimeUnit unit) {
        TimeWindow<T> window;
        if (map.containsKey(key)) {
            window = (TimeWindow<T>) map.get(key);
            String preKey = markMap.get(key);
            // Window size changed: rebalance and remember the new size,
            // so later calls don't rebalance again
            if (!markKey(unit, interval).equals(preKey)) {
                window.rebalance(unit, interval);
                markMap.put(key, markKey(unit, interval));
            }
        } else {
            window = makeWindow(key, isTime, interval, unit);
            map.put(key, window);
            markMap.put(key, markKey(unit, interval));
        }
        return window;
    }

    private static <T> TimeWindow<T> makeWindow(String key, boolean isTime, int interval, TimeUnit unit) {
        return isTime ? new TimeWindow<T>(key, interval, unit) : new TimeWindow<T>(key, interval);
    }

    private static String markKey(TimeUnit unit, int interval) {
        return unit == null ? String.valueOf(interval) : unit + "-" + interval;
    }

    private WindowContainer() {
        throw new UnsupportedOperationException();
    }

    public static Map<String, String> getMarkMap() {
        return markMap;
    }

    public static void main(String[] args) {
        Map<String, Object> map = new HashMap<>();
        String key = "123";
        TimeWindow<String> timeWindow = WindowContainer.getTimeWindow(map, key, true, 1000, TimeUnit.SECONDS);
        int time = (int) (new DateTime().minusSeconds(1000).getMillis() / 1000);
        timeWindow.addCount(time + 5, "test");
        List<String> res = timeWindow.getDatas(2, time + 10, true);
        System.out.println("Sfdsdff");
    }
}

public class TimeWindow<T> {

    private final String key;
    private int capacity;
    private int duration;
    private boolean hasTime;
    private boolean hasCapacity;
    private final Queue<Point<T>> queue = new PriorityBlockingQueue<>();
    private volatile int earliestTime = Integer.MAX_VALUE;
    private String component;

    public TimeWindow(String key, int capacity) {
        this.key = key;
        this.capacity = capacity;
        this.hasCapacity = true;
    }

    public TimeWindow(String key, int duration, TimeUnit unit) {
        this.key = key;
        this.duration = (int) unit.toSeconds(duration);
        this.hasTime = true;
    }

    public TimeWindow(String key, int capacity, int duration, TimeUnit unit) {
        this.key = key;
        this.duration = (int) unit.toSeconds(duration);
        this.capacity = capacity;
        this.hasCapacity = true;
        this.hasTime = true;
    }

    /**
     * Add an element.
     * Dropping expired data on entry:
     * 1. Count-driven: if the queue is full and the element's time is expired, drop it without enqueuing.
     * 2. Time-driven: if the element's time is expired, drop it without enqueuing.
     * Evicting expired elements:
     * 1. Count-driven: when the queue is full, evict entries older than the incoming one.
     * 2. Time-driven: every add evicts entries that fall outside the time window.
     */
    public void addCount(int time, T value) {

        // Filter by queue capacity
        if (hasCapacity) {
            // Queue full and the incoming entry older than the earliest kept: drop
            if (queue.size() >= capacity && expired(time)) {
                return;
            }

            while (true) {
                // Queue full: evict entries older than the incoming time
                if (queue.size() < capacity || !pollPeekIfExpired(time)) {
                    break;
                }
            }
        }

        // Filter by the time window
        if (hasTime) {
            // Drop data that is already outside the window
            int expiredTime = (int) (new DateTime().minusSeconds(duration).getMillis() / 1000);
            if (expired(time)) {
                return;
            }

            while (true) {
                // Evict entries that have fallen out of the window
                if (queue.isEmpty() || !pollPeekIfExpired(expiredTime)) {
                    break;
                }
            }
        }

        queue.offer(new Point<>(time, value));
        if (time < earliestTime) {
            earliestTime = time;
        }
    }

    /**
     * Whether a timestamp is expired:
     * 1. Count-driven: it is older than the earliest time kept
     * 2. Time-driven: it falls before the start of the window
     */
    public boolean expired(int time) {
        int expired = hasCapacity ? earliestTime : (int) (new DateTime().minusSeconds(duration).getMillis() / 1000);
        return expired > time;
    }

    public List<Point<T>> all() {
        List<Point<T>> points = Lists.newLinkedList();
        for (Point<T> aQueue : queue) {
            points.add(aQueue);
        }
        Collections.sort(points);
        return points;
    }

    public Point<T> head() {
        return queue.peek();
    }

    public int size() {
        return queue.size();
    }

    public int count(int from, int to, boolean edge) {
        return find(from, to, edge).size();
    }


    public List<T> getDatas(int from, int to, boolean edge) {
        List<Point<T>> points = find(from, to, edge);
        List<T> results = new ArrayList<>(points.size());
        for (Point<T> point : points) {
            results.add(point.getValue());
        }
        return results;
    }

    public List<T> getDatas(int from, int to, boolean leftEdge, boolean rightEdge) {
        List<Point<T>> points = find(from, to, true);
        List<T> results = Lists.newLinkedList();
        for (Point<T> point : points) {
            results.add(point.getValue());
        }

        if (CollectionUtils.isNotEmpty(points)) {
            if (!leftEdge) {
                while (!results.isEmpty() && points.get(0).getTime() == from) {
                    results.remove(0);
                    points.remove(0);
                }
            }

            if (!rightEdge) {
                while (!results.isEmpty() && points.get(points.size() - 1).getTime() == to) {
                    results.remove(points.size() - 1);
                    points.remove(points.size() - 1);
                }
            }
        }
        return results;
    }

    public List<T> getDatas() {
        List<T> results = Lists.newArrayList();
        for (Point<T> p : all()) {
            results.add(p.getValue());
        }
        return results;
    }

    /**
     * Evict the head of the queue if it has expired.
     * synchronized, so concurrent peek/poll pairs cannot race.
     */
    public synchronized boolean pollPeekIfExpired(int expiredTime) {
        if (queue.isEmpty()) {
            return false;
        }

        Point<T> point = queue.peek();
        if (point == null) {
            return false;
        }

        if (point.getTime() <= expiredTime) {
            queue.poll();
            return true;
        }

        return false;
    }


    public List<Point<T>> find(int from, int to, boolean edge) {
        List<Point<T>> points = all();
        boolean leftFlag = false;
        boolean rightFlag = false;
        int left = 0;
        int right = points.size() - 1;

        while (left <= right) {
            if (!leftFlag) {
                int time = points.get(left).getTime();
                if ((time > from) || (time == from && edge)) {
                    leftFlag = true;
                } else {
                    left++;
                }
            }

            if (!rightFlag) {
                int time = points.get(right).getTime();
                if ((time < to) || (time == to && edge)) {
                    rightFlag = true;
                } else {
                    right--;
                }
            }

            if (leftFlag && rightFlag) {
                return points.subList(left, right + 1);
            }
        }
        return Lists.newArrayList();
    }

    /**
     * Dynamically resize the window. Existing data needs no explicit rebalancing; it is trimmed automatically as new data arrives.
     */
    public void rebalance(TimeUnit unit, int interval) {
        if (unit == null) {
            this.capacity = interval;
        } else {
            this.duration = (int) unit.toSeconds(interval);
        }
    }

    public static class Point<T> implements Comparable<Point> {
        private final int time;
        private final T value;

        public Point(int time, T value) {
            this.time = time;
            this.value = value;
        }

        public int getTime() {
            return time;
        }

        public T getValue() {
            return value;
        }

        @Override
        public int compareTo(Point o) {
            return Integer.compare(this.time, o.getTime());
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            Point<?> point = (Point<?>) o;

            return getTime() == point.getTime();
        }

        @Override
        public int hashCode() {
            return getTime();
        }

        @Override
        public String toString() {
            return "Point{" +
                    "time=" + time +
                    ", value=" + value +
                    '}';
        }
    }

    public void setComponent(String component) {
        this.component = component;
    }

    @Override
    public String toString() {
        StringBuilder msg = new StringBuilder("[");
        for (Point<T> p : queue) {
            msg.append(p.toString()).append(",");
        }
        return msg.append("]").toString();
    }
}

public class TimestampData {
    private final TreeMap<Long, Object> data;

    public TimestampData() {
        data = new TreeMap<>();
    }

    public void put(long timestamp, Object value) {
        data.put(timestamp, value);
        // TODO: 2023/7/12 expire old keys on put
    }

    public Object getRange(long fromTimestamp, long toTimestamp) {
        return data.subMap(fromTimestamp, true, toTimestamp, true);
    }

    public void removeOldKey(long to) {
        Map<Long, Object> expiredData = data.headMap(to);
        List<Long> res = new ArrayList<>(expiredData.keySet());
        for (Long key : res) {
            data.remove(key);
        }
    }

    public static void main(String[] args) {
        TimestampData timestampData = new TimestampData();
        timestampData.put(222222, "Value 2");
        timestampData.put(333333, "Value 3");
        timestampData.put(111111, "Value 1");
        timestampData.put(444444, "Value 4");
        timestampData.put(555555, "Value 5");

        Object rangeData = timestampData.getRange(222221, 444445);
        // When reading, the TreeMap values can be handed out to the caller
        System.out.println(rangeData);

        timestampData.removeOldKey(333334);
        Object rangeData1 = timestampData.getRange(11, 11111111111L);
        System.out.println(rangeData1);
        // TODO: 2023/7/12 under concurrency all access must be locked; the same worker may be touched by different threads
        // Then key by aggregation key: create a TimestampData when the key is absent, otherwise fetch and update it, expiring old entries on put
        // Expiry can be by time or by data volume (see the sketch after this class)
        Map<String, TimestampData> res = new ConcurrentHashMap<>();
    }
}
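
A minimal sketch of the pattern described in the TODOs above: one TimestampData per aggregation key, created on demand via computeIfAbsent, with time-based expiry applied on every put. TreeMap is not thread-safe, so each window is synchronized; the class name is illustrative:

public class KeyedTimestampStore {
    private final ConcurrentHashMap<String, TimestampData> store = new ConcurrentHashMap<>();
    private final long ttl; // window length, in the same unit as the timestamps

    public KeyedTimestampStore(long ttl) {
        this.ttl = ttl;
    }

    public void add(String key, long timestamp, Object value) {
        TimestampData window = store.computeIfAbsent(key, k -> new TimestampData());
        synchronized (window) {
            window.put(timestamp, value);
            window.removeOldKey(timestamp - ttl); // time-based expiry on every put
        }
    }

    public Object range(String key, long from, long to) {
        TimestampData window = store.get(key);
        if (window == null) {
            return null;
        }
        synchronized (window) {
            // copy the sub-map so the caller holds no view into the locked window
            return new TreeMap<>((NavigableMap<Long, Object>) window.getRange(from, to));
        }
    }
}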

Example

select A, count(1) as cnt
from (select A, B, count(1) as cnt
      from Table
      where <data within the last 5 minutes>
      group by A, B) as C
where <data within the last 3 minutes>
group by A;
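
One possible reading of this query, sketched with plain collections: the inner block counts per (A, B) over a 5-minute window, and the outer block counts, per A, the (A, B) groups still active in the last 3 minutes. The Event type and its fields are illustrative:

public static Map<String, Long> nestedWindowCount(List<Event> events, long nowSec) {
    // inner query: latest timestamp per (A, B) over the last 5 minutes
    Map<String, Long> lastSeen = new HashMap<>();
    for (Event e : events) {
        if (e.time >= nowSec - 300) {
            lastSeen.merge(e.a + "|" + e.b, e.time, Math::max);
        }
    }
    // outer query: per A, count the (A, B) groups active in the last 3 minutes
    Map<String, Long> result = new HashMap<>();
    for (Map.Entry<String, Long> en : lastSeen.entrySet()) {
        if (en.getValue() >= nowSec - 180) {
            String a = en.getKey().split("\\|")[0];
            result.merge(a, 1L, Long::sum);
        }
    }
    return result;
}

static class Event { String a; String b; long time; }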
