filebeat采集日志到Kafka,Java消费Kafka,转

2019-11-13  本文已影响0人  莫夏_b560

1、采集到的日志通过kafka可以确定的是json格式,序列化不存在问题

2、kafka消费配置问题排查:发现消费的反序列化是StringDeserializer,所以在转换JSONObject的时候一直报错。

备注:序列化

public class JsonSerialize implements Serializer<JSONObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        try {
            return data.toString().getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void close() {
    }
}

反序列化

public class JsonDeserialize implements Deserializer<JSONObject> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public JSONObject deserialize(String topic, byte[] data) {
        JSONObject obj = null;
        try {
            obj = new JSONObject(new String(data,"UTF-8"));
        } catch (JSONException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return obj;
    }

    @Override
    public void close() {
    }
}
上一篇 下一篇

猜你喜欢

热点阅读