使用Berkeley DB实现LinkedBlockingQueue

2023-08-10  本文已影响0人  engineer_tang

Berkeley DB是一个开放源代码的内嵌式数据库管理系统,能够为应用程序提供高性能的数据管理服务。使用它,程序员只需要调用一些简单的API就可以完成对数据的访问和管理。与常用的数据库管理系统(如MySQL和Oracle等)有所不同,在Berkeley DB中并没有数据库服务器的概念。应用程序不需要事先同数据库服务器建立起网络连接,而是通过内嵌在程序中的Berkeley DB函数库来完成对数据的保存、查询、修改和删除等操作。

1. 引入maven依赖

        <!-- Berkeley DB Java Edition: provides the com.sleepycat.je and
             com.sleepycat.collections packages used by the classes below. -->
        <dependency>
            <groupId>com.sleepycat</groupId>
            <artifactId>je</artifactId>
            <version>18.3.12</version>
        </dependency>

        <!-- FST serialization: provides org.nustaq.serialization, used by
             FstObjectSerializeUtil to turn keys/values into byte arrays. -->
        <dependency>
            <groupId>de.ruedigermoeller</groupId>
            <artifactId>fst</artifactId>
            <version>2.48-jdk-6</version>
            <!-- Transitive dependencies explicitly kept off the classpath. -->
            <exclusions>
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.cedarsoftware</groupId>
                    <artifactId>json-io</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

2. 具体实现

BdbInitService文件

package com.joe.heightconcurrency.demo.service;

import com.joe.heightconcurrency.demo.config.BdbPropertyConfig;
import com.joe.heightconcurrency.demo.entity.User;
import com.joe.heightconcurrency.demo.util.FstSerialBinding;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.je.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.File;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

@Service
@Slf4j
public class BdbInitService {

    private static Environment env;

    private static DatabaseConfig dbConfig;

    @Autowired
    private BdbPropertyConfig bdbPropertyConfig;

    private ConcurrentHashMap<String, Database> queueDbMap = new ConcurrentHashMap<>();

    private ConcurrentHashMap<String, StoredMap<String, User>> storedMaps = new ConcurrentHashMap<>();

    static {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(true);
        envConfig.setCacheSize(10000000);
        try {
            env = new Environment(new File("E://develop//bdb"),envConfig);
        } catch (DatabaseException e) {
            e.printStackTrace();
        }
        dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        dbConfig.setSortedDuplicates(false);
    }

    private Database open(String dbName) {
        Database database = queueDbMap.get(dbName);
        if (Objects.nonNull(database)) {
            return database;
        }
        try {
            database = env.openDatabase(null, dbName, dbConfig);
            Database olddb = queueDbMap.putIfAbsent(dbName, database);
            return Objects.isNull(olddb) ? database : olddb;
        } catch (DatabaseException e) {
            e.printStackTrace();
            return null;
        }
    }

    public StoredMap<String, User> getStoredMap(String queueName) {
        FstSerialBinding<String> messageKeyBinding = new FstSerialBinding<>();
        FstSerialBinding<User> messageValueBinding = new FstSerialBinding<>();
        Database database = open(queueName);
        StoredMap<String, User> tmpMap = new StoredMap<>(database, messageKeyBinding, messageValueBinding, true);
        StoredMap<String, User> oldMap = storedMaps.putIfAbsent(queueName, tmpMap);
        return Objects.isNull(oldMap) ? tmpMap : oldMap;
    }
}

UserQueueService文件

package com.joe.heightconcurrency.demo.service;

import com.joe.heightconcurrency.demo.entity.User;
import com.joe.heightconcurrency.demo.queue.BdbBlockingQueue;
import com.sleepycat.collections.StoredMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.Objects;


/**
 * Service facade over the persistent user queue.
 *
 * <p>The queue is created after dependency injection from the stored map
 * that {@code BdbInitService} provides for the name {@code "user"}.
 */
@Service
@Slf4j
public class UserQueueService {

    BdbBlockingQueue<User> bdbBlockingQueue;

    @Autowired
    private BdbInitService bdbInitService;

    /** Builds the queue on top of the stored map registered under "user". */
    @PostConstruct
    public void init() {
        bdbBlockingQueue = new BdbBlockingQueue<>(bdbInitService.getStoredMap("user"));
    }

    /**
     * Offers a user to the queue and logs the outcome followed by the queue size.
     *
     * @param user the user to enqueue; a {@code null} value is logged and ignored
     */
    public void addUser(User user) {
        if (user == null) {
            log.error("用户信息不存在");
            return;
        }
        if (bdbBlockingQueue.offer(user)) {
            log.info("用户信息放入队列成功");
        } else {
            log.error("用户信息放入队列失败");
        }
        // The size is reported after either outcome.
        log.info("当前队列大小:{}", bdbBlockingQueue.size());
    }

    /**
     * @return the number of users currently held by the queue
     */
    public Integer getSize() {
        int current = bdbBlockingQueue.size();
        return current;
    }

    /**
     * @return the head of the queue as returned by {@code poll()}
     */
    public User getUser() {
        User head = bdbBlockingQueue.poll();
        return head;
    }
}

controller文件

    /**
     * Enqueues the bound user and acknowledges the request.
     *
     * @param user the user bound from the request
     * @return the fixed acknowledgement string
     */
    @GetMapping("/addUser")
    public String addUser(User user) {
        final String ack = "SUCCESS";
        userQueueService.addUser(user);
        return ack;
    }

    /**
     * Reports the queue size as returned by the queue service.
     *
     * @return the current queue size
     */
    @GetMapping("/getUserSize")
    public Integer getUserSize() {
        Integer size = userQueueService.getSize();
        return size;
    }

    /**
     * Takes the next user from the queue and returns its string form.
     *
     * <p>The queue service may hand back {@code null} (nothing to take); in that
     * case a fixed marker is returned instead of failing with a
     * {@link NullPointerException}.
     *
     * @return {@code user.toString()} for the dequeued user, or {@code "EMPTY"}
     *         when no user was available
     */
    @GetMapping("/getUser")
    public String getUser() {
        User user = userQueueService.getUser();
        return user == null ? "EMPTY" : user.toString();
    }

FstSerialBinding文件

package com.joe.heightconcurrency.demo.util;

import com.sleepycat.bind.ByteArrayBinding;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.util.RuntimeExceptionWrapper;

import java.io.Serializable;

/**
 * {@link EntryBinding} that converts objects to and from database entries by
 * delegating to {@code FstObjectSerializeUtil}.
 *
 * <p>A {@code null} object is stored as a zero-length byte array, and an empty
 * or missing payload is read back as {@code null}.
 *
 * @param <E> the serializable type handled by this binding
 */
public class FstSerialBinding<E extends Serializable> implements EntryBinding<E> {
    private final static byte[] ZERO_LENGTH_BYTE_ARRAY = new byte[0];
    private ByteArrayBinding byteArrayBinding = new ByteArrayBinding();

    /**
     * Decodes the entry's bytes into an object.
     *
     * @param entry the database entry to decode
     * @return the decoded object, or {@code null} when the entry carries no bytes
     */
    @Override
    @SuppressWarnings("unchecked")
    public E entryToObject(DatabaseEntry entry) {
        byte[] raw = byteArrayBinding.entryToObject(entry);
        boolean hasPayload = raw != null && raw.length != 0;
        if (!hasPayload) {
            return null;
        }
        try {
            Serializable decoded = FstObjectSerializeUtil.read(raw);
            return (E) decoded;
        } catch (Exception ex) {
            throw RuntimeExceptionWrapper.wrapIfNeeded(ex);
        }
    }

    /**
     * Encodes the object into the given entry.
     *
     * @param object the object to store; {@code null} is written as zero bytes
     * @param entry  the database entry that receives the bytes
     */
    @Override
    public void objectToEntry(E object, DatabaseEntry entry) {
        if (object == null) {
            byteArrayBinding.objectToEntry(ZERO_LENGTH_BYTE_ARRAY, entry);
            return;
        }
        try {
            byte[] payload = FstObjectSerializeUtil.write(object);
            byteArrayBinding.objectToEntry(payload, entry);
        } catch (Exception ex) {
            throw RuntimeExceptionWrapper.wrapIfNeeded(ex);
        }
    }
}

FstObjectSerializeUtil文件

package com.joe.heightconcurrency.demo.util;

import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;
import org.nustaq.serialization.FSTObjectOutput;

import java.io.ByteArrayOutputStream;
import java.io.Serializable;

/**
 * Static helpers that serialize and deserialize objects with FST.
 *
 * <p>Each thread works with its own {@link FSTConfiguration}, created lazily
 * on first use.
 */
public abstract class FstObjectSerializeUtil {

    private final static ThreadLocal<FSTConfiguration> conf =
            ThreadLocal.withInitial(FSTConfiguration::createDefaultConfiguration);

    /**
     * Serializes {@code obj} into a byte array.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws Exception if FST fails to write the object
     */
    public static byte[] write(Serializable obj) throws Exception {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            FSTObjectOutput objectOutput = conf.get().getObjectOutput(buffer);
            objectOutput.writeObject(obj);
            // Flush so that everything FST buffered reaches the byte buffer.
            objectOutput.flush();
            return buffer.toByteArray();
        }
    }

    /**
     * Deserializes an object from {@code bytes}.
     *
     * <p>Decoding failures are propagated to the caller instead of being turned
     * into {@code null}, so that a corrupt record is not mistaken for an absent
     * value.
     *
     * @param bytes the serialized form; {@code null} or empty yields {@code null}
     * @return the decoded object, or {@code null} when there is nothing to decode
     *         or the decoded object is not {@link Serializable}
     * @throws Exception if FST fails to read the object
     */
    public static Serializable read(byte[] bytes) throws Exception {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        FSTObjectInput objectInput = conf.get().getObjectInput(bytes);
        Object decoded = objectInput.readObject();
        return decoded instanceof Serializable ? (Serializable) decoded : null;
    }
}

BdbBlockingQueue文件

package com.joe.heightconcurrency.demo.queue;

import com.joe.heightconcurrency.demo.entity.User;
import com.sleepycat.collections.StoredMap;

import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * A {@link LinkedBlockingQueue} that mirrors its contents into a
 * {@link StoredMap}, keyed by {@code getId().toString()} of each element.
 *
 * <p>Elements are written to the map before they are enqueued and removed
 * from the map after they are dequeued. This holds for the plain, timed and
 * blocking insert/remove operations ({@code offer}, {@code put}, {@code poll},
 * {@code take}).
 *
 * <p>When created with the no-argument constructor there is no backing map
 * and the queue behaves as a purely in-memory queue.
 *
 * <p>NOTE(review): {@code remove(Object)}, {@code clear()}, {@code drainTo(...)}
 * and iterator removal are inherited unchanged and do not touch the map.
 * NOTE(review): the initial contents are taken from {@code storedMap.values()};
 * that order is presumably the map's key order rather than the original
 * insertion order - confirm if FIFO order must survive a restart.
 * NOTE(review): two queued elements with the same id share one map entry.
 *
 * @param <T> the element type
 */
public class BdbBlockingQueue<T extends User> extends LinkedBlockingQueue<T> {

    private StoredMap<String, T> storedMap;

    /** Creates an empty queue without a backing map. */
    public BdbBlockingQueue() {}

    /**
     * Creates a queue pre-filled with the values currently held by the map.
     *
     * @param storedMap the backing map; must not be {@code null}
     */
    public BdbBlockingQueue(StoredMap<String, T> storedMap) {
        super(storedMap.values());
        this.storedMap = storedMap;
    }

    /**
     * Stores and enqueues the element.
     *
     * @param t the element to add
     * @return {@code false} when {@code t} is {@code null}, otherwise the result
     *         of the underlying queue's {@code offer}
     */
    @Override
    public boolean offer(T t) {
        if (Objects.isNull(t)) {
            return false;
        }
        persist(t);
        return super.offer(t);
    }

    /**
     * Timed variant of {@link #offer(User)}; the element is stored before the
     * underlying queue is asked to accept it.
     *
     * @throws NullPointerException if {@code t} is {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
        Objects.requireNonNull(t);
        persist(t);
        return super.offer(t, timeout, unit);
    }

    /**
     * Stores the element, then enqueues it, waiting for space if necessary.
     *
     * @throws NullPointerException if {@code t} is {@code null}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public void put(T t) throws InterruptedException {
        Objects.requireNonNull(t);
        persist(t);
        super.put(t);
    }

    /**
     * Dequeues the head and removes it from the backing map.
     *
     * @return the head of the queue, or {@code null} if the queue is empty
     */
    @Override
    public T poll() {
        return forget(super.poll());
    }

    /**
     * Timed variant of {@link #poll()}.
     *
     * @return the head of the queue, or {@code null} if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return forget(super.poll(timeout, unit));
    }

    /**
     * Dequeues the head, waiting until an element is available, and removes it
     * from the backing map.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public T take() throws InterruptedException {
        return forget(super.take());
    }

    /** Writes the element to the backing map, if there is one. */
    private void persist(T t) {
        if (storedMap != null) {
            storedMap.put(t.getId().toString(), t);
        }
    }

    /** Removes a dequeued element from the backing map, if any, and returns it. */
    private T forget(T t) {
        if (t != null && storedMap != null) {
            storedMap.remove(t.getId().toString());
        }
        return t;
    }
}

参考:https://blog.csdn.net/qq_38617531/article/details/83794893/

上一篇下一篇

猜你喜欢

热点阅读