008 Spark Streaming Example 1: Filtering Logs
2019-12-04
逸章
I. Example 1: Filtering Logs
1. A Small Example
1.1 Project Structure
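The original showed the layout as a screenshot. Roughly (the exact directory layout is inferred from the rest of the article, so treat it as an assumption), the project looks like this:
test1/
├── config.sbt
├── submit.sh        (added later, in section 1.3)
└── src/main/scala/com/packtpub/sfb/StreamingApps.scala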
config.sbt is different from the one in the previous examples:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
val sparkVersion = "2.4.4"
resolvers ++= Seq(
"apache-snapshots" at "http://repository.apache.org/snapshots/"
)
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-preview"
)
It can also be written like this:
name := "Simple Project" version := "1.0" scalaVersion := "2.11.12" val sparkVersion = "2.4.4" //libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "2.4.4" % "provided" //libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.4.4" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-sql" % sparkVersion, "org.apache.spark" %% "spark-mllib" % sparkVersion, "org.apache.spark" %% "spark-core" % sparkVersion, "org.apache.spark" %% "spark-streaming" % sparkVersion, "org.apache.spark" %% "spark-streaming-kafka-0-8" % sparkVersion )
StreamingApps.scala:
package com.packtpub.sfb
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
object StreamingApps{
def main(args: Array[String])
{
// Log level settings
LogSettings.setLogLevels()
// Note: there is no loop construct below telling the application to repeat until it is terminated;
// that repetition is handled by the Spark Streaming library itself.
// Create the Spark Session and the spark context
val spark = SparkSession.builder.appName(getClass.getSimpleName).getOrCreate()
// Get the Spark context from the Spark session for creating the streaming context
val sc = spark.sparkContext
// Create the streaming context; the 10 seconds here is the batch interval
val ssc = new StreamingContext(sc, Seconds(10))
// Set the check point directory for saving the data to recover when there is a crash
ssc.checkpoint("/tmp")
println("Stream processing logic start")
// Create a DStream that connects to localhost on port 9999
// StorageLevel.MEMORY_AND_DISK_SER indicates that the data will be stored serialized in memory and, if it overflows, on disk as well
// The next line creates the DStream; a new RDD of received lines is produced for every batch interval (10 seconds here)
val appLogLines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
// A filter transformation is applied next on the DStream (producing a new DStream): keep only the log message lines containing the word ERROR
val errorLines = appLogLines.filter(line => line.contains("ERROR"))
// The next line prints the DStream contents to the console. In other words, for every batch interval,
// if there are lines containing the word ERROR, they are displayed in the console
errorLines.print()
// Count the number of messages per window and print them
// The two parameters are the window length and the sliding interval; both must be integer multiples of the batch interval (they are explained further below)
// countByWindow counts the data inside each sliding window: given a window length and a sliding interval, it returns the number of elements in each window of that length
errorLines.countByWindow(Seconds(30), Seconds(10)).print()
// But the following line executes only once; it is not repeated per batch
println("Stream processing logic end")
// Start the streaming
ssc.start()
// Wait till the application is terminated
ssc.awaitTermination()
}
}
object LogSettings{
/**
Necessary log4j logging level settings are done
*/
def setLogLevels() {
val log4jInitialized =
Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// This is to make sure that the console is clean from other INFO messages printed by Spark
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
Explanation of the sliding window (the original illustration is not reproduced here):
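In short: with the 10-second batch interval above, countByWindow(Seconds(30), Seconds(10)) emits, every 10 seconds, the number of ERROR lines received during the last 30 seconds, that is, the last three batches. As a sketch (an equivalent formulation of my own, not taken from the original), the same per-window counts could be produced inside the same main method with the window transformation:
// Equivalent, non-incremental formulation of
// errorLines.countByWindow(Seconds(30), Seconds(10)):
// every 10 seconds (sliding interval), take the data of the last 30 seconds
// (window length = 3 batches) and count the elements in it
val windowedErrorLines = errorLines.window(Seconds(30), Seconds(10))
windowedErrorLines.count().print()
countByWindow computes the same result incrementally (adding the newest batch and subtracting the oldest), which is why the checkpoint directory set earlier with ssc.checkpoint is required.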
1.2 Compiling
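The original showed the build output as a screenshot. The build itself is the sbt package step that submit.sh also runs; executed from the project root it produces the jar referenced by APP_JAR:
cd ~/scalaproject/test1
sbt package
# output: target/scala-2.11/simple-project_2.11-1.0.jar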
1.3 Deploying to the cluster
A. To see the results, first start the netcat program (bundled with Ubuntu):
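The exact command is not shown in the original; assuming the netcat that ships with Ubuntu, something like the following starts a listener on port 9999, the port the application's socketTextStream connects to:
# listen (-l) on port 9999 and keep serving new connections (-k)
nc -lk 9999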
B. Monitor the running application
(The screenshot of the monitoring view is not reproduced here.)
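Assuming a standalone cluster running on Spark's default ports (an assumption, not stated in the original), the pages to look at are:
# Master web UI, lists workers and submitted applications
http://<master-host>:8080
# Web UI of the running driver; the Streaming tab shows the batch statistics
http://<driver-host>:4040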
C. Submit the job to Spark
Add a submit.sh file in the same directory as src:
#!/bin/bash
#-----------
# submit.sh
#-----------
# IMPORTANT - Assumption is that the $SPARK_HOME and $KAFKA_HOME environment variables are already set in the system that is running the application
# [FILLUP] Set your Spark master. If monitoring is needed, use the desired Spark master, or use local
# When using local mode, it is important to give more than one core in square brackets
#SPARK_MASTER=spark://Rajanarayanans-MacBook-Pro.local:7077
#SPARK_MASTER=local[4]
SPARK_MASTER=spark://yay-ThinkPad-T470-W10DG:7077
# [OPTIONAL] Your Scala version
SCALA_VERSION="2.11"
# [OPTIONAL] Name of the application jar file. You should be OK to leave it like that
APP_JAR="simple-project_2.11-1.0.jar"
# [OPTIONAL] Absolute path to the application jar file
PATH_TO_APP_JAR="target/scala-$SCALA_VERSION/$APP_JAR"
# [OPTIONAL] Spark submit command
SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit"
# [OPTIONAL] Pass the application name to run as the parameter to this script
APP_TO_RUN=$1
sbt package
$SPARK_SUBMIT --class $APP_TO_RUN --master $SPARK_MASTER --jars $PATH_TO_APP_JAR $PATH_TO_APP_JAR
Then run the submit script:
yay@yay-ThinkPad-T470-W10DG:~/scalaproject/test1$ ./submit.sh com.packtpub.sfb.StreamingApps
D. Type batches of messages into the nc window several times
Enter a batch every once in a while:
[Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /home/raj/
[Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /home/raj/
[Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /apache/web/test
[Fri Dec 20 01:54:34 2015] [WARN] [client 1.2.3.4.5.6] Directory index
forbidden by rule: /apache/web/test
[Fri Dec 20 02:25:55 2015] [ERROR] [client 1.2.3.4.5.6] Client sent
malformed Host header
[Fri Dec 20 02:25:55 2015] [WARN] [client 1.2.3.4.5.6] Client sent
malformed Host header
[Mon Dec 20 23:02:01 2015] [ERROR] [client 1.2.3.4.5.6] user test:
authentication failure for "/~raj/test": Password Mismatch
[Mon Dec 20 23:02:01 2015] [WARN] [client 1.2.3.4.5.6] user test:
authentication failure for "/~raj/test": Password Mismatch
Then, in the window where submit.sh was run, you can see the lines containing ERROR printed:
(The screenshot of the console output is not reproduced here.)
E. Check the monitoring results
(The screenshots of the monitoring results are not reproduced here.)