pyspark基础入门demo

2020-11-25  本文已影响0人  欧呆哈哈哈

0. 前言

1. 基础操作

#python脚本里
spark = SparkSession.builder.appName(job_name).getOrCreate()
- spark-submit 设置运行参数
#spark安装地址
spark_home="xxx/spark-2.3/"
spark_submit=${spark_home}/bin/spark-submit
#要执行的Python脚本
py_file=$1

${spark_submit} \
--master yarn \
--queue xxxxx \
--num-executors 250 \
--executor-cores 4 \  #executor的核数,每个核可运行一个进程,核越多说明可并行程度越高
--executor-memory 16G \ #executor所占内存
--files adapter.py \
--conf spark.sql.catalogImplementation=hive \
--conf spark.dynamicAllocation.enable=false \
--conf spark.yarn.priority=NORMAL \
--conf spark.default.parallelism=1200 \
$py_file 
#平常执行的sql语句
sql_str = "" 
#执行sql语句
spark.sql(sql_str)
- 读取文本
#定义文本的schema 表示文本的结构
midlog_schema = T.StructType([
T.StructField("q_stra", T.StringType(), True),
T.StructField("query", T.StringType(), True),
T.StructField("qfreq", T.StringType(), True),
T.StructField("date", T.StringType(), True),
])
#读取文本为DataFrame对象
midlog_data = spark.read.csv(text_path_str, sep='\001', schema=midlog_schema, header=None, inferSchema=False, mode='FAILFAST')
#udf(函数,返回类型)
disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
- 调用udf
# 由于spark内部不支持一次性传入多个参数,使用struct 可以传入多个参数
data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
- udf 定义
def get_disp_info(self, disp_args):
  #F.struct在函数中是元组,根据元组方式获取对应的参数
disp_result = disp_args[0]
day = disp_args[1]
disp_info = []
if disp_result is None:
    return disp_info
#处理其他步骤

# pyspark 相关的库
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import SQLContext

reload(sys)
sys.setdefaultencoding('utf-8')
job_name = "%s_monitor_data_%s" % (user_name, day)
#启动spark任务,可在该语句增加spark任务配置(executor memory,executor个数等)
#具体配置参数可查找spark文档
spark = SparkSession.builder.appName(job_name).getOrCreate()
#定义sql语句
#读取文件有两种方式:sql读表;读取文本(见1.2)
sql_str = "select event_day, search_id, " \
"disp_result " \
"from data_table " \
"where event_day = %s " \
"and is_spam != '1' " \
"and page_no = '1' "  % day
#执行sql语句
data = spark.sql(sql_str).cache()
#定义udf
disp_info_udf = F.udf(get_disp_info, T.ArrayType(T.StringType(), True))
#调用udf
data = data.withColumn('disp_info', F.lit(disp_info_udf(F.struct('disp_result', 'event_day'))))
data\
    .groupBy(['event_day'])\
    .agg(
        F.countDistinct('search_id').alias('pv')
    ).coalesce(1)\
    .write.csv("/user/%s/tmp_table/search_pv/%s' % (user_name, day), sep='\t', mode='overwrite')
上一篇下一篇

猜你喜欢

热点阅读