数据仓库

销售数据可视化报表

2020-09-19  本文已影响0人  SongSir1

一、项目介绍:

Adventure Works Cycles是Adventure Works样本数据库所虚构的一家大型跨国制造公司,该公司生产和销售自行车到全球各个市场。

二、项目目的:

三、项目过程

1.MySQL数据源观察

了解到数据库主要有3张表,分别如下:
ods_sales_orders:订单明细表

ods_customers:用户信息表

dim_date_df:日期维度表

2.数据指标构建

构建销量、销售额、达标率、环比、同比等指标,分别从当日、昨日、当月、当季、当年日期维度进行分析;


3.数据表构建

(1)dw_order_info:销售详情表

def read_date(adventure_conn_read):
    try:
        # 订单明细表
        sql_1 = "SELECT * from ods_sales_orders"
        ods_sales_orders = pd.read_sql(sql_1,adventure_conn_read)
        # 客户信息表
        sql_2 = "SELECT customer_key,gender,chinese_territory,chinese_province,chinese_city from ods_customer"
        ods_customer = pd.read_sql(sql_2,adventure_conn_read).drop_duplicates("customer_key")
        # 日期维度表
        sql_3 = "SELECT create_date, is_today, is_yesterday, is_21_day, is_current_month, is_current_quarter, is_current_year, is_last_year from dim_date_df"
        dim_date_df = pd.read_sql(sql_3,adventure_conn_read)
        return ods_sales_orders, ods_customer, dim_date_df
    except Exception as e:
        logger.info("read_date报错信息:{}".format(e))

# 合并订单明细表和客户信息表
def merge1(ods_sales_orders, ods_customer):
    try:
        df1 = pd.merge(ods_sales_orders, ods_customer, on = "customer_key")
        return df1
    except Exception as e:
        logger.info("merge1报错信息:{}".format(e))

# 合并df1和日期维度表
def merge2(df1, dim_date_df):
    try:
        df1["create_date"] = pd.to_datetime(df1["create_date"])
        dim_date_df["create_date"] = pd.to_datetime(dim_date_df["create_date"])

        dw_orders_info = pd.merge(df1, dim_date_df, on = "create_date")
        dw_orders_info["create_date"] = dw_orders_info["create_date"].apply(lambda x:x.strftime("%Y-%m-%d"))
        return dw_orders_info
    except Exception as e:
        logger.info("merge2报错信息:{}".format(e))

# 保存至sql
def save_tosql(dw_orders_info,adventure_conn_tosql):
    try:
        dw_orders_info.to_sql("dw_orders_info_{}".format(my_name),con =adventure_conn_tosql, if_exists = 'replace', index=False)
    except Exception as e:
        logger.info("save_tosql报错信息{}".format(e))

(2)dw_order_by_day:每日聚合表

def sum_amount_order(adventure_conn_read):
    """sum_amount_order:销量订单聚合表
        具体:读取ods_sales_orders(订单明细表),根据create_date聚合,求总销售额、客户数量、客单价"""
    try:
        sum_amount_order = pd.read_sql_query("select * from ods_sales_orders",
                                             con=adventure_conn_read)       # 读取ods_sales_orders表数据
        # 聚合
        sum_amount_order = sum_amount_order.groupby("create_date").agg({'customer_key':'count','unit_price':'sum'}).reset_index()
        # 计算客单价
        sum_amount_order['amount_div_order'] = sum_amount_order['unit_price']/sum_amount_order['customer_key']
        # 修改列名
        sum_amount_order.rename(columns = {'customer_key':'order_counts',
                                           "unit_price":"sum_amount"},inplace = True)
        return sum_amount_order
    # 如果错误,则填写错误日志
    except Exception as e:
        logger.info("sum_amount_order异常,报错信息:{}".format(e))

# 生成目标销售额和销量
def add_order_goal(sum_amount_order):
    """add_order_goal:生成目标金额及目标销量
    具体:利用空列表及循环生成对应随机值,与销量订单聚合表合并形成sum_amount_order_goal(销量订单聚合目标表)"""
    try:
        # 定义两个列表,一个为每日销售目标、一个为每日销量目标
        sum_amount_goal_list = []
        # 创建一个日期列表
        create_date_list = list(sum_amount_order['create_date'])        # 获取sum_amount_order中的create_date
        # 根据日期遍历每日销售额、销量目标
        for i in create_date_list:
            a = random.uniform(0.85,1.1)
            amount_goal = list(sum_amount_order[sum_amount_order['create_date']==i]['sum_amount'] * a)[0]
            sum_amount_goal_list.append(amount_goal)
        # 组合每日销量表和每日目标
        sum_amount_order_goal = pd.concat([sum_amount_order,pd.DataFrame({"sum_amount_goal":sum_amount_goal_list})],axis = 1)
        sum_amount_order_goal["compliance rate"] = sum_amount_order_goal["sum_amount"] / sum_amount_order_goal["sum_amount_goal"]
        return sum_amount_order_goal

    except Exception as e:
        logger.info("add_order_goal异常,报错信息:{}".format(e))

# 读取日期维度表
def date_data(adventure_conn_read):
    """数据库读取dim_date_df日期维度表"""
    try:
        date_sql = "SELECT create_date,is_today,is_yesterday,is_21_day,is_current_month,is_current_quarter,is_current_year,is_last_year FROM dim_date_df"
        date_info = pd.read_sql_query(date_sql, con=adventure_conn_read)
        return date_info

    except Exception as e:
        logger.info("date_data异常,报错信息:{}".format(e))

# 融合销量订单聚合目标表和日期维度表
def merge_data(sum_amount_order_goal, date_info):
    """参数解释:sum_amount_order_goal销量订单聚合目标表,
                date_info日期维度表(来自date_data函数)
        输出:amount_order_by_day销量订单聚合目标及日期维度表
    """
    try:
        # 将create_date设置为标准日期格式
        date_info['create_date'] = pd.to_datetime(date_info['create_date'],format = "%Y-%m-%d")
        sum_amount_order_goal['create_date'] = pd.to_datetime(sum_amount_order_goal['create_date'],format = "%Y-%m-%d")
        # 融合两张表
        amount_order_by_day = pd.merge(sum_amount_order_goal,date_info,on = "create_date",how = "inner")
        amount_order_by_day["create_date"] = amount_order_by_day["create_date"].apply(lambda x:x.strftime("%Y-%m-%d"))
        return amount_order_by_day
    except Exception as e:
        logger.info("merge_data异常,报错信息:{}".format(e))

# 储存数据表
def save_to_mysql(amount_order_by_day, adventure_conn_tosql):
    """将amount_order_by_day数据追加到数据库dw_order_by_day(每日环比表)当中"""
    try:
        # 每日销量、销售额环比情况
        amount_order_by_day['order_diff'] = amount_order_by_day['order_counts'].pct_change().fillna(0)
        amount_order_by_day['amount_diff']= amount_order_by_day['sum_amount'].pct_change().fillna(0)

        # 追加至数据库
        amount_order_by_day.to_sql('dw_order_by_day_{}'.format(my_name),con = adventure_conn_tosql,
                                   if_exists = 'replace',index = False)
    except Exception as e:
        logger.info("save_to_mysql异常,报错信息:{}".format(e))

(3)dw_amount_diff:数据同比表

def diff(stage, indictor):
    try:

        '''stage:日期维度的判断,如:is_today 内有[0,1]
            indictor:需取值字段,如:sum_amount(总金额),sum_order(总订单量)
            输出:当前时间维度下总和,去年同期总和'''
        # 当前stage维度,indictor之和
        current_stage_indictor = dw_order_by_day[dw_order_by_day[stage]==1][indictor].sum()
        # 前一年日期维度列表
        before_stage_list = list(dw_order_by_day[dw_order_by_day[stage]==1]['create_date'] - datetime.timedelta(366))
        # 前一年stage维度,indictor之和
        before_stage_indictor = dw_order_by_day[dw_order_by_day['create_date'].isin(before_stage_list)][indictor].sum()

        return current_stage_indictor, before_stage_indictor
    except Exception as e:
        logger.info("diff异常,报错信息:{}".format(e))


def delete_table():
    try:
        pd.read_sql_query("Truncate table dw_amount_diff_{}".format(my_name), con=adventure_conn_tosql)

    except:  # 因为删除插入更新操作没有返回值,程序会抛出ResourceClosedError,并终止程序。使用try捕捉此异常。
        print('继续')
        logger.info("继续运行")


if __name__ == "__main__":
    '''目的:生成dw_amount_diff当日维度表(按当天/昨天/当月/当季/当年的同比)'''
    """各阶段的金额"""
    today_amount, before_year_today_amount = diff('is_today', 'sum_amount')
    yesterday_amount, before_year_yesterday_amount = diff('is_yesterday', 'sum_amount')
    month_amount, before_year_month_amount = diff('is_current_month', 'sum_amount')
    quarter_amount, before_year_quarter_amount = diff('is_current_quarter', 'sum_amount')
    year_amount, before_year_year_amount = diff('is_current_year', 'sum_amount')

    """各阶段的订单数"""
    today_order, before_year_today_order = diff('is_today', 'order_counts')
    yesterday_order, before_year_yesterday_order = diff('is_yesterday', 'order_counts')
    month_order, before_year_month_order = diff('is_current_month', 'order_counts')
    quarter_order, before_year_quarter_order = diff('is_current_quarter', 'order_counts')
    year_order, before_year_year_order = diff('is_current_year', 'order_counts')

    # 生成当天/昨天/本月/本季度/今年日期维度的同比指标    (今日数据/去年今日数据 - 1)
    try:

        amount_dic = {'today_diff': [today_amount / before_year_today_amount - 1,
                                     today_order / before_year_today_order - 1,
                                     (today_amount / today_order) / (before_year_today_amount /
                                                                     before_year_today_order) - 1],
                      'yesterday_diff': [yesterday_amount / before_year_yesterday_amount - 1,
                                         yesterday_order / before_year_yesterday_order - 1,
                                         (yesterday_amount / yesterday_order) / (before_year_yesterday_amount /
                                                                                 before_year_yesterday_order) - 1],
                      'month_diff': [month_amount / before_year_month_amount - 1,
                                     month_amount / before_year_month_amount - 1,
                                     (month_amount / month_order) / (before_year_month_amount /
                                                                     before_year_month_order) - 1],
                      'quarter_diff': [quarter_amount / before_year_quarter_amount - 1,
                                       quarter_amount / before_year_quarter_amount - 1,
                                       (quarter_amount / quarter_order) / (before_year_quarter_amount /
                                                                           before_year_quarter_order) - 1],
                      'year_diff': [year_amount / before_year_year_amount - 1,
                                    year_amount / before_year_year_amount - 1,
                                    (year_amount / year_order) / (before_year_year_amount /
                                                                  before_year_year_order) - 1],
                      'flag': ['amount', 'order', 'avg']}  # 做符号简称,横向提取数据方便;销售额、销量、客单价

        amount_diff = pd.DataFrame(amount_dic)
        amount_diff["create_date"] = today_date.strftime("%Y-%m-%d")

4. linux服务器定时部署

(1)创建定时任务脚本

    os.system("/home/anaconda3/bin/python3 /home/frog005/adventure_yxs/dw_amount_diff.py >> /home/frog005/adventure/yxs_logs/dw_amount_diff_schedule.log 2>&1 &")

schedule定时任务代码,分别将三张表的自动更新时间设置为6:20,7:00,7:10

schedule.every().day.at('06:20').do(job0)
schedule.every().day.at('07:00').do(job1)
schedule.every().day.at('07:10').do(job2)

(2)linux服务器部署脚本

nohup python schedule_job_yxs.py > schedule_test.log 2>&1 &

四、连接PowerBI部署展示

完成前面的步骤后,将powerbi与数据库连接,即可将每日数据进行展示。

在PowerBI中,一共有三页,分别为销售数据(可指定维度),销售趋势、总体销售情况
(1)销售数据页面

(2)销售趋势页面

(3)总体销售情况页面
从省份、区域角度可以查看总体销售情况以及客单价情况

可视化地址:点击

上一篇下一篇

猜你喜欢

热点阅读