
Continuous Queries with Apache Ignite

2018-03-27  SofiyaJ

Background

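A continuous query (CQL, continuous query language) works much like an in-memory database combined with a view and a trigger.
Put simply, each time an object that matches the condition enters the query result set, a callback function runs once.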

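The implementation in this article follows a client/server model: the client first queries the server according to some rule and receives a result set; after that, as the server keeps adding matching data, the client continues to receive the results in real time.
A continuous query listens for changes to the data in a cache. Once it has started, it is notified of any data changes that match the query condition.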
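The "store + view + trigger" idea can be sketched in plain Java without Ignite. This is only a toy model under stated assumptions: the class `ContinuousQuerySketch` and its methods `query` and `put` are hypothetical names invented for illustration, and a single filter is used both for the initial result set and for later notifications, which is a simplification.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

/** Toy model of a continuous query: a store, a filter (the "view") and a callback (the "trigger"). */
public class ContinuousQuerySketch {

    /** One registered query: a filter plus the callback to run for matching updates. */
    private static final class Subscription {
        final BiPredicate<Integer, String> filter;
        final BiConsumer<Integer, String> listener;

        Subscription(BiPredicate<Integer, String> filter, BiConsumer<Integer, String> listener) {
            this.filter = filter;
            this.listener = listener;
        }
    }

    private final Map<Integer, String> store = new ConcurrentHashMap<>();
    private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();

    /** Registers the query and returns the entries that already match (the initial result set). */
    public Map<Integer, String> query(BiPredicate<Integer, String> filter,
                                      BiConsumer<Integer, String> listener) {
        subscriptions.add(new Subscription(filter, listener));
        Map<Integer, String> existing = new TreeMap<>();
        store.forEach((k, v) -> {
            if (filter.test(k, v)) {
                existing.put(k, v);
            }
        });
        return existing;
    }

    /** Stores the entry, then notifies every registered query whose filter matches it. */
    public void put(Integer key, String value) {
        store.put(key, value);
        for (Subscription s : subscriptions) {
            if (s.filter.test(key, value)) {
                s.listener.accept(key, value);
            }
        }
    }

    public static void main(String[] args) {
        ContinuousQuerySketch cache = new ContinuousQuerySketch();
        cache.put(1, "already here");

        // Initial result set: entries that matched before the query was registered.
        Map<Integer, String> existing = cache.query(
                (k, v) -> k > 0,
                (k, v) -> System.out.println("Updated entry [key=" + k + ", val=" + v + ']'));
        System.out.println("Queried existing entries: " + existing);

        cache.put(26, "hahahah"); // matches the filter, so the listener fires
        cache.put(-5, "ignored"); // does not match, so the listener stays silent
    }
}
```

The sketch is single-process and makes no attempt to handle a `put` that races with `query`; it only shows the shape of the interaction that the Ignite code later in this article relies on.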

Main Maven dependencies:


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-core</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-spring</artifactId>
            <version>${ignite.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.ignite</groupId>
            <artifactId>ignite-indexing</artifactId>
            <version>${ignite.version}</version>
        </dependency>
    <properties>
        <ignite.version>2.4.0</ignite.version>
    </properties>

TIPS: this project uses Ignite 2.4.0. Ignite iterates quickly, and the differences between versions can be significant.

Main implementation

Server side:

Source code on GitHub

package xx.xx.searchengine;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: wangjie
 * @Description:
 * @Date: Created in 10:13 2018/3/27
 */

@SpringBootApplication
@RestController
public class ServerApplication {

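    // Name of the cache shared with the client; both sides must use the same name.
    private static final String CACHE_NAME = "serverCache";

    // Start the Ignite server node once, when the class is loaded.
    private static Ignite ignite = Ignition.start("example-cache.xml");

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(ServerApplication.class,args);

    }

    // GET /testIgnite?key=..&value=.. puts one entry into the cache.
    @RequestMapping(value = "/testIgnite",method = RequestMethod.GET)
    public String testIgnite(Integer key,String value) throws InterruptedException{
        // Activate the cluster; needed when native persistence is enabled,
        // because such clusters start inactive.
        ignite.active(true);
        System.out.println("*******insert data begins*********");

        try(IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)){
            cache.put(key,value);
            // Brief pause before the cache handle is closed.
            Thread.sleep(2000);
        }
        return "*******insert data succeed*********";
    }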
}

Key settings in example-cache.xml (added to the initial configuration file):

 <property name="clientMode" value="false"/>
 <property name="peerClassLoadingEnabled" value="true"/>

Client side:

Source code on GitHub

package xx.xx.searchengine;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;

/**
 * @Author: wangjie
 * @Description:
 * @Date: Created in 10:26 2018/3/27
 */
@SpringBootApplication
public class ClientApplication {

    //cache name
    private static final String CACHE_NAME = "serverCache";

    public static void main(String[] args) throws InterruptedException {

        SpringApplication.run(ClientApplication.class, args);


        try (Ignite ignite = Ignition.start("example-cache.xml")) {
            ignite.active(true);
            System.out.println("**********Cache continuous query example started**********");

            try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) {

                // Create new continuous query.
                ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
                // Initial query: return the existing entries whose key is greater than 0.
                qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
                    @Override
                    public boolean apply(Integer key, String val) {
                        return key > 0;
                    }
                }));
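                // Local listener: runs on this client for each batch of updates.
                // No remote filter is set, so it receives every update to the cache,
                // not only the entries with key > 0.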
                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
                    @Override
                    public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
                        for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) {
                            System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                        }
                    }
                });


                try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
                    // Iterate through existing data.
                    for (Cache.Entry<Integer, String> e : cur) {
                        System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
                    }
                    // Keep the cursor open so the local listener keeps receiving updates;
                    // closing the cursor cancels the continuous query.
                    Thread.sleep(2000000000);
                } finally {
                    ignite.destroyCache(CACHE_NAME);
                }
            }
        }
    }
}
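The very long `Thread.sleep` in the client exists only to keep the JVM alive while the listener waits for updates. One possible alternative, sketched below in plain Java, is to block on a `CountDownLatch` that some other part of the program releases when it is time to shut down. The class `KeepAliveSketch`, the method `awaitStop` and the one-second "stopper" thread are hypothetical and stand in for whatever stop signal a real client would use; none of them come from the original project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Sketch: keep a listener process alive until a stop signal arrives, instead of sleeping. */
public class KeepAliveSketch {

    /**
     * Blocks until the latch is released or the timeout elapses.
     * Returns true if a stop was requested, false on timeout or interruption.
     */
    public static boolean awaitStop(CountDownLatch stop, long timeout, TimeUnit unit) {
        try {
            return stop.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch stop = new CountDownLatch(1);

        // Stand-in for a real stop signal (console command, admin endpoint, ...):
        // release the latch after one second.
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            stop.countDown();
        });
        stopper.setDaemon(true);
        stopper.start();

        System.out.println("Listening for updates...");
        // In the client, this call would replace the long sleep while the query cursor is open.
        boolean stopped = awaitStop(stop, 10, TimeUnit.SECONDS);
        System.out.println(stopped ? "Stop requested, cleaning up" : "Timed out, cleaning up");
    }
}
```

Compared with a fixed sleep, the latch lets the client leave its `try` blocks in an orderly way, so the cursor and the Ignite node are closed as soon as the stop is requested.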

Key settings in example-cache.xml (added to the initial configuration file):

  <property name="clientMode" value="true"/>
  <property name="peerClassLoadingEnabled" value="true"/>

Running the programs and testing the continuous query

Start the server:

Server.png

Start the client:

Client.png

Send a GET request from Postman:

http://localhost:8080/testIgnite?key=26&value="hahahah"
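Note that the double quotes in this URL are not string delimiters: as far as I can tell they are sent as part of the parameter, so the stored value would include them. The sketch below builds the same request URL in Java with the value URL-encoded, which makes the difference visible. The class `TestIgniteUrl` and the method `buildUrl` are hypothetical helpers written for illustration; only the host, port, path and parameter names come from the article.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Sketch: build the /testIgnite request URL with a properly encoded value. */
public class TestIgniteUrl {

    /** Returns the GET URL for the article's endpoint, with the value URL-encoded. */
    public static String buildUrl(int key, String value) {
        try {
            return "http://localhost:8080/testIgnite?key=" + key
                    + "&value=" + URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a standard charset that every JVM must support.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Value without quotes.
        System.out.println(buildUrl(26, "hahahah"));
        // Value with literal quotes, as in the URL above: the quotes are encoded as %22.
        System.out.println(buildUrl(26, "\"hahahah\""));
    }
}
```

Either URL can be pasted into Postman or a browser; if the quotes are not wanted in the cached value, the first form is the one to use.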

postman.png

Check the output on the client console:

Client-updata.png

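Other articles about Ignite:
A First Look at Ignite Client/Server Mode in Java
Hello World for Ignite Compute

I'm still a beginner programmer, so if you spot any mistakes, please point them out! (#.#)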
