二、 DataSetAPI

2019-05-20  本文已影响0人  木戎

编程结构


public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2){
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostName = args[0];
        Integer port = Integer.parseInt(args[1]);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> counts 
        text.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);
        counts.print();
        env.execute("Java WordCount from SocketTextStream Example");
    }

DataSet API


分类

DataSet Source

基于文件

基于集合

fromCollection(Collection) 从Java Java.util.Collection创建数据集。集合中的所有数据元必须属于同一类型。

fromCollection(Iterator, Class) 从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。

fromElements(T ...) 根据给定的对象序列创建数据集。所有对象必须属于同一类型。

fromParallelCollection(SplittableIterator, Class) 并行地从迭代器创建数据集。该类指定迭代器返回的数据元的数据类型。

generateSequence(from, to) 并行生成给定间隔中的数字序列。

通用方法

代码示例

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 从本地文件系统读
        DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

        // 读取HDFS文件
        DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

        // 读取CSV文件
        DataSet<Tuple3<Integer, String, Double>> csvInput1 = env.readCsvFile("hdfs:///the/CSV/file").types(Integer.class, String.class, Double.class);

        // 读取CSV文件中的部分
        DataSet<Tuple2<String, Double>> csvInput2 = env.readCsvFile("hdfs:///the/CSV/file").includeFields("10010").types(String.class, Double.class);


        // 读取CSV映射为一个java类
//    DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file").pojoType(Person.class, "name", "age", "zipcode");

        // 读取一个指定位置序列化好的文件
//    DataSet<Tuple2<IntWritable, Text>> tuples =
//            env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

        // 从输入字符创建
        DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

        DataSource<Long> fromParallelCollection = env.fromParallelCollection(new NumberSequenceIterator(10, 20), Long.class);

        // 创建一个数字序列
        DataSet<Long> numbers = env.generateSequence(1, 10000000);

        // 从关系型数据库读取
//    DataSet<Tuple2<String, Integer> dbData =
//            env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDrivername("org.apache.derby.jdbc.EmbeddedDriver").setDBUrl("jdbc:derby:memory:persons")
//                    .setQuery("select name, age from persons")
//                    .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
//                    .finish());

DataSet Transformation

详情参考官网:数据集转换

data.map(new MapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});
data.flatMap(new FlatMapFunction<String, String>() {
  public void flatMap(String value, Collector<String> out) {
    for (String s : value.split(" ")) {
      out.collect(s);
    }
  }
});
上一篇 下一篇

猜你喜欢

热点阅读