7,使用Java Reactive反应式方式操作Document

2021-07-20  本文已影响0人  lcjyzm

1,创建maven项目,并引入以下依赖:

<!--测试包-->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>compile</scope>
</dependency>

<!--mongodb reactive驱动包-->
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
    <version>4.3.0</version>
</dependency>

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.3.9</version>
</dependency>

2,启动MongoDB服务实例

mongod.exe --dbpath D:\UserData\mongodb --auth

3,获取MongoClient对象

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;

import java.util.Arrays;

/**
 * @Package: com.lcj.mongodb.sync
 * @ClassName: MongoUtil
 * @Author: Administrator
 * @CreateTime: 2021/7/15 15:10
 * @Description:
 */
public class MongoUtil {

    /**
     * 通过指定host和port获得连接
     * @return MongoClient
     */
    public static MongoClient getInstance1(){
        // 添加认证
        MongoCredential credential = MongoCredential.createScramSha256Credential("admin", "admin", "123456".toCharArray());
        return MongoClients.create(
                MongoClientSettings.builder()
                        .applyToClusterSettings(builder ->
                                builder.hosts(Arrays.asList(new ServerAddress("127.0.0.1", 27017))))
                        .credential(credential)
                        .build());
    }

    /**
     * 通过连接字符串
     * @return MongoClient
     */
    public static MongoClient getInstance2(){
        return MongoClients.create("mongodb://admin:123456@127.0.0.1:27017/?authSource=admin&authMechanism=SCRAM-SHA-256");
    }


}


4,SubscriberHelpers工具类

import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoTimeoutException;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static java.lang.String.format;

/**
 *  Subscriber helper implementations for the Quick Tour.
 */
public final class SubscriberHelpers {

    /**
     * A Subscriber that stores the publishers results and provides a latch so can block on completion.
     *
     * @param <T> The publishers result type
     */
    public abstract static class ObservableSubscriber<T> implements Subscriber<T> {
        private final List<T> received;
        private final List<RuntimeException> errors;
        private final CountDownLatch latch;
        private volatile Subscription subscription;
        private volatile boolean completed;

        /**
         * Construct an instance
         */
        public ObservableSubscriber() {
            this.received = new ArrayList<>();
            this.errors = new ArrayList<>();
            this.latch = new CountDownLatch(1);
        }

        @Override
        public void onSubscribe(final Subscription s) {
            subscription = s;
        }

        @Override
        public void onNext(final T t) {
            received.add(t);
        }

        @Override
        public void onError(final Throwable t) {
            if (t instanceof RuntimeException) {
                errors.add((RuntimeException) t);
            } else {
                errors.add(new RuntimeException("Unexpected exception", t));
            }
            onComplete();
        }

        @Override
        public void onComplete() {
            completed = true;
            latch.countDown();
        }

        /**
         * Gets the subscription
         *
         * @return the subscription
         */
        public Subscription getSubscription() {
            return subscription;
        }

        /**
         * Get received elements
         *
         * @return the list of received elements
         */
        public List<T> getReceived() {
            return received;
        }

        /**
         * Get error from subscription
         *
         * @return the error, which may be null
         */
        public RuntimeException getError() {
            if (errors.size() > 0) {
                return errors.get(0);
            }
            return null;
        }

        /**
         * Get received elements.
         *
         * @return the list of receive elements
         */
        public List<T> get() {
            return await().getReceived();
        }

        /**
         * Get received elements.
         *
         * @param timeout how long to wait
         * @param unit the time unit
         * @return the list of receive elements
         */
        public List<T> get(final long timeout, final TimeUnit unit) {
            return await(timeout, unit).getReceived();
        }

        /**
         * Await completion or error
         *
         * @return this
         */
        public ObservableSubscriber<T> await() {
            return await(60, TimeUnit.SECONDS);
        }

        /**
         * Await completion or error
         *
         * @param timeout how long to wait
         * @param unit the time unit
         * @return this
         */
        public ObservableSubscriber<T> await(final long timeout, final TimeUnit unit) {
            subscription.request(Integer.MAX_VALUE);
            try {
                if (!latch.await(timeout, unit)) {
                    throw new MongoTimeoutException("Publisher onComplete timed out");
                }
            } catch (InterruptedException e) {
                throw new MongoInterruptedException("Interrupted waiting for observeration", e);
            }
            if (!errors.isEmpty()) {
                throw errors.get(0);
            }
            return this;
        }
    }

    /**
     * A Subscriber that immediately requests Integer.MAX_VALUE onSubscribe
     * 添加,更新,删除订阅
     * @param <T> The publishers result type
     */
    public static class OperationSubscriber<T> extends ObservableSubscriber<T> {

        @Override
        public void onSubscribe(final Subscription s) {
            super.onSubscribe(s);
            s.request(Integer.MAX_VALUE);
        }
    }

    /**
     * A Subscriber that prints a message including the received items on completion
     * 打印总记录数
     * @param <T> The publishers result type
     */
    public static class PrintSubscriber<T> extends OperationSubscriber<T> {
        private final String message;

        /**
         * A Subscriber that outputs a message onComplete.
         *
         * @param message the message to output onComplete
         */
        public PrintSubscriber(final String message) {
            this.message = message;
        }

        @Override
        public void onComplete() {
            System.out.println(format(message, getReceived()));
            super.onComplete();
        }
    }

    /**
     * A Subscriber that prints the json version of each document
     * 查询集合订阅
     */
    public static class PrintDocumentSubscriber extends ConsumerSubscriber<Document> {
        /**
         * Construct a new instance
         */
        public PrintDocumentSubscriber() {
            super(t -> System.out.println(t.toJson()));
        }
    }

    /**
     * A Subscriber that prints the toString version of each element
     * @param <T> the type of the element
     */
    public static class PrintToStringSubscriber<T> extends ConsumerSubscriber<T> {
        /**
         * Construct a new instance
         */
        public PrintToStringSubscriber() {
            super(System.out::println);
        }
    }

    /**
     * A Subscriber that processes a consumer for each element
     * @param <T> the type of the element
     */
    public static class ConsumerSubscriber<T> extends OperationSubscriber<T> {
        private final Consumer<T> consumer;

        /**
         * Construct a new instance
         * @param consumer the consumer
         */
        public ConsumerSubscriber(final Consumer<T> consumer) {
            this.consumer = consumer;
        }


        @Override
        public void onNext(final T document) {
            super.onNext(document);
            consumer.accept(document);
        }
    }

    private SubscriberHelpers() {
    }
}

5,使用Document对象进行CRUD操作

import cn.hutool.core.lang.Console;
import com.lcj.mongodb.reactive.MongoUtil;
import com.lcj.mongodb.reactive.SubscriberHelpers;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import org.bson.Document;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @Package: com.lcj.mongodb.sync
 * @ClassName: UseDocumentCRUDTest
 * @Author: Administrator
 * @CreateTime: 2021/7/16 8:26
 * @Description:
 */
public class UseDocumentCRUDTest {
    // 声明连接对象
    private static MongoClient client;
    // 声明集合变量
    private static MongoCollection<Document> book;

    // 所有方法执行前,创建集合对象
    @BeforeClass
    public static void setUp() {
        client = MongoUtil.getInstance1();
        // 访问指定的数据库,如果数据库不存在,自动创建
        MongoDatabase mongoDatabase = client.getDatabase("test");
        // 访问指定的集合,如果集合不存在,在第一次存数据是创建
        book = mongoDatabase.getCollection("book");
    }

    // 所有方法执行完后,释放资源
    @AfterClass
    public static void close() {
        //book.drop();  // 删除集合
        client.close();
    }

    /**
     * 插入单个文档
     */
    @Test
    public void insertOne() throws InterruptedException {
        // 创建如下文档
        Document document = new Document("name", "MongoDB")
                .append("type", "type")
                .append("count", 1)
                .append("versions", Arrays.asList("v3.2", "v3.0", "v2.6"))
                .append("info", new Document("x", 203).append("y", 102));
        // 插入单个文档,如果未指定_id字段,会自动添加
        SubscriberHelpers.ObservableSubscriber<InsertOneResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
        book.insertOne(document).subscribe(subscriber);
        subscriber.await(); // 等待
    }


    /**
     * 插入多个文档
     */
    @Test
    public void insertMany() {
        List<Document> documents = new ArrayList<>(100);
        for (int i = 1; i <= 100; i++) {
            documents.add(new Document("i", i));
        }
        // 插入文档,如果未指定_id字段,会自动添加
        SubscriberHelpers.ObservableSubscriber<InsertManyResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
        book.insertMany(documents).subscribe(subscriber);
        subscriber.await();
    }

    /**
     * @desc 统计集合中文档数量
     */
    @Test
    public void countNumbers(){
        SubscriberHelpers.PrintSubscriber<Long> subscriber = new SubscriberHelpers.PrintSubscriber<>("book集合文档总数: %s");
        book.countDocuments().subscribe(subscriber);
        subscriber.await();
        Console.log("文档总数:{}",subscriber.get().get(0));
    }


    /**
     * 查询集合的第一个文档
     */
    @Test
    public void findFirst() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find().first().subscribe(subscriber);
        subscriber.await();
        Document document = subscriber.get().stream().findFirst().get();
        Console.log("查询第一条文档:{}",document.toJson());
    }

    /**
     * 查询集合的所有文档
     */
    @Test
    public void findAll() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find().subscribe(subscriber);
        subscriber.await();
        subscriber.get().forEach(Document::toJson);
    }


    /**
     * 查询i=71的第一条文档
     */
    @Test
    public void findFirstByFilter() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.eq("i",71)).first().subscribe(subscriber);
        subscriber.await();
        subscriber.get().forEach(Document::toJson);
    }


    /**
     * 查询i>71的所有的文档
     */
    @Test
    public void findAllByFilterGt() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.gt("i",71)).subscribe(subscriber);
        subscriber.await();
        subscriber.get().forEach(Document::toJson);
    }


    /**
     * 查询满足i值在(71,91]之间的文档
     */
    @Test
    public void findAllByFilterAnd() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.and(Filters.gt("i",71),Filters.lte("i",91))).subscribe(subscriber);
        subscriber.await();
        subscriber.get().forEach(Document::toJson);
    }

    /**
     * 查询存在i字段的文档,并按i的值降序排序
     */
    @Test
    public void sort() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find(Filters.exists("i")).sort(Sorts.descending("i")).subscribe(subscriber);
        subscriber.await();
        subscriber.get().forEach(Document::toJson);
    }

    /**
     * 投影--排除文档的id,很好用强大
     */
    @Test
    public void projection() {
        SubscriberHelpers.PrintDocumentSubscriber  subscriber = new SubscriberHelpers.PrintDocumentSubscriber();
        book.find().projection(Projections.excludeId()).subscribe(subscriber);
        subscriber.await();
        subscriber.get().forEach(Document::toJson);
    }

    // 更新i=50的文档,将i修改为500
    @Test
    public void updateOne() {
        SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
        book.updateOne(Filters.eq("i", 50), Updates.set("i", 500)).subscribe(subscriber);
        subscriber.await();
        UpdateResult updateResult = subscriber.get().get(0);
        Console.log("更新数量:{}",updateResult.getModifiedCount());
    }

    /**
     * 更新字段i小于50的,将匹配的值加上20
     */
    @Test
    public void updateMany() {
        SubscriberHelpers.OperationSubscriber<UpdateResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
        book.updateMany(Filters.lt("i", 50), Updates.inc("i", 20)).subscribe(subscriber);
        subscriber.await();
        UpdateResult updateResult = subscriber.get().get(0);
        Console.log("更新数量:{}",updateResult.getModifiedCount());
    }


    /**
     * 删除字段i=500的文档
     */
    @Test
    public void deleteOne() {
        SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
        book.deleteOne(Filters.eq("i", 500)).subscribe(subscriber);
        subscriber.await();
        Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
    }


    /**
     * 删除字段i<50的所有的文档
     */
    @Test
    public void deleteMany() {
        SubscriberHelpers.OperationSubscriber<DeleteResult> subscriber = new SubscriberHelpers.OperationSubscriber<>();
        book.deleteMany(Filters.lt("i", 50)).subscribe(subscriber);
        subscriber.await();
        Console.log("删除数量: {}", subscriber.get().get(0).getDeletedCount());
    }

}

5,单测结果

image-20210716170142357.png
上一篇下一篇

猜你喜欢

热点阅读