数客联盟我爱编程

Spark Streaming中简单粗暴执行TensorFlow

2017-09-28  本文已影响344人  biggeng

Spark Streaming =>很火,在流处理中得到了广泛的应用。TensorFlow=>很火,由Google大神开源,目前已经在深度学习领域展现了超高的流行潜质。那么如何在Spark Streaming中调用TensorFlow?笔者此文尝试使用了简单粗暴的方式在Spark Streaming中调用TensorFlow.

1. 需求和目标

笔者已经实现了一个基于Spark Streaming的流处理平台,能够对Kafka输入的流数据进行运算,并且可以通过Sql的方式对数据进行过滤和逻辑判断。
笔者玩了一段时间的TensorFlow,一直在琢磨在机器学习的Model Serving阶段,能否由Kafka喂数据,将TF融入到已有的流处理中,流入的每条数据都可以进行predict,处理结果还能通过Spark DataFrame进行SQL配置过滤。

2. 第一次尝试,失败。

因为在已有的Spark Streaming代码框架中,有一个自定义的函数接口,能够依次处理RDD的每一个record,所以刚开始考虑在此函数接口中通过调用Python脚本实现调用TensorFlow。
步骤如下:

//加入依赖库
import sys.process._
    // 执行TF hello world脚本
    val result = "sudo python /data/tf/hello.py".!!
    // 把执行脚本的结果输出到Map,最终在另外的地方输出到Kafka中
    labelMap += (isAILabel -> result.toString)

hello.py脚本非常简单,就是简单的调用TensorFlow的hello world.

import tensorflow as tf
hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()
print sess.run(hello)
3. 第二种简单粗暴方式 RDD pipe.

对每一条数据执行TF脚本不可取,那么换个思路,针对整个RDD进行处理,尝试使用了RDD pipe的方法进行处理。在测试中性能比上一种方法有了极大的提升,基本上在几秒内处理完成。
方法如下:

val piped = labelRDD.pipe("sudo python /data/tf/hello.py", Map("SEPARATOR" -> ","))
val aiRDD = labelRDD.zipPartitions(piped){
      (rdd1Iter,rdd2Iter) => {
           var result = List[String]()
           while(rdd1Iter.hasNext && rdd2Iter.hasNext) {
               val firstStr = rdd1Iter.next()
               result::=(firstStr.substring(0, firstStr.length - 1) + "," + rdd2Iter.next() + "}")
                }
                result.iterator
              }
            }
import sys
import json
import tensorflow as tf

hello = tf.constant('Hello, TensorFlow!')
sess = tf.Session()

while True:
  line = sys.stdin.readline()
  if not line:
     break
  data = json.loads(line)
  if (data['f1'] == 'f1'):
      print '"aiout":"' + sess.run(hello) + '"'
  else:
      print '"aiout":"No"'

NOTE: 本文只是简单的在Spark streaming中拉起Python调用TensorFlow,后续还需要从API层面看如何能够更好的集成。

上一篇 下一篇

猜你喜欢

热点阅读