The Various 2G Limits in Spark

2016-09-05  witgo

Motivation

Spark has various 2G size limits. The cases below show where they come from.

  1. When a data block cached on the local disk is read, the following code fragment is called:
  val iterToReturn: Iterator[Any] = {
    val diskBytes = diskStore.getBytes(blockId)
    if (level.deserialized) {
      val diskValues = serializerManager.dataDeserializeStream(
        blockId,
        diskBytes.toInputStream(dispose = true))(info.classTag)
      maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
    } else {
      val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
        .map {_.toInputStream(dispose = false)}
        .getOrElse { diskBytes.toInputStream(dispose = true) }
      serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
    }
  }

  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    val file = diskManager.getFile(blockId.name)
    val channel = new RandomAccessFile(file, "r").getChannel
    Utils.tryWithSafeFinally {
      // For small files, directly read rather than memory map
      if (file.length < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(file.length.toInt)
        channel.position(0)
        while (buf.remaining() != 0) {
          if (channel.read(buf) == -1) {
            throw new IOException("Reached EOF before filling buffer\n" +
              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
          }
        }
        buf.flip()
        new ChunkedByteBuffer(buf)
      } else {
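        // channel.map returns a single MappedByteBuffer, so file.length must not exceed Integer.MAX_VALUE (2G)
        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))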
      }
    } {
      channel.close()
    }
  }

The code above has the following problems:
* channel.map(MapMode.READ_ONLY, 0, file.length) returns a MappedByteBuffer. FileChannel.map rejects a size larger than Integer.MAX_VALUE, so a MappedByteBuffer cannot exceed 2G.
* Producing the Iterator[Any] requires loading all of the block's data into memory, which may take up a lot of off-heap memory.
* A MappedByteBuffer maps the file into memory through the operating system's page cache, which the JVM cannot control.
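The first limitation comes from FileChannel.map, which rejects a size larger than Integer.MAX_VALUE. A file bigger than that can still be memory-mapped as several regions. The following is a minimal Java sketch of that idea, not Spark's code. The class ChunkedMapper and its methods are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: maps a file as several read-only regions instead of one.
final class ChunkedMapper {
  private ChunkedMapper() {}

  // Maps the whole file as a list of regions, each at most chunkSize bytes.
  // chunkSize is an int, so no region can exceed Integer.MAX_VALUE, the limit
  // FileChannel.map enforces for a single MappedByteBuffer.
  static List<ByteBuffer> mapInChunks(Path file, int chunkSize) throws IOException {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<ByteBuffer> chunks = new ArrayList<>();
    try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
      long size = channel.size();
      for (long offset = 0; offset < size; offset += chunkSize) {
        long length = Math.min(chunkSize, size - offset);
        // A mapping stays valid after the channel is closed.
        chunks.add(channel.map(FileChannel.MapMode.READ_ONLY, offset, length));
      }
    }
    return chunks;
  }

  // The total is a long, so it may exceed 2G even though each region cannot.
  static long totalSize(List<ByteBuffer> chunks) {
    long total = 0;
    for (ByteBuffer chunk : chunks) {
      total += chunk.remaining();
    }
    return total;
  }
}
```

Mapping in pieces removes the size limit. It does not address the other two points, because the regions still live in the OS page cache.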

  2. When data is serialized with Kryo, the following code fragment is called:

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    output.clear()
    val kryo = borrowKryo()
    try {
      kryo.writeClassAndObject(output, t)
    } catch {
      case e: KryoException if e.getMessage.startsWith("Buffer overflow") =>
        throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " +
          "increase spark.kryoserializer.buffer.max value.")
    } finally {
      releaseKryo(kryo)
    }
    ByteBuffer.wrap(output.toBytes)
  }

The code above has the following problems:
* The serialized form of t is stored in the internal byte[] of output, and a byte[] cannot exceed 2G.

  3. When the RPC layer writes outgoing data to a Channel, the following code fragment is called:
  public long transferTo(final WritableByteChannel target, final long position) throws IOException {
    Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
    // Bytes written for header in this call.
    long writtenHeader = 0;
    if (header.readableBytes() > 0) {
      writtenHeader = copyByteBuf(header, target);
      totalBytesTransferred += writtenHeader;
      if (header.readableBytes() > 0) {
        return writtenHeader;
      }
    }

    // Bytes written for body in this call.
    long writtenBody = 0;
    if (body instanceof FileRegion) {
      writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
    } else if (body instanceof ByteBuf) {
      writtenBody = copyByteBuf((ByteBuf) body, target);
    }
    totalBytesTransferred += writtenBody;
    return writtenHeader + writtenBody;
  }

The code above has the following problems:
* A ByteBuf cannot exceed 2G.
* In-memory data larger than 2G cannot be transferred.
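Both problems stem from holding the body in a single int-indexed buffer. Suppose instead that the body is read from an InputStream through a small, reused buffer, with the byte count kept in a long. Then its size is no longer bounded by 2G. The following is a minimal sketch that assumes a blocking WritableByteChannel. StreamingTransfer is a hypothetical name, and this is not Spark's MessageWithHeader.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch: send a header plus a streamed body of arbitrary (long) length.
final class StreamingTransfer {
  private StreamingTransfer() {}

  // Writes the header followed by the whole body and returns the number of bytes written.
  // The body passes through one bufferSize-byte array, so only the long counter
  // limits its total length. Assumes a blocking channel: on a non-blocking one,
  // write() may return 0 and this loop would spin.
  static long transfer(ByteBuffer header, InputStream body, WritableByteChannel target,
      int bufferSize) throws IOException {
    if (bufferSize <= 0) {
      throw new IllegalArgumentException("bufferSize must be positive");
    }
    long total = writeFully(header, target);
    byte[] chunk = new byte[bufferSize];
    int n;
    while ((n = body.read(chunk)) != -1) {
      total += writeFully(ByteBuffer.wrap(chunk, 0, n), target);
    }
    return total;
  }

  // Channels may accept fewer bytes than offered, so keep writing until empty.
  private static long writeFully(ByteBuffer buf, WritableByteChannel target) throws IOException {
    long written = 0;
    while (buf.hasRemaining()) {
      written += target.write(buf);
    }
    return written;
  }
}
```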

  4. When a received RPC message is decoded, the following code fragment is called:
public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

  private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);

  @Override
  public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    Message.Type msgType = Message.Type.decode(in);
    Message decoded = decode(msgType, in);
    assert decoded.type() == msgType;
    logger.trace("Received message {}: {}", msgType, decoded);
    out.add(decoded);
  }

  private Message decode(Message.Type msgType, ByteBuf in) {
    switch (msgType) {
      case ChunkFetchRequest:
        return ChunkFetchRequest.decode(in);

      case ChunkFetchSuccess:
        return ChunkFetchSuccess.decode(in);

      case ChunkFetchFailure:
        return ChunkFetchFailure.decode(in);

      default:
        throw new IllegalArgumentException("Unexpected message type: " + msgType);
    }
  }
}

The code above has the following problems:
* A ByteBuf cannot exceed 2G.
* A message can be decoded only after the receiver has received all of its data.
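The decoder can start before the whole message has arrived if it parses only a fixed-size header and hands the body to the handler as a stream. The sketch below assumes a hypothetical frame layout: a 1-byte type, an 8-byte long body length, then the body. This is not Spark's wire format, and StreamingFrameReader and BoundedInputStream are illustrative names.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical frame reader. Assumed layout: [1-byte type][8-byte body length][body].
final class StreamingFrameReader {
  private StreamingFrameReader() {}

  static final class Frame {
    final byte type;
    final long bodyLength;
    final InputStream body;

    Frame(byte type, long bodyLength, InputStream body) {
      this.type = type;
      this.bodyLength = bodyLength;
      this.body = body;
    }
  }

  // Reads only the 9-byte header. The body is returned as a stream that the handler
  // consumes as data arrives, so the whole frame is never buffered.
  // DataInputStream does not read ahead, so `in` is positioned right at the body.
  static Frame readFrame(InputStream in) throws IOException {
    DataInputStream data = new DataInputStream(in);
    byte type = data.readByte();
    long length = data.readLong();
    if (length < 0) {
      throw new IOException("Negative body length: " + length);
    }
    return new Frame(type, length, new BoundedInputStream(in, length));
  }

  // Exposes at most `limit` bytes of the underlying stream (the limit is a long).
  static final class BoundedInputStream extends InputStream {
    private final InputStream in;
    private long remaining;

    BoundedInputStream(InputStream in, long limit) {
      this.in = in;
      this.remaining = limit;
    }

    @Override
    public int read() throws IOException {
      if (remaining <= 0) {
        return -1;
      }
      int b = in.read();
      if (b >= 0) {
        remaining--;
      }
      return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
      if (len == 0) {
        return 0;
      }
      if (remaining <= 0) {
        return -1;
      }
      int n = in.read(buf, off, (int) Math.min(len, remaining));
      if (n > 0) {
        remaining -= n;
      }
      return n;
    }
  }
}
```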

Goals

Design

Eliminate the various 2G limits in Spark.

Replace ByteBuffer with ChunkedByteBuffer (2G limits 1 and 2).

Introducing ChunkedByteBuffer:

  1. Move the ChunkedByteBuffer class to common/network-common/src/main/java/org/apache/spark/network/buffer/.
  2. Change the return type of ManagedBuffer.nioByteBuffer to ChunkedByteBuffer (2G limit 1).
  3. Further standardize how ManagedBuffer and ChunkedByteBuffer are used.
  4. Change the parameter of SerializerInstance.deserialize and the return type of SerializerInstance.serialize to ChunkedByteBuffer (2G limit 2):
def serialize[T: ClassTag](t: T): ChunkedByteBuffer = {
  output.clear()
  val out = ChunkedByteBufferOutputStream.newInstance()
  // Write the serialized data to the OutputStream rather than to the internal byte[]
  // of the output object, so its size is no longer bounded by a single array.
  output.setOutputStream(out)
  val kryo = borrowKryo()
  try {
    kryo.writeClassAndObject(output, t)
  } finally {
    // Return the Kryo instance to the pool even if serialization fails.
    releaseKryo(kryo)
  }
  // Flush the remaining buffered bytes into out and close it.
  output.close()
  out.toChunkedByteBuffer
}
  5. Other changes.
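The serialize method above depends on an output stream that stores its data in chunks rather than in one growing array. The Java class below is a hypothetical stand-in for ChunkedByteBufferOutputStream that shows the idea. It holds a list of fixed-size byte[] chunks and a long size counter. Its toInputStream method reads the chunks back in order with SequenceInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ChunkedByteBufferOutputStream: bytes go into a list of
// fixed-size chunks, so the total size is a long and no single array holds it all.
final class ChunkedOutputStream extends OutputStream {
  private final int chunkSize;
  private final List<byte[]> chunks = new ArrayList<>();
  private int positionInLast; // bytes used in the last chunk
  private long size;          // total bytes written

  ChunkedOutputStream(int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    this.chunkSize = chunkSize;
    this.positionInLast = chunkSize; // forces a chunk allocation on the first write
  }

  private byte[] lastChunkWithSpace() {
    if (positionInLast == chunkSize) {
      chunks.add(new byte[chunkSize]);
      positionInLast = 0;
    }
    return chunks.get(chunks.size() - 1);
  }

  @Override
  public void write(int b) {
    lastChunkWithSpace()[positionInLast++] = (byte) b;
    size++;
  }

  @Override
  public void write(byte[] b, int off, int len) {
    while (len > 0) {
      byte[] target = lastChunkWithSpace();
      int n = Math.min(len, chunkSize - positionInLast);
      System.arraycopy(b, off, target, positionInLast, n);
      positionInLast += n;
      off += n;
      len -= n;
      size += n;
    }
  }

  long size() {
    return size;
  }

  int chunkCount() {
    return chunks.size();
  }

  // Reads the chunks back in order without copying them into one array.
  InputStream toInputStream() {
    List<InputStream> streams = new ArrayList<>();
    for (int i = 0; i < chunks.size(); i++) {
      int length = (i == chunks.size() - 1) ? positionInLast : chunkSize;
      streams.add(new ByteArrayInputStream(chunks.get(i), 0, length));
    }
    return new SequenceInputStream(Collections.enumeration(streams));
  }
}
```

Each chunk is at most chunkSize bytes, so no single allocation approaches the 2G array limit. Meanwhile, size() can grow past Integer.MAX_VALUE.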
Replace ByteBuf with InputStream.
  1. Add an InputStreamManagedBuffer class that wraps an InputStream as a ManagedBuffer (2G limit 4).
  2. Change NioManagedBuffer.convertToNetty to return an InputStream when the data is larger than Integer.MAX_VALUE bytes (2G limit 3).
  3. Change the MessageWithHeader class to support a body of type InputStream (2G limit 3).
  4. Change the parameter of Encodable.encode to an OutputStream (2G limit 4).
  5. Add a toInputStream method to UploadBlock so that it can handle data stored partly in memory and partly on disk (2G limit 3):
public InputStream toInputStream() throws IOException {
  ChunkedByteBufferOutputStream out = ChunkedByteBufferOutputStream.newInstance();
  Encoders.Bytes.encode(out, type().id());
  encodeWithoutBlockData(out);
  // out.toChunkedByteBuffer().toInputStream(): the message header, buffered in memory.
  // blockData.createInputStream(): the block data, possibly on disk (FileInputStream).
  return new SequenceInputStream(out.toChunkedByteBuffer().toInputStream(),
      blockData.createInputStream());
}
  6. Change the parameter of the decode method in classes implementing Encodable to an InputStream (2G limit 4).
  7. Change TransportFrameDecoder to represent a frame as a LinkedList<ByteBuf>, removing the frame size limit (2G limit 4).
  8. Add a ByteBufInputStream class that wraps a LinkedList<ByteBuf> as an InputStream. As soon as all the data in one ByteBuf has been read, ByteBuf.release is called to free it (2G limit 4).
  9. Change the parameter of RpcHandler.receive to an InputStream (2G limit 4).
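The TransportFrameDecoder and ByteBufInputStream changes above keep a frame as a list of buffers. That list is read through an InputStream that frees each buffer once it is consumed. The following minimal sketch shows that behavior. It uses java.nio.ByteBuffer in place of Netty's ByteBuf, and a caller-supplied callback in place of ByteBuf.release. BufferListInputStream is a hypothetical name.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.function.Consumer;

// Hypothetical analogue of the proposed ByteBufInputStream. Each buffer is passed to
// `release` as soon as it has been fully read, so its memory can be reclaimed before
// the rest of the frame is consumed.
final class BufferListInputStream extends InputStream {
  private final LinkedList<ByteBuffer> buffers;
  private final Consumer<ByteBuffer> release;

  BufferListInputStream(LinkedList<ByteBuffer> buffers, Consumer<ByteBuffer> release) {
    this.buffers = buffers;
    this.release = release;
  }

  // Releases fully consumed buffers at the head of the list.
  // Returns false when no data remains.
  private boolean advance() {
    while (!buffers.isEmpty() && !buffers.peekFirst().hasRemaining()) {
      release.accept(buffers.removeFirst());
    }
    return !buffers.isEmpty();
  }

  @Override
  public int read() {
    if (!advance()) {
      return -1;
    }
    int b = buffers.peekFirst().get() & 0xFF;
    advance(); // release the head buffer right away if that was its last byte
    return b;
  }

  @Override
  public int read(byte[] dst, int off, int len) {
    if (len == 0) {
      return 0;
    }
    if (!advance()) {
      return -1;
    }
    ByteBuffer head = buffers.peekFirst();
    int n = Math.min(len, head.remaining());
    head.get(dst, off, n);
    advance();
    return n;
  }

  @Override
  public void close() {
    // Release whatever was not read.
    while (!buffers.isEmpty()) {
      release.accept(buffers.removeFirst());
    }
  }
}
```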

Read data

Local data
  1. Only data stored in memory is represented by ChunkedByteBuffer; all other data is represented by ManagedBuffer (2G limit 1).
Remote data (2G limit 4)

There are three options:

  1. Add an InputStreamInterceptor that propagates back-pressure to the shuffle server (this option has been implemented).
  2. When a message is larger than a certain threshold, write it to disk instead of keeping it in memory.
  3. Combined with a buffer pool, keep as much data in memory as possible.
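The disk-spilling option works like this. The receiver checks the declared message size against a threshold. It then either reads the message into memory or streams it into a temporary file. The following is a minimal sketch under stated assumptions. SpillingReceiver and the threshold parameter are hypothetical. The threshold is capped at Integer.MAX_VALUE so that the in-memory branch fits in one byte[].

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: messages above a size threshold are streamed to a temporary
// file instead of being held in memory.
final class SpillingReceiver {
  private SpillingReceiver() {}

  static final class Received {
    final byte[] memory; // non-null when the message was kept in memory
    final Path file;     // non-null when the message was spilled to disk

    Received(byte[] memory, Path file) {
      this.memory = memory;
      this.file = file;
    }

    boolean onDisk() {
      return file != null;
    }

    InputStream open() throws IOException {
      return onDisk() ? Files.newInputStream(file) : new ByteArrayInputStream(memory);
    }
  }

  // Assumes `in` supplies exactly `size` bytes: the disk branch copies until end of stream.
  static Received receive(InputStream in, long size, long threshold, Path dir)
      throws IOException {
    if (size < 0) {
      throw new IllegalArgumentException("size must be non-negative");
    }
    if (threshold > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("threshold must fit in an int");
    }
    if (size <= threshold) {
      byte[] data = new byte[(int) size];
      new DataInputStream(in).readFully(data);
      return new Received(data, null);
    }
    Path file = Files.createTempFile(dir, "spill-", ".bin");
    // createTempFile already created the file, so allow Files.copy to overwrite it.
    Files.copy(in, file, StandardCopyOption.REPLACE_EXISTING);
    return new Received(null, file);
  }
}
```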

Add buffer pool

A buffer pool can reduce memory allocation and GC time, improving the performance of Spark core.

  1. It reduces the number of large objects created in the Eden space. According to Twitter's experience, using buffer pools can significantly reduce the number of GCs.
    Netty 4 Reduces GC Overhead by 5x at Twitter
  2. It reduces the number of memory allocations and zero-fills.
    Using as a generic library
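The core of a buffer pool is small. Released buffers go onto a free list and are handed out again, instead of new ones being allocated. The following minimal sketch uses heap ByteBuffers of one fixed size. SimpleBufferPool is a hypothetical class, far simpler than Netty's pooled allocator: it has no size classes, no thread-local caches, and no leak detection.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical fixed-size buffer pool. Released buffers are reused instead of being
// left to the GC, and reuse skips the zero-filling that a fresh allocation performs.
final class SimpleBufferPool {
  private final int bufferSize;
  private final int maxPooled;
  private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
  private long allocations; // number of buffers actually allocated

  SimpleBufferPool(int bufferSize, int maxPooled) {
    this.bufferSize = bufferSize;
    this.maxPooled = maxPooled;
  }

  synchronized ByteBuffer acquire() {
    ByteBuffer buf = free.pollFirst();
    if (buf == null) {
      allocations++;
      buf = ByteBuffer.allocate(bufferSize);
    }
    return buf;
  }

  // The caller must not use buf after releasing it. clear() resets position and
  // limit but leaves the old contents in place (no zero-fill).
  synchronized void release(ByteBuffer buf) {
    if (buf.capacity() != bufferSize) {
      throw new IllegalArgumentException("buffer does not belong to this pool");
    }
    if (free.size() < maxPooled) {
      buf.clear();
      free.addFirst(buf);
    }
  }

  synchronized long allocations() {
    return allocations;
  }
}
```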
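The difficulties in implementing this are:
  1. Spark does not consider releasing ByteBuffers when it uses them; they are reclaimed by the Java GC.
  2. Releasing buffers explicitly through reference counting, to reduce GC pressure, requires code for reference counting and memory-leak detection, which is a large change.
  3. Reuse Netty's buffer code, which supports memory-leak detection and dynamic resizing.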
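Explicit release requires reference counting. The sketch below is a hypothetical RefCountedBuffer, loosely modeled on the retain()/release() contract of Netty's ReferenceCounted interface. The count starts at 1, and retain() increments it. The release() call that brings it to zero hands the buffer to a deallocator, for example a pool. The sketch omits the leak detection mentioned above.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical minimal reference-counted buffer. When the count drops to zero,
// the underlying ByteBuffer is passed to `deallocator` (e.g. returned to a pool).
final class RefCountedBuffer {
  private final ByteBuffer buffer;
  private final Consumer<ByteBuffer> deallocator;
  private final AtomicInteger refCnt = new AtomicInteger(1);

  RefCountedBuffer(ByteBuffer buffer, Consumer<ByteBuffer> deallocator) {
    this.buffer = buffer;
    this.deallocator = deallocator;
  }

  int refCnt() {
    return refCnt.get();
  }

  // Increments the count; fails if the buffer has already been freed.
  RefCountedBuffer retain() {
    while (true) {
      int current = refCnt.get();
      if (current <= 0) {
        throw new IllegalStateException("buffer already released");
      }
      if (refCnt.compareAndSet(current, current + 1)) {
        return this;
      }
    }
  }

  // Returns true if this call released the last reference and freed the buffer.
  boolean release() {
    int updated = refCnt.decrementAndGet();
    if (updated < 0) {
      throw new IllegalStateException("released more times than retained");
    }
    if (updated == 0) {
      deallocator.accept(buffer);
      return true;
    }
    return false;
  }

  ByteBuffer buffer() {
    if (refCnt.get() <= 0) {
      throw new IllegalStateException("buffer already released");
    }
    return buffer;
  }
}
```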
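Further reading:
  1. Netty Buffers
  2. 深入浅出Netty内存管理 PoolChunk (Netty memory management explained: PoolChunk)
  3. jemalloc源码解析-核心架构 (jemalloc source code analysis: core architecture); jemalloc源码解析-内存管理 (jemalloc source code analysis: memory management)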