Kafka-connect-hdfs Source Code Analysis
2020-07-10
xhh199090
Analysis of the write path
Task startup begins in the start() method of HdfsSinkTask.java, which validates the configuration, creates the DataWriter, and runs recovery for the assigned partitions:
@Override
public void start(Map<String, String> props) {
Set<TopicPartition> assignment = context.assignment();
try {
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
boolean hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
if (hiveIntegration) {
StorageSchemaCompatibility compatibility = StorageSchemaCompatibility.getCompatibility(
connectorConfig.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG)
);
if (compatibility == StorageSchemaCompatibility.NONE) {
throw new ConfigException(
"Hive Integration requires schema compatibility to be BACKWARD, FORWARD or FULL"
);
}
}
// check that the timezone is set up correctly in case of scheduled rotation
if (connectorConfig.getLong(HdfsSinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG) > 0) {
String timeZoneString = connectorConfig.getString(PartitionerConfig.TIMEZONE_CONFIG);
if (timeZoneString.equals("")) {
throw new ConfigException(PartitionerConfig.TIMEZONE_CONFIG,
timeZoneString, "Timezone cannot be empty when using scheduled file rotation."
);
}
DateTimeZone.forID(timeZoneString);
}
int schemaCacheSize = connectorConfig.getInt(
HdfsSinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG
);
avroData = new AvroData(schemaCacheSize);
hdfsWriter = new DataWriter(connectorConfig, context, avroData); // initialize the DataWriter
recover(assignment);
if (hiveIntegration) {
syncWithHive();
}
} catch (ConfigException e) {
throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
} catch (ConnectException e) {
// Log at info level to help explain reason, but Connect logs the actual exception at ERROR
log.info("Couldn't start HdfsSinkConnector:", e);
log.info("Shutting down HdfsSinkConnector.");
if (hdfsWriter != null) {
try {
try {
log.debug("Closing data writer due to task start failure.");
hdfsWriter.close();
} finally {
log.debug("Stopping data writer due to task start failure.");
hdfsWriter.stop();
}
} catch (Throwable t) {
log.debug("Error closing and stopping data writer: {}", t.getMessage(), t);
}
}
// Always throw the original exception that prevented us from starting
throw e;
}
log.info("The connector relies on offsets in HDFS filenames, but does commit these offsets to "
+ "Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS "
+ "Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, "
+ "the connector will attempt to find offsets for its consumer group in the "
+ "'__consumer_offsets' topic. If offsets are not found, the consumer will "
+ "rely on the reset policy specified in the 'consumer.auto.offset.reset' property to "
+ "start exporting data to HDFS.");
}
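Before diving into DataWriter, it helps to see roughly what the props map handed to start() looks like. The sketch below is illustrative only: the key names follow the connector's documented configuration options referenced in the code above (hive.integration, schema.compatibility, rotate.schedule.interval.ms, timezone, schema.cache.size, etc.), but the wrapper class and all values are hypothetical.
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a props map of the kind HdfsSinkTask.start() receives.
public class ExampleSinkProps {
  public static Map<String, String> build() {
    Map<String, String> props = new HashMap<>();
    props.put("hdfs.url", "hdfs://namenode:8020");       // target HDFS cluster (example value)
    props.put("topics.dir", "/topics");                   // where data files are written
    props.put("logs.dir", "/logs");                       // where WAL files are written
    props.put("flush.size", "1000");                      // records per output file
    props.put("schema.cache.size", "1000");               // size of the AvroData schema cache
    props.put("hive.integration", "true");                // enables the Hive branch in start()
    props.put("schema.compatibility", "BACKWARD");        // must not be NONE when Hive integration is on
    props.put("rotate.schedule.interval.ms", "600000");   // scheduled rotation ...
    props.put("timezone", "UTC");                         // ... requires a non-empty timezone
    return props;
  }
}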
Initializing the DataWriter (DataWriter.java). The constructor loads the Hadoop configuration, sets up Kerberos authentication and a ticket-renewal thread if enabled, creates the storage, format, partitioner and optional Hive integration, and finally builds one TopicPartitionWriter per assigned partition:
@SuppressWarnings("unchecked")
public DataWriter(
HdfsSinkConnectorConfig connectorConfig,
SinkTaskContext context,
AvroData avroData,
Time time
) {
this.time = time;
try {
String hadoopHome = connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_HOME_CONFIG);
System.setProperty("hadoop.home.dir", hadoopHome);
this.connectorConfig = connectorConfig;
this.avroData = avroData;
this.context = context;
String hadoopConfDir = connectorConfig.getString(
HdfsSinkConnectorConfig.HADOOP_CONF_DIR_CONFIG
);
log.info("Hadoop configuration directory {}", hadoopConfDir);
Configuration conf = connectorConfig.getHadoopConfiguration();
if (!hadoopConfDir.equals("")) {
conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
}
boolean secureHadoop = connectorConfig.getBoolean(
HdfsSinkConnectorConfig.HDFS_AUTHENTICATION_KERBEROS_CONFIG
);
if (secureHadoop) {
SecurityUtil.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.KERBEROS,
conf
);
String principalConfig = connectorConfig.getString(
HdfsSinkConnectorConfig.CONNECT_HDFS_PRINCIPAL_CONFIG
);
String keytab = connectorConfig.getString(
HdfsSinkConnectorConfig.CONNECT_HDFS_KEYTAB_CONFIG
);
if (principalConfig == null || keytab == null) {
throw new ConfigException(
"Hadoop is using Kerberos for authentication, you need to provide both a connect "
+ "principal and the path to the keytab of the principal.");
}
conf.set("hadoop.security.authentication", "kerberos");
conf.set("hadoop.security.authorization", "true");
String hostname = InetAddress.getLocalHost().getCanonicalHostName();
String namenodePrincipalConfig = connectorConfig.getString(
HdfsSinkConnectorConfig.HDFS_NAMENODE_PRINCIPAL_CONFIG
);
String namenodePrincipal = SecurityUtil.getServerPrincipal(
namenodePrincipalConfig,
hostname
);
// namenode principal is needed for multi-node hadoop cluster
if (conf.get("dfs.namenode.kerberos.principal") == null) {
conf.set("dfs.namenode.kerberos.principal", namenodePrincipal);
}
log.info("Hadoop namenode principal: " + conf.get("dfs.namenode.kerberos.principal"));
UserGroupInformation.setConfiguration(conf);
// replace the _HOST specified in the principal config to the actual host
String principal = SecurityUtil.getServerPrincipal(principalConfig, hostname);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
log.info("Login as: " + ugi.getUserName());
final long renewPeriod = connectorConfig.getLong(
HdfsSinkConnectorConfig.KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG
);
isRunning = true;
ticketRenewThread = new Thread(new Runnable() {
@Override
public void run() {
synchronized (DataWriter.this) {
while (isRunning) {
try {
DataWriter.this.wait(renewPeriod);
if (isRunning) {
ugi.reloginFromKeytab();
}
} catch (IOException e) {
// We ignore this exception during relogin as each successful relogin gives
// additional 24 hours of authentication in the default config. In normal
// situations, the probability of failing relogin 24 times is low and if
// that happens, the task will fail eventually.
log.error("Error renewing the ticket", e);
} catch (InterruptedException e) {
// ignored
}
}
}
}
});
log.info("Starting the Kerberos ticket renew thread with period {}ms.", renewPeriod);
ticketRenewThread.start();
}
url = connectorConfig.getUrl();
topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG);
@SuppressWarnings("unchecked")
Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>) connectorConfig
.getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
storage = io.confluent.connect.storage.StorageFactory.createStorage(
storageClass,
HdfsSinkConnectorConfig.class,
connectorConfig,
url
);
createDir(topicsDir);
createDir(topicsDir + HdfsSinkConnectorConstants.TEMPFILE_DIRECTORY);
String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);
createDir(logsDir);
// Try to instantiate as a new-style storage-common type class, then fall back to old-style
// with no parameters
try {
Class<io.confluent.connect.storage.format.Format> formatClass =
(Class<io.confluent.connect.storage.format.Format>)
connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
newFormat = formatClass.getConstructor(HdfsStorage.class).newInstance(storage);
newWriterProvider = newFormat.getRecordWriterProvider();
schemaFileReader = newFormat.getSchemaFileReader();
} catch (NoSuchMethodException e) {
Class<Format> formatClass =
(Class<Format>) connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
format = formatClass.getConstructor().newInstance();
writerProvider = format.getRecordWriterProvider();
final io.confluent.connect.hdfs.SchemaFileReader oldReader
= format.getSchemaFileReader(avroData);
schemaFileReader = new SchemaFileReader<HdfsSinkConnectorConfig, Path>() {
@Override
public Schema getSchema(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, Path path) {
try {
return oldReader.getSchema(hdfsSinkConnectorConfig.getHadoopConfiguration(), path);
} catch (IOException e) {
throw new ConnectException("Failed to get schema", e);
}
}
@Override
public Iterator<Object> iterator() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasNext() {
throw new UnsupportedOperationException();
}
@Override
public Object next() {
throw new UnsupportedOperationException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
}
};
}
partitioner = newPartitioner(connectorConfig);
assignment = new HashSet<>(context.assignment());
hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
if (hiveIntegration) {
hiveDatabase = connectorConfig.getString(HiveConfig.HIVE_DATABASE_CONFIG);
hiveMetaStore = new HiveMetaStore(conf, connectorConfig);
if (format != null) {
hive = format.getHiveUtil(connectorConfig, hiveMetaStore);
} else if (newFormat != null) {
final io.confluent.connect.storage.hive.HiveUtil newHiveUtil
= ((HiveFactory) newFormat.getHiveFactory())
.createHiveUtil(connectorConfig, hiveMetaStore);
hive = new HiveUtil(connectorConfig, hiveMetaStore) {
@Override
public void createTable(
String database, String tableName, Schema schema,
Partitioner partitioner
) {
newHiveUtil.createTable(database, tableName, schema, partitioner);
}
@Override
public void alterSchema(String database, String tableName, Schema schema) {
newHiveUtil.alterSchema(database, tableName, schema);
}
};
} else {
throw new ConnectException("One of old or new format classes must be provided");
}
executorService = Executors.newSingleThreadExecutor();
hiveUpdateFutures = new LinkedList<>();
}
topicPartitionWriters = new HashMap<>();
for (TopicPartition tp : assignment) {
TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
tp,
storage,
writerProvider,
newWriterProvider,
partitioner,
connectorConfig,
context,
avroData,
hiveMetaStore,
hive,
schemaFileReader,
executorService,
hiveUpdateFutures,
time
);
topicPartitionWriters.put(tp, topicPartitionWriter);
}
} catch (ClassNotFoundException
| IllegalAccessException
| InstantiationException
| InvocationTargetException
| NoSuchMethodException e
) {
throw new ConnectException("Reflection exception: ", e);
} catch (IOException e) {
throw new ConnectException(e);
}
}
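The nested try/catch around FORMAT_CLASS_CONFIG above is a compatibility shim: the constructor first tries to instantiate the format as a new-style storage-common class whose constructor takes the storage object, and only falls back to the old-style no-argument format on NoSuchMethodException. A stripped-down sketch of that reflection pattern (class and method names here are hypothetical, not from the project):
// Hypothetical illustration of the "new-style first, old-style fallback" pattern.
public class FormatLoader {
  public static Object instantiate(Class<?> formatClass, Class<?> storageType, Object storage)
      throws ReflectiveOperationException {
    try {
      // New-style format: constructor takes the storage instance.
      return formatClass.getConstructor(storageType).newInstance(storage);
    } catch (NoSuchMethodException e) {
      // Old-style format: plain no-argument constructor.
      return formatClass.getConstructor().newInstance();
    }
  }
}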
After the DataWriter has been initialized, the next step is the recover() method, which iterates over the assigned partitions:
private void recover(Set<TopicPartition> assignment) {
for (TopicPartition tp : assignment) {
hdfsWriter.recover(tp);
}
}
hdfsWriter.recover(tp) delegates to DataWriter.java:
public void recover(TopicPartition tp) {
topicPartitionWriters.get(tp).recover();
}
which in turn calls recover() on the TopicPartitionWriter for that partition (TopicPartitionWriter.java). Note the intentional fall-through in the switch: each case performs one recovery step (pause the partition, apply the WAL, truncate the WAL, reset offsets, resume) and then continues into the next case, so a recovery that previously stopped partway resumes from its recorded state (a simplified sketch follows the method):
@SuppressWarnings("fallthrough")
public boolean recover() {
try {
switch (state) {
case RECOVERY_STARTED:
log.info("Started recovery for topic partition {}", tp);
pause();
nextState();
case RECOVERY_PARTITION_PAUSED:
log.debug("Start recovery state: Apply WAL for topic partition {}", tp);
applyWAL();
nextState();
case WAL_APPLIED:
log.debug("Start recovery state: Truncate WAL for topic partition {}", tp);
truncateWAL();
nextState();
case WAL_TRUNCATED:
log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
resetOffsets();
nextState();
case OFFSET_RESET:
log.debug("Start recovery state: Resume for topic partition {}", tp);
resume();
nextState();
log.info("Finished recovery for topic partition {}", tp);
break;
default:
log.error(
"{} is not a valid state to perform recovery for topic partition {}.",
state,
tp
);
}
} catch (ConnectException e) {
log.error("Recovery failed at state {}", state, e);
setRetryTimeout(timeoutMs);
return false;
}
return true;
}
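To make the fall-through explicit, here is a hypothetical, self-contained reduction of the same state machine; none of these names come from the project, and the real applyWAL/truncateWAL/resetOffsets steps are of course far more involved.
// Hypothetical reduction of the resumable fall-through state machine above:
// each case performs one recovery step, records the new state, and falls
// through so a previously interrupted recovery resumes where it left off.
public class RecoverySketch {
  enum State { STARTED, PARTITION_PAUSED, WAL_APPLIED, WAL_TRUNCATED, OFFSET_RESET, DONE }

  private State state = State.STARTED;

  @SuppressWarnings("fallthrough")
  public boolean recover() {
    switch (state) {
      case STARTED:
        pause();
        state = State.PARTITION_PAUSED;
        // fall through
      case PARTITION_PAUSED:
        applyWal();
        state = State.WAL_APPLIED;
        // fall through
      case WAL_APPLIED:
        truncateWal();
        state = State.WAL_TRUNCATED;
        // fall through
      case WAL_TRUNCATED:
        resetOffsets();
        state = State.OFFSET_RESET;
        // fall through
      case OFFSET_RESET:
        resume();
        state = State.DONE;
        break;
      default:
        // nothing to do once recovery has completed
    }
    return true;
  }

  private void pause() {}
  private void applyWal() {}
  private void truncateWal() {}
  private void resetOffsets() {}
  private void resume() {}
}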
Once the task thread is running, the Connect framework's WorkerSinkTask drives it by repeatedly calling iteration(): commit offsets when the interval has elapsed or a commit was requested, detect timed-out commits, then poll the consumer for new records:
protected void iteration() {
final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
try {
long now = time.milliseconds();
// Maybe commit
if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
commitOffsets(now, false);
nextCommit = now + offsetCommitIntervalMs;
context.clearCommitRequest();
}
final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
// Check for timed out commits
if (committing && now >= commitTimeoutMs) {
log.warn("{} Commit of offsets timed out", this);
commitFailures++;
committing = false;
}
// And process messages
long timeoutMs = Math.max(nextCommit - now, 0);
poll(timeoutMs);
} catch (WakeupException we) {
log.trace("{} Consumer woken up", this);
if (isStopping())
return;
if (shouldPause()) {
pauseAll();
onPause();
context.requestCommit();
} else if (!pausedForRedelivery) {
resumeAll();
onResume();
}
}
}
/**
* Poll for new messages with the given timeout. Should only be invoked by the worker thread.
*/
protected void poll(long timeoutMs) {
rewind();
long retryTimeout = context.timeout();
if (retryTimeout > 0) {
timeoutMs = Math.min(timeoutMs, retryTimeout);
context.timeout(-1L);
}
log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} Polling returned {} messages", this, msgs.count());
convertMessages(msgs);
deliverMessages();
}
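Note the handshake with context.timeout() at the top of poll(): a sink task can ask for a retry backoff by setting a timeout on its SinkTaskContext (the HDFS connector does this via setRetryTimeout(), seen in recover() above), and poll() then caps the consumer poll at that value before resetting it. A hedged sketch of the task side, with a hypothetical task class and helper:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical task illustrating the handshake with poll() above: calling
// context.timeout(ms) makes the worker cap its next consumer poll at ms, so
// the task regains control quickly and can retry its buffered records.
// (The HDFS connector buffers per partition in TopicPartitionWriter and also
// controls committed offsets itself.)
public class BackoffSketchTask extends SinkTask {
  private static final long RETRY_BACKOFF_MS = 5_000L; // illustrative value
  private final List<SinkRecord> buffer = new ArrayList<>();

  @Override
  public String version() { return "0.0.1"; }

  @Override
  public void start(Map<String, String> props) {}

  @Override
  public void put(Collection<SinkRecord> records) {
    buffer.addAll(records);                // keep records until the backend accepts them
    if (!flushBuffer()) {                  // hypothetical write attempt
      context.timeout(RETRY_BACKOFF_MS);   // worker's poll() picks this up as retryTimeout
    }
  }

  @Override
  public void stop() {}

  private boolean flushBuffer() {
    // pretend the downstream system accepted everything
    buffer.clear();
    return true;
  }
}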
private void deliverMessages() {
// Finally, deliver this batch to the sink
try {
// Since we reuse the messageBatch buffer, ensure we give the task its own copy
log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
long start = time.milliseconds();
task.put(new ArrayList<>(messageBatch));
recordBatch(messageBatch.size());
sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
currentOffsets.putAll(origOffsets);
messageBatch.clear();
// If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
// the task had not explicitly paused
if (pausedForRedelivery) {
if (!shouldPause())
resumeAll();
pausedForRedelivery = false;
}
} catch (RetriableException e) {
log.error("{} RetriableException from SinkTask:", this, e);
// If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
// but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
pausedForRedelivery = true;
pauseAll();
// Let this exit normally, the batch will be reprocessed on the next loop.
} catch (Throwable t) {
log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
+ "recover until manually restarted. Error: {}", this, t.getMessage(), t);
throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
}
}
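The RetriableException branch above is the contract a sink task uses to get the same batch redelivered: the worker pauses all partitions, keeps polling to stay in the consumer group, and calls put() again with the unchanged messageBatch on the next loop. A minimal hypothetical task using that contract (the helper method is made up for illustration):
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sink task relying on the redelivery contract above: throwing
// RetriableException from put() keeps the task alive, pauses consumption, and
// causes the same batch to be delivered again on the next iteration.
public class RetrySketchTask extends SinkTask {
  @Override
  public String version() { return "0.0.1"; }

  @Override
  public void start(Map<String, String> props) {}

  @Override
  public void put(Collection<SinkRecord> records) {
    try {
      writeDownstream(records);            // hypothetical helper that may fail transiently
    } catch (RuntimeException e) {
      // Transient failure: let the worker redeliver instead of killing the task.
      throw new RetriableException("temporary failure writing batch, will retry", e);
    }
  }

  @Override
  public void stop() {}

  private void writeDownstream(Collection<SinkRecord> records) {
    // pretend to write to an external system
  }
}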
task.put(new ArrayList<>(messageBatch)) dispatches to the put() method of HdfsSinkTask, which simply hands the batch to the DataWriter:
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
if (log.isDebugEnabled()) {
log.debug("Read {} records from Kafka", records.size());
}
try {
hdfsWriter.write(records);
} catch (ConnectException e) {
throw new ConnectException(e);
}
}