Spark Journey 6: Structured Streaming

2022-03-04 · 笨鸡

Structured Streaming

Structured Streaming Code Example

The example below reads "word epochMillis" pairs from a local socket, turns each pair into an event-time record, and maintains sliding-window word counts, using a watermark to bound how late events may arrive.

package com.ctgu.spark.structured_streaming;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.sql.Timestamp;
import java.util.concurrent.TimeoutException;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;


public class Spark_StructuredStreaming_Window {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        Logger.getLogger("org").setLevel(Level.WARN);
        // Create the SparkSession (local mode, using all available cores)
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local[*]")
                .getOrCreate();

        // Socket source: treat lines arriving on localhost:9999 as an unbounded table
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", 9999)
                .load();

        // Parse each "word epochMillis" line into a DataInfo bean;
        // lines without exactly two space-separated tokens are dropped
        Dataset<DataInfo> words = lines.as(Encoders.STRING())
                .filter((FilterFunction<String>) line -> line.split(" ").length == 2)
                .map((MapFunction<String, DataInfo>) line -> {
                    String[] s = line.split(" ");
                    return new DataInfo(s[0], new Timestamp(Long.parseLong(s[1])));
                }, Encoders.bean(DataInfo.class));

        // Sliding windows of 10 seconds, advancing every 5 seconds; the
        // 10-second watermark bounds how late events may arrive before
        // their window state is dropped
        Dataset<Row> wordCounts = words.withWatermark("timestamp", "10 seconds")
                .groupBy(
                        window(col("timestamp"), "10 seconds", "5 seconds"),
                        col("word"))
                .count();

        // Start the query that prints the updated counts to the console; the
        // checkpoint directory persists watermark and aggregation state across restarts
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .option("checkpointLocation", "./checkpoint_chapter9_14")
                // Continuous processing does not support aggregations,
                // so the continuous trigger stays commented out here
//                .trigger(Trigger.Continuous(60 * 1000L))
                .trigger(Trigger.ProcessingTime(0))
                .start();

//        StreamingQuery query = words.writeStream()
//                .outputMode("append")
//                .format("console")
//                .start();

        query.awaitTermination();
    }
}

package com.ctgu.spark.structured_streaming;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.sql.Timestamp;

@Data
@AllArgsConstructor
// A public no-arg constructor so the class satisfies the JavaBean
// convention that the bean encoder expects
@NoArgsConstructor
public class DataInfo implements Serializable {
    private String word;
    private Timestamp timestamp;
}
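
To drive the windowed example locally, start a socket server before launching the job and type lines in the "word epochMillis" form. Netcat is the usual choice, as in the Spark quick start; the timestamps below are only illustrative:

nc -lk 9999
spark 1646380800000
spark 1646380805000
streaming 1646380812000

Each micro-batch prints the updated window counts to the console. Lines without exactly two tokens are filtered out, and events arriving more than 10 seconds behind the watermark are discarded.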

Structured Streaming Mind Map

Structured-Streaming:
    Entry point:
        SparkSession
    Source:
        Socket
        Rate
        File
        Kafka (see the sketch after this outline)
    Operations:
        Basic API, similar to the Spark SQL Dataset API
        Event time: windows and watermarks
        Joins with watermarks
        Deduplication
    State Store:
        HDFS state store
        RocksDB state store
    Streaming Queries:
        Output Sink:
            File Sink
            Kafka Sink
            Foreach Sink
            ForeachBatch Sink (see the sketch after this outline)
            Console Sink
            Memory Sink
        Output Mode:
            Append
            Complete 
            Update 
        Query Type:
            aggregation
            mapGroupsWithState
            flatMapGroupsWithState
            joins
            other
        Trigger:
            Continuous
            ProcessingTime
            Once
        Checkpoint location
        lastProgress
        StreamingQueryListener
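
To make a few of the outline items concrete, here is a minimal sketch (not from the original post) combining a Kafka source, watermark-based deduplication, a ForeachBatch sink, a processing-time trigger, and a checkpoint location. The broker address, topic name, and output paths are placeholder assumptions, and the spark-sql-kafka connector must be on the classpath.

package com.ctgu.spark.structured_streaming;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class Spark_StructuredStreaming_KafkaSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("KafkaForeachBatchSketch")
                .master("local[*]")
                .getOrCreate();

        // Kafka source: broker address and topic are placeholder assumptions
        Dataset<Row> events = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "events")
                .load()
                .selectExpr("CAST(key AS STRING) AS key",
                        "CAST(value AS STRING) AS value",
                        "timestamp");

        // Watermark-based deduplication: repeated (key, timestamp) pairs are
        // dropped, and state older than the watermark is discarded
        Dataset<Row> deduped = events
                .withWatermark("timestamp", "10 minutes")
                .dropDuplicates(new String[]{"key", "timestamp"});

        // ForeachBatch sink: each micro-batch is handed to ordinary batch code,
        // so any batch writer (Parquet here, with an assumed output path) can
        // serve as a streaming sink
        StreamingQuery query = deduped.writeStream()
                .outputMode("append")
                .trigger(Trigger.ProcessingTime("30 seconds"))
                .option("checkpointLocation", "./checkpoint_kafka_sketch")
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) ->
                        batch.write().mode("append").parquet("./out/events"))
                .start();

        query.awaitTermination();
    }
}

Compared with the console example above, foreachBatch trades per-record writes for batch-granularity writes, which is usually the simplest way to reach sinks that only have batch connectors.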