spark相关

关于spark往elasticsearch等外部源输出的问题

2019-02-21  本文已影响0人  黄药师ii

一, 对于spark流的操作
1, spark流是由很多rdd组成的, 所以要调用foreachRDD
2, spark流是由很多rdd组成, 而这些rdd是被rdd action惰性执行的, 所以在里面定义的资源也惰性加载, 性能才会好.

******spark流输出es******

dstream.foreachRDD(rdd -> {
  //这里是drive空间, 初始化了没用
  rdd.foreachPartition(partitionOfRecords -> {
    ESLazyHolder.esurl = esurl;
    while (partitionOfRecords.hasNext()) {
      TransportClient client = ESLazyHolder.getInstance();
      partitionOfRecords.next();
      ......
    }
  });
});

******es惰性加载的类******

public class ESLazyHolder
{
    public volatile static String esurl;

    private static class LazyHolder
    {
        private final static transient TransportClient client = ESUtil.getClient(esurl);
        static
        {
            SysUtil.addhook(client);
        }
    }

    public static final TransportClient getInstance()
    {
        return LazyHolder.client;
    }
}

二, 对于rdd操作就更简单了, 不需要foreachRDD, 也不需要惰性加载

  rdd.foreachPartition(partitionOfRecords -> {
    if (partitionOfRecords.hasNext())
      try (final TransportClient client = ESUtil.getClient(esurl);)
      {
        while (partitionOfRecords.hasNext())
        {
          partitionOfRecords.next();
          ......
        }
      }
  });

后经测试, rdd在10个分片的情况下, 只初始化了7次, 实际用惰性加载在rdd的情况下也有更好的性能.

上一篇 下一篇

猜你喜欢

热点阅读