Spark专题

SparkStreaming从kafka读取数据编码问题(Jav

2018-06-06  本文已影响0人  zhu02365

SparkStreaming从kafka读取文件流时(Java),默认是utf-8的,如果源文件的编码不是utf-8,那就会出现乱码现象,但是kafka的官网参数里没有关于编码的参数,不过kafka的源码里面是有的。源码如下:

class String Encoder(props:VerifiableProperties=null) extends Encoder[String]{

val encoding = if (props==null) "UTF8" else props.getString("serializer.encoding","UTF8")

 根据上面的源码,设置serializer.encoding就可以了,经过测试,设置serializer.encoding确实是有效的。下面是SparkStream从kafka读取数据代码:

...

HashSet_TopicsSet = newHashSet(Arrays.asList("a1_topic","a2_topic"));

Map_KafkaParams = new HashMap();

_KafkaParams.put("metadata.broker.list","dn1:9092,nn0:9092,dn0:9092");

_KafkaParams.put("group.id", "groupid");

_KafkaParams.put("fetch.message.max.bytes","5120000");

_KafkaParams.put("serializer.encoding", "gb18030");

JavaDStream_MessagesLines = KafkaUtils.createDirectStream(_JavaStreamingContext,String.class, String.class,StringDecoder.class,StringDecoder.class,_KafkaParams, _TopicsSet)

.map(newFunction, String>() {

publicString call(Tuple2 tuple2) {

returntuple2._2();

}

});

...

上一篇 下一篇

猜你喜欢

热点阅读