Cassandra实战 笔记-《Cassandra的集群机制》
Cassandra中的集群机制与大多数的分布式系统不同。例如,在 Hadoop 系统中,有一台机器有主节点和多台从节点,主节点控制所有的从节点。当 Hadoop 系统中的从节点出现故障时,整个系统的工作不会受影响;而主节点出现故障后,整个 Hadoop 系统就不能正常使用了。由于 Cassandra 机器中的每一台机器都是对等的,不存在主节点和从节点的概念,所以集群中任何一台机器出现故障,整个系统都不会受影响,依旧可以正常工作。
1.1 一致性哈希
1.1.1 理解一致性哈希
一致性哈希(Consistent Hash)是整个 Cassandra 集群的基础。
要理解什么是一致性哈希,为什么要使用一致性哈希,可以先回到 Cassandra 需要解决的根本问题 -- 提供一个可以增加机器线性扩展性能的集群。
假设有5台服务器,需要实现一个集群以提供数据的存储与读取的功能。可以思考这样一种简洁的实现:每一台服务器都有一个固定标志,分别为0、1、2、3、4,并且每一台服务器都知道其他另外4台服务器的固定标志。
服务端处理客户端的写入请求过程如下:
根据写入请求的Key求哈希值,然后用Key的哈希值与集群的服务器数量(这里为5)取模,判断这个写入请求应该写入到集群中的哪台服务器中。如果写入请求应该由本服务器处理,就将数据写入磁盘;如果属于另外一台服务器,则转发写入请求到对应的服务器中。
服务器端处理客户端的写人请求过程的伪代码如下:
public void handl eWrite (WriteMessage wm) (
//从写入消息中获取Key
string key = wm. getKey ();
//计算实际处理该写入消息的节点编号
int handleID = key. hashCode()号ClusterNodeNum;
//判断写入消息是否由本服务器处理
if (MyClusterID == handleID) {
//该写入消息由本服务器处理,写入磁盘
wirteMessage (wm) ;
} else {
//将写入消息发送给对应的服务器处理
sendMessage (handleID, wm);
}
//通知客户端写入成功
send2C1ient ("write success");
}
服务器端处理客户端的读取请求过程如下:
根据读取请求的Key求哈希值,然后用Key的哈希值与集群的服务器数量(这里为5)取模,判断这个读取请求应该从集群中的哪台服务器中读取。如果读取请求应该由本服务器处理,就从磁盘中读取数据;如果属于另外-台服务器,则转发读取请求到对应的服务器中。
服务器端处理客户端的读取请求过程的伪代码如下:
public void handleRead (ReadMessage rm) {
//从读取消息中获取Key
String key = rm. getKey();
//计算实际处理该读取消息的节点编号
int handleID = key.hashCode() % ClusterNodeNum;
//用于存储读取的数据
String readData=”;
//判断读取消息是否由本服务器处理
if (MyClusterID== handleID) {
//从磁盘中读取数据
readData = readMessage (rm);
} else {
//将读取消息发送给对应的服务器处理
readData = sendMessage(handleID,rm);
}
//将读取到的数据返回给客户端
send2Client(readData);
}
最后,客户端调用服务端的伪代码如下:
public static void main (string[] args) {
Random rnd = new Random();
//所有可以使用的服务器瑞地址
String[] servers = new string[" (*10.192.168.0", "10.192.168.1",
10.192.168.2", "10.192.168.3", "10.192.168.4");
//由于所有的服务器都可以提供和同的服务,为了做到负载均街,随机从可用的服务器中选择一个
Client client = new Client (servers (rnd. nextInt (servers. length)]);
/向服务器端写入数据
client.write (new WriteMessage ("key1", "valuel "));
//从服务器端读取数据
String value = client.read (new ReadMessage ("key1"));
}
通过上面的设计,实现了一个简单的存储集群。
这种架构实现非常简单,同时可以利用多台机器共同提供服务。它的核心就在于对数据的Key进行哈希取模。
但是这种架构具有个严重的缺陷:添加服务器或减少服务器困难。一旦集群中的机器发生变动,集群中的所有服务器都需要对现有的数据进行重新分布。
针对集群中的机器发生变动的情况,一致性哈希可以极大地降低数据重分布的影响。
那么一致性哈希是如何工作的呢?
实现求出集群中的每一个节点的哈希值,并将其配置到 0~2^32 的圆环上,用同样的方法求出存储数据的Key的哈希值,并映射到圆上。然后从数据映射到的位置顺时针查找,将数据保存到找到的第一个服务器上。如果超过 2^32 仍然找不到相应服务器,就会保存到第一台服务器上,如下图1-1所示。
图1-1
从图1-1所示的状态中添加一台新的服务器,只有在圆环上增加服务器的地点逆时针方向的第一天服务器上的Key会受到影响,如图1-2所示。
图1-2
因此,一致性哈希最大限度地抑制了Key的重新分布。而且,有的一致性哈希的实现方法还采用了虚拟节点的思想。使用一般的哈希函数的服务器的映射地点分布非常不均匀,而使用虚拟节点的思想(比如Amazon公司的一个分布式Key/Value存储引擎Dynamo),为每个物理节点(服务器)在圆环上分配100~200个点,这样就能抑制分布不均匀的现象,最大限度地减少服务器增减时的缓存重新分布。
1.1.2 一致性哈希在Cassandra中的应用
了解了一致性哈希的原理,可以通过源码了解 Cassandra 中是如何应用该原理的。
在org.apache.cassandra.dht
包中,包含了分布式哈希表(Distribute Hash Table)的主要逻辑: Token、Range和Partitioner。
1. Token
在Cassandra中,每一个节点都对应一个唯一的Token,相当于一致性哈希圆环中的一个节点地址。
org.apache.cassandra.dht.Token<T>
是一个抽象类,Cassandra提供了以下4种实现:
- BigIntegerToken: 以
java.math.BigInteger
类型的实例代表一个Token,由org.apache.cassandra.dht.RandomPartitioner
使用。 - BytesToken:以byte[]类型的实例代表一个Token,由
org.apache.cassandra.dht.ByteOrderedPartitioner
和org.apache.cassandra.dht.CollatingOrderPreservingPartitioner
使用。 - LocalToken:以byte[]类型的实例和一个
org.apache.cassandra.db.marshal.AbstractType
类型的实例代表一个Token,由org.apache.cassandra.dht.LocalPartitioner
使用。 - StringToken:一
java.lang.String
类型的实例代表一个Token,由org.apache.cassandra.dht.OrderPreservingPartitioner
使用。
2.Range
在Cassandra中,每一个节点负责处理一致性哈希圆环中的一段数据,从逆时针方向的上一一个节点对应的Token (开区间)到本节点对应的Token (闭区间)范围内。这个范围就是Range。
Range包含如下两个重要的属性:
public final Token left;
public final Token right ;
其中,right代表-致性哈希圆环中本节点对应的Token,left代表-致性哈希圆环中逆时针方向的上一个节点对应的Token,即( left , right]。
同时,Range还提供了一系列的圆环相关的操作方法:
- public static boolean contains(Token left,Token right,Token bi):判断bi是否在由left和right构造的Range之中。
- public boolean contains(Range that):判断that是否被该Range所包含。
- piblic boolean intersects(Range that):判断两个Range相交的集合。
3.Partitioner
Paritioner用于管理Token在-致性哈希圆环中的生成规则,并且决定每一-台机器中SSTable数据的排序规则。每- 个Cassandra实例需要并且只能指定一个Partitioner 实现。配置文件中默认的配置为partitioner: org. apache. cassandra. dht. RandomPartitioner。
org. apache. cassandra. dht. IPartitioner <T >接口提供的方法功能如下:
- public DecoratedKey <T > decorateKey( byte[ ] key): 将key转换为DecoratedKey (To-ken和key的组合)。这个方法在Cassandra对数据进行写人和读取操作时使用,用于数据key之前的排序判断。其中的排序规则是由对应的Token决定的。
- public T midpoint(T left, T right):计算left 和right 两个Token的中间值。这个方法在新节点加人Cassandra节点时调用。
- public T getMinimumToken():获取致性哈 希圆环中最小 的Token。
- public T getToken( byte[] key): 根据key计算对应的Token。
- public T getRandomToken():获取一个随机的Token。在集群初始化的时候,如果没有显式指定节点的Token值,将调用这个方法为每一个节点随机生成Token。
Cassandra提供了如下5种IPartitioner的实现:
(1) org. apache. cassandra. dht. RandomParitioner根据key生成Token的算法为md5,逻辑如下:
public BigIntegerToken getToken (byte[] key){
if (key. length ==0) return MINIMUM;
return new BigIntegerToken (FBUtilities. ma5 hash (key));
}
Token的排序规则与java.math.BigInteger的排序规则相同。如果在配置文件中指定了InitialToken, 读取的逻辑如下:
public Token < BigInteger > fromString (String string{
return new BigIntegerToken (new Big Integer (string));
}
(2) org.apache.cassandra dht.OrderPreservingParitioner
根据key生成Token的算法为将key直接转换为UTF-8编码的字符串,逻辑如下:
public StringToken getToken (byte[] key){
String skey;
try {
skey = FBUtilities. decodeTOUTF8 (key) ;
}
catch (CharacterCodingException e){
throw new Runt imeExcept ion ("The provided key was not UTF8 encoded. ",e);
}
return new StringToken (skey);
}
Token的排序规则与 java.lang.String 的排序规则相同。如果在配置文件中指定了InitialToken, 读取的逻辑如下:
public Token < BigInteger > fromstring (String string){
return new StringToken (string);
}
(3) org. apache. cassandra. dht. LocalPartitioner
根据key生成Token的算法为将key直接转换为LocalToken,逻辑如下:
public LocalToken getToken (byte[] key){
return new LocalToken (comparator, key);
}
Token的排序规则与org. apache. cassandra. db. marshal. AbstractType的排序规则相同。
如果在配置文件中指定了InitialToken,读取将出错,LocalPartitioner 不支持手动指定InitialToken。
(4) org.apache.cassandra.dht.ByteOrderedPartitioner
根据key生成Token的算法为将key直接转换为BytesToken,逻辑如下:
public BytesToken getToken (byte[] key){
if (key. length==0)
return MINIMUM;
return new BytesToken (key);
}
Token的排序规则与byte的排序规则相同。如果在配置文件中指定了InitialToken, 读取的逻辑如下:
public Token < BigInteger > fromStr ing (String string){
return new BytesToken (FBUtilities. hexToBytes (string));
}
(5) org.apache.cassandra.dht.CollatingOrderPreservingPartitioner
根据key生成Token的算法为将key直接转换为UTF-8编码的字符串,逻辑如下:
public BytesToken getToken (byte[] key)
if (key. length==0)
return MINIMUM;
String skey ;
try{
skey = FBUE ilities. decodeTOUTF8 (key);
}
catch (CharacterCodi ngException e){
throw new Runt imeExcept ion ("The provided key was not UTF8 encoded. ",e);}
return new BytesToken (collator. getcollat ionKey (skey). toByteArray());
Token的排序规则与byte的排序规则相同。
如果在配置文件中指定了InitialToken, 读取的逻辑如下:
public Token < BigInteger > fromString (String string)
return new BytesToken (FBUtilities. hexToBytes (string));
}
以上5种Partitioner的主要区别就在于将Key转换为Token的算法与底层数据存储中key的排序顺序。如果不想使用Cassandra随机分配的Token,可以按照各种Partitioner从配置文件中读取Token的规则指定相应的InitialToken。
1.2 Gossip:集群节点之间的通信协议
在Cassandra中实现集群中节点通信主要由两个部分的实现:FailureDetector(失效节点的检验)与 Gossiper(节点之间的状态传递)。
1.2.1 FailureDetector
FailureDetector会记录集群中其他节点与本节点的通信历史,然后根据当前时间和某一个节点的通信历史,判断某一个节点是否还存活。
FailureDetector实现了org. apache. cassandra. gms. IFailureDetector接口,它的主要方法与相关作用如下。
1) public boolean isAlive( InetAddress ep):判断集群中的某个节点是否存活。
2) public void interpret( InetAddress ep):根据当前时间和该节点的通信历史,判断该节点是否还存活,如果该节点失效,则触发org. apache. cassandra. gms. IFailureDetectionEven-tListener的convict事件,即通知Gossiper从集群中将该节点设置为已经失效状态。该方法在Gossiper中每隔1s对集群中的每一个节点进行一次判断。
3) public void report( InetAddress ep):获取某- 个节点的通信信息,记录到该节点的通信历史中。该方法在Gossiper每接收到集群中另一个节点的通信消息( GossipDigestSynMes-sage)时调用。
4) public void registerFailureDetectionEventListener( IFailureDetectionEventListener listener) :在FailureDetector中注册IFailureDetectionEventListener, 这个IFailureDetectionEventListener 就是Gossiper,即在FailureDetector 发现某个节点失效时,自动调用Gossiper的convict 方法,将该结点的状态设置为失效。
1.2.2 Gossiper
Cassandra集群没有中心节点,各个节点的地位完全相同,节点之间通过一种叫做 Gossip 的协议进行通信,用于维护集群的状态。通过 Gossip,每个节点都能知道集群中包含哪些节点,以及每一个节点的状态。这使得 Cassandra 集群中的任何一个节点都可以完成任意读取和写入操作,若任意一个节点失效,整个集群依旧正常工作。
在Gossip初始化的时候,将构造4个集合,分别保存集群中存活的节点 (liveEndpoints_)、失效的节点 (unreachableEndpoints_)、种子节点 (seeds_)和各个节点信息 (endpointStateMap_)。
Cassandra启动时会从配置文件中加载seeds的信息(这个配置项中指定了集群中的原始节点地址)到seeds_中,然后启动一个 GossipTask 定时任务,每隔1s执行一次。
首先更新本节点的心跳版本号,然后构造需要发送给其他节点的GossipDigestSynMes-sage消息,再将GossipDigestSynMessage消息发送给合适的节点,最后通过调用FailureDetec-tor的interpret方法检查集群中是否有失效的节点。
GossipTask中选择发送GossipDigestSynMessage消息给集群中那些节点的逻辑如下:
从集群存活的节点(liveEndpoints) 中随机选择一一个 节点发送GossipDigestSynMessage;然后根据- -定的概率,从失效的节点( unreachableEndpoints _)中随机选取一个节点发送GossipDigestSynMessage;最后,如果之前发送GossipDigestSynMessage 消息的节点中不包含seed节点,或者当前活着的节点数少于seed节点数,则随机向一个seed发送GossipDi-gestSynMessage消息。
1. GossipDigestSynMessage
GossipDigestSynMessage消息作为Gossip通信中的第一步,包含所有节点的地址、心跳版本号与节点状态版本号。接收到GossipDigestSynMessage消息的节点将执行以下操作:
-
根据接收到的GossipDigest集合,调用FailureDetector 的 report 方法更新集群中节点的状态。
-
对接收到的GossipDigestSynMessage 消息中的GossipDigest 集合进行排序。
-
对比接收到的GossipDigest信息与本节点的GossipDigest差异,本节点需要进一步获取的节点信息由 deltaGossipDigestList 保存,本节点需要告诉发送 GossipDigest 信息节点的信息由 deltaEpStateMap 保存。
-
利用 deltaGossipDigestList 和 deltaEpStateMap 构建 GossipDigestAckMessage 消息,并将其发送给发送 GossipDigestSynMessage 消息的节点。
2. GossipDigestAckMessage
GossipDiestAckMessage 消息是 Gossip 通信中的第二步,接收到 GossipDigestAckMessage 消息的节点将执行以下操作:
-
在本地更新 GossipDigestAckMessage 消息中包含需要本节点更新的节点信息并调用 FailureDetector 的 report 方法更新集群中节点的状态。
-
将发送 GossipDigestAckMessage 消息的节点需要的其他节点的信息构造成 GossipDigestAck2Message 消息。
-
将 GossipDigestAck2Message 消息发送 GossipDigestAckMessahe 消息的节点。
3.GossipDigestAck2Message
GossipDigestAck2Message 消息是 Gossip 通信中的第三步也是最后一步,接收到 GossipDigestAck2Message 消息的节点将执行以下操作:在本地更新 GossipDigestAck2Message 消息中包含需要本节点更新的节点信息并调用 FailureDetector 的 report方法更新集群中节点的状态。
通过上面的流程,Gossip 就完成了节点与节点之前信息的交换。简单来说,就是比较集群中不同节点之间的数据差异,然后将不一致的数据统一更新到最新的状态。这样就保证集群中的每一个节点都了解集群中其他各个节点的状态。
1.3 集群的数据备份机制
Cassandra 是一个支持容灾的系统,即数据会在集群中保留多份,这样当某一个机器失效的时候,其他机器仍然有数据备份,从而保证整个服务正常。由于 Cassandra 为每一台机器上面的数据都提供备份,当集群机器的数量比较大的时候,选择哪些机器作为数据备份就尤为重要,特别是当需要跨数据中心的时候,就需要提供机架感应的相关功能。
所有和数据备份相关代码都在 org.apache.cassandra.locator
中,主要包括两个部分的实现: EndpointSnitch(机架感应)与 ReplicationStrategy(数据的备份策略)。
1.3.1 EndpointSnitch
通过机架感应,Cassandra集群中的每一个节点都可以知道哪几台节点和自己属于一个机架,哪几台节点和自己属
于一个数据中心。
所有的机架感应策略都实现了org. apache. cassandra. locator. IEndpointSnitch
接口。Iend-pointSnitch接口包含以下几个重要的方法:
- public String getRack( InetAddress endpoint):判断某一个节点所属的机架名称。
- public String getDatacenter( InetAddress endpoint): 判断某-个节点所属的数据中心名称。
- public List < InetAddress > sortByProximity ( InetAddress address , List < InetAddress> addresses):根据需要比较排序的地址,按照由近到远的规则对地址列表排序。
Cassandra提供了4种实现,可以直接在配置文件中指定选择使用的实现类型。配置文件中默认的选项如下:
endpoint_ snitch: org. apache. cassandra. locator. SimpleSnitchdynamic_ snitch: true(1) SimpleSnitch
(1) SimpleSnitch
是最简单的-种实现,它不提供机架和数据中心的功能,对节点距离排序就是直接返回,实现如下:
public List < InetAddress > sortByProximity (final InetAddress address, List < Ine-tAddress > addresses ) {
return addresses;
}
(2) PropertyFileSnitch
如果使用PropertyFileSnitch,需要在ClassPath 下添加一个名为“ cassandra-rack. properties"的配置文件,里面为每一个节点的地址指定对应的数据中心和机架的名称。格式为: IP = Data Center:Rack,如:
10.21.119.13 = DC3 :RAC1
10.21.119.10 = DC3 :RAC1
10.0.0.13 = DC1 :RAC2
10.21.119.14 = DC3 :RAC2
10.20.114.15 = DC2 :RAC2
# default for unknown nodes
default = DC1 :r1
如果Cassandra启动后,这个文件被修改,修改后的内容也会在1分钟内生效。
PropertyFileSnitch会根据cassandra-rack. properties文件中指定的内容返回某一个地址所属的机架与数据中心,对节点距离排序也会考虑机架和数据中心的因素。
(3) RackInferringSnitch
RackInferringSnitch 的实现与 PropertyFileSnith 类似,有同样的节点距离排序规则,不同的地方在于判断节点所属机架和数据中心的逻辑,实现如下:
public String getBack(InetAddress endpoint) {
return Byte.toString(endpoint.getAddress()[2]);
}
public String getDatacenter(InetAddress endpoint){
return Byte.toString(endpoint.getAddress()[1]);
}
(4) DynamicEndpointSnitch
DynamicEndpointSnitch是一个特殊的实现,它并不能被单独使用,而必须和之前介绍的3种 EndpointSnitch 搭配使用。 DynamicEndpointSnitch 能在原有 EndpointSnitch 的基础上,记录节点与节点之间通信的时间间隔,判断节点之间通信的快慢,从而达到根据实际的通信速度动态选择合适节点的目的。
1.3.2 ReplicationStrategy
通过 ReplicationStrategy,Cassandra 集群可以知道任意一份数据备份的节点信息,同时在节点失效的时候,还能够计算出应该接收 HINT 消息的节点。
org.apache.cassandra.locator.AbstractReplicationStrategy 是所有ReplicationStrategy的基类,它包含以下几个重要的方法:
- public ArrayList < InetAddress > getNaturalEndpoints ( Token searchToken): 从集群中找 出负责所有Token ( searchToken)对应数据的节点集合。
- public abstract List < InetAddress > calculateNaturalEndpoints ( Token searchToken ,TokenMetadata tokenMetadata) :计算在指定的一. 致性哈希圆环( TokenMetadata)中,负责所有Token ( searchToken)对应数据的节点集合
- public Multimap < InetAddress , InetAddress > getHintedEndpoints( Collection < InetAddress >
targets):如果目标节点中(targets)存在失效的节点,根据endpointSnitch从目标节点中计算出最合适的HINT节点。这个方法在Cassandra更新数据的时候使用,如果Cassandra在更新数据时发现某个节点不可用,将会把数据发送给另外一台HINT节点,HINT节点将缓存这部分数据到SystemTable中,等这个不可用的节点恢复后,再将缓存的数据发送给对应的节点,并将HINT节点存储在SystemTable中的数据删除。
Cassandra提供了3种ReplicationStrategy 实现:
(1) SimpleStrategy
这是最简单的ReplicationStrategy实现,它会根据指定的Token在指定的一致性哈 希圆环中按照顺时针方向找出下N个需要备份的节点。
(2) OldNetworkTopologyStrategy
OldNetworkTopologyStrategy 的备份策略与 SimpleStrategy 的备份策略类似:根据指定的Token在指定的一致性哈希圆环中按照顺时针方向找出下N个需要备份的节点。不同的是: OldNetworkTopologyStrategy 在寻找第二个备份节点的时候,会找个与第一个备份节点不在同一个数据中心的节点进行备份;寻找第三个备份节点的时候,会找个与第二个备份节点同数据中心,但是不同机架的节点进行备份;接下来所有的备份节点寻找策略就按照 SimpleStrategy 的备份策略继续寻找。
(3) NetworkTopologyStrategy
NetworkTopologyStrategy在OldNetworkTopologyStrategy的基础上,可以更加详细地指定每-个数据中心需要备份的数据份数。比如我们需要在DC1中备份3份,DC2中备份2份,那么在配置文件中的配置信息如下:
replica_ placement strategy :
org. apache. cassandra. locator . NetworkTopologystrategy
strategy options:
DC1:3
DC2:2
配置文件中就不需要再指定 replication factor 的参数了,因为上面的设置中就已经决定了 replication factor 的份数为5。
NetworkTopologyStrategy会先尝试在同一个数据中心中选择不同的机架作为该数据中心的备份节点,如果该数据中心找不到更多的机架,就会在同一个机架中寻找多个节点进行备份。最终保证每一个数据中心都有相应的备份数,并且每一个数据中心备份的节点尽可能在不同的机架中。
1.4 集群状态变化的处理机制
在Cassandra中,如果节点与节点之间通过Gossiper协议发现集群中的状态发生了变化(机器失效、新的机器加入等),将以事件的形式通知这些事件的订阅者。所有对这些事件感兴趣的订阅者需要实现org apache. cassandra. gms. IendpointStateChangeSubscriber
接口,它包含的重要方法如下:
- public void onJoin( InetAddress endpoint,EndpointState epState):当有新节点加人集群的时候,触发该事件。
- public void onChange( InetAddress endpoint , ApplicationState state, VersionedValue value) :当有新节点加人集群的时候,触发该事件。
- public void onAlive( InetAddress endpoint , EndpointState state):当有失效节点恢复服务的时候,触发该事件。
- public void onDead ( InetAddress endpoint , EndpointState state):当有节点失效的时候,触发该事件。
- public void onRemove ( InetAddress endpoint):当有节点被移出集群的时候,触发该事件。
org.apache.cassandra. gms.IendpointStateChangeSubscriber 接口有3个实现类: StorageLoadBalancer、StorageService 和 MigrationManager。
1.4.1 StorageLoadBalancer
StorageLoadBalancer 能够计算集群中每一个节点的负载(磁盘中数据量的大小),并提供 Cassandra 集群的负载均衡。
每当集群状态发生变化的时候, StorageLoadBalancer 所做的事情就是将节点负载变化散播出去。
当集群中某个节点的状态发生改变的时候,处理逻辑如下:
public void onChange(InetAddress endpoint,ApplicationState state,VersionedValue value){
if(state != ApplicationState.LOAD) return;
loadInfo_.put(endpoint,Double.paraseDouble(value.value));
}
当集群中有新节点加入的时候,处理逻辑如下:
public void onJoin(InetAddress endpoint,EndpointState epState){
VersionedValue localValue = epState.getApplicationState(ApplicationState.LOAD);
if(localValue != null){
onChange(endpoint,ApplicationState.LOAD,localValue);
}
}
1.4.2 StorageService
StorageService 抽象了整个 Cassandra集群的关系。通过它可以获取到整个集群的信息,并且可以管理整个集群,如删除某一个节点、移动某一个节点、为一个新加入的节点初始化数据等。
当集群中某个节点的状态发生改变的时候,处理逻辑如下:
public void onChange(InetAddress endpoint,ApplicationState state,VersionedValue value){
if(state != ApplicationState.LOAD) return;
String apStateValue = value.value;
String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR,-1);
assert(pieces.length > 0);
String moveName = pieces[0];
if(moveName.equals(VersionedValue.STATUS_BOOTSTARAPPING))
hanleStateBootstrap(endpoint,pieces);
else if(moveName.equals(VersionedValue.STATUS_NOMAL))
handleStateNormal(endpoint,pieces);
else if(moveName.equals(VersionedValue.STATUS_LEAVING))
handleStateLeaving(endpoint,pieces);
else if(moveName.equals(VersionedValue.STATUS_LEFT))
handleStateLeft(endpoint,pieces);
}
当集群中有新节点加入的时候,处理逻辑如下:
public void onJoin(InetAddress endpoint,EndpointState epState){
for(Map.Entry<ApplicationState,VersionedValue> entry : epState.getApplicationStateMap().entrySet()){
onChange(endpoint,entry.getKey(),entry.getValue());
}
}
当有失效节点恢复服务的时候,处理逻辑如下:
public void onAlive(InetAddress endpoint,EndpointState state){
if(!isClientMode)
deliverHints(endpoint);
}
当有节点被移出集群的时候,处理逻辑如下:
public void onRemove(InetAddress endpoint){
tokenMetadata_.removeEndpoint(endpoint);
calculatePendingRanges();
}
当有节点失效的时候,处理逻辑如下:
public void onDead(InetAddress endpoint,EndpointState state){
MessagingService.instance.covict(endpoint);
}
1.4.3 MigrationManager
0.7.x之前的 Cassandra 版本无法在集群运行的时候动态修改 Schema 信息,如修改 ColumnFamily 的属性、新增 Keyspace 等。MigrationManager 的出现解决了这个问题。
如果集群中有节点的 Schema 信息发生了变更,将触发相应的事件,将变更信息应用到整个集群中。
当集群中某个节点的状态发生改变的时候,处理逻辑如下:
public void onChange(InetAddress endpoint,ApplicationState state,VersionedValue value){
if(state != ApplicationState.SCHEMA) return;
UUID theirVersion = UUID.fromString(value.value);
rectify(theirVersion,endpoint);
}
当有失效节点恢复服务的时候,处理逻辑如下:
public void onChange(InetAddress endpoint,ApplicationState state,VersionedValue value){
if(state != ApplicationState.SCHEMA) return;
UUID theirVersion = UUID.fromString(value.value);
rectify(theirVersion,endpoint);
}
1.5 本章小结
本章从原理和源代码实现上讲解了Cassandra 集群的机制,包括一致性哈希、Gossiper通信协议、集群的备份机制和集群状态变化的处理机制。这些机制是理解Cassandra内部如何工作的基础。基于这点,后面的章节将详细讲解Cassandra如何实现数据的写人与读取以及相应的操作。
只有理解了Cassandra 内部的工作机制,才能更好地使用和优化Cassandra。