反应式编程 .Net平台的有Reactive Extension
文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
支持的平台
Rx 4.1支持以下平台
.NET Framework 4.6+
.NET Standard 2.0+(包括.NET Core,Xamarin等)
.UWP
在从v2.xx迁移到v3.0.0时,NuGet包已更改其包命名
-
就是现在 [Rx-Main
System.Reactive
] -
就是现在 [Rx-Core
System.Reactive.Core
] -
就是现在 [Rx-Interfaces
System.Reactive.Interfaces
] -
就是现在 [Rx-Linq
System.Reactive.Linq
] -
就是现在 [Rx-PlatformServices
System.Reactive.PlatformServices
] -
就是现在 [Rx-Testing
Microsoft.Reactive.Testing
]
响应式编程包含:一个观察者(observer)订阅可观察到的对象(Observable)。可观察到的对象(Observable)通过调用观察者的方法来发射项目或通知给它的所有观察者(observer)。观察者有些时候也被称作是订阅者,观看者,响应者。
关于Observers的创建:
下面是Observable的生成方法列表
方法名 功能介绍
Create 生成任意的Observable
Defer 推迟执行直到被订阅为止
Empty 只执行 OnCompleted 方法
FromAsyncPattern 生成 Begin-End Pattern
FromEvent *4 从 Action 代理的事件生成
FromEventPattern 从 EventHandler 代理的事件生成
Generate 模拟for推送值
Interval 一段时间推送一个值
Never 什么也不做
Range 推送一个区间内的值
Repeat 指定次数推送同一个值
Return 只返回一个结果
Start 指定一个Schedula立即执行,完成后返回一个值
Throw 只执行 OnError
Timer 在某个时间推送
ToAsync 返回一个 Func<Observable<T>>
Using 完成后,生成将指定的资源Dispose掉的代理
Observable对象创建完了之后,我们需要注册。注册可能不在UI线程中,如果这样,当我们试图往UI线程中写入东西时,就会抛出异常。因此确保在UI线程中注册Observable对象显得尤为重要。我们可以通过ObserveOn(Deployment.Current.Dispatcher),这个方法实现。该方法保证我们在UI线程中注册Observable对象。
实例:
var browser = Observable.FromEvent<NavigationEventArgs>(webBrowser1, "Navigated");
browser.ObserveOn(Deployment.Current.Dispatcher).Subscribe(evt =>
{
lblProgress.Visibility = Visibility.Collapsed;
}
);
订阅方法就是展示了observer如何连接到Observable。observer实现了下列方法的一些子集:
onNext
每当Observable广播数据时将会调用该方法。这个方法将会被作为Observable的一个广播项目参数被发送
onError
Observable调用此方法表示它内部已经发生异常数据或者发生一些其他错误。这样停止观察,并且也不会做将来的调用onNext或者onCompleted。该onError方法作为它的参数来指示了错误的原因。
onCompleted
Observable在已经调用了onNext方法作为最后的时间,如果没有遇到任何错误,那么该方法将会被调用
通过Observable的定义,它可能调用onNext零次或者很多次,并且接下来的调用可能是onCompleted或者onError方法,但是不是同时调用,这都是最终才会被调用。在调用过程中,onNext通常称作任务的执行,而onCompleted或者onError被称作任务的结果通知
实例代码:
// 正常结束的时候
// 运行结果:1, 2, 3, 4, 5, Completed
Observable.Range(1, 5)
.Subscribe(
x => Console.WriteLine(x),
ex => Console.WriteLine("Error"),
() => Console.WriteLine("Completed"));
// 中间发生异常的时候
// 运行结果:1, 2, Error
Observable.Range(1, 5)
.Do(x => { if (x == 3) throw new Exception(); })
.Subscribe(
x => Console.WriteLine(x),
ex => Console.WriteLine("Error"),
() => Console.WriteLine("Completed"));
Dispose的必要性
Subscribe方法的返回值是一个实现了 IDisposable 接口的对象,在上面的示例中中间变量被忽略了。关于Rx的IDisposable对象与一般意义上的“有可能是必须释放的资源”是有所不同的。如果 Rx 处理的是事件的时候,那么 Dispose 表示“分离”,Timer的时候则表示“中止”,异步的时候是“取消”的意思。
用Rx来处理事件的优势
一、事件合成
下面的代码示例如何将 按下/移动/放开 合成一个新的事件并进行处理:
// WindowsForm的Drag事件:鼠标左键按下/移动/直到放开过程中
// 取得鼠标的坐标
var drag = from down in this.MouseDownAsObservable()
from move in this.MouseMoveAsObservable().TakeUntil(
this.MouseUpAsObservable())
select move.Location;
MouseDownAsObservable、MouseMoveAsObservable、MouseUpAsObservable方法是用Observable类的FromEvent静态方法包装的扩张方法。
二、Timer/通知事件
在一定的时间间隔监视某个值的场景。Rx将Timer变为一个序列,另外通过Select方法可以灵活变换输出。这些组合在一起,监视对象的值在一定时间间隔内会自动推送过来,非常容易操作。而且过滤值的处理也很简单,可以只在值发生变化时再接收这样就更简单了。
①
// 每隔1秒监视一下watchTarget.Value的值
var polling =
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
.Select(_ => watchTarget.Value)
.DistinctUntilChanged(); // 只有在值发生变化时才引发事件(polling)
②// FileSystemWatcher的Changed事件
// 发生一次变化,会触发多个事件
var watcher =
new FileSystemWatcher("C:\", "test.txt")
{ EnableRaisingEvents = true };
// "对于1秒内连续发生的事件,进行过滤,只处理最后一个"
// 变成一个相对更容易处理的对象
var changed =
Observable.FromEventPattern<FileSystemEventArgs>(
watcher, "Changed")
.Throttle(TimeSpan.FromSeconds(1)); // Throttle方法是只允许通过指定时间和指定值的内容
三、FromEvent方法和FromEventPattern方法
用Rx处理事件,需要用 FromEvent 方法或者 FromEventPattern 方法将事件变为 IObservable<T> 对象。FromEvent 方法可以转换 Action<T> 代理,序列元素则为 T 。FromEventPattern 方法可以转换 EventHandler 代理,序列元素则为 EventPattern<TEventArgs>,它包装了 Object 类型的 sender 和 TEventArgs 类型的 e。
①通过反射方式
// 按钮控件的Click事件 Rx 化
Observable.FromEventPattern<RoutedEventArgs>(button1, "Click");
②非反射方式,性能更加优化
// 第一个参数是事件响应处理的代理(这里 h => h.Invoke 是固定)
// 第二个参数绑定事件,第三个参数解除事件
Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
h => h.Invoke,
h => button1.Click += h, h => button1.Click -= h);
在进一步优化代码结构
指定事件的处理,还有指定的字符串,这样的编码会比较多,实际运用中这样会影响代码的维护性。因此推荐将这些代码作为扩展方法分离出去。
public static class ButtonBaseExtensions
{
// 分离的扩展方法
public static IObservable<RoutedEventArgs> ClickAsObservable(this ButtonBase button)
{
return Observable.FromEventPattern<RoutedEventHandler, RoutedEventArgs>(
h => h.Invoke,
h => button.Click += h, h => button.Click -= h)
.Select(x => x.EventArgs);
/ 使用FromEvent生成EventPattern对象
// 用Select省略sender
return Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
h => (sender, e) => h(e),
h => button.Click += h, h => button.Click -= h);
}
}
// 实际运用时调用
button1.ClickAsObservable().Subscribe(_ => MessageBox.Show("Clicked!"));
如果sender参数是必要的时候,也可以通过 Select 重新包装来做。比如:
button1.ClickAsObservable().Select(ev => new { Sender = button1, EventArgs = ev })
Rx代表性的方法:
SelectMany 方法
SelectMany 方法是 Rx 中最常用的方法之一。从第一个异步结果中启动第2个异步处理,这对于使用Rx进行异步编程是非常重要的。
SelectMany的处理图
根据 A 序列的值,后续用 B 序列的值进行插入替换。
// 替换别的Observable的内容
// 结果:10, 10, 11, 10, 11, 12
Observable.Range(1, 3)
.SelectMany(x => Observable.Range(10, x))
.Subscribe(Console.WriteLine);
// 实际的替换过程
// { x = 1, y = 10 }
// { x = 2, y = 10 }
// { x = 2, y = 11 }
// { x = 3, y = 10 }
// { x = 3, y = 11 }
// { x = 3, y = 12 }
var query = from x in Observable.Range(1, 3)
from y in Observable.Range(10, x)
select new { x, y };
query.Subscribe(Console.WriteLine);
○ Concat方法
Concat 是将2个序列进行连接的方法。这个时候,直到第一个序列终止前,第二个序列的值就会被忽略掉。我们可以理解是在第一个序列的结尾追加上另一个序列。
// 运行结果:1, 2, 3, -1, -1, -1
Observable.Range(1, 3)
.Concat(Observable.Repeat(-1, 3))
.Subscribe(Console.WriteLine);
○ Merge 方法
Merge会将所有的值都会合并进来。如果要对应多个控件的共通处理的话,使用Merge是很方便的。
// WindowsForm中的4个TextBox控件全部设定为:
// “DragDropEffects.All”
new[] { textBox1, textBox2, textBox3, textBox4 }
.Select(x => Observable.FromEventPattern<DragEventArgs>(x, "DragEnter"))
.Merge()
.Subscribe(x => x.EventArgs.Effect = DragDropEffects.All);
// 上面的Merge方法是下面的代码的变形,
// 修改为:IEnumerable<IObservable<T>>进行Merge
// 代码变得更简洁
Observable.Merge(
Observable.FromEventPattern<DragEventArgs>(textBox1, "DragEnter"),
Observable.FromEventPattern<DragEventArgs>(textBox2, "DragEnter"),
Observable.FromEventPattern<DragEventArgs>(textBox3, "DragEnter"),
Observable.FromEventPattern<DragEventArgs>(textBox4, "DragEnter")
);
○ Zip 方法
Zip方法是A和B中各取1个值为一组(2个值)进行配对处理。一边的值如果发生偏移,那么Zip会直到取到2个值为止才输出。
如下代码所示,使用Zip方法将 Interval 方法(指定时间间隔发行值)和Timestamp(实际时刻)进行组合的结果。
// 結果:
// { x = 0@2011/12/20 7:37:15 +09:00, y = 0@2011/12/20 7:37:17 +09:00, now = 2011/12/20 7:37:17 +09:00 }
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Zip(Observable.Interval(TimeSpan.FromSeconds(3)).Timestamp(), (x, y) => new { x, y, now = DateTimeOffset.Now })
.Subscribe(Console.WriteLine);
○ CombineLatest 方法
类似 Zip 方法,两边引发“值”时,取得最新的值输出。如下图所示,两边都引发事件,且需要对两边的事件都需要处理的场景:
A和B的序列,任何一边在引发变化时都会取出两边最新的值输出。
public static class ToggleButtonExtensions
{
// WPF/Silverlight/WP7のToggleButton控件(Checkbox)
// 如果Check状态变化 IsChecked属性值也跟着变化
public static IObservable<bool> IsCheckedAsObservable(this ToggleButton button)
{
var checkedAsObservable = Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
h => (sender, e) => h(e),
h => button.Checked += h, h => button.Checked -= h);
var uncheckedAsObservable = Observable.FromEvent<RoutedEventHandler, RoutedEventArgs>(
h => (sender, e) => h(e),
h => button.Unchecked += h, h => button.Unchecked -= h);
return Observable.Merge(checkedAsObservable, uncheckedAsObservable).Select(_ => button.IsChecked.Value);
}
}
// checkBox1和checkBox2两个CheckBox
// 同时选中时,MessageBox才表示。
checkBox1.IsCheckedAsObservable()
.CombineLatest(checkBox2.IsCheckedAsObservable(),
(isChecked1, isChecked2) => new { isChecked1, isChecked2 })
.Where(x => x.isChecked1 && x.isChecked2)
.Subscribe(_ => MessageBox.Show("同时选择!"));
○ Scan方法
最后,这个Scan方法不是连接方法而是一个集计的方法。Scan方法是1个前面的“结果”和现在的“值”进行合成输出的。因为可以获得1个前面的结果值,所以进行差分(累计)计算时使用比较方便。如下图所示:
A序列中,1个前面的“结果”(中间褐色的横线)和当前的“值”(上面蓝色的横线)进行合成。
Scan 方法就像 Linq2Object 中的 Aggregate 方法在计算时,列举的全部中间结果
// 1, 3(=1+2), 6(=3+3), 10(=6+4), 15(=10+5)
Observable.Range(1, 5)
.Scan((x, y) => x + y)
.Subscribe(Console.WriteLine);
ReactiveUI库
ReactiveUI类库是实现了MVVM模式的框架,他移除了一些Rx和用户界面进行交互的代码。ReactiveUI的核心思想是使开发者能够将属性变更以及事件转换为IObservable对象,然后在需要的时候使用IObservable对象将这些对象转换到属性中来。他的另一个核心目标是可以在ViewModel中相关属性发生变化时可以可执行相应的命令。虽然其他的框架也允许这么做,但是ReactiveUI会在依赖属性变更时自动的去更新结果,而不需要通过拉或者调用类似UpdateTheUI之类的方法
核心类
ReactiveObject:它是ViewModel对象,该对象实现了INotifyPropertyChanged接口。除此之外,该对象也提供了一个称之为Changed的IObservable接口,允许其他对象来注册,从而使得该对象属性变更时能够得到通知。使用Rx中强大的操作符,我们还可以追踪到一些状态是如何改变的。
ReactiveValidateObject:该对象继承自ReactiveObject对象,它通过实现IDataErrorInfo接口,利用DataAnnotations来验证对象。因此属性的值可以使用一些限制标记,UI界面能够自动的在属性的值违反这些限制时显示出这些错误。
ObservableAsPropertyHelper<T>:该类可以很容易的将IObservable对想转换为一个属性,该属性存储该对象的最新值,并且在属性值发生改变时能够触发NofityPropertyChanged事件。使用该类,我们能够从IObservable中派生出一些新的属性。
ReactiveCommand:该类实现了ICommand和IObservable接口,并且当Execute执行时OnNext方法就会被执行。该对象的CanExecute可以通过IObservable<bool>来定义。
ReactiveAsyncCommand:该对象继承自ReactiveCommand,并且封装了一种通用的模式。即“触发一步命令,然后将结果封送到dispather线程中”该对象也允许设置最大并行值。当达到最大值时,CanExecute方法返回false。
使用ReactiveObject实现ViewModels
在ReactiveObject中,属性的命名也需要注意,用作属性的私有字段必须为属性名称前面加上下划线。下面的例子展示了如何使用ReactiveObject声明一个可读写的属性。
public class AppViewModel : ReactiveObject
{
int _SomeProp;
public int SomeProp
{
get { return _SomeProp; }
set { this.RaiseAndSetIfChanged(x => x.SomeProp, value); }
}
}
ReactiveUI能够很容易的通过名为Changed的IObservable接口注册事件变化。在任何一个属性发生变化时,都会触发通知,客户端通常只需要关注感兴趣的一两个变化了的属性。使用ReactiveUI,可以通过WhenAny扩展方法很容易的获取这些属性值:
var newLoginVm = new NewUserLoginViewModel();
newLoginVm.WhenAny(x => x.User, x => x.Value)
.Where(x => x.Name == "Bob")
.Subscribe(x => MessageBox.Show("Bob is already a user!"));
IObservable<bool> passwordIsValid = newLoginVm.WhenAny(
x => x.Password, x => x.PasswordConfirm,
(pass, passConf) => (pass.Value == passConf.Value));
ReactiveCommand
ReactiveCommand实现了ICommand接口,他可以模拟简单的ICommand实现。我们可以将它看做是一种ICommand,可以使用Create静态方法创建。
var cmd = ReactiveCommand.Create(x => true, x => Console.WriteLine(x));
cmd.CanExecute(null); //方法输出true
cmd.CanExecute("Hello"); //方法输出"Hello"
下面构造了一个Command,该Command只在鼠标松开时触发。
var mouseIsUp = Observable.Merge(
Observable.FromEvent<MouseButtonEventArgs>(window, "MouseDown").Select(_ => false),
Observable.FromEvent<MouseButtonEventArgs>(window, "MouseUp").Select(_ => true))
.StartWith(true);
var cmd = new ReactiveCommand(mouseIsUp);
cmd.Subscribe(x => Console.WriteLine(x));
使用ReactiveAsyncCommand处理异步方法调用
由于ReactiveAsyncCommand直接继承自ReactiveCommand,所以它能做基类的所有功能。使用Execute,使得Command开始在后台执行时并可以通知用户。ReactiveAsyncCommand和ReactiveCommand不同之处在于,它内建了能够自动跟踪后台线程中运行的任务的数量。ReactiveAsyncCommand对象中是使用RegisterAsyncAction来注册异步执行操作的。它能够注册异步方法和同步方法,这些方法将会在后台线程中执行,并返回IObservable数据表示执行结果会在未来的某一时刻到来。IObservale通常对应Command调用。每一次执行Execute方法将会将结果存入到IObservable对象中。
下面是一个简单的使用Command的例子,它在后台线程的Task中运行,并且只运行一次。
var cmd = new ReactiveAsyncCommand();
cmd.RegisterAsyncAction(i => {
Thread.Sleep((int)i * 1000);
});
cmd.Execute(5);
cmd.CanExecute(5);//False
构造一个ViewModel例子
①View
<Window x:Class="RxUI.MainWindow"
xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
Title="MainWindow" Height="350" Width="525" x:Name="Window">
<Grid DataContext="{Binding ViewModel, ElementName=Window}">
<StackPanel HorizontalAlignment="Center" VerticalAlignment="Center">
<TextBlock Text="{Binding DataFromTheInternet}" FontSize="18"/>
<Button Content="Click me!" Command="{Binding GetDataFromTheInternet}"
CommandParameter="5" MinWidth="75" Margin="0,6,0,0"/>
</StackPanel>
</Grid>
</Window>
public partial class MainWindow : Window
{
public AppViewModel ViewModel { get; protected set; }
public MainWindow()
{
ViewModel = new AppViewModel();
InitializeComponent();
}
}
②ViewModel
class AppViewModel:ReactiveObject
{
ObservableAsPropertyHelper<String> dataFromTheInternet;
public string DataFromTheInternet
{
get { return dataFromTheInternet.Value; }
}
public ReactiveAsyncCommand GetDataFromTheInternet { get; protected set; }
}
构造方法
public AppViewModel()
{
GetDataFromTheInternet = new ReactiveAsyncCommand();
var futureData = GetDataFromTheInternet.RegisterAsyncAction(I => {
Thread.Sleep(5 * 1000);
return String.Format("The Future will be {0}x as awesome!", i);
});
dataFromTheInternet = futureData.ToProperty(this, x => x.DataFromTheInternet);
}
每一次用户点击按钮的时候,Command的Execute方法就会被执行一次,每5分钟就会向futureData这个Observable对象中传入一个数据。
ReactiveUI中的缓存
在ReactiveUI中,引入了一个称之为MemorizingMRUCache的对象,如名字所示,是一种以最近最常使用过的数据来作为缓存方案,它会移除一些在一定时间内没有请求的数据,从而保证缓存集在一定的大小范围内。
var cache = new MemoizingMRUCache<Int32, Int32>((x, ctx) => {
Thread.Sleep(5 * 1000);
return x * 100;
},20);
cache.Get(10);//第一次获取,需要5秒
cache.Get(10);//第二次取值,立即返回
cache.Get(15);//也需要5秒
MemorizingMRUCache也可以将缓存数据从内存中存储到磁盘上供以后使用,缓存的键可以是一个URL,值可以是该URL对应的临时文件。当缓存文件不再需要时,调用OnRelease方法可以删除这些临时文件。
TryGet:视图从缓存中获取某一个键对应的值
Invalidate:将某一个键对应的值的缓存进行清除,内部调用Release函数。
InvalidateAll:清空所有缓存。
图形拖拽实例:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Documents;
using System.Windows.Input;
namespace RxDragDownSample
{
/// <summary>
/// MainWindow.xaml 的交互逻辑
/// </summary>
public partial class MainWindow : Window
{
public MainWindow()
{
InitializeComponent();
}
private void btnCircular_Click_1(object sender, RoutedEventArgs e)
{
AddShape<Shapes.Circular>();
}
private void btnSquare_Click_1(object sender, RoutedEventArgs e)
{
AddShape<Shapes.Square>();
}
private void btnTriangle_Click_1(object sender, RoutedEventArgs e)
{
AddShape<Shapes.Triangle>();
}
private void AddShape<T>() where T : new()
{
var shape = (new T()) as UserControl;
myCanvas.Children.Add(shape);
Canvas.SetLeft(shape, 10);
Canvas.SetTop(shape, 10);
var minX = 0;
var maxX = myCanvas.ActualWidth - 30;
var minY = 0;
var maxY = myCanvas.ActualHeight - 30;
// 鼠标在Shape上按下,开始DragDrop
var mouseDown = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(shape, "MouseLeftButtonDown")
select evt.EventArgs.GetPosition(this);
// 鼠标移动,取得坐标
var mouseMove = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
select evt.EventArgs.GetPosition(this);
// 鼠标放开,终止DragDrop
var mouseUp = from evt in Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonUp")
select evt.EventArgs.GetPosition(this);
// 当鼠标移出Window,终止DragDrop
var mouseLeave = from evt in Observable.FromEventPattern<MouseEventArgs>(this, "MouseLeave")
select evt;
//mouseMoves则是对连续的 mouseMove 进行了Zip 获得鼠标位移(offset)
var mouseMoves = mouseMove.Skip(1).Zip(mouseMove, (prev, cur) =>
new { X = prev.X - cur.X, Y = prev.Y - cur.Y });
var dragDrop = mouseDown.SelectMany(mouseMoves.TakeUntil(mouseUp).TakeUntil(mouseLeave));
dragDrop.ObserveOn(SynchronizationContext.Current).Subscribe(p =>
{
var x = Math.Min(Math.Max(Canvas.GetLeft(shape) + p.X, minX), maxX);
var y = Math.Min(Math.Max(Canvas.GetTop(shape) + p.Y, minY), maxY);
Canvas.SetLeft(shape, x);
Canvas.SetTop(shape, y);
this.lblPosition.Content = "{x:" + x.ToString() + ",y:" + y.ToString() + "}";
});
}
}
}