ScyllaDB源码分析-01: 轻松入门分布式架构源码
0. 大纲
- 引言
- ScyllaDB架构思想篇
- ScyllaDB整体流程分析
a) 了解源码结构
b) 快速弄清流程
c) 确立组件依赖
一. 引言
上节介绍了ScyllaDB的背景及应用场景之后,让我们来单独更多的了解ScyllaDB。
首先ScyllaDB是个强p2p数据库,ElasticSearch以及Kafka是个简化版的p2p数据库,由于它们会首先选举出Controller进行代理操作。而ScyllaDB不一样,它采用了基于Ring的操作方式,更加去中心化,客户端可以连接任意节点作为集群Coordinator节点代理接口服务。当然有人会质疑kafka究竟是不是数据库,kafka有事务,并且官方的目标也是构造实时计算的数据库基础设施。
p2p数据库有什么优点与缺点呢?
由于p2p数据库是没有主节点的,所以扩容是非常方便的,使得运维成本大大降低。只要指定seed节点,分分钟扩容操作即可自动完成。seed节点相当于我们的稳定代理节点,相当于整个集群的地址薄管理处,仅作为整个集群的元数据代理入口。
但是另一方面,由于没有主节点,所有的操作必须由选举的Controller代理进行操作(去中心化程度低),或者通过共识协议大家商量一起处理(去中心化程度高)。ScyllaDB属于后一种情况。这样就会导致另一个问题,集体成员越多,进行协商的网络效应越强,使得网络产生一定量的延迟。所以这种共识机制往往节点数不能过多,只能维持在上千个节点左右。如果要上万,那么只能进行委托进行超级节点的选举。前一种去中心化程度低情况相当于超级节点为1的选举。
p2p特性使得一个数据库非常容易维护。而ScyllaDB不只做了这些。ScyllaDB强大的特性太多了。
ScyllaDB有着强大的服务接口,reset api, 兼容Cassandra类SQL的CQL查询,跨语言的thrift服务接口,以及兼容DynamoDB的服务接口。
ScyllaDB有着强大的存储模型。内存表Memtable,物理表SSTable,恢复日志CommitLog,删除墓碑Tombstones这些都是基本必备。分布式协调功能也不可少,它还有Snitch数据网络架构分布, VNode动态节点分布, Partitioner基础分片功能. 然后它还加入了Hinted Hanfdoff, Anti-Entrop, Read Repair, batchlog以及Throttle限流, 使得数据一致性及稳定性大为增强,运维自动化程度极其高效。甚至似,它还引入了lwt轻量级事务功能,甚至它还允许持久化cache进行预热,简直是母亲版的呵护,让人不禁有十一分的感动!
同时,在此之上ScyllaDB还有强大的数据模型,半结构化支持非常好。有set, list, map,甚至允许自定义数据类型,类似于struct(map的固态模式化版本)。物化视图功能的引入对于二级索引也是非常大的增强。并且对于数据的时效性,有ttl时间周期的支持,非常好用。
当然这都是细节,在此之上,还有更强大的分区功能,宽表功能(基于partition key跟cluster key)。真的是太强大了,自己慢慢体会。
对于集群的运行情况,必不可以的就是监控功能了!ScyllaDB也是非常键全的,提供了JMX与prometheus接口。JMX接口是异常强大,包罗万象,甚至可以回调,实时修改参数。而基于prometheus之上担任了scylla-monitoring,通过Grafana十分友好的展示着数据的运行情况。
二. ScyllaDB架构思想篇
ScyllDB的架构由很多服务组成。
- 启动时,加载配置信息,得到网络拓扑结构Snitch。
接着,加载memtable, sstable, commitlog物理存储后启动Storage Engine。 - 通过gossip p2p协议加入节点到集群中。
- 通过物理存储建立数据模型,生成各种系统keyspaces并进行commit log的数据恢复操作,在此之上提供query processor查询引擎。
- 启动分布式基础RPC服务, message service, storage proxy, storage service。
- 启动外部接口服务, rest api, thrift, cql, alternator。
我们这里仅讲解cql服务。
1) 当用户调用cql服务的时候,cql调用storage service进行处理。
storage service则根据操作类型,选择本地操作或者分布式操作。如果是分布式操作则通过storage proxy分发到其它机器。
- 这里的storage proxy是走的message service 协议,这里我们后面会讲到, storage proxy是message service协议里面的一个handler,还有其它服务handler。
- 当其它机器接受到message service之后,调用storage proxy handler进行处理,再进一步将控制权交回了其它机器的storage service。这里就相当了一个完美的闭环了。
- 在整个分布式协调过程中,所有的操作处理最终由底层的Storage Engine完成。
所以基本上整个流程是:
cql -> storage service -> storage proxy -> message service(rpc) -> storage proxy -> storage service -> storage engine
现在分布式基础架构这里就介绍完了,然后我们来大致过一下源码。我们这一节主要集中在分布式服务这一块,后面的Storage Engine需要花更多的篇章完成,敬请期待。
三. ScyllaDB整体流程分析
如何去快速了解一下源码呢?这里我们开始进行实践。
首先我们要了解架构思想,前面一个章节已经讲过了,也许你了解得没有我多,这个不是主要问题,后面可以慢慢补充。一般这个周期主要是系统性看书看文档,在两个星期至一个月左右。当然如果有老司机带队,就可以完全加速了。对于新人的话,时间久点也没关系,这里只是给出基本目标作为参考。
那我们从哪里开始呢?从功能交互开始是最为方便的。
1. 了解源码
我们知道ScyllaDB的CQL服务端口是9042, 在配置文件里面进行配置:
- conf/scylla.yaml
# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
native_transport_port: 9042
那么我们对源码进行搜索
larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -v '/tests/' | xargs -I {} grep -nH native_transport_port {}
./db/config.cc:500: "Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
./db/config.cc:501: , native_transport_port(this, "native_transport_port", value_status::Used, 9042,
./db/config.cc:503: , native_transport_port_ssl(this, "native_transport_port_ssl", value_status::Used, 9142,
./db/config.cc:505: "Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption"
./db/config.cc:506: "for native_transport_port. Setting native_transport_port_ssl to a different value"
./db/config.cc:507: "from native_transport_port will use encryption for native_transport_port_ssl while"
./db/config.cc:508: "keeping native_transport_port unencrypted")
./db/config.cc:753: native_transport_port.add_command_line_option(init, "cql-port", "alias for 'native-transport-port'");
./service/storage_service.cc:2294: std::vector<listen_cfg> configs({ { socket_address{ip, cfg.native_transport_port()} }});
./service/storage_service.cc:2318: if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
./service/storage_service.cc:2319: configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, std::move(cred)});
由此可见这个服务是在 service/storage_service.cc 里面启动.
进一看法进行追查可以看到来自于storage_service::start_native_transport:
future<> storage_service::start_native_transport() {
...
...
return f.then([cserver, configs = std::move(configs), keepalive] {
return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
slogger.info("Starting listening for CQL clients on {} ({})"
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
);
});
});
}
...
...
最终可以发现,storage service启动了cql_transport::cql_server::listen进行服务监听。
现在我们有了目标,就是storage service,并且我们从架构上也知道了它的功能。那么我们接着就要看它是如何启动的。
因为启动过程细节比较多,所以我们主要依据两条线索,查看前30名的文件名确立重要模块专攻点,以及日志追踪。
larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -vE '/tests/|/api/|/alternator/|/thrift/' | xargs -I {} wc -l {} | sort -nr | head -30
3820 ./types.cc
3789 ./service/storage_proxy.cc
3558 ./service/storage_service.cc
3433 ./sstables/sstables.cc
2905 ./db/schema_tables.cc
2592 ./mutation_partition.cc
2568 ./table.cc
2562 ./repair/row_level.cc
2331 ./utils/logalloc.cc
2319 ./gms/gossiper.cc
2181 ./db/commitlog/commitlog.cc
2109 ./db/system_keyspace.cc
1993 ./database.cc
1818 ./db/view/view.cc
1755 ./mutation_reader.cc
1641 ./transport/server.cc
1621 ./cql3/statements/select_statement.cc
1470 ./repair/repair.cc
1442 ./sstables/mc/writer.cc
1332 ./row_cache.cc
1319 ./schema.cc
1223 ./message/messaging_service.cc
1167 ./main.cc
1079 ./service/migration_manager.cc
998 ./db/hints/manager.cc
981 ./sstables/compaction.cc
979 ./flat_mutation_reader.cc
962 ./cql3/restrictions/statement_restrictions.cc
905 ./sstables/compaction_strategy.cc
867 ./sstables/compaction_manager.cc
这里我们已经排除掉了api, alternator(DynamoDB), thrift服务接口。
主要可以归类为四部分:
第一部分是基础模块
第一部分是网络模块
第二部分是服务模块
第三部分是存储模块
基础模块有:
3820 ./types.cc
2331 ./utils/logalloc.cc
网络模块有:
2319 ./gms/gossiper.cc
服务模块有:
3789 ./service/storage_proxy.cc
3558 ./service/storage_service.cc
1641 ./transport/server.cc
1621 ./cql3/statements/select_statement.cc
1223 ./message/messaging_service.cc
1167 ./main.cc
1079 ./service/migration_manager.cc
962 ./cql3/restrictions/statement_restrictions.cc
存储模块有:
3433 ./sstables/sstables.cc
2905 ./db/schema_tables.cc
2592 ./mutation_partition.cc
2568 ./table.cc
2562 ./repair/row_level.cc
2181 ./db/commitlog/commitlog.cc
2109 ./db/system_keyspace.cc
1993 ./database.cc
1818 ./db/view/view.cc
1755 ./mutation_reader.cc
1470 ./repair/repair.cc
1442 ./sstables/mc/writer.cc
1332 ./row_cache.cc
1319 ./schema.cc
998 ./db/hints/manager.cc
981 ./sstables/compaction.cc
979 ./flat_mutation_reader.cc
905 ./sstables/compaction_strategy.cc
867 ./sstables/compaction_manager.cc
基础模块属于源码细节,我们暂且不过多关注,需要时再去了解。
- 网络模块比较底层,我们后面再来分析,这里代码量不多是因为主要功能都是在基础库中实现,复杂度不一定低。
- 服务模块跟分布式架构比较密切。也是我们这章的关注点。
./main.cc为程序启动入口
./service/storage_proxy.cc, ./service/storage_service.cc这两个已经提到比较多了,属于核心模块.
./message/messaging_service.cc 前面提到过,属于不同节点的通讯接口,被storage_proxy使用。
./transport/server.cc 里面包含了我们的cql_server外部接口实现,用于处理用户交互协议
./cql3/statements/select_statement.cc 里面包含cql解析引擎,也是数据库查询引擎的核心技术。
./cql3/restrictions/statement_restrictions.cc 里面包含了cql解析引擎之后的存储查询转换,相当于将前面解析的cql转化为执行计划的过程。 - 存储模块是数据库的核心。所以占比非常大,内容非常多,我们第一步先不分析它,后面进行拆解分析。这里唯一需要了解的地方是./database.cc 代表了系统架构中的Storage Engine角色,是整个存储模块的入口点。
2. 快速弄清流程
接着进入日志追踪阶段,也就是流程梳理阶段。
有条件的可以运行集群并查看日志,或者从生产环境拿日志。
这里我为了加强功力,采用零依赖手撕源码。有条件的可以用日志来辅助分析。
那么现在就从./main.cc开始,首先查看打印消息。
代码打印消息是非常要用的,切记,也是运维排错的第一入手点。
int main(int ac, char** av) {
print_starting_message(ac, av, parsed_opts);
...
return app.run(ac, av, [&] () -> future<int> {
...
return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator,
&feature_service] {
...
startlog.info("Scylla version {} starting.", scylla_version());
...
supervisor::notify("starting prometheus API server");
...
supervisor::notify("creating tracing");
...
supervisor::notify("creating snitch");
...
supervisor::notify("determining DNS name");
...
supervisor::notify("starting API server");
...
supervisor::notify("initializing storage service");
...
supervisor::notify("starting per-shard database core");
...
supervisor::notify("creating data directories");
...
supervisor::notify("creating commitlog directory");
...
supervisor::notify("creating hints directories");
...
supervisor::notify("verifying directories");
...
supervisor::notify("starting gossip");
...
supervisor::notify("starting storage proxy");
...
supervisor::notify("starting migration manager");
...
supervisor::notify("starting query processor");
...
supervisor::notify("initializing batchlog manager");
...
supervisor::notify("loading system sstables");
...
supervisor::notify("loading non-system sstables");
...
supervisor::notify("starting view update generator");
...
supervisor::notify("discovering staging sstables");
...
supervisor::notify("setting up system keyspace");
...
supervisor::notify("starting commit log");
...
supervisor::notify("replaying commit log");
...
supervisor::notify("replaying commit log - flushing memtables");
...
supervisor::notify("replaying commit log - removing old commitlog segments");
...
supervisor::notify("initializing migration manager RPC verbs");
...
supervisor::notify("initializing storage proxy RPC verbs");
...
supervisor::notify("starting streaming service");
...
supervisor::notify("starting hinted handoff manager");
...
supervisor::notify("starting messaging service");
...
supervisor::notify("starting storage service", true);
...
supervisor::notify("starting batchlog manager");
...
supervisor::notify("starting load broadcaster");
...
supervisor::notify("starting cf cache hit rate calculator");
...
supervisor::notify("starting view update backlog broker");
...
supervisor::notify("allow replaying hints");
...
supervisor::notify("starting native transport");
...
supervisor::notify("serving");
...
startlog.info("Scylla version {} initialization completed.", scylla_version());
...
startlog.info("Scylla version {} shutdown complete.", scylla_version());
}
}
}
这里面内容不算多,我们快速过一遍,我们可以看到:
- 启动prometheus监控及tracing。
supervisor::notify("starting prometheus API server");
(void)prometheus::start(prometheus_server, pctx);
supervisor::notify("creating tracing");
tracing::tracing::create_tracing(tracing_backend_registry, "trace_keyspace_helper").get();
- 加载网络信息snitch
supervisor::notify("creating snitch");
i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
supervisor::notify("determining DNS name");
return gms::inet_address::lookup(api_address, family, preferred).get0();
- 启动api服务
API服务是先创建,最后整个流程完成后开启。
supervisor::notify("starting API server");
with_scheduling_group(maintenance_scheduling_group, [&] {
return ctx.http_server.listen(socket_address{ip, api_port});
}).get();
startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
...
api::set_server_done(ctx).get();
supervisor::notify("serving");
- 构建storage service
supervisor::notify("initializing storage service");
(void)init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
- 构建Storage Engine
supervisor::notify("starting per-shard database core");
db.start(std::ref(*cfg), dbcfg).get();
supervisor::notify("creating data directories");
dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
supervisor::notify("creating commitlog directory");
dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
supervisor::notify("creating hints directories");
dirs.touch_and_lock(db.local().get_config().hints_directory()).get();
supervisor::notify("verifying directories");
- 构建message service以及gossip组件
supervisor::notify("starting gossip");
init_ms_fd_gossiper(gossiper
, feature_service
, *cfg
, listen_address
, storage_port
, ssl_storage_port
, tcp_nodelay_inter_dc
, encrypt_what
, trust_store
, cert
, key
, prio
, clauth
, cfg->internode_compression()
, seed_provider
, memory::stats().total_memory()
, scfg
, cluster_name
, phi
, cfg->listen_on_broadcast_address());
- 构建storage proxy
supervisor::notify("starting storage proxy");
proxy.start(std::ref(db), spcfg, std::ref(node_backlog)).get();
- 构建migration manager
supervisor::notify("starting migration manager");
mm.start().get();
- 构建batchlog
supervisor::notify("initializing batchlog manager");
db::get_batchlog_manager().start(std::ref(qp), bm_cfg).get();
- 加载数据模型
supervisor::notify("loading system sstables");
distributed_loader::ensure_system_table_directories(db).get();
supervisor::notify("loading non-system sstables");
distributed_loader::init_non_system_keyspaces(db, proxy).get();
supervisor::notify("starting view update generator");
view_update_generator.start(std::ref(db), std::ref(proxy)).get();
supervisor::notify("discovering staging sstables");
supervisor::notify("setting up system keyspace");
db::system_keyspace::setup(db, qp, service::get_storage_service()).get();
supervisor::notify("starting commit log");
- 添加messaging_service rpc处理器migration manager
supervisor::notify("initializing migration manager RPC verbs");
service::get_migration_manager().invoke_on_all([] (auto& mm) {
mm.init_messaging_service();
}).get();
- 添加messaging_service rpc处理器storage proxy
supervisor::notify("starting batchlog manager");
proxy.invoke_on_all([] (service::storage_proxy& p) {
p.init_messaging_service();
}).get();
- 启动streaming service
supervisor::notify("starting streaming service");
streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get();
- 启动hinted handoff manager
(void)local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
- messaging service添加REPAIR_CHECKSUM_RANGE处理器
supervisor::notify("starting messaging service");
// Start handling REPAIR_CHECKSUM_RANGE messages
netw::get_messaging_service().invoke_on_all([&db] (auto& ms) {
ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version) {
auto hv = hash_version ? *hash_version : repair_checksum::legacy;
return do_with(std::move(keyspace), std::move(cf), std::move(range),
[&db, hv] (auto& keyspace, auto& cf, auto& range) {
return checksum_range(db, keyspace, cf, range, hv);
});
});
}).get();
- 启动storage service
supervisor::notify("starting storage service", true);
ss.init_messaging_service_part().get();
ss.init_server_without_the_messaging_service_part().get();
- 启动batchlog manager
supervisor::notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();
}).get();
- 启动load broadcaster
supervisor::notify("starting load broadcaster");
lb->start_broadcasting();
- 启动cf cache hit rate calculator
supervisor::notify("starting cf cache hit rate calculator");
cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
- 启动view update backlog broker
supervisor::notify("starting view update backlog broker");
view_backlog_broker.start(std::ref(proxy), std::ref(gms::get_gossiper())).get();
view_backlog_broker.invoke_on_all(&service::view_update_backlog_broker::start).get();
- 启动外部服务接口
supervisor::notify("starting native transport");
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_native_transport();
}).get();
if (start_thrift) {
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_rpc_server();
}).get();
}
if (cfg->alternator_port() || cfg->alternator_https_port()) {
alternator_server.init(addr, alternator_port, alternator_https_port, creds, cfg->alternator_enforce_authorization()).get();
}
从本章角度来说,主要关注以下几个组件
- storage proxy
首先构建storage proxy,然后挂钩message service
主要用于节点间分布式协调 - query processor
构建cql解析器 - messaging service
节点间rpc服务,用于服务组件添加handler处理器。 - storage service
分布式服务的核心模块,与storage engine分别掌管服务与存储功能。 - native transport
提供CQL外部服务接口
三. 确立组件依赖
- 用户首先发送请求给native_transport:
with_scheduling_group(dbcfg.statement_scheduling_group, [] {
return service::get_local_storage_service().start_native_transport();
}).get();
start_native_transport属于storage_service 模块.
- 接着start_native_transport[service/storage_service.cc] 启动cql_transport::cql_server
future<> storage_service::start_native_transport() {
return f.then([cserver, configs = std::move(configs), keepalive] {
return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
slogger.info("Starting listening for CQL clients on {} ({})"
, cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
);
});
});
});
}
- 接着cql_server[transport/server.cc]调用query_processor进行处理
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
{
...
return _server._query_processor.local().process(query, query_state, options).then([this, stream, &query_state, skip_metadata] (auto msg) {
tracing::trace(query_state.get_trace_state(), "Done processing - preparing a result");
return this->make_result(stream, msg, query_state.get_trace_state(), skip_metadata);
}
- 接着query_processor[cql3/query_processor.cc]解析cql后调用storage proxy进行处理
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(service::storage_proxy& proxy,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
service::query_state& state,
const query_options& options,
gc_clock::time_point now)
{
...
return proxy.query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
.then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
return this->process_results(std::move(qr.query_result), cmd, options, now);
});
}
- storage_proxy[service/storage_proxy.cc]挂钩message service,将消息发送给本机storage proxy节点以及其它节点message service。其它节点storage proxy注册message service进行本机storage proxy消息处理。
void storage_proxy::init_messaging_service() {
...
ms.register_counter_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info) {
...
}
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
auto opts = want_digest
? query::result_options{query::result_request::result_and_digest, digest_algorithm()}
: query::result_options{query::result_request::only_result, query::digest_algorithm::none};
if (fbu::is_me(ep)) {
tracing::trace(_trace_state, "read_data: querying locally");
return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
} else {
auto& ms = netw::get_local_messaging_service();
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>> result_hit_rate) {
auto&& [result, hit_rate] = result_hit_rate;
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid())));
});
}
}
- 所有节点的storage proxy进行storage engine本地数据读取操作
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, uint64_t max_size) {
return db.query(gs, *cmd, opts, prv, trace_state, max_size, timeout).then([trace_state](auto&& f, cache_temperature ht) {
tracing::trace(trace_state, "Querying is done");
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(std::move(f)), ht));
});
}
至此,整个分布式架构源码及组件依赖已经完成。
- storage_service#native transport 启动调用cql_server
- cql_server启动并调用query parser 进行解析
- cql_server解析后调用storage proxy
- storage proxy通过message service进行节点的消息分发
- 每个节点的storage proxy调用database engine存储引擎进行数据的处理操作
请关注下一篇
由于篇幅关系,下一篇将会更细致得讲解各个组件的内容,敬请期待。