Spark-shell批量命令执行脚本

2019-09-29  本文已影响0人  达微
#!/bin/bash

source /etc/profile

exec $SPARK_HOME/bin/spark-shell --queue tv  --name spark-sql-test --executor-cores 8 --executor-memory 8g   --num-executors 8 --conf spark.cleaner.ttl=240000 <<!EOF
import org.apache.spark.sql.SaveMode
sql("set hive.exec.dynamic.partition=true")
sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("use hr")
sql("SELECT * FROM t_abc ").rdd.saveAsTextFile("/tmp/out") 
sql("SELECT * FROM t_abc").rdd.map(_.toString).intersection(sc.textFile("/user/hdfs/t2_abc").map(_.toString).distinct).count
!EOF

记一次坑比操作,存redis的中间状态用了kyro虚拟化后,查看不方便

#!/bin/bash

source /etc/profile

exec $SPARK_HOME/bin/spark-shell spark-shell --jars $(echo /opt/hsvehicle/DataMiningAnalysis-1.0-hs/target/jars/*.jar /opt/hsvehicle/DataMiningAnalysis-1.0-hs/target/DataMiningAnalysis-1.0-hs.jar | tr ' ' ',') <<!EOF

import java.util
import java.util.concurrent.ConcurrentHashMap
import com.dfssi.common.json.Jsons
import com.dfssi.dataplatform.analysis.hs.battery.{BatteryAlarmResult, BatteryCheckStatusRecord}
import com.dfssi.dataplatform.analysis.hs.monitor.HSVehicleAlarmFromKafka
import com.dfssi.dataplatform.analysis.spark.SparkKryoSerializerAdapter
import io.netty.buffer.Unpooled
import org.apache.spark.SparkConf
import org.joda.time.DateTime
import redis.clients.jedis.exceptions.JedisConnectionException
import redis.clients.jedis.{Jedis, JedisPoolConfig, JedisSentinelPool, Protocol}
import com.dfssi.dataplatform.analysis.redis._
import sun.misc.IOUtils
import com.dfssi.dataplatform.analysis.redis.SentinelConnectionPool
import com.dfssi.dataplatform.analysis.redis.SentinelConnectionPool._
import scala.collection.JavaConversions._

val redisEndpoint = SentinelRedisEndpoint("192.168.1.13:26379,192.168.1.4:26379", "mymaster", "123456", 2, 2000)
val jedis = redisEndpoint.connect()
val redis = ByteBufferRedis(jedis)

val buffer = redis.get("hs:alarm:JA123456")
val kafka = new HSVehicleAlarmFromKafka()
val adapter = SparkKryoSerializerAdapter(new SparkConf())
val record:kafka.AlarmCheckStatusRecord = adapter.deserialize(buffer)
println(record)
!EOF
上一篇 下一篇

猜你喜欢

热点阅读