Scala Spark

2018-12-26  叫兽吃橙子

Part 1. Program configuration

1. Passing arguments to the program; the jar and its arguments go at the end of the command

spark-submit --master xxx demo.jar "arg1" "arg2"
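
A minimal sketch (the object name and variable names are made up) of how those two arguments arrive inside the application:

object Demo {
  def main(args: Array[String]): Unit = {
    // args(0) == "arg1" and args(1) == "arg2", exactly as passed after the jar on spark-submit
    val arg1 = args(0)
    val arg2 = args(1)
    println(s"running with $arg1 and $arg2")
  }
}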

2. Importing the SQL functions package

import org.apache.spark.sql.functions._

3. Starting a spark-shell session named "zy test" on the test queue

spark2-shell --name "zy test" --master yarn --queue test

4. Killing a running application

yarn application -kill application_1544177774251_30331
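
If the application id is not at hand, `yarn application -list` prints the ids of the currently running applications first.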

Part 2. DataFrame syntax

1. Equivalent of SQL NVL

withColumn("newid",when($"old_device_number".isNotNull,$"old_device_number").otherwise($"android_all"))

2. Equivalent of SQL split

val df2 = Rdd_04.withColumn("android_all", split($"device_id_raw", "\\|")(1))
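
A self-contained sketch (sample value made up) showing what the split produces; toDF comes from spark.implicits._, which is already in scope in spark-shell:

val raw = Seq("imei123|android456").toDF("device_id_raw")
raw.withColumn("android_all", split($"device_id_raw", "\\|")(1)).show()
// android_all becomes "android456", i.e. element 1 after splitting on "|"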

3. Equivalent of SQL GROUP BY

val pv=df1.groupBy().count().withColumnRenamed("count","pv").withColumn("key1",lit("1"))

val df2 = df1.groupBy("device_number","visit").sum("duration").withColumnRenamed("sum(duration)", "duration")

val mdf31 = mdf3.groupBy("create_time").agg(countDistinct('order_id),countDistinct('user_id),sum('realpay)).withColumnRenamed("create_time","pay_time")
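
The renames can also be done inline with .as(...) on each aggregate, which avoids relying on generated names like sum(duration); a sketch under the same assumed columns:

val mdf31 = mdf3.groupBy("create_time")
  .agg(countDistinct('order_id).as("order_cnt"),
       countDistinct('user_id).as("user_cnt"),
       sum('realpay).as("realpay_sum"))
  .withColumnRenamed("create_time", "pay_time")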

4. Window functions

import org.apache.spark.sql.expressions.Window

val byKey = Window.orderBy('duration)
val df3 = df2.withColumn("percent_rank", percent_rank().over(byKey))
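
A partitioned variant (column names assumed) giving a per-key rank instead of a global one:

import org.apache.spark.sql.expressions.Window
val byDevice = Window.partitionBy('device_number).orderBy('duration.desc)
val ranked = df2.withColumn("rn", row_number().over(byDevice))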

5. Filtering with where and filter, with or without a bound variable

Multi-condition filters:

item_view.filter($"iid" === 1619094 && $"uid" === 2338528).orderBy('iid, 'uid, 'time).show
item_view.filter('iid === 1622749 && 'uid === 1658732).orderBy('iid, 'uid, 'time).show

Filtering with a bound variable:

val ind: Int = 2
df.filter($"num" === ind)

val str = "a"
df.filter($"id".equalTo(str))

val df41 = df4.where(" pid like '%newcomerActivity%' ")
df41.filter( "platform = 2")

6. Column arithmetic; do not forget the $ prefix, otherwise the column may not be resolved

val df22 = df21.withColumn("pct",$"sum(realpay)" / $"count(DISTINCT user_id)")

7. Joins, with identical or different key column names

val day_order = df31.join(df22,df31("create_time") === df22("pay_time"),"left")
val day_visit = df41.join(df43,Seq("key"),"left").join(df45,Seq("key"),"left")
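
When the key columns have different names, both columns survive the join; a sketch (same assumed names) that drops the right-hand key afterwards:

val day_order2 = df31
  .join(df22, df31("create_time") === df22("pay_time"), "left")
  .drop(df22("pay_time"))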

8. Adding a column: a fixed literal value, or a passed-in variable (minputTime)

 val pv = df1.groupBy().count().withColumnRenamed("count", "pv").withColumn("key1", lit("1"))
 val mdf45 = mdf44.groupBy().agg(countDistinct('device_number), count('device_number)).withColumn("key", lit(minputTime))

9. Getting the first day of the current month / today's date

import java.text.SimpleDateFormat
import java.util.Calendar

val mf_sdf = new SimpleDateFormat("yyyy-MM-01")
val cal = Calendar.getInstance()
var minputTime = mf_sdf.format(cal.getTime)

val sdf = new SimpleDateFormat("yyyy-MM-dd")
var inputTime = sdf.format(cal.getTime)

10. Printing a DataFrame's schema

personDF.printSchema

11. Using map on a DataFrame
The code below can be run once the setup from the earlier items is in place; note that map on a DataFrame needs import spark.implicits._ in scope to provide the encoder for the result type.

val d_dev_sql = s""" select uid,device_number from dw.dwd_device_app_info_di where ds='$inputTime' and uid is not null and uid <>"" and uid <>0 group by uid,device_number """
val d_dev = spark.sql(d_dev_sql)
d_dev.map(f => "uid:" + f(0)).show
d_dev.map(f => "uid:" + f.getAs[String]("uid")).show
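
A sketch (assuming both columns are strings) mapping each row to a tuple instead of a concatenated string:

import spark.implicits._
val pairs = d_dev.map(f => (f.getAs[String]("uid"), f.getAs[String]("device_number")))
pairs.show()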

12. CASE WHEN

CASE WHEN, version 1 (note: this snippet is PySpark syntax, kept for reference):
df = df.withColumn('mod_val_test1',F.when(df['rand'] <= 0.35,1).when(df['rand'] <= 0.7, 2).otherwise(3))

A Scala version that was actually run:
    val accu_purchase2 = accu_purchase1.withColumn("is_pay",when(accu_purchase1("total_pay_cnt") === 0,0).otherwise(1))

13. Adding a row-number column: convert to an RDD, call zipWithIndex, then convert back

Assign the sorted DataFrame that needs row numbers to the variable dataframe used below; the index then follows the existing order.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Add an "id" column to the original schema
    val schema: StructType = dataframe.schema.add(StructField("id", LongType))

    // Convert the DataFrame to an RDD and call zipWithIndex
    val dfRDD: RDD[(Row, Long)] = dataframe.rdd.zipWithIndex()

    val rowRDD: RDD[Row] = dfRDD.map(tp => Row.merge(tp._1, Row(tp._2)))

    // Turn the indexed RDD back into a DataFrame
    val df2 = spark.createDataFrame(rowRDD, schema)

    df2.show()
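
If the index does not need to be consecutive, monotonically_increasing_id() avoids the RDD round trip; the ids are increasing but not gap-free across partitions. A sketch:

    import org.apache.spark.sql.functions.monotonically_increasing_id
    val withId = dataframe.withColumn("id", monotonically_increasing_id())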

14. Passing a variable into a column expression, and rounding

    val max_id = df2.count()
    val df3 = df2.withColumn("rank",round(($"id" / max_id),0))

15. Changing a column's data type

import org.apache.spark.sql.types.DataTypes

val order_item2 = order_item.withColumn("realpay", $"realpay".cast(DataTypes.LongType))
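
cast also accepts the type name as a string, which skips the DataTypes import; a sketch with the same assumed columns:

val order_item2 = order_item.withColumn("realpay", $"realpay".cast("long"))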

Spark SQL data types
Numeric types
ByteType: a 1-byte integer, range -128 to 127
ShortType: a 2-byte integer, range -32768 to 32767
IntegerType: a 4-byte integer, range -2147483648 to 2147483647
LongType: an 8-byte integer, range -9223372036854775808 to 9223372036854775807
FloatType: a 4-byte single-precision float
DoubleType: an 8-byte double-precision float
DecimalType: arbitrary-precision decimal values, backed internally by java.math.BigDecimal (an arbitrary-precision unscaled integer plus a 32-bit scale)
StringType: a string value
BinaryType: a byte sequence
BooleanType: a boolean value
Datetime types
TimestampType: a value with year, month, day, hour, minute, second
DateType: a value with year, month, day
Source: https://blog.csdn.net/Android_xue/article/details/100975387

16. Converting to key-value form, with the value as a JSON string

import org.apache.spark.sql.functions.to_json
val df = spark.sql("""select order_item_id,(named_struct('item_id',item_id,'title',title)) c1
               from dw.dwd_order_item_dd where ds='2019-11-24' limit 10""")
val df2 = df.withColumn("c2", to_json($"c1"))
df2.cache()
df2.show(truncate=false)
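
For the reverse direction, a sketch using from_json with an assumed schema for the value column:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}
val itemSchema = new StructType().add("item_id", LongType).add("title", StringType)
val df3 = df2.withColumn("c3", from_json($"c2", itemSchema))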

17. A pitfall when setting reference times

package cn.idongjia.statistics.auciton
import cn.idongjia.statistics.util.{Common, Config,DBUtil}
import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}




val sdf = new SimpleDateFormat("yyyy-MM-dd")

val s_sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val mill_seconds_of_day: Long = 3600 * 24 * 1000

val m_sdf = new SimpleDateFormat("yyyy-MM")


var cal = Calendar.getInstance()
var inputTime = sdf.format(cal.getTime)
var endTime = sdf.format(cal.getTime)

// The key line is cal.setTime(sdf.parse(inputTime)) below; without it,
// getTimeInMillis returns the current moment's timestamp rather than
// midnight of inputTime.
cal.setTime(sdf.parse(inputTime))
val from_time = cal.getTimeInMillis
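
A related sketch: once the calendar has been reset to midnight of inputTime, Calendar.add shifts it, e.g. to get yesterday's partition date:

cal.setTime(sdf.parse(inputTime))
cal.add(Calendar.DATE, -1)
val yesterday = sdf.format(cal.getTime)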

18. Sorting

item_view.orderBy('uid, 'iid, 'type.desc).show

Reading the documentation

Vocabulary notes

Interfaces: 接口
Optimizations: 优化
interact: 相互作用
Benefits: 实惠
Manipulated: 任人摆布
manipulation: 操纵
Dynamic: 动态
i.e.:即
conceptually: 概念地
under the hood:底层
builtin: 执行内建的函数
domain: 领域
referred as: 称为
in contrast to:
come with: 伴随
Serialization: 序列化
Verbose: 冗长
Inferring: 推理
implicitly: 暗含的
implicit: 隐式
subsequent: 随后的
explicitly: 明确地
Encoders: 编码器
retrieves multiple columns at once:同时检索多个列
specifying: 说明
parsed: 被解析的
predefined: 预定义的
identical: 同一的
deploying:部署

Stocks

Some notes from daily stock-market reviews.

Overall

中钢天源

0518
