Consuming Kafka messages with Spark Streaming

2018-04-26  阿甘骑士
In a previous post we covered how to ship nginx logs into Kafka; now we consume those messages and persist them into a Kudu table (queried through Impala).
Before walking through the implementation, the reader is assumed to be familiar with Spark Streaming, Kafka, Kudu and Impala (not covered in detail here).
Implementation
Create the table
su hdfs

# The kudu_vip database already exists; create the NGINX_LOG table in it
# Every user action produces one nginx log line, and a Kudu table needs a
# primary key, so we add a surrogate uuid column

impala-shell -i node1:21000 -q  "
CREATE TABLE kudu_vip.NGINX_LOG
(
uuid STRING,
remote_addr STRING,
time_local STRING,
remote_user STRING,
request STRING,
status STRING,
body_bytes_sent STRING,
http_referer STRING,
http_user_agent STRING,
http_x_forwarded_for STRING,
PRIMARY KEY(uuid)
)
PARTITION BY HASH PARTITIONS 3
STORED AS KUDU;  
"
Write the Kafka consumer

First, the relevant Maven dependencies (pom.xml fragment):
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spring.version>5.0.2.RELEASE</spring.version>
  </properties>

  <dependencies>
       <!-- Spring framework -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
  
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
    
       <!-- Spark dependencies -->
        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.11</artifactId>
             <version>2.3.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>


         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
             <version>2.3.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>

         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.23</version>
         </dependency>

         <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-client</artifactId>
             <version>1.4.0</version>
         </dependency>

         <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-spark2_2.11</artifactId>
             <version>1.4.0</version>
         </dependency>


         <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-client-tools</artifactId>
             <version>1.4.0</version>
         </dependency>

         <!-- For parsing and building JSON messages -->
        <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.46</version>
         </dependency>
  </dependencies>

Next, the Spring bean definitions (spring/spring-bean.xml, loaded from the classpath by the App class below):

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
                          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
                          http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
                          http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
    
    
    <!-- Load external properties files -->
    <context:property-placeholder 
        location="file:D:/yougouconf/bi/bi-sparkstreaming/*.properties" 
        file-encoding="UTF-8"
        ignore-unresolvable="true" ignore-resource-not-found="true" order="2" system-properties-mode="NEVER" />
    <context:property-placeholder 
        location="file:/etc/wonhighconf/bi/bi-sparkstreaming/*.properties"
        file-encoding="UTF-8"
        ignore-unresolvable="true" ignore-resource-not-found="true" order="2" system-properties-mode="NEVER" />
    
     <!-- Initialize Spark -->
     <bean id="sparkConf" class="org.apache.spark.SparkConf">
        <property name="master" value="${conf.master}"></property>
        <property name="appName" value="${conf.appName}"></property>
     </bean>
     
     <!-- Initialize the Spark helper class -->
     <bean id="spark" class="sparkStreaming.spark.Spark">
         <constructor-arg index="0" ref="sparkConf"/>
         <constructor-arg index="1" value="${conf.bootStrapServers}"/>
         <constructor-arg index="2" value="${conf.topics}"/>
         <constructor-arg index="3" value="${conf.loglevel}"/>
     </bean>    
     
     <!-- Other configuration parameters -->
      <bean id="commonConfig"  class="java.util.HashMap" >
        <constructor-arg>
            <map>
                <entry key="kudu.instances" value="${kudu.instances}" />
                <entry key="kudu.schema" value="${kudu.schema}" />
            </map>
        </constructor-arg>
     </bean>
</beans>
The properties file (placed in one of the locations configured above):

# Spark settings
# To run on YARN, change this to yarn (Spark 2.x no longer accepts the old
# yarn-cluster master URL; pass --deploy-mode cluster to spark-submit instead)
conf.master=local[*]
conf.appName=testSparkStreaming
conf.bootStrapServers=node1:9092,node2:9092,node3:9092
# topic(s) to subscribe to
conf.topics=testnginx
conf.loglevel=ERROR

# Kudu master address
kudu.instances=node1:7051
# Kudu database (Impala folds identifiers to lowercase, and Kudu table
# names are case-sensitive, so use the lowercase form)
kudu.schema=impala::kudu_vip
The Spark helper class that builds the Kafka direct stream:

package sparkStreaming.spark;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class Spark {
    
    private SparkConf conf;
    
    private JavaStreamingContext jssc;
    
    private String bootStrapServers;
    
    private String topics;
    
    private String logLevel;
    
    
    public Spark(SparkConf conf, String bootStrapServers, String topics, String logLevel) {
        this.conf = conf;
        this.jssc = new JavaStreamingContext(conf, Durations.milliseconds(500));
        this.logLevel = logLevel;
        jssc.sparkContext().setLogLevel(logLevel);
        this.bootStrapServers = bootStrapServers;
        this.topics = topics;
    }
    
    /**
     * Build the Kafka direct stream.
     * @return the input DStream, or null if the configuration is incomplete
     */
    public JavaInputDStream<ConsumerRecord<String, String>> getStream() {
        
        if (null != conf && null != bootStrapServers && null != topics) {
            // Kafka consumer configuration
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", bootStrapServers);
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);

            // use a distinct local name so the topics field is not shadowed
            String[] topicArr = topics.split(",");
            Collection<String> topicList = Arrays.asList(topicArr);

            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                    .createDirectStream(jssc, LocationStrategies
                            .PreferConsistent(), ConsumerStrategies
                            .<String, String> Subscribe(topicList, kafkaParams));
            
            return stream;
        
        } else {
            
            return null;
        }
        
    }
    
    public void streamingStart(){
        if (null != jssc) {
            jssc.start();
        }
    }
    
    public void streamingAwaitTermination() throws InterruptedException{
        if (null != jssc) {
            jssc.awaitTermination();
        }
    }
    
    public SparkConf getConf() {
        return conf;
    }

    public void setConf(SparkConf conf) {
        this.conf = conf;
    }

    public JavaStreamingContext getJssc() {
        return jssc;
    }

    public void setJssc(JavaStreamingContext jssc) {
        this.jssc = jssc;
    }

    public String getBootStrapServers() {
        return bootStrapServers;
    }

    public void setBootStrapServers(String bootStrapServers) {
        this.bootStrapServers = bootStrapServers;
    }

    public String getTopics() {
        return topics;
    }

    public void setTopics(String topics) {
        this.topics = topics;
    }

    public String getLogLevel() {
        return logLevel;
    }

    public void setLogLevel(String logLevel) {
        this.logLevel = logLevel;
    }

}
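For a quick smoke test without Spring, the helper can also be constructed by hand; a sketch (the SparkSmokeTest class is illustrative only, with constructor arguments in the same order as in spring-bean.xml):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaInputDStream;

import sparkStreaming.spark.Spark;

public class SparkSmokeTest {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("testSparkStreaming");
        Spark spark = new Spark(conf, "node1:9092,node2:9092,node3:9092", "testnginx", "ERROR");

        JavaInputDStream<ConsumerRecord<String, String>> stream = spark.getStream();
        // just count each micro-batch to confirm messages are flowing
        stream.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));

        spark.streamingStart();
        spark.streamingAwaitTermination();
    }
}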

The Kudu client, wrapped in an enum so that exactly one client instance is created per JVM:

package sparkStreaming.kudu;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.SessionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sparkStreaming.utils.SpringContextUtil;


public enum Kudu {
    INSTANCE;

    private Kudu() {
        init();
        addShutdownHook();
    }

    private KuduClient client = null;
    // cache opened tables; concurrent because partitions run on multiple threads
    private Map<String, KuduTable> tables = new ConcurrentHashMap<String, KuduTable>();
    private Logger logger = LoggerFactory.getLogger(Kudu.class);

    private void init() {
        // fetch configuration from the Spring context
        @SuppressWarnings("unchecked")
        Map<String, String> commonConfig = (Map<String, String>) SpringContextUtil.getBean("commonConfig");

        if (null != commonConfig && commonConfig.containsKey("kudu.instances")) {
            client = new KuduClient.KuduClientBuilder(commonConfig.get("kudu.instances")).defaultOperationTimeoutMs(60000)
                    .defaultSocketReadTimeoutMs(30000).defaultAdminOperationTimeoutMs(60000).build();
        }
    }

    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                if (client != null) {
                    try {
                        client.close();
                    } catch (Exception e) {
                        logger.error("ShutdownHook Close KuduClient Error!", e);
                    }
                }
            }
        });
    }

    public KuduClient client() {
        return client;
    }

    public KuduTable table(String name) throws KuduException {
        KuduTable table = tables.get(name);
        if (table == null) {
            table = client.openTable(name);
            tables.put(name, table);
        }
        return table;
    }

    /**
     * FlushMode:AUTO_FLUSH_BACKGROUND
     *
     * @return
     * @throws KuduException
     */
    public KuduSession newAsyncSession() throws KuduException {
        KuduSession session = client.newSession();
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        session.setFlushInterval(500);
        session.setMutationBufferSpace(5000);
        return session;
    }

    /**
     * FlushMode:AUTO_FLUSH_SYNC
     *
     * @return
     * @throws KuduException
     */
    public KuduSession newSession() throws KuduException {
        KuduSession session = client.newSession();
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
        session.setMutationBufferSpace(5000);
        return session;
    }

    public void closeSession(KuduSession session) {
        if (session != null && !session.isClosed()) {
            try {
                session.close();
            } catch (KuduException e) {
                logger.error("Close KuduSession Error!", e);
            }
        }
    }

    public KuduScanner.KuduScannerBuilder scannerBuilder(String table) throws KuduException {
        // go through table() so the table is opened on first use instead of NPE-ing
        return client.newScannerBuilder(table(table));
    }
}

Helper methods that map JSON records onto Kudu operations:

package sparkStreaming.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.Update;
import org.apache.kudu.client.Upsert;

import com.alibaba.fastjson.JSONObject;

public class KuduUtils {
     private static final ThreadLocal<KuduSession> threadLocal = new ThreadLocal<KuduSession>();

        public static KuduTable table(String name) throws KuduException {
            return Kudu.INSTANCE.table(name);
        }

        public static Insert emptyInsert(String table) throws KuduException {
            KuduTable ktable = Kudu.INSTANCE.table(table);
            return ktable.newInsert();
        }

        public static Update emptyUpdate(String table) throws KuduException {
            KuduTable ktable = Kudu.INSTANCE.table(table);
            return ktable.newUpdate();
        }

        public static Upsert emptyUpsert(String table) throws KuduException {
            KuduTable ktable = Kudu.INSTANCE.table(table);
            return ktable.newUpsert();
        }

        /**
         * Only columns which are part of the key can be set
         *
         * @param table
         * @return
         */
        public static Delete emptyDelete(String table) throws KuduException {
            KuduTable ktable = Kudu.INSTANCE.table(table);
            return ktable.newDelete();
        }

        public static Upsert createUpsert(String table, JSONObject data) throws KuduException {
            KuduTable ktable = Kudu.INSTANCE.table(table);
            Upsert upsert = ktable.newUpsert();
            PartialRow row = upsert.getRow();
            Schema schema = ktable.getSchema();
            for (String colName : data.keySet()) {
                ColumnSchema colSchema = schema.getColumn(colName.toLowerCase());
                fillRow(row, colSchema, data);
            }
            return upsert;
        }

        public static Insert createInsert(String table, JSONObject data) throws KuduException {
            KuduTable ktable = Kudu.INSTANCE.table(table);
            Insert insert = ktable.newInsert();
            PartialRow row = insert.getRow();
            Schema schema = ktable.getSchema();
            for (String colName : data.keySet()) {
                ColumnSchema colSchema = schema.getColumn(colName.toLowerCase());
                fillRow(row, colSchema, data);
            }
            return insert;
        }
        
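        // note: insert/upsert below apply, flush and close a session per record,
        // which is simple but slow; for higher throughput reuse one session per
        // partition and flush once per batch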
        public static void insert(String table, JSONObject data) throws KuduException {
            Insert insert = createInsert(table, data);
            KuduSession session = getSession();
            session.apply(insert);
            session.flush();
            closeSession();
        }
        
        public static void upsert(String table, JSONObject data) throws KuduException {
            Upsert upsert = createUpsert(table, data);
            KuduSession session = getSession();
            session.apply(upsert);
            session.flush();
            closeSession();
        }
        
        
        
        private static void fillRow(PartialRow row, ColumnSchema colSchema, JSONObject data) {
            String name = colSchema.getName();
            if (data.get(name) == null) {
                return;
            }
            
            Type type = colSchema.getType();
            switch (type) {
                case STRING:
                    row.addString(name, data.getString(name));
                    break;
                case INT64:
                case UNIXTIME_MICROS:
                    row.addLong(name, data.getLongValue(name));
                    break;
                case DOUBLE:
                    row.addDouble(name, data.getDoubleValue(name));
                    break;
                case INT32:
                    row.addInt(name, data.getIntValue(name));
                    break;
                case INT16:
                    row.addShort(name, data.getShortValue(name));
                    break;
                case INT8:
                    row.addByte(name, data.getByteValue(name));
                    break;
                case BOOL:
                    row.addBoolean(name, data.getBooleanValue(name));
                    break;
                case BINARY:
                    row.addBinary(name, data.getBytes(name));
                    break;
                case FLOAT:
                    row.addFloat(name, data.getFloatValue(name));
                    break;
                default:
                    break;
            }
        }

        public static KuduSession getSession() throws KuduException {
            KuduSession session = threadLocal.get();
            if (session == null) {
                session = Kudu.INSTANCE.newAsyncSession();
                threadLocal.set(session);
            }
            return session;
        }

        public static KuduSession getAsyncSession() throws KuduException {
            KuduSession session = threadLocal.get();
            if (session == null) {
                session = Kudu.INSTANCE.newAsyncSession();
                threadLocal.set(session);
            }
            return session;
        }

        public static void closeSession() {
            KuduSession session = threadLocal.get();
            threadLocal.set(null);
            Kudu.INSTANCE.closeSession(session);
        }

}
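A quick, hypothetical one-off write using these helpers (assumes the table created above, plus the JSONObject and UUID imports already used in this post):

JSONObject row = new JSONObject();
row.put("uuid", UUID.randomUUID().toString().replace("-", ""));
row.put("remote_addr", "127.0.0.1");
row.put("status", "200");
// upsert is idempotent on the primary key, unlike insert
KuduUtils.upsert("impala::kudu_vip.nginx_log", row);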

A small holder for the Spring application context:

package sparkStreaming.utils;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;


public class SpringContextUtil {

    
    // the Spring application context
    private static ApplicationContext applicationContext;

    /**
     * Store the application context. It is set explicitly from App once the
     * context has been created (this class does not implement
     * ApplicationContextAware).
     *
     * @param applicationContext
     */
    public static void setApplicationContext(ApplicationContext applicationContext) {
        SpringContextUtil.applicationContext = applicationContext;
    }
    
    /** 
     * @return ApplicationContext 
     */  
    public static ApplicationContext getApplicationContext() {  
        return applicationContext;  
    }  
    
    /**
     * Look up a bean by name.
     *
     * @param name
     * @return Object
     * @throws BeansException
     */
    public static Object getBean(String name) throws BeansException {  
        return applicationContext.getBean(name);  
    }  
}

Finally, the entry point that wires everything together:

package sparkStreaming.flume.kafkaConsumer;

import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.alibaba.fastjson.JSONObject;

import sparkStreaming.kudu.KuduUtils;
import sparkStreaming.spark.Spark;
import sparkStreaming.utils.SpringContextUtil;

public class App {
    
    private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
    
    public static void main(String args[]) {
        try {
            String[] confs = new String[] {
                    BEAN_CONF
            };

            // store the context so other classes can share it via SpringContextUtil
            SpringContextUtil.setApplicationContext(new ClassPathXmlApplicationContext(confs));
            
            // fetch the spark bean
            Spark spark = (Spark) SpringContextUtil.getBean("spark");

            // build the Kafka direct stream
            JavaInputDStream<ConsumerRecord<String, String>> stream = spark.getStream();

            // fields of an nginx log line, in the order they appear
            String[] columns = {"remote_addr","remote_user","time_local",
                                "request","status","body_bytes_sent","http_referer",
                                "http_user_agent","http_x_forwarded_for"};
            
            stream.foreachRDD(rdd -> {
                rdd.foreachPartition(records -> {
                    try {
                        
                        while (records.hasNext()) {
                            // parse one record; fields are separated by |+|,
                            // the separator configured in the nginx log_format
                            ConsumerRecord<String, String> consumerRecords = records.next();

                            String[] messages = consumerRecords.value() == null
                                    ? new String[0]
                                    : consumerRecords.value().split("\\|\\+\\|");
                            
                            int length = messages.length;
                            
                            JSONObject json = new JSONObject();
                            
                            for (int i = 0 ; i < columns.length; i++) {
                                if (i < length) {
                                    json.put(columns[i], messages[i]);
                                }  
                            }
                            
                            // a Kudu table must have a primary key, so generate one
                            json.put("uuid", UUID.randomUUID().toString().replace("-", ""));

                            // Impala folds table names to lowercase, and Kudu
                            // table names are case-sensitive
                            KuduUtils.insert("impala::kudu_vip.nginx_log", json);
                        }
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            });
            
            spark.streamingStart();
            spark.streamingAwaitTermination();
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
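One caveat: enable.auto.commit is false and the job never commits offsets, so after a restart (with auto.offset.reset=latest) any messages produced while the job was down are skipped. If at-least-once delivery matters, offsets can be committed back to Kafka once a batch has been written to Kudu; a sketch following the standard pattern from the spark-streaming-kafka-0-10 docs:

import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.OffsetRange;

// inside main(), replacing the foreachRDD above
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    rdd.foreachPartition(records -> {
        // ... parse and insert into Kudu exactly as above ...
    });

    // commit only after the whole batch has been processed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});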

Start the program

With conf.master=local[*] the job can be started by running sparkStreaming.flume.kafkaConsumer.App directly from the IDE. To run on the cluster, package the project with Maven and submit the jar via spark-submit with --master yarn --deploy-mode cluster, adjusting conf.master as noted in the properties file above.