AWS SQS队列服务实例的创建和Spring Boot、JMS
一、SQS是什么?
SQS 是AWS提供的队列产品,可让您集成和分离分布式软件系统。
二、获取访问密钥ID和密钥访问密钥
要操作Amazon SQS (例如,使用 Java 或通过 AWS Command Line Interface),您需要访问密钥 ID 和安全访问密钥。
注意:访问密钥 ID 和安全访问密钥需特定使用于SQS。请勿与其他 AWS 服务的密钥,如 Amazon EC2 密钥对。
访问密钥包含访问密钥 ID 和安全访问密钥,用于签署对 AWS 发出的代码请求。如果没有访问密钥,您可以使用AWS 管理控制台进行创建。作为最佳实践,请勿在非必要时使用 AWS root账户 访问密钥执行任务。而是为自己创建一个具有访问密钥的新管理员 IAM 用户。
仅当创建访问密钥时,您才能查看或下载访问密钥。以后您无法查看它们。不过,您随时可以创建新的访问密钥。有关更多信息,请参阅 IAM 用户指南 中的访问 IAM 资源所需的权限。
为 IAM 用户创建访问密钥
-
登录 AWS 管理控制台 并通过以下网址打开 IAM 控制台 https://console.amazonaws.cn/iam。
-
在导航窗格中,选择 Users。
-
选择(或者新建用户)要为其创建访问密钥的用户的名子。
然后选择 Security credentials (安全凭证) 选项卡。 -
在 Access keys (访问密钥) 部分,选择 Create access key (创建访问密钥)。
-
要查看新访问密钥对,请选择 Show (显示)。关闭此对话框后,您将无法再次访问该秘密访问密钥。您的凭证类似:
-
访问密钥 ID:AKIAIOSFODNN7EXAMPLE
-
安全访问密钥:wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
-
-
要下载密钥对,请选择下载 .csv 文件。将密钥存储在安全位置。关闭此对话框后,您将无法再次访问该密钥。
为了保护您的 AWS 账户,切勿通过电子邮件发送密钥。请勿对组织外部共享密钥,即使有来自 AWS 或 Amazon.com 的询问。Amazon 的任何人永远都不会要求您提供密钥。
-
下载
.csv
文件之后,选择 Close (关闭)。在创建访问密钥时,默认情况下,密钥对处于激活状态,并且您可以立即使用此密钥对。
三、Amazon SQS 入门
步骤 1. 创建队列
第一个也是最常见的 Amazon SQS 操作正是创建队列。此过程创建和配置FIFO队列。
-
通过以下网址打开 Amazon SQS 控制台:https://console.amazonaws.cn/sqs。
-
选择 Create queue (创建队列)。
-
在 创建队列 页面,确保您设置了正确的区域。
-
的 标准 队列类型已默认选择。选择 **FIFO(先进先出)。
创建队列后,无法更改队列类型。 -
输入 名称 用于您的队列。FIFO队列的名称必须以
.fifo
后缀。 -
要使用所有默认设置创建队列,请滚动到页尾并选择 创建队列. 创建队列后,您可以 编辑 所有队列配置设置(队列类型除外)。
-
(可选)控制台设置队列配置参数的默认值。您可以为以下参数设置新值。有关其他信息,请参阅 配置参数.
-
为 可见性超时,输入持续时间和单位。范围为0秒至12小时。默认值为 30 秒。在 Amazon SQS 中,在收到消息后,消息将立即保留在队列中。为防止其他用户再次处理消息,Amazon SQS 会设置可见性超时。这是 Amazon SQS 阻止其他使用组件接收并处理消息的一段时间。当您收到来自队列的消息并开始处理该消息时,队列的可见性超时可能不够 (例如,您可能需要处理和删除消息)。可通过使用
https://docs.amazonaws.cn/AWSSimpleQueueService/latest/APIReference/API_ChangeMessageVisibility.html
操作指定新的超时值来缩短或延长消息的可见性。
例如,如果队列的默认超时值为 60 秒,在您接收消息后 15 秒时,您发送了ChangeMessageVisibility
调用并将VisibilityTimeout
设为 10 秒,则这 10 秒从您进行ChangeMessageVisibility
调用时开始计时。因此,自您最初更改可见性超时后 10 秒 (共 25 秒) 起,任何更改可见性超时或删除此消息的尝试都可能导致错误。
每个 Amazon SQS 队列都具有可配置的可见性超时。当系统从队列中读取一条消息时,任何其他读取者在指定时间都看不到该消息。只要消息的处理时间短于可见性超时,则每条消息就都会得以处理并被删除。
如果处理消息的组件出现故障或不可用,那么可见性超时结束后,读取消息队列的任何组件就可以再次看到该消息。这可以让多个组件从同一消息队列中读取消息,并且每个组件负责处理不同的消息。 -
为 message保留期限,输入持续时间和单位。范围为1分钟至14天。默认值为4天。
-
为 交付延迟,输入持续时间和单位。范围为0秒至15分钟。默认值为0秒。
-
为 最大消息大小,输入值。范围为1字节至256KB。默认值为256KB。
-
对于标准队列,输入值 接收消息等待时间. 范围为0至20秒。默认值为0秒,用于设置 短轮询. 任何非零值都设置长轮询。
-
对于FIFO队列,选择 基于内容的重删 启用基于内容的重删。默认设置已禁用。
-
-
(可选)定义 访问政策. 访问策略定义了可以访问队列的帐户、用户和角色。访问策略还定义了操作(例如,
SendMessage
,ReceiveMessage
,或DeleteMessage
),用户可以访问。默认策略仅允许队列所有者发送和接收消息。您可以为策略配置基本和高级设置。
-
在基本设置中,您可以配置谁可以向队列发送消息,谁可以从队列接收消息。只读JSON面板显示队列的结果访问策略。
-
在高级设置中,您可以直接修改JSON访问策略。这允许您指定每个负责人(帐户、用户或角色)可以执行的自定义操作集。
-
-
(可选)添加 加密 到队列。
-
(可选)添加 死信队列 接收无法传递的消息。
-
(可选)添加 标签 到队列。
-
完成队列配置后,选择 创建队列。 Amazon SQS 创建队列并显示队列的 详情 页。
步骤 2. 发送和接收消息
点击进入队列名称后,点击右上角的如下按钮可以通过SQS控制台测试发送和接收消息。
发送和接收消息
四、JMS代码方式操作SQS
官方文档也有JMS部分,但我们主要使用Spring Boot所以官方文档不是很对路。
官方文档:
- Amazon SQS Java Messaging Library入门。
- Amazon SQS 消息大小限制为 256 KB,大消息的处理方案。
4.1 基于Spring Boot的SQS解决方案
4.1.1 引入依赖
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-sdk-java</artifactId>
<version>2.14.26</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.2.9.RELEASE</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.8</version>
</dependency>
4.1.2 核心配置类
这个类是历经千辛万苦查阅大量API的结晶,包括JMS和AWS Java SDK两种方式操作SQS的配置,全网应该没有这么全的SQS配置类。具体作用见代码注释。
package com.erbadagang.pay.config;
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import javax.jms.Session;
/**
* @ClassName: JmsConfig
* @Description: 通过JMS操作AWS SQS队列的配置类
* @author: 郭秀志 jbcode@126.com
* @date: 2020/10/27 19:17
* @Copyright:
*/
@Configuration
@EnableJms
public class JmsConfig {
@Value("${aws.accessKey}")
private String accessKey;
@Value("${aws.secretKey}")
private String secretKey;
@Value("${aws.region}")
private String region;
SQSConnectionFactory connectionFactory = null;
AmazonSQSClientBuilder amazonSQSClientBuilder = null;
/**
* 使用aws-doc-sdk-examples/javav2版本代码,操作sqs的client。
* https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/sqs/src/main/java/com/example/sqs/SQSExample.java
*
* @return
*/
@Bean
public SqsClient getSqsClient() {
SqsClientBuilder sqsClientBuilder = SqsClient.builder().
region(Region.of(region)).credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("" +
accessKey, secretKey)));
return sqsClientBuilder.build();
}
/**
* 组装client builder
*
* @return
*/
public AmazonSQSClientBuilder getAmazonSQSClientBuilder() {
if (amazonSQSClientBuilder != null) {
return amazonSQSClientBuilder;
}
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setConnectionTimeout(3000);
clientConfiguration.setProtocol(Protocol.HTTP);
clientConfiguration.useGzip();
clientConfiguration.useTcpKeepAlive();
AmazonSQSClientBuilder amazonSQSClientBuilder = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withClientConfiguration(clientConfiguration)
.withRegion(region);
return amazonSQSClientBuilder;
}
/**
* 构建ConnectionFactory,来构造DefaultJmsListenerContainerFactory。
*
* @return
*/
public SQSConnectionFactory getConnectionFactory() {
if (connectionFactory != null) {
return connectionFactory;
}
connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), getAmazonSQSClientBuilder());
return connectionFactory;
}
/**
* 核心配置方法,返回自定义的DefaultJmsListenerContainerFactory。
*
* @return
*/
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(this.getConnectionFactory());
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("3-10");
/**
* SESSION_TRANSACTED
* CLIENT_ACKNOWLEDGE : After the client confirms, the client must call the acknowledge method of javax.jms.message after receiving the message. After the confirmation, the JMS server will delete the message
* AUTO_ACKNOWLEDGE : Automatic acknowledgment, no extra work required for client to send and receive messages
* DUPS_OK_ACKNOWLEDGE : Allow the confirmation mode of the replica. Once a method call from the receiving application returns from the processing message, the session object acknowledges the receipt of the message; and duplicate acknowledgments are allowed. This pattern is very effective when resource usage needs to be considered.
*/
factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
factory.setSessionTransacted(false);
return factory;
}
/**
* 构建自己的JmsTemplate,在后续的Service或者Controller里面通过自动装配调用发送消息等操作。
*
* @return
*/
@Bean
public JmsTemplate defaultJmsTemplate() {
return new JmsTemplate(this.getConnectionFactory());
}
}
4.1.3 yml文件
注:key的命名不规范,最起码应该是aws.sqs.accesskey....
aws:
accessKey: AKIA526HI45SNYSS***
secretKey: akuRMbffhhbcZko5+VG0uOtTWYueCg********
region: cn-northwest-1
4.1.4 controller方法
自动装配了前面配置的核心配置类的JmsTemplate
,代码中"NPay-Queue"是我们事先通过控制台建好的队列名称。
package com.deepvision.pay.controller;
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.JMSException;
import javax.jms.Message;
/**
* @ClassName: AwsSqsController
* @Description: 通过JMS方式发送和消费消息。
* @author: 郭秀志 jbcode@126.com
* @date: 2020/10/27 21:28
* @Copyright:
*/
@Slf4j
@RestController
@RequestMapping("/aws/sqs")
public class AwsSqsJMSController {
/**
* 第一种方式使用标准的JMS,简单
*/
@Autowired
private JmsTemplate jmsTemplate;
/**
* 发送消息
*
* @param message
*/
@GetMapping("/send")
public void oeSend(String message) {
jmsTemplate.convertAndSend("NPay-Queue", message);
}
/**
* 接收消息
* destination 队列名称
*
* @param message
* @throws JMSException 我只创建了一个listenerFactory 这里会默认使用那一个,如果有多个Factory 需要手动指定
*/
@JmsListener(destination = "NPay-Queue")
public void oeListener(Message message) throws JMSException {
SQSTextMessage textMessage = (SQSTextMessage) message;
System.out.println("收到一条消息" + textMessage.getText());
//如果设置的是客户端确认模式(Session.CLIENT_ACKNOWLEDGE),所以要记得调用acknowledge()删除sqs消息。
message.acknowledge();
}
}
4.1.5 测试
启动Springboot项目。访问send
地址:http://localhost:8080/aws/sqs/send/?message=%7Bname:%22guoxiuzhi%22,from:%22java%20code%22%7D
,URL中参数message的值为json串{name:"guoxiuzhi",from:"java code"}
经过URLEncoding后的结果,看着像乱码,其实不然。
在控制台立即看到消费了一条消息:
2020-10-27 22:17:12.304 INFO 11776 --- [nio-8080-exec-4] c.a.s.javamessaging.SQSMessageProducer : Message sent to SQS with SQS-assigned messageId: 5a74a9c3-66e3-4bfd-9cf8-47d4b1e015c2
2020-10-27 22:17:12.305 INFO 11776 --- [nio-8080-exec-4] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
收到一条消息{name:"guoxiuzhi",from:"java code"}
至此一个最简单的SQS生产和消费消息功能可用。
五、总结
对SQS的操作无非两种:
- 对队列的操作;
- 对消息的操作。
JMS方式通过控制台来对队列进行操作(新增、配置),对消息的操作简便易用,但是灵活性较差。
采用AWS SDK for Java 2.0 对 [Amazon SQS] 进行编程则更加灵活,可以代替控制台部分的工作。当然也更加繁琐。下面演示代码创建队列和操作消息。
六、AWS SDK for Java 2.0 操作 SQS
6.1 代码
包括了:创建队列、查询(可带条件)队列、发送消息、消费消息、改变和删除消息的操作。
注意: 由于发送消息的时候设置了延迟消费
5条可用消息.delaySeconds(10)
,即发送延迟10秒的消息,消费者不能立即可见此消息进行消费。如果多次请求发送消息的方法可以模拟多条待消费消息场景。
package com.erbadagang.pay.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.*;
import java.util.List;
/**
* @ClassName: AwsSqsController
* @Description: 采用AWS SDK for Java 2.0 对 [Amazon SQS] 进行编程则更加灵活,可以代替控制台部分的工作。当然也更加繁琐。下面演示代码创建队列和操作消息。
* @author: 郭秀志 jbcode@126.com
* @date: 2020/10/26 18:28
* @Copyright:
*/
@Slf4j
@RestController
@RequestMapping("/aws/sqs")
public class AwsSqsJava2ApiController {
private static final String QUEUE_NAME = "testQueue_guoxiuzhi_20201027";
/**
* 使用AWS的java2新API操作队列,
* v2版本示例代码:https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/sqs/src/main/java/com/example/sqs
*/
@Autowired
private SqsClient sqsClient;
@GetMapping("/useQueueByJava2Api")
public void useQueueByJava2Api() {
// Create a queue
String queueUrl = createQueue(sqsClient, QUEUE_NAME);
listQueues(sqsClient);
listQueuesFilter(sqsClient, queueUrl);
sendMessage(sqsClient, queueUrl);
List<Message> messages = receiveMessages(sqsClient, queueUrl);
changeMessages(sqsClient, queueUrl, messages);
deleteMessages(sqsClient, queueUrl, messages);
}
/**
* 创建队列,如果存在则不新建。
*
* @param sqsClient
* @param queueName 队列的名字
* @return
*/
public static String createQueue(SqsClient sqsClient, String queueName) {
System.out.println("\nCreate queue");
// snippet-start:[sqs.java2.sqs_example.create_queue]
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName(queueName)
.build();
sqsClient.createQueue(createQueueRequest);
// snippet-end:[sqs.java2.sqs_example.create_queue]
System.out.println("\nGet queue URL");
// snippet-start:[sqs.java2.sqs_example.get_queue]
GetQueueUrlResponse getQueueUrlResponse =
sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
String queueUrl = getQueueUrlResponse.queueUrl();
return queueUrl;
// snippet-end:[sqs.java2.sqs_example.get_queue]
}
/**
* 查询所有的队列
*
* @param sqsClient
*/
public static void listQueues(SqsClient sqsClient) {
System.out.println("\nList Queues");
// snippet-start:[sqs.java2.sqs_example.list_queues]
ListQueuesRequest listQueuesRequest = ListQueuesRequest.builder().build();
ListQueuesResponse listQueuesResponse = sqsClient.listQueues(listQueuesRequest);
for (String url : listQueuesResponse.queueUrls()) {
System.out.println(url);
}
// snippet-end:[sqs.java2.sqs_example.list_queues]
}
/**
* 通过队列名的前缀,带条件查询所有的队列。
*
* @param sqsClient
* @param queueUrl
*/
public static void listQueuesFilter(SqsClient sqsClient, String queueUrl) {
// List queues with filters
String namePrefix = "testQueue";
ListQueuesRequest filterListRequest = ListQueuesRequest.builder()
.queueNamePrefix(namePrefix).build();
ListQueuesResponse listQueuesFilteredResponse = sqsClient.listQueues(filterListRequest);
System.out.println("Queue URLs with prefix: " + namePrefix);
for (String url : listQueuesFilteredResponse.queueUrls()) {
System.out.println(url);
}
// snippet-end:[sqs.java2.sqs_example.send_message]
}
/**
* 批量发送消息。
*
* @param sqsClient
* @param queueUrl
*/
public static void sendBatchMessages(SqsClient sqsClient, String queueUrl) {
System.out.println("\nSend multiple messages");
// snippet-start:[sqs.java2.sqs_example.send__multiple_messages]
SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(SendMessageBatchRequestEntry.builder().id("id1").messageBody("msg 1").build(),
SendMessageBatchRequestEntry.builder().id("id2").messageBody("msg 2").delaySeconds(10).build())
.build();
sqsClient.sendMessageBatch(sendMessageBatchRequest);
// snippet-end:[sqs.java2.sqs_example.send__multiple_messages]
}
/**
* 发送消息
*
* @param sqsClient
* @param queueUrl
*/
public static void sendMessage(SqsClient sqsClient, String queueUrl) {
System.out.println("\nSend message");
// snippet-start:[sqs.java2.sqs_example.send_message]
sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("Hello world from guoxiuzhi!")
.delaySeconds(10) //发送延迟10秒的消息,消费者不能立即可见此消息进行消费。
.build());
// snippet-end:[sqs.java2.sqs_example.send_message]
}
/**
* 消费消息。
*
* @param sqsClient
* @param queueUrl
* @return
*/
public static List<Message> receiveMessages(SqsClient sqsClient, String queueUrl) {
System.out.println("\nReceive messages");
// snippet-start:[sqs.java2.sqs_example.retrieve_messages]
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.build();
List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).messages();
for (Message message : messages) {
System.out.println("message.body() = " + message.body());
}
return messages;
// snippet-end:[sqs.java2.sqs_example.retrieve_messages]
}
/**
* 改变消息的可见性超时,即这段时间内一个消费者进行消费时其他消费者对消息不可见。
*
* @param sqsClient
* @param queueUrl
* @param messages
*/
public static void changeMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) {
System.out.println("\nChange message visibility");
for (Message message : messages) {
ChangeMessageVisibilityRequest req = ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.visibilityTimeout(100)
.build();
sqsClient.changeMessageVisibility(req);
}
}
/**
* 删除消息
*
* @param sqsClient
* @param queueUrl
* @param messages
*/
public static void deleteMessages(SqsClient sqsClient, String queueUrl, List<Message> messages) {
System.out.println("\nDelete messages");
// snippet-start:[sqs.java2.sqs_example.delete_message]
for (Message message : messages) {
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build();
sqsClient.deleteMessage(deleteMessageRequest);
}
// snippet-end:[sqs.java2.sqs_example.delete_message]
}
}
6.2 测试
启动Spring Boot项目后,访问入口url useQueueByJava2Api
,可以看到Java控制台输出信息:
Create queue
Get queue URL
List Queues
https://sqs.cn-northwest-1.amazonaws.com.cn/951216301924/NPay-Queue
https://sqs.cn-northwest-1.amazonaws.com.cn/951216301924/testQueue_guoxiuzhi_20201027
Queue URLs with prefix: testQueue
https://sqs.cn-northwest-1.amazonaws.com.cn/951216301924/testQueue_guoxiuzhi_20201027
Send message
Receive messages
message.body() = Hello world from guoxiuzhi!
Change message visibility
Delete messages
如果有多条待消费消息,输出如下:
消费多条消息
参考文献
-
官方示例 AWS SDK for Java 2.0,由于官网还有老版本的文档和代码,代码的实现完全不同,千万注意!!!以免文档和代码版本不匹配导致不能调通,所以看代码之前一定要看版本是不是2.0(使用
SqsClient
进行操作SQS为2.0版本API),如果是1.0就直接略过,我一开始费了很多周折实现了1.0的逻辑,后来再看文档怎么实现不一样才发现还有2.0,结果又重新实现了一遍2.0逻辑。 -
如果在消费消息的时候需要将消息转成实体对象,可以参考:springboot + aws +sqs + jms(简单托管消息队列)。
-
如果消息的大小超过256k,参考使用S3存储消息,大消息