flume自定义输出sink&多线程

2019-10-22  本文已影响0人  王金松

场景

一般情况下,我们通过 Flume 采集程序把数据写入到另一个 Kafka 或者 ES 当中。但是这次我们有一个需求:把数据解析之后写入到图数据库 Neo4j 中。Flume 本身不支持这种 sink,所以需要我们自定义 sink 来实现。

实现

配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1 k2

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource  
a1.sources.r1.batchSize = 500  
a1.sources.r1.batchDurationMillis = 2000  
a1.sources.r1.kafka.bootstrap.servers = 127.0.0.1:9092
a1.sources.r1.kafka.topics = a-topic,b-topic
a1.sources.r1.kafka.consumer.group.id = graph

a1.sinks.k1.type = com.jd.sink.CsaNeo4jSink
a1.sinks.k1.graph.name = preattackgraph
a1.sinks.k1.graph.batchsize = 500
a1.sinks.k1.graph.jdclass = 2,3,8,29,30,33,37
a1.sinks.k1.graph.neo4j.url = bolt://127.0.0.1:7687
a1.sinks.k1.graph.neo4j.username = neo4j
a1.sinks.k1.graph.neo4j.password = neo4j
a1.sinks.k1.graph.redis.maxactive = 100
a1.sinks.k1.graph.redis.maxidel = 10
a1.sinks.k1.graph.redis.maxwait = 2000
a1.sinks.k1.graph.redis.ip = 127.0.0.1
a1.sinks.k1.graph.redis.port = 6379
a1.sinks.k1.graph.redis.password = root
a1.sinks.k1.graph.redis.expired = 300
a1.sinks.k2.type = com.jd.sink.CsaNeo4jSink
a1.sinks.k2.graph.batchsize = 500
a1.sinks.k2.graph.name = preattackgraph
a1.sinks.k2.graph.neo4j.url = bolt://127.0.0.1:7687
a1.sinks.k2.graph.neo4j.username = neo4j
a1.sinks.k2.graph.neo4j.password = neo4j
a1.sinks.k2.graph.redis.maxactive = 100
a1.sinks.k2.graph.redis.maxidel = 10
a1.sinks.k2.graph.redis.maxwait = 2000
a1.sinks.k2.graph.redis.ip = 127.0.0.1
a1.sinks.k2.graph.redis.port = 6379
a1.sinks.k2.graph.redis.password = root
a1.sinks.k2.graph.redis.expired = 300

a1.channels.c1.type=memory
a1.channels.c1.keep-alive=60
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=10000

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

自定义抽象的neo4jsink

public abstract class Neo4jSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(Neo4jSink.class);

    public static final String CSA_GRAPH_ATTACK_PREFIX = "csa_graph_attack_";
    public static final String CSA_GRAPH_PROCESS_PREFIX = "csa_graph_process_";
    Integer batchSize;
    Integer maxActive;
    Integer maxIdel;
    Integer maxWait;
    String reidsUrl;
    Integer port;
    String password;
    Integer expired;

    Neo4jStore client;
    RedisPoolStore redisPool;

    Context context;


    @Override
    public synchronized void start() {
        super.start();
        try {
            client = new Neo4jStore(context);
            logger.info("graph connect success");

            redisPool = new RedisPoolStore(maxActive, maxIdel, maxWait, reidsUrl, port, password);
            redisPool.init();
        } catch (Exception e) {
            logger.error("graph connect error", e);
        }
    }

    public abstract Status process();

    @Override
    public synchronized void stop() {
        super.stop();
    }

    @Override
    public void configure(Context context) {
        this.context = context;
        batchSize = context.getInteger("graph.batchsize");
        if (batchSize == null)
            batchSize = 1000;
        maxActive = context.getInteger("graph.redis.maxactive");
        maxIdel = context.getInteger("graph.redis.maxidel");
        maxWait = context.getInteger("graph.redis.maxwait");
        reidsUrl = context.getString("graph.redis.ip");
        port = context.getInteger("graph.redis.port");
        password = context.getString("graph.redis.password");
        expired = context.getInteger("graph.redis.expired");
        otherInit(context);
    }

    public void otherInit(Context context) {

    }

    public String generateNetKey(String srcIp, int srcPort, String destIp, int destPort, long window, String serverId) {
        return srcIp + "-" + serverId;
    }


    public enum GraphLabel {
        PROCESS("process"),
        IP("ip"),
        EVENT("event"),
        SERVER("server"),
        SOCKET("socket"),
        NETLINK("netlink"),
        SERVERLINK("serverlink"),
        IN("in");

        GraphLabel(String name) {
            this.name = name;
        }

        private String name;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
}

实现类

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jd.datasource.RedisManager;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.mapdb.Atomic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class CsaNeo4jSink extends Neo4jSink {
    private static final Logger logger = LoggerFactory.getLogger(CsaNeo4jSink.class);

    /*String event_sql = "UNWIND $message as row\n" +
            "MERGE (:server {serverName:row.serverName, serverId:row.serverId,vid:row.serverVid,floatingIp:row.floatingIp, fixedIp:row.fixedIp, dataCenter:row.dataCenter, pin:row.pin})\n" +
            "MERGE (:attacker {country:row.country, city:row.city,province:row.province,latitude:row.latitude, longitude:row.longitude, ip:row.ip})\n" +
            "MERGE (:event {time:toInteger(row.time), status:toInteger(row.status),source:row.source,srcIp:row.srcIp, srcPort:toInteger(row.srcPort), destIp:row.destIp, destPort:toInteger(row.destPort), vid:row.eventVid,eventId:row.eventId, jdclass:toInteger(row.jdclass), severity:toInteger(row.severity), pin:row.pin, window:toInteger(row.window),serverId:row.serverId, direction:toInteger(row.direction)});";
*/

    String event_sql = "UNWIND $message as row\n" +
            "MERGE (s:server{serverId:row.serverId})\n" +
            "on create set s.serverName=row.serverName,s.serverId=row.serverId,s.vid=row.serverVid,s.floatingIp=row.floatingIp,s.fixedIp=row.fixedIp,s.dataCenter=row.dataCenter,s.pin=row.pin\n" +
            "on match set s.serverName=row.serverName,s.serverId=row.serverId,s.vid=row.serverVid,s.floatingIp=row.floatingIp,s.fixedIp=row.fixedIp,s.dataCenter=row.dataCenter,s.pin=row.pin\n" +
            "MERGE (a:attacker{ip:row.ip})\n" +
            "on create set a.country=row.country,a.city=row.city,a.province=row.province,a.latitude=row.latitude,a.longitude=row.longitude,a.ip=row.ip\n" +
            "on match set a.country=row.country,a.city=row.city,a.province=row.province,a.latitude=row.latitude,a.longitude=row.longitude,a.ip=row.ip\n" +
            "MERGE (e:event {time:toInteger(row.time), status:toInteger(row.status),source:row.source,srcIp:row.srcIp, srcPort:toInteger(row.srcPort), destIp:row.destIp, destPort:toInteger(row.destPort), vid:row.eventVid,eventId:row.eventId, jdclass:toInteger(row.jdclass), severity:toInteger(row.severity), pin:row.pin, window:toInteger(row.window),serverId:row.serverId, direction:toInteger(row.direction)});";

    String attacker2event_sql = "UNWIND $message as row\n" +
            "MATCH (att:attacker {ip:row.ip})\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MERGE (att)-[:attacker2event{eid:row.attacker2eventId}]->(e);";

    String event2attacker_sql = "UNWIND $message as row\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MATCH (att:attacker {ip:row.ip})\n" +
            "MERGE (e)-[:event2attacker{eid:row.event2attackerId}]->(att);";

    String event2server_sql = "UNWIND $message as row\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MATCH (ser:server {vid:row.serverVid})\n" +
            "MERGE (e)-[:event2server{eid:row.event2serverId}]->(ser);";

    String server2event_sql = "UNWIND $message as row\n" +
            "MATCH (ser:server {vid:row.serverVid})\n" +
            "MATCH (e:event {vid:row.eventVid})\n" +
            "MERGE (ser)-[:server2event{eid:row.server2eventId}]->(e);";

    private List<String> jdClasses;

    @Override
    public Status process(){
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        Event event;
        String body = "";
        List<JSONObject> eventParamsList = new ArrayList<>();
        List<JSONObject> attacker2eventList = new ArrayList<>();
        List<JSONObject> event2serverList = new ArrayList<>();
        List<JSONObject> server2eventList = new ArrayList<>();
        List<JSONObject> event2attackerList = new ArrayList<>();
        try {
            Map<String, String> redisData = new HashMap<>();
            for (int i = 0; i < batchSize; i++) {

                event = channel.take();
                if (event != null) {
                    try {
                        body = new String(event.getBody(), "utf-8");
                        JSONObject bodyObj = JSON.parseObject(body);
                        JSONObject jdcloud_alert = bodyObj.getJSONObject("jdcloud_alert");
                        Integer jdclass = jdcloud_alert.getInteger("jd_class");
                        if (jdClasses == null || jdClasses.contains(jdclass + "")) {
                            Integer src_port = bodyObj.getInteger("src_port");
                            Integer dest_port = bodyObj.getInteger("dest_port");
                            if (src_port == null)
                                src_port = 0;
                            if (dest_port == null)
                                dest_port = 0;
                            String peerIp = jdcloud_alert.getString("peer_ip");
                            if (peerIp == null)
                                peerIp = "";
                            String floattingIp = jdcloud_alert.getString("floating_ip");
                            if (floattingIp == null)
                                floattingIp = "";
                            String fixedIp = jdcloud_alert.getString("fixed_ip");
                            String lip = fixedIp;
                            Integer direction = jdcloud_alert.getInteger("direction");
                            Integer severity = jdcloud_alert.getInteger("severity");
                            String eventId = jdcloud_alert.getString("event_id");
                            if (eventId == null)
                                eventId = "";
                            String serverId = jdcloud_alert.getString("server_id");
                            long timestamp = jdcloud_alert.getLong("timestamp");
                            String pin = jdcloud_alert.getString("pin");
                            if (pin == null)
                                pin = "";
                            if (serverId == null)
                                serverId = "";
                            String dataCenter = jdcloud_alert.getString("data_center");
                            String serverVid = dataCenter + "-" + serverId;
                            String serverName = jdcloud_alert.getString("server_name");
                            JSONObject geo = bodyObj.getJSONObject("peer_ip_geo");
                            String uid = "";
                            long window = timestamp / 1000;
                            window = window - (window % 300);
                            if (jdclass != 38) {
                                uid = generateNetKey(peerIp, src_port, lip, dest_port, window, serverId);
                            } else {
                                uid = generateNetKey(lip, src_port, peerIp, dest_port, window, serverId);
                            }
                            Integer status = jdcloud_alert.getInteger("status");
                            if (status == null)
                                status = 0;
                            // if (StringUtils.isNotEmpty(serverId) && StringUtils.isNotEmpty(peerIp) && StringUtils.isNotEmpty(eventId) && StringUtils.isNotEmpty(uid)) {
                            if (StringUtils.isEmpty(serverId))
                                continue;
                            redisData.put(CSA_GRAPH_ATTACK_PREFIX + uid, eventId);
                            String eventVid = uid + "-" + eventId;
                            JSONObject eventParams = new JSONObject();
                            generateAttacker(eventParams, peerIp, geo);
                            generateServer(eventParams, serverName, serverId, serverVid, floattingIp, fixedIp, dataCenter, pin);
                            generateEvent(eventParams, timestamp, status, jdcloud_alert.getString("source"), peerIp, src_port,
                                    floattingIp, dest_port, eventVid, eventId, jdclass, severity, window, direction);

                            eventParamsList.add(eventParams);
                            // if (direction == null || 2 == direction) {
                            if (jdclass != 2 && jdclass != 3) {
                                JSONObject attacker2eventParams = new JSONObject();
                                JSONObject event2serverParams = new JSONObject();
                                attacker2eventParams.put("attacker2eventId", eventId);
                                attacker2eventParams.put("ip", peerIp);
                                attacker2eventParams.put("eventVid", eventVid);

                                event2serverParams.put("event2serverId", eventId);
                                event2serverParams.put("eventVid", eventVid);
                                event2serverParams.put("serverVid", serverVid);

                                attacker2eventList.add(attacker2eventParams);
                                event2serverList.add(event2serverParams);

                            } else {
                                JSONObject event2attackerParams = new JSONObject();
                                JSONObject server2eventParams = new JSONObject();
                                event2attackerParams.put("event2attackerId", eventId);
                                event2attackerParams.put("ip", peerIp);
                                event2attackerParams.put("eventVid", eventVid);

                                server2eventParams.put("server2eventId", eventId);
                                server2eventParams.put("eventVid", eventVid);
                                server2eventParams.put("serverVid", serverVid);

                                event2attackerList.add(event2attackerParams);
                                server2eventList.add(server2eventParams);
                            }
                        }
                    } catch (Exception e) {
                        logger.error("log parse error " + body, e);
                    }
                }
            }

            // logger.info("{}, {}", eventParamsList.size()+"", attacker2eventList.size());
            if (eventParamsList.size() > 0) {
                client.execute(eventParamsList, event_sql);
            }
            if (attacker2eventList.size() > 0) {
                client.execute(attacker2eventList, attacker2event_sql);
            }

            if (event2serverList.size() > 0) {
                client.execute(event2serverList, event2server_sql);
            }

            if (server2eventList.size() > 0) {
                client.execute(server2eventList, server2event_sql);
            }

            if (event2attackerList.size() > 0) {
                client.execute(event2attackerList, event2attacker_sql);
            }
            transaction.commit();
            RedisManager redisManager = new RedisManager(redisPool);
            redisManager.batchSet(redisData, expired);
            redisManager.close();
            return Status.READY;
        } catch (Throwable e) {
            logger.error("Failed to commit transaction." +
                    "Transaction rolled back.", e);
            try {
                transaction.rollback();
            } catch (Exception e2) {
                logger.error("Exception in rollback. Rollback might not have been" +
                        "successful.", e2);
            }
            // Throwables.propagate(e);
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }

    private void generateServer(JSONObject params, String serverName, String serverId, String vid, String floatingIp, String fixedIp, String dataCenter, String pin) {
        params.put("serverName", serverName);
        params.put("serverId", serverId);
        params.put("serverVid", vid);
        params.put("floatingIp", floatingIp);
        params.put("fixedIp", fixedIp);
        params.put("dataCenter", dataCenter);
        params.put("pin", pin);
    }

    private void generateAttacker(JSONObject params, String peerIp, JSONObject geo) {
        if (geo != null) {
            String country = geo.getString("country");
            String city = geo.getString("city");
            String province = geo.getString("province");
            String latitude = geo.getString("latitude");
            String longitude = geo.getString("longitude");
            params.put("country", country == null ? "": country);
            params.put("city", city == null ? "" : city);
            params.put("province", province == null ? "" : province);
            params.put("latitude", latitude == null ? "" : latitude);
            params.put("longitude", longitude == null ? "": longitude);
        } else {
            params.put("country", "");
            params.put("city", "");
            params.put("province", "");
            params.put("latitude", "");
            params.put("longitude", "");
        }
        params.put("ip", peerIp);
    }

    private void generateEvent(JSONObject params, long timestamp, int status, String source, String peerIp, Integer src_port,
                               String floattingIp, Integer dest_port, String eventVid, String eventId,
                               int jdclass, int severity, long window, Integer direction) {
        params.put("time", timestamp);
        params.put("status", status);
        params.put("source", source);
        params.put("srcIp", peerIp);
        params.put("srcPort", src_port);
        params.put("destIp", floattingIp);
        params.put("destPort", dest_port);
        params.put("eventVid", eventVid);
        params.put("eventId", eventId);
        params.put("jdclass", jdclass);
        params.put("severity", severity);
        params.put("window", window);
        if (jdclass != 2 && jdclass != 3) {
        // if (direction == null || 2 == direction) {
            params.put("direction", 2);
        } else {
            params.put("direction", 1);
        }
    }

    @Override
    public void otherInit(Context context) {
        String jdclass = context.getString("graph.jdclass");
        if (StringUtils.isEmpty(jdclass)) {
            jdClasses = new ArrayList<>();
        } else {
            jdClasses = Arrays.asList(jdclass.split(","));
        }
    }
}

neo4jStore实现

import com.alibaba.fastjson.JSONObject;
import com.google.api.client.util.Preconditions;
import org.apache.flume.Context;
import org.neo4j.driver.v1.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.neo4j.driver.v1.Values.parameters;

public class Neo4jStore {
    private final static Logger logger = LoggerFactory.getLogger(Neo4jStore.class);
    public static final String INDEX_PREFIX = "jdcloud-seclog-merge-stage1-";
    public Neo4jStore(){}

    String url;
    String username;
    String password;
    Driver driver = null;
    public Neo4jStore(Context context) {
        this.url = context.getString("graph.neo4j.url");
        this.username = context.getString("graph.neo4j.username");
        this.password = context.getString("graph.neo4j.password");
        Preconditions.checkNotNull(url, "graph.neo4j.url must be set!!");
        Preconditions.checkNotNull(username, "graph.neo4j.username must be set!!");
        Preconditions.checkNotNull(password, "graph.neo4j.password must be set!!");
        setup();
    }

    public Neo4jStore(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
        Preconditions.checkNotNull(url, "graph.neo4j.url must be set!!");
        Preconditions.checkNotNull(username, "graph.neo4j.username must be set!!");
        Preconditions.checkNotNull(password, "graph.neo4j.password must be set!!");
        setup();
    }


    public void setup() {
        try {
            driver = GraphDatabase.driver(url, AuthTokens.basic(username, password));
        } catch (Exception e) {
            logger.error("init neo4j error", e);
        }
    }

    public void execute(List<JSONObject> message, String sql) {
        if  (driver == null)
            setup();
        try {
            Session session = driver.session();
            session.run(sql, parameters("message", message));
            session.close();
        } catch (Exception e) {
            logger.error("execute neo4j error", e);
        }
    }
    public void close() {
        if (driver != null) driver.close();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读