Hive中UDTF的实现方式调研

2019-04-01  本文已影响0人  分裂四人组

简单UDTF的实现

下面的示例实现了一个简单的 UDTF：先将输入字符串按换行符切分，再将每一行按空白切分为字段，从而把一条输入记录展开为多行输出（例如输入行 "a b c" 会生成 (a, b) 和 (a, c) 两行）。

package com.alipay.mbaprod.spark.udtf;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * A simple Hive UDTF that explodes one string column into rows of two string columns
 * ({@code col1}, {@code col2}).
 *
 * <p>The input is split into lines on the newline character; each line is trimmed and split on
 * runs of whitespace. A line with two tokens {@code a b} yields one row {@code (a, b)}; a line
 * with three tokens {@code a b c} yields two rows {@code (a, b)} and {@code (a, c)}. Lines with
 * any other token count, as well as NULL or empty input, produce no rows.
 */
public class SimpleUDTF extends GenericUDTF {
  /** Separates the input into lines. Compiled once instead of on every call. */
  private static final Pattern LINE_SEPARATOR = Pattern.compile("\n");

  /** Separates a trimmed line into tokens. Compiled once instead of once per line. */
  private static final Pattern WHITESPACE = Pattern.compile("\\s+");

  /** Inspector for the single string argument; assigned in {@link #initialize}. */
  private PrimitiveObjectInspector stringOI = null;

  /**
   * Hook called with the MapReduce context before processing. This UDTF reads no job
   * configuration, so it only delegates to the superclass.
   *
   * @param context the MapReduce context supplied by Hive
   */
  @Override
  public void configure(MapredContext context) {
    super.configure(context);
  }

  /**
   * Validates the arguments and declares the output schema.
   *
   * @param args inspectors for the call arguments; exactly one primitive string is required
   * @return a struct inspector with two Java-string fields, {@code col1} and {@code col2}
   * @throws UDFArgumentException if the argument count or type is wrong
   */
  @Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("SimpleUDTF() should have 1 arguments");
    }

    for (ObjectInspector arg : args) {
      if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE
          || !arg.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
        throw new UDFArgumentException("SimpleUDTF()'s arguments have to be string type");
      }
    }

    stringOI = (PrimitiveObjectInspector) args[0];

    List<String> fieldNames = new ArrayList<String>(2);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);

    fieldNames.add("col1");
    fieldNames.add("col2");
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }

  /**
   * Converts one input value into zero or more two-column output rows.
   *
   * @param id the raw input string; {@code null} or empty yields no rows
   * @return a mutable list of rows, each a two-element {@code Object[]} of strings
   */
  @VisibleForTesting
  public List<Object[]> processInputRecord(String id) {
    List<Object[]> result = new ArrayList<Object[]>();
    // Ignore null or empty input.
    if (id == null || id.isEmpty()) {
      return result;
    }
    for (String line : LINE_SEPARATOR.split(id)) {
      // trim() also removes a trailing '\r' from CRLF-terminated lines.
      String[] tokens = WHITESPACE.split(line.trim());
      if (tokens.length == 2) {
        result.add(new Object[] {tokens[0], tokens[1]});
      } else if (tokens.length == 3) {
        result.add(new Object[] {tokens[0], tokens[1]});
        result.add(new Object[] {tokens[0], tokens[2]});
      }
    }
    return result;
  }

  /**
   * Processes one input row and forwards every generated output row.
   *
   * @param args the call arguments; {@code args[0]} is the string column value and may be NULL
   * @throws HiveException if forwarding a row fails
   */
  @Override
  public void process(Object[] args) throws HiveException {
    Object value = (args[0] == null) ? null : stringOI.getPrimitiveJavaObject(args[0]);
    // A SQL NULL argument arrives as null. Pass it through as null so processInputRecord
    // emits no rows, instead of dereferencing it and throwing a NullPointerException.
    String id = (value == null) ? null : value.toString();
    for (Object[] row : processInputRecord(id)) {
      forward(row);
    }
  }

  /**
   * Called once after all input has been processed. This UDTF keeps no state and emits no
   * trailing rows, so there is nothing to do.
   *
   * @throws HiveException never thrown by this implementation
   */
  @Override
  public void close() throws HiveException {
    // Nothing to clean up.
  }
}

上一篇下一篇

猜你喜欢

热点阅读