Raku Programming Language

发送数据到 socket

2018-11-16  本文已影响0人  焉知非鱼

Spark Streaming 有时候需要使用 nc -lk 9999 开启一个终端来手动键入一些数据供 Streaming 来拉取数据,这种方式不能很好地模拟实时流,所以使用 Perl 6 的 react .. wheneverPromise 来搞很合适:

my $vin = 'LSJA0000000000091';
my $last_meter = 0;             # 当前里程数

react {
    whenever IO::Socket::Async.listen('0.0.0.0', 3333) -> $conn {
        loop {
            react {
                whenever Supply.interval(1) {
                    $conn.print: sprintf("\{'vin':'%s','createTime':%s,'mileage':%s}\n", $vin, DateTime.now.posix, $last_meter);
                }
        
                whenever Promise.in(5) {
                    done;
                }
            
                whenever signal(SIGINT) {
                    say "Done.";
                    done;
                }
            } 
        sleep 10;
        }
    }
    CATCH {
        default {
            say .^name, ': ', .Str;
            say "handled in $?LINE";
        }
    }
}

上面的代码会不断发送

{'vin':'LSJA0000000000091','createTime':1542358572,'mileage':0}

这样的带当前时间戳的数据。怎么验证能不能接收到数据呢?使用 telnet

telnet 0.0.0.0 3333

或者拷贝一个 Streaming Demo:

package com.github.yuvalitzchakov.structuredstateful

import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}


/**
  * 
  */
object readSocket {

  def main(args: Array[String]): Unit = {

    val host = "127.0.0.1"
    val port = 3333

    val spark: SparkSession = SparkSession.builder
      .master("local[*]")
      .appName("Stateful Structured Streaming")
      .getOrCreate()

    import spark.implicits._

    val ds: Dataset[String] = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()
      .as[String]

    ds.writeStream
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .format("console")
      .option("truncate", "false") // 不截断显示
      .start()
      .awaitTermination()
  }
}
上一篇下一篇

猜你喜欢

热点阅读