golang

golang 创建redis队列/批量处理队列日志插入数据表

2020-07-17  本文已影响0人  顶尖少爷

用户登录后 日志记录先存储到redis列表 。 使用定时任务 将redis中到登录日志批量插入日志表

.env文件

#Redis配置
REDIS_HOST=127.0.0.1
REDIS_PASSWORD=
REDIS_PORT=6379

#用户登录日志队列任务名称
QueueName=userLoginRecord

#定时任务开关
OPEN_TASK=open

redis链接

    //redis配置
    PREDIS = redis.NewClient(&redis.Options{
        Addr:     os.Getenv("REDIS_HOST")+":"+os.Getenv("REDIS_PORT"),
        Password: os.Getenv("REDIS_PASSWORD"), // no password set
        DB:       0,  // use default DB
    })

    _, err = PREDIS.Ping().Result()
    if err!=nil{
        log.PError("connection redis fail "+err.Error())
    }

创建队列任务

/** 
* @Title 插入队列任务
* @Description:  
* @Param:  
* @return:  
* @Author: liwei
* @Date: 2020/7/17 
**/
func AddJobToRedisLPush(queueName string, data map[string]interface{}){
    strJson,_:=json.Marshal(data)
    conf.PREDIS.LPush(queueName, strJson)
}

使用定时任务 main.go

    //设置定时任务
    //插入登录日志
    if os.Getenv("OPEN_TASK")=="open"{
        toolbox.AddTask("user login record ",console.UserLoginRecord)
        toolbox.StartTask()
        defer toolbox.StopTask()
    }

定义定时任务

var UserLoginRecord *toolbox.Task

func init(){
    UserLoginRecord = toolbox.NewTask("user login record","* * * * * *", command.UserLoginRecordFromRedisCommand)
}

批量处理

/**
* @Title UserLoginRecordFromRedisCommand
* @Description:  想要实现批量插入数据 gorm本身是不支持的。 需要我们自己写sql来进行执行
* @Param:
* @return:
* @Author: liwei
* @Date: 2020/7/17
**/
func UserLoginRecordFromRedisCommand() error{

    queueName:=os.Getenv("QueueName")
    var model userLoginRecord.UserLoginRecord
    tableName:= model.TableCheck()
    rLen :=redis.GetRedisLLen(queueName)

    page :=math.Ceil(float64(rLen/200))
    for i:=0; i<=  int(page);i++{
        var buffer bytes.Buffer
        sql:="insert into `"+tableName+"` (`game_id`,`game_name`,`server_id`,`game_player_name`,`server_name`,`user_id`," +
            "`user_account`,`promote_id`,`user_nickname`,`login_time`,`down_time`,`login_ip`,`type`,`sdk_version`,`status`,`lpuid`,`old_promote_id`,`old_promote_account`,`device_code`,`phone_model`,`third_login_type`) values"
        buffer.WriteString(sql)
        execStatus:=false
        //拼接sql
        for j:=0; j<=200;j++{
            row,_ :=redis.GetJobFromRedisLPop(queueName)
            if len(row)==0 {
                continue
            }
            buffer.WriteString(fmt.Sprintf("(%v,'%s',%v,'%s','%s',%v,'%s',%v,'%s',%v,%v,'%s',%v,%v,%v,%v,%v,'%s','%s','%s',%v),",row["GameId"],row["GameName"],row["ServerId"],row["GamePlayerName"],row["ServerName"],row["UserId"],
                row["UserAccount"],row["PromoteId"],row["UserNickname"],row["LoginTime"],row["DownTime"],row["LoginIp"],
                row["Ttype"],row["SdkVersion"],row["Status"],row["Lpuid"],row["OldPromoteId"],row["OldPromoteAccount"],
                row["DeviceCode"],row["PhoneModel"],row["ThirdLoginType"]))

            execStatus=true
        }
        if execStatus{
            runStr :=buffer.String()
            //删除最后一个字符串  或者替换为; 符合sql语言
            runSql := runStr[0 :len(runStr)-1]
            err:=conf.SERVER_DB.Exec(runSql).Error
            if err!=nil{
                log.PError("插入登录日志失败")
            }
        }

    }

    //var model userLoginRecord.UserLoginRecord


    return nil
}

上一篇下一篇

猜你喜欢

热点阅读