Ripple中处理高并发请求的方法
1.角色与限制
下面这段代码是根据发送请求的客户端ip来判断客户端的角色,根据角色不同做出不同的处理,admin角色的用户无并发限制,而普通用户则会判断当前是否应该断开连接:
func:ServerHandlerImp::processRequest
...
Resource::Consumer usage;
if (isUnlimited(role))
{
usage = m_resourceManager.newUnlimitedEndpoint(
remoteIPAddress.to_string());
}
else
{
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
if (usage.disconnect())
{
HTTPReply(503, "Server is overloaded", output, rpcJ);
return;
}
}
admin用户
admin用户可在配置文件中配置:
[port_ws_public]
port = 6006
ip = 0.0.0.0
admin = 192.168.0.133,192.168.0.144
protocol = ws
配置为admin的ip是不受并发约束的,所以如果是一个我们已知的客户端要发送大量请求到节点,可以将这个客户端的ip添加到admin列表中,以避免连接断开
普通用户
在Ripple中,每个客户端都是有一个并发额度限制的,每个请求根据请求类型不同,收取相应的并发费用,当并发费用超过额度(15000*32),则下次请求会断开连接。
并发费用的计算:
发送请求费用会增加,不发送请求时,费用会减少,减少规则:
// A span larger than four times the window decays the
// value to an insignificant amount so just reset it.
//
if (elapsed > 4 * Window)
{
m_value = value_type();
}
else
{
while (elapsed--)
m_value -= (m_value + Window - 1) / Window;
}
上面代码中:
- Window = 32,值参见 decayWindowSeconds 的定义及使用,所以最大经过2分钟,费用会重置为0,实际情况考虑衰减的情况,应该1分钟左右就会重置
- eclapsed为上次请求到现在经过的秒数,当elapsed越大,m_value值越大,m_value减小的越快
不同请求的收费情况(部分):
- 普通请求:20
- 提交交易:400
- 无效请求:100
- 异常请求:100
- 查找路径:50-400
- gateway_balances,ledger_request,path_find(create),ripple_path_find,sign,signFor : 3000
不考虑衰减,并发较高的情况下,用户发送到 15000 * 32 / 400 = 1200 个交易就会被断开连接,考虑到衰减,这值会大一些,实测并发为18的情况下,这个值为1500-1700
2.FeeEscalation特性
开启方式
开启FeeEscalation特性有两种方式,可直接在配置文件中配置:
[features]
FeeEscalation
也可以在链启动两周时自动开启
功能
开启FeeEscalation特性后,当节点发现当前共识中的交易数量多了,新的交易就会排队,返回terQUEUED错误,返回这一错误后,交易有极大可能仍然会成功,所以需要 像返回tesSUCCESS一样去确认共识结果
关于排队的参数,有一个是比较重要的:
[transaction_queue]
minimum_txn_in_ledger = 200
这个配置标明当前区块所能共识的交易数量,也就是说当前正在共识的交易量达到200时,后面的交易都要进行排队
实现方式:
一个交易发过来之后会经过下面的流程:
- 先检查FeeEscalation特性是否开启,如果未开启,直接走正常流程的apply
- 计算当前的费用,代码如下:
// We may need the base fee for multiple transactions
// or transaction replacement, so just pull it up now.
// TODO: Do we want to avoid doing it again during
// preclaim?
auto const baseFee = calculateBaseFee(app, view, *tx, j);
auto const feeLevelPaid = getFeeLevelPaid(*tx,
baseLevel, baseFee, setup_);
auto const requiredFeeLevel = [&]()
{
auto feeLevel = FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
if ((flags & tapPREFER_QUEUE) && byFee_.size())
{
return std::max(feeLevel, byFee_.begin()->feeLevel);
}
return feeLevel;
}();
scaleFeeLevel很关键,下面是实现:
std::uint64_t
TxQ::FeeMetrics::scaleFeeLevel(Snapshot const& snapshot,
OpenView const& view, std::uint32_t txCountPadding)
{
// Transactions in the open ledger so far
auto const current = view.txCount() + txCountPadding;
auto const target = snapshot.txnsExpected;
auto const multiplier = snapshot.escalationMultiplier;
// Once the open ledger bypasses the target,
// escalate the fee quickly.
if (current > target)
{
// Compute escalated fee level
// Don't care about the overflow flag
return mulDiv(multiplier, current * current,
target * target).second;
}
return baseLevel;
}
std::uint32_t minimumEscalationMultiplier = baseLevel * 500;
里面的txnsExpected 的值就是我们配置的 minimum_txn_in_ledger
这里multiplier的默认值为128000,可以看到current与target差距越大,最终的费用越高
- 如果交易提供的费用足够(不小于requiredFeeLevel),则继续执行,否则排队
- 排队涉及到两个数据结构:
FeeMultiSet byFee_;
AccountMap byAccount_;
byFee是按费用排序的交易集
byAccount_是按账户分隔的map
- 当新的区块共识通过,会创建新的OpenLedger,创建后立即应用队列中的交易:
RCLConsensus::Adaptor::doAccept
...
// Build new open ledger
auto lock = make_lock(app_.getMasterMutex(), std::defer_lock);
auto sl = make_lock(ledgerMaster_.peekMutex(), std::defer_lock);
std::lock(lock, sl);
auto const lastVal = ledgerMaster_.getValidatedLedger();
boost::optional<Rules> rules;
if (lastVal)
rules.emplace(*lastVal, app_.config().features);
else
rules.emplace(app_.config().features);
app_.openLedger().accept(
app_,
*rules,
sharedLCL.ledger_,
localTxs_.getTxSet(),
anyDisputes,
retriableTxs,
tapNONE,
"consensus",
[&](OpenView& view, beast::Journal j) {
// Stuff the ledger with transactions from the queue.
return app_.getTxQ().accept(app_, view);
});
...
TxQ::accept中仍然有费用的限制
TxQ::accept
...
auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
metricSnapshot, view);
auto const feeLevelPaid = candidateIter->feeLevel;
if (feeLevelPaid >= requiredFeeLevel)
{
auto firstTxn = candidateIter->txn;
JLOG(j_.trace()) << "Applying queued transaction " <<
candidateIter->txID << " to open ledger.";
TER txnResult;
bool didApply;
std::tie(txnResult, didApply) = candidateIter->apply(app, view);
...
TxQ::accept中使用了互斥量
std::lock_guard<std::mutex> lock(mutex_);
与TxQ::apply是并发互斥的