大数据,机器学习,人工智能大数据大数据 爬虫Python AI Sql

Spark Streaming实战

2019-05-30  本文已影响3人  董二弯

在之前Spark Streaming&Flume&Kafka打造通用流处理平台中已经一起学习了环境的搭建,接下来在此基础上做Spark Streaming处理日志实战。

模拟日志生成

通过python代码模拟实时生成日志,代码如下
generate_log.py:

#coding=UTF-8
import random
import time

url_paths=[
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/130.html",
    "class/146.html",
    "class/131.html",
    "learn/821",
    "course/list"
]

ip_slices=[132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168]

http_referers=[
    "https://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "https://cn.bing.com/search?q={query}",
    "https://www.so.com/s?q={query}"
]

search_keyword=[
    "spark sql实战",
    "hadoop 基础",
    "storm实战",
    "spark streaming实战"
]

status_code=["200","404","500"]

def sample_status_code():
    return random.sample(status_code,1)[0]

def sample_referer():
    if random.uniform(0,1)>0.2:
        return "-"
    refer_str=random.sample(http_referers,1)
    query_str=random.sample(search_keyword,1)
    return refer_str[0].format(query=query_str[0])

def sample_url():
    return random.sample(url_paths,1)[0]

def sample_ip():
    slice=random.sample(ip_slices,4)
    return ".".join([str(item) for item in slice])

def generate_log(count=10):
    time_str=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())

    f=open("/root/data/streaming_access.log","w+")

    while count >=1:
        query_log="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}".format(url=sample_url(),ip=sample_ip(),refer=sample_referer(),status_code=sample_status_code(),local_time=time_str)
        print(query_log)
        f.write(query_log+"\n")
        count=count-1

if __name__ == '__main__':
    # 每一分钟生成一次日志信息
    while True:
        generate_log()
        time.sleep(60)

这段代码的含义是每一分钟随机生成10条(可修改)日志并写入到本地文件中,这里的文件路径是我虚拟机中的路径,可随意变动,要和后面flume source的路径相同。生成的日志格式如下。

IP地址 时间 请求方式、url、引擎版本 状态码 搜索引擎提供商
187.98.156.46 2019-05-29 GET /class/112.html HTTP/1.1 500 https://www.sogou.com/web?query=xx

在服务器上通过以下命令可运行python程序

//前提是虚拟机已经安装了python的环境,要是没有安装可百度自行安装
python generate_log.py

环境测试

使用Spark Streaming&Flume&Kafka打造通用流处理平台中搭建的环境,唯一的区别是改造flume agent,在日志文件中读取生成的日志,在输出到Kafka,Spark Streaming在Kafka中消费并处理日志。

agent 修改

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command= tail -F /root/data/streaming_access.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a1.sinks.k1.brokerList=192.168.30.131:9092
#设置Kafka的Topic
a1.sinks.k1.topic=kafka_spark
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.batchSize = 3
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

sources的路径要和日志生成程序文件写入路径一致。

测试

启动Kafka环境、flume agent、日志生成程序、本地Spark Streaming程序。
观察本地客户端日志,输出了产生的日志,说明环境可用。若没有输出,建议先阅读以前的课程。


image.png

日志处理

需求:

统计今天到目前为止从搜索引擎过来的课程的访问量

产品推销是很重要的一环,于是很多公司都会给各种各样的平台流水来为自己的产品做推销。但是怎么判断那个平台的推销效果比较适合自己呢?根据大数据实时处理推销平台过来的数据,根据推销平台和产品ID为key做一个访问量的Top排序。这样可增加效果好的平台的投资,结束和效果不好平台的合作。这样不仅大大增加了推销的效果还节约了成本。在这个例子中搜索引擎类似推销平台,课程类似产品。

处理结果存储选型

处理结果可存储在关系型数据库中,如myql oracle等
也可以存储在NoSql中,如hbase,redis等。
由于需求中有访问量累计的操作
若使用mysql,需要每次存储时,先根据主键把数据查询出来,做了相加之后在做更新,比较麻烦。
使用hbase,其中客户端在保存时有incrementColumnValue这个方法,自动与数据库中的记录相加,所以这里使用hbase。

安装配置 HBase

下载、解压、配置环境变量

conf/hbase-env.sh
修改JAVA_HOME
export HBASE_MANAGES_ZK=false

conf/hbase-site.xml:
<configuration>
    <property>
        <name>hbase.rootdir</name>
         <value>hdfs://192.168.30.130:8092/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.30.131:2181</value>
    </property> 
</configuration>

conf/regionservers:
localhost

HBase 建表

// 1 启动hbase,在此之前启动zookeeper,hadoop环境
start-hbase.sh
// 2 启动shell
hbase shell
// 3 建表
create 'course_search_clickcount','info'
// 4 查看数据表
list
// 5 查看数据表信息
describe 'course_search_clickcount'
// 6 查看表数据
scan 'course_search_clickcount'

业务开发

消费kafka数据、数据清洗与统计
1)实体类

CourseSearchClickCount.scala:

/**
  * 从搜索引擎过来的课程点击数实体类
  * @param day_search_course
  * @param click_count
  */
case class CourseSearchClickCount(day_search_course: String, click_count: Long)

3)添加依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
</dependency>

<!-- hadoop -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

3)工具类
DateUtils.scala

mport java.util.Date

import org.apache.commons.lang3.time.FastDateFormat

/**
  * 日期时间工具类
  */
object DateUtils {

  val OLD_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  def getTime(time: String) = {
    OLD_FORMAT.parse(time).getTime
  }

  def parseToMinute(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  def main(args: Array[String]): Unit = {
    println(parseToMinute("2018-9-6 13:58:01"))
  }
}

HBaseUtils.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * HBase操作工具类,Java工具类建议采用单例模式封装
 */
public class HBaseUtils {

    HBaseAdmin admin = null;
    Configuration configuration = null;

    /**
     * 私有构造方法
     */
    private HBaseUtils() {

        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "localhost:2181");
        configuration.set("hbase.rootdir", "hdfs://localhost:8020/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance() {
        if (null == instance) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    /**
     * 根据表名获取到HTable实例
     *
     * @param tableName
     * @return
     */
    public HTable getTable(String tableName) {
        HTable table = null;
        try {
            table = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * 添加一条记录到表中
     *
     * @param tableName
     * @param rowkey
     * @param cf
     * @param column
     * @param value
     */
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        HTable table = getTable(tableName);

        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

4)数据库操作
CourseSearchClickCountDAO.scala

import com.lihaogn.spark.project.utils.HBaseUtils
import com.lihaogn.sparkProject.domain.{CourseClickCount, CourseSearchClickCount}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

/**
  * 数据访问层,从搜索引擎过来的课程点击数
  */
object CourseSearchClickCountDAO {

  val tableName = "course_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  /**
    * 保存数据到HBase
    *
    * @param list
    */
  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {

    val table = HBaseUtils.getInstance().getTable(tableName)

    for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  /**
    * 根据rowkey查询值
    *
    * @param day_search_course
    * @return
    */
  def count(day_search_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    val get = new Get(Bytes.toBytes(day_search_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if (value == null) {
      0L
    } else
      Bytes.toLong(value)
  }
}

5)主类

import com.imooc.spark.demain.{ClickLog, CourseSearchClickCount}
import com.imooc.spark.dto.CourseSearchClickCountDAO
import com.imooc.spark.util.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
  * description
  *
  * @author zhiying.dong@hand-china.com 2019/05/24 16:54
  */
object SparkStreamingApp {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = Array("kafka_spark")
    val kafkaParams = mutable.HashMap[String, String]()
    //必须添加以下参数,否则会报错
    kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
      )
    )

    // 步骤一:测试数据接收
    val logs = messages.map(_.value)

    // 步骤二:数据清洗
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)
      var courseId = 0
      // 获取课程编号
      if (url.startsWith("/class")) {
        val courseHtml = url.split("/")(2)
        courseId = courseHtml.substring(0,courseHtml.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    // 步骤三:统计从搜索引擎过来的从今天开始到现在的课程的访问量
    cleanData.map(x=>{
      val referer=x.referer.replaceAll("//","/")
      val splits=referer.split("/")
      var host=""
      if(splits.length>2) {
        host=splits(1)
      }

      (host,x.courseId,x.time)
    }).filter(_._1!="").map(x=>{
      (x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
    }).reduceByKey(_+_).foreachRDD(rdd=>{
      rdd.foreachPartition(partitionRecords=>{
        val list =new ListBuffer[CourseSearchClickCount]

        partitionRecords.foreach(pair=>{
          list.append(CourseSearchClickCount(pair._1,pair._2))
        })
        // 写入数据库
        CourseSearchClickCountDAO.save(list)

      })
    })
    // 开始计算
    ssc.start()
    ssc.awaitTermination()
  }
}

6)测试
启动Kafka环境、flume agent、日志生成程序、Hadoop环境、hbase环境、本地Spark Streaming程序。
查看hbase中course_search_clickcount表,出现以下内容说明处理结果成功存储到了hbase


image.png

7)后续操作
通过以上几步已经把日志通过搜索引擎+课程编号为主键,和总点击量存储到了数据库。此时可通过Javaweb结合echars等开源展示框架为企业进行展示,实现可视化效果。

总结

这次实战是学习的慕课网上spark steaming的课程,只是找找Spark Streaming实战的感觉,从搭建环境到日志处理实战,理解了Spark Streaming整套处理的流程。其中很多代码都是照着视频一行一行敲出来的,下来还需要更多的扩展才行。接下来准备学习Scala的知识,在业务逻辑开发这一部分做更多的深入。

上一篇下一篇

猜你喜欢

热点阅读