ScyllaDB源码分析-01: 轻松入门分布式架构源码

2019-10-13  本文已影响0人  larluo_罗浩

0. 大纲

  1. 引言
  2. ScyllaDB架构思想篇
  3. ScyllaDB整体流程分析
    a) 了解源码结构
    b) 快速弄清流程
    c) 确立组件依赖

一. 引言

上节介绍了ScyllaDB的背景及应用场景之后,让我们来单独更多的了解ScyllaDB。

首先ScyllaDB是个强p2p数据库,ElasticSearch以及Kafka是个简化版的p2p数据库,由于它们会首先选举出Controller进行代理操作。而ScyllaDB不一样,它采用了基于Ring的操作方式,更加去中心化,客户端可以连接任意节点作为集群Coordinator节点代理接口服务。当然有人会质疑kafka究竟是不是数据库,kafka有事务,并且官方的目标也是构造实时计算的数据库基础设施。

p2p数据库有什么优点与缺点呢?
由于p2p数据库是没有主节点的,所以扩容是非常方便的,使得运维成本大大降低。只要指定seed节点,分分钟扩容操作即可自动完成。seed节点相当于我们的稳定代理节点,相当于整个集群的地址薄管理处,仅作为整个集群的元数据代理入口。
但是另一方面,由于没有主节点,所有的操作必须由选举的Controller代理进行操作(去中心化程度低),或者通过共识协议大家商量一起处理(去中心化程度高)。ScyllaDB属于后一种情况。这样就会导致另一个问题,集体成员越多,进行协商的网络效应越强,使得网络产生一定量的延迟。所以这种共识机制往往节点数不能过多,只能维持在上千个节点左右。如果要上万,那么只能进行委托进行超级节点的选举。前一种去中心化程度低情况相当于超级节点为1的选举。

p2p特性使得一个数据库非常容易维护。而ScyllaDB不只做了这些。ScyllaDB强大的特性太多了。
ScyllaDB有着强大的服务接口,reset api, 兼容Cassandra类SQL的CQL查询,跨语言的thrift服务接口,以及兼容DynamoDB的服务接口。

ScyllaDB有着强大的存储模型。内存表Memtable,物理表SSTable,恢复日志CommitLog,删除墓碑Tombstones这些都是基本必备。分布式协调功能也不可少,它还有Snitch数据网络架构分布, VNode动态节点分布, Partitioner基础分片功能. 然后它还加入了Hinted Hanfdoff, Anti-Entrop, Read Repair, batchlog以及Throttle限流, 使得数据一致性及稳定性大为增强,运维自动化程度极其高效。甚至似,它还引入了lwt轻量级事务功能,甚至它还允许持久化cache进行预热,简直是母亲版的呵护,让人不禁有十一分的感动!

同时,在此之上ScyllaDB还有强大的数据模型,半结构化支持非常好。有set, list, map,甚至允许自定义数据类型,类似于struct(map的固态模式化版本)。物化视图功能的引入对于二级索引也是非常大的增强。并且对于数据的时效性,有ttl时间周期的支持,非常好用。
当然这都是细节,在此之上,还有更强大的分区功能,宽表功能(基于partition key跟cluster key)。真的是太强大了,自己慢慢体会。

对于集群的运行情况,必不可以的就是监控功能了!ScyllaDB也是非常键全的,提供了JMX与prometheus接口。JMX接口是异常强大,包罗万象,甚至可以回调,实时修改参数。而基于prometheus之上担任了scylla-monitoring,通过Grafana十分友好的展示着数据的运行情况。

二. ScyllaDB架构思想篇

ScyllDB的架构由很多服务组成。

  1. 启动时,加载配置信息,得到网络拓扑结构Snitch。
    接着,加载memtable, sstable, commitlog物理存储后启动Storage Engine。
  2. 通过gossip p2p协议加入节点到集群中。
  3. 通过物理存储建立数据模型,生成各种系统keyspaces并进行commit log的数据恢复操作,在此之上提供query processor查询引擎。
  4. 启动分布式基础RPC服务, message service, storage proxy, storage service。
  5. 启动外部接口服务, rest api, thrift, cql, alternator。

我们这里仅讲解cql服务。
1) 当用户调用cql服务的时候,cql调用storage service进行处理。
storage service则根据操作类型,选择本地操作或者分布式操作。如果是分布式操作则通过storage proxy分发到其它机器。

  1. 这里的storage proxy是走的message service 协议,这里我们后面会讲到, storage proxy是message service协议里面的一个handler,还有其它服务handler。
  2. 当其它机器接受到message service之后,调用storage proxy handler进行处理,再进一步将控制权交回了其它机器的storage service。这里就相当了一个完美的闭环了。
  3. 在整个分布式协调过程中,所有的操作处理最终由底层的Storage Engine完成。

所以基本上整个流程是:
cql -> storage service -> storage proxy -> message service(rpc) -> storage proxy -> storage service -> storage engine

现在分布式基础架构这里就介绍完了,然后我们来大致过一下源码。我们这一节主要集中在分布式服务这一块,后面的Storage Engine需要花更多的篇章完成,敬请期待。

三. ScyllaDB整体流程分析

如何去快速了解一下源码呢?这里我们开始进行实践。

首先我们要了解架构思想,前面一个章节已经讲过了,也许你了解得没有我多,这个不是主要问题,后面可以慢慢补充。一般这个周期主要是系统性看书看文档,在两个星期至一个月左右。当然如果有老司机带队,就可以完全加速了。对于新人的话,时间久点也没关系,这里只是给出基本目标作为参考。

那我们从哪里开始呢?从功能交互开始是最为方便的。

1. 了解源码

我们知道ScyllaDB的CQL服务端口是9042, 在配置文件里面进行配置:

# port for the CQL native transport to listen for clients on
# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
native_transport_port: 9042

那么我们对源码进行搜索

larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -v '/tests/' | xargs -I {} grep -nH native_transport_port {}
./db/config.cc:500:        "Enable or disable the native transport server. Uses the same address as the rpc_address, but the port is different from the rpc_port. See native_transport_port.")
./db/config.cc:501:    , native_transport_port(this, "native_transport_port", value_status::Used, 9042,
./db/config.cc:503:    , native_transport_port_ssl(this, "native_transport_port_ssl", value_status::Used, 9142,
./db/config.cc:505:        "Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption"
./db/config.cc:506:        "for native_transport_port. Setting native_transport_port_ssl to a different value"
./db/config.cc:507:        "from native_transport_port will use encryption for native_transport_port_ssl while"
./db/config.cc:508:        "keeping native_transport_port unencrypted")
./db/config.cc:753:    native_transport_port.add_command_line_option(init, "cql-port", "alias for 'native-transport-port'");
./service/storage_service.cc:2294:                std::vector<listen_cfg> configs({ { socket_address{ip, cfg.native_transport_port()} }});
./service/storage_service.cc:2318:                    if (cfg.native_transport_port_ssl.is_set() && cfg.native_transport_port_ssl() != cfg.native_transport_port()) {
./service/storage_service.cc:2319:                        configs.emplace_back(listen_cfg{{ip, cfg.native_transport_port_ssl()}, std::move(cred)});

由此可见这个服务是在 service/storage_service.cc 里面启动.
进一看法进行追查可以看到来自于storage_service::start_native_transport:

future<> storage_service::start_native_transport() {
   ...
   ...
                return f.then([cserver, configs = std::move(configs), keepalive] {
                    return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
                        return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
                            slogger.info("Starting listening for CQL clients on {} ({})"
                                            , cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
                                            );
                        });
                    });

}
...
...

最终可以发现,storage service启动了cql_transport::cql_server::listen进行服务监听。

现在我们有了目标,就是storage service,并且我们从架构上也知道了它的功能。那么我们接着就要看它是如何启动的。

因为启动过程细节比较多,所以我们主要依据两条线索,查看前30名的文件名确立重要模块专攻点,以及日志追踪。

larrys-MacBook-Pro:scylla larluo$ find . -name "*.cc" | grep -vE '/tests/|/api/|/alternator/|/thrift/' | xargs -I {} wc -l {} | sort -nr | head -30
    3820 ./types.cc
    3789 ./service/storage_proxy.cc
    3558 ./service/storage_service.cc
    3433 ./sstables/sstables.cc
    2905 ./db/schema_tables.cc
    2592 ./mutation_partition.cc
    2568 ./table.cc
    2562 ./repair/row_level.cc
    2331 ./utils/logalloc.cc
    2319 ./gms/gossiper.cc
    2181 ./db/commitlog/commitlog.cc
    2109 ./db/system_keyspace.cc
    1993 ./database.cc
    1818 ./db/view/view.cc
    1755 ./mutation_reader.cc
    1641 ./transport/server.cc
    1621 ./cql3/statements/select_statement.cc
    1470 ./repair/repair.cc
    1442 ./sstables/mc/writer.cc
    1332 ./row_cache.cc
    1319 ./schema.cc
    1223 ./message/messaging_service.cc
    1167 ./main.cc
    1079 ./service/migration_manager.cc
     998 ./db/hints/manager.cc
     981 ./sstables/compaction.cc
     979 ./flat_mutation_reader.cc
     962 ./cql3/restrictions/statement_restrictions.cc
     905 ./sstables/compaction_strategy.cc
     867 ./sstables/compaction_manager.cc

这里我们已经排除掉了api, alternator(DynamoDB), thrift服务接口。

主要可以归类为四部分:
第一部分是基础模块
第一部分是网络模块
第二部分是服务模块
第三部分是存储模块

基础模块有:

    3820 ./types.cc
    2331 ./utils/logalloc.cc

网络模块有:

    2319 ./gms/gossiper.cc

服务模块有:

    3789 ./service/storage_proxy.cc
    3558 ./service/storage_service.cc
    1641 ./transport/server.cc
    1621 ./cql3/statements/select_statement.cc
    1223 ./message/messaging_service.cc
    1167 ./main.cc
    1079 ./service/migration_manager.cc
     962 ./cql3/restrictions/statement_restrictions.cc

存储模块有:

    3433 ./sstables/sstables.cc
    2905 ./db/schema_tables.cc
    2592 ./mutation_partition.cc
    2568 ./table.cc
    2562 ./repair/row_level.cc
    2181 ./db/commitlog/commitlog.cc
    2109 ./db/system_keyspace.cc
    1993 ./database.cc
    1818 ./db/view/view.cc
    1755 ./mutation_reader.cc
    1470 ./repair/repair.cc
    1442 ./sstables/mc/writer.cc
    1332 ./row_cache.cc
    1319 ./schema.cc
     998 ./db/hints/manager.cc
     981 ./sstables/compaction.cc
     979 ./flat_mutation_reader.cc
     905 ./sstables/compaction_strategy.cc
     867 ./sstables/compaction_manager.cc

基础模块属于源码细节,我们暂且不过多关注,需要时再去了解。

2. 快速弄清流程

接着进入日志追踪阶段,也就是流程梳理阶段。
有条件的可以运行集群并查看日志,或者从生产环境拿日志。
这里我为了加强功力,采用零依赖手撕源码。有条件的可以用日志来辅助分析。
那么现在就从./main.cc开始,首先查看打印消息。
代码打印消息是非常要用的,切记,也是运维排错的第一入手点。

int main(int ac, char** av) { 
    print_starting_message(ac, av, parsed_opts);
    ...
    return app.run(ac, av, [&] () -> future<int> {
        ...
        return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs, &pctx, &prometheus_server, &return_value, &cf_cache_hitrate_calculator,
                               &feature_service] {
            ...
            startlog.info("Scylla version {} starting.", scylla_version());
            ...
            supervisor::notify("starting prometheus API server");
            ...
            supervisor::notify("creating tracing");
            ...
            supervisor::notify("creating snitch");
            ...
            supervisor::notify("determining DNS name");
            ...
            supervisor::notify("starting API server");
            ...
            supervisor::notify("initializing storage service");
            ...
            supervisor::notify("starting per-shard database core");
            ...
            supervisor::notify("creating data directories");
            ...
            supervisor::notify("creating commitlog directory");
            ...
            supervisor::notify("creating hints directories");
            ...
            supervisor::notify("verifying directories");
            ...
            supervisor::notify("starting gossip");
            ...
            supervisor::notify("starting storage proxy");
            ...
            supervisor::notify("starting migration manager");
            ...
            supervisor::notify("starting query processor");
            ...
            supervisor::notify("initializing batchlog manager");
            ...
            supervisor::notify("loading system sstables");
            ...
            supervisor::notify("loading non-system sstables");
            ...
            supervisor::notify("starting view update generator");
            ...
            supervisor::notify("discovering staging sstables");
            ...
            supervisor::notify("setting up system keyspace");
            ...
            supervisor::notify("starting commit log");
                    ...
                    supervisor::notify("replaying commit log");
                    ...
                    supervisor::notify("replaying commit log - flushing memtables");
                    ...
                    supervisor::notify("replaying commit log - removing old commitlog segments");
                    ...
            supervisor::notify("initializing migration manager RPC verbs");
            ...
            supervisor::notify("initializing storage proxy RPC verbs");
            ...
            supervisor::notify("starting streaming service");
            ...
            supervisor::notify("starting hinted handoff manager");
            ...
            supervisor::notify("starting messaging service");
            ...
            supervisor::notify("starting storage service", true);
            ...
            supervisor::notify("starting batchlog manager");
            ...
            supervisor::notify("starting load broadcaster");
            ...
            supervisor::notify("starting cf cache hit rate calculator");
            ...
            supervisor::notify("starting view update backlog broker");
            ...
            supervisor::notify("allow replaying hints");
            ...
            supervisor::notify("starting native transport");
            ...
            supervisor::notify("serving");
            ...
            startlog.info("Scylla version {} initialization completed.", scylla_version());
            ...
          startlog.info("Scylla version {} shutdown complete.", scylla_version());

    }
  }
}

这里面内容不算多,我们快速过一遍,我们可以看到:

            supervisor::notify("starting prometheus API server");
                (void)prometheus::start(prometheus_server, pctx);

            supervisor::notify("creating tracing");
            tracing::tracing::create_tracing(tracing_backend_registry, "trace_keyspace_helper").get();

            supervisor::notify("creating snitch");
            i_endpoint_snitch::create_snitch(cfg->endpoint_snitch()).get();
            supervisor::notify("determining DNS name");
                    return gms::inet_address::lookup(api_address, family, preferred).get0();

            supervisor::notify("starting API server");
            with_scheduling_group(maintenance_scheduling_group, [&] {
                return ctx.http_server.listen(socket_address{ip, api_port});
            }).get();
            startlog.info("Scylla API server listening on {}:{} ...", api_address, api_port);
            ...
            api::set_server_done(ctx).get();
            supervisor::notify("serving");

            supervisor::notify("initializing storage service");         
            (void)init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
            supervisor::notify("starting per-shard database core");
            db.start(std::ref(*cfg), dbcfg).get();
            supervisor::notify("creating data directories");
            dirs.touch_and_lock(db.local().get_config().data_file_directories()).get();
            supervisor::notify("creating commitlog directory");
            dirs.touch_and_lock(db.local().get_config().commitlog_directory()).get();
            supervisor::notify("creating hints directories");
                dirs.touch_and_lock(db.local().get_config().hints_directory()).get();
            supervisor::notify("verifying directories");

            supervisor::notify("starting gossip");
            init_ms_fd_gossiper(gossiper
                    , feature_service
                    , *cfg
                    , listen_address
                    , storage_port
                    , ssl_storage_port
                    , tcp_nodelay_inter_dc
                    , encrypt_what
                    , trust_store
                    , cert
                    , key
                    , prio
                    , clauth
                    , cfg->internode_compression()
                    , seed_provider
                    , memory::stats().total_memory()
                    , scfg
                    , cluster_name
                    , phi
                    , cfg->listen_on_broadcast_address());
            supervisor::notify("starting storage proxy");
            proxy.start(std::ref(db), spcfg, std::ref(node_backlog)).get();
            supervisor::notify("starting migration manager");
            mm.start().get();
            supervisor::notify("initializing batchlog manager");
            db::get_batchlog_manager().start(std::ref(qp), bm_cfg).get();
            supervisor::notify("loading system sstables");

            distributed_loader::ensure_system_table_directories(db).get();

            supervisor::notify("loading non-system sstables");
            distributed_loader::init_non_system_keyspaces(db, proxy).get();

            supervisor::notify("starting view update generator");
            view_update_generator.start(std::ref(db), std::ref(proxy)).get();
            supervisor::notify("discovering staging sstables");
            supervisor::notify("setting up system keyspace");
            db::system_keyspace::setup(db, qp, service::get_storage_service()).get();
            supervisor::notify("starting commit log");

            supervisor::notify("initializing migration manager RPC verbs");
            service::get_migration_manager().invoke_on_all([] (auto& mm) {
                mm.init_messaging_service();
            }).get();
            supervisor::notify("starting batchlog manager");
            proxy.invoke_on_all([] (service::storage_proxy& p) {
                p.init_messaging_service();
            }).get();
            supervisor::notify("starting streaming service");
            streaming::stream_session::init_streaming_service(db, sys_dist_ks, view_update_generator).get();
                (void)local_proxy.start_hints_manager(gms::get_local_gossiper().shared_from_this(), ss.shared_from_this());
            supervisor::notify("starting messaging service");
            // Start handling REPAIR_CHECKSUM_RANGE messages
            netw::get_messaging_service().invoke_on_all([&db] (auto& ms) {
                ms.register_repair_checksum_range([&db] (sstring keyspace, sstring cf, dht::token_range range, rpc::optional<repair_checksum> hash_version) {
                    auto hv = hash_version ? *hash_version : repair_checksum::legacy;
                    return do_with(std::move(keyspace), std::move(cf), std::move(range),
                            [&db, hv] (auto& keyspace, auto& cf, auto& range) {
                        return checksum_range(db, keyspace, cf, range, hv);
                    });
                });
            }).get();

            supervisor::notify("starting storage service", true);
            ss.init_messaging_service_part().get();
            ss.init_server_without_the_messaging_service_part().get();
            supervisor::notify("starting batchlog manager");
            db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
                return b.start();
            }).get();

            supervisor::notify("starting load broadcaster");
            lb->start_broadcasting();
            supervisor::notify("starting cf cache hit rate calculator");
            cf_cache_hitrate_calculator.start(std::ref(db), std::ref(cf_cache_hitrate_calculator)).get();
            supervisor::notify("starting view update backlog broker");

            view_backlog_broker.start(std::ref(proxy), std::ref(gms::get_gossiper())).get();
            view_backlog_broker.invoke_on_all(&service::view_update_backlog_broker::start).get();

            supervisor::notify("starting native transport");
            with_scheduling_group(dbcfg.statement_scheduling_group, [] {
                return service::get_local_storage_service().start_native_transport();
            }).get();
            if (start_thrift) {
                with_scheduling_group(dbcfg.statement_scheduling_group, [] {
                    return service::get_local_storage_service().start_rpc_server();
                }).get();
            }
            if (cfg->alternator_port() || cfg->alternator_https_port()) {
                alternator_server.init(addr, alternator_port, alternator_https_port, creds, cfg->alternator_enforce_authorization()).get();
            }

从本章角度来说,主要关注以下几个组件

三. 确立组件依赖

            with_scheduling_group(dbcfg.statement_scheduling_group, [] {
                return service::get_local_storage_service().start_native_transport();
            }).get();

start_native_transport属于storage_service 模块.

future<> storage_service::start_native_transport() {

                return f.then([cserver, configs = std::move(configs), keepalive] {
                    return parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
                        return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, keepalive).then([cfg] {
                            slogger.info("Starting listening for CQL clients on {} ({})"
                                            , cfg.addr, cfg.cred ? "encrypted" : "unencrypted"
                                            );
                        });
                    });

                });
}
future<std::unique_ptr<cql_server::response>> cql_server::connection::process_query(uint16_t stream, request_reader in, service::client_state& client_state, service_permit permit)
{
    ...
    return _server._query_processor.local().process(query, query_state, options).then([this, stream, &query_state, skip_metadata] (auto msg) {
         tracing::trace(query_state.get_trace_state(), "Done processing - preparing a result");
         return this->make_result(stream, msg, query_state.get_trace_state(), skip_metadata);
}
future<shared_ptr<cql_transport::messages::result_message>>
select_statement::execute(service::storage_proxy& proxy,
                          lw_shared_ptr<query::read_command> cmd,
                          dht::partition_range_vector&& partition_ranges,
                          service::query_state& state,
                          const query_options& options,
                          gc_clock::time_point now)
{
        ...
        return proxy.query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {timeout, state.get_permit(), state.get_client_state(), state.get_trace_state()})
            .then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
                return this->process_results(std::move(qr.query_result), cmd, options, now);
            });
}
void storage_proxy::init_messaging_service() {
    ...
    ms.register_counter_mutation([] (const rpc::client_info& cinfo, rpc::opt_time_point t, std::vector<frozen_mutation> fms, db::consistency_level cl, std::optional<tracing::trace_info> trace_info) {
    ...
}


    future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
        ++_proxy->_stats.data_read_attempts.get_ep_stat(ep);
        auto opts = want_digest
                  ? query::result_options{query::result_request::result_and_digest, digest_algorithm()}
                  : query::result_options{query::result_request::only_result, query::digest_algorithm::none};
        if (fbu::is_me(ep)) {
            tracing::trace(_trace_state, "read_data: querying locally");
            return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
        } else {
            auto& ms = netw::get_local_messaging_service();
            tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
            return ms.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>> result_hit_rate) {
                auto&& [result, hit_rate] = result_hit_rate;
                tracing::trace(_trace_state, "read_data: got response from /{}", ep);
                return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid())));
            });
        }
    }

future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
                                  tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, uint64_t max_size) {
            return db.query(gs, *cmd, opts, prv, trace_state, max_size, timeout).then([trace_state](auto&& f, cache_temperature ht) {
                tracing::trace(trace_state, "Querying is done");
                return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>>(rpc::tuple(make_foreign(std::move(f)), ht));
            });
}

至此,整个分布式架构源码及组件依赖已经完成。

  1. storage_service#native transport 启动调用cql_server
  2. cql_server启动并调用query parser 进行解析
  3. cql_server解析后调用storage proxy
  4. storage proxy通过message service进行节点的消息分发
  5. 每个节点的storage proxy调用database engine存储引擎进行数据的处理操作

请关注下一篇

由于篇幅关系,下一篇将会更细致得讲解各个组件的内容,敬请期待。

上一篇 下一篇

猜你喜欢

热点阅读