2018-06-30

2018-07-01  本文已影响253人  nit小星星

比特币实现一个核心客户端。也就是人们经常分析的bitcoind程序。bitcoind程序实现的是一个http服务器。另外bitcoin-cli实现的是一个http客户端。二者之间的传输数据使用json-rpc

编译好程序之后。执行一下 bitcoind-cli gettransaction 9ca8f969bd3ef5ec2a8685660fdbf7a8bd365524c2e1fc66c309acbae2c14ae3

然后在命令行就能看待这个交易信息:

{

    "amount" : 0.05000000,

    "confirmations" : 0,

    "txid":"9ca8f969bd3ef5ec2a8685660fdbf7a8bd365524c2e1fc66c309acbae2c14ae3",

    "time" : 1392660908,

    "timereceived" : 1392660908,

    "details" : [

    {

    "account" : "",

    "address":"1hvzSofGwT8cjb8JU7nBsCSfEVQX5u9CL",

    "category" : "receive",

    "amount" : 0.05000000

    }

  ]

}

json-rpc的请求非常简单,其格式如下:

{

    "jsonrpc" : 2.0,

    "method" : "getinfo",

    "params" : [""],

    "id" : 1

}

    jsonrpc:json-rpc的版本;

method:rpc调用的方法名;

params:方法传入的参数,没有参数传入nullptr;

id:调用的标识符,可以为字符串,也可以为nullptr,但是不建议使用nullptr,因为容易引起混乱。

2.2 响应

{

    "jsonrpc" : 2.0,

    "result" : "info",

    "error" : null,

    "id" : 1

}

    jsonrpc:json-rpc版本;

result:rpc调用的返回值,调用成功时不能为nullptr,调用失败必须为nullptr;

error:调用错误时用,无错误为nullptr,有错误时返回错误对象,参见下一节;

    id:调用标识符,与调用方传入的保持一致。

josn-rpc

是一个基于josn的跨语言rpc协议。具有目前java c++都支持。

Json -rpc的请求非常简单。

{

    "jsonrpc" : 2.0,

    "method" : "getinfo",

    "params" : [""],

    "id" : 1

}

    jsonrpc:json-rpc的版本;

method:rpc调用的方法名;

params:方法传入的参数,没有参数传入nullptr;

id:调用的标识符,可以为字符串,也可以为nullptr,但是不建议使用nullptr,因为容易引起混乱。

{

    "jsonrpc" : 2.0,

    "result" : "info",

    "error" : null,

    "id" : 1

}

    jsonrpc:json-rpc版本;

result:rpc调用的返回值,调用成功时不能为nullptr,调用失败必须为nullptr;

error:调用错误时用,无错误为nullptr,有错误时返回错误对象,参见下一节;

    id:调用标识符,与调用方传入的保持一致。

libevent实现的http服务器。

io复用:通过select poll 或者epoll等系统api 实现io复用

多线程或者多进程:

多线程和多进程可以解决大量的并发请求。无论多线程还是多进程,都存在问题,多进程不适合短连接。多线程

 将IO复用和多线程结合起来,这是目前解决大并发的常用方案。最常见的套路就是主线程里监听某个端口以及接受的描述符,当有读写事件产生时,将事件交给工作线程去处理。

下面使用一个libevent http服务器

1,首先创建套接字病在指定的端口上监听:

int sock_fd = ::socket(AF_INET, SOCK_STREAM, 0);

    if( sock_fd == -1 ) 

        return -1; 

    evutil_make_listen_socket_reuseable(sock_fd); 

    struct sockaddr_in sin; 

    sin.sin_family = AF_INET; 

    sin.sin_addr.s_addr = 0; 

    sin.sin_port = htons(port); 

    if( ::bind(sock_fd, (SA*)&sin, sizeof(sin)) < 0 ) 

        goto error; 

    if( ::listen(sock_fd, listen_num) < 0) 

        goto error; 

    evutil_make_socket_nonblocking(listener);

 (2) 创建一个监听客户连接请求的事件:

    struct event* ev_listen = event_new(base, sock_fd, EV_READ | EV_PERSIST, 

                                        accept_cb, base); 

    event_add(ev_listen, NULL); 

    event_base_dispatch(base);

    当监听套接字有新连接时,事件将被触发,从而执行回调accept_cb:

   void accept_cb(int fd, short events, void* arg) 

    { 

        evutil_socket_t sockfd; 

        struct sockaddr_in client; 

        socklen_t len = sizeof(client); 

        sockfd = ::accept(fd, (struct sockaddr*)&client, &len ); 

        evutil_make_socket_nonblocking(sockfd); 

        printf("accept a client %d\n", sockfd); 

        struct event_base* base = (event_base*)arg; 

        bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE); 

        bufferevent_setcb(bev, socket_read_cb, NULL, event_cb, arg); 

        bufferevent_enable(bev, EV_READ | EV_PERSIST); 

    }

    创建一个bufferevent事件,将accept以后的已连接套接字与之关联,这样当套接字上有数据到来时,就会触发bufferevent事件,从而执行socket_read_cb回调:

    void socket_read_cb(bufferevent* bev, void* arg) 

    { 

        char msg[4096]; 

        size_t len = bufferevent_read(bev, msg, sizeof(msg)); 

        msg[len] = '\0';   

        char reply_msg[4096] = "recvieced msg:"; 

        strcat(reply_msg + strlen(reply_msg), msg); 

        bufferevent_write(bev, reply_msg, strlen(reply_msg)); 

    }

    然后就能从bufferevent中读取到客户数据。

  (1) 创建事件集和evhttp事件:

struct event_base *event_base_new(void);

struct evhttp *evhttp_new(struct event_base *base);

    (2) 绑定地址和端口

int evhttp_bind_socket(struct evhttp *http, const char *address, ev_uint16_t port);

    (3) 设置回调来处理http请求

void evhttp_set_gencb(struct evhttp *http, void (*cb)(struct evhttp_request *, void *), void *arg);

    (4) 进入事件循环

int event_base_dispatch(struct event_base *);

    在下一节我们结合比特币的源码,来看看比特币中是如使用上面这些api实现http服务的,当然比特币的http服务封装的更为复杂一些。

josn-rpc的初始化也是在Bitcoind的初始步骤 在Init.cpp的appinitmain函数里。

RegisterAllCoreRPCCommands(tableRPC);

    g_wallet_init_interface.RegisterRPC(tableRPC);

/* Start the RPC server already. It will be started in "warmup" mode

    * and not really process calls already (but it will signify connections

    * that the server is there and will be ready later).  Warmup mode will

    * be disabled when initialisation is finished.

    */

第一行 RegisterAllCoreRPCCommands(tableRPC);

就是注册rpc命令

static inline void RegisterAllCoreRPCCommands(CRPCTable &t)

    {

        RegisterBlockchainRPCCommands(t);

        RegisterNetRPCCommands(t);

        RegisterMiscRPCCommands(t);

        RegisterMiningRPCCommands(t);

        RegisterRawTransactionRPCCommands(t);

    }

可以看到这里分类注册了rpc命令

,操作区块链的、网络相关的、挖矿相关的以及比特币交易相关的RPC命令一应俱全。这里不妨列出来,这样读者对通过客户端能做些什么事情有个大概印象:

 (1) 区块链相关的rpc,位于blockchain.cpp中:

static const CRPCCommand commands[] =

{ //  category              name                      actor (function)        argNames

  //  --------------------- ------------------------  -----------------------  ----------

    { "blockchain",        "getblockchaininfo",      &getblockchaininfo,      {} },

    { "blockchain",        "getchaintxstats",        &getchaintxstats,        {"nblocks", "blockhash"} },

    { "blockchain",        "getblockstats",          &getblockstats,          {"hash_or_height", "stats"} },

    { "blockchain",        "getbestblockhash",      &getbestblockhash,      {} },

    { "blockchain",        "getblockcount",          &getblockcount,          {} },

    { "blockchain",        "getblock",              &getblock,              {"blockhash","verbosity|verbose"} },

    { "blockchain",        "getblockhash",          &getblockhash,          {"height"} },

    { "blockchain",        "getblockheader",        &getblockheader,        {"blockhash","verbose"} },

    { "blockchain",        "getchaintips",          &getchaintips,          {} },

    { "blockchain",        "getdifficulty",          &getdifficulty,          {} },

    { "blockchain",        "getmempoolancestors",    &getmempoolancestors,    {"txid","verbose"} },

    { "blockchain",        "getmempooldescendants",  &getmempooldescendants,  {"txid","verbose"} },

    { "blockchain",        "getmempoolentry",        &getmempoolentry,        {"txid"} },

    { "blockchain",        "getmempoolinfo",        &getmempoolinfo,        {} },

    { "blockchain",        "getrawmempool",          &getrawmempool,          {"verbose"} },

    { "blockchain",        "gettxout",              &gettxout,              {"txid","n","include_mempool"} },

    { "blockchain",        "gettxoutsetinfo",        &gettxoutsetinfo,        {} },

    { "blockchain",        "pruneblockchain",        &pruneblockchain,        {"height"} },

    { "blockchain",        "savemempool",            &savemempool,            {} },

    { "blockchain",        "verifychain",            &verifychain,            {"checklevel","nblocks"} },

    { "blockchain",        "preciousblock",          &preciousblock,          {"blockhash"} },

    /* Not shown in help */

    { "hidden",            "invalidateblock",        &invalidateblock,        {"blockhash"} },

    { "hidden",            "reconsiderblock",        &reconsiderblock,        {"blockhash"} },

    { "hidden",            "waitfornewblock",        &waitfornewblock,        {"timeout"} },

    { "hidden",            "waitforblock",          &waitforblock,          {"blockhash","timeout"} },

    { "hidden",            "waitforblockheight",    &waitforblockheight,    {"height","timeout"} },

    { "hidden",            "syncwithvalidationinterfacequeue", &syncwithvalidationinterfacequeue, {} },

};

    所有的RPC命令以及对应的回调函数指针都封装在了CRPCCommand中,按分类、rpc方法名,回调函数,参数名封装。基本上通过方法名就能猜出其作用。

    (2) 网络相关的rpc,位于net.cpp中:

static const CRPCCommand commands[] =

{ //  category              name                      actor (function)        argNames

  //  --------------------- ------------------------  -----------------------  ----------

    { "network",            "getconnectioncount",    &getconnectioncount,    {} },

    { "network",            "ping",                  &ping,                  {} },

    { "network",            "getpeerinfo",            &getpeerinfo,            {} },

    { "network",            "addnode",                &addnode,                {"node","command"} },

    { "network",            "disconnectnode",        &disconnectnode,        {"address", "nodeid"} },

    { "network",            "getaddednodeinfo",      &getaddednodeinfo,      {"node"} },

    { "network",            "getnettotals",          &getnettotals,          {} },

    { "network",            "getnetworkinfo",        &getnetworkinfo,        {} },

    { "network",            "setban",                &setban,                {"subnet", "command", "bantime", "absolute"} },

    { "network",            "listbanned",            &listbanned,            {} },

    { "network",            "clearbanned",            &clearbanned,            {} },

    { "network",            "setnetworkactive",      &setnetworkactive,      {"state"} },

    (3) 挖矿相关的rpc,位于mining.cpp中:

static const CRPCCommand commands[] =

{ //  category              name                      actor (function)        argNames

  //  --------------------- ------------------------  -----------------------  ----------

    { "mining",            "getnetworkhashps",      &getnetworkhashps,      {"nblocks","height"} },

    { "mining",            "getmininginfo",          &getmininginfo,          {} },

    { "mining",            "prioritisetransaction",  &prioritisetransaction,  {"txid","dummy","fee_delta"} },

    { "mining",            "getblocktemplate",      &getblocktemplate,      {"template_request"} },

    { "mining",            "submitblock",            &submitblock,            {"hexdata","dummy"} },

    { "generating",        "generatetoaddress",      &generatetoaddress,      {"nblocks","address","maxtries"} },

    { "hidden",            "estimatefee",            &estimatefee,            {} },

    { "util",              "estimatesmartfee",      &estimatesmartfee,      {"conf_target", "estimate_mode"} },

    { "hidden",            "estimaterawfee",        &estimaterawfee,        {"conf_target", "threshold"} },

};

    (4) 比特币交易相关rpc,位于rawtransaction.cpp中:

static const CRPCCommand commands[] =

{ //  category              name                            actor (function)            argNames

  //  --------------------- ------------------------        -----------------------    ----------

    { "rawtransactions",    "getrawtransaction",            &getrawtransaction,        {"txid","verbose","blockhash"} },

    { "rawtransactions",    "createrawtransaction",        &createrawtransaction,      {"inputs","outputs","locktime","replaceable"} },

    { "rawtransactions",    "decoderawtransaction",        &decoderawtransaction,      {"hexstring","iswitness"} },

    { "rawtransactions",    "decodescript",                &decodescript,              {"hexstring"} },

    { "rawtransactions",    "sendrawtransaction",          &sendrawtransaction,        {"hexstring","allowhighfees"} },

    { "rawtransactions",    "combinerawtransaction",        &combinerawtransaction,    {"txs"} },

    { "rawtransactions",    "signrawtransaction",          &signrawtransaction,        {"hexstring","prevtxs","privkeys","sighashtype"} }, /* uses wallet if enabled */

    { "rawtransactions",    "signrawtransactionwithkey",    &signrawtransactionwithkey, {"hexstring","privkeys","prevtxs","sighashtype"} },

    { "rawtransactions",    "testmempoolaccept",            &testmempoolaccept,        {"rawtxs","allowhighfees"} },

    { "blockchain",        "gettxoutproof",                &gettxoutproof,            {"txids", "blockhash"} },

    { "blockchain",        "verifytxoutproof",            &verifytxoutproof,          {"proof"} },

};

    当注册完以后,如果用户启用了-server选项,将会调用AppInitServers创建Http服务器。

AppInitServers实现如下:

static bool AppInitServers()

{

    RPCServer::OnStarted(&OnRPCStarted);

    RPCServer::OnStopped(&OnRPCStopped);

    if (!InitHTTPServer())

        return false;

    if (!StartRPC())

        return false;

    if (!StartHTTPRPC())

        return false;

    if (gArgs.GetBoolArg("-rest", DEFAULT_REST_ENABLE) && !StartREST())

        return false;

    if (!StartHTTPServer())

        return false;

    return true;

}

    这里按步骤一步一步的来。首先是调用InitHTTPServer,使用libevent api来建立http服务器,这里截取主要代码来看看,位于httpserver.cpp文件:

    raii_event_base base_ctr = obtain_event_base();

    /* Create a new evhttp object to handle requests. */

    raii_evhttp http_ctr = obtain_evhttp(base_ctr.get());

    struct evhttp* http = http_ctr.get();

    if (!http) {

        LogPrintf("couldn't create evhttp. Exiting.\n");

        return false;

    }

    evhttp_set_timeout(http, gArgs.GetArg("-rpcservertimeout", DEFAULT_HTTP_SERVER_TIMEOUT));

    evhttp_set_max_headers_size(http, MAX_HEADERS_SIZE);

    evhttp_set_max_body_size(http, MAX_SIZE);

    evhttp_set_gencb(http, http_request_cb, nullptr);

    if (!HTTPBindAddresses(http)) {

        LogPrintf("Unable to bind any endpoint for RPC server\n");

        return false;

    }

    LogPrint(BCLog::HTTP, "Initialized HTTP server\n");

    int workQueueDepth = std::max((long)gArgs.GetArg("-rpcworkqueue", DEFAULT_HTTP_WORKQUEUE), 1L);

    LogPrintf("HTTP: creating work queue of depth %d\n", workQueueDepth);

    workQueue = new WorkQueue(workQueueDepth);

    // transfer ownership to eventBase/HTTP via .release()

    eventBase = base_ctr.release();

    eventHTTP = http_ctr.release();

    这里的套路和3.4节中用libevent建立http服务器的步骤基本一样,注意两点:

    (1) 用evhttp_set_gencb设置了http请求的处理函数:http_request_cb;

    (2) 创建了一个工作队列,队列里的元素类型HTTPClosure,这是一个函数对象接口类,重写了函数调用操作符,HttpWorkItem实现了此接口

http的请求处理:

当我们收到一个http请求后。使用函数http_request_cb回调,主要代码如下

   // Find registered handler for prefix

    std::string strURI = hreq->GetURI();

    std::string path;

    std::vector::const_iterator i = pathHandlers.begin();

    std::vector::const_iterator iend = pathHandlers.end();

    for (; i != iend; ++i) {

        bool match = false;

        if (i->exactMatch)

            match = (strURI == i->prefix);

        else

            match = (strURI.substr(0, i->prefix.size()) == i->prefix);

        if (match) {

            path = strURI.substr(i->prefix.size());

            break;

        }

    }

    // Dispatch to worker thread

    if (i != iend) {

        std::unique_ptr item(new HTTPWorkItem(std::move(hreq), path, i->handler));

        assert(workQueue);

        if (workQueue->Enqueue(item.get()))

            item.release(); /* if true, queue took ownership */

        else {

            LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");

            item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");

        }

    } else {

        hreq->WriteReply(HTTP_NOTFOUND);

    }

    用一句句话来概括这个函数的作用就是:将请求的url的path部分与注册过的前缀进行匹配,并生成HttpWorkItem放入到工作队列中。目前注册了两个前缀:/和/wallet/,代码在StartHttpRPC中:

bool StartHTTPRPC()

{

    LogPrint(BCLog::RPC, "Starting HTTP RPC server\n");

    if (!InitRPCAuthentication())

        return false;

    RegisterHTTPHandler("/", true, HTTPReq_JSONRPC);

#ifdef ENABLE_WALLET

    // ifdef can be removed once we switch to better endpoint support and API versioning

    RegisterHTTPHandler("/wallet/", false, HTTPReq_JSONRPC);

#endif

    assert(EventBase());

    httpRPCTimerInterface = MakeUnique(EventBase());

    RPCSetTimerInterface(httpRPCTimerInterface.get());

    return true;

}

    两个前缀/和/wallet/对应的回调处理函数均为HttpReq_JSONRPC。

    之后调用StartHttpServer让工作队列运行起来:

bool StartHTTPServer()

{

    LogPrint(BCLog::HTTP, "Starting HTTP server\n");

    int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);

    LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);

    std::packaged_task task(ThreadHTTP);

    threadResult = task.get_future();

    threadHTTP = std::thread(std::move(task), eventBase, eventHTTP);

    for (int i = 0; i < rpcThreads; i++) {

        g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue);

    }

    return true;

}

    最终会调用到工作队列的run方法:

void Run()

    {

        while (true) {

            std::unique_ptr i;

            {

                std::unique_lock lock(cs);

                while (running && queue.empty())

                    cond.wait(lock);

                if (!running)

                    break;

                i = std::move(queue.front());

                queue.pop_front();

            }

            (*i)();

        }

    }

    很简单,工作队列为空的时候线程阻塞等待,当收到http请求以后,解析请求并添加HttpWorkItem到队列中并唤醒线程,线程从队列头部取出一个item运行。最终将执行HttpReq_JSONRPC这个回调,这里会将JSONRPC中的rpc方法分发到服务端不同的方法中,来看看其处理:

 (1) 请求合法性检查及认证

    首先检查请求是否合法,http头部中的auchoization是否合法:

static bool HTTPReq_JSONRPC(HTTPRequest* req, const std::string &)

{

    // JSONRPC handles only POST

    if (req->GetRequestMethod() != HTTPRequest::POST) {

        req->WriteReply(HTTP_BAD_METHOD, "JSONRPC server handles only POST requests");

        return false;

    }

    // Check authorization

    std::pair authHeader = req->GetHeader("authorization");

    if (!authHeader.first) {

        req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA);

        req->WriteReply(HTTP_UNAUTHORIZED);

        return false;

    }

    JSONRPCRequest jreq;

    jreq.peerAddr = req->GetPeer().ToString();

    if (!RPCAuthorized(authHeader.second, jreq.authUser)) {

        LogPrintf("ThreadRPCServer incorrect password attempt from %s\n", jreq.peerAddr);

        /* Deter brute-forcing

          If this results in a DoS the user really

          shouldn't have their RPC port exposed. */

        MilliSleep(250);

        req->WriteHeader("WWW-Authenticate", WWW_AUTH_HEADER_DATA);

        req->WriteReply(HTTP_UNAUTHORIZED);

        return false;

    }

可以看到,比特币的json rpc服务只支持POST。

    (2) 读取http请求数据,将rpc请求分发到不同的函数

    try {

        // Parse request

        UniValue valRequest;

        if (!valRequest.read(req->ReadBody()))

            throw JSONRPCError(RPC_PARSE_ERROR, "Parse error");

        // Set the URI

        jreq.URI = req->GetURI();

        std::string strReply;

        // singleton request

        if (valRequest.isObject()) {

            jreq.parse(valRequest);

            UniValue result = tableRPC.execute(jreq);

            // Send reply

            strReply = JSONRPCReply(result, NullUniValue, jreq.id);

        // array of requests

        } else if (valRequest.isArray())

            strReply = JSONRPCExecBatch(jreq, valRequest.get_array());

        else

            throw JSONRPCError(RPC_PARSE_ERROR, "Top-level object parse error");

        req->WriteHeader("Content-Type", "application/json");

        req->WriteReply(HTTP_OK, strReply);

    如果收到的是单个json,则tableRPC.execute执行,否则如果收到的是以数组形式的批量rpc请求,则批量执行,批量执行最终也是走tableRPC.execute()来分发,execute()执行后的结果将写入到http响应包中:

UniValue CRPCTable::execute(const JSONRPCRequest &request) const

{

    // Return immediately if in warmup

    {

        LOCK(cs_rpcWarmup);

        if (fRPCInWarmup)

            throw JSONRPCError(RPC_IN_WARMUP, rpcWarmupStatus);

    }

    // Find method

    const CRPCCommand *pcmd = tableRPC[request.strMethod];

    if (!pcmd)

        throw JSONRPCError(RPC_METHOD_NOT_FOUND, "Method not found");

    g_rpcSignals.PreCommand(*pcmd);

    try

    {

        // Execute, convert arguments to array if necessary

        if (request.params.isObject()) {

            return pcmd->actor(transformNamedArguments(request, pcmd->argNames));

        } else {

            return pcmd->actor(request);

        }

    }

    catch (const std::exception& e)

    {

        throw JSONRPCError(RPC_MISC_ERROR, e.what());

    }

}

    代码也比较容易理解,就是从根据json-rpc协议,从请求中读取method,然后根据method找到对应的CRPCCommand执行体,这些执行体就是4.2.1节中提到那几张分门别类的映射表。

    至此,比特币的json-rpc服务端的脉络我们就梳理的差不多了,整体框架并不难理解,只是封装的略微复杂一点点。

上一篇下一篇

猜你喜欢

热点阅读