
Spark SQL: A Note on the functions.array Data Type Issue

2022-01-27  xiaogp

Abstract: Spark SQL

Reproducing the problem

We need to groupBy one column of a Spark SQL DataFrame and aggregate all the other feature columns. The approach is to use functions.array together with functions.collect_list to collect the other features into arrays; the code is as follows:

    import org.apache.spark.sql.functions.{array, collect_list}
    import spark.implicits._

    val joinData = data.join(announAmountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
      .agg(collect_list(array("publish_date", "target_amt", "case_position", "case_reason", "plaintiff")).as("items"))
      .map {
        row => {
          // initialize the counters
          var involvedCount: Int = 0
          var involvedCountInc: Int = 0
          var involvedDefendantCount: Int = 0
          var involvedDefendantCountInc: Int = 0
          var lzjfCount: Int = 0
          var lzjfCountInc: Int = 0
          var msqqCount: Int = 0
          var msqqCountInc: Int = 0
          // parse the aggregated items
          val entName: String = row.getAs[String]("ent_name")
          val items: Seq[Seq[String]] = row.getAs[Seq[Seq[String]]]("items")
          for (item: Seq[String] <- items) {
            if (item.head != null) {
              val createCaseDate: String = item.head.split(" ")(0)
              val targetAmt: String = item(1)
              val casePosition: String = item(2)
              val caseReason: String = item(3)
              val plaintiff: String = item(4)
              // TODO business statistics logic
            }
          }
          // output the statistics
          (entName, involvedCount, involvedCountInc, involvedDefendantCount, involvedDefendantCountInc, lzjfCount, lzjfCountInc,
            msqqCount, msqqCountInc)
        }
      }.toDF("ent_name", "involved_count", "involved_count_inc", "involved_defendant_count", "involved_defendant_count_inc",
      "lzjf_count", "lzjf_count_inc", "msqq_count", "msqq_count_inc")

Running this throws an error:

org.apache.spark.sql.AnalysisException: ... 
due to data type mismatch: input to function array should all be the same type, 
but it's [timestamp, double, string, string, string];;

The error message is clear: the columns passed to the array function do not all share the same data type. Let's look at the schema of the original data:

scala> announAmountData.printSchema
root
 |-- ent_name: string (nullable = true)
 |-- publish_date: timestamp (nullable = true)
 |-- target_amt: double (nullable = true)
 |-- case_position: string (nullable = true)
 |-- case_reason: string (nullable = true)
 |-- plaintiff: string (nullable = true)

The schema contains three types: string, double, and timestamp, so the error makes sense. However, this problem does not always occur: I have written similar code many times, with all kinds of column types, and never hit this error (my suspicion is that array automatically converts non-string columns to string). In theory those cases should have failed the same way, so let's run some tests.


Testing functions.array

Below we test string, double, and timestamp in the various array scenarios to see which combinations raise the type-mismatch error.

scala> val a = Seq(("a", "2021-01-1", 3.3, "1"),("b", "2022-01-01", 4.4, "2")).toDF("a", "b", "c", "d")
a: org.apache.spark.sql.DataFrame = [a: string, b: string ... 2 more fields]

scala> val b = a.select($"a", $"b".cast("timestamp"), $"c", $"d")
b: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]

scala> b.printSchema
root
 |-- a: string (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: double (nullable = false)
 |-- d: string (nullable = true)

(1) array(timestamp, string) and array(string, timestamp)
scala> b.withColumn("e", array("b", "d"))
res51: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

scala> b.withColumn("e", array("d", "b"))
res52: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
scala> b.withColumn("e", array("d", "b")).show(false)
+---+-------------------+---+---+------------------------+
|a  |b                  |c  |d  |e                       |
+---+-------------------+---+---+------------------------+
|a  |2021-01-01 00:00:00|3.3|1  |[1, 2021-01-01 00:00:00]|
|b  |2022-01-01 00:00:00|4.4|2  |[2, 2022-01-01 00:00:00]|
+---+-------------------+---+---+------------------------+
scala> b.withColumn("e", array("d", "b")).printSchema
root
 |-- a: string (nullable = true)
 |-- b: timestamp (nullable = true)
 |-- c: double (nullable = false)
 |-- d: string (nullable = true)
 |-- e: array (nullable = false)
 |    |-- element: string (containsNull = true)

Both of these work, so strict type equality is not actually required. The resulting array elements are all string: Spark SQL automatically converts the non-string column to string.
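To see the implicit conversion directly, you can look at the analyzed plan (a quick check; the output is omitted here, but the plan should contain a cast of the timestamp column to string inserted by the analyzer):

    // Optional check: the analyzed plan should show something like cast(b as string)
    // wrapped around the timestamp column.
    b.withColumn("e", array("d", "b")).explain(true)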

(2) array(double, string) and array(string, double)
scala> b.withColumn("e", array("c", "d"))
res58: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

scala> b.withColumn("e", array("d", "c"))
res59: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

These also work without error: double and string can be combined in an array.
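As in case (1), the resulting element type should again be string once the double column is implicitly cast; a quick check:

    // The element type of "e" should be string, the double column having been
    // implicitly cast by Spark SQL.
    b.withColumn("e", array("c", "d")).select("e").printSchema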

(3) array(double, timestamp) and array(timestamp, double)
scala> b.withColumn("e", array("c", "b"))
org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c`, `b`)' due to data type mismatch: input to function array should all be the same type, but it's [double, timestamp];;
'Project [a#271, b#279, c#273, d#274, array(c#273, b#279) AS e#478]
+- Project [a#271, cast(b#272 as timestamp) AS b#279, c#273, d#274]
   +- Project [_1#266 AS a#271, _2#267 AS b#272, _3#268 AS c#273, _4#269 AS d#274]
      +- LocalRelation [_1#266, _2#267, _3#268, _4#269]

This fails immediately: both array(double, timestamp) and array(timestamp, double) raise the type-mismatch error. The preliminary conclusion is that the failure happens when the array contains no string column, because casting either one of the two columns to string makes it run:

scala> b.withColumn("e", array($"c".cast("string"), $"b"))
res77: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]

scala> b.withColumn("e", array($"c", $"b".cast("string")))
res78: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
(4) Combinations of timestamp, double, and string

These combinations sometimes fail and sometimes succeed, depending on the column order. The test results are summarized below (× = type-mismatch error):

Combination                         Error
array(double, timestamp, string)    ×
array(timestamp, double, string)    ×
array(string, timestamp, double)
array(string, double, timestamp)
array(double, string, timestamp)
array(timestamp, string, double)

A preliminary guess: the columns are coerced from left to right, so when the array mixes two incompatible non-string types (here double and timestamp), a string column must appear before the second of them; once a string column has been seen, the common type becomes string and every remaining column can be cast to it.
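This hypothesis can be checked with a small loop over the test DataFrame b defined above (a sketch: it simply tries every column order from the table and reports whether analysis succeeds):

    import scala.util.Try
    import org.apache.spark.sql.functions.{array, col}

    // c = double, b = timestamp, d = string (columns of the test DataFrame b)
    val combos = Seq(
      Seq("c", "b", "d"), Seq("b", "c", "d"),  // expected to fail
      Seq("d", "b", "c"), Seq("d", "c", "b"),  // expected to pass
      Seq("c", "d", "b"), Seq("b", "d", "c")   // expected to pass
    )
    combos.foreach { cols =>
      val ok = Try(b.withColumn("e", array(cols.map(col): _*))).isSuccess
      println(s"array(${cols.mkString(", ")}) -> ${if (ok) "ok" else "type mismatch"}")
    }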


Solution

The fix is simply to cast all non-string columns to string manually first, which also removes any concern about column order. The code can be rewritten as follows:

    val joinData = data.join(announAmountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
      .agg(collect_list(array($"publish_date".cast("string"), $"target_amt".cast("string"), $"case_position", $"case_reason", $"plaintiff")).as("items"))
      .map {
        row => {
          // same parsing and statistics logic as above
          ()
        }
      }.toDF()
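If the feature list grows, a slightly more generic variant (a sketch reusing the column names from this article) is to cast every feature column to string programmatically before building the array:

    import org.apache.spark.sql.functions.{array, col, collect_list}

    // Cast every feature column to string up front so the order inside array()
    // never matters.
    val featureCols = Seq("publish_date", "target_amt", "case_position", "case_reason", "plaintiff")
    val featureAsString = featureCols.map(c => col(c).cast("string"))

    val itemsData = data.join(announAmountData, Seq("ent_name"), "left_outer")
      .groupBy(col("ent_name"))
      .agg(collect_list(array(featureAsString: _*)).as("items"))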
