Kafka-connect-hdfs Source Code Analysis

2020-07-10  xhh199090

Analysis of the Data Write Flow

The task is started by the start method in HdfsSinkTask.java:

@Override
public void start(Map<String, String> props) {
  Set<TopicPartition> assignment = context.assignment();
  try {
    HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
    boolean hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
    if (hiveIntegration) {
      StorageSchemaCompatibility compatibility = StorageSchemaCompatibility.getCompatibility(
          connectorConfig.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG)
      );
      if (compatibility == StorageSchemaCompatibility.NONE) {
        throw new ConfigException(
            "Hive Integration requires schema compatibility to be BACKWARD, FORWARD or FULL"
        );
      }
    }

    // check that the timezone is set up correctly in case of scheduled rotation
    if (connectorConfig.getLong(HdfsSinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG) > 0) {
      String timeZoneString = connectorConfig.getString(PartitionerConfig.TIMEZONE_CONFIG);
      if (timeZoneString.equals("")) {
        throw new ConfigException(PartitionerConfig.TIMEZONE_CONFIG,
            timeZoneString, "Timezone cannot be empty when using scheduled file rotation."
        );
      }
      DateTimeZone.forID(timeZoneString);
    }

    int schemaCacheSize = connectorConfig.getInt(
        HdfsSinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG
    );
    avroData = new AvroData(schemaCacheSize);
    hdfsWriter = new DataWriter(connectorConfig, context, avroData);   // initialize the DataWriter
    recover(assignment);
    if (hiveIntegration) {
      syncWithHive();
    }
  } catch (ConfigException e) {
    throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", e);
  } catch (ConnectException e) {
    // Log at info level to help explain reason, but Connect logs the actual exception at ERROR
    log.info("Couldn't start HdfsSinkConnector:", e);
    log.info("Shutting down HdfsSinkConnector.");
    if (hdfsWriter != null) {
      try {
        try {
          log.debug("Closing data writer due to task start failure.");
          hdfsWriter.close();
        } finally {
          log.debug("Stopping data writer due to task start failure.");
          hdfsWriter.stop();
        }
      } catch (Throwable t) {
        log.debug("Error closing and stopping data writer: {}", t.getMessage(), t);
      }
    }
    // Always throw the original exception that prevented us from starting
    throw e;
  }

  log.info("The connector relies on offsets in HDFS filenames, but does commit these offsets to "
      + "Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS "
      + "Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, "
      + "the connector will attempt to find offsets for its consumer group in the "
      + "'__consumer_offsets' topic. If offsets are not found, the consumer will "
      + "rely on the reset policy specified in the 'consumer.auto.offset.reset' property to "
      + "start exporting data to HDFS.");
}
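For orientation, here is a minimal property map that would pass the validation above: enabling hive.integration requires a schema.compatibility other than NONE, and a positive rotate.schedule.interval.ms requires a non-empty timezone. This is an illustrative sketch only; the values (and the assumption that these keys alone are enough for a working connector) are mine, not the project's.

// Illustrative only: properties that satisfy the checks performed in start().
Map<String, String> props = new HashMap<>();
props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
props.put("topics", "test-topic");
props.put("hdfs.url", "hdfs://localhost:9000");
props.put("flush.size", "3");
props.put("hive.integration", "true");
props.put("schema.compatibility", "BACKWARD");     // must not be NONE when Hive integration is on
props.put("rotate.schedule.interval.ms", "60000");
props.put("timezone", "UTC");                       // must be non-empty once scheduled rotation is enabled
props.put("schema.cache.size", "1000");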

Initializing the DataWriter (DataWriter.java). The three-argument call in start() goes through an overload that delegates to the constructor below, supplying the system clock as the Time argument:

@SuppressWarnings("unchecked")
public DataWriter(
    HdfsSinkConnectorConfig connectorConfig,
    SinkTaskContext context,
    AvroData avroData,
    Time time
) {
  this.time = time;
  try {
    String hadoopHome = connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_HOME_CONFIG);
    System.setProperty("hadoop.home.dir", hadoopHome);

    this.connectorConfig = connectorConfig;
    this.avroData = avroData;
    this.context = context;

    String hadoopConfDir = connectorConfig.getString(
        HdfsSinkConnectorConfig.HADOOP_CONF_DIR_CONFIG
    );
    log.info("Hadoop configuration directory {}", hadoopConfDir);
    Configuration conf = connectorConfig.getHadoopConfiguration();
    if (!hadoopConfDir.equals("")) {
      conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
      conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
    }

    boolean secureHadoop = connectorConfig.getBoolean(
        HdfsSinkConnectorConfig.HDFS_AUTHENTICATION_KERBEROS_CONFIG
    );
    if (secureHadoop) {
      SecurityUtil.setAuthenticationMethod(
          UserGroupInformation.AuthenticationMethod.KERBEROS,
          conf
      );
      String principalConfig = connectorConfig.getString(
          HdfsSinkConnectorConfig.CONNECT_HDFS_PRINCIPAL_CONFIG
      );
      String keytab = connectorConfig.getString(
          HdfsSinkConnectorConfig.CONNECT_HDFS_KEYTAB_CONFIG
      );

      if (principalConfig == null || keytab == null) {
        throw new ConfigException(
            "Hadoop is using Kerberos for authentication, you need to provide both a connect "
                + "principal and the path to the keytab of the principal.");
      }

      conf.set("hadoop.security.authentication", "kerberos");
      conf.set("hadoop.security.authorization", "true");
      String hostname = InetAddress.getLocalHost().getCanonicalHostName();
      String namenodePrincipalConfig = connectorConfig.getString(
          HdfsSinkConnectorConfig.HDFS_NAMENODE_PRINCIPAL_CONFIG
      );

      String namenodePrincipal = SecurityUtil.getServerPrincipal(
          namenodePrincipalConfig,
          hostname
      );
      // namenode principal is needed for multi-node hadoop cluster
      if (conf.get("dfs.namenode.kerberos.principal") == null) {
        conf.set("dfs.namenode.kerberos.principal", namenodePrincipal);
      }
      log.info("Hadoop namenode principal: " + conf.get("dfs.namenode.kerberos.principal"));

      UserGroupInformation.setConfiguration(conf);
      // replace the _HOST specified in the principal config with the actual host
      String principal = SecurityUtil.getServerPrincipal(principalConfig, hostname);
      UserGroupInformation.loginUserFromKeytab(principal, keytab);
      final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
      log.info("Login as: " + ugi.getUserName());

      final long renewPeriod = connectorConfig.getLong(
          HdfsSinkConnectorConfig.KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG
      );

      isRunning = true;
      ticketRenewThread = new Thread(new Runnable() {
        @Override
        public void run() {
          synchronized (DataWriter.this) {
            while (isRunning) {
              try {
                DataWriter.this.wait(renewPeriod);
                if (isRunning) {
                  ugi.reloginFromKeytab();
                }
              } catch (IOException e) {
                // We ignore this exception during relogin as each successful relogin gives
                // additional 24 hours of authentication in the default config. In normal
                // situations, the probability of failing relogin 24 times is low and if
                // that happens, the task will fail eventually.
                log.error("Error renewing the ticket", e);
              } catch (InterruptedException e) {
                // ignored
              }
            }
          }
        }
      });
      log.info("Starting the Kerberos ticket renew thread with period {}ms.", renewPeriod);
      ticketRenewThread.start();
    }

    url = connectorConfig.getUrl();
    topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG);

    @SuppressWarnings("unchecked")
    Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>) connectorConfig
        .getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
    storage = io.confluent.connect.storage.StorageFactory.createStorage(
        storageClass,
        HdfsSinkConnectorConfig.class,
        connectorConfig,
        url
    );

    createDir(topicsDir);
    createDir(topicsDir + HdfsSinkConnectorConstants.TEMPFILE_DIRECTORY);
    String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);
    createDir(logsDir);

    // Try to instantiate as a new-style storage-common type class, then fall back to old-style
    // with no parameters
    try {
      Class<io.confluent.connect.storage.format.Format> formatClass =
          (Class<io.confluent.connect.storage.format.Format>)
              connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
      newFormat = formatClass.getConstructor(HdfsStorage.class).newInstance(storage);
      newWriterProvider = newFormat.getRecordWriterProvider();
      schemaFileReader = newFormat.getSchemaFileReader();
    } catch (NoSuchMethodException e) {
      Class<Format> formatClass =
          (Class<Format>) connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
      format = formatClass.getConstructor().newInstance();
      writerProvider = format.getRecordWriterProvider();
      final io.confluent.connect.hdfs.SchemaFileReader oldReader
          = format.getSchemaFileReader(avroData);
      schemaFileReader = new SchemaFileReader<HdfsSinkConnectorConfig, Path>() {
        @Override
        public Schema getSchema(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, Path path) {
          try {
            return oldReader.getSchema(hdfsSinkConnectorConfig.getHadoopConfiguration(), path);
          } catch (IOException e) {
            throw new ConnectException("Failed to get schema", e);
          }
        }

        @Override
        public Iterator<Object> iterator() {
          throw new UnsupportedOperationException();
        }

        @Override
        public boolean hasNext() {
          throw new UnsupportedOperationException();
        }

        @Override
        public Object next() {
          throw new UnsupportedOperationException();
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {

        }
      };
    }

    partitioner = newPartitioner(connectorConfig);

    assignment = new HashSet<>(context.assignment());

    hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
    if (hiveIntegration) {
      hiveDatabase = connectorConfig.getString(HiveConfig.HIVE_DATABASE_CONFIG);
      hiveMetaStore = new HiveMetaStore(conf, connectorConfig);
      if (format != null) {
        hive = format.getHiveUtil(connectorConfig, hiveMetaStore);
      } else if (newFormat != null) {
        final io.confluent.connect.storage.hive.HiveUtil newHiveUtil
            = ((HiveFactory) newFormat.getHiveFactory())
            .createHiveUtil(connectorConfig, hiveMetaStore);
        hive = new HiveUtil(connectorConfig, hiveMetaStore) {
          @Override
          public void createTable(
              String database, String tableName, Schema schema,
              Partitioner partitioner
          ) {
            newHiveUtil.createTable(database, tableName, schema, partitioner);
          }

          @Override
          public void alterSchema(String database, String tableName, Schema schema) {
            newHiveUtil.alterSchema(database, tableName, schema);
          }
        };
      } else {
        throw new ConnectException("One of old or new format classes must be provided");
      }
      executorService = Executors.newSingleThreadExecutor();
      hiveUpdateFutures = new LinkedList<>();
    }

    topicPartitionWriters = new HashMap<>();
    for (TopicPartition tp : assignment) {
      TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
          tp,
          storage,
          writerProvider,
          newWriterProvider,
          partitioner,
          connectorConfig,
          context,
          avroData,
          hiveMetaStore,
          hive,
          schemaFileReader,
          executorService,
          hiveUpdateFutures,
          time
      );
      topicPartitionWriters.put(tp, topicPartitionWriter);
    }
  } catch (ClassNotFoundException
          | IllegalAccessException
          | InstantiationException
          | InvocationTargetException
          | NoSuchMethodException e
  ) {
    throw new ConnectException("Reflection exception: ", e);
  } catch (IOException e) {
    throw new ConnectException(e);
  }
}
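A detail worth noting in the constructor above is how the Kerberos ticket renewal thread is meant to be shut down: the thread waits on the DataWriter monitor for renewPeriod, so stopping it only requires flipping isRunning and notifying the monitor, which wakes the thread immediately instead of letting it sleep out the remaining period. A sketch of that counterpart, assuming stop() follows the usual wait/notify pattern (the connector's actual stop()/close() methods are not reproduced in this walkthrough):

// Sketch of the shutdown side of the ticket renewal loop; an assumption only,
// the real stop() also shuts down the per-partition writers and the storage.
public void stop() {
  if (ticketRenewThread != null) {
    synchronized (this) {
      isRunning = false;   // the loop condition is checked while holding the monitor
      this.notifyAll();    // wake the thread out of wait(renewPeriod)
    }
  }
}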

After the DataWriter is initialized, start() calls the recover method:

private void recover(Set<TopicPartition> assignment) {
  for (TopicPartition tp : assignment) {
    hdfsWriter.recover(tp);
  }
}

The hdfsWriter.recover(tp) call is handled in DataWriter.java:

public void recover(TopicPartition tp) {
  topicPartitionWriters.get(tp).recover();
}

which delegates to recover() in TopicPartitionWriter.java:

@SuppressWarnings("fallthrough")
public boolean recover() {
  try {
    switch (state) {
      case RECOVERY_STARTED:
        log.info("Started recovery for topic partition {}", tp);
        pause();
        nextState();
      case RECOVERY_PARTITION_PAUSED:
        log.debug("Start recovery state: Apply WAL for topic partition {}", tp);
        applyWAL();
        nextState();
      case WAL_APPLIED:
        log.debug("Start recovery state: Truncate WAL for topic partition {}", tp);
        truncateWAL();
        nextState();
      case WAL_TRUNCATED:
        log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
        resetOffsets();
        nextState();
      case OFFSET_RESET:
        log.debug("Start recovery state: Resume for topic partition {}", tp);
        resume();
        nextState();
        log.info("Finished recovery for topic partition {}", tp);
        break;
      default:
        log.error(
            "{} is not a valid state to perform recovery for topic partition {}.",
            state,
            tp
        );
    }
  } catch (ConnectException e) {
    log.error("Recovery failed at state {}", state, e);
    setRetryTimeout(timeoutMs);
    return false;
  }
  return true;
}
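Note that the switch above has no break between cases, which is why the method carries @SuppressWarnings("fallthrough"). That turns recovery into a resumable state machine: it can be (re)entered at whichever state the previous attempt reached, and execution falls through every remaining step until the partition is resumed. A stripped-down sketch of the same pattern follows; the State enum, nextState() and the step methods are hypothetical stand-ins, not the connector's real ones.

// Simplified analogue of the fall-through recovery state machine.
class RecoverySketch {
  enum State { STARTED, PAUSED, WAL_APPLIED, WAL_TRUNCATED, OFFSET_RESET }

  private State state = State.STARTED;

  private void nextState() {
    // advance to the next enum constant in declaration order
    state = State.values()[state.ordinal() + 1];
  }

  @SuppressWarnings("fallthrough")
  boolean recover() {
    switch (state) {
      case STARTED:
        pause();          // stop consumption while on-disk state is repaired
        nextState();
        // fall through
      case PAUSED:
        applyWal();       // replay the write-ahead log
        nextState();
        // fall through
      case WAL_APPLIED:
        truncateWal();
        nextState();
        // fall through
      case WAL_TRUNCATED:
        resetOffsets();   // seek the consumer back to the recovered offset
        nextState();
        // fall through
      case OFFSET_RESET:
        resume();
        break;
      default:
        return false;
    }
    return true;
  }

  // hypothetical step methods; the real logic lives in TopicPartitionWriter
  private void pause() {}
  private void applyWal() {}
  private void truncateWal() {}
  private void resetOffsets() {}
  private void resume() {}
}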

After the task thread starts, the Connect framework repeatedly executes the iteration() method of WorkerSinkTask, which commits offsets on schedule and then polls the consumer for new records:

protected void iteration() {
    final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);

    try {
        long now = time.milliseconds();

        // Maybe commit
        if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
            commitOffsets(now, false);
            nextCommit = now + offsetCommitIntervalMs;
            context.clearCommitRequest();
        }

        final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);

        // Check for timed out commits
        if (committing && now >= commitTimeoutMs) {
            log.warn("{} Commit of offsets timed out", this);
            commitFailures++;
            committing = false;
        }

        // And process messages
        long timeoutMs = Math.max(nextCommit - now, 0);
        poll(timeoutMs);
    } catch (WakeupException we) {
        log.trace("{} Consumer woken up", this);

        if (isStopping())
            return;

        if (shouldPause()) {
            pauseAll();
            onPause();
            context.requestCommit();
        } else if (!pausedForRedelivery) {
            resumeAll();
            onResume();
        }
    }
}
/**
 * Poll for new messages with the given timeout. Should only be invoked by the worker thread.
 */
protected void poll(long timeoutMs) {
    rewind();
    long retryTimeout = context.timeout();
    if (retryTimeout > 0) {
        timeoutMs = Math.min(timeoutMs, retryTimeout);
        context.timeout(-1L);
    }

    log.trace("{} Polling consumer with timeout {} ms", this, timeoutMs);
    ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
    assert messageBatch.isEmpty() || msgs.isEmpty();
    log.trace("{} Polling returned {} messages", this, msgs.count());

    convertMessages(msgs);
    deliverMessages();
}
private void deliverMessages() {
    // Finally, deliver this batch to the sink
    try {
        // Since we reuse the messageBatch buffer, ensure we give the task its own copy
        log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
        long start = time.milliseconds();
        task.put(new ArrayList<>(messageBatch));
        recordBatch(messageBatch.size());
        sinkTaskMetricsGroup.recordPut(time.milliseconds() - start);
        currentOffsets.putAll(origOffsets);
        messageBatch.clear();
        // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
        // the task had not explicitly paused
        if (pausedForRedelivery) {
            if (!shouldPause())
                resumeAll();
            pausedForRedelivery = false;
        }
    } catch (RetriableException e) {
        log.error("{} RetriableException from SinkTask:", this, e);
        // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
        // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
        pausedForRedelivery = true;
        pauseAll();
        // Let this exit normally, the batch will be reprocessed on the next loop.
    } catch (Throwable t) {
        log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
                + "recover until manually restarted. Error: {}", this, t.getMessage(), t);
        throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
    }
}
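Two hooks in this loop matter for the HDFS connector's retry behaviour: poll() caps its timeout with context.timeout() (which the connector sets, for example when recovery fails and a retry is scheduled), and deliverMessages() pauses all partitions and redelivers the same batch when the task throws RetriableException. Below is a generic illustration of how a sink task can use these hooks; it is not the HDFS connector's actual code, and the exception type is hypothetical.

// Generic illustration: a sink task asking Connect to back off and redeliver.
@Override
public void put(Collection<SinkRecord> records) {
  try {
    writer.write(records);                   // any transiently failing sink write
  } catch (TemporaryStorageException e) {    // hypothetical transient failure
    context.timeout(5000);                   // read back by WorkerSinkTask.poll()
    throw new RetriableException(e);         // same batch is redelivered next loop
  }
}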

The call task.put(new ArrayList<>(messageBatch)) is implemented by the put method of HdfsSinkTask:

@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
  if (log.isDebugEnabled()) {
    log.debug("Read {} records from Kafka", records.size());
  }
  try {
    hdfsWriter.write(records);
  } catch (ConnectException e) {
    throw new ConnectException(e);
  }
}
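Inside hdfsWriter.write(records), each SinkRecord is routed to the TopicPartitionWriter for its topic partition and buffered there; the per-partition writers then take care of temp files, rotation, WAL appends and offset commits. A simplified sketch of that dispatch, omitting the Hive-update bookkeeping and using the topicPartitionWriters map built in the DataWriter constructor shown earlier:

// Simplified sketch of DataWriter.write(...): buffer each record in its
// partition's writer, then let every assigned writer flush/rotate.
public void write(Collection<SinkRecord> records) {
  for (SinkRecord record : records) {
    TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
    topicPartitionWriters.get(tp).buffer(record);
  }
  for (TopicPartition tp : assignment) {
    topicPartitionWriters.get(tp).write();
  }
}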