大数据

Camel笔记:链式传递与扇出

2021-05-14  本文已影响0人  阿乐_822e

本文接上篇Camel笔记(从Mysql到本地文件与Kafka队列) - 简书 (jianshu.com)

完成了通过页面触发,将Mysql的数据写入本地文件和Kafka队列后,再了解一下链式传递与扇出(fan out,一对多输出)

//Kafka,Mysql--->Kafka&Filesystem
        from("direct:kafka").to("sql:select * from employee").process(new Processor() {
            public void process(Exchange xchg) throws Exception {
                ArrayList<Map<String, String>> dataList = (ArrayList<Map<String, String>>) xchg.getIn().getBody();
                List<Employee> employees = new ArrayList<Employee>();
                System.out.println(dataList);
                for (Map<String, String> data : dataList) {
                    Employee employee = new Employee();
                    employee.setEmpId(data.get("empId"));
                    employee.setEmpName(data.get("empName"));
                    employees.add(employee);
                }
                xchg.getIn().setBody(employees.toString());
            }
        }).multicast().to(toKafka,"file:data/outbox").process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                System.out.println("it is :"+toKafka);
            }
        });
上一篇 下一篇

猜你喜欢

热点阅读