使用Berkeley DB实现LinkedBlockingQueue
2023-08-10 本文已影响0人
engineer_tang
Berkeley DB是一个开放源代码的内嵌式数据库管理系统,能够为应用程序提供高性能的数据管理服务。使用它,程序员只需要调用一些简单的API就可以完成对数据的访问和管理。与常用的数据库管理系统(如MySQL和Oracle等)有所不同,在Berkeley DB中并没有数据库服务器的概念。应用程序不需要事先同数据库服务建立起网络连接,而是通过内嵌在程序中的Berkeley DB函数库来完成对数据的保存、查询、修改和删除等操作。
1. 引入maven依赖
<dependency>
<groupId>com.sleepycat</groupId>
<artifactId>je</artifactId>
<version>18.3.12</version>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.48-jdk-6</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.cedarsoftware</groupId>
<artifactId>json-io</artifactId>
</exclusion>
</exclusions>
</dependency>
2. 具体实现
BdbInitService文件
package com.joe.heightconcurrency.demo.service;
import com.joe.heightconcurrency.demo.config.BdbPropertyConfig;
import com.joe.heightconcurrency.demo.entity.User;
import com.joe.heightconcurrency.demo.util.FstSerialBinding;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.je.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.File;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Opens the Berkeley DB environment once per JVM and hands out cached
 * {@link Database} handles and {@link StoredMap} views keyed by queue name.
 *
 * <p>Failures are reported as {@link IllegalStateException} carrying the original
 * cause instead of being printed and turned into {@code null} results.
 *
 * <p>NOTE(review): the environment directory is hard-coded below while the injected
 * {@code bdbPropertyConfig} is never read in this class; presumably the path was
 * meant to come from that configuration - confirm. Nothing in this class closes the
 * databases or the environment on shutdown.
 */
@Service
@Slf4j
public class BdbInitService {

    /** Shared environment; remains {@code null} when opening it failed in the static initializer. */
    private static Environment env;

    /** Cause of a failed environment open, kept so later callers get a meaningful exception. */
    private static DatabaseException envInitFailure;

    private static DatabaseConfig dbConfig;

    @Autowired
    private BdbPropertyConfig bdbPropertyConfig;

    /** Database handles already opened, keyed by database name. */
    private ConcurrentHashMap<String, Database> queueDbMap = new ConcurrentHashMap<>();

    /** Map views already created, keyed by queue name. */
    private ConcurrentHashMap<String, StoredMap<String, User>> storedMaps = new ConcurrentHashMap<>();

    static {
        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(true);
        envConfig.setCacheSize(10000000);
        try {
            env = new Environment(new File("E://develop//bdb"), envConfig);
        } catch (DatabaseException e) {
            // Keep class loading alive (as before) but remember why the environment is
            // missing so open() can fail with the real cause instead of a bare NPE.
            envInitFailure = e;
        }
        dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        dbConfig.setSortedDuplicates(false);
    }

    /**
     * Returns the database handle for {@code dbName}, opening it on first use.
     *
     * <p>{@code computeIfAbsent} guarantees at most one handle is opened per name, so
     * no surplus handle is created (and left unclosed) when two threads race.
     *
     * @param dbName name of the database to open
     * @return the cached or newly opened handle, never {@code null}
     * @throws IllegalStateException if the environment is unavailable or the database
     *         cannot be opened
     */
    private Database open(String dbName) {
        if (env == null) {
            throw new IllegalStateException("Berkeley DB environment is not available", envInitFailure);
        }
        try {
            return queueDbMap.computeIfAbsent(dbName, name -> env.openDatabase(null, name, dbConfig));
        } catch (DatabaseException e) {
            throw new IllegalStateException("Failed to open Berkeley DB database '" + dbName + "'", e);
        }
    }

    /**
     * Returns the writable {@link StoredMap} view backing the queue named {@code queueName}.
     * The view and its FST bindings are created only the first time a name is requested.
     *
     * @param queueName queue name, also used as the database name
     * @return the cached or newly created map view
     * @throws IllegalStateException if the underlying database cannot be opened
     */
    public StoredMap<String, User> getStoredMap(String queueName) {
        Database database = open(queueName);
        return storedMaps.computeIfAbsent(queueName, name ->
                new StoredMap<String, User>(
                        database,
                        new FstSerialBinding<String>(),
                        new FstSerialBinding<User>(),
                        true));
    }
}
UserQueueService文件
package com.joe.heightconcurrency.demo.service;
import com.joe.heightconcurrency.demo.entity.User;
import com.joe.heightconcurrency.demo.queue.BdbBlockingQueue;
import com.sleepycat.collections.StoredMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Objects;
/**
 * Service facade over the persistent user queue: builds the queue from the
 * "user" stored map at startup and exposes enqueue, size and dequeue operations.
 */
@Service
@Slf4j
public class UserQueueService {

    /** Queue created in {@link #init()}. */
    BdbBlockingQueue<User> bdbBlockingQueue;

    @Autowired
    private BdbInitService bdbInitService;

    /** Creates the queue on top of the stored map registered under the name "user". */
    @PostConstruct
    public void init() {
        bdbBlockingQueue = new BdbBlockingQueue<>(bdbInitService.getStoredMap("user"));
    }

    /**
     * Offers the user to the queue and logs the outcome followed by the current size.
     * A {@code null} user is logged as an error and ignored.
     *
     * @param user the user to enqueue
     */
    public void addUser(User user) {
        if (user == null) {
            log.error("用户信息不存在");
            return;
        }
        if (bdbBlockingQueue.offer(user)) {
            log.info("用户信息放入队列成功");
        } else {
            log.error("用户信息放入队列失败");
        }
        // The size is reported after either outcome.
        log.info("当前队列大小:{}", bdbBlockingQueue.size());
    }

    /**
     * @return the number of elements currently in the queue
     */
    public Integer getSize() {
        return Integer.valueOf(bdbBlockingQueue.size());
    }

    /**
     * @return the result of polling the queue
     */
    public User getUser() {
        User head = bdbBlockingQueue.poll();
        return head;
    }
}
controller文件
/**
 * Passes the given user to the queue service and acknowledges the request.
 *
 * @param user the user handed in by the web framework
 * @return the fixed acknowledgement "SUCCESS"
 */
@GetMapping("/addUser")
public String addUser(User user) {
    final String acknowledgement = "SUCCESS";
    userQueueService.addUser(user);
    return acknowledgement;
}
/**
 * Reports the size returned by the queue service.
 *
 * @return the current queue size
 */
@GetMapping("/getUserSize")
public Integer getUserSize() {
    Integer currentSize = userQueueService.getSize();
    return currentSize;
}
/**
 * Takes the next user from the queue service and returns its string form.
 *
 * <p>The service returns {@code null} when nothing is queued; calling
 * {@code toString()} on that result used to fail with a NullPointerException,
 * so the empty case is answered with a fixed marker instead.
 *
 * @return {@code toString()} of the dequeued user, or "EMPTY" when no user is available
 */
@GetMapping("/getUser")
public String getUser() {
    User user = userQueueService.getUser();
    if (user == null) {
        return "EMPTY";
    }
    return user.toString();
}
FstSerialBinding文件
package com.joe.heightconcurrency.demo.util;
import com.sleepycat.bind.ByteArrayBinding;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.util.RuntimeExceptionWrapper;
import java.io.Serializable;
/**
 * {@link EntryBinding} that converts objects to and from database entries by
 * serializing them with {@code FstObjectSerializeUtil} and storing the bytes
 * through a {@link ByteArrayBinding}.
 *
 * <p>A {@code null} object is written as a zero-length byte array, and a missing or
 * zero-length entry is read back as {@code null}.
 *
 * @param <E> serializable type handled by this binding
 */
public class FstSerialBinding<E extends Serializable> implements EntryBinding<E> {

    /** Payload written for a {@code null} object. */
    private static final byte[] EMPTY_PAYLOAD = new byte[0];

    /** Moves raw byte arrays in and out of a {@link DatabaseEntry}. */
    private ByteArrayBinding rawBinding = new ByteArrayBinding();

    /**
     * Decodes the entry's bytes into an object.
     *
     * @param entry the database entry to decode
     * @return the decoded object, or {@code null} when the entry holds no bytes
     */
    @Override
    @SuppressWarnings("unchecked") // the bytes are expected to have been written by objectToEntry for the same E
    public E entryToObject(DatabaseEntry entry) {
        byte[] payload = rawBinding.entryToObject(entry);
        boolean hasPayload = payload != null && payload.length != 0;
        if (!hasPayload) {
            return null;
        }
        try {
            return (E) FstObjectSerializeUtil.read(payload);
        } catch (Exception failure) {
            throw RuntimeExceptionWrapper.wrapIfNeeded(failure);
        }
    }

    /**
     * Encodes the object into the entry.
     *
     * @param object the object to encode; {@code null} produces a zero-length entry
     * @param entry  the database entry that receives the bytes
     */
    @Override
    public void objectToEntry(E object, DatabaseEntry entry) {
        if (object == null) {
            rawBinding.objectToEntry(EMPTY_PAYLOAD, entry);
            return;
        }
        try {
            byte[] payload = FstObjectSerializeUtil.write(object);
            rawBinding.objectToEntry(payload, entry);
        } catch (Exception failure) {
            throw RuntimeExceptionWrapper.wrapIfNeeded(failure);
        }
    }
}
FstObjectSerializeUtil文件
package com.joe.heightconcurrency.demo.util;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;
import org.nustaq.serialization.FSTObjectOutput;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
/**
 * Static helpers that serialize and deserialize objects with FST, using one
 * {@link FSTConfiguration} per thread.
 */
public abstract class FstObjectSerializeUtil {

    /** Per-thread FST configuration, created lazily on first use by each thread. */
    private static final ThreadLocal<FSTConfiguration> conf =
            ThreadLocal.withInitial(FSTConfiguration::createDefaultConfiguration);

    /**
     * Serializes {@code obj} into a byte array.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws Exception if FST fails to write the object
     */
    public static byte[] write(Serializable obj) throws Exception {
        // Closing a ByteArrayOutputStream has no effect, so no cleanup is required here.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        FSTObjectOutput output = conf.get().getObjectOutput(buffer);
        output.writeObject(obj);
        output.flush();
        return buffer.toByteArray();
    }

    /**
     * Deserializes an object from {@code bytes}.
     *
     * <p>Decoding failures are propagated to the caller. They were previously caught
     * and turned into {@code null}, which made corrupt data indistinguishable from a
     * stored {@code null} even though this method declares {@code throws Exception}.
     *
     * @param bytes the serialized form
     * @return the decoded object, or {@code null} if it is not {@link Serializable}
     * @throws Exception if FST fails to read the object
     */
    public static Serializable read(byte[] bytes) throws Exception {
        FSTObjectInput input = conf.get().getObjectInput(bytes);
        Object decoded = input.readObject();
        if (decoded instanceof Serializable) {
            return (Serializable) decoded;
        }
        return null;
    }
}
BdbBlockingQueue文件
package com.joe.heightconcurrency.demo.queue;
import com.joe.heightconcurrency.demo.entity.User;
import com.sleepycat.collections.StoredMap;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * {@link LinkedBlockingQueue} that mirrors its elements into a {@link StoredMap},
 * keyed by {@code getId().toString()}, so queued users can be reloaded later.
 *
 * <p>Single-element insertion ({@code offer}, timed {@code offer}, {@code put}) and
 * removal ({@code poll}, timed {@code poll}, {@code take}) keep the map in sync.
 * Other mutators such as {@code clear}, {@code drainTo}, {@code remove(Object)} and
 * iterator removal are inherited unchanged and do not touch the map.
 *
 * <p>NOTE(review): elements sharing the same id share one map entry, so removing one
 * of them also removes the persisted copy of the other. Elements restored by the
 * constructor arrive in the iteration order of {@code storedMap.values()}, which is
 * not necessarily the order in which they were offered - confirm this is acceptable.
 *
 * @param <T> element type
 */
public class BdbBlockingQueue<T extends User> extends LinkedBlockingQueue<T> {

    /** Backing store; {@code null} when created through the no-arg constructor. */
    private StoredMap<String, T> storedMap;

    /** Creates a purely in-memory queue with no backing store. */
    public BdbBlockingQueue() {}

    /**
     * Creates a queue pre-filled with every value currently held in {@code storedMap}
     * and backed by that map from then on.
     *
     * @param storedMap the backing store
     */
    public BdbBlockingQueue(StoredMap<String, T> storedMap) {
        super(storedMap.values());
        this.storedMap = storedMap;
    }

    /**
     * Persists and enqueues {@code t}.
     *
     * @param t the element to add; {@code null} is rejected with {@code false}
     * @return {@code true} if the element was enqueued
     */
    @Override
    public boolean offer(T t) {
        if (t == null) {
            return false;
        }
        persist(t);
        boolean accepted = super.offer(t);
        if (!accepted) {
            // Do not leave an entry in the store for an element the queue refused.
            unpersist(t);
        }
        return accepted;
    }

    /** Timed variant of {@link #offer(User)}; the stored entry is removed again if the element is not enqueued. */
    @Override
    public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
        persist(t);
        boolean accepted = false;
        try {
            accepted = super.offer(t, timeout, unit);
        } finally {
            if (!accepted) {
                unpersist(t);
            }
        }
        return accepted;
    }

    /** Persists {@code t}, then enqueues it; the stored entry is removed again if enqueueing fails. */
    @Override
    public void put(T t) throws InterruptedException {
        persist(t);
        boolean enqueued = false;
        try {
            super.put(t);
            enqueued = true;
        } finally {
            if (!enqueued) {
                unpersist(t);
            }
        }
    }

    /**
     * Removes the head of the queue and its stored entry.
     *
     * @return the head element, or {@code null} if the queue is empty
     */
    @Override
    public T poll() {
        T head = super.poll();
        unpersist(head);
        return head;
    }

    /** Timed variant of {@link #poll()}. */
    @Override
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        T head = super.poll(timeout, unit);
        unpersist(head);
        return head;
    }

    /** Takes the head of the queue and removes its stored entry. */
    @Override
    public T take() throws InterruptedException {
        T head = super.take();
        unpersist(head);
        return head;
    }

    /** Writes {@code t} to the backing store, if there is one and {@code t} is not {@code null}. */
    private void persist(T t) {
        if (storedMap != null && t != null) {
            storedMap.put(keyOf(t), t);
        }
    }

    /** Deletes {@code t}'s entry from the backing store, if there is one and {@code t} is not {@code null}. */
    private void unpersist(T t) {
        if (storedMap != null && t != null) {
            storedMap.remove(keyOf(t));
        }
    }

    /** Store key of a user: the string form of its id. */
    private static String keyOf(User user) {
        return user.getId().toString();
    }
}
参考:https://blog.csdn.net/qq_38617531/article/details/83794893/