区块链 Ripple

Ripple中处理高并发请求的方法

2018-05-28  本文已影响2人  SwordShield

1.角色与限制

下面这段代码是根据发送请求的客户端ip来判断客户端的角色,根据角色不同做出不同的处理,admin角色的用户无并发限制,而普通用户则会判断当前是否应该断开连接:

func:ServerHandlerImp::processRequest
...
Resource::Consumer usage;
if (isUnlimited(role))
{
    usage = m_resourceManager.newUnlimitedEndpoint(
        remoteIPAddress.to_string());
}
else
{
    usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
    if (usage.disconnect())
    {
        HTTPReply(503, "Server is overloaded", output, rpcJ);
        return;
    }
}

admin用户

admin用户可在配置文件中配置:

[port_ws_public]
port = 6006
ip = 0.0.0.0
admin = 192.168.0.133,192.168.0.144
protocol = ws

配置为admin的ip是不受并发约束的,所以如果是一个我们已知的客户端要发送大量请求到节点,可以将这个客户端的ip添加到admin列表中,以避免连接断开

普通用户

在Ripple中,每个客户端都是有一个并发额度限制的,每个请求根据请求类型不同,收取相应的并发费用,当并发费用超过额度(15000*32),则下次请求会断开连接。

并发费用的计算:

发送请求费用会增加,不发送请求时,费用会减少,减少规则:

// A span larger than four times the window decays the
// value to an insignificant amount so just reset it.
//
if (elapsed > 4 * Window)
{
    m_value = value_type();
}
else
{
    while (elapsed--)
        m_value -= (m_value + Window - 1) / Window;
}

上面代码中:

不同请求的收费情况(部分):

不考虑衰减,并发较高的情况下,用户发送到 15000 * 32 / 400 = 1200 个交易就会被断开连接,考虑到衰减,这值会大一些,实测并发为18的情况下,这个值为1500-1700

2.FeeEscalation特性

开启方式

开启FeeEscalation特性有两种方式,可直接在配置文件中配置:

[features]
FeeEscalation

也可以在链启动两周时自动开启

功能

开启FeeEscalation特性后,当节点发现当前共识中的交易数量多了,新的交易就会排队,返回terQUEUED错误,返回这一错误后,交易有极大可能仍然会成功,所以需要 像返回tesSUCCESS一样去确认共识结果

关于排队的参数,有一个是比较重要的:

[transaction_queue]
minimum_txn_in_ledger = 200

这个配置标明当前区块所能共识的交易数量,也就是说当前正在共识的交易量达到200时,后面的交易都要进行排队

实现方式:

一个交易发过来之后会经过下面的流程:

  1. 先检查FeeEscalation特性是否开启,如果未开启,直接走正常流程的apply
  2. 计算当前的费用,代码如下:
// We may need the base fee for multiple transactions
// or transaction replacement, so just pull it up now.
// TODO: Do we want to avoid doing it again during
//   preclaim?
auto const baseFee = calculateBaseFee(app, view, *tx, j);
auto const feeLevelPaid = getFeeLevelPaid(*tx,
    baseLevel, baseFee, setup_);
auto const requiredFeeLevel = [&]()
{
    auto feeLevel = FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
    if ((flags & tapPREFER_QUEUE) && byFee_.size())
    {
        return std::max(feeLevel, byFee_.begin()->feeLevel);
    }
    return feeLevel;
}();

scaleFeeLevel很关键,下面是实现:

std::uint64_t
TxQ::FeeMetrics::scaleFeeLevel(Snapshot const& snapshot,
    OpenView const& view, std::uint32_t txCountPadding)
{
    // Transactions in the open ledger so far
    auto const current = view.txCount() + txCountPadding;

    auto const target = snapshot.txnsExpected;
    auto const multiplier = snapshot.escalationMultiplier;

    // Once the open ledger bypasses the target,
    // escalate the fee quickly.
    if (current > target)
    {
        // Compute escalated fee level
        // Don't care about the overflow flag
        return mulDiv(multiplier, current * current,
            target * target).second;
    }

    return baseLevel;
}

std::uint32_t minimumEscalationMultiplier = baseLevel * 500;

里面的txnsExpected 的值就是我们配置的 minimum_txn_in_ledger

这里multiplier的默认值为128000,可以看到current与target差距越大,最终的费用越高

  1. 如果交易提供的费用足够(不小于requiredFeeLevel),则继续执行,否则排队
  2. 排队涉及到两个数据结构:
FeeMultiSet byFee_;
AccountMap byAccount_;

byFee是按费用排序的交易集

byAccount_是按账户分隔的map

  1. 当新的区块共识通过,会创建新的OpenLedger,创建后立即应用队列中的交易:
RCLConsensus::Adaptor::doAccept
...
    // Build new open ledger
    auto lock = make_lock(app_.getMasterMutex(), std::defer_lock);
    auto sl = make_lock(ledgerMaster_.peekMutex(), std::defer_lock);
    std::lock(lock, sl);

    auto const lastVal = ledgerMaster_.getValidatedLedger();
    boost::optional<Rules> rules;
    if (lastVal)
        rules.emplace(*lastVal, app_.config().features);
    else
        rules.emplace(app_.config().features);
    app_.openLedger().accept(
        app_,
        *rules,
        sharedLCL.ledger_,
        localTxs_.getTxSet(),
        anyDisputes,
        retriableTxs,
        tapNONE,
        "consensus",
        [&](OpenView& view, beast::Journal j) {
            // Stuff the ledger with transactions from the queue.
            return app_.getTxQ().accept(app_, view);
        });
...

TxQ::accept中仍然有费用的限制

TxQ::accept
...
    auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
        metricSnapshot, view);
    auto const feeLevelPaid = candidateIter->feeLevel;
    if (feeLevelPaid >= requiredFeeLevel)
    {
        auto firstTxn = candidateIter->txn;

        JLOG(j_.trace()) << "Applying queued transaction " <<
            candidateIter->txID << " to open ledger.";

        TER txnResult;
        bool didApply;
        std::tie(txnResult, didApply) = candidateIter->apply(app, view);
...

TxQ::accept中使用了互斥量

std::lock_guard<std::mutex> lock(mutex_);

与TxQ::apply是并发互斥的

上一篇 下一篇

猜你喜欢

热点阅读