18 Setting up the Spark Programming Environment
Pin down the version numbers
Spark programs here are written in Scala.
The Scala version must match the one on the server: 2.11.8
Maven version: 3.3.9
JDK version: 1.8 or above
Install the Windows version of Scala and set the environment variables
http://www.runoob.com/scala/scala-install.html
Install the Scala plugin in IDEA
Omitted here; it's easy to find with a quick search.
Configuring IDEA
Configure Maven in IDEA
Configure the JDK
Create the Spark project
1. Create a Maven webapp project
2. In Project Structure > Libraries, add the Scala installation directory
3. In Project Structure > SDKs, select JDK 1.8
Project pom configuration
Based on the official documentation and the pom files under examples in https://github.com/apache/spark:
<properties>
    <hadoop.version>2.5.0</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
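To quickly confirm that these dependencies resolve and match the intended versions, a minimal sanity check can help (a sketch; the VersionCheck object name is just an example):

import org.apache.spark.SPARK_VERSION

// Throwaway check: prints the Spark and Scala versions found on the classpath.
object VersionCheck {
  def main(args: Array[String]): Unit = {
    println(s"Spark: $SPARK_VERSION")                          // expect 2.2.0
    println(s"Scala: ${scala.util.Properties.versionString}")  // expect "version 2.11.8"
  }
}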
Write and run the code
import org.apache.spark.sql.SparkSession

/**
 * A WordCount program.
 */
object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("test")
      .master("local")     // set the master URL
      .getOrCreate()       // obtain the SparkSession
    val filepath = args(0) // input file path, passed as the first argument

    // RDD version:
    // val counts = spark
    //   .sparkContext                 // get the SparkContext
    //   .textFile(filepath)           // read the file as an RDD of lines
    //   .flatMap(x => x.split(" "))   // split each line into words
    //   .map(x => (x, 1))             // turn each word into a (word, 1) pair
    //   .reduceByKey((a, b) => a + b) // for pairs with the same key, sum the values
    //   .collect().toList
    // println(counts)

    // Dataset version:
    import spark.implicits._
    spark
      .read
      .textFile(filepath)         // read the file as a Dataset[String]
      .flatMap(x => x.split(" ")) // split each line into words
      .map(x => (x, 1))           // turn each word into a (word, 1) pair
      .groupBy("_1")              // group by the word column, _1
      .count()                    // count the rows in each group
      .show()                     // print the result
    spark.stop()
  }
}
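If you want the output sorted by frequency, a small variation on the Dataset pipeline works (a sketch, reusing spark and filepath from the program above):

import spark.implicits._
spark
  .read
  .textFile(filepath)
  .flatMap(x => x.split(" "))
  .map(x => (x, 1))
  .groupBy("_1")
  .count()
  .orderBy($"count".desc) // sort by the count column, most frequent first
  .show()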
Running it produced these errors:
18/03/03 13:55:47 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:78)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:257)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:234)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:749)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:734)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:607)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2430)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:295)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
at test$.main(test.scala:9)
at test.main(test.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
18/03/03 13:55:49 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:376)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
at test$.main(test.scala:9)
at test.main(test.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
18/03/03 13:55:49 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:376)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
at test$.main(test.scala:9)
at test.main(test.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
The first error occurs because the local machine has no Hadoop installation: running this code on Windows requires winutils.exe. Search GitHub for it, download and unzip it, and point the HADOOP_HOME environment variable at it; that's all. The second error, "A master URL must be set in your configuration", is raised when no master has been specified; adding .master("local") to the builder, as in the code above, fixes it for local runs.
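Alternatively, instead of setting an environment variable, the hadoop.home.dir system property can be set before the first SparkSession is created, since Hadoop's Shell class reads it in a static initializer. A sketch (C:\winutils is an example path and must contain bin\winutils.exe):

// In main(), before SparkSession.builder is called:
System.setProperty("hadoop.home.dir", "C:\\winutils") // directory containing bin\winutils.exe
val spark = SparkSession
  .builder
  .appName("test")
  .master("local")
  .getOrCreate()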
Upload to the server and run a test
1. Configure the environment
Set JAVA_HOME in Hadoop's hadoop-env.sh, mapred-env.sh, and yarn-env.sh to the JDK 1.8 path.
2. Write stu.txt as the WordCount input file.
3. Start HDFS and make node 1 active; starting ZKFC on node 1 first guarantees that node 1 becomes the active NameNode.
sbin/start-dfs.sh
4. Create a directory on HDFS
bin/hdfs dfs -mkdir -p /user/datas
5. Upload the file to HDFS
bin/hdfs dfs -put /opt/datas/stu.txt /user/datas
6. Launch Spark in local mode and run the jar
--master local[2] means run locally with 2 worker threads.
Pass the jar, followed by the program's first argument (the input file path):
bin/spark-submit --master local[2] /opt/jars/sparkTest.jar hdfs://bigdata-pro01.kfk.com:8020/user/datas/stu.txt
A successful run prints:
+----------+-----+
| _1|count|
+----------+-----+
| mysql| 1|
|javascript| 1|
| css| 1|
| sql| 1|
| scala| 3|
| spark| 2|
| hue| 2|
| java| 5|
| hadoop| 2|
| python| 2|
| oracle| 1|
+----------+-----+
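One caveat about this submission: a master URL hardcoded via .master(...) in the code takes precedence over spark-submit's --master flag. To let --master local[2] (or a cluster master) actually take effect, the builder would omit it; a sketch:

// With no .master(...) in the code, the master comes from spark-submit's
// --master flag, so the same jar can run under local[2], standalone, or YARN.
val spark = SparkSession
  .builder
  .appName("test")
  .getOrCreate()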