Collecting Kafka data with Flume and writing it to Kudu in real time
2020-05-17
lei_charles
-
Create JsonKuduOperationsProducer.java, which parses JSON event bodies and writes them to Kudu
package com.cloudera.flume.sink;   // must match the class name configured as agent.sinks.k1.producer below

import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.flume.sink.KuduOperationsProducer;
import org.apache.kudu.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A KuduOperationsProducer that expects each Flume event body to be a flat JSON object
 * (as produced by the interceptor below) and turns it into one Kudu insert/upsert,
 * matching JSON keys to Kudu column names.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class JsonKuduOperationsProducer implements KuduOperationsProducer {

    private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class);

    private static final String INSERT = "insert";
    private static final String UPSERT = "upsert";
    private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);

    public static final String ENCODING_PROP = "encoding";
    public static final String DEFAULT_ENCODING = "utf-8";
    public static final String OPERATION_PROP = "operation";
    public static final String DEFAULT_OPERATION = UPSERT;

    @Deprecated
    public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
    @Deprecated
    public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
    @Deprecated
    public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
    @Deprecated
    public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
    @Deprecated
    public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
    @Deprecated
    public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;

    public static final String MISSING_COLUMN_POLICY_PROP = "missingColumnPolicy";
    public static final ParseErrorPolicy DEFAULT_MISSING_COLUMN_POLICY = ParseErrorPolicy.REJECT;
    public static final String BAD_COLUMN_VALUE_POLICY_PROP = "badColumnValuePolicy";
    public static final ParseErrorPolicy DEFAULT_BAD_COLUMN_VALUE_POLICY = ParseErrorPolicy.REJECT;
    public static final String UNMATCHED_ROW_POLICY_PROP = "unmatchedRowPolicy";
    public static final ParseErrorPolicy DEFAULT_UNMATCHED_ROW_POLICY = ParseErrorPolicy.WARN;

    private KuduTable table;
    private Charset charset;
    private String operation;
    private ParseErrorPolicy missingColumnPolicy;
    private ParseErrorPolicy badColumnValuePolicy;
    private ParseErrorPolicy unmatchedRowPolicy;

    public JsonKuduOperationsProducer() {
    }

    @Override
    public void configure(Context context) {
        String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
        try {
            charset = Charset.forName(charsetName);
        } catch (IllegalArgumentException e) {
            throw new FlumeException(String.format("Invalid or unsupported charset %s", charsetName), e);
        }
        this.operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase();
        Preconditions.checkArgument(validOperations.contains(this.operation),
                "Unrecognized operation '%s'", this.operation);
        this.missingColumnPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
                context, SKIP_MISSING_COLUMN_PROP, MISSING_COLUMN_POLICY_PROP,
                ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_MISSING_COLUMN_POLICY);
        this.badColumnValuePolicy = getParseErrorPolicyCheckingDeprecatedProperty(
                context, SKIP_BAD_COLUMN_VALUE_PROP, BAD_COLUMN_VALUE_POLICY_PROP,
                ParseErrorPolicy.WARN, ParseErrorPolicy.REJECT, DEFAULT_BAD_COLUMN_VALUE_POLICY);
        this.unmatchedRowPolicy = getParseErrorPolicyCheckingDeprecatedProperty(
                context, WARN_UNMATCHED_ROWS_PROP, UNMATCHED_ROW_POLICY_PROP,
                ParseErrorPolicy.WARN, ParseErrorPolicy.IGNORE, DEFAULT_UNMATCHED_ROW_POLICY);
    }

    @Override
    public void initialize(KuduTable kuduTable) {
        this.table = kuduTable;
    }

    @Override
    public List<Operation> getOperations(Event event) throws FlumeException {
        String raw = new String(event.getBody(), charset);
        Map<String, String> rawMap = JSON.parseObject(raw, HashMap.class);
        Schema schema = this.table.getSchema();
        List<Operation> ops = Lists.newArrayList();
        if (raw != null && !raw.isEmpty()) {
            Operation op;
            switch (operation) {
                case UPSERT:
                    op = this.table.newUpsert();
                    break;
                case INSERT:
                    op = this.table.newInsert();
                    break;
                default:
                    throw new FlumeException(String.format(
                            "Unrecognized operation type '%s' in getOperations(): this should never happen!",
                            this.operation));
            }
            // Fill the row column by column, looking each Kudu column name up in the parsed JSON.
            PartialRow row = op.getRow();
            for (ColumnSchema col : schema.getColumns()) {
                String msg;
                try {
                    this.coerceAndSet(rawMap.get(col.getName()), col.getName(), col.getType(), row);
                } catch (NumberFormatException e) {
                    msg = String.format("Raw value '%s' couldn't be parsed to type %s for column '%s'",
                            raw, col.getType(), col.getName());
                    this.logOrThrow(this.badColumnValuePolicy, msg, e);
                } catch (IllegalArgumentException e) {
                    msg = String.format("Column '%s' has no matching group in '%s'", col.getName(), raw);
                    this.logOrThrow(this.missingColumnPolicy, msg, e);
                } catch (Exception e) {
                    throw new FlumeException("Failed to create Kudu operation", e);
                }
            }
            ops.add(op);
        }
        return ops;
    }

    /** Parses the raw string value into the Kudu column type and sets it on the row. */
    private void coerceAndSet(String rawVal, String colName, Type type, PartialRow row)
            throws NumberFormatException {
        switch (type) {
            case BOOL:
                row.addBoolean(colName, Boolean.parseBoolean(rawVal));
                break;
            case INT8:
                row.addByte(colName, Byte.parseByte(rawVal));
                break;
            case INT16:
                row.addShort(colName, Short.parseShort(rawVal));
                break;
            case INT32:
                row.addInt(colName, Integer.parseInt(rawVal));
                break;
            case INT64:
            case UNIXTIME_MICROS:
                row.addLong(colName, Long.parseLong(rawVal));
                break;
            case FLOAT:
                row.addFloat(colName, Float.parseFloat(rawVal));
                break;
            case DOUBLE:
                row.addDouble(colName, Double.parseDouble(rawVal));
                break;
            case DECIMAL:
                row.addDecimal(colName, new BigDecimal(rawVal));
                break;
            case BINARY:
                row.addBinary(colName, rawVal.getBytes(this.charset));
                break;
            case STRING:
                row.addString(colName, rawVal == null ? "" : rawVal);
                break;
            default:
                logger.warn("got unknown type {} for column '{}' -- ignoring this column", type, colName);
        }
    }

    private void logOrThrow(ParseErrorPolicy policy, String msg, Exception e) throws FlumeException {
        switch (policy) {
            case REJECT:
                throw new FlumeException(msg, e);
            case WARN:
                logger.warn(msg, e);
                // fall through
            case IGNORE:
            default:
        }
    }

    @Override
    public void close() {
    }

    private ParseErrorPolicy getParseErrorPolicyCheckingDeprecatedProperty(
            Context context, String deprecatedPropertyName, String newPropertyName,
            ParseErrorPolicy trueValue, ParseErrorPolicy falseValue, ParseErrorPolicy defaultValue) {
        ParseErrorPolicy policy;
        if (context.containsKey(deprecatedPropertyName)) {
            logger.info("Configuration property {} is deprecated. Use {} instead.",
                    deprecatedPropertyName, newPropertyName);
            Preconditions.checkArgument(!context.containsKey(newPropertyName),
                    "Both {} and {} specified. Use only one of them, preferably {}.",
                    deprecatedPropertyName, newPropertyName, newPropertyName);
            policy = context.getBoolean(deprecatedPropertyName) ? trueValue : falseValue;
        } else {
            String policyString = context.getString(newPropertyName, defaultValue.name());
            try {
                policy = ParseErrorPolicy.valueOf(policyString.toUpperCase());
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("Unknown policy '" + policyString
                        + "'. Use one of the following: " + Arrays.toString(ParseErrorPolicy.values()), e);
            }
        }
        return policy;
    }

    private enum ParseErrorPolicy {
        WARN,
        IGNORE,
        REJECT
    }
}
-
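The producer looks up each Kudu column by name in the flattened JSON, so the target table's column names must match the interceptor's targetKeys (which are lowercased). For reference, here is a minimal sketch that creates the default.test table used by the Flume configuration further down, using the kudu-client API already declared in the pom; the column types, partitioning, and replication factor are assumptions to adapt to your own schema:

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CreateTestKuduTable {
    public static void main(String[] args) throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("node01,node02,node03").build();
        try {
            List<ColumnSchema> columns = new ArrayList<>();
            // "id" is the primary key; the remaining columns match the interceptor's targetKeys (lowercased)
            columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT32).nullable(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("addrdetail", Type.STRING).nullable(true).build());
            CreateTableOptions options = new CreateTableOptions()
                    .addHashPartitions(Collections.singletonList("id"), 3)   // assumed partitioning
                    .setNumReplicas(3);                                      // assumed replication factor
            client.createTable("default.test", new Schema(columns), options);
        } finally {
            client.close();
        }
    }
}
-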
Write an interceptor that flattens complex (nested) JSON
package com.cloudera.flume.interceptor;   // must match the interceptor type configured in the Flume agent

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Flattens a nested JSON event body into a flat JSON object: each entry of sourceKeys
 * (dot-separated for nested attributes) is copied to the corresponding entry of targetKeys.
 */
public class JsonInterceptor implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(JsonInterceptor.class);

    private boolean preserveExisting;
    private String sourceKeys;
    private String targetKeys;

    private JsonInterceptor() {
    }

    private JsonInterceptor(boolean preserveExisting, String sourceKeys, String targetKeys) {
        this.preserveExisting = preserveExisting;
        this.sourceKeys = sourceKeys;
        this.targetKeys = targetKeys;
    }

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String bodyStr = new String(body);
        String[] sourceArray = sourceKeys.trim().split(",", -1);
        String[] targetArray = targetKeys.trim().toLowerCase().split(",", -1);
        Map<String, String> resultMap = new HashMap<>();
        if (sourceArray.length == targetArray.length) {
            JSONObject jsonObject = JSONObject.parseObject(bodyStr);
            JSONObject jsonObjectTemp;
            String[] arrayTemp;
            for (int i = 0; i < sourceArray.length; i++) {
                if (sourceArray[i].contains(".")) {
                    // Nested attribute: walk down the JSON object level by level
                    arrayTemp = sourceArray[i].trim().split("\\.", -1);
                    jsonObjectTemp = jsonObject;
                    for (int j = 0; j < arrayTemp.length - 1; j++) {
                        if (jsonObjectTemp != null) {
                            jsonObjectTemp = jsonObjectTemp.getJSONObject(arrayTemp[j].trim());
                        } else {
                            break;
                        }
                    }
                    if (jsonObjectTemp != null) {
                        resultMap.put(targetArray[i].trim(),
                                String.valueOf(jsonObjectTemp.getOrDefault(arrayTemp[arrayTemp.length - 1], "")));
                    } else {
                        resultMap.put(targetArray[i].trim(), "");
                    }
                } else {
                    // Top-level attribute
                    resultMap.put(targetArray[i].trim(),
                            String.valueOf(jsonObject.getOrDefault(sourceArray[i], "")));
                }
            }
        } else {
            logger.error("The sourceKeys and targetKeys lengths do not match");
        }
        event.setBody(JSON.toJSONString(resultMap).getBytes(Charset.forName("UTF-8")));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            this.intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }

    public static class Constants {
        public static String PRESERVE = "preserveExisting";
        public static String SOURCE_KEYS = "sourceKeys";
        public static String TARGET_KEYS = "targetKeys";
        public static boolean PRESERVE_DFLT = false;
    }

    public static class Builder implements Interceptor.Builder {
        private boolean preserveExisting;
        private String sourceKeys;
        private String targetKeys;

        public Builder() {
            this.preserveExisting = Constants.PRESERVE_DFLT;
        }

        @Override
        public Interceptor build() {
            return new JsonInterceptor(this.preserveExisting, this.sourceKeys, this.targetKeys);
        }

        @Override
        public void configure(Context context) {
            this.preserveExisting = context.getBoolean(Constants.PRESERVE, Constants.PRESERVE_DFLT);
            this.sourceKeys = context.getString(Constants.SOURCE_KEYS);
            this.targetKeys = context.getString(Constants.TARGET_KEYS);
        }
    }
}
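A quick way to sanity-check the interceptor outside of a running agent is to drive it through Flume's EventBuilder. This is only a sketch: the class name, the sample payload, and the key names are made up for illustration, and the class is assumed to sit in the same package as JsonInterceptor (otherwise import it):

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class JsonInterceptorTest {
    public static void main(String[] args) {
        // sourceKeys must literally match the keys of the incoming JSON; targetKeys name the flattened fields
        Context context = new Context();
        context.put("sourceKeys", "id,name,properties.age,properties.address.addrDetail");
        context.put("targetKeys", "id,name,age,addrDetail");

        Interceptor.Builder builder = new JsonInterceptor.Builder();
        builder.configure(context);
        Interceptor interceptor = builder.build();

        // Hypothetical nested payload
        String json = "{\"id\":1,\"name\":\"tom\",\"properties\":{\"age\":20,\"address\":{\"addrDetail\":\"Beijing\"}}}";
        Event event = EventBuilder.withBody(json, StandardCharsets.UTF_8);
        Event out = interceptor.intercept(event);

        // Prints something like {"addrdetail":"Beijing","name":"tom","id":"1","age":"20"}
        // (targetKeys are lowercased and all values become strings)
        System.out.println(new String(out.getBody(), StandardCharsets.UTF_8));
    }
}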
-
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cloudera</groupId>
    <artifactId>flume-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>flume-demo</name>

    <repositories>
        <!-- Cloudera repository -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            <name>Cloudera Repositories</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
        <!-- Aliyun Maven mirror -->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <name>Maven2 Repositories</name>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <avro.version>1.8.2-cdh6.3.2</avro.version>
        <flume.version>1.9.0-cdh6.3.2</flume.version>
        <hadoop.version>3.0.0-cdh6.3.2</hadoop.version>
        <kudu.version>1.10.0-cdh6.3.2</kudu.version>
        <fastjson.version>1.2.58</fastjson.version>
        <slf4j.version>1.7.12</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${flume.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>${flume.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>${flume.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-flume-sink</artifactId>
            <version>${kudu.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <defaultGoal>install</defaultGoal>
        <sourceDirectory>src/main/java</sourceDirectory>
        <!--<testSourceDirectory>src/test/scala</testSourceDirectory>-->
        <resources>
            <resource>
                <directory>src/main/resources/</directory>
                <excludes>
                    <exclude>env/*/*</exclude>
                </excludes>
                <includes>
                    <include>**/*</include>
                </includes>
            </resource>
            <resource>
                <directory>src/main/resources/env/${profile.active}</directory>
                <includes>
                    <include>**/*.properties</include>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                    <configuration>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
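Build the shaded jar with Maven and put it on the Flume agent's classpath. The commands below are only a sketch: the plugins.d location depends on your Flume installation (set FLUME_HOME accordingly), and the profile.active property referenced by the resources section only matters if you actually keep per-environment resource folders:

mvn clean package -DskipTests
# copy the shaded jar to every Flume host, e.g. into a plugins.d plugin directory
mkdir -p "$FLUME_HOME/plugins.d/kudu-json-sink/lib"
cp target/flume-demo-1.0-SNAPSHOT.jar "$FLUME_HOME/plugins.d/kudu-json-sink/lib/"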
-
Flume configuration file
agent.sources = r1
agent.channels = c1
agent.sinks = k1

## source r1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 5000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
agent.sources.r1.kafka.topics = test_topic
agent.sources.r1.kafka.consumer.group.id = test_consumer
agent.sources.r1.kafka.consumer.auto.offset.reset = earliest

## interceptor i1
agent.sources.r1.interceptors = i1
agent.sources.r1.interceptors.i1.type = com.cloudera.flume.interceptor.JsonInterceptor$Builder
## Comma-separated list of JSON attributes to extract ('.' reaches into nested objects)
agent.sources.r1.interceptors.i1.sourceKeys = $id,$name,$properties.$age,$properties.$address.addrDetail
## Field names after flattening, one-to-one with sourceKeys
agent.sources.r1.interceptors.i1.targetKeys = id,name,age,addrDetail

## channel c1
agent.channels.c1.type = file
agent.channels.c1.checkpointDir = /data/flume-app/channel/checkpointDir/test_topic/checkpoint
agent.channels.c1.dataDirs = /data/flume-app/channel/checkpointDir/test_topic/data
agent.channels.c1.maxFileSize = 2146435071
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.capacity = 1000000
agent.channels.c1.keep-alive = 6

## sink k1
agent.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
agent.sinks.k1.masterAddresses = node01,node02,node03
agent.sinks.k1.tableName = default.test
agent.sinks.k1.batchSize = 5000
agent.sinks.k1.producer = com.cloudera.flume.sink.JsonKuduOperationsProducer

## wire source, channel, and sink together
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
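For illustration, assume a Kafka record like the following (a hypothetical payload; the '$'-prefixed names in sourceKeys must literally match the keys of the incoming JSON):

{"$id": 1001, "$name": "tom", "$properties": {"$age": 20, "$address": {"addrDetail": "Beijing"}}}

The interceptor flattens it to

{"id":"1001","name":"tom","age":"20","addrdetail":"Beijing"}

(note that targetKeys are lowercased, so the Kudu column must be named addrdetail), and the KuduSink then upserts one row into default.test, with JsonKuduOperationsProducer coercing each value to the type of the same-named Kudu column.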
-
Flume start/stop script
#!/bin/bash
case $1 in
"start"){
        echo " -------- starting the kudu-flume-sink agent --------"
        nohup flume-ng agent --conf-file kudu-flume-sink.conf --name agent -Xmx2048m \
            -Dflume.root.logger=INFO,LOGFILE >> /data/flume-app/logs/kudu-flume-sink.log 2>&1 &
};;
"stop"){
        echo " -------- stopping the kudu-flume-sink agent --------"
        ps -ef | grep kudu-flume-sink.conf | grep -v grep | awk '{print $2}' | xargs kill
};;
esac
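Usage, assuming the script is saved as kudu-flume-sink.sh (the script name is not fixed above) in the directory containing kudu-flume-sink.conf:

chmod +x kudu-flume-sink.sh
./kudu-flume-sink.sh start   # start the agent in the background
./kudu-flume-sink.sh stop    # kill the agent started with kudu-flume-sink.conf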