web前端

rabbitmq实例:订单系统

2022-06-16  本文已影响0人  姜治宇

流量削峰

消息队列的一个主要用途是做流量削峰。
比如一台单点服务器,平时的访问量很少,但如果做秒杀活动,流量的突然剧增会让db扛不住压力而崩溃。因此可以维护一个消息队列,出现流量高峰时排队执行即可。

db

数据库用postpresql来玩,在docker安装一下。

docker run --name my-postgis -e POSTGRES_PASSWORD=test_123456 -d postgis/postgis

然后安装一个图形化界面navicat,创建几个表。

//商品表
create table products(id SERIAL,prodname text,isn text,price real,CONSTRAINT products_pkey PRIMARY KEY(id));
//订单表
create table goodsorder(id SERIAL,prodid integer,uid integer,amount integer,CONSTRAINT goodsorder_pkey PRIMARY KEY(id));
//库存表
create table stocks(id SERIAL,prodid integer,amount integer,CONSTRAINT stocks_pkey PRIMARY KEY(id));
//增加测试数据,商品
insert into products(prodname,isn,price) values('高露洁牙膏','123456789',12.50);
insert into products(prodname,isn,price) values('百草味瓜子','876554439',5.00);
//增加测试数据,库存
insert into stocks(prodid,amount) values(1,5);
insert into stocks(prodid,amount) values(2,10);

node

安装pg和amqplib两个插件。

npm install pg -S
 npm install amqplib -S

当用户下订单时,我们用消息队列作缓冲层,只需向mq写入一条数据后即可返回前端,把耗时的db操作排队执行。
代码:

import { Controller, Get, Post, Req, Res } from "@nestjs/common";
import { Pool } from 'pg';
const amqp = require('amqplib/callback_api');
@Controller("mq")
export class MqController {
    pool: Pool;
    channel;
    customer;
    constructor() {
        this.pool = new Pool({
            host: 'localhost',
            port: 5432,
            password: 'test_123456',
            user: 'postgres',
            database: 'shop'
        });
    }

    @Get('/order')
    async publishOrder(@Req() req, @Res() res) { //下订单

        let orderObj = { uid: req.query.uid, prodid: req.query.prodid, amount: req.query.amount };

        if (!this.channel) {
            this.channel = await this.createChannel('amqp://guest:guest@localhost');
            this.channel.assertExchange('shop', 'direct', { durable: false });
            //只有一个消费者
            this.customer = await this.channel.assertQueue('customer');
            this.channel.bindQueue(this.customer.queue, 'shop', 'order');
        }
        //生产者
        this.channel.publish('shop', 'order', Buffer.from(JSON.stringify(orderObj)));
        //通知消费者
        this.consumeOrder();
        //返回前端
        res.send({
            code: 0,
            status: 'success',
            msg: '下单成功'
        });
    }

    async consumeOrder() {
        //消费者,从mq拉取数据
        let orderObj: any = await this.consume();
        console.log('从mq获取的订单信息>>>', orderObj);
        if (orderObj) {
            //查询库存
            let stocksql = `select prodid,amount from stocks where prodid=${orderObj.prodid}`;
            let stockData = await this.pool.query(stocksql);
            console.log('库存数据>>>', stockData.rows[0].amount);
            //如果库存够了
            let stockCount = Number(stockData.rows[0].amount) - Number(orderObj.amount);
            if (stockCount >= 0) {
                let updatestocksql = `update stocks set amount=${stockCount} where prodid=${orderObj.prodid}`;
                let minusRes = await this.pool.query(updatestocksql);
                console.log('减掉库存>>>', minusRes);
                if (minusRes.rowCount) {
                    //生成订单
                    let ordersql = `insert into goodsorder(prodid,uid,amount) values(${orderObj.prodid},${orderObj.uid},${orderObj.amount})`;
                    let result = await this.pool.query(ordersql);
                    console.log('生成订单>>>', result);
                }

            }

        }


    }

    createChannel(url) {
        return new Promise((resolve, reject) => {
            amqp.connect(url, (error, conn) => {

                if (error) {
                    reject(error);
                }

                conn.createChannel((error, channel) => {
                    if (error) {
                        reject(error);
                    }

                    resolve(channel);
                })
            })

        });
    }
    consume() {
        return new Promise((resolve, reject) => {
            this.channel.consume(this.customer.queue, data => {
                console.log('消费订单>>>', data.content.toString());
                let orderObj: any = JSON.parse(data.content.toString());
                resolve(orderObj);

            }, { noAck: true });
        });
    }
}

代码看着有点乱,稍微优化一下。

import { Controller, Get, Post, Req, Res } from "@nestjs/common";
import { Pool } from 'pg';
const amqp = require('amqplib/callback_api');
@Controller("mq")
export class MqController {
    pool: Pool;
    channel;
    customer;
    constructor() {
        //db初始化
        this.pool = new Pool({
            host: 'localhost',
            port: 5432,
            password: 'test_123456',
            user: 'postgres',
            database: 'shop'
        });
    }
    async initMq(){
        if (!this.channel) {
            this.channel = await this.createChannel('amqp://guest:guest@localhost');
            this.channel.assertExchange('shop', 'direct', { durable: false });
            //只有一个消费者
            this.customer = await this.channel.assertQueue('customer');
            this.channel.bindQueue(this.customer.queue, 'shop', 'order');
            this.consumeOrder();
        }
    }
    //发布消息
    publishMessage(orderObj){
        this.channel.publish('shop', 'order', Buffer.from(JSON.stringify(orderObj)));
    }

    @Get('/order')
    async publishOrder(@Req() req, @Res() res) { //下订单

        let orderObj = { uid: req.query.uid, prodid: req.query.prodid, amount: req.query.amount };
        await this.initMq();
        //向mq发布一条消息
        this.publishMessage(orderObj);
        //返回前端
        res.send({
            code: 0,
            status: 'success',
            msg: '下单成功'
        });
    }

    async consumeOrder() { //有关db的操作,没有返回前端信息
        //消费者,从mq拉取数据
        let orderObj: any = await this.consume();
        console.log('从mq获取的订单信息>>>', orderObj);
        if (orderObj) {
            //查询库存
            let stocksql = `select prodid,amount from stocks where prodid=${orderObj.prodid}`;
            let stockData = await this.pool.query(stocksql);
            console.log('库存数据>>>', stockData.rows[0].amount);
            //如果库存够了
            let stockCount = Number(stockData.rows[0].amount) - Number(orderObj.amount);
            if (stockCount >= 0) {
                let updatestocksql = `update stocks set amount=${stockCount} where prodid=${orderObj.prodid}`;
                let minusRes = await this.pool.query(updatestocksql);
                console.log('减掉库存>>>', minusRes);
                if (minusRes.rowCount) {
                    //生成订单
                    let ordersql = `insert into goodsorder(prodid,uid,amount) values(${orderObj.prodid},${orderObj.uid},${orderObj.amount})`;
                    let result = await this.pool.query(ordersql);
                    console.log('生成订单>>>', result);
                }

            }

        }


    }

    createChannel(url) {
        return new Promise((resolve, reject) => {
            amqp.connect(url, (error, conn) => {

                if (error) {
                    reject(error);
                }

                conn.createChannel((error, channel) => {
                    if (error) {
                        reject(error);
                    }

                    resolve(channel);
                })
            })

        });
    }
    consume() {
        return new Promise((resolve, reject) => {
            this.channel.consume(this.customer.queue, data => {
                console.log('消费订单>>>', data.content.toString());
                let orderObj: any = JSON.parse(data.content.toString());
                resolve(orderObj);

            }, { noAck: true });
        });
    }
}
上一篇下一篇

猜你喜欢

热点阅读