JanusGraph Source Code Analysis (Part 2): The Mystery of the Cassandra Connection Pool Settings That Do Not Take Effect
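1. The Cassandra connection count setting does not take effect
While importing data into JanusGraph under high concurrency, I ran out of Cassandra connections. The log and exception stack trace were as follows: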
12:13:48.689 [pool-1-thread-5] DEBUG com.datastax.driver.core.RequestHandler - [1447374269-1] Error querying /127.0.0.1:9042 : com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)
12:13:48.689 [pool-1-thread-1] DEBUG com.datastax.driver.core.RequestHandler - [1036619747-1] Error querying /127.0.0.1:9042 : com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)
12:13:48.689 [pool-1-thread-5] DEBUG com.datastax.driver.core.RequestHandler - [848853037-1] Error querying /127.0.0.1:9042 : com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)
12:13:48.689 [pool-1-thread-1] DEBUG com.datastax.driver.core.RequestHandler - [1914107877-1] Error querying /127.0.0.1:9042 : com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)
12:13:48.689 [pool-1-thread-2] DEBUG com.datastax.driver.core.RequestHandler - [288447423-1] Error querying /127.0.0.1:9042 : com.datastax.driver.core.exceptions.BusyPoolException: [/127.0.0.1] Pool is busy (no available connection and the queue has reached its max size 256)
org.janusgraph.diskstorage.TemporaryBackendException: Temporary failure in storage backend
at io.vavr.API$Match$Case0.apply(API.java:3174)
at io.vavr.API$Match.of(API.java:3137)
at org.janusgraph.diskstorage.cql.CQLKeyColumnValueStore.lambda$static$0(CQLKeyColumnValueStore.java:123)
at org.janusgraph.diskstorage.cql.CQLStoreManager.mutateManyUnlogged(CQLStoreManager.java:508)
at org.janusgraph.diskstorage.cql.CQLStoreManager.mutateMany(CQLStoreManager.java:439)
at org.janusgraph.diskstorage.locking.consistentkey.ExpectedValueCheckingStoreManager.mutateMany(ExpectedValueCheckingStoreManager.java:79)
at org.janusgraph.diskstorage.keycolumnvalue.cache.CacheTransaction$1.call(CacheTransaction.java:94)
at org.janusgraph.diskstorage.keycolumnvalue.cache.CacheTransaction$1.call(CacheTransaction.java:91)
at org.janusgraph.diskstorage.util.BackendOperation.executeDirect(BackendOperation.java:68)
at org.janusgraph.diskstorage.util.BackendOperation.execute(BackendOperation.java:54)
at org.janusgraph.diskstorage.keycolumnvalue.cache.CacheTransaction.persist(CacheTransaction.java:91)
at org.janusgraph.diskstorage.keycolumnvalue.cache.CacheTransaction.flushInternal(CacheTransaction.java:133)
at org.janusgraph.diskstorage.keycolumnvalue.cache.CacheTransaction.mutate(CacheTransaction.java:86)
at org.janusgraph.diskstorage.keycolumnvalue.cache.KCVSCache.mutateEntries(KCVSCache.java:65)
at org.janusgraph.diskstorage.BackendTransaction.mutateIndex(BackendTransaction.java:212)
at org.janusgraph.graphdb.database.StandardJanusGraph.prepareCommit(StandardJanusGraph.java:633)
at org.janusgraph.graphdb.database.StandardJanusGraph.commit(StandardJanusGraph.java:726)
at org.janusgraph.graphdb.transaction.StandardJanusGraphTx.commit(StandardJanusGraphTx.java:1380)
at org.janusgraph.graphdb.tinkerpop.JanusGraphBlueprintsGraph$GraphTransaction.doCommit(JanusGraphBlueprintsGraph.java:296)
at org.apache.tinkerpop.gremlin.structure.util.AbstractTransaction.commit(AbstractTransaction.java:104)
at org.iiai.season.tool.BulkLoadFactory.lambda$main$0(BulkLoadFactory.java:84)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
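The BusyPoolException message above says that no connection was available and that the wait queue had reached its maximum size of 256. The following is a minimal sketch of that kind of rejection, assuming a simplified model in which a pool has a fixed number of in-flight slots plus a bounded queue. The class BoundedPoolModel and its method names are hypothetical and are not part of the DataStax driver.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified, hypothetical model of a bounded request pool; not the driver's implementation. */
final class BoundedPoolModel {
    private final int maxInFlight;   // connections * max requests per connection
    private final int maxQueueSize;  // requests allowed to wait for a free slot
    private final Queue<String> waiting = new ArrayDeque<>();
    private int inFlight = 0;

    BoundedPoolModel(int connections, int maxRequestsPerConnection, int maxQueueSize) {
        this.maxInFlight = connections * maxRequestsPerConnection;
        this.maxQueueSize = maxQueueSize;
    }

    /** Returns "SENT", "QUEUED" or "REJECTED" for one request. */
    String submit(String requestId) {
        if (inFlight < maxInFlight) {
            inFlight++;
            return "SENT";
        }
        if (waiting.size() < maxQueueSize) {
            waiting.add(requestId);
            return "QUEUED";
        }
        // In this model, this is the situation the BusyPoolException reports.
        return "REJECTED";
    }

    public static void main(String[] args) {
        // Tiny numbers so the three outcomes are easy to see.
        BoundedPoolModel pool = new BoundedPoolModel(1, 4, 2);
        for (int i = 1; i <= 8; i++) {
            System.out.println("request " + i + ": " + pool.submit("r" + i));
        }
    }
}
```

With one connection, four slots and a queue of two, the first four requests are sent, the next two are queued, and the rest are rejected. A bulk loader that commits from several threads at once can reach the rejection case quickly when the pool has only one connection.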
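My first thought was to increase the number of connections. According to the documentation, JanusGraph has supported configurable Cassandra connection counts since a 2017 release (it may still have been Titan at that point): https://github.com/JanusGraph/janusgraph/pull/776
I then configured it according to the manual as follows: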
storage.cql.protocol-version=3
storage.cql.local-core-connections-per-host=10
storage.cql.local-max-connections-per-host=20
storage.cql.local-max-requests-per-connection=2000
storage.buffer-size=1024
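To make the intent of these settings concrete, here is a minimal sketch that parses the three pool-related properties and multiplies connections by requests per connection. It assumes that the number of requests a host can have in flight is roughly connections times max requests per connection. The class PoolCapacity and its methods are hypothetical helpers, not JanusGraph or driver APIs.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that estimates in-flight capacity from the properties above. */
final class PoolCapacity {
    static final String REQUESTS_KEY = "storage.cql.local-max-requests-per-connection";
    static final String CONFIG =
          "storage.cql.local-core-connections-per-host=10\n"
        + "storage.cql.local-max-connections-per-host=20\n"
        + "storage.cql.local-max-requests-per-connection=2000\n";

    /** Parses properties text in the usual key=value format. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Assumed model: capacity = connections * max requests per connection. */
    static int inFlightCapacity(Properties props, String connectionsKey) {
        int connections = Integer.parseInt(props.getProperty(connectionsKey).trim());
        int perConnection = Integer.parseInt(props.getProperty(REQUESTS_KEY).trim());
        return connections * perConnection;
    }

    public static void main(String[] args) {
        Properties props = parse(CONFIG);
        System.out.println("at core size: "
            + inFlightCapacity(props, "storage.cql.local-core-connections-per-host"));
        System.out.println("at max size: "
            + inFlightCapacity(props, "storage.cql.local-max-connections-per-host"));
    }
}
```

Under this model the configured values allow 20000 in-flight requests at the core pool size and 40000 at the maximum, which is why a pool stuck at a single connection is such a large difference.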
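What happened next was odd: even though I had changed the settings, nothing improved. I inspected the initialized JanusGraph object and found that the connection count in the graph's CQLStoreManager was not the value I had set but the default value, 1.
(I did not take a screenshot at the time, so the screenshot here is only illustrative; the connection count is visible inside this object.)
2. Source code analysis
JanusGraph keeps its Cassandra connection pool in the CQLStoreManager object, so I set a breakpoint in CQLStoreManager and inspected its fields during initialization. I found that CQLStoreManager was instantiated twice while the JanusGraph instance was being created: the first time the pool had 10 connections, and the second time it had the default value of 1. The instance JanusGraph used to write to Cassandra was the one with 1 connection. The source code below shows where things went wrong.
The options are defined in CQLConfigOptions.java. CQLStoreManager reads them to build a PoolingOptions object, which is then used to build the Cluster.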
CQLStoreManager.java
// CQLStoreManager connection pool configuration
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
    .setMaxRequestsPerConnection(
        HostDistance.LOCAL,
        configuration.get(LOCAL_MAX_REQUESTS_PER_CONNECTION))
    .setMaxRequestsPerConnection(
        HostDistance.REMOTE,
        configuration.get(REMOTE_MAX_REQUESTS_PER_CONNECTION));
poolingOptions
    .setConnectionsPerHost(
        HostDistance.LOCAL,
        configuration.get(LOCAL_CORE_CONNECTIONS_PER_HOST),
        configuration.get(LOCAL_MAX_CONNECTIONS_PER_HOST))
    .setConnectionsPerHost(
        HostDistance.REMOTE,
        configuration.get(REMOTE_CORE_CONNECTIONS_PER_HOST),
        configuration.get(REMOTE_MAX_CONNECTIONS_PER_HOST));
return builder.withPoolingOptions(poolingOptions).build();
JanusGraphFactory.java
public static JanusGraph open(ReadConfiguration configuration, String backupName) {
    final ModifiableConfiguration config = new ModifiableConfiguration(ROOT_NS, (WriteConfiguration) configuration, BasicConfiguration.Restriction.NONE);
    final String graphName = config.has(GRAPH_NAME) ? config.get(GRAPH_NAME) : backupName;
    final JanusGraphManager jgm = JanusGraphManagerUtility.getInstance();
    if (null != graphName) {
        Preconditions.checkNotNull(jgm, JANUS_GRAPH_MANAGER_EXPECTED_STATE_MSG);
        return (JanusGraph) jgm.openGraph(graphName, gName -> new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration)));
    } else {
        if (jgm != null) {
            log.warn("You should supply \"graph.graphname\" in your .properties file configuration if you are opening " +
                "a graph that has not already been opened at server start, i.e. it was " +
                "defined in your YAML file. This will ensure the graph is tracked by the JanusGraphManager, " +
                "which will enable autocommit and rollback functionality upon all gremlin script executions. " +
                "Note that JanusGraphFactory#open(String === shortcut notation) does not support consuming the property " +
                "\"graph.graphname\" so these graphs should be accessed dynamically by supplying a .properties file here " +
                "or by using the ConfiguredGraphFactory.");
        }
        // The JanusGraph instance is initialized here
        return new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration));
    }
}
When CQLStoreManager adds an InternalRelation to Cassandra, it uses the CQLKeyColumnValueStore that it looks up in its openStores field.
CQLStoreManager.java
// Create an async un-logged batch per partition key
private void mutateManyUnlogged(final Map<String, Map<StaticBuffer, KCVMutation>> mutations, final StoreTransaction txh) throws BackendException {
    final MaskedTimestamp commitTime = new MaskedTimestamp(txh);
    final Future<Seq<ResultSet>> result = Future.sequence(this.executorService, Iterator.ofAll(mutations.entrySet()).flatMap(tableNameAndMutations -> {
        final String tableName = tableNameAndMutations.getKey();
        final Map<StaticBuffer, KCVMutation> tableMutations = tableNameAndMutations.getValue();
        // Look up the CQLKeyColumnValueStore in openStores; its prepared statements are used to submit data to Cassandra
        final CQLKeyColumnValueStore columnValueStore = Option.of(this.openStores.get(tableName))
            .getOrElseThrow(() -> new IllegalStateException("Store cannot be found: " + tableName));
        return Iterator.ofAll(tableMutations.entrySet()).flatMap(keyAndMutations -> {
            final StaticBuffer key = keyAndMutations.getKey();
            final KCVMutation keyMutations = keyAndMutations.getValue();
            // Submit the data
            final Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times))
                .flatMap(deleteTime -> Iterator.ofAll(keyMutations.getDeletions()).map(deletion -> columnValueStore.deleteColumn(key, deletion, deleteTime)));
            final Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times))
                .flatMap(addTime -> Iterator.ofAll(keyMutations.getAdditions()).map(addition -> columnValueStore.insertColumn(key, addition, addTime)));
            return Iterator.concat(deletions, additions)
                .grouped(this.batchSize)
                .map(group -> Future.fromJavaFuture(this.executorService,
                    this.session.executeAsync(
                        new BatchStatement(Type.UNLOGGED)
                            .addAll(group)
                            .setConsistencyLevel(getTransaction(txh).getWriteConsistencyLevel()))));
        });
    }));
    result.await();
    if (result.isFailure()) {
        throw EXCEPTION_MAPPER.apply(result.getCause().get());
    }
    sleepAfterWrite(txh, commitTime);
}
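Where do the CQLKeyColumnValueStore instances in openStores come from? From the openDatabase method, which opens a store for each table based on the configured parameters.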
@Override
public KeyColumnValueStore openDatabase(final String name, final Container metaData) throws BackendException {
    Supplier<Boolean> initializeTable = () -> Optional.ofNullable(this.cluster.getMetadata().getKeyspace(this.keyspace)).map(k -> k.getTable(name) == null).orElse(true);
    // getStorageConfig() returns the configuration that was passed to this CQLStoreManager
    return this.openStores.computeIfAbsent(name, n -> new CQLKeyColumnValueStore(this, n, getStorageConfig(), () -> this.openStores.remove(n), allowCompactStorage, initializeTable));
}
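The caching pattern in openDatabase can be illustrated with a minimal, self-contained sketch. It assumes nothing about JanusGraph beyond the computeIfAbsent call shown above: Manager and Store are hypothetical stand-ins, and poolSize stands for whatever pool the manager was built with.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-ins showing how stores cached per table share their manager's pool. */
final class StoreCacheSketch {
    static final class Manager {
        final int poolSize; // the pool is owned by the manager, not by each store
        final Map<String, Store> openStores = new ConcurrentHashMap<>();

        Manager(int poolSize) {
            this.poolSize = poolSize;
        }

        /** Creates the store on first use and returns the cached instance afterwards. */
        Store openDatabase(String name) {
            return openStores.computeIfAbsent(name, n -> new Store(this, n));
        }
    }

    static final class Store {
        final Manager manager;
        final String name;

        Store(Manager manager, String name) {
            this.manager = manager;
            this.name = name;
        }

        /** A store has no pool of its own; it reports the pool of its manager. */
        int availableConnections() {
            return manager.poolSize;
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager(10);
        Store edgestore = manager.openDatabase("edgestore");
        Store systemProperties = manager.openDatabase("system_properties");
        System.out.println(edgestore == manager.openDatabase("edgestore"));
        System.out.println(edgestore.availableConnections());
        System.out.println(systemProperties.availableConnections());
    }
}
```

The point of the sketch is that the pool size seen by any store depends only on which manager instance opened it, so a store opened by a manager built with one connection cannot benefit from another manager's ten.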
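GraphDatabaseConfigurationBuilder.build, the build method of this builder, initializes a KeyColumnValueStoreManager storeManager by calling Backend.getStorageManager. That method also receives the configuration object and returns the KeyColumnValueStoreManager implementation for the configured storage backend, which here is CQLStoreManager.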
public GraphDatabaseConfiguration build(ReadConfiguration localConfig){
    Preconditions.checkNotNull(localConfig);
    BasicConfiguration localBasicConfiguration = new BasicConfiguration(ROOT_NS,localConfig, BasicConfiguration.Restriction.NONE);
    ModifiableConfiguration overwrite = new ModifiableConfiguration(ROOT_NS,new CommonsConfiguration(), BasicConfiguration.Restriction.NONE);
    // Obtain the storeManager for the configured storage backend; here it is a CQLStoreManager
    // This CQLStoreManager does open 10 connections
    final KeyColumnValueStoreManager storeManager = Backend.getStorageManager(localBasicConfiguration);
    final StoreFeatures storeFeatures = storeManager.getFeatures();
    // This is the only place where storeManager is used
    final ReadConfiguration globalConfig = new ReadConfigurationBuilder().buildGlobalConfiguration(
        localConfig, localBasicConfiguration, overwrite, storeManager,
        new ModifiableConfigurationBuilder(), new KCVSConfigurationBuilder());
    //Copy over local config options
    ModifiableConfiguration localConfiguration = new ModifiableConfiguration(ROOT_NS, new CommonsConfiguration(), BasicConfiguration.Restriction.LOCAL);
    localConfiguration.setAll(getLocalSubset(localBasicConfiguration.getAll()));
    Configuration combinedConfig = new MixedConfiguration(ROOT_NS,globalConfig,localConfig);
    //Compute unique instance id
    String uniqueGraphId = UniqueInstanceIdRetriever.getInstance().getOrGenerateUniqueInstanceId(combinedConfig);
    overwrite.set(UNIQUE_INSTANCE_ID, uniqueGraphId);
    checkAndOverwriteTransactionLogConfiguration(combinedConfig, overwrite, storeFeatures);
    checkAndOverwriteSystemManagementLogConfiguration(combinedConfig, overwrite);
    MergedConfiguration configuration = new MergedConfiguration(overwrite,combinedConfig);
    return new GraphDatabaseConfiguration(localConfig, localConfiguration, uniqueGraphId, configuration);
}
When execution reaches this point, the log confirms that 10 Cassandra connections are opened:
10:33:22.659 [main] DEBUG com.datastax.driver.core.Host.STATES - [/127.0.0.1:9042] preparing to open 10 new connections, total = 11
10:33:22.660 [JanusGraph Cluster-nio-worker-1] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-2, inFlight=0, closed=false] Connection established, initializing transport
10:33:22.662 [JanusGraph Cluster-nio-worker-2] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-3, inFlight=0, closed=false] Connection established, initializing transport
10:33:22.662 [JanusGraph Cluster-nio-worker-3] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-4, inFlight=0, closed=false] Connection established, initializing transport
10:33:22.664 [JanusGraph Cluster-nio-worker-4] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-5, inFlight=0, closed=false] Connection established, initializing transport
10:33:22.664 [JanusGraph Cluster-nio-worker-5] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-6, inFlight=0, closed=false] Connection established, initializing transport
10:33:22.666 [JanusGraph Cluster-nio-worker-6] DEBUG com.datastax.driver.core.Connection - Connection[/127.0.0.1:9042-7, inFlight=0, closed=false] Connection established, initializing transport
The storeManager initialized here does indeed open ten connections.
In the build method of GraphDatabaseConfigurationBuilder, this storeManager is used only when initializing ReadConfiguration globalConfig. Let's look at how it is used there.
ReadConfigurationBuilder.buildGlobalConfiguration()
public ReadConfiguration buildGlobalConfiguration(ReadConfiguration localConfig,
                                                  BasicConfiguration localBasicConfiguration,
                                                  ModifiableConfiguration overwrite,
                                                  KeyColumnValueStoreManager storeManager,
                                                  ModifiableConfigurationBuilder modifiableConfigurationBuilder,
                                                  KCVSConfigurationBuilder kcvsConfigurationBuilder){
    //Read out global configuration
    // This try-with-resources block uses storeManager; the source of the method it calls is shown further down
    try (KCVSConfiguration keyColumnValueStoreConfiguration =
             kcvsConfigurationBuilder.buildStandaloneGlobalConfiguration(storeManager,localBasicConfiguration)){
        // If lock prefix is unspecified, specify it now
        if (!localBasicConfiguration.has(LOCK_LOCAL_MEDIATOR_GROUP)) {
            overwrite.set(LOCK_LOCAL_MEDIATOR_GROUP, storeManager.getName());
        }
        //Freeze global configuration if not already frozen!
        ModifiableConfiguration globalWrite = modifiableConfigurationBuilder.buildGlobalWrite(keyColumnValueStoreConfiguration);
        if (!globalWrite.isFrozen()) {
            //Copy over global configurations
            globalWrite.setAll(getGlobalSubset(localBasicConfiguration.getAll()));
            setupJanusGraphVersion(globalWrite);
            setupStorageVersion(globalWrite);
            setupTimestampProvider(globalWrite, localBasicConfiguration, storeManager);
            globalWrite.freezeConfiguration();
        } else {
            String graphName = localConfig.get(GRAPH_NAME.toStringWithoutRoot(), String.class);
            final boolean upgradeAllowed = isUpgradeAllowed(globalWrite, localBasicConfiguration);
            if (upgradeAllowed) {
                setupUpgradeConfiguration(graphName, globalWrite);
            } else {
                checkJanusGraphStorageVersionEquality(globalWrite, graphName);
            }
            checkJanusGraphVersion(globalWrite, localBasicConfiguration, keyColumnValueStoreConfiguration, overwrite);
            checkOptionsWithDiscrepancies(globalWrite, localBasicConfiguration, overwrite);
        }
        // The code above sets parameters and validates properties, but every method that talks to Cassandra
        // only reads from the system_properties table; asReadConfiguration is one example
        return keyColumnValueStoreConfiguration.asReadConfiguration();
    }
}
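KCVSConfigurationBuilder.buildStandaloneGlobalConfiguration
This method initializes the CQLKeyColumnValueStore for the system_properties table. Note that a CQLKeyColumnValueStore created through the storeManager's openDatabase method shares the storeManager's connection pool, so if this storeManager's mutateMany method were used to modify the system_properties table, 10 connections would be available here as well. The returned KCVSConfiguration holds the CQLKeyColumnValueStore for system_properties that is initialized here.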
public KCVSConfiguration buildStandaloneGlobalConfiguration(final KeyColumnValueStoreManager manager, final Configuration config) {
    try {
        final StoreFeatures features = manager.getFeatures();
        return buildGlobalConfiguration(new BackendOperation.TransactionalProvider() {
            @Override
            public StoreTransaction openTx() throws BackendException {
                return manager.beginTransaction(StandardBaseTransactionConfig.of(config.get(TIMESTAMP_PROVIDER),features.getKeyConsistentTxConfig()));
            }
            @Override
            public void close() throws BackendException {
                manager.close();
            }
            // CQLStoreManager.openDatabase creates a KeyColumnValueStore if the openStores map does not
            // contain one; here it should be a CQLKeyColumnValueStore
        },manager.openDatabase(SYSTEM_PROPERTIES_STORE_NAME),config);
    } catch (BackendException e) {
        throw new JanusGraphException("Could not open global configuration",e);
    }
}
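One plausible reading of why two CQLStoreManager instances show up in the debugger: buildGlobalConfiguration wraps the KCVSConfiguration in a try-with-resources block, and the provider built above closes the manager in its close() method. The sketch below assumes that closing the configuration ends up calling that provider's close(), which is not shown in the excerpts here. Manager and GlobalConfigReader are hypothetical stand-ins.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-ins for a bootstrap manager that is closed after the global config is read. */
final class BootstrapManagerSketch {
    static final class Manager implements AutoCloseable {
        final int poolSize;
        boolean closed = false;

        Manager(int poolSize) {
            this.poolSize = poolSize;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static final class GlobalConfigReader implements AutoCloseable {
        private final Manager manager;

        GlobalConfigReader(Manager manager) {
            this.manager = manager;
        }

        /** Stands in for reading the stored global settings; empty in this sketch. */
        Map<String, String> read() {
            return Collections.emptyMap();
        }

        /** Assumption: closing the reader also closes the manager it was built on. */
        @Override
        public void close() {
            manager.close();
        }
    }

    /** Reads the global settings; try-with-resources closes the reader and therefore the manager. */
    static Map<String, String> readGlobal(Manager bootstrap) {
        try (GlobalConfigReader reader = new GlobalConfigReader(bootstrap)) {
            return reader.read();
        }
    }

    public static void main(String[] args) {
        Manager bootstrap = new Manager(10);
        Map<String, String> global = readGlobal(bootstrap);
        System.out.println("bootstrap closed: " + bootstrap.closed);
        System.out.println("stored settings: " + global.size());
    }
}
```

If the assumption holds, the first manager exists only to read the stored global configuration, and the graph itself needs a second manager that is built from whatever that configuration resolves to.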
KCVSConfiguration.asReadConfiguration()
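This method first calls toMap() and then wraps the result in a ReadConfiguration adapter, so let's look directly at what toMap does. toMap is where the interaction with Cassandra starts. Note that the store field here is the CQLKeyColumnValueStore for the system_properties table.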
public ReadConfiguration asReadConfiguration() {
    final Map<String,Object> entries = toMap();
    return new ReadConfiguration() {
        @Override
        public <O> O get(String key, Class<O> dataType) {
            Preconditions.checkArgument(!entries.containsKey(key) || dataType.isAssignableFrom(entries.get(key).getClass()));
            return (O)entries.get(key);
        }
        @Override
        public Iterable<String> getKeys(final String prefix) {
            final boolean prefixBlank = StringUtils.isBlank(prefix);
            return entries.keySet().stream().filter(s -> prefixBlank || s.startsWith(prefix)).collect(Collectors.toList());
        }
        @Override
        public void close() {
            //Do nothing
        }
    };
}
private Map<String,Object> toMap() {
    Map<String,Object> entries = Maps.newHashMap();
    List<Entry> result = BackendOperation.execute(new BackendOperation.Transactional<List<Entry>>() {
        @Override
        public List<Entry> call(StoreTransaction txh) throws BackendException {
            // Read the configuration from the system_properties table
            return store.getSlice(new KeySliceQuery(rowKey, BufferUtil.zeroBuffer(1), BufferUtil.oneBuffer(128)),txh);
        }
        @Override
        public String toString() {
            return "setConfiguration";
        }
    },txProvider, times, maxOperationWaitTime);
    for (Entry entry : result) {
        String key = staticBuffer2String(entry.getColumnAs(StaticBuffer.STATIC_FACTORY));
        Object value = staticBuffer2Object(entry.getValueAs(StaticBuffer.STATIC_FACTORY), Object.class);
        entries.put(key,value);
    }
    return entries;
}
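The entries returned here did not contain the connection pool size I had set, so the graph initialized afterward fell back to the default: a pool with 1 connection. In other words, the 10 connections I had configured were used only to read system properties from the system_properties table, and those properties did not even include my pool size.
I wondered whether this might be a bug, but I could hardly believe it, because a bug like that would have far too serious an impact. By this point, though, I knew that JanusGraph stores its Cassandra connection configuration in the Cassandra table system_properties, so I wondered whether this configuration could no longer be changed once JanusGraph had initialized it. I dropped the keyspace and initialized JanusGraph again, and the problem was solved: the connection count for the edgestore table was now 10.
So the idea was right. The next step was to go back to the source: with JanusGraph's keyspace dropped, trace the code from the beginning and see how the properties get inserted into system_properties.
First, where is the keyspace created? It is created in the initializeSession method of CQLStoreManager. The call stack again leads to the CQLStoreManager instantiated in the build method of GraphDatabaseConfigurationBuilder. At this point the keyspace exists, but no tables have been created yet.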
Session initializeSession(final String keyspaceName) {
    final Session s = this.cluster.connect();
    // if the keyspace already exists, just return the session
    if (this.cluster.getMetadata().getKeyspace(keyspaceName) != null) {
        return s;
    }
    final Configuration configuration = getStorageConfig();
    // Setting replication strategy based on value reading from the configuration: either "SimpleStrategy" or "NetworkTopologyStrategy"
    final Map<String, Object> replication = Match(configuration.get(REPLICATION_STRATEGY)).of(
        Case($("SimpleStrategy"), strategy -> HashMap.<String, Object> of("class", strategy, "replication_factor", configuration.get(REPLICATION_FACTOR))),
        Case($("NetworkTopologyStrategy"),
            strategy -> HashMap.<String, Object> of("class", strategy)
                .merge(Array.of(configuration.get(REPLICATION_OPTIONS))
                    .grouped(2)
                    .toMap(array -> Tuple.of(array.get(0), Integer.parseInt(array.get(1)))))))
        .toJavaMap();
    // The keyspace is created here
    s.execute(createKeyspace(keyspaceName)
        .ifNotExists()
        .with()
        .replication(replication));
    return s;
}
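From the walkthrough above, we know that the buildStandaloneGlobalConfiguration method of KCVSConfigurationBuilder calls CQLStoreManager's openDatabase method to create the table, but no data has been inserted at that point. The walkthrough also showed that the buildGlobalConfiguration method of ReadConfigurationBuilder checks whether globalWrite is frozen. Once the keyspace already exists, execution always takes the frozen branch. If globalWrite is not frozen, property values such as the connection count are stored in the system_properties table; globalWrite's write method persists them to Cassandra.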
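The freeze-on-first-open behavior described above can be reproduced with a small, self-contained model. This is only a sketch of the observed behavior, not JanusGraph's implementation: the backend table is a plain Map, the flag name "frozen" is a hypothetical placeholder, and the default of 1 is the value reported earlier in this article.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of a global configuration that is frozen on first open. */
final class FrozenConfigSketch {
    static final String POOL_KEY = "storage.cql.local-core-connections-per-host";
    static final String FROZEN_FLAG = "frozen";  // hypothetical flag name
    static final String DEFAULT_POOL_SIZE = "1"; // default reported in the article

    /** Opens the "graph" and returns the pool size that would actually be used. */
    static String open(Map<String, String> backendTable, Map<String, String> localConfig) {
        if (!backendTable.containsKey(FROZEN_FLAG)) {
            // First open: copy the local settings into the backend table, then freeze it.
            backendTable.putAll(localConfig);
            backendTable.put(FROZEN_FLAG, "true");
        }
        // Frozen or not, the effective value is read back from the backend table.
        return backendTable.getOrDefault(POOL_KEY, DEFAULT_POOL_SIZE);
    }

    public static void main(String[] args) {
        Map<String, String> systemProperties = new HashMap<>();
        Map<String, String> noPoolSetting = new HashMap<>();
        Map<String, String> tenConnections = new HashMap<>();
        tenConnections.put(POOL_KEY, "10");

        System.out.println(open(systemProperties, noPoolSetting));  // first open without the setting
        System.out.println(open(systemProperties, tenConnections)); // setting added after the freeze
        systemProperties.clear();                                    // models dropping the keyspace
        System.out.println(open(systemProperties, tenConnections)); // first open on a fresh keyspace
    }
}
```

The three calls print 1, 1 and 10: a setting added after the first open is ignored, and only a fresh backend table picks it up, which matches what happened after the keyspace was dropped.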
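So if you want to change your connection count, remember to unfreeze globalWrite first and then make the change. Personally, I do not yet understand why it was designed this way. In production, if the connection count needs to change as traffic grows, dropping and rebuilding the whole graph is not an option, but one could consider modifying the frozen field in Cassandra to unfreeze the configuration and then change the connection count.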