Golang使用redis阻塞读brpop实现即时响应并发执行

2017-06-20  本文已影响0人  飞天神猫

实现

主要利用redis的brpop阻塞读和Golang的goroutine并发控制以及os/exec执行程序,实现队列有数据就立即执行对应程序并把结果set任务key。

运行参数

设置brpop的超时(-t)和同步调度时返回的结果ttl(-e)
./dispatchdeploy 
Usage:  -h 192.168.6.151 -p 6388 -t 300 -a /path/testfile.pl -e 1800
  -a string
        start appname (default "/path/testfile.pl")
  -e int
        redis expire time sec (default 1800)
  -h string
        redis ip
  -p int
        redis port (default 6379)
  -t int
        redis brpop timeout (default 300)

静态数据

const (
    maxthread        = 2     //最大并发协程数
    queueName        = "qn_kt"  //阻塞读队列
    result_queueName = "rt_kt"  //同步返回结果的key前缀
    token            = "##"    //执行调度参数的指定分隔符
    sync_flag        = "1"   
)

关键代码

//阻塞读,当有数据分割参数,使用channel控制并发协程数,在execCmd的cmd.wait正常后释放channel
for {
        content, _ := redisdb.brpop(queueName, *timeout)
        if content != nil {
            args := strings.Split(string(content[1]), token)
            if len(args) != 4 {
                log.Printf("%v lack of para length %s\n", args, len(args))
            } else {
                //控制并发数
                sync_num <- 1
                go execCmd(*appname, args, redisdb)
                log.Printf("%s %v Go\n", *appname, args)
            }
        } else {
            log.Printf("timeout %d get nil contenet , just go on", *timeout)
        }
    }

测试

lpush三个调度到队列
127.0.0.1:6888> lpush qn_kt "6234##ZYYC0001##20170620140000##0" "5234##ZYYC0001##20170620140000##1" "7234##ZYYC0001##20170620140000##1" 
(integer) 3

//控制并发数为2,立即调度执行了两个perl程序,等到返回结果执行第三个
2017/06/20 16:45:21 Start listen qn_kt
2017/06/20 16:45:25 testfile.pl [6234 ZYYC0001 20170620140000 0] Go
2017/06/20 16:45:25 testfile.pl [5234 ZYYC0001 20170620140000 1] Go
2017/06/20 16:45:30 testfile.pl [6234 ZYYC0001 20170620140000 0] finish
2017/06/20 16:45:30 testfile.pl [7234 ZYYC0001 20170620140000 1] Go
2017/06/20 16:45:30 testfile.pl [5234 ZYYC0001 20170620140000 S] finish
2017/06/20 16:45:35 testfile.pl [7234 ZYYC0001 20170620140000 S] finish
2017/06/20 16:45:51 timeout 20 get nil contenet , just go on
2017/06/20 16:46:12 timeout 20 get nil contenet , just go on

//同步调度任务执行完成后set对应任务号,由接口程序读取,1800秒后redis回收
127.0.0.1:6888> get rt_kt_7234_ZYYC0001
"7234##ZYYC0001##20170620140000##1"
127.0.0.1:6888> ttl rt_kt_7234_ZYYC0001
(integer) 1791
上一篇下一篇

猜你喜欢

热点阅读