BigData大数据大数据学习

IDEA配置本地开发环境连接远程集群访问Hdfs,Spark S

2020-12-12  本文已影响0人  xiaogp

摘要:IDEA,Spark,Hive,Hdfs

IDEA配置访问hdfs

        <!-- Hadoop  -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

将hdfs-site.xml和core-site.xml放入src/main/resources目录下
hdfs-site.xml配置详解,通过RPC通信地址进行hdfs文件的读写操作

<configuration>
        <!-- Number of replicas kept for each HDFS block; the default is 3, matching the three datanode machines -->
        <property>
                <name>dfs.replication</name>
                <value>3</value>
        </property>
         <!-- Whether permission checking is enabled in HDFS -->
        <property>
                <name>dfs.permissions.enabled</name>
                <value>false</value>
        </property>
        <!-- Defines a nameservice name for the namenode cluster -->
        <property>
                <name>dfs.nameservices</name>
                <value>ns1</value>
        </property>
        <!-- Files are split into 128 MB blocks (134217728 bytes) -->
        <property>
                <name>dfs.blocksize</name>
                <value>134217728</value>
        </property>
        <!-- Which namenodes belong to nameservice ns1; assigns a name to each namenode -->
        <property>
                <name>dfs.ha.namenodes.ns1</name>
                <value>nn1,nn2</value>
        </property>
        <!-- RPC address of nn1 (the host nn1 runs on) -->
        <property>
                <name>dfs.namenode.rpc-address.ns1.nn1</name>
                <value>cloudera01:8020</value>
        </property>
        <!-- HTTP address of nn1, used for external access -->
        <property>
                <name>dfs.namenode.http-address.ns1.nn1</name>
                <value>cloudera01:50070</value>
        </property>
        <!-- RPC address of nn2 (the host nn2 runs on) -->
        <property>
                <name>dfs.namenode.rpc-address.ns1.nn2</name>
                <value>cloudera02:8020</value>
        </property>
        <!-- HTTP address of nn2, used for external access -->
        <property>
                <name>dfs.namenode.http-address.ns1.nn2</name>
                <value>cloudera02:50070</value>
        </property>
        <!-- Clients reach the namenode, and through it the file system, via a proxy: this is the Java class the HDFS client uses to talk to the Active node and to determine whether the Active node is alive -->
        <property>
                <name>dfs.client.failover.proxy.provider.ns1</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
                </value>
        </property>
        <!-- Enables automatic failover; if you do not use automatic failover, this can be left unconfigured for now -->
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>
</configuration>

core-site.xml配置详解

<configuration>
        <!-- Default file system URI; ns1 is the HA nameservice name rather than a single host -->
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://ns1</value>
        </property>
        <!-- ZooKeeper ensemble addresses -->
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>cloudera01:2181,cloudera02:2181,cloudera14:2181,cloudera03:2181,cloudera16:2181</value>
        </property>
        <!-- Disables the FileSystem instance cache for the hdfs scheme; presumably so that each
             FileSystem.get call returns its own instance that can be closed independently
             (NOTE(review): confirm against the Hadoop documentation) -->
        <property>
                <name>fs.hdfs.impl.disable.cache</name>
                <value>true</value>
        </property>
</configuration>

scala脚本访问hdfs工具类,使用Configuration加载core-site.xml和hdfs-site.xml配置文件,获得hdfs入口进行读写操作

import java.io._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, _}
import org.apache.hadoop.io.IOUtils

/**
 * Utility methods for working with files and directories on HDFS.
 *
 * Every helper except [[ls]] obtains its own FileSystem through [[getFS]] and closes it
 * before returning. I/O failures are reported with `printStackTrace` instead of being
 * propagated, so callers observe a no-op, `false`, `""` or a partial result on error.
 *
 * NOTE(review): closing the FileSystem after each call assumes FileSystem caching is
 * disabled (`fs.hdfs.impl.disable.cache=true` in core-site.xml); with caching enabled
 * `FileSystem.get` may hand out a shared instance that other code is still using.
 */
object HdfsUtils {

  /**
   * Builds a FileSystem from the `core-site.xml` and `hdfs-site.xml` found on the classpath.
   *
   * Also sets the `HADOOP_USER_NAME` system property to `hdfs` as a side effect.
   *
   * @return a FileSystem for the configured default file system; the caller should close it
   *         (see [[closeFS]])
   */
  def getFS(): FileSystem = {
    this.synchronized {
      System.setProperty("HADOOP_USER_NAME", "hdfs")
      val conf = new Configuration()
      conf.addResource(getClass.getResourceAsStream("/core-site.xml"))
      conf.addResource(getClass.getResourceAsStream("/hdfs-site.xml"))
      conf.set("mapred.remote.os", "Linux")
      // Prints the list of loaded configuration resources; kept for diagnostic purposes.
      println(conf)
      FileSystem.get(conf)
    }
  }

  /**
   * Closes the given FileSystem, printing (not rethrowing) any IOException.
   *
   * @param fileSystem the FileSystem to close; `null` is ignored
   */
  def closeFS(fileSystem: FileSystem): Unit = {
    this.synchronized {
      if (fileSystem != null) {
        try {
          fileSystem.close()
        } catch {
          case e: IOException => e.printStackTrace()
        }
      }
    }
  }

  /**
   * Prints a detailed listing (type, permission, owner, group, size, replication,
   * block size and path) of every entry directly under the given path.
   *
   * @param hdfsFilePath HDFS path to list
   */
  def listFiles(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = getFS()
      try {
        for (fstat <- fileSystem.listStatus(new Path(hdfsFilePath))) {
          println(if (fstat.isDirectory()) "directory" else "file")
          println("Permission:" + fstat.getPermission())
          println("Owner:" + fstat.getOwner())
          println("Group:" + fstat.getGroup())
          println("Size:" + fstat.getLen())
          println("Replication:" + fstat.getReplication())
          println("Block Size:" + fstat.getBlockSize())
          println("Name:" + fstat.getPath())
          println("#############################")
        }
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        // Same null-safe, exception-swallowing close the other helpers use.
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Prints the paths of the entries directly under `path` using a caller-supplied FileSystem.
   *
   * Unlike the other helpers this neither opens nor closes the FileSystem, and exceptions
   * from `listStatus` propagate to the caller.
   *
   * @param fileSystem FileSystem to query; remains open afterwards
   * @param path       HDFS path to list
   */
  def ls(fileSystem: FileSystem, path: String): Unit = {
    println("list path:" + path)
    val statuses = fileSystem.listStatus(new Path(path))
    FileUtil.stat2Paths(statuses).foreach(p => println(p))
    println("----------------------------------------")
  }

  /**
   * Creates a directory (via `FileSystem.mkdirs`) and prints a message on success.
   *
   * @param hdfsFilePath HDFS directory path to create
   */
  def mkdir(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = getFS()
      try {
        if (fileSystem.mkdirs(new Path(hdfsFilePath))) {
          println("Create directory or file successfully")
        }
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Deletes a file or directory if it exists and prints a message on success.
   *
   * @param hdfsFilePath HDFS path to delete
   * @param recursive    passed to `FileSystem.delete`; whether to delete directory contents recursively
   */
  def rm(hdfsFilePath: String, recursive: Boolean): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        val path = new Path(hdfsFilePath)
        if (fileSystem.exists(path) && fileSystem.delete(path, recursive)) {
          System.out.println("delete successfully")
        }
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Uploads a local file to HDFS using a 4096-byte copy buffer.
   *
   * The local file is opened before the HDFS file is created, so a missing local file
   * does not touch the destination.
   *
   * @param localPath path of the local file to read
   * @param hdfspath  HDFS destination path
   */
  def write(localPath: String, hdfspath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FileInputStream = null
      var outStream: FSDataOutputStream = null
      try {
        inStream = new FileInputStream(new File(localPath))
        outStream = fileSystem.create(new Path(hdfspath))
        // close = false: the streams are closed in the finally block below.
        IOUtils.copyBytes(inStream, outStream, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        IOUtils.closeStream(outStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Reads a UTF-8 text file from HDFS and returns its lines.
   *
   * @param hdfspath HDFS path of the text file
   * @return the lines read; if an IOException occurs, the lines read up to that point
   *         (possibly none) are returned
   */
  def readAllLines(hdfspath: String): scala.collection.mutable.ListBuffer[String] = {
    this.synchronized {
      val fileSystem = this.getFS()
      val allLines = scala.collection.mutable.ListBuffer[String]()
      var inStream: FSDataInputStream = null
      var reader: BufferedReader = null
      try {
        inStream = fileSystem.open(new Path(hdfspath))
        reader = new BufferedReader(new InputStreamReader(inStream, "UTF-8"))
        Iterator.continually(reader.readLine()).takeWhile(_ != null).foreach(allLines += _)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        // closeStream is null-safe, so a failed open() can no longer trigger an NPE here
        // that would hide the real error and skip closing the FileSystem.
        IOUtils.closeStream(reader)
        IOUtils.closeStream(inStream)
        this.closeFS(fileSystem)
      }
      allLines
    }
  }


  /**
   * Reads a whole HDFS file into memory and decodes it as a UTF-8 string.
   *
   * @param hdfspath HDFS path of the file
   * @return the file content, or `""` if any exception occurs (including the file being
   *         too large to fit in a single byte array)
   */
  def readContent(hdfspath: String): String = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inputStream: FSDataInputStream = null
      try {
        val readPath = new Path(hdfspath)
        val length = fileSystem.getFileStatus(readPath).getLen
        // Guard against Long -> Int truncation, which would silently produce a wrong buffer size.
        if (length > Int.MaxValue) {
          throw new IOException("File too large to read into memory: " + hdfspath + " (" + length + " bytes)")
        }
        val buf = new Array[Byte](length.toInt)
        inputStream = fileSystem.open(readPath)
        // Fills the buffer completely or throws an IOException on premature EOF.
        IOUtils.readFully(inputStream, buf, 0, buf.length)
        new String(buf, "UTF-8")
      } catch {
        case e: Exception =>
          e.printStackTrace()
          ""
      } finally {
        // Null-safe close: inputStream is still null when getFileStatus/open failed.
        IOUtils.closeStream(inputStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Uploads a local file to HDFS using a 1024-byte copy buffer.
   *
   * The local file is opened before the HDFS file is created, so a missing or unreadable
   * local file no longer leaves an empty (or truncated, previously existing) file on HDFS.
   *
   * @param localFilePath path of the local file to read
   * @param hdfsFilePath  HDFS destination path
   */
  def put(localFilePath: String, hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fdos: FSDataOutputStream = null
      var fis: FileInputStream = null
      try {
        fis = new FileInputStream(new File(localFilePath))
        fdos = fileSystem.create(new Path(hdfsFilePath))
        IOUtils.copyBytes(fis, fdos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fdos)
        IOUtils.closeStream(fis)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Prints the content of an HDFS file to standard output.
   *
   * @param hdfsFilePath HDFS path of the file to print
   */
  def cat(hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var inStream: FSDataInputStream = null
      try {
        inStream = fileSystem.open(new Path(hdfsFilePath))
        // close = false so that System.out stays open.
        IOUtils.copyBytes(inStream, System.out, 4096, false)
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(inStream)
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Checks whether a path exists on HDFS.
   *
   * @param hdfsFilePath HDFS path to check
   * @return `true` if the path exists; `false` if it does not or if an IOException occurs
   */
  def exist(hdfsFilePath: String): Boolean = {
    this.synchronized {
      val fileSystem = this.getFS()
      try {
        fileSystem.exists(new Path(hdfsFilePath))
      } catch {
        case e: IOException =>
          e.printStackTrace()
          false
      } finally {
        // Release the FileSystem like every other helper does.
        this.closeFS(fileSystem)
      }
    }
  }

  /**
   * Downloads an HDFS file to the local file system using a 1024-byte copy buffer.
   *
   * The HDFS file is opened first, so no local file is created when the source is missing.
   *
   * @param localFilePath local destination path
   * @param hdfsFilePath  HDFS path of the file to download
   */
  def get(localFilePath: String, hdfsFilePath: String): Unit = {
    this.synchronized {
      val fileSystem = this.getFS()
      var fsis: FSDataInputStream = null
      var fos: FileOutputStream = null
      try {
        fsis = fileSystem.open(new Path(hdfsFilePath))
        fos = new FileOutputStream(new File(localFilePath))
        IOUtils.copyBytes(fsis, fos, 1024)
      } catch {
        case e: IllegalArgumentException => e.printStackTrace()
        case e: IOException => e.printStackTrace()
      } finally {
        IOUtils.closeStream(fsis)
        IOUtils.closeStream(fos)
        this.closeFS(fileSystem)
      }
    }
  }
}

IDEA配置Spark访问远程集群hive

hive --service metastore -p 9083 &

引入项目依赖

        <!--Spark  -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop  -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

hive-site.xml配置,其中hive.metastore.uris指定metastore服务运行的机器ip和端口,并且需要单独手动启动metastore服务,客户端连接metastore服务,metastore再去连接MySQL数据库来存取hive元数据,元数据包含用Hive创建的database、table等的元信息。有了metastore服务,就可以有多个客户端同时连接,而且这些客户端不需要知道MySQL数据库的用户名和密码,只需要连接metastore 服务即可。

  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://cloudera01:9083</value>
  </property>

脚本测试使用Spark SQL连接远程hive,如果没有权限访问hive表,可修改HADOOP_USER_NAME,例如设为hdfs

import org.apache.spark.sql.SparkSession

/**
 * Smoke test that connects a local-mode Spark session to the remote Hive metastore,
 * lists the databases and shows the rows of `test_gp.student_info`.
 */
object test {
  def main(args: Array[String]): Unit = {
    // Act as the "hdfs" user so the session is allowed to read the Hive tables.
    System.setProperty("HADOOP_USER_NAME", "hdfs")

    // hive
    val spark = SparkSession.builder()
      .appName("test")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()
    try {
      spark.sql("show databases").show()
      val df = spark.sql("select * from test_gp.student_info")
      df.show()
    } finally {
      // Always release the Spark context, even when a query fails.
      spark.stop()
    }
  }
}
上一篇下一篇

猜你喜欢

热点阅读