【刘文彬】 Debug EOS:nodeos + mongo_d
原文链接:https://www.cnblogs.com/Evsward/p/storage.html
上文书说到区块链的存储方式,并结合了EOSIO进行分析,其中也提到了使用CLion调试EOS的方法。本文将继续深入细致地展开对加载了mongo_db_plugin的nodeos的调试过程以及心得。
关键字:源码分析,Debug EOS,nodeos,mongo_db_plugin,CLion,C++,boost::asio::signal_set,queue
本文涉及的环境:clang-6.0, clang++-6.0, GDB Debugger, make 4.1, mongodb-linux-x86_64-3.6.3, boost 1.67.0
调试EOS: nodeos
关于EOS的调试环境的搭建这里不再赘述了,下文开始针对nodeos程序进行调试。
(一)CMakeList.txt
nodeos开始运行前,要先使用项目的总CmakeList.txt配置,这里我配置了boost库的位置,如果你配置了boost的环境变量可以跳过这里。
set( BOOST_ROOT "/home/evsward/opt/boost")
- 这个文件中有很多的set语句,这些语句都是开关,或者路径,或者全局变量,是配置各个子CMakeList.txt而用的。
- include 语句是为runtime引入相关依赖库。
- add_subdirectory语句设置了子目录程序。
- install语句是将相关命令安装到指定位置以供runtime后续使用。
总的CMakeList文件介绍完了,下面会执行到nodeos目录下的CMakeList.txt文件:
- add_executable( nodeos main.cpp )语句设定了nodeos程序执行入口。
- set, configure_file, include, install 等都是为runtime准备的环境相关的。
- 重点语句target_link_libraries,这里定义了链runtime环境需要启动的plugin。(注意记住这个顺序)
(二)static register plugin
我们打开每一个plugin的cpp文件,会发现有一个static的register方法的调用。这里会首先执行按上面plugin定义的顺序中第一个login_plugin,它的static语句如下:
static appbase::abstract_plugin& _login_plugin = app().register_plugin<login_plugin>();
执行此语句时,会先执行app(),这是application单例方法。
(三)application
- application是nodeos的执行者,上面调用的app函数:
application& application::instance() {
static application _app;
return _app;
}
application& app() { return application::instance(); }
application与plugin拥有相同的实现函数,而由于它作为执行者、统筹者的存在,它会安排调用所有plugin,例如set_program_options。
-
执行app()以后获取到了application的实例,然后调用了register_plugin函数,通过模板类(泛型类)携带了login_plugin的类型。register_plugin函数是模板函数,定义在application.hpp文件中。
-
application.hpp 中定义了私有的内存变量
map<string, std::unique_ptr<abstract_plugin>> plugins;
-
abstract_plugin是所有plugin的基类,它定义了虚函数,需要继承它的子类去实现。他们与application的关系是:
abstract_plugin=>plugin(对基类的虚函数进一步使用,由application定义管理)=>各个plugin
template<typename Plugin>
auto& register_plugin() {
auto existing = find_plugin<Plugin>(); // 从plugins寻找该plugin是否已注册。
if(existing)
return *existing; // 如果已注册了直接返回plugin的实例
auto plug = new Plugin(); // 创建该未注册plugin的实例
plugins[plug->name()].reset(plug); // 先插入到上面定义的内存变量plugins
plug->register_dependencies();// 注册该plugin的依赖plugins,每个plugin内部都会调用APPBASE_PLUGIN_REQUIRES((chain_plugin))来注册自己依赖的别的plugin。
return *plug; // 返回plugin的实例
}
(四)main.cpp->main()
在编译runtime环境结束以后,进入入口函数main(),
int main(int argc, char** argv)
main函数的参数就是调用命令nodeos的通过--加入的参数,我们可以通过nodeos的Edit Configuration来调整。其中argc是个数,argv是参数的值,是一个数组类型。如下图:
pic1.jpg我们接着来看main函数,它的函数体是通过app()对application单例进行的设置,包括版本号、data路径、config路径,然后是对于application实例内部方法的调用:
- initialize<chain_plugin, http_plugin, net_plugin, producer_plugin>
- startup()
- exec()
main函数执行了内部函数initialize_logging()还通过ilog打印了日志,输出了版本号以及eosio root路径地址。
由于main函数是入口函数,上面也介绍了它主要是对application实例的使用,以及一些异常处理等,接下来会逐一进行debug跟踪分析。
(五)initialize plugin
这个初始化函数是一个模板函数,模板类参数是plugin基类,在main函数调用该函数时传入了基本的插件依赖(这些是不需要我们在config中配置的,是链启动的基础插件):chain_plugin, http_plugin, net_plugin, producer_plugin。下面来看initialize函数在application头文件中的声明:
/**
* @brief 查看 --plugin(存在于命令行或者config配置文件中)调用这些plugin的 initialize方法。
*
* @tparam Plugin plugin的列表用来初始化,即使在config中没有配置的但被其他plugin所依赖的plugin,都可以统一使用该模板类没有影响。
* @return true:plugin初始化完成,false:出现异常。
*/
template<typename... Plugin>
bool initialize(int argc, char** argv) {
return initialize_impl(argc, argv, {find_plugin<Plugin>()...}); // ...是可变参数的语法,上面通过main函数的调用,我们传入了多个plugin。
}
实现类initialize_impl的内容较多,不粘贴源码,直接总结一下:
(1)set_program_options()函数
application.cpp文件中的set_program_options()函数是用来生成初始化的config.ini文件内容以及nodeos命令行--help的输出内容。
该函数首先遍历插件列表,调用每个插件都会实现的plugin基类的虚函数set_program_options(options_description& cli, options_description& cfg),例如下面就是mongo_db_plugin的实现:
void mongo_db_plugin::set_program_options(options_description& cli, options_description& cfg)
{
cfg.add_options()
("mongodb-queue-size,q", bpo::value<uint32_t>()->default_value(256),
"The target queue size between nodeos and MongoDB plugin thread.")
("mongodb-wipe", bpo::bool_switch()->default_value(false),
"Required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks to wipe mongo db."
"This option required to prevent accidental wipe of mongo db.")
("mongodb-block-start", bpo::value<uint32_t>()->default_value(0),
"If specified then only abi data pushed to mongodb until specified block is reached.")
("mongodb-uri,m", bpo::value<std::string>(),
"MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/."
" If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI."
" Example: mongodb://127.0.0.1:27017/EOS")
;
}
通过调用mongo_db_plugin的这个方法,就可以拼凑到config.ini文件中关于mongo_db_plugin的部分,因为这个插件只有对于config.ini配置文件的配置,没有对于命令行的内容,我们可以去查看chain_plugin的实现,它会同时有配置文件和命令行两个方面的内容设置,源码较长请自行查看。
配置的对象options_description_easy_init是一个灵活的结构。可以表示:一个配置项,一个配置的值;一个配置项,一个配置的值,一个注释或者描述;一个配置项,一个注释或者描述。这些多种组合,我们也可以直接去查看自己的config.ini的每一个配置项去对应。
那么是如何拼凑所有的插件配置内容呢?
application.cpp文件中的set_program_options()函数的函数体中使用了application的两个类变量来存储这些参数:
- _app_options:用于接收来自于命令行和config.ini两个参数来源的参数。
- _cfg_options:仅存储来自于config.ini配置文件的参数。
插件遍历结束后,我们已经有了所有插件的config.ini配置内容以及命令行提示配置内容,下面要从宏观角度去配置一些属于application的配置项,config.ini中加入了plugins的配置,通过这个配置我们可以手动指定要启动的插件,例如mongo_db_plugin就是需要手动配置的。接着,要配置命令行的参数内容,包括help, version, print-default-config, data-dir, config-dir, config, logconf。将他们追加存储到上面那两个类变量中。
到这里,application.cpp文件中的set_program_options()函数的工作就完成了。
上面提到的_app_options和_cfg_options仍就是傻傻分不清楚,他们的用意到底是什么呢?
简单来理解就是,命令行能够做所有配置文件config.ini中的配置的工作,同时命令行还有专属于自己的help, version, print-default-config, data-dir, config-dir, config, logconf配置。这样就好理解了,config.ini是命令行配置的子集,命令行配置是全集。
(2)app全局参数的检测与合并
我们回到initialize_impl,目前我们已经拥有了两套默认配置参数,这里直接使用全集_app_options配置,我们先接收来自于命令行的参数,将以它为优先级高的方式与_app_options配置合并:
bpo::variables_map options;
bpo::store(bpo::parse_command_line(argc, argv, my->_app_options), options);
(3)app全局参数配置项生效与响应
拿到合并后的配置对象options,依次针对配置项的内容进行响应:
- help:直接输出_app_options配置项的全部内容。
- version:输出application实例的类成员_version的值。
- print-default-config:与_app_options无关,重新去每个plugin找配置,然后会基于_cfg_options生成一份默认的config配置打印到终端界面。
- data-dir:是设置data目录的命令保存至application的类成员_data_dir,没有响应的输出。
- config-dir:设置config路径,保存在类成员_config_dir中。config和data的路径结构如下:
evsward@evsward-TM1701:~/.local/share/eosio/nodeos$ tree
.
├── config
│ └── config.ini
└── data
├── blocks
│ ├── blocks.index
│ ├── blocks.log
│ └── reversible
│ ├── shared_memory.bin
│ └── shared_memory.meta
└── state
├── forkdb.dat
├── shared_memory.bin
└── shared_memory.meta
5 directories, 8 files
- logconf:默认是logging.json,放置在config目录下面,可自定义设置,保存在类成员_logging_conf中。
- config:指定配置文件的名字,默认是config.ini。如果发现在config目录下找到config.ini文件,则按照该文件的配置载入。
bpo::store(bpo::parse_config_file<char>(config_file_name.make_preferred().string().c_str(),
my->_cfg_options, true), options);
得到整合好本地config.ini文本配置的options对象。然后对其参数配置项进行设置。
- plugin:读取配置文件中的plugin配置(多条),对于每一个plugin,要重新调用各自的initialize方法去按照新的配置初始化。
- autostart_plugins:设置前面的初始化插件chain_plugin, http_plugin, net_plugin, producer_plugin,同样分别调用他们的初始化函数设置新的配置。
(4)plugin initialize
承接上文,所有相关的plugin的执行各自的initialize。这个initialize函数是abstract_plugin的虚函数,而该虚函数被plugin类所复写:
virtual void initialize(const variables_map& options) override {
if(_state == registered) {//如果注册过
_state = initialized;
static_cast<Impl*>(this)->plugin_requires([&](auto& plug){ plug.initialize(options); });// 先执行依赖plugin的initialize方法。
static_cast<Impl*>(this)->plugin_initialize(options);// 调用自身的plugin_initialize方法实现。
//ilog( "initializing plugin ${name}", ("name",name()) );
app().plugin_initialized(*this);// 保存到一个initialized_plugins类成员变量中,用来按顺序记录已经开始启动运行的plugin。
}
assert(_state == initialized); /// 如果插件未注册,则不能执行initialize方法。
}
所以在plugin调用initialize函数的时候,是先执行的以上复写的plugin的虚函数。我们这里先设定几个要跟踪的plugin为目标吧,否则plugin太多,望山止步。
目标:主要研究mongo_db_plugin,以及基础plugin(chain_plugin, http_plugin, net_plugin, producer_plugin),路线是研究主plugin,若有额外的依赖plugin,看情况控制研究的深浅程度。
(5)eosio::mongo_db_plugin::plugin_initialize
前面写set_program_options()提到了mongo_db_plugin,这里研究它的plugin_initialize方法。传入的参数是结合了命令行以及本地config文件的合并配置项,按照此配置环境。
void mongo_db_plugin::plugin_initialize(const variables_map& options)
{
try {
if( options.count( "mongodb-uri" )) {//查mongodb-uri的配置,config.ini中有对应的。
ilog( "initializing mongo_db_plugin" );
my->configured = true;//设置标志位:已配置
if( options.at( "replay-blockchain" ).as<bool>() || options.at( "hard-replay-blockchain" ).as<bool>() || options.at( "delete-all-blocks" ).as<bool>() ) {//捕捉是否有replay-blockchain、hard-replay-blockchain、delete-all-blocks三个动作,有的话要判断是否要擦出mongo历史数据。
if( options.at( "mongodb-wipe" ).as<bool>()) {//检查擦除项mongodb-wipe的配置
ilog( "Wiping mongo database on startup" );
my->wipe_database_on_startup = true;//如果设置擦除,这里设置本地标志位wipe_database_on_startup
} else if( options.count( "mongodb-block-start" ) == 0 ) {//如果设置是从0开始,检查是否要全部擦除历史数据。
EOS_ASSERT( false, chain::plugin_config_exception, "--mongodb-wipe required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks"
" --mongodb-wipe will remove all EOS collections from mongodb." );
}
}
if( options.count( "abi-serializer-max-time-ms") == 0 ) {//eosio::chain_plugin的配置
EOS_ASSERT(false, chain::plugin_config_exception, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks");
}
my->abi_serializer_max_time = app().get_plugin<chain_plugin>().get_abi_serializer_max_time();
if( options.count( "mongodb-queue-size" )) {// queue大小
my->queue_size = options.at( "mongodb-queue-size" ).as<uint32_t>();
}
if( options.count( "mongodb-block-start" )) {// mongo对应的开始区块号
my->start_block_num = options.at( "mongodb-block-start" ).as<uint32_t>();
}
if( my->start_block_num == 0 ) {
my->start_block_reached = true;
}
std::string uri_str = options.at( "mongodb-uri" ).as<std::string>();
ilog( "connecting to ${u}", ("u", uri_str));
mongocxx::uri uri = mongocxx::uri{uri_str};
my->db_name = uri.database();
if( my->db_name.empty())
my->db_name = "EOS";// 默认起的库名字为EOS,如果在mongodb-uri有配置的话就使用配置的名字。
my->mongo_conn = mongocxx::client{uri};// 客户端连接到mongod
// controller中拉取得信号,在init函数中注册信号机制,始终监听链上信号,作出反应。
chain_plugin* chain_plug = app().find_plugin<chain_plugin>();//检查chain_plugin是否加载,chain_plugin是必要依赖,下面我们要使用chain的数据。
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "" );
auto& chain = chain_plug->chain();// 获得chain实例
my->chain_id.emplace( chain.get_chain_id());
// accepted_block_connection对应了chain的signal,是boost提供的一种信号槽机制,这种connection对象有四个,见当前源码的下方展示。
my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) {// 建立connect,每当chain有accepted_block信号(这些信号是定义在controller.hpp中,稍后会介绍),即调用下面的函数。
my->accepted_block( bs );// accepted_block认同block信息
} ));
my->irreversible_block_connection.emplace(//含义同上
chain.irreversible_block.connect( [&]( const chain::block_state_ptr& bs ) {
my->applied_irreversible_block( bs );// applied_irreversible_block,应用不可逆区块
} ));
my->accepted_transaction_connection.emplace(//含义同上
chain.accepted_transaction.connect( [&]( const chain::transaction_metadata_ptr& t ) {
my->accepted_transaction( t );// accepted_transaction认同交易
} ));
my->applied_transaction_connection.emplace(//含义同上
chain.applied_transaction.connect( [&]( const chain::transaction_trace_ptr& t ) {
my->applied_transaction( t );// applied_transaction,应用交易
} ));
if( my->wipe_database_on_startup ) {
my->wipe_database();// 擦除mongo历史数据
}
my->init();//初始化函数
} else {
wlog( "eosio::mongo_db_plugin configured, but no --mongodb-uri specified." );
wlog( "mongo_db_plugin disabled." );
}
} FC_LOG_AND_RETHROW()
}
四个connection对象的声明如下:
fc::optional<boost::signals2::scoped_connection> accepted_block_connection;
fc::optional<boost::signals2::scoped_connection> irreversible_block_connection;
fc::optional<boost::signals2::scoped_connection> accepted_transaction_connection;
fc::optional<boost::signals2::scoped_connection> applied_transaction_connection;
queue
这段代码中涉及到四个函数分别是accepted_block,applied_irreversible_block,accepted_transaction,applied_transaction,他们都对应着对queue的操作,mongo_db_plugin_impl类成员定义了一下几种queue:
std::deque<chain::transaction_metadata_ptr> transaction_metadata_queue;
std::deque<chain::transaction_metadata_ptr> transaction_metadata_process_queue;
std::deque<chain::transaction_trace_ptr> transaction_trace_queue;
std::deque<chain::transaction_trace_ptr> transaction_trace_process_queue;
std::deque<chain::block_state_ptr> block_state_queue;
std::deque<chain::block_state_ptr> block_state_process_queue;
std::deque<chain::block_state_ptr> irreversible_block_state_queue;
std::deque<chain::block_state_ptr> irreversible_block_state_process_queue;
queue是mongo_db_plugin自己定义的:
/**
* 模板类Queue,可以匹配以上我们定义的多个queue类型。
* 模板类Entry,可以匹配block_state_ptr以及transaction_trace_ptr作为被存储实体类型。
*/
template<typename Queue, typename Entry>
void queue(boost::mutex& mtx, boost::condition_variable& condition, Queue& queue, const Entry& e, size_t queue_size) {
int sleep_time = 100;//默认线程睡眠时间
size_t last_queue_size = 0;
boost::mutex::scoped_lock lock(mtx);//mutex锁机制
if (queue.size() > queue_size) {//如果超过了我们设定的queue大小,则采取如下措施。
lock.unlock();//先解锁
condition.notify_one();// 见下文对condition的介绍
if (last_queue_size < queue.size()) {//说明queue的增加速度大于我们程序消费处理的速度
sleep_time += 100;//增加睡眠时间
} else {
sleep_time -= 100;//说明queue的增加速度小于我们消费的速度,就要减少睡眠时间,尽快更新last_queue_size的值。
if (sleep_time < 0) sleep_time = 100;
}
last_queue_size = queue.size();
boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));//线程睡眠,睡眠的时间按照上面的机制定夺。
lock.lock();//上锁
}
queue.emplace_back(e);//生效部分:插入到队列中去。
lock.unlock();//解锁
condition.notify_one();
}
mongo_db_plugin_impl::wipe_database()
真正执行擦除mongo历史数据的函数,这个动作是由我们配置mongodb-wipe参数来指定。擦除的函数体如下:
void mongo_db_plugin_impl::wipe_database() {
ilog("mongo db wipe_database");
// 定义的六张mongo的表类型,通过客户端连接获取到六张表的权限。
auto block_states = mongo_conn[db_name][block_states_col];
auto blocks = mongo_conn[db_name][blocks_col];
auto trans = mongo_conn[db_name][trans_col];
auto trans_traces = mongo_conn[db_name][trans_traces_col];
auto actions = mongo_conn[db_name][actions_col];
accounts = mongo_conn[db_name][accounts_col];
// 分别删除,执行drop动作。
block_states.drop();
blocks.drop();
trans.drop();
trans_traces.drop();
actions.drop();
accounts.drop();
}
mongo_db_plugin_impl::init()
源码较多不粘贴,上面wipe_database函数,我们删除了六张表,在init函数中,我们要对应的建立这六张表。表名初始化:
const std::string mongo_db_plugin_impl::block_states_col = "block_states";
const std::string mongo_db_plugin_impl::blocks_col = "blocks";
const std::string mongo_db_plugin_impl::trans_col = "transactions";
const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces";
const std::string mongo_db_plugin_impl::actions_col = "actions";
const std::string mongo_db_plugin_impl::accounts_col = "accounts";
这就是刘张表对应的名字。这六张表在初始化建立时是一个整体操作,也就是说互为依赖关系,accounts表先创建,通过
accounts = mongo_conn[db_name][accounts_col];
即可创建成功accounts表,其他表亦然,后面不赘述。表数据是由make_document进行组装的。首先我们向accounts表插入一条数据,结构是name为eosio,createAt是当前时间。
- chain::config::system_account_name ).to_string()
- std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});
通过insert_one方法将该条数据插入accounts表中。
接下来通过create_index方法对五张表建立索引,注意transaction_traces是没有索引的,init操作时不涉及transaction_traces表。
auto blocks = mongo_conn[db_name][blocks_col]; // Blocks
blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" ));
blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block建立了两个索引
auto block_stats = mongo_conn[db_name][block_states_col];
block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" ));
block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" ));// block_stats建立了两个索引
// accounts indexes
accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" ));
// transactions indexes
auto trans = mongo_conn[db_name][trans_col]; // Transactions
trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));
auto actions = mongo_conn[db_name][actions_col];
actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" ));
初始化准备就完成了,接下来要建立线程监听出块消息,同步到mongo数据库中来。
ilog("starting db plugin thread");
consume_thread = boost::thread([this] { consume_blocks(); });
startup = false;// 结束,调用析构函数,关闭mongo_db_plugin:设定标志位done = true;
(6)mongo_db_plugin_impl::consume_blocks()
上面init函数执行到最后时,开启了一个线程,执行的是consume_blocks()函数,如字面含义这是消费区块的函数。这个函数是一个无限循环,保持线程的存活,监听queue的数据随时消费同步到mongo数据库中去,而queue的数据的是由上面plugin_initialize函数中的connect信号槽机制连接chain的出块信号往queue里面插入/同步链上数据。
condition
无线循环第一部分就是对condition.wait(lock)的操作,condition在上面queue的源码中有一个notify_one()的调用,实际上就是与wait相互应的操作。
boost::mutex::scoped_lock lock(mtx);
while ( transaction_metadata_queue.empty() &&
transaction_trace_queue.empty() &&
block_state_queue.empty() &&
irreversible_block_state_queue.empty() &&
!done ) {
condition.wait(lock);
}
消费区块占用了一个线程,这个线程在上面四个queue是空的时候并且done也没有完成是flase的时候,该线程会通过condition来阻塞线程,参数是mutex的一个锁。
condition.notify_one()会重新唤起这个阻塞的线程,而在mongo_db_plugin中,condition.notify_one()出现了3次:
- queue模板类型,有了新的数据插入的时候。
- 当queue模板类型的队列超过设置值的时候,要主动唤起consume_block开启消费线程加速消费(上面介绍queue的时候也谈到了队列大小超限时会增加queue插入的睡眠等待时间,这两方面相当于针对中间队列对两边进行开源节流,从而控制了队列的大小)
- ~mongo_db_plugin_impl()析构函数中
mongo_db_plugin_impl::~mongo_db_plugin_impl() {
if (!startup) {//标志位,上面init函数结尾有这个值的赋值。
try {
ilog( "mongo_db_plugin shutdown in process please be patient this can take a few minutes" );
done = true;//设定标志位done,consume_block()会使用到。
condition.notify_one();// 唤醒consume_thread线程继续消费掉queue中残余数据。
consume_thread.join();// 挂起主线程,等待consume_thread线程执行完毕再唤起主线程。
} catch( std::exception& e ) {
elog( "Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what()));
}
}
}
process_queue准备
我们要将链上的数据同步至mongo,是通过上面判断是否为空的那四个queue来做,为了增加消费效率,进入consume_block函数以后,要先将数据move导入到一个process_queue中来慢慢处理,相当于一个中转站。
size_t transaction_metadata_size = transaction_metadata_queue.size();
if (transaction_metadata_size > 0) {
transaction_metadata_process_queue = move(transaction_metadata_queue);
transaction_metadata_queue.clear();
}
size_t transaction_trace_size = transaction_trace_queue.size();
if (transaction_trace_size > 0) {
transaction_trace_process_queue = move(transaction_trace_queue);
transaction_trace_queue.clear();
}
size_t block_state_size = block_state_queue.size();
if (block_state_size > 0) {
block_state_process_queue = move(block_state_queue);
block_state_queue.clear();
}
size_t irreversible_block_size = irreversible_block_state_queue.size();
if (irreversible_block_size > 0) {
irreversible_block_state_process_queue = move(irreversible_block_state_queue);
irreversible_block_state_queue.clear();
}
队列大小报警器
接下来是一个针对四个源队列的大小进行一个监控,当任意超过限额的75%时,会触发报警,打印到控制台。
分发到具体执行函数消费队列
接下来,就是将上面的四个中转的process_queue的数据分别分发到不同的函数(对应下面四个_process函数)中去消费处理。最后每个中转队列处理一条,就pop出去一条,都处理结束以后,会再次判断四个源队列的大小是否为空,都消费完了,同时也得有析构函数的done标志位为true,才会中断consume_thread线程的consume_block()的无线循环。
1. mongo_db_plugin_impl::_process_accepted_transaction()
执行接收交易, 需要start_block_reached标识位为true。源码较长不粘贴,语言介绍一下,该函数的主要工作是获得mongo的连接以及库表对象,同时解析传入的const chain::transaction_metadata_ptr& t 对象,该对象的路线是:
chain=>signal=>mongo_db_plugin connect signal=>queue=>process_queue=>遍历出一条数据即是t
获得这个对象以后,也准备好了mongo数据库的连接库表对象,剩下的工作就是从t解析导入mongo库表了。
mongo作为列存储的nosql文件数据库,这里只接收document类型
这里创建了一个它的对象act_doc,解析过程:
- 链数据对象的解析
const auto trx_id = t->id;
const auto trx_id_str = trx_id.str();
const auto& trx = t->trx;
const chain::transaction_header& trx_header = trx;
- mongo数据库存储结构的定义,值数据的传入,通过process_action函数进行处理,
act_doc.append( kvp( "action_num", b_int32{act_num} ), kvp( "trx_id", trx_id_str ));
act_doc.append( kvp( "cfa", b_bool{cfa} ));
act_doc.append( kvp( "account", act.account.to_string()));
act_doc.append( kvp( "name", act.name.to_string()));
act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) {
for( const auto& auth : act.authorization ) {
subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) {
subdoc.append( kvp( "actor", auth.actor.to_string()),
kvp( "permission", auth.permission.to_string()));
} );
}
} ));
process_action函数处理的是action数据的匹配,而如果action涉及到新账户的创建,这部分要在process_action函数中继续通过update_account函数进行处理。update_account函数只会过滤由system合约执行的newaccount动作,system合约默认是由chain::config::system_account_name(就是eosio)来创建的。所以过滤后的action的结构如下:
field | value |
---|---|
account | eosio |
name | newaccount |
然后会同步在mongo的accounts表中添加一条记录,要有当时的添加时间createdAt。添加之前,要根据这个用户名去mongo中查找,通过函数find_account,如果查找到了则update,未查到就insert。
auto find_account(mongocxx::collection& accounts, const account_name& name) {
using bsoncxx::builder::basic::make_document;
using bsoncxx::builder::basic::kvp;
return accounts.find_one( make_document( kvp( "name", name.to_string())));
}
接着,是transaction表的数据插入,这个工作是对trans_doc文本类型变量的设置:
- trx_id设置
- irreversible设置
- transaction_header设置
- signing_keys设置
- actions设置:遍历源trx的actions,每一项去调用上面定义的process_action函数执行action数据的处理发到action_array变量中,赋给actions。
- context_free_actions,与action的处理过程差不多。
- transaction_extensions设置
- signatures
- context_free_data
- createdAt
整合完毕,将trans_doc插入到transaction表中去。整个_process_accepted_transaction执行完毕,其中涉及到了transaction, action, accounts三张表的内容的增加与修改。
2. mongo_db_plugin_impl::_process_applied_transaction
执行应用交易,需要start_block_reached标识位为true。这个函数是对mongo中transaction_traces表的操作。同样的,是通过一个文本类型变量trans_traces_doc操作。这个函数的参数传入是transaction_trace_ptr类型的对象t(对应的上面_process_accepted_transaction接收的是transaction_metadata_ptr类型的)
abi_serializer::to_variant, 转化成abi格式的json数据。
abi_serializer::from_variant, 通过abi格式的json数据转换出来对应的对象数据。
3. mongo_db_plugin_impl::_process_accepted_block
这里先要从process_accepted_block函数进入,上面的下划线_开头的函数都是又没有下划线的相同名字的函数调用的,只是他们除了调用以外都是一些异常的处理,日志的输出工作。而process_accepted_block函数有了简单的逻辑,就是根据标志位start_block_reached作出了处理。前面我们介绍plugin_initialize函数的时候,通过配置文件的配置项"mongodb-block-start",我们设定了全局变量start_block_num作为标志位。这里面就是对于这个参数值的一个判断,如果达到了这个设定的起始区块,则设定全局变量标志位start_block_reached为true,那么就可以进入到_process_accepted_block函数进行处理了。
这个函数是接收区块处理。传入的参数为block_state_ptr类型的对象bs。它的路线与上面介绍过的其他函数的参数t是相同的,只是类结构不同,存的数据不同。该函数涉及到mongo的两张表,一个是block_states,另一个是blocks。我们分别来研究。
- block_state_doc
mongocxx::options::update update_opts{};
update_opts.upsert( true );// upsert模式为true,代表update操作如果未找到对象则新增一条数据。
auto block_states = mongo_conn[db_name][block_states_col];
auto block_state_doc = bsoncxx::builder::basic::document{};
// 数据结构映射
block_state_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ));
auto json = fc::json::to_string( bhs );
try {
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ));// 追加block_header_state的值
} catch( bsoncxx::exception& ) {
try {
json = fc::prune_invalid_utf8(json);
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ));
block_state_doc.append( kvp( "non-utf8-purged", b_bool{true}));
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog( " JSON: ${j}", ("j", json));
}
}
block_state_doc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值
try {
// update_one, 没有查询到相关数据则直接新增一条
if( !block_states.update_one( make_document( kvp( "block_id", block_id_str )),
make_document( kvp( "$set", block_state_doc.view())), update_opts )) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id));
}
} catch(...) {
handle_mongo_exception("block_states insert: " + json, __LINE__);
}
- block_doc
auto blocks = mongo_conn[db_name][blocks_col];
auto block_doc = bsoncxx::builder::basic::document{};
// 数据结构映射
block_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ),
kvp( "irreversible", b_bool{false} ));
auto v = to_variant_with_abi( *bs->block, accounts, abi_serializer_max_time );// 转化为abi格式的数据存储。
json = fc::json::to_string( v );
try {
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ));// 追加block的值,为json
} catch( bsoncxx::exception& ) {
try {
json = fc::prune_invalid_utf8(json);
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ));
block_doc.append( kvp( "non-utf8-purged", b_bool{true}));
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog( " JSON: ${j}", ("j", json));
}
}
block_doc.append(kvp( "createdAt", b_date{now} ));// 追加createdAt的值
try {
// update_one, 没有查询到相关数据则直接新增一条
if( !blocks.update_one( make_document( kvp( "block_id", block_id_str )),
make_document( kvp( "$set", block_doc.view())), update_opts )) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id));
}
} catch(...) {
handle_mongo_exception("blocks insert: " + json, __LINE__);
}
4. mongo_db_plugin_impl::_process_irreversible_block
执行不可逆区块,,需要start_block_reached标识位为true。涉及mongo的两张表:blocks表和transactions表。
// 创世块区块号为1,没有信号到accepted_block处理。
if (block_num < 2) return;
传入的参数,思想与上面的几个函数设计是相同的,它的类型与上面的_process_accepted_block函数相同,是block_state_ptr类型的对象bs。从bs中获取到区块,首先会通过find_block去mongo中查询,如果有的话就不再处理。
- blocks
数据映射更新插入。由于它是在_process_accepted_block函数的后面执行,所以语句update_opts.upsert( true );在这里的update_one也是有效的。
bulk: 是一系列操作的集合。
mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);// false说明可以并行,所有操作互不影响。若为true,则顺序执行,一旦遇错直接中止,后面的操作不会被执行到。
auto bulk = trans.create_bulk_write(bulk_opts);//所有的操作针对的是trans对象,对应的mongo表为transactions。
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ),
kvp( "updatedAt", b_date{now}))));
blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid())), update_doc.view());
- transactions
transactons是一个数组,一个block可以包含很多条transaction,因此这里要有个循环来处理。对于transaction在mongo中的存储历史,也有对应的find_transaction去mongo中查询,如果有的话就不再处理。
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "block_id", block_id_str),
kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "updatedAt", b_date{now}))));
mongocxx::model::update_one update_op{ make_document(kvp("_id", ir_trans->view()["_id"].get_oid())), update_doc.view()};
最后通过在transaction循环中设定一个标志位transactions_in_block来确定transaction遍历结束。
mongo_db_plugin总结
我们是通过nodeos命令的initialize函数跟踪到mongo_db_plugin的,关于mongo_db_plugin的一切,可以总结为顺序:
1. set_program_option,设置配置参数
2. plugin_initialize,初始化使plugin配置参数生效,准备mongo连接,queue机制,信号槽监听chain出块action。
3. init,mongo库表初始化,建立索引,定义了consume_thread线程用来消费queue区块数据。initialize周期结束。
4. consume_block,线程触发与等待策略,process_queue消费中转策略,根据四种数据结构(即上文反复提到的那四个结构)分发消费函数。
table | function insert | function update |
---|---|---|
transactions | accepted_trx | irreversible_block(bulk) |
actions | accepted_trx(bulk) | |
block_states | accepted_block | |
blocks | accepted_block | irreversible_block |
transaction_traces | applied_trx | |
accounts | accepted_trx |
比较特殊的一个表是accounts,它可以过滤actions内容,找到newaccount的action并保存账户到表里。这给我们以启发,我们可以自己定义新的表来过滤自己需要的action,例如我们自己写的智能合约。
(六)initialize_logging()
日志系统初始化。
void initialize_logging()
{
auto config_path = app().get_logging_conf();
if(fc::exists(config_path))
fc::configure_logging(config_path); //故意不去捕捉异常
for(auto iter : fc::get_appender_map())
iter.second->initialize(app().get_io_service());
// 重复以上代码逻辑,利用boost::asio::signal\_set机制,async\_wait。
logging_conf_loop();
}
(七)startup()
void application::startup() {
try {
for (auto plugin : initialized_plugins)//遍历所有已初始化的插件,执行他们的startup函数。
plugin->startup();
} catch(...) {
shutdown();//如有异常,则调用shutdown函数,清空容器,释放资源。
throw;
}
}
这里仍旧以mongo_db_plugin为例,它的startup()是空。而对于其他plugin而言,startup都有很多工作要做,例如producer_plugin和chain_plugin都非常重要,此外涉及到重要的控制器部分controller也需要仔细研究。由于本文篇幅过长,我们重点仍旧围绕mongo_db_plugin来介绍整个nodeos启动的生命周期。
(八)exec()
main入口函数执行到最后一个步骤:exec函数。
void application::exec() {
std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
sigint_set->async_wait([sigint_set,this](const boost::system::error_code& err, int num) {
quit();
sigint_set->cancel();
});
std::shared_ptr<boost::asio::signal_set> sigterm_set(new boost::asio::signal_set(*io_serv, SIGTERM));
sigterm_set->async_wait([sigterm_set,this](const boost::system::error_code& err, int num) {
quit();
sigterm_set->cancel();
});
std::shared_ptr<boost::asio::signal_set> sigpipe_set(new boost::asio::signal_set(*io_serv, SIGPIPE));
sigpipe_set->async_wait([sigpipe_set,this](const boost::system::error_code& err, int num) {
quit();
sigpipe_set->cancel();
});
io_serv->run();// 与上面initialize_logging的get_io_service()获取到的io\_serv是同一个对象
shutdown(); /// 同步推出
}
这个函数与initialize_logging的循环中涉及到相同的信号机制boost::asio::signal_set。
boost::asio::signal_set
boost库的信号量技术。它要使用到boost::asio::io_service,这也是上面提到多次的。信号量对象的初始化可参照前文一段代码,如下:
std::shared_ptr<boost::asio::signal_set> sigint_set(new boost::asio::signal_set(*io_serv, SIGINT));
共享指针这里不谈了,感兴趣的同学请转到这里。它的构造函数是传入了一个boost::asio::io_service以及一个信号number SIGINT。这个SIGINT的声明为:
#define SIGINT 2 /* Interrupt (ANSI). */
这个构造函数实现了向信号量集合中添加了一个信号2。
接着,我要通过async_wait来使用信号量。可以贴上上面initialize_logging函数的logging_conf_loop函数。
void logging_conf_loop()
{
std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(app().get_io_service(), SIGHUP));
sighup_set->async_wait([sighup_set](const boost::system::error_code& err, int /*num*/) {
if(!err)
{
ilog("Received HUP. Reloading logging configuration.");
auto config_path = app().get_logging_conf();
if(fc::exists(config_path))
::detail::configure_logging(config_path);
for(auto iter : fc::get_appender_map())
iter.second->initialize(app().get_io_service());
logging_conf_loop();
}
});
}
可以直接通过sighup_set->async_wait的方式来使用。它的声明定义是:
void (boost::system::error_code, int))
会在所监听的信号触发时调用函数体。当发生错误的时候,退出logging_conf_loop函数的递归调用。
总结
写到这里,我们的nodeos的命令就启动成功了,由于篇幅限制,我们没有仔细去研究所有依赖的plugin,以及controller的逻辑。本文重点研究了mongo_db_plugin的源码实现,通过该插件,我们全面分析了nodeos命令启动的所有流程。而对于mongo_db_plugin插件本身的学习,我们也明白了链数据是如何同步到mongo里面的。接下来,我会继续深入分析其他相关插件的初始化流程以及启动流程,还有controller的逻辑细节,以及出块逻辑等等。
参考资料
EOSIO/eos
相关文章和视频推荐
圆方圆学院汇集大批区块链名师,打造精品的区块链技术课程。 在各大平台都长期有优质免费公开课,欢迎报名收看。