7,使用Java Reactive反应式方式操作Document
2021-07-20 本文已影响0人
lcjyzm
1,创建Maven项目,并引入以下依赖:
<!--测试包-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>compile</scope>
</dependency>
<!--mongodb reactive驱动包-->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.9</version>
</dependency>
2,启动MongoDB服务实例
mongod.exe --dbpath D:\UserData\mongodb --auth
3,获取MongoClient对象
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import java.util.Arrays;
/**
 * Factory methods for obtaining a reactive-streams {@link MongoClient}.
 *
 * <p>Both factories target the same server ({@code 127.0.0.1:27017}) and authenticate as user
 * {@code admin} with the SCRAM-SHA-256 mechanism; they differ only in how the settings are
 * expressed (builder API versus connection string).
 *
 * <p>NOTE(review): the credentials are hard-coded in this class. That is tolerable only for a
 * local demo instance; load them from configuration or the environment before any real use.
 *
 * @author Administrator
 */
public final class MongoUtil {

    /** Host of the MongoDB instance. */
    private static final String HOST = "127.0.0.1";

    /** Port of the MongoDB instance. */
    private static final int PORT = 27017;

    /** User name used for authentication. */
    private static final String USER_NAME = "admin";

    /** Database the credential is defined in (the {@code authSource}). */
    private static final String AUTH_SOURCE = "admin";

    /** Password used for authentication. */
    private static final String PASSWORD = "123456";

    /** Static utility class; not meant to be instantiated. */
    private MongoUtil() {
    }

    /**
     * Creates a client by specifying host, port and credential through the settings builder.
     *
     * @return a new {@link MongoClient}; the caller is responsible for closing it
     */
    public static MongoClient getInstance1() {
        // Authentication: SCRAM-SHA-256 credential for USER_NAME defined in AUTH_SOURCE.
        MongoCredential credential =
                MongoCredential.createScramSha256Credential(USER_NAME, AUTH_SOURCE, PASSWORD.toCharArray());
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyToClusterSettings(builder ->
                        builder.hosts(Arrays.asList(new ServerAddress(HOST, PORT))))
                .credential(credential)
                .build();
        return MongoClients.create(settings);
    }

    /**
     * Creates a client from a connection string carrying the same host, port and credential.
     *
     * @return a new {@link MongoClient}; the caller is responsible for closing it
     */
    public static MongoClient getInstance2() {
        // Compile-time constant; evaluates to
        // mongodb://admin:123456@127.0.0.1:27017/?authSource=admin&authMechanism=SCRAM-SHA-256
        return MongoClients.create("mongodb://" + USER_NAME + ":" + PASSWORD + "@" + HOST + ":" + PORT
                + "/?authSource=" + AUTH_SOURCE + "&authMechanism=SCRAM-SHA-256");
    }
}
4,SubscriberHelpers工具类
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoTimeoutException;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static java.lang.String.format;
/**
 * Subscriber helper implementations for the Quick Tour.
 *
 * <p>Each helper stores what a publisher emits and lets the calling thread block until the
 * publisher has terminated, so the asynchronous API can be driven from sequential code.
 */
public final class SubscriberHelpers {

    /**
     * A Subscriber that stores the publisher's results and provides a latch so callers can block
     * until the publisher terminates.
     *
     * <p>This base class does not request any items in {@link #onSubscribe(Subscription)}; demand
     * is signalled by {@link #await(long, TimeUnit)}.
     *
     * @param <T> The publisher's result type
     */
    public abstract static class ObservableSubscriber<T> implements Subscriber<T> {
        private final List<T> received;
        private final List<RuntimeException> errors;
        private final CountDownLatch latch;
        private volatile Subscription subscription;
        private volatile boolean completed;

        /**
         * Construct an instance
         */
        public ObservableSubscriber() {
            this.received = new ArrayList<>();
            this.errors = new ArrayList<>();
            this.latch = new CountDownLatch(1);
        }

        /**
         * Remembers the subscription so that demand can be requested later.
         *
         * @param s the subscription handed over by the publisher
         */
        @Override
        public void onSubscribe(final Subscription s) {
            subscription = s;
        }

        /**
         * Stores the emitted element.
         *
         * @param t the element emitted by the publisher
         */
        @Override
        public void onNext(final T t) {
            received.add(t);
        }

        /**
         * Records the failure and releases any thread blocked in {@code await}.
         *
         * <p>Non-runtime throwables are wrapped in a {@link RuntimeException} so that
         * {@code await} can rethrow them. The overridable {@link #onComplete()} is deliberately
         * NOT invoked here: a failure is not a completion, and subclasses that react to
         * completion (for example by printing a summary) must not fire on error.
         *
         * @param t the failure signalled by the publisher
         */
        @Override
        public void onError(final Throwable t) {
            if (t instanceof RuntimeException) {
                errors.add((RuntimeException) t);
            } else {
                errors.add(new RuntimeException("Unexpected exception", t));
            }
            markTerminated();
        }

        /**
         * Marks the subscriber as finished and releases any thread blocked in {@code await}.
         */
        @Override
        public void onComplete() {
            markTerminated();
        }

        /** Flags the terminal state and opens the latch; shared by both terminal signals. */
        private void markTerminated() {
            completed = true;
            latch.countDown();
        }

        /**
         * Gets the subscription
         *
         * @return the subscription, or null if {@code onSubscribe} has not been called yet
         */
        public Subscription getSubscription() {
            return subscription;
        }

        /**
         * Reports whether a terminal signal has been received.
         *
         * @return true once {@code onComplete} or {@code onError} has been signalled
         */
        public boolean isCompleted() {
            return completed;
        }

        /**
         * Get received elements
         *
         * @return the list of received elements (the live internal list, not a copy)
         */
        public List<T> getReceived() {
            return received;
        }

        /**
         * Get error from subscription
         *
         * @return the first recorded error, which may be null
         */
        public RuntimeException getError() {
            return errors.isEmpty() ? null : errors.get(0);
        }

        /**
         * Waits for termination with the default timeout and returns the received elements.
         *
         * @return the list of received elements
         */
        public List<T> get() {
            return await().getReceived();
        }

        /**
         * Waits for termination and returns the received elements.
         *
         * @param timeout how long to wait
         * @param unit the time unit
         * @return the list of received elements
         */
        public List<T> get(final long timeout, final TimeUnit unit) {
            return await(timeout, unit).getReceived();
        }

        /**
         * Await completion or error, waiting at most 60 seconds.
         *
         * @return this
         */
        public ObservableSubscriber<T> await() {
            return await(60, TimeUnit.SECONDS);
        }

        /**
         * Await completion or error
         *
         * <p>Requests {@code Integer.MAX_VALUE} items from the subscription and then blocks on
         * the latch until a terminal signal arrives or the timeout elapses.
         *
         * @param timeout how long to wait
         * @param unit the time unit
         * @return this
         * @throws IllegalStateException if no subscription has been received yet, i.e. this
         *         subscriber has not been passed to a publisher's {@code subscribe}
         * @throws MongoTimeoutException if no terminal signal arrives within the timeout
         * @throws MongoInterruptedException if the waiting thread is interrupted; the thread's
         *         interrupt status is restored before throwing
         * @throws RuntimeException the first error recorded by {@code onError}, if any
         */
        public ObservableSubscriber<T> await(final long timeout, final TimeUnit unit) {
            // Read the volatile field once so the null check and the call see the same value.
            final Subscription current = subscription;
            if (current == null) {
                throw new IllegalStateException(
                        "await called before this subscriber was subscribed to a publisher");
            }
            current.request(Integer.MAX_VALUE);
            try {
                if (!latch.await(timeout, unit)) {
                    throw new MongoTimeoutException("Publisher onComplete timed out");
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new MongoInterruptedException("Interrupted waiting for observeration", e);
            }
            if (!errors.isEmpty()) {
                throw errors.get(0);
            }
            return this;
        }
    }

    /**
     * A Subscriber that immediately requests Integer.MAX_VALUE onSubscribe.
     * Intended for insert, update and delete operations.
     *
     * @param <T> The publisher's result type
     */
    public static class OperationSubscriber<T> extends ObservableSubscriber<T> {
        @Override
        public void onSubscribe(final Subscription s) {
            super.onSubscribe(s);
            s.request(Integer.MAX_VALUE);
        }
    }

    /**
     * A Subscriber that prints a message including the received items on completion.
     * Used, for example, to print a total record count.
     *
     * @param <T> The publisher's result type
     */
    public static class PrintSubscriber<T> extends OperationSubscriber<T> {
        private final String message;

        /**
         * A Subscriber that outputs a message onComplete.
         *
         * @param message the format string to output onComplete; the list of received items is
         *                passed as its single format argument
         */
        public PrintSubscriber(final String message) {
            this.message = message;
        }

        @Override
        public void onComplete() {
            System.out.println(format(message, getReceived()));
            super.onComplete();
        }
    }

    /**
     * A Subscriber that prints the json version of each document.
     * Intended for collection queries.
     */
    public static class PrintDocumentSubscriber extends ConsumerSubscriber<Document> {
        /**
         * Construct a new instance
         */
        public PrintDocumentSubscriber() {
            super(t -> System.out.println(t.toJson()));
        }
    }

    /**
     * A Subscriber that prints the toString version of each element
     *
     * @param <T> the type of the element
     */
    public static class PrintToStringSubscriber<T> extends ConsumerSubscriber<T> {
        /**
         * Construct a new instance
         */
        public PrintToStringSubscriber() {
            super(System.out::println);
        }
    }

    /**
     * A Subscriber that processes a consumer for each element
     *
     * @param <T> the type of the element
     */
    public static class ConsumerSubscriber<T> extends OperationSubscriber<T> {
        private final Consumer<? super T> consumer;

        /**
         * Construct a new instance
         *
         * @param consumer the consumer invoked for every element, after the element is stored
         */
        public ConsumerSubscriber(final Consumer<? super T> consumer) {
            this.consumer = consumer;
        }

        @Override
        public void onNext(final T document) {
            super.onNext(document);
            consumer.accept(document);
        }
    }

    private SubscriberHelpers() {
    }
}
5,使用Document对象进行CRUD操作
import cn.hutool.core.lang.Console;
import com.lcj.mongodb.reactive.MongoUtil;
import com.lcj.mongodb.reactive.SubscriberHelpers;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * CRUD examples against the {@code book} collection of the {@code test} database, written with
 * the reactive-streams driver and {@link Document} objects.
 *
 * <p>Every test subscribes one of the {@link SubscriberHelpers} subscribers to a publisher and
 * then blocks until the publisher has terminated. The tests need a MongoDB instance reachable
 * through {@link MongoUtil#getInstance1()}.
 *
 * <p>NOTE(review): the query, update and delete tests presumably expect the data written by
 * {@link #insertMany()}; JUnit does not guarantee that ordering.
 *
 * @author Administrator
 */
public class UseDocumentCRUDTest {
    // Shared client connection
    private static MongoClient client;
    // Shared collection handle
    private static MongoCollection<Document> book;

    /**
     * Creates the client and the collection handle once, before any test runs.
     */
    @BeforeClass
    public static void setUp() {
        client = MongoUtil.getInstance1();
        // Access the given database; it is created automatically if it does not exist
        MongoDatabase mongoDatabase = client.getDatabase("test");
        // Access the given collection; it is created when data is first stored in it
        book = mongoDatabase.getCollection("book");
    }

    /**
     * Releases the client after all tests have run. The collection itself is left in place.
     */
    @AfterClass
    public static void close() {
        // setUp may have failed before the client was assigned
        if (client != null) {
            client.close();
        }
    }

    /**
     * Inserts a single document.
     */
    @Test
    public void insertOne() {
        // Build the document to insert
        Document document = new Document("name", "MongoDB")
                .append("type", "type")
                .append("count", 1)
                .append("versions", Arrays.asList("v3.2", "v3.0", "v2.6"))
                .append("info", new Document("x", 203).append("y", 102));
        // Insert the document; an _id field is added automatically when none is given
        SubscriberHelpers.ObservableSubscriber<InsertOneResult> subscriber =
                new SubscriberHelpers.OperationSubscriber<>();
        book.insertOne(document).subscribe(subscriber);
        subscriber.await(); // block until the insert has finished
    }

    /**
     * Inserts several documents at once: {@code {i: 1}} to {@code {i: 100}}.
     */
    @Test
    public void insertMany() {
        List<Document> documents = new ArrayList<>(100);
        for (int i = 1; i <= 100; i++) {
            documents.add(new Document("i", i));
        }
        // Insert the documents; an _id field is added automatically when none is given
        SubscriberHelpers.ObservableSubscriber<InsertManyResult> subscriber =
                new SubscriberHelpers.OperationSubscriber<>();
        book.insertMany(documents).subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Counts the documents in the collection.
     */
    @Test
    public void countNumbers() {
        SubscriberHelpers.PrintSubscriber<Long> subscriber =
                new SubscriberHelpers.PrintSubscriber<>("book集合文档总数: %s");
        book.countDocuments().subscribe(subscriber);
        // get() blocks until the count has arrived, so no separate await() is needed
        Console.log("文档总数:{}", subscriber.get().get(0));
    }

    /**
     * Queries the first document of the collection.
     */
    @Test
    public void findFirst() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find().first().subscribe(subscriber);
        // Fail with a descriptive message rather than a bare NoSuchElementException
        Document document = subscriber.get().stream()
                .findFirst()
                .orElseThrow(() -> new AssertionError("Expected at least one document in the book collection"));
        Console.log("查询第一条文档:{}", document.toJson());
    }

    /**
     * Queries all documents of the collection; the subscriber prints each one as JSON.
     */
    @Test
    public void findAll() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find().subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Queries the first document with i = 71; the subscriber prints it as JSON.
     */
    @Test
    public void findFirstByFilter() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.eq("i", 71)).first().subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Queries all documents with i &gt; 71; the subscriber prints each one as JSON.
     */
    @Test
    public void findAllByFilterGt() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.gt("i", 71)).subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Queries all documents whose i lies in the range (71, 91]; the subscriber prints each one
     * as JSON.
     */
    @Test
    public void findAllByFilterAnd() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.and(Filters.gt("i", 71), Filters.lte("i", 91))).subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Queries the documents that have a field i, sorted by i in descending order; the subscriber
     * prints each one as JSON.
     */
    @Test
    public void sort() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.exists("i")).sort(Sorts.descending("i")).subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Projection: queries all documents with the _id field excluded; the subscriber prints each
     * one as JSON.
     */
    @Test
    public void projection() {
        SubscriberHelpers.PrintDocumentSubscriber subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find().projection(Projections.excludeId()).subscribe(subscriber);
        subscriber.await();
    }

    /**
     * Updates one document with i = 50, setting i to 500.
     */
    @Test
    public void updateOne() {
        SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber =
                new SubscriberHelpers.OperationSubscriber<>();
        book.updateOne(Filters.eq("i", 50), Updates.set("i", 500)).subscribe(subscriber);
        // get() blocks until the result has arrived
        UpdateResult updateResult = subscriber.get().get(0);
        Console.log("更新数量:{}", updateResult.getModifiedCount());
    }

    /**
     * Updates every document with i &lt; 50, incrementing i by 20.
     */
    @Test
    public void updateMany() {
        SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber =
                new SubscriberHelpers.OperationSubscriber<>();
        book.updateMany(Filters.lt("i", 50), Updates.inc("i", 20)).subscribe(subscriber);
        UpdateResult updateResult = subscriber.get().get(0);
        Console.log("更新数量:{}", updateResult.getModifiedCount());
    }

    /**
     * Deletes one document with i = 500.
     */
    @Test
    public void deleteOne() {
        SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber =
                new SubscriberHelpers.OperationSubscriber<>();
        book.deleteOne(Filters.eq("i", 500)).subscribe(subscriber);
        Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
    }

    /**
     * Deletes every document with i &lt; 50.
     */
    @Test
    public void deleteMany() {
        SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber =
                new SubscriberHelpers.OperationSubscriber<>();
        book.deleteMany(Filters.lt("i", 50)).subscribe(subscriber);
        Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
    }
}