A look at Flink's CsvReader
Preface
This article takes a look at Flink's CsvReader.
Example
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<RecordDto> csvInput = env.readCsvFile(csvFilePath)
.pojoType(RecordDto.class, "playerName", "country", "year", "game", "gold", "silver", "bronze", "total");
csvInput.map(new MapFunction<RecordDto, RecordDto>() {
@Override
public RecordDto map(RecordDto value) throws Exception {
LOGGER.info("execute map:{}",value);
TimeUnit.SECONDS.sleep(5);
return value;
}
}).print();
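The example relies on a RecordDto class that the article does not show. The sketch below is a hypothetical definition that matches the field names passed to pojoType. The field types are assumptions. Under Flink's POJO rules the class must be public and have a public no-argument constructor, and its fields must be public or reachable through getters and setters, so the fields here are public. As a public class it belongs in its own file, RecordDto.java.

```java
// Hypothetical POJO for the example above; the field types are assumptions.
public class RecordDto {
    // Public, non-final fields so Flink's POJO analysis (and reflection) can set them.
    public String playerName;
    public String country;
    public int year;
    public String game;
    public int gold;
    public int silver;
    public int bronze;
    public int total;

    // Flink POJOs need a public no-argument constructor.
    public RecordDto() {}

    @Override
    public String toString() {
        return "RecordDto{playerName=" + playerName + ", country=" + country
                + ", year=" + year + ", game=" + game + ", gold=" + gold
                + ", silver=" + silver + ", bronze=" + bronze + ", total=" + total + "}";
    }
}
```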
ExecutionEnvironment.readCsvFile
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/ExecutionEnvironment.java
/**
* Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
* define parameters and field types and will eventually produce the DataSet that corresponds to
* the read and parsed CSV input.
*
* @param filePath The path of the CSV file.
* @return A CsvReader that can be used to configure the CSV input.
*/
public CsvReader readCsvFile(String filePath) {
return new CsvReader(filePath, this);
}
- This creates a CsvReader for the given filePath.
CsvReader
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvReader.java
public CsvReader(String filePath, ExecutionEnvironment executionContext) {
this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
}
public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
Preconditions.checkNotNull(executionContext, "The execution context may not be null.");
this.path = filePath;
this.executionContext = executionContext;
}
/**
* Configures the reader to read the CSV data and parse it to the given type. The all fields of the type
* must be public or able to set value. The type information for the fields is obtained from the type class.
*
* @param pojoType The class of the target POJO.
* @param pojoFields The fields of the POJO which are mapped to CSV fields.
* @return The DataSet representing the parsed CSV data.
*/
public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields) {
Preconditions.checkNotNull(pojoType, "The POJO type class must not be null.");
Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO.");
final TypeInformation<T> ti = TypeExtractor.createTypeInfo(pojoType);
if (!(ti instanceof PojoTypeInfo)) {
throw new IllegalArgumentException(
"The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti);
}
final PojoTypeInfo<T> pti = (PojoTypeInfo<T>) ti;
CsvInputFormat<T> inputFormat = new PojoCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask);
configureInputFormat(inputFormat);
return new DataSource<T>(executionContext, inputFormat, pti, Utils.getCallLocationName());
}
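- CsvReader's pojoType method maps the CSV data to a Java type and turns it into a Flink DataSource. If TypeExtractor does not produce a PojoTypeInfo for the class, it throws an IllegalArgumentException. Otherwise it creates the DataSource with a PojoCsvInputFormat and the PojoTypeInfo.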
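pojoType only accepts classes that TypeExtractor analyzes as a PojoTypeInfo. The helper below roughly approximates the main POJO rules with plain reflection, to make them concrete. It is a hypothetical, deliberately simplified sketch and not Flink's TypeExtractor logic. It checks only for a setter (not a getter/setter pair), and it ignores generics and other details Flink handles.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical, simplified approximation of Flink's POJO rules (not TypeExtractor itself).
final class PojoCheck {
    static boolean looksLikePojo(Class<?> clazz) {
        int cm = clazz.getModifiers();
        // The class must be public; a nested class must also be static.
        if (!Modifier.isPublic(cm)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(cm)) return false;
        // There must be a public no-argument constructor (getConstructor only sees public ones).
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Every non-static, non-transient field must be public and non-final,
        // or have a public setXxx(T) method (getter check omitted for brevity).
        for (Field f : clazz.getDeclaredFields()) {
            int fm = f.getModifiers();
            if (Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) continue;
            if (!hasPublicSetter(clazz, f)) return false;
        }
        return true;
    }

    private static boolean hasPublicSetter(Class<?> clazz, Field f) {
        String n = f.getName();
        String setter = "set" + Character.toUpperCase(n.charAt(0)) + n.substring(1);
        try {
            clazz.getMethod(setter, f.getType()); // getMethod only returns public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}

// Hypothetical sample types used to exercise the check.
final class PojoSamples {
    public static class PublicFields {
        public String country;
        public int gold;
        public PublicFields() {}
    }

    public static class WithSetter {
        private int total;
        public WithSetter() {}
        public void setTotal(int total) { this.total = total; }
        public int getTotal() { return total; }
    }

    public static class NoDefaultConstructor {
        public int year;
        public NoDefaultConstructor(int year) { this.year = year; }
    }
}
```

The real rules are richer than this, but these three checks (public class, public no-arg constructor, accessible fields) cover the most common reasons a CSV target class is rejected.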
Task
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
* The Task represents one execution of a parallel subtask on a TaskManager.
* A Task wraps a Flink operator (which may be a user function) and
* runs it, providing all services necessary for example to consume input data,
* produce its results (intermediate result partitions) and communicate
* with the JobManager.
*
* <p>The Flink operators (implemented as subclasses of
* {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
* The task connects those to the network stack and actor messages, and tracks the state
* of the execution and handles exceptions.
*
* <p>Tasks have no knowledge about how they relate to other tasks, or whether they
* are the first attempt to execute the task, or a repeated attempt. All of that
* is only known to the JobManager. All the task knows are its own runnable code,
* the task's configuration, and the IDs of the intermediate results to consume and
* produce (if any).
*
* <p>Each Task is run by one dedicated thread.
*/
public class Task implements Runnable, TaskActions, CheckpointListener {
//......
/**
* The core work method that bootstraps the task and executes its code.
*/
@Override
public void run() {
//......
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// ----------------------------------------------------------------
// actual task core work
// ----------------------------------------------------------------
// we must make strictly sure that the invokable is accessible to the cancel() call
// by the time we switched to running.
this.invokable = invokable;
// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}
// notify everyone that we switched to running
notifyObservers(ExecutionState.RUNNING, null);
taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader);
// run the invokable
invokable.invoke();
//......
}
}
- Task's run method calls invokable.invoke(); here the invokable is a DataSourceTask.
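Before invoking, run() must win the transition from DEPLOYING to RUNNING. If a cancel or failure changed the state first, the invokable never starts. The sketch below shows that compare-and-set guard. It assumes a hypothetical, trimmed-down state enum and an AtomicReference; it does not reproduce how Flink's Task actually stores its state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, trimmed-down model of the DEPLOYING -> RUNNING guard in Task.run().
final class MiniTask {
    enum State { DEPLOYING, RUNNING, CANCELING }

    private final AtomicReference<State> state = new AtomicReference<>(State.DEPLOYING);
    private final Runnable invokable;

    MiniTask(Runnable invokable) { this.invokable = invokable; }

    // Atomically move from 'from' to 'to'; fails if another thread changed the state first.
    boolean transitionState(State from, State to) {
        return state.compareAndSet(from, to);
    }

    void cancel() {
        // A cancel arriving while still deploying wins the race, so run() will not start.
        state.compareAndSet(State.DEPLOYING, State.CANCELING);
    }

    State getState() { return state.get(); }

    void run() {
        if (!transitionState(State.DEPLOYING, State.RUNNING)) {
            // The original throws CancelTaskException here; the sketch just skips invoking.
            return;
        }
        invokable.run();
    }
}
```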
DataSourceTask.invoke
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/DataSourceTask.java
@Override
public void invoke() throws Exception {
// --------------------------------------------------------------------
// Initialize
// --------------------------------------------------------------------
initInputFormat();
LOG.debug(getLogString("Start registering input and output"));
try {
initOutputs(getUserCodeClassLoader());
} catch (Exception ex) {
throw new RuntimeException("The initialization of the DataSource's outputs caused an error: " +
ex.getMessage(), ex);
}
LOG.debug(getLogString("Finished registering input and output"));
// --------------------------------------------------------------------
// Invoke
// --------------------------------------------------------------------
LOG.debug(getLogString("Starting data source operator"));
RuntimeContext ctx = createRuntimeContext();
final Counter numRecordsOut;
{
Counter tmpNumRecordsOut;
try {
OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
ioMetricGroup.reuseInputMetricsForTask();
if (this.config.getNumberOfChainedStubs() == 0) {
ioMetricGroup.reuseOutputMetricsForTask();
}
tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
} catch (Exception e) {
LOG.warn("An exception occurred during the metrics setup.", e);
tmpNumRecordsOut = new SimpleCounter();
}
numRecordsOut = tmpNumRecordsOut;
}
Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(ctx);
LOG.debug(getLogString("Rich Source detected. Initializing runtime context."));
((RichInputFormat) this.format).openInputFormat();
LOG.debug(getLogString("Rich Source detected. Opening the InputFormat."));
}
ExecutionConfig executionConfig = getExecutionConfig();
boolean objectReuseEnabled = executionConfig.isObjectReuseEnabled();
LOG.debug("DataSourceTask object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
try {
// start all chained tasks
BatchTask.openChainedTasks(this.chainedTasks, this);
// get input splits to read
final Iterator<InputSplit> splitIterator = getInputSplits();
// for each assigned input split
while (!this.taskCanceled && splitIterator.hasNext())
{
// get start and end
final InputSplit split = splitIterator.next();
LOG.debug(getLogString("Opening input split " + split.toString()));
final InputFormat<OT, InputSplit> format = this.format;
// open input format
format.open(split);
LOG.debug(getLogString("Starting to read input from split " + split.toString()));
try {
final Collector<OT> output = new CountingCollector<>(this.output, numRecordsOut);
if (objectReuseEnabled) {
OT reuse = serializer.createInstance();
// as long as there is data to read
while (!this.taskCanceled && !format.reachedEnd()) {
OT returned;
if ((returned = format.nextRecord(reuse)) != null) {
output.collect(returned);
}
}
} else {
// as long as there is data to read
while (!this.taskCanceled && !format.reachedEnd()) {
OT returned;
if ((returned = format.nextRecord(serializer.createInstance())) != null) {
output.collect(returned);
}
}
}
if (LOG.isDebugEnabled() && !this.taskCanceled) {
LOG.debug(getLogString("Closing input split " + split.toString()));
}
} finally {
// close. We close here such that a regular close throwing an exception marks a task as failed.
format.close();
}
completedSplitsCounter.inc();
} // end for all input splits
// close the collector. if it is a chaining task collector, it will close its chained tasks
this.output.close();
// close all chained tasks letting them report failure
BatchTask.closeChainedTasks(this.chainedTasks, this);
}
catch (Exception ex) {
// close the input, but do not report any exceptions, since we already have another root cause
try {
this.format.close();
} catch (Throwable ignored) {}
BatchTask.cancelChainedTasks(this.chainedTasks);
ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
if (ex instanceof CancelTaskException) {
// forward canceling exception
throw ex;
}
else if (!this.taskCanceled) {
// drop exception, if the task was canceled
BatchTask.logAndThrowException(ex, this);
}
} finally {
BatchTask.clearWriters(eventualOutputs);
// --------------------------------------------------------------------
// Closing
// --------------------------------------------------------------------
if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).closeInputFormat();
LOG.debug(getLogString("Rich Source detected. Closing the InputFormat."));
}
}
if (!this.taskCanceled) {
LOG.debug(getLogString("Finished data source operator"));
}
else {
LOG.debug(getLogString("Data source operator cancelled"));
}
}
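- As long as the task has not been canceled and format.reachedEnd() is false, DataSourceTask's invoke method keeps calling format.nextRecord(serializer.createInstance()) to pull data. It then calls output.collect(returned) for each non-null result. When object reuse is enabled, a single reuse instance is passed instead.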
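- The format here is a CsvInputFormat (specifically a PojoCsvInputFormat), but its nextRecord and reachedEnd methods are inherited from the parent class DelimitedInputFormat.
- PojoCsvInputFormat extends the abstract class CsvInputFormat, CsvInputFormat extends the abstract class GenericCsvInputFormat, and GenericCsvInputFormat in turn extends the abstract class DelimitedInputFormat.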
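The heart of DataSourceTask.invoke is a pull loop for each split. It keeps calling nextRecord until reachedEnd() returns true, skips null results and forwards everything else to the collector. The sketch below models that loop, including the choice between object reuse and a fresh instance. SimpleInputFormat, SourceLoop and ListLineFormat are hypothetical stand-ins, not Flink classes, and the cancellation check is omitted.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's InputFormat: only the two methods the loop needs.
interface SimpleInputFormat<T> {
    boolean reachedEnd();
    T nextRecord(T reuse); // may return null for records that should be skipped
}

final class SourceLoop {
    // Drains one split the way DataSourceTask.invoke does (cancellation omitted).
    static <T> int drain(SimpleInputFormat<T> format, Supplier<T> createInstance,
                         boolean objectReuse, Consumer<T> output) {
        int emitted = 0;
        T reuse = objectReuse ? createInstance.get() : null;
        while (!format.reachedEnd()) {
            // With object reuse the same instance is refilled; otherwise a fresh one per call.
            T returned = format.nextRecord(objectReuse ? reuse : createInstance.get());
            if (returned != null) {
                output.accept(returned);
                emitted++;
            }
        }
        return emitted;
    }
}

// Hypothetical line-based format: sets 'end' only when a read finds no more data,
// like DelimitedInputFormat.nextRecord does.
final class ListLineFormat implements SimpleInputFormat<StringBuilder> {
    private final Iterator<String> lines;
    private boolean end = false;

    ListLineFormat(List<String> lines) { this.lines = lines.iterator(); }

    @Override public boolean reachedEnd() { return end; }

    @Override public StringBuilder nextRecord(StringBuilder reuse) {
        if (!lines.hasNext()) { end = true; return null; }
        String line = lines.next();
        if (line.startsWith("#")) return null; // skipped, like a comment line
        reuse.setLength(0);
        return reuse.append(line);
    }
}
```

With objectReuse set to true, the same StringBuilder is refilled for every record. A consumer that keeps records must therefore copy them (for example via toString()). The same caveat applies to Flink's object reuse mode.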
DelimitedInputFormat
flink-core-1.6.2-sources.jar!/org/apache/flink/api/common/io/DelimitedInputFormat.java
/**
* The default read buffer size = 1MB.
*/
private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
private transient byte[] readBuffer;
private int bufferSize = -1;
private void initBuffers() {
this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
if (this.bufferSize <= this.delimiter.length) {
throw new IllegalArgumentException("Buffer size must be greater than length of delimiter.");
}
if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
this.readBuffer = new byte[this.bufferSize];
}
if (this.wrapBuffer == null || this.wrapBuffer.length < 256) {
this.wrapBuffer = new byte[256];
}
this.readPos = 0;
this.limit = 0;
this.overLimit = false;
this.end = false;
}
/**
* Checks whether the current split is at its end.
*
* @return True, if the split is at its end, false otherwise.
*/
@Override
public boolean reachedEnd() {
return this.end;
}
@Override
public OT nextRecord(OT record) throws IOException {
if (readLine()) {
return readRecord(record, this.currBuffer, this.currOffset, this.currLen);
} else {
this.end = true;
return null;
}
}
/**
* Fills the read buffer with bytes read from the file starting from an offset.
*/
private boolean fillBuffer(int offset) throws IOException {
int maxReadLength = this.readBuffer.length - offset;
// special case for reading the whole split.
if (this.splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
int read = this.stream.read(this.readBuffer, offset, maxReadLength);
if (read == -1) {
this.stream.close();
this.stream = null;
return false;
} else {
this.readPos = offset;
this.limit = read;
return true;
}
}
// else ..
int toRead;
if (this.splitLength > 0) {
// if we have more data, read that
toRead = this.splitLength > maxReadLength ? maxReadLength : (int) this.splitLength;
}
else {
// if we have exhausted our split, we need to complete the current record, or read one
// more across the next split.
// the reason is that the next split will skip over the beginning until it finds the first
// delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the
// previous split.
toRead = maxReadLength;
this.overLimit = true;
}
int read = this.stream.read(this.readBuffer, offset, toRead);
if (read == -1) {
this.stream.close();
this.stream = null;
return false;
} else {
this.splitLength -= read;
this.readPos = offset; // position from where to start reading
this.limit = read + offset; // number of valid bytes in the read buffer
return true;
}
}
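- DelimitedInputFormat first calls readLine() to read data into currBuffer. If a line was read, it calls readRecord, which is implemented by the subclass CsvInputFormat, passing currBuffer, currOffset and currLen.
- readLine() calls fillBuffer, which computes toRead from splitLength and maxReadLength. splitLength is the remaining length of the FileInputSplit being read and is decremented after every read. fillBuffer then reads up to toRead bytes from the file into readBuffer, starting at index offset, and readLine sets currBuffer, currOffset and currLen from the result.
- The size of readBuffer is set in initBuffers from bufferSize. bufferSize starts at -1 and is set to 4 * 1024 in getStatistics. When it is not positive, initBuffers falls back to DEFAULT_READ_BUFFER_SIZE, which is 1024 * 1024.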
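The toRead decision in fillBuffer can be pulled out into a small pure function. The sketch below assumes only the normal (non READ_WHOLE_SPLIT_FLAG) path matters; ReadPlan and planRead are hypothetical names. Once the split is exhausted, the format deliberately reads past its end. The next split skips everything up to its first delimiter, so the current split must finish its last record itself.

```java
// Hypothetical extraction of the toRead / overLimit decision from fillBuffer.
final class ReadPlan {
    final int toRead;
    final boolean overLimit;

    ReadPlan(int toRead, boolean overLimit) {
        this.toRead = toRead;
        this.overLimit = overLimit;
    }

    static ReadPlan planRead(long splitLength, int bufferLength, int offset) {
        int maxReadLength = bufferLength - offset; // free space left in readBuffer
        if (splitLength > 0) {
            // Still inside the split: read at most what remains of it.
            return new ReadPlan(splitLength > maxReadLength ? maxReadLength : (int) splitLength, false);
        }
        // Split exhausted: fill the buffer past the split end to complete the last record.
        return new ReadPlan(maxReadLength, true);
    }
}
```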
CsvInputFormat.readRecord
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/CsvInputFormat.java
@Override
public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
/*
* Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
*/
// Found window's end line, so find carriage return before the newline
if (this.lineDelimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
//reduce the number of bytes so that the Carriage return is not taken as data
numBytes--;
}
if (commentPrefix != null && commentPrefix.length <= numBytes) {
//check record for comments
boolean isComment = true;
for (int i = 0; i < commentPrefix.length; i++) {
if (commentPrefix[i] != bytes[offset + i]) {
isComment = false;
break;
}
}
if (isComment) {
this.commentCount++;
return null;
}
}
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
return fillRecord(reuse, parsedValues);
} else {
this.invalidLineCount++;
return null;
}
}
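- CsvInputFormat's readRecord method handles the raw bytes of one line. It first drops a trailing '\r' when the line delimiter is a line break. It returns null for comment lines and for lines that fail to parse, counting them in commentCount and invalidLineCount. Otherwise parseRecord parses the bytes into parsedValues (an Object[]). readRecord then calls the subclass's fillRecord method (here PojoCsvInputFormat) to copy parsedValues into reuse. reuse is the object DataSourceTask passed to format.nextRecord, i.e. serializer.createInstance() when object reuse is disabled.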
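The two byte-level checks at the top of readRecord can be tried in isolation: trimming a trailing '\r' and detecting the comment prefix. The sketch below mirrors them. LinePreCheck and its methods are hypothetical helpers, and the field parsing done by parseRecord is not modeled.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helpers mirroring the two checks at the top of CsvInputFormat.readRecord.
final class LinePreCheck {
    // Returns the record length after dropping a trailing '\r', as readRecord does
    // when the line delimiter is a plain line break (Windows line endings).
    static int trimCarriageReturn(byte[] bytes, int offset, int numBytes, boolean delimiterIsLinebreak) {
        if (delimiterIsLinebreak && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            return numBytes - 1;
        }
        return numBytes;
    }

    // True if the record starts with the comment prefix (byte-by-byte comparison).
    static boolean isComment(byte[] commentPrefix, byte[] bytes, int offset, int numBytes) {
        if (commentPrefix == null || commentPrefix.length > numBytes) {
            return false;
        }
        for (int i = 0; i < commentPrefix.length; i++) {
            if (commentPrefix[i] != bytes[offset + i]) {
                return false;
            }
        }
        return true;
    }

    static byte[] ascii(String s) {
        return s.getBytes(StandardCharsets.US_ASCII);
    }
}
```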
PojoCsvInputFormat.fillRecord
flink-java-1.6.2-sources.jar!/org/apache/flink/api/java/io/PojoCsvInputFormat.java
/**
* Input format that reads csv into POJOs.
* @param <OUT> resulting POJO type
*/
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
//......
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
pojoFields = new Field[pojoFieldNames.length];
Map<String, Field> allFields = new HashMap<String, Field>();
findAllFields(pojoTypeClass, allFields);
for (int i = 0; i < pojoFieldNames.length; i++) {
pojoFields[i] = allFields.get(pojoFieldNames[i]);
if (pojoFields[i] != null) {
pojoFields[i].setAccessible(true);
} else {
throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
}
}
}
@Override
public OUT fillRecord(OUT reuse, Object[] parsedValues) {
for (int i = 0; i < parsedValues.length; i++) {
try {
pojoFields[i].set(reuse, parsedValues[i]);
} catch (IllegalAccessException e) {
throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
}
}
return reuse;
}
//......
}
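- PojoCsvInputFormat's open method is called during execution, before each input split is read (format.open(split) in DataSourceTask.invoke). After super.open, it uses reflection to look up the required Fields up front. It throws a RuntimeException if a configured field name does not exist on the POJO class.
- fillRecord simply uses reflection to set parsedValues on the POJO.
- If a reflective set fails with an IllegalAccessException, the exception is wrapped and rethrown as a RuntimeException.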
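The open/fillRecord pair boils down to a simple pattern. Resolve each named field once, looking through superclasses too (which is what the name findAllFields suggests). Make each field accessible, then assign parsed values by position for every record. The sketch below shows that pattern. ReflectiveFiller and Medal are hypothetical names, and the sketch does not claim to reproduce Flink's exact field lookup.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of PojoCsvInputFormat's open()/fillRecord() reflection pattern.
final class ReflectiveFiller<T> {
    private final Field[] fields;

    ReflectiveFiller(Class<T> type, String... fieldNames) {
        // Collect declared fields from the class and its superclasses (subclass wins on a name clash).
        Map<String, Field> all = new HashMap<>();
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                all.putIfAbsent(f.getName(), f);
            }
        }
        fields = new Field[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            Field f = all.get(fieldNames[i]);
            if (f == null) {
                throw new RuntimeException("There is no field called \"" + fieldNames[i] + "\" in " + type.getName());
            }
            f.setAccessible(true); // done once up front, not per record
            fields[i] = f;
        }
    }

    // Assigns parsedValues[i] to the i-th configured field of 'reuse' (boxed values are unwrapped).
    T fill(T reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                fields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new RuntimeException("Parsed value could not be set in POJO field \"" + fields[i].getName() + "\"", e);
            }
        }
        return reuse;
    }
}

// Hypothetical target type for the demo.
class Medal {
    private String country;
    private int gold;
    String getCountry() { return country; }
    int getGold() { return gold; }
}
```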
CountingCollector.collect
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/util/metrics/CountingCollector.java
public class CountingCollector<OUT> implements Collector<OUT> {
private final Collector<OUT> collector;
private final Counter numRecordsOut;
public CountingCollector(Collector<OUT> collector, Counter numRecordsOut) {
this.collector = collector;
this.numRecordsOut = numRecordsOut;
}
@Override
public void collect(OUT record) {
this.numRecordsOut.inc();
this.collector.collect(record);
}
@Override
public void close() {
this.collector.close();
}
}
- The collector here is org.apache.flink.runtime.operators.chaining.ChainedMapDriver.
ChainedMapDriver
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java
@Override
public void collect(IT record) {
try {
this.numRecordsIn.inc();
this.outputCollector.collect(this.mapper.map(record));
} catch (Exception ex) {
throw new ExceptionInChainedStubException(this.taskName, ex);
}
}
- It first calls the mapper's map method to run the map logic, then calls outputCollector.collect to send the result on.
- The outputCollector here is a CountingCollector whose wrapped collector is org.apache.flink.runtime.operators.shipping.OutputCollector.
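Put together, the chain works like this. The source's collector counts the record. The chained map driver then applies the map function in the same thread and pushes the result into the next counting collector, which finally reaches the shipping collector. The sketch below wires that chain with hypothetical SimpleCollector, Counting, ChainedMap and ListSink classes. It has no metric groups and skips the ExceptionInChainedStubException wrapping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified version of Flink's Collector interface.
interface SimpleCollector<T> {
    void collect(T record);
}

// Counts records, then delegates (like CountingCollector).
final class Counting<T> implements SimpleCollector<T> {
    private final SimpleCollector<T> delegate;
    long count;

    Counting(SimpleCollector<T> delegate) { this.delegate = delegate; }

    @Override public void collect(T record) {
        count++;
        delegate.collect(record);
    }
}

// Applies the map function inline and forwards the result (like ChainedMapDriver).
final class ChainedMap<IN, OUT> implements SimpleCollector<IN> {
    private final Function<IN, OUT> mapper;
    private final SimpleCollector<OUT> output;

    ChainedMap(Function<IN, OUT> mapper, SimpleCollector<OUT> output) {
        this.mapper = mapper;
        this.output = output;
    }

    @Override public void collect(IN record) {
        output.collect(mapper.apply(record));
    }
}

// Terminal collector standing in for OutputCollector: stores records instead of emitting them.
final class ListSink<T> implements SimpleCollector<T> {
    final List<T> records = new ArrayList<>();

    @Override public void collect(T record) { records.add(record); }
}
```

The map runs inside the source's collect call, so the source and the chained map share one thread and the records never pass through serialization or the network between them.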
OutputCollector
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputCollector.java
/**
* Collects a record and emits it to all writers.
*/
@Override
public void collect(T record) {
if (record != null) {
this.delegate.setInstance(record);
try {
for (RecordWriter<SerializationDelegate<T>> writer : writers) {
writer.emit(this.delegate);
}
}
catch (IOException e) {
throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
}
catch (InterruptedException e) {
throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
}
}
else {
throw new NullPointerException("The system does not support records that are null."
+ "Null values are only supported as fields inside other objects.");
}
}
- This calls RecordWriter's emit method to emit the data.
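OutputCollector.collect is a fan-out. It rejects null records outright and hands each record (wrapped once in a reusable SerializationDelegate) to every RecordWriter. The sketch below shows the same policy. FanOutCollector is a hypothetical name, a plain Consumer stands in for RecordWriter, and there is no serialization.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical fan-out collector modeled on OutputCollector.collect.
final class FanOutCollector<T> {
    private final List<Consumer<T>> writers;

    FanOutCollector(List<Consumer<T>> writers) { this.writers = writers; }

    void collect(T record) {
        if (record == null) {
            // Same policy as OutputCollector: top-level null records are not supported.
            throw new NullPointerException("The system does not support records that are null.");
        }
        for (Consumer<T> writer : writers) {
            writer.accept(record); // every writer receives the same record
        }
    }
}
```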
RecordWriter
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
public void emit(T record) throws IOException, InterruptedException {
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
sendToTarget(record, targetChannel);
}
}
- channelSelector.selectChannels returns the target channels to send the record to; the channelSelector here is an OutputEmitter.
OutputEmitter
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@Override
public final int[] selectChannels(SerializationDelegate<T> record, int numberOfChannels) {
switch (strategy) {
case FORWARD:
return forward();
case PARTITION_RANDOM:
case PARTITION_FORCED_REBALANCE:
return robin(numberOfChannels);
case PARTITION_HASH:
return hashPartitionDefault(record.getInstance(), numberOfChannels);
case BROADCAST:
return broadcast(numberOfChannels);
case PARTITION_CUSTOM:
return customPartition(record.getInstance(), numberOfChannels);
case PARTITION_RANGE:
return rangePartition(record.getInstance(), numberOfChannels);
default:
throw new UnsupportedOperationException("Unsupported distribution strategy: " + strategy.name());
}
}
private int[] forward() {
return this.channels;
}
- Here the strategy is FORWARD, so forward() just returns the channels array it already holds.
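The sketch below contrasts FORWARD, which returns the same precomputed channel array every time, with a round-robin choice of the kind used for PARTITION_RANDOM / PARTITION_FORCED_REBALANCE and a hash-based choice. MiniEmitter is hypothetical. Forwarding to channel 0 is an assumption. The real OutputEmitter hashes key fields through type comparators, which the sketch replaces with hashCode() and Math.floorMod.

```java
// Hypothetical channel selector illustrating three OutputEmitter-style strategies.
final class MiniEmitter {
    enum Strategy { FORWARD, ROUND_ROBIN, HASH }

    private final Strategy strategy;
    private final int[] forwardChannels = {0}; // assumption: forward targets channel 0
    private int next = -1;                     // last channel chosen by round-robin

    MiniEmitter(Strategy strategy) { this.strategy = strategy; }

    int[] selectChannels(Object record, int numberOfChannels) {
        switch (strategy) {
            case FORWARD:
                return forwardChannels; // same precomputed array for every record
            case ROUND_ROBIN:
                next = (next + 1) % numberOfChannels;
                return new int[] {next};
            case HASH:
                // floorMod keeps the result non-negative even for negative hash codes.
                return new int[] {Math.floorMod(record.hashCode(), numberOfChannels)};
            default:
                throw new UnsupportedOperationException("Unsupported strategy: " + strategy);
        }
    }
}
```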
Summary
- The inputFormat created by CsvReader is a PojoCsvInputFormat, whose main method, fillRecord, populates the POJO via reflection. The actual reading happens in DelimitedInputFormat's readLine method, which calls fillBuffer. fillBuffer computes toRead from splitLength (the remaining length of the FileInputSplit being read) and maxReadLength. It then reads up to toRead bytes from the file into readBuffer, starting at offset.
- DataSourceTask's invoke method keeps calling format.nextRecord in a loop and passes each record to output.collect. It stops when the task is canceled or format.reachedEnd() returns true.
- The output there is a CountingCollector whose delegate collector is a ChainedMapDriver. ChainedMapDriver applies the map function to each record and passes the result to another CountingCollector, which wraps an OutputCollector. OutputCollector then emits the data through RecordWriter.