spark查询分表mysql

2018-11-30  本文已影响0人  Cu提

我们使用的是 ucloud 的集群,spark版本是1.6,版本较低,对mysql的支持非常有限,比如还不支持in的索引查询等。但是我们的分析工作很大一部分都依托于mysql,而且服务端经常会对数据分表存储,如下图所示
实现如下图


因此便考虑实现一个查询多表数据源的RDD。
实现一个RDD有三个必要元素。
1、partitions 组成该RDD的每个partition
2、dependencies 该RDD的依赖关系
3、compute 计算每个分区内的元素,并以iterator的形式返回

import java.util.Properties

import com.kkworld.common.dao.DBPool
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

/**
  * Created by cuti on 2018/11/30.
  */

/**
  * RDD that queries a MySQL table split into `tableNumber` physical tables,
  * exposing each physical table as one Spark partition.
  *
  * For every partition, the literal placeholder `{index}` in `sql` is replaced with the
  * partition index (`0 until tableNumber`), the resulting statement is executed through
  * `DBPool.withConnectQuery`, and each row is converted to a `T` via `Jdbc2BeanUtil`.
  * All rows of a partition are buffered in memory before the iterator is returned.
  *
  * @param sqlUrl      URL handed to the `DBPool` constructor
  * @param sqlProp     properties handed to the `DBPool` constructor
  * @param sql         query text; must contain `{index}` unless `tableNumber == 1`,
  *                    in which case it must not contain it
  * @param tableNumber number of physical tables, i.e. number of partitions
  * @param sc          Spark context that owns this RDD
  * @param classTag    bean class the rows are mapped onto
  * @tparam T          element type of the RDD
  */
class MultiTableRDD[T: ClassTag](sqlUrl: String,
                                 sqlProp: Properties,
                                 sql: String,
                                 tableNumber: Int,
                                 sc: SparkContext,
                                 classTag: Class[T]) extends RDD[T](sc, Nil) {

  if (tableNumber == 1) {
    // Single-table query: the SQL is used verbatim, so a placeholder would be a mistake.
    assert(!sql.contains("{index}"), "one table do not need table index")
  } else {
    assert(sql.contains("{index}"), "more than one  table  need table index")
  }

  /**
    * Runs the query for the table identified by `split.index` and returns its rows.
    *
    * @param split   partition whose index selects the physical table
    * @param context task context (unused)
    * @return iterator over the fully materialised rows of this partition
    */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val schema = Jdbc2BeanUtil.getSchemaMap(classTag)
    // NOTE(review): a new DBPool is created per task; whether it needs an explicit
    // shutdown cannot be determined from this file.
    val pool = new DBPool(sqlUrl, sqlProp)
    val rows = ListBuffer[T]()
    DBPool.withConnectQuery(pool, statement => {
      val partitionedSql = sql.replace("{index}", split.index.toString)
      val resultSet = statement.executeQuery(partitionedSql)
      // Close the cursor even when row mapping fails part-way through.
      try {
        while (resultSet.next()) {
          rows += Jdbc2BeanUtil.composeResult[T](schema, resultSet, classTag)
        }
      } finally {
        resultSet.close()
      }
    })
    rows.iterator
  }

  /** One partition per physical table, indexed `0 until tableNumber`. */
  override protected def getPartitions: Array[Partition] = {
    Range(0, tableNumber).map(index => MysqlPartition(index)).toArray
  }

}

/**
  * Partition descriptor for one physical MySQL table.
  *
  * @param tableIndex zero-based number of the table; also reported as the partition index
  */
case class MysqlPartition(tableIndex: Int) extends Partition {

  /** The partition index is the table index itself. */
  override def index: Int = {
    this.tableIndex
  }
}
import com.google.common.collect.Maps;

import java.lang.reflect.Field;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by cuti on 2018/11/30.
 */
/**
 * Reflection helpers that map JDBC rows onto bean classes whose public fields
 * are annotated with {@link JDBCField}.
 */
public class Jdbc2BeanUtil {

    /**
     * Builds the mapping from bean field name to its {@link AnnonationHandle}.
     *
     * <p>Only public fields (as returned by {@link Class#getFields()}) are inspected.
     * Fields without a {@link JDBCField} annotation are skipped instead of causing a
     * {@link NullPointerException}.
     *
     * @param classTag bean class to inspect
     * @param <T>      bean type
     * @return map keyed by field name; the handle carries the field, the annotation's
     *         {@code description()} and the field type
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static <T> HashMap<String, AnnonationHandle<Class>> getSchemaMap(Class<T> classTag) {
        HashMap<String, AnnonationHandle<Class>> map = new HashMap<>();
        for (Field field : classTag.getFields()) {
            JDBCField annotation = field.getAnnotation(JDBCField.class);
            if (annotation == null) {
                // Not a mapped column; ignore it.
                continue;
            }
            map.put(field.getName(), new AnnonationHandle(field, annotation.description(), field.getType()));
        }
        // Diagnostic dump of the resolved schema, kept from the original implementation.
        for (Map.Entry<String, AnnonationHandle<Class>> entry : map.entrySet()) {
            System.out.println(entry);
        }
        return map;
    }


    /**
     * Creates a new bean and fills its mapped fields from the current row of {@code resultset}.
     *
     * <p>Each handle's {@code annonationName} is used as the column label for
     * {@link ResultSet#getObject(String)}. A SQL NULL read for a primitive field leaves
     * that field at its default value, because a primitive cannot hold {@code null}.
     *
     * @param map       schema produced by {@link #getSchemaMap(Class)}
     * @param resultset result set positioned on the row to read
     * @param classType bean class; instantiated via its no-argument constructor
     * @param <T>       bean type
     * @return the populated bean, never {@code null}
     * @throws IllegalStateException if the bean cannot be instantiated, a column cannot be
     *                               read, or a value cannot be assigned to its field
     */
    public static <T> T composeResult(HashMap<String, AnnonationHandle<Class>> map, ResultSet resultset, Class<T> classType) {
        try {
            T bean = classType.newInstance();
            for (AnnonationHandle<Class> handle : map.values()) {
                Object value = resultset.getObject(handle.annonationName);
                if (value == null && handle.field.getType().isPrimitive()) {
                    continue;
                }
                handle.field.set(bean, value);
            }
            return bean;
        } catch (Exception e) {
            // Fail loudly: returning null or a half-filled bean would silently corrupt results.
            throw new IllegalStateException("Failed to map JDBC row onto " + classType.getName(), e);
        }
    }


}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Created by cuti on 2018/11/7.
 */
/**
 * Field-level marker carrying the database column label for a bean field.
 *
 * <p>Retained at runtime so it can be read through reflection.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface JDBCField {

    /**
     * Column label associated with the annotated field.
     *
     * @return the label; when omitted, the placeholder text {@code "field name in db"}
     */
    String description() default "field name in db";
}
import java.io.Serializable;
import java.lang.reflect.Field;

/**
 * Created by cuti on 2018/11/30.
 */
/**
 * Plain holder tying a reflected bean field to the name taken from its annotation
 * and to the field's declared type.
 *
 * <p>NOTE(review): the class is declared {@link Serializable}, yet
 * {@link java.lang.reflect.Field} itself is not serializable, so serializing an
 * instance with a non-null {@code field} is expected to fail. Confirm whether
 * instances are ever serialized.
 *
 * @param <T> type parameter of the stored {@code Class} token
 */
public class AnnonationHandle<T> implements Serializable{

    /** Reflected field that receives the value. */
    Field field;
    /** Name supplied by the field's annotation. */
    String annonationName;
    /** Declared type of {@link #field}. */
    Class<T> fieldType;

    /**
     * Stores the three components as given; no validation is performed.
     *
     * @param field          reflected field
     * @param annonationName name taken from the annotation
     * @param fieldType      declared type of the field
     */
    public AnnonationHandle(Field field, String annonationName, Class<T> fieldType) {
        this.fieldType = fieldType;
        this.annonationName = annonationName;
        this.field = field;
    }
}

实现细节:
1、使用反射和注解,实现了从resultset中取出对应字段数据的逻辑。
2、每个table对应一个分区。将sql中的{index}特殊字段替换成对应的index

上一篇下一篇

猜你喜欢

热点阅读