flume自定义Inteceptor

2019-10-22  本文已影响0人  王金松
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.jd.common.Location;
import com.jd.constant.DataBaseConstant;
import com.jd.constant.Globals;
import com.jd.datasource.MysqlManager;
import com.jd.datasource.MysqlPoolManager;
import com.jd.entity.HoneypotAttack;
import com.jd.util.DateUtils;
import com.jd.util.GeoHelper;
import com.jd.util.IDGeneratorUtils;
import com.jd.util.ListUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.source.kafka.KafkaSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class InternalAssetInterceptor implements Interceptor {
    private static final Logger logger = LoggerFactory
            .getLogger(InternalAssetInterceptor.class);

    private Charset charset = Charsets.UTF_8;
    private Context context;
    private MysqlPoolManager mysqlPool = null;
    int expiredTime = 3600;
    private static Map<String, String> internalAssetMap = new HashMap<>();



    public InternalAssetInterceptor(Context context){
        Integer expired = context.getInteger("dataexpired");
        if (expired != null) {
            expiredTime = expired;
        }
        this.context = context;
    }

    @Override
    public void initialize() {
        try {
            mysqlPool = new MysqlPoolManager(context);
            mysqlPool.init();
            initAttackDic();
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(expiredTime * 1000);
                } catch (Exception e) {}
                logger.info("start sync attackInfo");
                initAttackDic();
            });
            t.setDaemon(true);
            t.start();
        } catch (Exception e) {
            logger.error("flume agent init failed", e);
            throw new RuntimeException(e);
        }
    }

    private void initAttackDic() {
        List<Map<String, Object>> mapList = new MysqlManager(mysqlPool).queryForList("select * from op_np_pin");
        for (Map<String, Object> map: mapList) {
            try {
                String pin = map.get("pin").toString();
                String ip = map.get("ip").toString();
                internalAssetMap.put(ip, pin);
            } catch (Exception e) {
                logger.error("sync interval asset error {}", map);
            }
        }
        mapList = null;
        logger.info("sync internal asset success");
    }

    @Override
    public Event intercept(Event event){
        try {
            String oldBody = new String(event.getBody(), charset);
            JSONObject body = JSON.parseObject(oldBody);
            JSONObject jdcloud_alert = body.getJSONObject("jdcloud_alert");
            String floattingIp = jdcloud_alert.getString("floating_ip");
            String pin = jdcloud_alert.getString("pin");
            if (StringUtils.isEmpty(pin)) {
                String depart = internalAssetMap.getOrDefault(floattingIp, "");
                jdcloud_alert.put("pin", depart);
            }
            event.setBody(body.toJSONString().getBytes(charset));
            return event;
        }catch (Exception e) {
            logger.error("Failed to process event " + event, e);
            event = null;
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = Lists.newArrayList();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        private Context context;

        @Override
        public Interceptor build() {
            return new InternalAssetInterceptor(context);
        }

        @Override
        public void configure(Context context) {
            this.context = context;
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读