c#-异步编程模型-代码示例
2020-10-27 本文已影响0人
wwmin_
- 异步编程,当有Task完成就执行
class Program
{
static readonly HttpClient _client = new HttpClient
{
MaxResponseContentBufferSize = 1_000_000
};
static readonly IEnumerable<string> s_urlList = new string[] {
"https://docs.microsoft.com",
"https://docs.microsoft.com/aspnet/core",
"https://docs.microsoft.com/azure",
"https://docs.microsoft.com/azure/devops",
"https://docs.microsoft.com/dotnet",
"https://docs.microsoft.com/dynamics365",
"https://docs.microsoft.com/education",
"https://docs.microsoft.com/enterprise-mobility-security",
"https://docs.microsoft.com/gaming",
"https://docs.microsoft.com/graph",
"https://docs.microsoft.com/microsoft-365",
"https://docs.microsoft.com/office",
"https://docs.microsoft.com/powershell",
"https://docs.microsoft.com/sql",
"https://docs.microsoft.com/surface",
"https://docs.microsoft.com/system-center",
"https://docs.microsoft.com/visualstudio",
"https://docs.microsoft.com/windows",
"https://docs.microsoft.com/xamarin"
};
static Task Main() => SumPageSizesAsync();
private static async Task SumPageSizesAsync()
{
var stopwatch = Stopwatch.StartNew();
IEnumerable<Task<int>> downloadTasksQuery = from url in s_urlList select ProcessUrlAsync(url, _client);
List<Task<int>> downloadTasks = downloadTasksQuery.ToList();
int total = 0;
while (downloadTasks.Any())
{
Task<int> finishedTask = await Task.WhenAny(downloadTasks);
downloadTasks.Remove(finishedTask);
total += await finishedTask;
}
stopwatch.Stop();
Console.WriteLine($"\nTotal bytes returned: {total:#,#}");
Console.WriteLine($"Elapsed time: {stopwatch.Elapsed}\n");
Console.ReadKey();
}
private static async Task<int> ProcessUrlAsync(string url, HttpClient client)
{
byte[] content = await _client.GetByteArrayAsync(url);
Console.WriteLine($"{url,-60} {content.Length,10:#,#}");
return content.Length;
}
}
- 手动执行取消任务
class Program
{
static readonly CancellationTokenSource s_cts = new CancellationTokenSource();
static readonly HttpClient s_client = new HttpClient
{
MaxResponseContentBufferSize = 1_000_000
};
static readonly IEnumerable<string> s_urlList = new string[] {
"https://docs.microsoft.com",
"https://docs.microsoft.com/aspnet/core",
"https://docs.microsoft.com/azure",
"https://docs.microsoft.com/azure/devops",
"https://docs.microsoft.com/dotnet",
"https://docs.microsoft.com/dynamics365",
"https://docs.microsoft.com/education",
"https://docs.microsoft.com/enterprise-mobility-security",
"https://docs.microsoft.com/gaming",
"https://docs.microsoft.com/graph",
"https://docs.microsoft.com/microsoft-365",
"https://docs.microsoft.com/office",
"https://docs.microsoft.com/powershell",
"https://docs.microsoft.com/sql",
"https://docs.microsoft.com/surface",
"https://docs.microsoft.com/system-center",
"https://docs.microsoft.com/visualstudio",
"https://docs.microsoft.com/windows",
"https://docs.microsoft.com/xamarin"
};
static async Task Main(string[] args)
{
Console.WriteLine("Application started.");
Console.WriteLine("Press the ENTER key to cancel...\n");
Task cancelTask = Task.Run(() =>
{
while (Console.ReadKey().Key != ConsoleKey.Enter)
{
Console.WriteLine("Press the ENTER key to cancel...");
}
Console.WriteLine("\nENTER key pressed: cancelling downloads.\n");
s_cts.Cancel();
});
Task sumPageSizesTask = SumPageSizesAsync();
await Task.WhenAny(new[] { cancelTask, sumPageSizesTask });
Console.WriteLine("Application ending.");
}
private static async Task SumPageSizesAsync()
{
var stopwatch = Stopwatch.StartNew();
int total = 0;
foreach (string url in s_urlList)
{
int contentLength = await ProcessUrlAsync(url, s_client, s_cts.Token);
total += contentLength;
}
stopwatch.Stop();
Console.WriteLine($"\nTotal bytes returned: {total:#,#}");
Console.WriteLine($"Elapsed time: {stopwatch.Elapsed}\n");
}
private static async Task<int> ProcessUrlAsync(string url, HttpClient client, CancellationToken token)
{
HttpResponseMessage response = await client.GetAsync(url, token);
byte[] content = await response.Content.ReadAsByteArrayAsync();
Console.WriteLine($"{url,-60} {content.Length,10:#,#}");
return content.Length;
}
}
- 超时取消任务
class Program
{
static readonly CancellationTokenSource s_cts = new CancellationTokenSource();
static readonly HttpClient s_client = new HttpClient
{
MaxResponseContentBufferSize = 1_000_000
};
static readonly IEnumerable<string> s_urlList = new string[]
{
"https://docs.microsoft.com",
"https://docs.microsoft.com/aspnet/core",
"https://docs.microsoft.com/azure",
"https://docs.microsoft.com/azure/devops",
"https://docs.microsoft.com/dotnet",
"https://docs.microsoft.com/dynamics365",
"https://docs.microsoft.com/education",
"https://docs.microsoft.com/enterprise-mobility-security",
"https://docs.microsoft.com/gaming",
"https://docs.microsoft.com/graph",
"https://docs.microsoft.com/microsoft-365",
"https://docs.microsoft.com/office",
"https://docs.microsoft.com/powershell",
"https://docs.microsoft.com/sql",
"https://docs.microsoft.com/surface",
"https://docs.microsoft.com/system-center",
"https://docs.microsoft.com/visualstudio",
"https://docs.microsoft.com/windows",
"https://docs.microsoft.com/xamarin"
};
static async Task Main(string[] args)
{
Console.WriteLine("Application started.");
try
{
s_cts.CancelAfter(3500);
await SumPageSizesAsync();
}
catch (TaskCanceledException)
{
Console.WriteLine("\nTasks cancelled: timed out.\n");
}
Console.ReadKey();
}
static async Task SumPageSizesAsync()
{
var stopwatch = Stopwatch.StartNew();
int total = 0;
foreach (string url in s_urlList)
{
int contentLength = await ProcessUrlAsync(url, s_client, s_cts.Token);
total += contentLength;
}
stopwatch.Stop();
Console.WriteLine($"\nTotal bytes returned: {total:#,#}");
Console.WriteLine($"Elapsed time: {stopwatch.Elapsed}\n");
}
static async Task<int> ProcessUrlAsync(string url, HttpClient client, CancellationToken token)
{
HttpResponseMessage response = await client.GetAsync(url, token);
byte[] content = await response.Content.ReadAsByteArrayAsync();
Console.WriteLine($"{url,-60} {content.Length,10:#,#}");
return content.Length;
}
}
- 异步文件读写
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
await SimpleWriteAsync();
await ProcessWriteAsync();
await SimpleReadAsync();
await ProcessReadAsync();
await SimpleParalleWriteAsync();
await ProcessMultpleWritesAsync();
Console.ReadKey();
}
#region 写入文本-简单示例
/// <summary>
/// 简单示例
/// </summary>
/// <returns></returns>
public static async Task SimpleWriteAsync()
{
string filePath = "simple.txt";
string text = $"Hello World";
await File.WriteAllTextAsync(filePath, text);
}
#endregion
#region 写入文本-有限控制示例
public static async Task ProcessWriteAsync()
{
string filePath = "temp.txt";
string text = $"Hello World{Environment.NewLine}";
await WriteTextAsync(filePath, text);
}
private static async Task WriteTextAsync(string filePath, string text)
{
byte[] encodedText = Encoding.Unicode.GetBytes(text);
using var sourceStream = new FileStream(filePath, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: false);
await sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
}
#endregion
#region 读取文本-简单示例
public static async Task SimpleReadAsync()
{
string filePath = "simple.txt";
string text = await File.ReadAllTextAsync(filePath);
Console.WriteLine(text);
}
#endregion
#region 读取文本-有限控制示例
public static async Task ProcessReadAsync()
{
try
{
string filePath = "temp.txt";
if (File.Exists(filePath) != false)
{
string text = await ReadTextAsync(filePath);
Console.WriteLine(text);
}
else
{
Console.WriteLine($"file not found: {filePath}");
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
private static async Task<string> ReadTextAsync(string filePath)
{
using var sourceStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 4096, useAsync: true);
var sb = new StringBuilder();
byte[] buffer = new byte[0x1000];
int numRead;
while ((numRead = await sourceStream.ReadAsync(buffer, 0, buffer.Length)) != 0)
{
string text = Encoding.Unicode.GetString(buffer, 0, numRead);
sb.Append(text);
}
return sb.ToString();
}
#endregion
#region 并行异步 I/O-简单示例
public static async Task SimpleParalleWriteAsync()
{
string folder = Directory.CreateDirectory("tempfolder").Name;
IList<Task> writeTaskList = new List<Task>();
for (int index = 0; index < 20; index++)
{
string fileName = $"file-{index:00}.txt";
string filePath = $"{folder}/{fileName}";
string text = $"In file {index}{Environment.NewLine}";
writeTaskList.Add(File.WriteAllTextAsync(filePath, text));
}
int total = 1;
while (writeTaskList.Any())
{
Task finishedTask = await Task.WhenAny(writeTaskList);
writeTaskList.Remove(finishedTask);
await finishedTask;
Console.WriteLine($"已完成:{total++}");
}
Console.WriteLine("全部完成");
//await Task.WhenAll(writeTaskList);
}
#endregion
#region 并行异步 I/O-有限控制示例
public static async Task ProcessMultpleWritesAsync()
{
IList<FileStream> sourceStreams = new List<FileStream>();
try
{
string folder = Directory.CreateDirectory("tempfolder").Name;
IList<Task> writeTaskList = new List<Task>();
for (int index = 0; index < 100; index++)
{
string fileName = $"file-{index:00}.txt";
string filePath = $"{folder}/{fileName}";
string text = $"In file {index}{Environment.NewLine}";
byte[] encodedText = Encoding.Unicode.GetBytes(text);
var sourceStream = new FileStream(filePath, FileMode.Create, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true);
Task writeTask = sourceStream.WriteAsync(encodedText, 0, encodedText.Length);
sourceStreams.Add(sourceStream);
writeTaskList.Add(writeTask);
}
int total = 1;
while (writeTaskList.Any())
{
Task finishedTask = await Task.WhenAny(writeTaskList);
await finishedTask;
writeTaskList.Remove(finishedTask);
Console.WriteLine($"已完成:{total++:00}");
}
Console.WriteLine("全部完成");
//await Task.WhenAll(writeTaskList);
}
catch (Exception ex)
{
throw ex;
}
finally
{
foreach (FileStream sourceStream in sourceStreams)
{
sourceStream.Close();
}
}
}
#endregion
}