rabbitmq简单使用示例

2020-04-18  本文已影响0人  一个小废材

我们使用springboot 构建示例代码。创建一个控制器接收用户请求,向rabbitmq压入数据。程序启动的时候拉起一个消费线程,消费数据。
例子使用基本的rabbitmq函数。

rabbitmq基础操作类
./java/com/amqp/demo/util/rabbitmq.java
package com.amqp.demo.util;

import com.amqp.demo.service.queueCost;
import com.rabbitmq.client.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Component
public class rabbitmq {
    @Value("${amqp.host}")
     String host;
    @Value("${amqp.port}")
     int port;
    @Value("${amqp.user}")
     String user;
    @Value("${amqp.passwd}")
     String passwd;
    @Value("${amqp.vhost}")
    private String vhost;

    private Connection connection = null;
    private Channel amqpChanenl = null;

    private void initConnect() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setHost(host);
        factory.setPort(port);
        factory.setUsername(user);
        factory.setPassword(passwd);
        factory.setVirtualHost(vhost);

        //创建连接,tcp连接可以在channel之间复用,所以不需要每次都创建
        if (null == connection) {
            connection = factory.newConnection();
        }
        //创建Channel
        amqpChanenl = connection.createChannel();
    }


    public boolean pushInfo(String msgInfo, String exchangeName, String routingKey) throws IOException, TimeoutException {
        initConnect();
        byte[] messageodyByte = msgInfo.getBytes();
        //消息压入
        amqpChanenl.basicPublish(exchangeName, routingKey, null, messageodyByte);
        //关闭channel
        amqpChanenl.close();
        return true;
    }

    /**
     * 弹出消息
     * @param autoAck
     * @param queueName
     * @param queue
     * @return
     * @throws IOException
     * @throws TimeoutException
     */
    public String popInfo(boolean autoAck, String queueName, queueCost queue) throws IOException, TimeoutException {
        //初始化连接和channel
        initConnect();
        //想queueCost 注入channel依赖
        queue.setChannel(amqpChanenl);
        try {
            //订阅消息
            amqpChanenl.basicConsume(queueName, autoAck, "myConsumerTag", queue);
        } catch(IOException error) {
            return "";
        }
        return "";
    }
}
处理队列消息
./java/com/amqp/demo/service/queueCost.java
package com.amqp.demo.controller;

import com.alibaba.fastjson.JSONObject;
import com.amqp.demo.util.rabbitmq;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;

import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeoutException;

@RestController
public class HelloController {
    @Autowired
    private rabbitmq amqp;

    @RequestMapping("/hello")
    public String hello() {
        return "this is first";
    }

    @PostMapping(value = "/push", produces = "application/json;charset=UTF-8")
    public String mapClass(HttpServletRequest req) throws JSONException, IOException, TimeoutException {
        BufferedReader br = new BufferedReader(new InputStreamReader(req.getInputStream()));
        StringBuffer sb=new StringBuffer();
        String s=null;
        while((s=br.readLine())!=null){
            sb.append(s);
        }
        System.out.println(sb);
        JSONObject param = JSON.parseObject(String.valueOf(sb));
        String name = (String) param.get("name");

        amqp.pushInfo(name, "test-exchange", "sibowen");
        return name;
    }
}
生产消息的接口
./java/com/amqp/demo/controller/HelloController.java
  package com.amqp.demo.controller;

import com.alibaba.fastjson.JSONObject;
import com.amqp.demo.util.rabbitmq;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;

import javax.servlet.http.HttpServletRequest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeoutException;

@RestController
public class HelloController {
    @Autowired
    private rabbitmq amqp;

    @RequestMapping("/hello")
    public String hello() {
        return "this is first";
    }

    @PostMapping(value = "/push", produces = "application/json;charset=UTF-8")
    public String mapClass(HttpServletRequest req) throws JSONException, IOException, TimeoutException {
        BufferedReader br = new BufferedReader(new InputStreamReader(req.getInputStream()));
        StringBuffer sb=new StringBuffer();
        String s=null;
        while((s=br.readLine())!=null){
            sb.append(s);
        }
        System.out.println(sb);
        JSONObject param = JSON.parseObject(String.valueOf(sb));
        String name = (String) param.get("name");

        amqp.pushInfo(name, "test-exchange", "sibowen");
        return name;
    }
}
拉起消费线程的任务
./java/com/amqp/demo/command/queueCommand.java
package com.amqp.demo.command;

import com.amqp.demo.service.queueCost;
import com.amqp.demo.util.rabbitmq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Component
public class queueCommand  {
    @Autowired
    private rabbitmq amqp;

    @PostConstruct
    public void cost() throws IOException, TimeoutException {
        System.out.println("start listen");
        //调用util,消费消息
        amqp.popInfo(false, "queue_name", new queueCost());
        System.out.println("end ");
    }
}

上一篇下一篇

猜你喜欢

热点阅读