
On Transactional Operations over Streaming Data

2018-09-06 铛铛铛clark

Overview

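Data Artisans, the company behind Flink, recently published a blog post about a new component, Streaming Ledger. It offers a transaction solution for streaming data, with "transaction" in the usual database sense: ACID guarantees with Serializable isolation.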
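To make "Serializable" concrete, here is a toy plain-Java sketch of the lost-update anomaly that isolation prevents. It is not Streaming Ledger code, and the class and method names are hypothetical. Two debits hit the same account twice. The first run interleaves their reads and writes, and the second runs them one after the other.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (not Streaming Ledger code): two transactions debit the same key.
// LostUpdateDemo, interleaved and serial are hypothetical names.
public class LostUpdateDemo {

    // No isolation: both transactions read the balance before either writes,
    // so the second write silently overwrites the first debit.
    static long interleaved(long initial, long debitA, long debitB) {
        Map<String, Long> state = new HashMap<>();
        state.put("acct", initial);
        long readA = state.get("acct");      // T1 reads
        long readB = state.get("acct");      // T2 reads the same, soon-stale value
        state.put("acct", readA - debitA);   // T1 writes
        state.put("acct", readB - debitB);   // T2 writes, clobbering T1's update
        return state.get("acct");
    }

    // Serial schedule: T2 starts only after T1 has written its result,
    // which is the outcome Serializable isolation guarantees.
    static long serial(long initial, long debitA, long debitB) {
        Map<String, Long> state = new HashMap<>();
        state.put("acct", initial);
        state.put("acct", state.get("acct") - debitA); // T1 read + write
        state.put("acct", state.get("acct") - debitB); // T2 read + write
        return state.get("acct");
    }

    public static void main(String[] args) {
        System.out.println("interleaved: " + interleaved(100, 30, 50)); // prints 50
        System.out.println("serial:      " + serial(100, 30, 50));      // prints 20
    }
}
```

The account starts at 100 and takes debits of 30 and 50. The interleaved schedule ends at 50, which loses one of the debits. The serial schedule ends at the correct 20.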

Usage

        // start building the transactional streams
        StreamingLedger tradeLedger = StreamingLedger.create("simple trade example");
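        // (elided in this excerpt: setting up env, declaring the "accounts" and
        //  "books" states on tradeLedger, and creating the deposits DataStream)
        // define transactors on states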
        tradeLedger.usingStream(deposits, "deposits")
                .apply(new DepositHandler())
                .on(accounts, DepositEvent::getAccountId, "account", READ_WRITE)
                .on(books, DepositEvent::getBookEntryId, "asset", READ_WRITE);
        // produce transactions stream
        DataStream<TransactionEvent> transfers = env.addSource(new TransactionsGenerator(1));

        OutputTag<TransactionResult> transactionResults = tradeLedger.usingStream(transfers, "transactions")
                .apply(new TxnHandler())
                .on(accounts, TransactionEvent::getSourceAccountId, "source-account", READ_WRITE)
                .on(accounts, TransactionEvent::getTargetAccountId, "target-account", READ_WRITE)
                .on(books, TransactionEvent::getSourceBookEntryId, "source-asset", READ_WRITE)
                .on(books, TransactionEvent::getTargetBookEntryId, "target-asset", READ_WRITE)
                .output();
        // compute the resulting streams
        ResultStreams resultsStreams = tradeLedger.resultStreams();

        // output to the console
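        resultsStreams.getResultStream(transactionResults).print();

        // trigger program execution
        env.execute();

The transaction logic itself is a `TransactionProcessFunction`. The names passed to `.on(...)` above ("source-account", "target-account", "source-asset", "target-asset") are bound to the matching `@State`-annotated parameters:

    private static final class TxnHandler extends TransactionProcessFunction<TransactionEvent, TransactionResult> {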

        private static final long serialVersionUID = 1;

        @ProcessTransaction
        public void process(
                final TransactionEvent txn,
                final Context<TransactionResult> ctx,
                final @State("source-account") StateAccess<Long> sourceAccount,
                final @State("target-account") StateAccess<Long> targetAccount,
                final @State("source-asset") StateAccess<Long> sourceAsset,
                final @State("target-asset") StateAccess<Long> targetAsset) {

            // ZERO is a default-value constant declared elsewhere in the example class (elided here)
            final long sourceAccountBalance = sourceAccount.readOr(ZERO);
            final long sourceAssetValue = sourceAsset.readOr(ZERO);
            final long targetAccountBalance = targetAccount.readOr(ZERO);
            final long targetAssetValue = targetAsset.readOr(ZERO);

            // check the preconditions
            if (sourceAccountBalance > txn.getMinAccountBalance()
                    && sourceAccountBalance > txn.getAccountTransfer()
                    && sourceAssetValue > txn.getBookEntryTransfer()) {

                // compute the new balances
                final long newSourceBalance = sourceAccountBalance - txn.getAccountTransfer();
                final long newTargetBalance = targetAccountBalance + txn.getAccountTransfer();
                final long newSourceAssets = sourceAssetValue - txn.getBookEntryTransfer();
                final long newTargetAssets = targetAssetValue + txn.getBookEntryTransfer();

                // write back the updated values
                sourceAccount.write(newSourceBalance);
                targetAccount.write(newTargetBalance);
                sourceAsset.write(newSourceAssets);
                targetAsset.write(newTargetAssets);

                // emit result event with updated balances and flag to mark transaction as processed
                ctx.emit(new TransactionResult(txn, true, newSourceBalance, newTargetBalance));
            }
            else {
                // emit result with unchanged balances and a flag to mark transaction as rejected
                ctx.emit(new TransactionResult(txn, false, sourceAccountBalance, targetAccountBalance));
            }
        }
    }
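To see the handler's logic in isolation, here is a plain-Java sketch that mirrors it without Flink or Streaming Ledger. The `Txn` and `Result` classes and the two `HashMap`s standing in for the `accounts` and `books` states are my own hypothetical stand-ins. `getOrDefault(..., 0L)` plays the role of `readOr(ZERO)`. The point is the all-or-nothing shape: either all four rows are written, or none are.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java mirror of TxnHandler.process (no Flink / Streaming Ledger dependency).
// Txn, Result and the two HashMaps are hypothetical stand-ins for the real
// event types and the "accounts" / "books" states.
public class TransferSketch {

    static final class Txn {
        final String sourceAccount, targetAccount, sourceAsset, targetAsset;
        final long accountTransfer, bookEntryTransfer, minAccountBalance;

        Txn(String sourceAccount, String targetAccount, String sourceAsset, String targetAsset,
            long accountTransfer, long bookEntryTransfer, long minAccountBalance) {
            this.sourceAccount = sourceAccount;
            this.targetAccount = targetAccount;
            this.sourceAsset = sourceAsset;
            this.targetAsset = targetAsset;
            this.accountTransfer = accountTransfer;
            this.bookEntryTransfer = bookEntryTransfer;
            this.minAccountBalance = minAccountBalance;
        }
    }

    static final class Result {
        final boolean success;
        final long sourceBalance, targetBalance;

        Result(boolean success, long sourceBalance, long targetBalance) {
            this.success = success;
            this.sourceBalance = sourceBalance;
            this.targetBalance = targetBalance;
        }
    }

    final Map<String, Long> accounts = new HashMap<>();
    final Map<String, Long> books = new HashMap<>();

    Result process(Txn txn) {
        // read all four rows, defaulting missing keys to 0 (like readOr(ZERO))
        long srcBal = accounts.getOrDefault(txn.sourceAccount, 0L);
        long srcAsset = books.getOrDefault(txn.sourceAsset, 0L);
        long tgtBal = accounts.getOrDefault(txn.targetAccount, 0L);
        long tgtAsset = books.getOrDefault(txn.targetAsset, 0L);

        // same preconditions as the handler
        if (srcBal > txn.minAccountBalance
                && srcBal > txn.accountTransfer
                && srcAsset > txn.bookEntryTransfer) {
            long newSrcBal = srcBal - txn.accountTransfer;
            long newTgtBal = tgtBal + txn.accountTransfer;
            // accepted: write back all four rows
            accounts.put(txn.sourceAccount, newSrcBal);
            accounts.put(txn.targetAccount, newTgtBal);
            books.put(txn.sourceAsset, srcAsset - txn.bookEntryTransfer);
            books.put(txn.targetAsset, tgtAsset + txn.bookEntryTransfer);
            return new Result(true, newSrcBal, newTgtBal);
        }
        // rejected: nothing is written and the balances are reported unchanged
        return new Result(false, srcBal, tgtBal);
    }

    public static void main(String[] args) {
        TransferSketch ledger = new TransferSketch();
        ledger.accounts.put("A", 100L);
        ledger.books.put("X", 10L);

        Result ok = ledger.process(new Txn("A", "B", "X", "Y", 40, 5, 0));
        Result rejected = ledger.process(new Txn("A", "B", "X", "Y", 70, 1, 0));

        System.out.println(ok.success + " " + ok.sourceBalance + " " + ok.targetBalance);
        System.out.println(rejected.success + " " + ledger.accounts.get("A"));
    }
}
```

The first transfer is accepted: A goes from 100 to 60 and B from 0 to 40. The second transfer asks for 70 while A holds only 60, so it is rejected and A stays at 60. This sketch only models the logic of one transaction. Running many transactions atomically and in isolation is the job of the Streaming Ledger runtime itself.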

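How it works

The core of the implementation is the operator below. Transactions are processed by a `SerialTransactor` operator, and `forceNonParallel()` pins that operator to a parallelism of 1, so it runs as a single instance.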

        SingleOutputStreamOperator<Void> resultStream = input
                .process(new SerialTransactor(specs(streamLedgerSpecs), sideOutputTags))
                .name(serialTransactorName)
                .uid(serialTransactorName + "___SERIAL_TX")
                .forceNonParallel()
                .returns(Void.class);
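Here is a rough plain-Java model of that idea. It rests on my own assumptions rather than the actual Streaming Ledger source, and `SerialTransactorSketch`, `submitDeposit` and `shutdownAndGet` are hypothetical names. Events from several producers are funneled into a single-threaded executor, which stands in for the one parallel instance of `SerialTransactor`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of SerialTransactor + forceNonParallel(): all events go
// through ONE worker that applies them one at a time, so no locks are needed
// and every transaction sees the full effect of the ones before it.
public class SerialTransactorSketch {

    // Only ever touched by the single worker thread, so a plain HashMap is safe.
    private final Map<String, Long> accounts = new HashMap<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Any producer thread may call this; the work is queued and run serially.
    void submitDeposit(String account, long amount) {
        worker.execute(() -> accounts.merge(account, amount, Long::sum));
    }

    // Stops accepting work, waits for queued transactions, then reads the result.
    long shutdownAndGet(String account) {
        worker.shutdown();
        try {
            if (!worker.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return accounts.getOrDefault(account, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        SerialTransactorSketch tx = new SerialTransactorSketch();
        // two "input streams" producing concurrently
        Runnable stream = () -> {
            for (int i = 0; i < 1000; i++) {
                tx.submitDeposit("acct", 1);
            }
        };
        Thread s1 = new Thread(stream);
        Thread s2 = new Thread(stream);
        s1.start();
        s2.start();
        s1.join();
        s2.join();
        System.out.println(tx.shutdownAndGet("acct"));
    }
}
```

Two producer threads each submit 1000 deposits of 1. The balance always ends at 2000 even though the map is an unsynchronized `HashMap`, because only the worker thread ever touches it. That same property is also the limitation: every transaction passes through that one thread.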

Thoughts

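My first impression of this feature was that it is very impressive. After reading through its implementation, though, I suspect it could run into quite a few problems in real applications. The most important operator, the one that processes the transactions, is fundamentally not scalable: it cannot be scaled out horizontally. Functionally, however, it does open up a new direction. I hope to see better solutions in the future, for example ones targeting weaker isolation levels such as Read Committed and Repeatable Read.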
