zookeeper源码分析(5)-序列化和协议
在网络传输时,传输的是二进制数据,所以发送端需要将序列化对象转变为二进制数据
,也就是序列化过程
。接收端需要将二进制数据转化为序列化对象
,也就是反序列化过程
。在序列化和反序列化过程中,需要定义一种对数据相互转变的一致性协议
,也就是序列化协议
。zookeeper使用Jute作为序列化组件。首先看下Jute的使用:
public class RequestHeader implements Record {
private int xid;
private int type;
public RequestHeader() {
}
public RequestHeader(
int xid,
int type) {
this.xid=xid;
this.type=type;
}
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
//先写入xid
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
//先读出xid
xid=a_.readInt("xid");
type=a_.readInt("type");
a_.endRecord(tag);
}
//测试方法
public void testJute() throws IOException {
//实现Record接口,自定义序列化
RequestHeader requestHeader = new RequestHeader(1, ZooDefs.OpCode.create);
System.out.print("requestHeader: " +requestHeader );
//序列化
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(outputStream);
requestHeader.serialize(binaryOutputArchive,"header");
//通常是TCP网络通信对象
ByteBuffer bb = ByteBuffer.wrap(outputStream.toByteArray());
//反序列化
RequestHeader requestHeader1 = new RequestHeader();
ByteBufferInputStream inputStream = new ByteBufferInputStream(bb);
BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(inputStream);
requestHeader1.deserialize(binaryInputArchive,"header");
System.out.print("requestHeader1: " + requestHeader1);
outputStream.close();
inputStream.close();
}
1.定义序列化对象RequestHeader,需要实现Record接口的serialize
和deserialize
接口
2.构建序列化器BinaryOutputArchive,调用serialize
方法将对象序列化流中
3.构建反序列化器BinaryInputArchive,调用deserialize
方法将流反序列化为对象
从上面的使用我们可以看出,对RequestHeader对象的序列化 就是对其成员变量xid,type的按顺序的写入序列化器BinaryOutputArchive,反序列化就是从反序列化器BinaryInputArchive按顺序的读出xid,type。
所以序列化组件Jute的实现关键就是对序列化对象
,序列化器
和反序列化器
的设计。
序列化对象
所有的序列化对象都要实现Record
接口,它定义了serialize
和deserialize
方法用于子类自己实现自己的序列化和反序列方式。
public interface Record {
public void serialize(OutputArchive archive, String tag)
throws IOException;
public void deserialize(InputArchive archive, String tag)
throws IOException;
}
zookeeper的org.apache.zookeeper.proto包下定义了很多用于网络通信和数据存储所需要的序列化对象。
序列化器
在zookeeper中序列化就是将Record对象变为二进制数据的过程,序列化器接口为OutputArchive
public interface OutputArchive {
public void writeByte(byte b, String tag) throws IOException;
public void writeBool(boolean b, String tag) throws IOException;
public void writeInt(int i, String tag) throws IOException;
public void writeLong(long l, String tag) throws IOException;
public void writeFloat(float f, String tag) throws IOException;
public void writeDouble(double d, String tag) throws IOException;
public void writeString(String s, String tag) throws IOException;
public void writeBuffer(byte buf[], String tag)
throws IOException;
public void writeRecord(Record r, String tag) throws IOException;
public void startRecord(Record r, String tag) throws IOException;
public void endRecord(Record r, String tag) throws IOException;
public void startVector(List<?> v, String tag) throws IOException;
public void endVector(List<?> v, String tag) throws IOException;
public void startMap(TreeMap<?,?> v, String tag) throws IOException;
public void endMap(TreeMap<?,?> v, String tag) throws IOException;
}
有三种实现:BinaryOutputArchive,CsvOutputArchive和XmlOutputArchive,分别对应无特殊格式,有csv格式和有xml格式的数据序列化。
BinaryOutputArchive
public class BinaryOutputArchive implements OutputArchive {
private ByteBuffer bb = ByteBuffer.allocate(1024);
private DataOutput out;
public static BinaryOutputArchive getArchive(OutputStream strm) {
return new BinaryOutputArchive(new DataOutputStream(strm));
}
/** Creates a new instance of BinaryOutputArchive */
public BinaryOutputArchive(DataOutput out) {
this.out = out;
}
public void writeByte(byte b, String tag) throws IOException {
out.writeByte(b);
}
public void writeBool(boolean b, String tag) throws IOException {
out.writeBoolean(b);
}
··········省略代码·····
}
可以看到BinaryOutputArchive其实是对DataOutput out
的包装,从而实现了对各种数据类型的写入,tag并不会写入进二进制数据中,而对于CsvOutputArchive和XmlOutputArchive,如果想在二进制数据中保存对应格式,就需要tag控制,如XmlOutputArchive中
public void writeBool(boolean b, String tag) throws IOException {
printBeginEnvelope(tag);
stream.print("<boolean>");
stream.print(b ? "1" : "0");
stream.print("</boolean>");
printEndEnvelope(tag);
}
private void printBeginEnvelope(String tag) {
if (!compoundStack.empty()) {
String s = compoundStack.peek();
if ("struct".equals(s)) {
putIndent();
stream.print("<member>\n");
addIndent();
putIndent();
stream.print("<name>"+tag+"</name>\n");
putIndent();
stream.print("<value>");
} else if ("vector".equals(s)) {
stream.print("<value>");
} else if ("map".equals(s)) {
stream.print("<value>");
}
} else {
stream.print("<value>");
}
}
反序列化器
在zookeeper中反序列化就是将二进制数据变为Record对象的过程,反序列化器接口为InputArchive
public interface InputArchive {
public byte readByte(String tag) throws IOException;
public boolean readBool(String tag) throws IOException;
public int readInt(String tag) throws IOException;
public long readLong(String tag) throws IOException;
public float readFloat(String tag) throws IOException;
public double readDouble(String tag) throws IOException;
public String readString(String tag) throws IOException;
public byte[] readBuffer(String tag) throws IOException;
public void readRecord(Record r, String tag) throws IOException;
public void startRecord(String tag) throws IOException;
public void endRecord(String tag) throws IOException;
public Index startVector(String tag) throws IOException;
public void endVector(String tag) throws IOException;
public Index startMap(String tag) throws IOException;
public void endMap(String tag) throws IOException;
}
同样有三种实现:BinaryInputArchive,CsvInputArchive和XmlInputArchive,分别对应无特殊格式,有csv格式和有xml格式的数据序列化。
public class BinaryInputArchive implements InputArchive {
private DataInput in;
static public BinaryInputArchive getArchive(InputStream strm) {
return new BinaryInputArchive(new DataInputStream(strm));
}
/** Creates a new instance of BinaryInputArchive */
public BinaryInputArchive(DataInput in) {
this.in = in;
}
public byte readByte(String tag) throws IOException {
return in.readByte();
}
public boolean readBool(String tag) throws IOException {
return in.readBoolean();
}
}
··········省略代码·····
}
可以看到BinaryInputArchive其实是对DataInput in
的包装,从而实现了对各种数据类型的读取,tag并不会写入进二进制数据中。
实际zookeeper的客户端在向服务端发送请求时,通信协议体如下:
len
为请求数据的总长度,占4位。请求头
就是事例中的RequestHeader
的xid和type。xid
用于记录客户端请求发起的先后顺序,占4位。type
代表请求的操作类型,占4位。这样子在服务端反序列化时,就可以根据type的值来选择对应的Record来读取请求体内容。