java实现股市行情实时推送
所使用的技术
springcloud,redis,rocketMq,websocket,mysql
架构图
行情推送架构图.png整体流程说明:交易服务交易产生的行情,存放到redis的队列内,然后行情服务会监听这个队列,获取这个队列里面的行情节点,经过数据处理,如,生成k线数据,然后存入redis内,并且使用mq异步持久化行情数据,同时,将行情数据发布给网关,网关订阅到行情数据后,使用websocket实时推送给前端用户。
问题
1.交易产生的行情数据是怎么样的?
这里要说一下,国际行情不是我们去产生的,而是要去对接第三方(比如对接OTC),拿到行情,然后我们再对国际行情进行处理推送展示。而本文的行情是我们自己系统自己交易产生的,所以会有一个交易服务,给用户进行交易,然后产生行情。
产生的行情数据其实很简单,大部分都包含:成交价格,成交量,成交时间这三个字段,因为是自己产生的行情,所以可以把成交的订单号也一并放到队列里,方便查找行情的出处。
2.什么是分时线,什么是K线?
分时线:每分钟的最后一笔成交价的连线叫分时线。分时线即大盘、个股分时走势图中的白色曲线,它反映的是大盘、个股的实时走势。像下图,横坐标的时间,纵坐标是成交价格,那么每一分钟的最后一笔成交价就的连线就是分时线了,分时线是实时变动的,这里就要使用ws推送行情了。那么这里有几个问题,就是怎么获取分时线数据?分时线数据如何存储?你想,要产生和存储分时图的数据,我们是不是得将每一分钟的最后一口价进行存储,并且存储时得注明这个价格是哪一分钟的价格,这个要如何实现?问题会在后面讲到解决办法。
K线图:股市及期货市场中的K线图的画法包含四个数据,即开盘价、最高价、最低价、收盘价,所有的k线都是围绕这四个数据展开,反映大势的状况和价格信息。如果把每日的K线图放在一张纸上,就能得到日K线图,同样也可画出周K线图、月K线图。
从图片上来看,其实不止开盘价、最高价、最低价、收盘价这个数据,还有成交量和成交额。
所以,包括的数据就有:开盘价、最高价、最低价、收盘价,成交量、成交额
比如今天是2020-03-01,那么今天就会产生一个日K数据,数据包含:日期(精确到日,即2020-03-01),开盘价(今天的开始价格)、最高价(今天的最高价格)、最低价(今天的最低价格)、收盘价(今天的最后一口价),成交量(今天的总成交量),成交额(今天的总成交额)同理,如果是周K,则就是以周为单位:日期(这周的结束日期,即这周的最后一天的日期,如今天周日,则这周最后一天就是今天2020-03-01),开盘价(这周的开始价格)、最高价(这周的最高价格)、最低价(这周的最低价格)、收盘价(这周的最后一口价)成交量(这周的总成交量),成交额(这周的总成交额)。
当然,这里的日期怎么显示,要看公司具体需求,比如周K的日期,有些公司是拿这周的开始日期,有些公司是拿这周的结束日期。
3.为什么要将行情保存到redis的队列里,然后行情服务去主动获取队列里的行情?而不直接使用redis的发布订阅,将行情发布给行情服务;或者使用rocketMq,将行情异步发送给行情服务?
可能有人会说,可以直接使用redis的发布订阅来将行情发布给行情服务,所以就没必要将行情存放到redis的一个队列里,然后行情服务再主动去获取行情数据。理论上,这样也是可以达到效果的,但是为什么不这样做?主要就是因为redis的发布订阅,是消息不可靠的,也就是说这个消息很可能丢失,假如行情服务全部挂掉了,但是交易服务还是正常的发布消息,那么,这个时候发布的消息将全部丢失。所以才需要将行情数据先保存到一个队列内,然后等待行情服务主动去获取,这样,即使行情服务挂了,行情数据也不会丢失,等到行情服务正常后,行情服务会主动去获取队列里的行情数据,一个一个的处理。
那么,为什么不直接使用rocketMq将行情发送给行情服务呢?其中最大的原因就是消息顺序消费的问题,行情需要保证一个正确的顺序输出的,比如我这一秒产生的行情,不能够比上一秒产生的行情要晚推送给前端,是需要保证顺序的,而rocketMq是很难保证顺序消费的,所以也不会使用rocketmq来实现这一点。
4.websocket是什么?为什么使用它做行情的实时推送?
可能有人会想,要保证页面实时的更新获取最新价格,可以使用http的长轮询,也就是不断的去轮询请求接口获取相应的数据来保证实时性,但是这样是很耗资源和性能的,因此不采用这种方式,而是使用websocket。
从上图可以看出,使用ajax轮询实现实时推送,需要每次都发送一次http请求,然后等待服务器响应。而使用websocket只需要在第一次的时候发送http请求,客户端与服务器建立连接之后,后面客户端就不再需要每次都发送请求,然后等待访问了,服务端会主动发送数据给客户端。
这就像我要去山的另外一边看海,如果使用ajax轮询这种方式,我每次去山的那边都需要在山上挖一条隧道,然后走过去看海,但是使用websocket方式,则只需要第一次的时候,将隧道挖好,以后想去看海都不用再挖隧道了,直接就可以过去看海。当然,关于websocket的原理,大家可以去搜索上网了解,这里只是简单的说下。
代码实现
最上面的架构图就是实现的一个思路,我们需要将行情基础数据保存到redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将行情发布给网关,然后由网关使用websocket将数据推送给前端用户,同时,需要将行情数据处理成k线数据,并保存到redis里以及对行情数据进行持久化。
那么我们只要围绕着这个思路,去实现就可以了:
1.将行情保存到redis队列内
2.行情服务获取队列数据
3.行情服务发布行情消息
4.行情服务处理行情数据,生成K线数据,并保存到redis
5.行情服务队行情数据进行持久化
6.网关订阅行情,并使用websocet将行情推送给前端
1.将行情保存到redis队列内(略)
2.行情服务获取队列数据(分布式锁保证顺序),发布行情消息,生成K线数据并保存到redis,行情数据持久化
/**
* 系统初始化类
*
* @author Longer
*
*/
@Component
public class SystemInitBean implements InitializingBean {
static Logger logger = LoggerFactory.getLogger(SystemInitBean.class);
@Autowired
private IKLineService kLineService;
@Autowired
private TradeTimeService tradeTimeService;
@Autowired
private RedisUtil redisUtil;
@Autowired
private QuotePeriodUtil quotePeriodUtil;
@Autowired
private RocketMQUtil rocketMQUtil;
@Value("${rocketmq.quotepersistenece.topics}")
private String persistenceTopics;
@Value("${product.list}")
private String productList;//商品列表
public void afterPropertiesSet() throws Exception {
logger.info("-------------开始启动初始服务--------------");
String[] productArr = productList.split(",");
for (String productTradeNo : productArr) {
new QuoteListListenerThread(kLineService,tradeTimeService,redisUtil,quotePeriodUtil
,rocketMQUtil,persistenceTopics,productTradeNo).start();
}
logger.info("---------启动初始服务完毕-------------------");
}
}
/**
* @Classname QuoteListListenerThread
* @Description 行情队列监听线程
* @Date 2019/12/23 15:43
* @Created by Longer
*/
@Slf4j
public class QuoteListListenerThread extends Thread{
public QuoteListListenerThread(){}
public QuoteListListenerThread(IKLineService kLineService, TradeTimeService tradeTimeService,
RedisUtil redisUtil, QuotePeriodUtil quotePeriodUtil,
RocketMQUtil rocketMQUtil, String persistenceTopics, String productTradeNo){
this.kLineService=kLineService;
this.tradeTimeService=tradeTimeService;
this.redisUtil=redisUtil;
this.quotePeriodUtil=quotePeriodUtil;
this.rocketMQUtil=rocketMQUtil;
this.persistenceTopics=persistenceTopics;
this.productTradeNo=productTradeNo;
}
private IKLineService kLineService;
private TradeTimeService tradeTimeService;
private RedisUtil redisUtil;
private QuotePeriodUtil quotePeriodUtil;
private RocketMQUtil rocketMQUtil;
private String persistenceTopics;
private String productTradeNo;
@Override
public void run() {
while(true){
String lockValue = UUID.randomUUID().toString();
try{
//加锁(分布式锁),不加锁的话,就无法保证顺序消费,因为有多个节点
boolean lock = redisUtil.getLock(MessageFormat.format(RedisConstant.QUOTESERVICE_KLINE_LOCK_KEY,productTradeNo)
, lockValue, 1);
if(lock){
Object realMsg = redisUtil.rightPop(MessageFormat.format(RedisConstant.QUOTESERVICE_QUOTE_LIST,productTradeNo));
if(!StringUtils.isEmpty(realMsg)){
log.info("QuoteListListenerThread监听到队列消息:{}",realMsg);
String s = realMsg.toString();
QuoteDto quoteDto = JSONObject.parseObject(s, QuoteDto.class);
//step1.发布行情消息给网关
//step2.行情数据处理(K线)和存储到redis
//step3.行情数据异步持久化
}
}else{
log.info("QuoteListListenerThread:获取锁失败");
Thread.sleep(1 * 1000);
}
}catch (Exception e){
log.info("QuoteListListenerThread报错:{}",e);
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e1) {
log.info("QuoteListListenerThread睡眠报错:{}",e1);
}
}finally {
try {
//释放锁
redisUtil.releaseLock(MessageFormat.format(RedisConstant.QUOTESERVICE_KLINE_LOCK_KEY,productTradeNo),lockValue);
}catch (Exception e){
log.info("释放锁报错:{}",e);
}
}
}
}
}
说明:不同的商品会对应不同的行情,所以也会将各自的行情存放到不同的队列。这里区分商品的标识是productTradeNo 字段(商品交易编码),在系统启动的时候,会针对每一个商品,都开启一个线程,进行处理商品的行情。
由于行情服务是多节点,所以这里需要使用分布式锁,来保证每次只有一个节点获取到行情数据,从而保证行情的顺序消费。其实如果要百分百保证顺序消费的话,最好是行情服务只部署一个节点,但是这样就不能保证行情服务的高可用了。
行情服务处理行情数据,生成K线数据,并保存到redis
刚刚在上面提过,分时线就是每一分钟最后一口价的连线,而K线包含的数据有:“开盘价、最高价、最低价、收盘价,成交量、成交额”,那么要获取分时线数据,则只需要获取1分钟k线数据就好了,也就是以每分钟为单位的K线数据。因为1分钟K线的收盘价就是每分钟的最后一口价。
那现在的问题就是怎么样将基本行情数据(成交价格,成交量,成交时间),加工处理成K线数据,并且保存到redis内。
这里就要使用到一个Java类CronSequenceGenerator
二话不说,直接上代码。
/**
* @Classname PeriodType
* @Description TODO
* @Date 2019/5/30 16:57
* @Created by Longer
*/
public enum PeriodType {
/** 分时 */
ONE_MINUTE("1"),
/** 5分钟 */
FIVE_MINUTE("5"),
/** 15分钟 */
FIFTEEN_MINUTE("15"),
/** 30分钟 */
THIRTY_MINUTE("30"),
/** 60分钟 */
SIXTY_MINUTE("60"),
/** 天 */
DAY("day"),
/** 4小时*/
FOUR_HOUR("4"),
/** 周 */
WEEK("week"),
/** 月 */
MONTH("month");
private String index;
// 构造方法
private PeriodType(String index) {
this.index = index;
}
public String getIndex() {
return index;
}
}
/**
* @Classname QuotePeriodUtil
* @Description TODO
* @Date 2019/5/31 14:46
* @Created by Longer
*/
@Component
@Slf4j
public class QuotePeriodUtil {
private static CronSequenceGenerator oneMinuteTrigger = new CronSequenceGenerator("0 0/1 * * * ? ");
private static CronSequenceGenerator fiveMinuteTrigger = new CronSequenceGenerator("0 0/5 * * * ? ");
private static CronSequenceGenerator fifteenMinuteTrigger = new CronSequenceGenerator("0 0/15 * * * ? ");
private static CronSequenceGenerator thirtyMinuteTrigger = new CronSequenceGenerator("0 0/30 * * * ? ");
private static CronSequenceGenerator fourHourTrigger = new CronSequenceGenerator("0 0 0/4 * * ? ");
private static CronSequenceGenerator sixtyMinuteTrigger = new CronSequenceGenerator("0 0/60 * * * ? ");
// 每天上午0:00触发
private static CronSequenceGenerator dayTrigger = new CronSequenceGenerator("0 0 0 * * ?");
// 每个星期一上午0点触发
private static CronSequenceGenerator weekTrigger = new CronSequenceGenerator("0 0 0 ? * MON");
// 表示在每月的1日的上午0点触发
private static CronSequenceGenerator monthTrigger = new CronSequenceGenerator("0 0 0 1 * ?");
/*
@Autowired
private ParameterCache parameterCache;*/
public QuotePeriod getPeriod( PeriodType type, Date dateTime) {
QuotePeriod period = null;
Date nextExecutionTime = null;
switch (type) {
case ONE_MINUTE:
nextExecutionTime = oneMinuteTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 1 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case FIVE_MINUTE:
nextExecutionTime = fiveMinuteTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 5 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case FIFTEEN_MINUTE:
nextExecutionTime = fifteenMinuteTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 15 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case THIRTY_MINUTE:
nextExecutionTime = thirtyMinuteTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 30 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case SIXTY_MINUTE:
nextExecutionTime = sixtyMinuteTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 60 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case FOUR_HOUR:
nextExecutionTime = fourHourTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 4 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case DAY:
nextExecutionTime = dayTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 24 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case WEEK:
nextExecutionTime = weekTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 7 * 24 * 60 * 60 * 1000, nextExecutionTime.getTime(), type);
break;
case MONTH:
nextExecutionTime = monthTrigger.next(dateTime);
// 获取当月1号上午6点
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date(nextExecutionTime.getTime()));
calendar.add(Calendar.MONTH, -1);// 取上个月
calendar.set(Calendar.DAY_OF_MONTH, 1);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
period = new QuotePeriod(calendar.getTimeInMillis(), nextExecutionTime.getTime(), type);
break;
default:
nextExecutionTime = thirtyMinuteTrigger.next(dateTime);
period = new QuotePeriod(nextExecutionTime.getTime() - 30 * 60 * 1000, nextExecutionTime.getTime(), PeriodType.THIRTY_MINUTE);
}
return period;
}
/**
* 获取当前天的周期前的第 n 个天周期的日期时间
* @param type
* @param n
* @return
*/
public QuotePeriod getBeforePeriod( PeriodType type, int n) {
Date nextExecutionTime = dayTrigger.next(new Date());
QuotePeriod period = new QuotePeriod(nextExecutionTime.getTime() - 24 * 60 * 60 * 1000 * n, nextExecutionTime.getTime(), type);
return period;
}
使用上面我写好的类,就可以知道当前时间是属于哪一分钟,或者是属于哪个周,哪个月的。大家可以直接拷贝上面两个类,然后自行进行调试。
public static void main(String[] args) {
QuotePeriodUtil quotePeriodUtil = new QuotePeriodUtil();
QuotePeriod period = quotePeriodUtil.getPeriod(PeriodType.WEEK, new Date());
System.out.println("这周的开始日期"+period.getStartTime()+"这周的结束日期"+period.getEndTime());
}
3.网关订阅行情,并使用websocet将行情推送给前端(略)
因为本文的重点是整个行情推送的设计实录,并且对一些难点进行说明,而ws推送的代码网上是很多的,大家可以自行上网查找,这里就不贴出来了。
总结
java实现股市行情实时推送,最上面的架构图就是实现的一个思路,我们需要将行情基础数据保存到redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将行情发布给网关,然后由网关使用websocket将数据推送给前端用户,同时,需要将行情数据处理成k线数据,并保存到redis里以及对行情数据进行持久化。
那么我们只要围绕着这个思路,去实现就可以了:
1.将行情保存到redis队列内
2.行情服务获取队列数据
3.行情服务发布行情消息
4.行情服务处理行情数据,生成K线数据,并保存到redis
5.数据持久化