「物联网」编译emqtt桥接kafka消息队列插件

2018-05-01  本文已影响0人  SmartKarren

前言

emqtt插件开发网络上可参考的地方很少,这里做一个记录,希望能帮你节约一点时间。

引用

  -emqtt
  -emq-plugin-template插件模板
  -参考的emqtt-kafka插件

环境

-编译环境 erlang OTP19
-运行环境 64位Centos7
-emqtt-2.3.7

编译

添加引用

#修改emq-relx的Makefile
DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
        emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
        emq_auth_mysql emq_auth_pgsql emq_auth_redis  emq_auth_mongo \
        emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
        emq_lua_hook emq_auth_jwt <font color=#ff0000> emqttd_plugin_kafka_bridge </font>

#添加插件git路径,或者用svn、cp(本地路径)
dep_emqttd_plugin_kafka_bridge = git https://github.com/msdevanms/emqttd_plugin_kafka_bridge master

添加插件自启动

#在emq-relx/data/loaded_plugins文件末尾添加
emqttd_plugin_kafka_bridge.

添加配置

#修改emq-relx的relx.config,只列出添加项
{release, {emqttd, "2.3.7"}, [
  ...
  #添加
  kafkamocker,
  ekaf,
  ranch,
  {emqttd_plugin_kafka_bridge, load},
  #添加结束
  {emq_plugin_template, load}
]}.
{overlay, [
    #添加配置文件
    {copy, "rel/conf/plugins/plugin.config", "etc/plugins/emqttd_plugin_kafka_bridge.config"},
    #添加结束
]}.

运行

$ cd _rel/emqttd/bin
$ ./emqttd start

kafka桥接修改的示例代码如下

%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2016 Huang Rui<vowstar@gmail.com>, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd_plugin_kafka_bridge.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_plugin_kafka_bridge).

-include("../../emqttd/include/emqttd.hrl").

-include("../../emqttd/include/emqttd_protocol.hrl").

-include("../../emqttd/include/emqttd_internal.hrl").

-export([load/1, unload/0]).

%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).

-export([on_client_subscribe/4, on_client_unsubscribe/4]).

-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).

-record(struct, {lst=[]}).

%% Called when the plugin application start
load(Env) ->
    ekaf_init([Env]),
    emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
    emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
    emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
    emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
    emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
    emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
    emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

%%-----------client connect start-----------------------------------%%

on_client_connected(ConnAck, Client = #mqtt_client{client_id  = ClientId}, _Env) ->
    io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),

    Json = mochijson2:encode([
        {type, <<"connected">>},
        {client_id, ClientId},
        {cluster_node, node()}
%       ,{ts, emqttd_time:now_to_secs()}
    ]),
    
    ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),

    {ok, Client}.

%%-----------client connect end-------------------------------------%%



%%-----------client disconnect start---------------------------------%%

on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
%    io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),

    Json = mochijson2:encode([
        {type, <<"disconnected">>},
        {client_id, ClientId},
        {reason, Reason},
        {cluster_node, node()}
%       ,{ts, emqttd_time:now_to_secs()}
    ]),

    ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),

    ok.

%%-----------client disconnect end-----------------------------------%%



%%-----------client subscribed start---------------------------------------%%

%% should retain TopicTable
on_client_subscribe(ClientId,Username, TopicTable, _Env) ->
    io:format("client ~s will subscribe ~p~n", [ClientId, TopicTable]),
    
    case TopicTable of
        [_|_] -> 
            %% If TopicTable list is not empty
            Key = proplists:get_keys(TopicTable),
            %% build json to send using ClientId
            Json = mochijson2:encode([
                {type, <<"subscribed">>},
                {client_id, ClientId},
                {topic, lists:last(Key)},
                {cluster_node, node()}
            ]),
            ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json));
        _ -> 
            %% If TopicTable is empty
            io:format("empty topic ~n")
    end,
    {ok, TopicTable}.
   
%on_client_subscribe_after(ClientId, TopicTable, _Env) ->
%    io:format("client ~s subscribed ~p~n", [ClientId, TopicTable]),
    
%    case TopicTable of
%        [_|_] -> 
%            %% If TopicTable list is not empty
%            Key = proplists:get_keys(TopicTable),
%            %% build json to send using ClientId
%            Json = mochijson2:encode([
%                {type, <<"subscribed">>},
%                {client_id, ClientId},
%                {topic, lists:last(Key)},
%                {cluster_node, node()}

%                ,{ts, emqttd_time:now_to_secs()}
%            ]),
%            ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json));
%        _ -> 
%            %% If TopicTable is empty
%            io:format("empty topic ~n")
%    end,

%    {ok, TopicTable}.

%%-----------client subscribed end----------------------------------------%%



%%-----------client unsubscribed start----------------------------------------%%

on_client_unsubscribe(ClientId,Username, Topics, _Env) ->
    io:format("client ~s unsubscribe ~p~n", [ClientId, Topics]),

    % build json to send using ClientId
    Json = mochijson2:encode([
        {type, <<"unsubscribed">>},
        {client_id, ClientId},
        {topic, lists:last(Topics)},
        {cluster_node, node()}
%        ,{ts, emqttd_time:now_to_secs()}
    ]),
    
    ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),
    
    {ok, Topics}.

%%-----------client unsubscribed end----------------------------------------%%



%%-----------message publish start--------------------------------------%%

%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    {ok, Message};

on_message_publish(Message, _Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),   

%    From = Message#mqtt_message.from,
%    Sender =  Message#mqtt_message.sender,
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload, 
    QoS = Message#mqtt_message.qos,
    Timestamp = Message#mqtt_message.timestamp,

    Json = mochijson2:encode([
        {type, <<"published">>},
%        {client_id, From},
        {topic, Topic},
% 如果是二进制 {payload, binary_to_list(Payload)},
        {payload, Payload},
        {qos, QoS},
        {cluster_node, node()}
%        ,{ts, emqttd_time:now_to_secs(Timestamp)}
    ]),

    ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),

    {ok, Message}.

%%-----------message delivered start--------------------------------------%%
on_message_delivered(ClientId, Username, Message, _Env) ->
    io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),

%    From = Message#mqtt_message.from,
%    Sender =  Message#mqtt_message.sender,
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload, 
    QoS = Message#mqtt_message.qos,
    Timestamp = Message#mqtt_message.timestamp,

    Json = mochijson2:encode([
        {type, <<"delivered">>},
        {client_id, ClientId},
%        {from, From},
        {topic, Topic},
% 如果是二进制 {payload, binary_to_list(Payload)},
        {payload, Payload},
        {qos, QoS},
        {cluster_node, node()}
%        ,{ts, emqttd_time:now_to_secs(Timestamp)}
    ]),

    ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),

    {ok, Message}.
%%-----------message delivered end----------------------------------------%%

%%-----------acknowledgement publish start----------------------------%%
on_message_acked(ClientId, Username, Message, _Env) ->
    io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),   

%    From = Message#mqtt_message.from,
%    Sender =  Message#mqtt_message.sender,
    Topic = Message#mqtt_message.topic,
    Payload = Message#mqtt_message.payload, 
    QoS = Message#mqtt_message.qos,
    Timestamp = Message#mqtt_message.timestamp,

    Json = mochijson2:encode([
        {type, <<"acked">>},
        {client_id, ClientId},
%        {from, From},
        {topic, Topic},
% 如果是二进制 {payload, binary_to_list(Payload)},
        {payload, Payload},
        {qos, QoS},
        {cluster_node, node()}
%        ,{ts, emqttd_time:now_to_secs(Timestamp)}
    ]),

    ekaf:produce_sync(<<"broker_message">>, list_to_binary(Json)),
    {ok, Message}.

%% ===================================================================
%% ekaf_init
%% ===================================================================

ekaf_init(_Env) ->
    %% Get parameters
    {ok, Kafka} = application:get_env(emqttd_plugin_kafka_bridge, kafka),
    BootstrapBroker = proplists:get_value(bootstrap_broker, Kafka),
    PartitionStrategy= proplists:get_value(partition_strategy, Kafka),
    %% Set partition strategy, like application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_partition_strategy, PartitionStrategy),
    %% Set broker url and port, like application:set_env(ekaf, ekaf_bootstrap_broker, {"127.0.0.1", 9092}),
    application:set_env(ekaf, ekaf_bootstrap_broker, BootstrapBroker),
    %% Set topic
    application:set_env(ekaf, ekaf_bootstrap_topics, <<"broker_message">>),

    {ok, _} = application:ensure_all_started(kafkamocker),
    {ok, _} = application:ensure_all_started(gproc),
    {ok, _} = application:ensure_all_started(ranch),
    {ok, _} = application:ensure_all_started(ekaf),

    io:format("Init ekaf with ~p~n", [BootstrapBroker]).

%% Called when the plugin application stop
unload() ->
    emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
    emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
    emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/3),
    emqttd:unhook('client.subscribe.after', fun ?MODULE:on_client_subscribe_after/3),
    emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/3),
    emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
    emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/3),
    emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/3).

上一篇下一篇

猜你喜欢

热点阅读