Implementing Synchronous Calls in Netty

2021-03-03  小玉1991

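The approach relies on a synchronisation latch (a CountDownLatch). One thing to watch: the UUID passed to serverChannelHandlerAdapter.setUuid(dataSourceKey) must be the same UUID that is set on the message being sent (transferMessage.setId(dataSourceKey)). Only then does the reply release the latch, so that latch.await returns true and execution enters the if branch.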


        // Query all tables under the given schema of this data source
        FtpPathFileSource dataSource = new FtpPathFileSource();
        dataSource.setUserName(dataSourceDto.getUserName());
        dataSource.setPassword(dataSourceDto.getPassword());
        dataSource.setUrl(dataSourceDto.getDataSourceIp());
        dataSource.setPath(dataConfigSearchForm.getSchema());
        dataSource.setBusinessType(BusinessTypeEnum.FTP);


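        List<Client> usableClientList = datasourceClientRepository.findUsableClientList();
        if (usableClientList.isEmpty()) {
            // Guard: random.nextInt(0) would throw IllegalArgumentException
            throw new IllegalStateException("no usable client available");
        }
        Random random = new Random();
        // Pick one client at random from the usable clients
        Client client = usableClientList.get(random.nextInt(usableClientList.size()));
        // Look up the channel registered for that client
        Channel channel = DataBaseCache.CHANNEL_MAP.get(client.getChannelId());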

        // Set up the synchronisation latch
        CountDownLatch latch = new CountDownLatch(Constant.Field.IS_SYNCHRO);
        ServerChannelHandlerAdapter serverChannelHandlerAdapter = (ServerChannelHandlerAdapter) channel.pipeline().get("ChannelHandlerAdapter");
        serverChannelHandlerAdapter.resetSync(latch, true);
        String dataSourceKey = UUID.randomUUID().toString();
        serverChannelHandlerAdapter.setUuid(dataSourceKey);

        // Build the message to send
        TransferMessage transferMessage = new TransferMessage();
        transferMessage.setId(dataSourceKey);
        transferMessage.setBusinessType(BusinessTypeEnum.FTP);
        transferMessage.setCommandType(CommandTypeEnum.CONTENT);
        transferMessage.setVersion(Constant.Field.CLIENT_VERSION);
        transferMessage.setData(JsonUtil.objectToJson(dataSource));

        channel.writeAndFlush(JsonUtil.objectToJson(transferMessage));

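        FtpFilePathDto dataResult = new FtpFilePathDto();
        try {
            // Block until the matching reply arrives or the timeout elapses
            if (latch.await(Constant.Field.SECONDS_TIMEOUT, TimeUnit.SECONDS)) {
                transferMessage = serverChannelHandlerAdapter.getResult();
                dataResult = JsonUtil.jsonToObject(transferMessage.getData(), FtpFilePathDto.class);
            } else {
                log.error("timed out waiting for the client reply");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see the interruption
            Thread.currentThread().interrupt();
            log.error("interrupted while waiting for the client reply", e);
        } catch (Exception e) {
            log.error("failed to read the client reply", e);
        }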
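
The calling code above never shows the handler side, so here is a minimal sketch of how the UUID check and the latch could fit together. It is only an assumption about what ServerChannelHandlerAdapter might do internally. The classes Reply, SyncState and SyncCallDemo and the method onMessage are hypothetical names made up for this illustration. Netty is left out so that the sketch runs with the JDK alone. In a real handler, the logic in onMessage would presumably sit in channelRead.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for TransferMessage: only the fields this sketch needs. */
final class Reply {
    final String id;
    final String data;
    Reply(String id, String data) { this.id = id; this.data = data; }
}

/**
 * Hypothetical stand-in for the synchronisation state held by the handler.
 * The method names resetSync, setUuid and getResult mirror the calls in the
 * article; their bodies are assumptions.
 */
final class SyncState {
    private volatile CountDownLatch latch;
    private volatile boolean sync;
    private volatile String uuid;
    private volatile Reply result;

    void resetSync(CountDownLatch latch, boolean sync) {
        this.result = null;   // drop any result left over from an earlier call
        this.latch = latch;
        this.sync = sync;
    }

    void setUuid(String uuid) { this.uuid = uuid; }

    Reply getResult() { return result; }

    /** Called for every inbound message; releases the waiter only when the id matches. */
    void onMessage(Reply reply) {
        CountDownLatch current = latch;
        if (sync && current != null && reply.id.equals(uuid)) {
            result = reply;        // store the result before releasing the latch
            current.countDown();
        }
    }
}

final class SyncCallDemo {
    public static void main(String[] args) throws InterruptedException {
        SyncState state = new SyncState();
        CountDownLatch latch = new CountDownLatch(1);
        state.resetSync(latch, true);
        state.setUuid("req-1");

        // Simulate the I/O thread delivering an unrelated reply, then the matching one.
        Thread io = new Thread(() -> {
            state.onMessage(new Reply("other", "ignored"));
            state.onMessage(new Reply("req-1", "{\"paths\":[]}"));
        });
        io.start();

        if (latch.await(2, TimeUnit.SECONDS)) {
            System.out.println("reply: " + state.getResult().data);
        } else {
            System.out.println("timed out");
        }
        io.join();
    }
}
```

In this sketch a reply with a different id leaves the latch untouched, which is why the two UUIDs have to match. Because the latch and the UUID live on a single handler instance, this design can track only one outstanding request per channel at a time. Concurrent callers on the same channel would overwrite each other's state.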
