008 Spark Stream 案例1:过滤日志

2019-12-04  本文已影响0人  逸章

一、例1:过滤日志

1. 小例子

1.1 工程结构

image.png

config.sbt和前面的例子不一样了:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.12"
val sparkVersion = "2.4.4"

resolvers ++= Seq(
  "apache-snapshots" at "http://repository.apache.org/snapshots/"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-preview"
)

写成下面这样亦可

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.4"
//libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4" % "provided"
//libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion
)   

StreamingApps.scala:

package com.packtpub.sfb
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
object StreamingApps{
    def main(args: Array[String])
    {
        // Log level settings
        LogSettings.setLogLevels()
//注意下面没有循环:In the previous code snippet, there is no loop construct telling the application to repeat till the running 
//application is terminated. This is achieved by the Spark Streaming library itself.        
        // Create the Spark Session and the spark context               
        val spark = SparkSession.builder.appName(getClass.getSimpleName).getOrCreate()
        // Get the Spark context from the Spark session for creating the streaming context
        val sc = spark.sparkContext 
        // Create the streaming context,这里的10秒时间是Batch interval
        val ssc = new StreamingContext(sc, Seconds(10))
        // Set the check point directory for saving the data to recover when there is a crash
        ssc.checkpoint("/tmp")
        println("Stream processing logic start")
        // Create a DStream that connects to localhost on port 9999
        // The StorageLevel.MEMORY_AND_DISK_SER indicates that the data will be stored in memory and if it overflows, in disk as well
        //下面一行是每个batch interval(这里是10秒)都创建一个DStream
        val appLogLines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
        //A filter transformation is applied next on the DStream(产生一个新DStream):Count each log message line containing the word ERROR
        val errorLines = appLogLines.filter(line => line.contains("ERROR"))
        //The next line prints the DStream contents to the console. In other words, for every batch interval, 
        //if there are lines containing the word ERROR , that get displayed in the console
        errorLines.print()
        // Count the number of messages by the windows and print them
        //窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍,这两个参数后面再解释
        //对每个滑动窗口里面的数据进行一次count计算:传入一个窗口长度参数,一个窗口移动速率参数,然后返回指定长度窗口中的元素个数
        errorLines.countByWindow(Seconds(30), Seconds(10)).print()
        //但是这个只会执行一次
        println("Stream processing logic end")
        // Start the streaming
        ssc.start()   
        // Wait till the application is terminated            
        ssc.awaitTermination()  
    }
}
object LogSettings{
    /**
    Necessary log4j logging level settings are done
    */
    def setLogLevels() {
        val log4jInitialized =
        Logger.getRootLogger.getAllAppenders.hasMoreElements
        if (!log4jInitialized) {
            // This is to make sure that the console is clean from other INFO messages printed by Spark
            Logger.getRootLogger.setLevel(Level.WARN)
        }
    }
}
滑动窗口的解释: image.png

1.2 编译

image.png

1.3 部署到cluster上

A 为了看到效果,先启动netcat程序(Ubuntu自带)

image.png
B 监控运行中的应用程序
image.png
C 提交任务到Spark上去
在src同级目录增加一个submit.sh文件
#!/bin/bash
#-----------
# submit.sh
#-----------
# IMPORTANT - Assumption is that the $SPARK_HOME and $KAFKA_HOME environment variables are already set in the system that is running the application

# [FILLUP] Which is your Spark master. If monitoring is needed, use the desired Spark master or use local
# When using the local mode. It is important to give more than one cores in square brackets
#SPARK_MASTER=spark://Rajanarayanans-MacBook-Pro.local:7077
#SPARK_MASTER=local[4]
SPARK_MASTER=spark://yay-ThinkPad-T470-W10DG:7077

# [OPTIONAL] Your Scala version
SCALA_VERSION="2.11"

# [OPTIONAL] Name of the application jar file. You should be OK to leave it like that
APP_JAR="simple-project_2.11-1.0.jar"

# [OPTIONAL] Absolute path to the application jar file
PATH_TO_APP_JAR="target/scala-$SCALA_VERSION/$APP_JAR"

# [OPTIONAL] Spark submit command
SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit"

# [OPTIONAL] Pass the application name to run as the parameter to this script
APP_TO_RUN=$1

sbt package

$SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER --jars $PATH_TO_APP_JAR $PATH_TO_APP_JAR

然后执行submit:

yay@yay-ThinkPad-T470-W10DG:~/scalaproject/test1$ ./submit.sh com.packtpub.sfb.StreamingApps

D 在nl窗口里面多次批量键入消息
每隔一定时间键入一次:

[Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /home/raj/
[Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /home/raj/
[Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /apache/web/test
[Fri Dec 20 01:54:34 2015] [WARN] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /apache/web/test
[Fri Dec 20 02:25:55 2015] [ERROR] [client 1.2.3.4.5.6] Client sent
malformed Host header
[Fri Dec 20 02:25:55 2015] [WARN] [client 1.2.3.4.5.6] Client sent
malformed Host header
[Mon Dec 20 23:02:01 2015] [ERROR] [client 1.2.3.4.5.6] user test:
authentication failure for "/~raj/test": Password Mismatch
[Mon Dec 20 23:02:01 2015] [WARN] [client 1.2.3.4.5.6] user test:
authentication failure for "/~raj/test": Password Mismatch

然后就能在submit的那个窗口里面看到含有error的内容被打印出来:

image.png
D 看监控结果
image.png image.png image.png
上一篇下一篇

猜你喜欢

热点阅读