使用golang编写自己的eclipse-mosquitto 验

2023-12-31  本文已影响0人  申_9a33

源码

1. 创建一个mosquitto_auth.proto

syntax = "proto3";

option go_package = "./mosquitto_auth";

package mosquitto_auth;

service Greeter {
  rpc BasicAuth (BasicAuthRequest) returns (BasicAuthReply) {}
  rpc AclCheck (AclCheckRequest) returns (AclCheckReply) {}
}

message BasicAuthRequest {
  string username = 1;
  string password = 2;
  string clientId = 3;
  string clientAddress = 4;
}

message BasicAuthReply {
  int32 code = 1;
}

message AclCheckRequest {
  string username = 1;
  string clientId = 2;
  string topic = 3;
  int32 access = 4;
  int32 qos = 5;
  int32 retain = 6;
}

message AclCheckReply {
  int32 code = 1;
}

2.使用golang创建GRPC客户端,然后将其编译成C语言动态库

package main

/**/
import "C"
import (
    "context"
    "log"
    "mosquitto_auth_plugin/mosq_err"
    pb "mosquitto_auth_plugin/mosquitto_auth"
    "time"

    "google.golang.org/grpc/connectivity"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

var clientConn *grpc.ClientConn = nil

//export PluginInit
func PluginInit(addr *C.char) C.int {
    if clientConn.GetState() != connectivity.Shutdown {
        clientConn.Close()
    }

    clientConn, err := grpc.Dial(C.GoString(addr), grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil || clientConn == nil {
        log.Fatalf("[PluginInit] did not connect: %v", err)
        return mosq_err.MOSQ_ERR_UNKNOWN
    }

    return mosq_err.MOSQ_ERR_SUCCESS
}

//export PluginBasicAuth
func PluginBasicAuth(username *C.char, password *C.char, clientId *C.char, clientAddress *C.char) C.int {
    Username := C.GoString(username)
    Password := C.GoString(password)
    ClientId := C.GoString(clientId)
    ClientAddress := C.GoString(clientAddress)

    c := pb.NewGreeterClient(clientConn)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    r, err := c.BasicAuth(ctx, &pb.BasicAuthRequest{Username: Username, Password: Password, ClientId: ClientId, ClientAddress: ClientAddress})
    if err != nil {
        log.Fatalf("[PluginBasicAuth]could not greet: %v", err)
        return mosq_err.MOSQ_ERR_UNKNOWN
    }

    log.Printf("[PluginBasicAuth] Greeting: %d", r.GetCode())

    return C.int(r.GetCode())
}

//export PluginAclCheck
func PluginAclCheck(username *C.char, clientId *C.char, topic *C.char, access C.int, qos C.int, retain C.int) C.int {
    Username := C.GoString(username)
    ClientId := C.GoString(clientId)
    Topic := C.GoString(topic)
    Access := int32(access)
    Retain := int32(retain)

    c := pb.NewGreeterClient(clientConn)

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    r, err := c.AclCheck(ctx, &pb.AclCheckRequest{Username: Username, ClientId: ClientId, Topic: Topic, Access: Access, Retain: Retain})
    if err != nil {
        log.Fatalf("[PluginAclCheck] could not greet: %v", err)
        return mosq_err.MOSQ_ERR_UNKNOWN
    }

    log.Printf("[PluginAclCheck] Greeting: %d", r.GetCode())

    return C.int(r.GetCode())
}

func main() {}

3.使用c语言动态库方法调用grpc_auth.so

3.1 插件初始化时打开动态库

int mosquitto_plugin_init(mosquitto_plugin_id_t *identifier, void **user_data, struct mosquitto_opt *opts, int opt_count)
{
    // 打开grpc插件
    grpc_handle = dlopen(grpc_path, RTLD_NOW); // 打开指定的动态库
    if (!grpc_handle)
    {
        mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] grpc_path = %s, dlerror = %s", grpc_path, dlerror());
        return MOSQ_ERR_PLUGIN_DEFER;
    }

    // 初始化动态库
    err_code = callback_init();
    if (err_code != MOSQ_ERR_SUCCESS)
    {
        mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] callback_init err = %d", err_code);
        return err_code;
    }

    return err_code;
}

3.2 回调函数调用动态库里面的方法

调用动态库PluginBasicAuth方法

static int callback_basic_auth(int event, void *event_data, void *userdata)
{
    UNUSED(event);
    UNUSED(userdata);

    struct mosquitto_evt_basic_auth *ed = event_data;
    char *client_id = mosquitto_client_id(ed->client);
    char *client_address = mosquitto_client_address(ed->client);

    mosquitto_log_printf(MOSQ_LOG_INFO, "[callback_basic_auth] username=%s, password=%s, client_id=%s, client_address=%s", ed->username, ed->password, client_id, client_address);

    PluginBasicAuth pluginBasicAuth = (PluginBasicAuth)dlsym(grpc_handle, "PluginBasicAuth");
    if (!pluginBasicAuth)
    {
        mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] PluginBasicAuth undefined");
        return MOSQ_ERR_PLUGIN_DEFER;
    }

    return pluginBasicAuth(ed->username, ed->password, client_id, client_address);
}

调用动态库PluginAclCheck方法

static int callback_acl_check(int event, void *event_data, void *userdata)
{
    UNUSED(event);
    UNUSED(userdata);

    struct mosquitto_evt_acl_check *ed = event_data;
    char *username = mosquitto_client_username(ed->client);
    char *client_id = mosquitto_client_id(ed->client);
    char *topic = ed->topic;
    int access = ed->access;
    int qos = ed->qos;
    int retain = ed->retain ? 1 : 0;

    mosquitto_log_printf(MOSQ_LOG_INFO, "[callback_acl_check] username=%s, client_id=%s, topic=%s, access=%d, qos=%d, retain=%d", username, client_id, topic, access, qos, retain);

    PluginAclCheck pluginAclCheck = (PluginAclCheck)dlsym(grpc_handle, "PluginAclCheck");
    if (!pluginAclCheck)
    {
        mosquitto_log_printf(MOSQ_LOG_ERR, "[mosquitto_plugin_init] PluginAclCheck undefined");
        return MOSQ_ERR_PLUGIN_DEFER;
    }

    return pluginAclCheck(username, client_id, topic, access, qos, retain);
}

3.3 插件结束时关闭动态库

int mosquitto_plugin_cleanup(void *user_data, struct mosquitto_opt *opts, int opt_count)
{
    UNUSED(user_data);
    UNUSED(opts);
    UNUSED(opt_count);
    int err_code = MOSQ_ERR_SUCCESS;

    dlclose(grpc_handle);

    return MOSQ_ERR_SUCCESS;
}

4.配置加载动态库参数

// mosquitto.conf

plugin /mosquitto/dll/mosquitto_payload_modification.so
plugin_opt_grpc_path /mosquitto/dll/grpc_auth.so
plugin_opt_grpc_addr 127.0.0.1:10086
上一篇 下一篇

猜你喜欢

热点阅读