filebeat采集日志到Kafka,Java消费Kafka,转
2019-11-13 本文已影响0人
莫夏_b560
1、采集到的日志通过kafka可以确定的是json格式,序列化不存在问题
2、kafka消费配置问题排查:发现消费的反序列化是StringDeserializer,所以在转换JSONObject的时候一直报错。
备注:序列化
public class JsonSerialize implements Serializer<JSONObject> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, JSONObject data) {
try {
return data.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
@Override
public void close() {
}
}
反序列化
public class JsonDeserialize implements Deserializer<JSONObject> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public JSONObject deserialize(String topic, byte[] data) {
JSONObject obj = null;
try {
obj = new JSONObject(new String(data,"UTF-8"));
} catch (JSONException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return obj;
}
@Override
public void close() {
}
}