python数据分析与数据化运营实战

python读取mysql数据写入ES总结

2020-12-14  本文已影响0人  王小鹏的随笔

准备工作:mysql库的安装。
python中mysql库用的是mysql-connector,安装执行如下命令:

pip install mysql-connector

第一步:连接mysql,读取数据。

通过执行sql语句,读取mysql数据。

# mysql读取数据
import pandas as pd
import datetime
def get_rawdata():
    import mysql.connector
    config = {
        'host' : '172.10.3.165',
        'user' : 'root',
        'password' : '!QAZxcvfr432',
        'port' : 3306,
        'database' : 'passenger_flow',
        'charset' : 'gb2312'
    }
    cnn = mysql.connector.connect(**config) # 建立MySQL连接
    cursor = cnn.cursor() # 获得游标
    sql = "SELECT  cast(TRAN_DATE AS DATE) as tran_date, cast(CONCAT(TRAN_TIME_MIN,'00') AS TIME) as tran_time, cast(PASSENGER_NUM AS DECIMAL (12,2)) as passenger_num,  cast(DEVICE_ID AS CHAR(8)) as device_id " \
      "FROM passenger_flow.pass_flow_info_his" # SQL语句
    raw_data = pd.read_sql(sql,cnn)
    cursor.close() # 关闭游标
    cnn.close() # 关闭连接
    return raw_data 

至此,获得mysql的原始数据raw_data 。接下来对数据进行预处理,按日期进行分组聚合,然后重命名行和列名,得到dataFrame格式的数据。

raw_data = get_rawdata()
# 按日期分组
date_flow = date_flow.rename_axis('date').reset_index(name='counts')

第二步:连接ES。
这步没有太多的可解释的地方,就是配置信息。

# 连接ES
def connect_es(es_ip, es_port):
    from elasticsearch import Elasticsearch
    es_ip = es_ip
    es_port = es_port
    es = Elasticsearch(
                [es_ip]
               #,http_auth=('elastic', 'passwd')
                ,port=es_port
        )
    return es

第三步:ES主键加密。
这步的目的是为了保持主键唯一性,防止重复写入。用的方法是md5加密。

# md5 加密
def md5(string):
    import  hashlib
    # 对要加密的字符串进行指定编码
    string = string.encode(encoding ='UTF-8')
    # md5加密
    return hashlib.md5(string).hexdigest()

第四步:写入ES
至此,一切的准备工作都做好了,数据也有了,主键加密也做了,就开始写入了。

def write_to_es():
    from elasticsearch import helpers
    # 写入es
    actions = []
    for index, row in date_flow.iterrows():
        day = datetime.datetime.strftime(row[0], '%Y-%m-%d')
        action = {
            "_index": 'pass_flow_index',
            "_id": md5(day),
            "_source": {
                "TRAN_DATE": day,
                "DATE_FLOW": row[1]
            }
        }
        actions.append(action)
    helpers.bulk(es, actions)
write_to_es()

用main方法执行以上方法:

if __name__ == "__main__":
    raw_data = get_rawdata()
    date_flow = date_flow.rename_axis('date').reset_index(name='counts')
    es = connect_es('172.10.3.10', 10200)
    write_to_es()

最后查看一下ES写的是否成功,用查询方法

# 查询es
body = {
    "query":{
        "match_all":{}
    }
}
es.search(index="pass_flow_index" ,body=body)

如果返回以下信息,说明ES里成功插入了数据。


image.png

另外,ES删除索引的操作:

# 删除索引
es.indices.delete('pass_flow_index')
上一篇 下一篇

猜你喜欢

热点阅读