Bigflow Logical Plan
Logical Plan的概念
Logical Plan是分布式计算过程抽象为数据流在算子上变换的表示图。
核心概念
算子
- 算子是Flume中的基本计算单元,
- 它体现了分布式计算中的’计算’
- Processor是主要的算子
Node
- Node上绑定着相应的自定义算子
Group/Scope
- Scope表示数据的分组方式
-
Scope中的数据按其分组方式分组,每组为一个Group
分别以week和(week,weekday)作为分组应用进行求和
联系
每一个算子, 都必须置于某一个Scope下, 数据以Group为基本处理单位, 算子每次处理一组数据。
Logical Plan的组成
Node
- 组成逻辑计划的基本单位
- Node上绑定着相应的自定义算子
- 每个Node上只有一份数据
- 逻辑执行计划上的所有Node组成一个DAG
- Node 种类:process node\load node \sink node\shuffle node\ union node
Scope/Group
- Scope表示数据的分组方式
- Scope中的数据按其分组方式分组,每组为一个Group
- 逻辑计划上的每个Node都属于某个Scope
- 逻辑计划中的所有Scope组成一棵树
- Scope 的种类:global scope\ shuffle scope \ load scope
算子/Entity
- 算子是可以被反射的C++类, 用以承载用户逻辑. 具体参见 flume/core/entity.h
- 在所有算子中, 数据流被抽象为无类型的, 每条数据的类型都是void*
- 每个带有输出的Node上必须指定Objector算子, 该算子负责完成数据的序列化和-
- 反序列化工作,只有有外部输出和shuffle时,才会进行序列化和反序列化。
注:在 flume/proto/logical_plan.proto 中定义了以上三个概念的proto
message PbLogicalPlan {
repeated PbLogicalPlanNode node = 1;
repeated PbScope scope = 2;
optional PbEntity environment = 3;
}
message PbLogicalPlanNode {
enum Type {
UNION_NODE = 0;
LOAD_NODE = 1;
SINK_NODE = 2;
PROCESS_NODE = 3;
SHUFFLE_NODE = 4;
}
required string id = 1; // uuid
required Type type = 2;
optional bytes debug_info = 3;
optional PbEntity objector = 4;
required string scope = 5; // scope uuid
// according to type, only corresponding member makes sense
optional PbUnionNode union_node = 100;
optional PbLoadNode load_node = 101;
optional PbSinkNode sink_node = 102;
optional PbProcessNode process_node = 103;
optional PbShuffleNode shuffle_node = 104;
}
message PbScope {
required string id = 1; // uuid
optional string father = 2; // scope uuid
optional bool is_sorted = 3 [default = false];
optional uint32 concurrency = 4 [default = 1]; // default value may be ignored by planner
}
message PbEntity { // PbEntity 是可被反射的C++类,可通过类名和config参数来得到相应实例
required string name = 1;
required bytes config = 2;
}
PROCESS_NODE
PROCESS_NODE是基本的数据处理节点, 多入单出, 下面是它的proto定义:
message PbProcessNode {
message Input {
required string from = 1; // 上游来源节点
optional bool is_partial = 101 [default = false]; // 表示是否需要拥有全量数据才能计算(可用于map阶段的预聚合)
optional bool is_prepared = 102 [default = false]; // 表示输入是一个Stream还是一个Collection
};
repeated Input input = 1; // 输入的属性
required PbEntity processor = 2; // 实际的处理逻辑,可由用户自定义
optional int32 least_prepared_inputs = 101 [default = 0]; // 表示至少有几路输入是Collection才能开始计算
}
Processor的接口定义如下:
class Processor {
public:
virtual ~Processor() {}
virtual void Setup(const std::string& config) = 0;
// keys传入该record所在Group在Scope中的位置
// inputs用来传入可迭代的输入.
// emitter用来将结果传给下游节点.
//如果is_prepared为true 数据会在BeginGroup聚集
virtual void BeginGroup(const std::vector<toft::StringPiece>& keys,
const std::vector<Iterator*>& inputs,
Emitter* emitter) = 0;
// index表示传入的记录属于哪路输入.
// 对于第N路输入, 如果inputs[N] != NULL, 则index != N
//如果is_prepared 为false ,数据会直接到Process()
virtual void Process(uint32_t index, void* object) = 0;
// 当前分组处理结束
virtual void EndGroup() = 0;
};
LOAD_NODE
LOAD_NODE代表了框架的输入, 每个Load操作会在Global Scope下创建一个新Scope和该Scope下的一个LOAD_NODE. 之所以Load操作会创建Scope, 是因为数据是按组存储的。
LOAD_NODE的proto定义如下:
message PbLoadNode {
repeated string uri = 1; // 确定数据源所在位置,可以有多个。
required PbEntity loader = 2; // 读取数据采取的方式,类似Hadoop的InputFormat
}
Loader算子的定义比较接近与Hadoop中InputFormat的设定, 分为切分和读取两个部分. 不同的是, Loader算子并不默认输入都是KeyValue形式的. 另外, 在执行的时候, 每个split都是一个数据分组的key.
Loader算子的接口定义如下:
class Loader {
public:
virtual ~Loader() {}
virtual void Setup(const std::string& config) = 0;
// 参数uri指定了数据所在路径,splits是存放了数据切片后的结果,
// 如URI、数据起始位置和偏移量. splits中的每个元素, 都会作为Load的参数
virtual void Split(const std::string& uri, std::vector<std::string>* splits) = 0;
// split是对一个数据块的描述,是 Split() 方法存入的.
// 这个方法中是对这个数据块的具体处理逻辑,比如反序列化和简单过滤等。
virtual void Load(const std::string& split, Emitter* emitter) = 0;
};
SINK_NODE
SINK_NODE代表了框架的输出. 和PROCESS_NODE相同, SINK_NODE也属于某个Scope, 将该组Scope中的每组数据输出到外部系统.
SINK_NODE的proto定义如下:
message PbSinkNode {
required string from = 1; // 指数据所在位置,可以实现每个scope有不同的输出
required PbEntity sinker = 2; // 指输出数据是采取的方式,类似Hadoop的OutputFormat
}
Sinker的接口定义如下:
class Sinker {
public:
virtual ~Sinker() {}
virtual void Setup(const std::string& config) = 0;
// 打开要写入的文件, 相当于Processor中的BeginGroup.
virtual void Open(const std::vector<toft::StringPiece>& keys) = 0;
// 写入实际数据。每条记录都是void*类型,由用户自己转换.
virtual void Sink(void* object) = 0;
// 关闭写入.
virtual void Close() = 0;
};
SHUFFLE_NODE
SHUFFLE_NODE代表分组后的数据流, 由Shuffle操作产生.
其proto定义如下所述:
message PbShuffleNode {
// 数据源参与分组的三种方式.
enum Type {
BROADCAST = 0; // 不参与下面两种处理,所有的记录都会被分发到每一组中
KEY = 1; // 表示按key分组,不同的key属于不同的组
SEQUENCE = 2; // 表示将数据分桶,预先设定桶数,按照某种策略(如hash)将key分到这些桶中
};
required string from = 1; // 上游来源节点
required Type type = 2;
optional PbEntity key_reader = 3; // 表示用来提取key的方式,对应于KEY类型的shuffle
optional PbEntity partitioner = 4; // 表示分桶方式,对应于SEQUENCE类型的shuffle
}
Flume支持两种分组方式:
- 按Key聚集
- 按Key聚集是为参与分组的每条记录附加一个key, 把所有key的记录汇聚到同一组中.
- 分桶
- 分桶是指事先决定好分组数量, 再把每条记录分配到某个桶中的分组方式.
这个过程中涉及到KeyReader和Partitioner两种算子, 其接口定义如下:
class KeyReader {
public:
virtual ~KeyReader() {}
virtual void Setup(const std::string& config) = 0;
// 具体的提取key的逻辑实现,object是整条记录,由用户自己理解其类型.
// buffer是最终存放key的变量,要求必须将key转换为char* 存放到buffer中。
// wing/common 下的comparable.h中提供了专门方法,生成可用来排序的string类型的key。
// 同时提供了升序和降序两种方法。
// 返回值是key的实际长度。
virtual uint32_t ReadKey(void* object, char* buffer, uint32_t buffer_size) = 0;
};
class Partitioner {
public:
virtual ~Partitioner() {}
virtual void Setup(const std::string& config) = 0;
// 返回该条记录应该属于的分桶.
virtual uint32_t Partition(void* object, uint32_t partition_number) = 0;
};
UNION_NODE
UNION_NODE用来将多个数据源和合并为一个数据源统一处理。
其proto定义如下:
message PbUnionNode {
repeated string from = 1; // 用于合并数据流,repeated字段中存放的是多个上游节点。
}
编程实例 - WordCount
为了方便使用。我们也开发了python的相应接口。
下面我就以python为例,让你更好的理解上述的概念。
import sys
from bigflow.core.serde import record_objector
from bigflow import input, base, output
from bigflow.core import entity
from bigflow import serde
class PythonToRecordProcessor(entity.SelfNamedEntityBase):
pass
class PythonFromRecordProcessor(entity.SelfNamedEntityBase):
pass
class WordSpliter:
def __init__(self):
pass
def begin(self, keys, inputs, emitter):
self._emitter = emitter
def process(self, index, record):
words = record.split()
for word in words:
r = (word, 1)
self._emitter.emit(r)
def end(self):
pass
class WordIdentity:
def __init__(self, key_extractor, key_serde):
self.objector = entity.Entity.of(entity.Entity.objector, key_serde) \
.to_proto_message().SerializeToString()
self.read_key = key_extractor
class WordCount:
def __init__(self):
pass
def begin(self, keys, inputs, emitter):
self._emitter = emitter
self._sum = 0
self._word = ""
def process(self, index, record):
if self._word == "" :
self._word = record[0]
self._sum = record[1]
else:
self._sum += record[1]
def end(self):
record = (self._word, self._sum)
self._emitter.emit(record)
pipeline = base.Pipeline.create('local')
plan = pipeline.plan(1)
plan.set_environment(entity.PythonEnvironment())
input_path = sys.path[0] + "/" + "input"
input_urls = [input_path];
output_path = sys.path[0] + "/" + "output"
single_word = plan.load(input_urls)\
.by(input.TextFile(input_urls[0]).input_format).as_type(record_objector.RecordObjector())\
.process_by(PythonFromRecordProcessor()).as_type(serde.any())\
.process_by(WordSpliter()).as_type(serde.any()).leave_scope()
result = plan.shuffle(single_word.scope(), [single_word])\
.with_concurrency(10)\
.node(0).match_by(WordIdentity(lambda x: x[0], serde.any()))\
.process_by(WordCount()).as_type(serde.any())\
.input(0).allow_partial_processing().done()\
.process_by(WordCount()).as_type(serde.any())
plan.shuffle(plan.global_scope(), [result]).node(0).distribute_by_default()\
.process_by(PythonToRecordProcessor()).as_type(record_objector.RecordObjector())\
.sink_by(output.TextFile(output_path).output_format)
pipeline.run()
word count logical plan