Configuring a Local IDEA Development Environment to Connect to a Remote Cluster for HDFS and Spark SQL Access
2020-12-12
xiaogp
Keywords: IDEA, Spark, Hive, HDFS
Configuring IDEA to access HDFS
- The local IDEA environment needs the hadoop-client dependency configured in pom.xml
- The cluster's Hadoop configuration files hdfs-site.xml and core-site.xml go under resources
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
Put hdfs-site.xml and core-site.xml into the src/main/resources directory.
hdfs-site.xml explained: the client reads and writes HDFS files through the NameNode RPC addresses configured here.
<configuration>
<!-- Number of replicas for each HDFS block; default 3, matching the three DataNode machines -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- Whether to enable permission checking in HDFS -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
<!-- Define a nameservice name for the NameNode cluster -->
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<!-- Files are split into 128 MB blocks -->
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<!-- Which NameNodes the nameservice ns1 contains, and the names given to them -->
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
<!-- RPC address of nn1, i.e. the host where nn1 runs -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn1</name>
<value>cloudera01:8020</value>
</property>
<!-- HTTP address of nn1, used for external access -->
<property>
<name>dfs.namenode.http-address.ns1.nn1</name>
<value>cloudera01:50070</value>
</property>
<!-- RPC address of nn2, i.e. the host where nn2 runs -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn2</name>
<value>cloudera02:8020</value>
</property>
<!-- HTTP address of nn2, used for external access -->
<property>
<name>dfs.namenode.http-address.ns1.nn2</name>
<value>cloudera02:50070</value>
</property>
<!-- The Java class the HDFS client uses to talk to the NameNodes through a proxy and determine which one is Active -->
<property>
<name>dfs.client.failover.proxy.provider.ns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- Enables automatic failover; can be left out if automatic failover is not set up -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>
core-site.xml explained: fs.defaultFS points at the nameservice ns1 defined above, ha.zookeeper.quorum lists the ZooKeeper ensemble used for automatic failover, and fs.hdfs.impl.disable.cache turns off FileSystem instance caching.
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns1</value>
</property>
<!-- ZooKeeper ensemble addresses -->
<property>
<name>ha.zookeeper.quorum</name>
<value>cloudera01:2181,cloudera02:2181,cloudera14:2181,cloudera03:2181,cloudera16:2181</value>
</property>
<property>
<name>fs.hdfs.impl.disable.cache</name>
<value>true</value>
</property>
</configuration>
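With both files on the classpath, it is worth verifying connectivity before writing any real logic. A minimal sketch, assuming the HA nameservice ns1 from the configs above and an "hdfs" user name (adjust the user to your cluster):
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ConnectivityCheck {
  def main(args: Array[String]): Unit = {
    // core-site.xml and hdfs-site.xml under src/main/resources are picked up from the classpath
    val conf = new Configuration()
    // address the nameservice, not a single NameNode host; the failover proxy finds the active NameNode
    val fs = FileSystem.get(new URI("hdfs://ns1"), conf, "hdfs")
    fs.listStatus(new Path("/")).foreach(status => println(status.getPath))
    fs.close()
  }
}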
A Scala utility class for accessing HDFS: it uses Configuration to load the core-site.xml and hdfs-site.xml configuration files, obtains a FileSystem handle, and performs read and write operations through it.
import java.io._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, _}
import org.apache.hadoop.io.IOUtils
/**
* Utility object for HDFS file and directory operations
*/
object HdfsUtils {
def getFS(): FileSystem = {
this.synchronized {
System.setProperty("HADOOP_USER_NAME", "hdfs")
val conf = new Configuration()
conf.addResource(getClass.getResourceAsStream("/core-site.xml"))
conf.addResource(getClass.getResourceAsStream("/hdfs-site.xml"))
conf.set("mapred.remote.os", "Linux")
println(conf)
FileSystem.get(conf)
}
}
/**
* Close the FileSystem
*
* @param fileSystem
*/
def closeFS(fileSystem: FileSystem) {
this.synchronized {
if (fileSystem != null) {
try {
fileSystem.close()
} catch {
case e: IOException => e.printStackTrace()
}
}
}
}
/**
* ls
*
* @param hdfsFilePath
*/
def listFiles(hdfsFilePath: String): Unit = {
this.synchronized {
val fileSystem = getFS()
try {
val fstats = fileSystem.listStatus(new Path(hdfsFilePath))
for (fstat <- fstats) {
if (fstat.isDirectory()) {
println("directory")
} else {
println("file")
}
println("Permission:" + fstat.getPermission())
println("Owner:" + fstat.getOwner())
println("Group:" + fstat.getGroup())
println("Size:" + fstat.getLen())
println("Replication:" + fstat.getReplication())
println("Block Size:" + fstat.getBlockSize())
println("Name:" + fstat.getPath())
println("#############################")
}
} catch {
case e: IOException => e.printStackTrace()
} finally {
if (fileSystem != null) {
try {
fileSystem.close()
} catch {
case e: IOException => e.printStackTrace()
}
}
}
}
}
def ls(fileSystem: FileSystem, path: String) = {
println("list path:" + path)
val fs = fileSystem.listStatus(new Path(path))
val listPath = FileUtil.stat2Paths(fs)
for (p <- listPath) {
println(p)
}
println("----------------------------------------")
}
/**
* Create a directory
*
* @param hdfsFilePath
*/
def mkdir(hdfsFilePath: String) = {
this.synchronized {
val fileSystem = getFS()
try {
val success = fileSystem.mkdirs(new Path(hdfsFilePath))
if (success) {
println("Create directory or file successfully")
}
} catch {
case e: IllegalArgumentException => e.printStackTrace()
case e: IOException => e.printStackTrace()
} finally {
this.closeFS(fileSystem)
}
}
}
/**
* Delete a file or directory
*
* @param hdfsFilePath
* @param recursive whether to delete recursively
*/
def rm(hdfsFilePath: String, recursive: Boolean): Unit = {
this.synchronized {
val fileSystem = this.getFS()
try {
val path = new Path(hdfsFilePath)
if (fileSystem.exists(path)) {
val success = fileSystem.delete(path, recursive)
if (success) {
System.out.println("delete successfully")
}
}
} catch {
case e: IllegalArgumentException => e.printStackTrace()
case e: IOException => e.printStackTrace()
} finally {
this.closeFS(fileSystem)
}
}
}
/**
* Upload a local file to HDFS
*
* @param localPath
* @param hdfspath
*/
def write(localPath: String, hdfspath: String) {
this.synchronized {
val fileSystem = this.getFS()
var inStream: FileInputStream = null
var outStream: FSDataOutputStream = null
try {
inStream = new FileInputStream(
new File(localPath))
val writePath = new Path(hdfspath)
outStream = fileSystem.create(writePath)
IOUtils.copyBytes(inStream, outStream, 4096, false)
} catch {
case e: IOException => e.printStackTrace()
} finally {
IOUtils.closeStream(inStream)
IOUtils.closeStream(outStream)
this.closeFS(fileSystem)
}
}
}
/**
* Read a text file and return its lines as a list
*
* @param hdfspath
*/
def readAllLines(hdfspath: String): scala.collection.mutable.ListBuffer[String] = {
this.synchronized {
val fileSystem = this.getFS()
var inStreamReader: InputStreamReader = null
var isr: java.io.BufferedReader = null
var allLines: scala.collection.mutable.ListBuffer[String] = scala.collection.mutable.ListBuffer()
try {
val readPath = new Path(hdfspath)
inStreamReader = new InputStreamReader(fileSystem.open(readPath), "UTF-8")
isr = new java.io.BufferedReader(inStreamReader)
var line: String = null
do {
line = isr.readLine()
if (line != null) {
//println(line)
allLines += line;
}
} while (line != null)
} catch {
case e: IOException => {
e.printStackTrace()
}
} finally {
if (isr != null) isr.close()
if (inStreamReader != null) inStreamReader.close()
this.closeFS(fileSystem)
}
allLines
}
}
/**
* Read a text file and return its entire content as a string
*
* @param hdfspath
*/
def readContent(hdfspath: String): String = {
this.synchronized {
val fileSystem = this.getFS()
var buf: Array[Byte] = null
var inputStream: FSDataInputStream = null
try {
val readPath = new Path(hdfspath)
buf = new Array[Byte](fileSystem.getFileStatus(readPath).getLen.toInt)
inputStream = fileSystem.open(readPath)
var toRead: Int = buf.length
var off = 0
while (toRead > 0) {
val ret: Int = inputStream.read(buf, off, toRead)
if (ret < 0) {
throw new IOException("Premature EOF from inputStream")
}
toRead = toRead - ret
off += ret
Thread.sleep(10)
}
new String(buf, "UTF-8")
} catch {
case e: Exception =>
e.printStackTrace()
""
} finally {
if (inputStream != null) inputStream.close()
this.closeFS(fileSystem)
}
}
}
/**
* Upload a local file to HDFS
*
* @param localFilePath
* @param hdfsFilePath
*/
def put(localFilePath: String, hdfsFilePath: String) = {
this.synchronized {
val fileSystem = this.getFS()
var fdos: FSDataOutputStream = null
var fis: FileInputStream = null
try {
fdos = fileSystem.create(new Path(hdfsFilePath))
fis = new FileInputStream(new File(localFilePath))
IOUtils.copyBytes(fis, fdos, 1024)
} catch {
case e: IllegalArgumentException => e.printStackTrace()
case e: IOException => e.printStackTrace()
} finally {
IOUtils.closeStream(fdos)
IOUtils.closeStream(fis)
this.closeFS(fileSystem)
}
}
}
/**
* Print the content of a file on HDFS to stdout
*
* @param hdfsFilePath
*/
def cat(hdfsFilePath: String) {
this.synchronized {
val fileSystem = this.getFS()
var inStream: FSDataInputStream = null
try {
val readPath = new Path(hdfsFilePath)
inStream = fileSystem.open(readPath)
IOUtils.copyBytes(inStream, System.out, 4096, false)
} catch {
case e: IOException => e.printStackTrace()
} finally {
IOUtils.closeStream(inStream)
this.closeFS(fileSystem)
}
}
}
/**
* Check whether a path exists on HDFS
*
* @param hdfsFilePath
*/
def exist(hdfsFilePath: String): Boolean = {
this.synchronized {
val fileSystem = this.getFS()
try {
fileSystem.exists(new Path(hdfsFilePath))
} catch {
case e: IOException =>
e.printStackTrace()
false
} finally {
this.closeFS(fileSystem)
}
}
}
/**
* Download a file from HDFS to the local file system
*
* @param localFilePath
* @param hdfsFilePath
*/
def get(localFilePath: String, hdfsFilePath: String) {
this.synchronized {
val fileSystem = this.getFS()
var fsis: FSDataInputStream = null
var fos: FileOutputStream = null
try {
fsis = fileSystem.open(new Path(hdfsFilePath))
fos = new FileOutputStream(new File(localFilePath))
IOUtils.copyBytes(fsis, fos, 1024)
} catch {
case e: IllegalArgumentException => e.printStackTrace()
case e: IOException => e.printStackTrace()
} finally {
IOUtils.closeStream(fsis)
IOUtils.closeStream(fos)
this.closeFS(fileSystem)
}
}
}
}
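A short usage sketch of the utility class (the paths below are hypothetical, for illustration only):
object HdfsUtilsDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical paths, adjust to your environment
    HdfsUtils.mkdir("/tmp/idea_test")
    HdfsUtils.put("data/local.txt", "/tmp/idea_test/local.txt")
    HdfsUtils.listFiles("/tmp/idea_test")
    println(HdfsUtils.readContent("/tmp/idea_test/local.txt"))
    HdfsUtils.rm("/tmp/idea_test", recursive = true)
  }
}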
Configuring Spark in IDEA to access Hive on a remote cluster
- The local IDEA environment needs the spark-core, spark-sql, and spark-hive dependencies configured in pom.xml; the Spark dependencies keep the default compile scope
- The cluster's Hive configuration file hive-site.xml goes under resources and specifies the host and port of the metastore service
- The metastore service must be started on the remote Hive cluster:
hive --service metastore -p 9083 &
Project dependencies:
<!--Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
hive-site.xml configuration: hive.metastore.uris specifies the host and port where the metastore service runs, and that service has to be started manually. Clients connect to the metastore service, which in turn connects to the MySQL database to read and write Hive metadata (the information about databases, tables, and so on created in Hive). With a metastore service in place, multiple clients can connect at the same time, and none of them needs the MySQL username and password; they only need to reach the metastore service.
<property>
<name>hive.metastore.uris</name>
<value>thrift://cloudera01:9083</value>
</property>
A test script that uses Spark SQL to connect to the remote Hive. If you do not have permission to access the Hive tables, change HADOOP_USER_NAME, for example to hdfs.
import org.apache.spark.sql.SparkSession
object test {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hdfs")
// enableHiveSupport() picks up hive-site.xml from the classpath and talks to the metastore service
val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
spark.sql("show databases").show()
val df = spark.sql("select * from test_gp.student_info")
df.show()
}
}
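Since the same resources directory also holds the HDFS client configuration, query results can be written back to the HA nameservice directly. A minimal sketch, with a hypothetical output path:
import org.apache.spark.sql.{SaveMode, SparkSession}

object exportTest {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val spark = SparkSession.builder().appName("exportTest").master("local").enableHiveSupport().getOrCreate()
    val df = spark.sql("select * from test_gp.student_info")
    // hypothetical output path on the ns1 nameservice defined in hdfs-site.xml
    df.write.mode(SaveMode.Overwrite).parquet("hdfs://ns1/tmp/student_info_export")
    spark.stop()
  }
}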