大数据 -【spark入门】
2019-02-12 本文已影响0人
小哥不才IT
1. 简要说明
基于spark 2.3.1版本学习spark基础知识及整体框架。本文首先以python版为主进行描述,后期会主要针对scala版本进行详细讲解。
2. spark学习环境搭建
- spark安装包下载地址
http://spark.apache.org/downloads.html
https://archive.apache.org/dist/spark/
作者使用的为spark-2.3.1版本为例进行测试与学习。(之所以不选择最新版本,大家都懂的,最新版本不稳定,会有很多坑要踩,索性选择相对稳定的版本)
- 环境设置
1. 如想设置为全局环境变量,则可配置到bashrc_profile中
2. 仅为开发调试,直接进入到下载安装包spark-2.3.1-bin-hadoop2.7/bin下指定相关操作的命令即可。
- 启动spark
启动python版本spark 客户端命令(./pyspark)
Python 2.7.10 (default, Aug 17 2018, 19:45:58)
[GCC 4.2.1 Compatible Apple LLVM 10.0.0 (clang-1000.0.42)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
2019-02-11 17:57:49 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
Using Python version 2.7.10 (default, Aug 17 2018 19:45:58)
SparkSession available as 'spark'.
>>>
到此为止spark学习调试的环境基本搭建完成。
3. 核心概念介绍
-
首先,
每个spark应用都由一个驱动器程序发起集群上的各种并行操作。shell终端本身即为实际的驱动器程序。shell启动时自动创建了一个SparkContext对象,其变量叫sc,所以以下的操作都可以基于sc做操作。 -
其次,驱动器一般管理多个执行器(executor)节点。即在集群模式下执行action操作时,不同的节点会统计不同部分的数据(计算结果)。由于我们在本地模式下执行操作,所以所有的执行任务都会在单节点上运行。
spark集群模式执行过程.png
- 最后,可通过向spark API传递函数,亦可操作相应的集群上。需要对lambda操作熟悉。如:
>>> lines = sc.textFile("README.md")
>>> lines.filter(lambda line: "Python" in line)
PythonRDD[4] at RDD at PythonRDD.scala:49
>>> lines.filter(lambda line: "Python" in line).count()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Library/Python/2.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError
报错原因为:spark默认是从hdfs上都文件的,想要读取本地文件需要增加file://前缀。即:
lambda形式:
lines = sc.textFile("file:///spark-2.3.1-bin-hadoop2.7/README.md")
pyline = lines.filter(lambda line: "Scala" in line)
pyline.count()
函数形式:
def hasScala(line):
return "Scala" in line
pythonLines = lines.filter(hasPython)
- 独立应用之运行方式
Java和Scala中,只需要添加Maven依赖,编辑器会自动下载依赖的包。但 python程序运行需要使用spark自带的spark-submit脚本来运行。(脚本中已经帮我们引入了python程序的spark依赖)
例如:
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import logging
from pyspark import SparkConf, SparkContext
logging.basicConfig(level=logging.ERROR)
conf = SparkConf().setMaster("local").setAppName("myapp")
sc = SparkContext(conf=conf)
contents = sc.textFile("file://absfilepath")
res = contents.filter(lambda line: "Python" in line)
print "*" * 10,res.count()
sc.stop()
运行方式如:
spark-submit test.py
运行spark-submit时会出现很繁琐不易识别的INFO信息,如何过滤掉INFO信息呢?
注意:将rootCategory等级修改为WARN或者ERROR即可。
方法如下:
修改日志过滤等级:【conf/log4j.properties】
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n