StateStore源码解析
一.Trait StateStore
版本化key-value store的基本trait。“StateStore”的每个实例都代表一个状态数据的特定版本。实例由StateStoreProvider提供。
![](https://img.haomeiwen.com/i2818100/18761a7bceddabce.png)
有以下两个实现:
![](https://img.haomeiwen.com/i2818100/9becf7fa08e6fa21.png)
变量:
1.独一无二的标识
![](https://img.haomeiwen.com/i2818100/b31634565c10272d.png)
2.提交更新之前,此store中存放数据的版本
![](https://img.haomeiwen.com/i2818100/19e2917d14ff1a73.png)
方法:
1.获得一个非空key的当前值
![](https://img.haomeiwen.com/i2818100/70673b3bda5227b0.png)
2.对一个非空的key赋新值。具体实现必须知道参数中的UnsafeRow可以被重用,且必须拷贝数据以进行持久化。
![](https://img.haomeiwen.com/i2818100/b35d9f180a2db4a7.png)
3.移除一个非空的key
![](https://img.haomeiwen.com/i2818100/f2a45f78f527aabd.png)
4.通过可选的start和end获取key-value对
![](https://img.haomeiwen.com/i2818100/72e414639cf737a7.png)
5.提交store中所有的更新,并且返回一个新版本。具体实现应该确保在提交之后不会有更多的更新(put、remove)以免应用不当。
![](https://img.haomeiwen.com/i2818100/053601e6f09d030a.png)
6.中止store中所有的更新。
![](https://img.haomeiwen.com/i2818100/1d42d9f74840a39e.png)
7.返回一个包含StateStore中所有key-value对的迭代器。
![](https://img.haomeiwen.com/i2818100/13e15cacc1ca9d65.png)
8.StateStore的当前指标
![](https://img.haomeiwen.com/i2818100/05fb9c6abd103cfd.png)
9.是否所有的更新都已提交
![](https://img.haomeiwen.com/i2818100/594a316bef5b8cec.png)
二.Trait StateStoreProvider
提供一个表示状态数据版本的StateStore实例
提供者及其提供存储的生命周期如下:
(1)第一批流查询的数据在executor上被执行时,一个StateStoreProvider在一个executor中被创建,每个独一无二的StateStoreId都会创建一个。所有后续的批次重用这个实例直到查询结束。
(2)每批流数据通过调用getStore(version)请求状态数据的特定版本,该版本返回StateStore的实例,通过该实例可以访问所需版本的数据。provider负责用上下文信息(如key-value模式)填充该存储。
(3)在流查询停止后,已经创建的provider实例被延迟处理掉。
![](https://img.haomeiwen.com/i2818100/1ddf940cdf24776e.png)
有以下两个实现:
![](https://img.haomeiwen.com/i2818100/19d4edd0b7d92763.png)
方法:
1.返回此provider生成的StateStore的ID。应与init()中传递的相同。
![](https://img.haomeiwen.com/i2818100/8a0c9569d25b016e.png)
2.通过SQL操作符初始化provider的上下文信息。通过反射创建StateStoreProvider的实例后,将实现调用此方法。
![](https://img.haomeiwen.com/i2818100/aff309ed0fc18298.png)
3.当从executor卸载provider实例时调用
![](https://img.haomeiwen.com/i2818100/c35aa489924b5b30.png)
4.返回指定版本状态数据的StateStore的实例
![](https://img.haomeiwen.com/i2818100/58a22cdba088b3af.png)
三.具体实现:HDFSBackedStateStoreProvider
![](https://img.haomeiwen.com/i2818100/1ab0633f31522a41.png)
StateStoreProvider和StateStore的具体实现,其中所有数据都由hdfs兼容的文件系统中的文件支持。对store所有的更新都必须以事务的方式完成,每一组更新都会对store的版本进行递增。这些版本可用于在正确版本的state上重新执行更新(通过rdd操作中的重试),并重新生成存储版本。
如何使用:
要更新stateStore中的数据,需要执行以下操作步骤。
1.获取正确的store
![](https://img.haomeiwen.com/i2818100/2cfe67c3703c739b.png)
![](https://img.haomeiwen.com/i2818100/ac183d5a810d3f71.png)
容错模型:
1.提交之前,每一组更新都被写入一个delta文件
2.state store负责管理、折叠和清理这些delta文件
3.多次提交相同版本的更新可能会互相覆盖,一次性保证取决于多次尝试是否具有相同的更新和底层文件系统的覆盖语义。
4.后台文件维护确保存储的最后版本总是可恢复的,以确保重新执行RDD操作重新应用正确的过去版本。
接下来看看这个类:
![](https://img.haomeiwen.com/i2818100/93d623fea47c7223.png)
用一个concurrentHashMap来存key-value对。
其中实现了一些方法:
![](https://img.haomeiwen.com/i2818100/6abca5882555794e.png)
比较重要的commit方法:
![](https://img.haomeiwen.com/i2818100/4fd44341f39ca644.png)
点进去commitUpdates方法:
![](https://img.haomeiwen.com/i2818100/8061b84eb0ede3f0.png)
进入到putStateIntoStateCacheMap方法,发现一句:
![](https://img.haomeiwen.com/i2818100/0f102c7066352211.png)
查看loadedMaps:
![](https://img.haomeiwen.com/i2818100/489518ceaacc308c.png)
原来是创建了个TreeMap来存放每个版本的数据,key为版本号,value就是每个版本自己的map。
四.IncrementalExecution
![](https://img.haomeiwen.com/i2818100/8e6459580a56ca3b.png)
QueryExecution的继承,将给定的LogicalPlan增量化执行。在每次执行之间保持状态。
![](https://img.haomeiwen.com/i2818100/6a970a425e189fd8.png)
获取StateStore的数量
进入SHUFFLE_PARITIONS
![](https://img.haomeiwen.com/i2818100/eb6d54d14071941c.png)
可以看到,partition的数量由参数spark.sql.shuffle.partitions控制,如果不设置,默认为200.
接着往下看
![](https://img.haomeiwen.com/i2818100/6f3f45422d42e4f4.png)
获得下一个有状态操作的state信息,进入StatefulOperatorStateInfo看看
![](https://img.haomeiwen.com/i2818100/4bd59c93159be6ed.png)
这个方法只返回了一些字符串信息,但在下文通过这个信息调用了StateStoreSaveExec方法
![](https://img.haomeiwen.com/i2818100/adc6818abcd94117.png)
这个方法创建了stateManager来管理stateStore,对stateStore进行具体的get,put等操作
![](https://img.haomeiwen.com/i2818100/17052c19a8feb63f.png)
接着调用了doExecute方法
![](https://img.haomeiwen.com/i2818100/002d9b72c286aa46.png)
在这个方法里有这样一句
![](https://img.haomeiwen.com/i2818100/7af00ba97a0e82d3.png)
进入mapPartitionsWithStateStore中
![](https://img.haomeiwen.com/i2818100/457cbf1fc4f8163b.png)
将RDD中的每个分区与StateStore中的数据进行映射
这里的参数中有一个storeCoordinator
![](https://img.haomeiwen.com/i2818100/bdc6c6898b817fd9.png)
进入StateStoreCoordinatorRef
![](https://img.haomeiwen.com/i2818100/4655b1f688bd912c.png)
引用StateStoreCoordinator,可用于在所有executor之间协调StateStore的实例,并获取它们的位置进行作业调度。
以下是StateStoreCoordinator类
![](https://img.haomeiwen.com/i2818100/3d7c828916c6ded1.png)
这个创建了一个Map来存储StateStoreProviderId和host,executorId的存储关系。
在上文调用StateStoreSaveExec方法时,调用了StateStoreRestoreExec方法
![](https://img.haomeiwen.com/i2818100/a0c1432165144922.png)
对于每个输入元祖,将计算key并将StateStore中的值添加到流。
![](https://img.haomeiwen.com/i2818100/71224586ca8fac99.png)
由此可知,State Store是分布式的存储在所有Executor上的。