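Spark-Based Druid Indexing Tasks (druid-spark-b
1. Introduction
As the number of DataSources on Druid and their data volume grow, the original Hadoop MR indexing task can no longer keep up with writing large volumes of data into Druid, and a faster ingestion path is urgently needed. That path is druid-spark-batch, which this article introduces.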
2. druid-spark-batch
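2.1 Building
git clone https://github.com/metamx/druid-spark-batch.git
cd druid-spark-batch
git checkout -b 0.11.0 0.11.0 # check out the 0.11.0 code into a local branch
sbt clean test publish-local publish-m2
Note: before building druid-spark-batch with sbt, install the matching Druid version (e.g. 0.11.0) into the local Maven repository; otherwise the build fails because the corresponding Druid artifacts cannot be found.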
2.2 Configuration
- Install the druid-spark-batch extension on the Overlord nodes.
- Install the druid-spark-batch extension on the MiddleManager nodes.
- Install the Spark dependencies:
  - Copy the jars of the matching Spark version into the druid-install-path/hadoop-dependencies/spark-core_2.10/spark-version directory.
  - Add the Spark dependency to the MiddleManager configuration:
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3", "org.apache.spark:spark-core_2.10:2.2.0"]
2.3 Task JSON
{
  "type": "index_spark",
  "dataSchema": {
    "dataSource": "spark-test",
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      },
      {
        "type": "longSum",
        "name": "L_QUANTITY_longSum",
        "fieldName": "l_quantity"
      },
      {
        "type": "doubleSum",
        "name": "L_EXTENDEDPRICE_doubleSum",
        "fieldName": "l_extendedprice"
      },
      {
        "type": "doubleSum",
        "name": "L_DISCOUNT_doubleSum",
        "fieldName": "l_discount"
      },
      {
        "type": "doubleSum",
        "name": "L_TAX_doubleSum",
        "fieldName": "l_tax"
      }
    ],
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "csv",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensionExclusions": [],
          "dimensions": [
            "l_orderkey",
            "l_partkey",
            "l_suppkey",
            "l_linenumber",
            "l_returnflag",
            "l_linestatus",
            "l_shipinstruct",
            "l_shipmode",
            "l_comment"
          ],
          "spatialDimensions": []
        },
        "columns": [
          "l_orderkey",
          "l_partkey",
          "l_suppkey",
          "l_linenumber",
          "l_quantity",
          "l_extendedprice",
          "l_discount",
          "l_tax",
          "l_returnflag",
          "l_linestatus",
          "l_shipdate",
          "l_commitdate",
          "l_receiptdate",
          "l_shipinstruct",
          "l_shipmode",
          "l_comment"
        ]
      }
    },
    "granularitySpec": {
      "queryGranularity": null,
      "intervals": [
        "2018-02-01/2018-03-02"
      ],
      "type": "uniform",
      "segmentGranularity": null
    }
  },
  "intervals": [
    "2018-02-01/2018-03-02"
  ],
  "paths": [
    "/tmp/druid/ad_dwb_channel_v4__ad_channel_to_druid_201802__20180302"
  ],
  "targetPartitionSize": 8139,
  "maxRowsInMemory": 389,
  "properties": {
    "some.property": "someValue",
    "java.util.logging.manager": "org.apache.logging.log4j.jul.LogManager",
    "user.timezone": "UTC",
    "org.jboss.logging.provider": "log4j2",
    "file.encoding": "UTF-8",
    "druid.processing.columnCache.sizeBytes": "1000000000"
  },
  "master": "local[10]",
  "context": {},
  "indexSpec": {
    "bitmap": {
      "type": "concise"
    },
    "dimensionCompression": null,
    "metricCompression": null
  },
  "hadoopDependencyCoordinates": [
    "org.apache.spark:spark-core_2.10:2.2.0"
  ]
}
Field | Type | Required | Default | Description |
---|---|---|---|---|
type | String | Yes, index_spark | N/A | Must be index_spark |
paths | List of strings | Yes | N/A | A list of Hadoop-readable input files. The values are joined with a , and used as a SparkContext.textFile |
dataSchema | DataSchema | Yes | N/A | The data schema to use |
intervals | List of strings | Yes | N/A | A list of ISO intervals to be indexed. ALL data for these intervals MUST be present in paths |
maxRowsInMemory | Positive integer | No | 75000 | Maximum number of rows to store in memory before an intermediate flush to disk |
targetPartitionSize | Positive integer | No | 5000000 | The target number of rows per partition per segment granularity |
master | String | No | master[1] | The Spark master URI |
properties | Map | No | none | A map of string key/value pairs to inject into the SparkContext properties, overriding any previously set values |
id | String | No | Assigned based on dataSource, intervals, and DateTime.now() | The ID for the task. If not provided, it will be assigned |
indexSpec | InputSpec | No | concise, lz4, lz4 | The InputSpec containing the various compressions to be used |
context | Map | No | none | The task context |
hadoopDependencyCoordinates | List of strings | No | null (use default set by Druid config) | The Spark dependency coordinates to load in the ClassLoader when launching the task |
buildV9Directly | Boolean | No | False | Build v9 index directly instead of building v8 index and converting it to v9 format |
3. Problems
3.1 Hadoop dependency [...] didn't exist!?
Spark is included in the default hadoop coordinates similar to druid.indexer.task.defaultHadoopCoordinates=["org.apache.spark:spark-core_2.10:1.5.2-mmx1"]
Here 1.5.2-mmx1 is the version of the Spark dependency. This example uses Spark 2.2.0, so the MiddleManager nodes are configured as follows:
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3", "org.apache.spark:spark-core_2.10:2.2.0"]
Also copy the Spark 2.2.0 jars into the druid-install-path/hadoop-dependencies/spark-core_2.10/2.2.0 directory.
3.2 Jackson version is too old 2.4.6
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:218) ~[druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
at io.druid.indexer.spark.SparkBatchIndexTask.run(SparkBatchIndexTask.scala:151) [druid-spark-batch_2.10-0.11.0.1-SNAPSHOT.jar:0.11.0.1-SNAPSHOT]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:215) ~[druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
... 7 more
Caused by: java.lang.ExceptionInInitializerError
at org.apache.spark.SparkContext.withScope(SparkContext.scala:701) ~[?:?]
at org.apache.spark.SparkContext.textFile(SparkContext.scala:830) ~[?:?]
at io.druid.indexer.spark.SparkDruidIndexer$.loadData(SparkDruidIndexer.scala:134) ~[?:?]
at io.druid.indexer.spark.SparkBatchIndexTask$.runTask(SparkBatchIndexTask.scala:423) ~[?:?]
at io.druid.indexer.spark.SparkBatchIndexTask.runTask(SparkBatchIndexTask.scala) ~[?:?]
at io.druid.indexer.spark.Runner.runTask(Runner.java:29) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:215) ~[druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
... 7 more
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Jackson version is too old 2.4.6
at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:56) ~[?:?]
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19) ~[?:?]
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:549) ~[jackson-databind-2.4.6.jar:2.4.6]
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82) ~[?:?]
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala) ~[?:?]
at org.apache.spark.SparkContext.withScope(SparkContext.scala:701) ~[?:?]
at org.apache.spark.SparkContext.textFile(SparkContext.scala:830) ~[?:?]
at io.druid.indexer.spark.SparkDruidIndexer$.loadData(SparkDruidIndexer.scala:134) ~[?:?]
at io.druid.indexer.spark.SparkBatchIndexTask$.runTask(SparkBatchIndexTask.scala:423) ~[?:?]
at io.druid.indexer.spark.SparkBatchIndexTask.runTask(SparkBatchIndexTask.scala) ~[?:?]
at io.druid.indexer.spark.Runner.runTask(Runner.java:29) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_111]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_111]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_111]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_111]
at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:215) ~[druid-indexing-service-0.11.1-SNAPSHOT.jar:0.11.1-SNAPSHOT]
Druid 0.11.0 uses Jackson 2.4.6, while druid-spark-batch uses Jackson 2.6.5.
Changing jackson.version to 2.6.5 in Druid 0.11.0's Maven properties and rebuilding Druid resolves the problem.
<properties>
    <maven.compiler.target>1.8</maven.compiler.target>
    <apache.curator.version>4.0.0</apache.curator.version>
    <apache.curator.test.version>2.12.0</apache.curator.test.version>
    <avatica.version>1.9.0</avatica.version>
    <calcite.version>1.12.0</calcite.version>
    <guava.version>16.0.1</guava.version>
    <guice.version>4.1.0</guice.version>
    <jetty.version>9.3.19.v20170502</jetty.version>
    <jersey.version>1.19.3</jersey.version>
    <!-- Watch out for Hadoop compatibility when updating to >= 2.5; see https://github.com/druid-io/druid/pull/1669 -->
    <jackson.version>2.6.5</jackson.version>
    <log4j.version>2.5</log4j.version>
    <!-- Update to Netty 4.1 is not possible yet, see https://github.com/druid-io/druid/issues/4390 and comments
         in https://github.com/druid-io/druid/pull/4973 -->
    <netty.version>4.0.52.Final</netty.version>
    <slf4j.version>1.7.12</slf4j.version>
    <!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
    <hadoop.compile.version>2.7.3</hadoop.compile.version>
    <hive.version>2.0.0</hive.version>
    <powermock.version>1.6.6</powermock.version>
    <!-- Cannot update to AWS SDK 1.11+ because of Jackson incompatibility.
         Need to update Druid to use Jackson 2.6+ -->
    <aws.sdk.version>1.10.77</aws.sdk.version>
    <!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
    <zookeeper.version>3.4.10</zookeeper.version>
    <caffeine.version>2.5.5</caffeine.version>
</properties>
3.3 Job fails due to NoSuchMethodError exception
2017-04-10T16:17:58,925 WARN [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager - Lost task 2.0 in stage 0.0 (TID 2, 172.20.0.1): java.lang.NoSuchMethodError: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
at com.google.inject.multibindings.Multibinder.collectionOfProvidersOf(Multibinder.java:202)
at com.google.inject.multibindings.Multibinder$RealMultibinder.<init>(Multibinder.java:283)
at com.google.inject.multibindings.Multibinder$RealMultibinder.<init>(Multibinder.java:258)
at com.google.inject.multibindings.Multibinder.newRealSetBinder(Multibinder.java:178)
at com.google.inject.multibindings.Multibinder.newSetBinder(Multibinder.java:150)
at io.druid.guice.LifecycleModule.getEagerBinder(LifecycleModule.java:130)
at io.druid.guice.LifecycleModule.configure(LifecycleModule.java:136)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:223)
at com.google.inject.spi.Elements.getElements(Elements.java:101)
at com.google.inject.spi.Elements.getElements(Elements.java:92)
at com.google.inject.util.Modules$RealOverriddenModuleBuilder$1.configure(Modules.java:152)
at com.google.inject.AbstractModule.configure(AbstractModule.java:59)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:223)
at com.google.inject.spi.Elements.getElements(Elements.java:101)
at com.google.inject.spi.Elements.getElements(Elements.java:92)
at com.google.inject.util.Modules$RealOverriddenModuleBuilder$1.configure(Modules.java:152)
at com.google.inject.AbstractModule.configure(AbstractModule.java:59)
at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:223)
at com.google.inject.spi.Elements.getElements(Elements.java:101)
at com.google.inject.internal.InjectorShell$Builder.build(InjectorShell.java:133)
at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:103)
at com.google.inject.Guice.createInjector(Guice.java:95)
at com.google.inject.Guice.createInjector(Guice.java:72)
at com.google.inject.Guice.createInjector(Guice.java:62)
at io.druid.initialization.Initialization.makeInjectorWithModules(Initialization.java:366)
at io.druid.indexer.spark.SerializedJsonStatic$.liftedTree1$1(SparkDruidIndexer.scala:438)
at io.druid.indexer.spark.SerializedJsonStatic$.injector$lzycompute(SparkDruidIndexer.scala:437)
at io.druid.indexer.spark.SerializedJsonStatic$.injector(SparkDruidIndexer.scala:436)
at io.druid.indexer.spark.SerializedJsonStatic$.liftedTree2$1(SparkDruidIndexer.scala:465)
at io.druid.indexer.spark.SerializedJsonStatic$.mapper$lzycompute(SparkDruidIndexer.scala:464)
at io.druid.indexer.spark.SerializedJsonStatic$.mapper(SparkDruidIndexer.scala:463)
at io.druid.indexer.spark.SerializedJson.getMap(SparkDruidIndexer.scala:520)
at io.druid.indexer.spark.SerializedJson.readObject(SparkDruidIndexer.scala:534)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:64)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
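This is a Guice/Guava jar conflict: druid-spark-batch uses guice-4.1.0.jar and guava-16.0.1.jar, and the NoSuchMethodError indicates that the executors picked up a different Guice version.
In the task's properties, set spark.executor.extraClassPath to true and point spark.executor.extraLibraryPath at the required jars, as in the following snippet. (Spark documents spark.executor.extraClassPath as a list of classpath entries and spark.executor.extraLibraryPath as the native library path, so if this setting has no effect, try putting the jar list in spark.executor.extraClassPath instead.)
"properties" : {
  "spark.executor.extraClassPath": "true",
  "spark.executor.extraLibraryPath": "/opt/druid/libs/guava-16.0.1.jar:/opt/druid/libs/guice-4.1.0.jar:/opt/druid/libs/guice-multibindings-4.1.0.jar"
}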
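When chasing a jar conflict like the ones in 3.2 and 3.3, it helps to see which jar a class was actually loaded from. The following is a minimal sketch using only the JDK; the class name JarLocator and its marker strings are made up for illustration, and it only tells you something useful when run with the same classpath as the failing JVM (for example from inside the task or executor).

```java
import java.security.CodeSource;

public class JarLocator {
    /** Returns the location a class was loaded from, or a marker when the JVM reports none. */
    public static String locate(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // Core JDK classes such as java.lang.String have no code source.
        if (source == null || source.getLocation() == null) {
            return "(JDK / no code source)";
        }
        return source.getLocation().toString();
    }

    /** Same as above, but looks the class up by name and tolerates a missing class. */
    public static String locate(String className) {
        try {
            return locate(Class.forName(className));
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Classes that appear in the stack traces of sections 3.2 and 3.3.
        String[] names = {
            "com.google.inject.util.Types",
            "com.google.common.base.Throwables",
            "com.fasterxml.jackson.databind.ObjectMapper"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the printed location for com.google.inject.util.Types is not guice-4.1.0.jar, an older Guice is shadowing the one druid-spark-batch expects.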
3.4 Timeout when renaming on HDFS
2017-07-18T21:00:45,994 WARN [publish-0] io.druid.segment.realtime.appenderator.AppenderatorDriver - Failed publish (try 18), retrying in 36,459ms.
java.util.concurrent.ExecutionException: java.lang.IllegalAccessError: tried to access method org.apache.hadoop.fs.FileSystem.rename(Lorg/apache/hadoop/fs/Path;Lorg/apache/hadoop/fs/Path;[Lorg/apache/hadoop/fs/Options$Rename;)V from class org.apache.hadoop.fs.HadoopFsWrapper
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[guava-16.0.1.jar:?]
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[guava-16.0.1.jar:?]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[guava-16.0.1.jar:?]
at io.druid.segment.realtime.appenderator.AppenderatorDriver.lambda$publish$3(AppenderatorDriver.java:530) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_101]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_101]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_101]
Caused by: java.lang.IllegalAccessError: tried to access method org.apache.hadoop.fs.FileSystem.rename(Lorg/apache/hadoop/fs/Path;Lorg/apache/hadoop/fs/Path;[Lorg/apache/hadoop/fs/Options$Rename;)V from class org.apache.hadoop.fs.HadoopFsWrapper
at org.apache.hadoop.fs.HadoopFsWrapper.rename(HadoopFsWrapper.java:51) ~[?:?]
at io.druid.storage.hdfs.HdfsDataSegmentPusher.copyFilesWithChecks(HdfsDataSegmentPusher.java:167) ~[?:?]
at io.druid.storage.hdfs.HdfsDataSegmentPusher.push(HdfsDataSegmentPusher.java:148) ~[?:?]
at io.druid.segment.realtime.appenderator.AppenderatorImpl.mergeAndPush(AppenderatorImpl.java:579) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
at io.druid.segment.realtime.appenderator.AppenderatorImpl.access$600(AppenderatorImpl.java:97) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
at io.druid.segment.realtime.appenderator.AppenderatorImpl$3.apply(AppenderatorImpl.java:477) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
at io.druid.segment.realtime.appenderator.AppenderatorImpl$3.apply(AppenderatorImpl.java:465) ~[druid-server-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
at com.google.common.util.concurrent.Futures$1.apply(Futures.java:713) ~[guava-16.0.1.jar:?]
at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861) ~[guava-16.0.1.jar:?]
... 3 more
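HadoopFsWrapper.rename in Druid's hdfs-storage extension calls the rename method of Hadoop's FileSystem class.
Hadoop has two overloads of this method, one taking two arguments and one taking three. The two-argument overload is public, while the three-argument overload is protected and deprecated.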
package org.apache.hadoop.fs;

import io.druid.java.util.common.logger.Logger;
import java.io.IOException;

/**
 * This wrapper class is created to be able to access some of the "protected" methods inside Hadoop's
 * FileSystem class. Those are supposed to become public eventually or more appropriate alternatives would be
 * provided.
 * This is a hack and should be removed when no longer necessary.
 */
public class HadoopFsWrapper
{
  private static final Logger log = new Logger(HadoopFsWrapper.class);

  private HadoopFsWrapper() {}

  /**
   * Same as FileSystem.rename(from, to, Options.Rename.NONE) . That is,
   * it returns "false" when "to" directory already exists. It is different from FileSystem.rename(from, to)
   * which moves "from" directory inside "to" directory if it already exists.
   *
   * @param from
   * @param to
   * @return
   * @throws IOException
   */
  public static boolean rename(FileSystem fs, Path from, Path to) throws IOException
  {
    try {
      fs.rename(from, to, Options.Rename.NONE); // this rename overload is deprecated and protected
      return true;
    }
    catch (IOException ex) {
      log.warn(ex, "Failed to rename [%s] to [%s].", from, to);
      return false;
    }
  }
}

The two overloads are declared in Hadoop's FileSystem as follows:

public abstract boolean rename(Path var1, Path var2) throws IOException;

/** @deprecated */
@Deprecated
protected void rename(Path src, Path dst, Rename... options) throws IOException {
  ……
}
Solution: **change the rename method in HadoopFsWrapper to call fs.rename(from, to)**
package org.apache.hadoop.fs;

import io.druid.java.util.common.logger.Logger;
import java.io.IOException;

/**
 * This wrapper class is created to be able to access some of the "protected" methods inside Hadoop's
 * FileSystem class. Those are supposed to become public eventually or more appropriate alternatives would be
 * provided.
 * This is a hack and should be removed when no longer necessary.
 */
public class HadoopFsWrapper
{
  private static final Logger log = new Logger(HadoopFsWrapper.class);

  private HadoopFsWrapper() {}

  /**
   * Delegates to the public FileSystem.rename(from, to). Unlike
   * FileSystem.rename(from, to, Options.Rename.NONE), this moves "from" directory inside "to" directory
   * if "to" already exists.
   *
   * @param from
   * @param to
   * @return
   * @throws IOException
   */
  public static boolean rename(FileSystem fs, Path from, Path to) throws IOException
  {
    try {
      // fs.rename(from, to, Options.Rename.NONE); // this rename overload is deprecated and protected
      // Use the public two-argument overload and pass its boolean result on to the caller.
      return fs.rename(from, to);
    }
    catch (IOException ex) {
      log.warn(ex, "Failed to rename [%s] to [%s].", from, to);
      return false;
    }
  }
}
3.5 Size exceeds Integer.MAX_VALUE
src/main/scala/io/druid/indexer/spark/SparkDruidIndexer.scala
val totalGZSize = dataFiles.map(
  s => {
    val p = new Path(s)
    val fs = p.getFileSystem(sc.hadoopConfiguration)
    fs.getFileStatus(p).getLen // for a directory, this is not the size of the files it contains
  }
).sum
val startingPartitions = (totalGZSize / (100L << 20)).toInt + 1
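Cause: when an HDFS path is a directory, getFileStatus(p).getLen does not return the size of the files inside it. totalGZSize is therefore underestimated, startingPartitions comes out far too small, and individual partitions exceed Spark's 2 GB limit.
The fix is as follows: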
val totalGZSize = dataFiles.map(
  s => {
    val p = new Path(s)
    val fs = p.getFileSystem(sc.hadoopConfiguration)
    val status = fs.getFileStatus(p)
    if (status.isDirectory) {
      // Sum the sizes of the files directly under the directory (non-recursive)
      var sum = 0L
      val children = fs.listFiles(p, false)
      while (children.hasNext) {
        sum += children.next().getLen
      }
      sum
    } else {
      status.getLen
    }
  }
).sum
val startingPartitions = (totalGZSize / (100L << 20)).toInt + 1
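To see why the directory case matters, the partition arithmetic can be worked through in isolation. The sketch below mirrors the expression (totalGZSize / (100L << 20)).toInt + 1 in Java; the class name PartitionEstimate and the 50 GiB input size are hypothetical, and it assumes the directory's reported length is 0, which is what HDFS typically returns for a directory.

```java
public class PartitionEstimate {
    /** One partition per 100 MiB of input, plus one, as in SparkDruidIndexer.scala. */
    public static int startingPartitions(long totalBytes) {
        return (int) (totalBytes / (100L << 20)) + 1;
    }

    public static void main(String[] args) {
        long fiftyGiB = 50L << 30; // hypothetical total size of the files in the directory

        // Before the fix: the directory length is taken as 0 (assumption), so everything lands in 1 partition.
        System.out.println(startingPartitions(0L));       // prints 1

        // After the fix: the file sizes are summed, giving 512 + 1 partitions.
        System.out.println(startingPartitions(fiftyGiB)); // prints 513
    }
}
```

With a single starting partition, all 50 GiB would have to fit in one partition, far beyond the 2 GB limit behind the Size exceeds Integer.MAX_VALUE error.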