jedis实现分布式锁
通过此篇文章可以了解Redis的底层通信,Redis的协议,以及自己手写与服务器通信.
在分布式锁的实现上, 基于Redis的实现是其中一种.
而具体的实现依赖包又有两个
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.1</version>
</dependency>
本篇文章我们就讲解第一种Jedis.
基于Jedis又有两种实现
// 单机
Jedis(String host, int port, int connectionTimeout, int soTimeout)
// 集群
JedisCluster(Set<HostAndPort> jedisClusterNode, int timeout, int maxAttempts, GenericObjectPoolConfig poolConfig)
不管是单机还是集群,它们的底层和服务器之间的通信都是基于java.net.Socket
比如说我们通过Jedis(String host, int port, int connectionTimeout, int soTimeout)构造了一个客户端,连接单机的服务器.
public Jedis(final String host, final int port, final int connectionTimeout, final int soTimeout) {
// 调用父类BinaryJedis
super(host, port, connectionTimeout, soTimeout);
}
public BinaryJedis(final String host, final int port, final int connectionTimeout,
final int soTimeout) {
// 创建一个Client,它并没有连接服务器,只是先保存了host,port. 在Client类内部有个Socket属性.
client = new Client(host, port);
client.setConnectionTimeout(connectionTimeout);
client.setSoTimeout(soTimeout);
}
调用首次调用setnx向服务器发送命令时,会连接服务器
public Long setnx(final String key, final String value) {
checkIsInMultiOrPipeline();
// 调用上面构造好的client的setnx方法
client.setnx(key, value);
return client.getIntegerReply();
}
最后会跟进到如下代码
protected Connection sendCommand(final Command cmd, final byte[]... args) {
try {
// 连接服务器
connect();
// 发送指令
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
// ...
}
}
跟进到connect()方法
public void connect() {
// 判断socket是否已连接,如果没有连接服务器则连接服务器
if (!isConnected()) {
try {
socket = new Socket();
socket.setReuseAddress(true);
socket.setKeepAlive(true);
socket.setTcpNoDelay(true);
socket.setSoLinger(true, 0);
socket.connect(new InetSocketAddress(host, port), connectionTimeout);
socket.setSoTimeout(soTimeout);
// 输出流
outputStream = new RedisOutputStream(socket.getOutputStream());
// 输入流
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
broken = true;
throw new JedisConnectionException(ex);
}
}
}
这就是最熟悉的通过Socket和服务器进行通信,还有输入输出流.
连接好服务器之后,接下来就是发送命令给服务器了.
跟进到发送命令的代码
private static void sendCommand(final RedisOutputStream os, final byte[] command,
final byte[]... args) {
// 通过输出流向服务器发送数据
try {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();
for (final byte[] arg : args) {
os.write(DOLLAR_BYTE);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}
通过输出流,将命令发送给服务器.
下面我们将这个方法里面的常量替换一下,把一些不必要的代码删除,再看下这个方法
// #1
os.write('*');
os.write(args.length + 1);
os.write("\r\n");
// #2
os.write('$');
os.write(command.length);
os.write("\r\n");
os.write(command);
os.write("\r\n");
// #3
for (final byte[] arg : args) {
os.write('$');
os.write(arg.length);
os.write("\r\n");
os.write(arg);
os.write("\r\n");
}
在说这段代码之前,我们要说下Redis的协议.
互联网的通信是基于协议的,我们熟悉的TCP/IP协议,Dubbo通信的dubbo协议,Zookeeper的zookeeper协议,RocketMQ通信的自身应用层协议.没有协议,那么客户端和服务器就不能通信,彼此'听不懂'对方在说什么.
那么Redis客户端和服务器之间要想彼此知道对方说的什么,那么它们之间也有通过协议通信,这就是Redis协议.
具体协议如下
*<number of arguments> CR LF
$<number of bytes of argument 1> CR LF
<argument data> CR LF
...
$<number of bytes of argument N> CR LF
<argument data> CR LF
比如我们要向服务器发送SET mykey myvalue这个命令,那么转换成协议之后,具体的内容如下
*3
$3
SET
$5
mykey
$7
myvalue
并不是我不把它们写成一行,而是在它们彼此之间有'\r\n',也就是回车换行. 我们简单介绍下它
*3
*这个符号是固定的,那么后面这个3是什么意思呢,3表示后面有3个内容(或者说命令由3部分组成),细心的读者也看到了,整个命令中有3个符号代表一个内容.
$3
SET
$符号是固定的,后面的3表示后面有3个字符,因为SET就是3个字符组成的
$5
mykey
$符号是固定的,后面的5表示后面有5个字符,因为mykey就是5个字符组成的
$7
myvalue
$符号是固定的,后面的7表示后面有7个字符,因为myvalue就是7个字符组成的
协议的编解码也是一个话题. Redis的协议是基于长度的,通过长度就可以准确的知道,命令的开始在哪里,结束又在哪里. 基于长度的协议有很多,比如Dubbo或者RocketMQ的协议,它们将数据发送给对方之后,对方就是通过基于长度的解码器,将数据解码出来.
相信读者朋友应该明白了Redis的协议.那么我们只要通过Socket的输出流将这些协议内容发送给服务器就可以了,服务器基于协议,就能'读懂'我们发送给它的命令是什么了.
我们再回到上面的那段代码.
// #1
os.write('*');
os.write(args.length + 1);
os.write("\r\n");
// #2
os.write('$');
os.write(command.length);
os.write("\r\n");
os.write(command);
os.write("\r\n");
// #3
for (final byte[] arg : args) {
os.write('$');
os.write(arg.length);
os.write("\r\n");
os.write(arg);
os.write("\r\n");
}
相信这个时候,你再来看这段代码应该就明白了. 就是平铺直叙的将协议'翻译'成代码.
所以说,当我们需要和服务器通信的时候,也未必是必须依赖Redis的依赖包,我们完成可以自己通过Socket与服务器直接通信. 比如下面这段简单的代码,就可以直接和Redis通信了,当然它很简单.这里只是提供给你一个思路.
import java.io.IOException;
import java.net.Socket;
public static void connectRedis(String key, String value) throws IOException {
Socket client = new Socket(host, port);
// 执行 set key value命令
StringBuilder command = new StringBuilder();
String number = "*3" + CRLF;
command.append(number);
String cmd = "$3" + CRLF + "SET" + CRLF;
command.append(cmd);
cmd = "$" + key.getBytes().length + CRLF + key + CRLF;
command.append(cmd);
cmd = "$" + value.getBytes().length + CRLF + value + CRLF;
command.append(cmd);
// 向服务器发送命令
client.getOutputStream().write(command.toString().getBytes());
// 接收服务器响应
byte[] response = new byte[1024];
client.getInputStream().read(response);
System.out.println(new String(response, 0, response.length));
}
在开篇我们也讲到,Redis分布式锁的实现有jedis和redisson两种. jedis的底层通信是直接基于Socket, 而redisson的底层与服务器通信是基于Netty.