The Task-based Asynchronous Pattern Stephen Toub, Microsoft


Building Task-based Data Structures



страница7/7
Дата26.02.2020
Размер284.15 Kb.
1   2   3   4   5   6   7

Building Task-based Data Structures


In addition to the ability to build custom task-based combinators, having a data structure in Task and Task that represents both the results of an asynchronous operation as well as the necessary synchronization to join with it makes it a very powerful type on which to build custom data structures to be used in asynchronous scenarios.

AsyncCache


One important aspect of Task is that it may be handed out to multiple consumers, all of whom may await it, register continuations with it, get its result (in the case of Task) or exceptions, and so on. This makes Task and Task perfectly suited to be used in an asynchronous caching infrastructure. Here’s a small but powerful asynchronous cache built on top of Task:

public class AsyncCache

{

private readonly Func> _valueFactory;



private readonly ConcurrentDictionary>> _map;
public AsyncCache(Func> valueFactory)

{

if (valueFactory == null) throw new ArgumentNullException("loader");



_valueFactory = valueFactory;

_map = new ConcurrentDictionary>>();

}
public Task this[TKey key]

{

get



{

if (key == null) throw new ArgumentNullException("key");

return _map.GetOrAdd(key, toAdd =>

new Lazy>(() => _valueFactory(toAdd))).Value;

}

}

}


The AsyncCache class accepts as a delegate to its constructor a function that takes a TKey and returns a Task. Any previously accessed values from the cache are stored in the internal dictionary, with the AsyncCache ensuring that only one task is generated per key, even if the cache is accessed concurrently.

As an example of using this, we could build a cache for downloaded web pages, e.g.

private AsyncCache m_webPages =

new AsyncCache(DownloadStringAsync);


Now, we can use this in asynchronous methods whenever we need the contents of a web page, and the AsyncCache will ensure we’re downloading as few pages as possible, caching the results.

private async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.IsEnabled = false;



try

{

txtContents.Text = await m_webPages["http://www.microsoft.com"];



}

finally { btnDownload.IsEnabled = true; }

}

AsyncProducerConsumerCollection


Tasks can also be used to build data structures for coordinating between asynchronous activities. Consider one of the classic parallel design patterns: producer/consumer. In producer/consumer, producers generate data which is consumed by consumers, and the producers and consumers may run in parallel (e.g. the consumer processing item 1 which was previously generated by a producer now producing item 2). For producer/consumer, we invariably need some data structure to store the work created by producers so that the consumers may be notified of new data and find it when available. Here’s a simple data structure built on top of tasks that enables asynchronous methods to be used as producers and consumers:

public class AsyncProducerConsumerCollection

{

private readonly Queue m_collection = new Queue();



private readonly Queue> m_waiting =

new Queue>();


public void Add(T item)

{

TaskCompletionSource tcs = null;



lock (m_collection)

{

if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();



else m_collection.Enqueue(item);

}

if (tcs != null) tcs.TrySetResult(item);



}
public Task Take()

{

lock (m_collection)



{

if (m_collection.Count > 0)

{

return Task.FromResult(m_collection.Dequeue());



}

else


{

var tcs = new TaskCompletionSource();

m_waiting.Enqueue(tcs);

return tcs.Task;

}

}

}



}
With that in place, we can now write code like the following:

private static AsyncProducerConsumerCollection m_data = …;

private static async Task ConsumerAsync()



{

while(true)

{

int nextItem = await m_data.Take();



ProcessNextItem(nextItem);

}

}



private static void Produce(int data)

{

m_data.Add(data);


}
Included in .NET 4.5 is the System.Threading.Tasks.Dataflow.dll assembly. This assembly includes the BufferBlock type, which may be used in a similar manner and without having to build a custom collection type:

private static BufferBlock m_data = …;

private static async Task ConsumerAsync()



{

while(true)

{

int nextItem = await m_data.ReceiveAsync();



ProcessNextItem(nextItem);

}

}



private static void Produce(int data)

{

m_data.Post(data);


}

Interop with Other .NET Asynchronous Patterns and Types


The .NET Framework 1.0 saw the introduction of the IAsyncResult pattern, otherwise known as the Asynchronous Programming Model (APM) pattern, or the Begin/End pattern. The .NET Framework 2.0 then brought with it the event-based asynchronous pattern (EAP). The new TAP deprecates both of its predecessors, while at the same time providing the ability to easily build migration routines from the APM and EAP to TAP.

Tasks and the Asynchronous Programming Model (APM)

From APM to Tasks


The APM pattern relies on two corresponding methods to represent an asynchronous operation: BeginMethodName and EndMethodName. At a high-level, the begin method accepts as parameters to the method the same parameters that would be supplied to the MethodName synchronous method counterpart, as well as also accepting an AsyncCallback delegate and an object state. The begin method then returns an IAsyncResult, which returns from its AsyncState property the object state passed to the begin method. When the asynchronous operation completes, the IAsyncResult’s IsCompleted will start returning true, and its AsyncWaitHandle will be set. Additionally, if the AsyncCallback parameter to the begin method was non-null, the callback will be invoked and passed the same IAsyncResult that was returned from the begin method. When the asynchronous operation does complete, the EndMethodName method is used to join with the operation, retrieving any results or forcing any exceptions that occurred to then propagate. There are further details around the IAsyncResult’s CompletedSynchronously property that are beyond the scope of this document; for more information, see MSDN.

Given the very structured nature of the APM pattern, it is quite easy to build a wrapper for an APM implementation to expose it as a TAP implementation. In fact, the .NET Framework 4 includes helper routines in the form of TaskFactory.FromAsync to provide this translation.

Consider the .NET Stream class and its BeginRead/EndRead methods, which represent the APM counterpart to the synchronous Read method:

public int Read(

byte [] buffer, int offset, int count);

public IAsyncResult BeginRead(



byte [] buffer, int offset, int count,

AsyncCallback callback, object state);

public int EndRead(IAsyncResult asyncResult);
Utilizing FromAsync, we can implement a TAP wrapper for this method as follows:

public static Task ReadAsync(

this Stream stream, byte [] buffer, int offset, int count)

{

if (stream == null) throw new ArgumentNullException(“stream”);



return Task.Factory.FromAsync(stream.BeginRead, stream.EndRead,

buffer, offset, count, null);

}
This implementation that utilizes FromAsync is effectively equivalent to the following:

public static Task ReadAsync(

this Stream stream, byte [] buffer, int offset, int count)

{

if (stream == null) throw new ArgumentNullException(“stream”);



var tcs = new TaskCompletionSource();

stream.BeginRead(buffer, offset, count, iar =>

{

try { tcs.TrySetResult(stream.EndRead(iar)); }



catch(OperationCanceledException) { tcs.TrySetCanceled(); }

catch(Exception exc) { tcs.TrySetException(exc); }

}, null);

return tcs.Task;

}

From Tasks to APM


For cases where existing infrastructure expects code to implement the APM pattern, it is also important to be able to be able to take a TAP implementation and use it where an APM implementation is expected. Thanks to the composability of tasks, and the fact that Task itself implements IAsyncResult, this is achievable with a straightforward helper function (shown here as an extension for Task, but an almost identical function may be used for the non-generic Task):

public static IAsyncResult AsApm(

this Task task, AsyncCallback callback, object state)

{

if (task == null) throw new ArgumentNullException(“task”);



var tcs = new TaskCompletionSource(state);

task.ContinueWith(t =>

{

if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions)



else if (t.IsCanceled) tcs.TrySetCanceled();

else tcs.TrySetResult(t.Result);


if (callback != null) callback(tcs.Task);

}, TaskScheduler.Default);

return tcs.Task;

}
Now, consider a case where we have a TAP implementation:

public static Task DownloadStringAsync(Uri url);
and we need to provide an APM implementation:

public IAsyncResult BeginDownloadString(

Uri url, AsyncCallback callback, object state);

public string EndDownloadString(IAsyncResult asyncResult);


This is achievable with the following code:

public IAsyncResult BeginDownloadString(

Uri url, AsyncCallback callback, object state)

{

return DownloadStringAsync(url).AsApm(callback, state);



}
public string EndDownloadString(IAsyncResult asyncResult)

{

return ((Task)asyncResult).Result;



}

Tasks and the Event-based Asynchronous Pattern (EAP)


The event-based asynchronous pattern relies on an instance MethodNameAsync method which returns void, accepts the same parameters as the synchronous MethodName method, and initiates the asynchronous operation. Prior to initiating the asynchronous operation, event handlers are registered with events on the same instance, and these events are then raised to provide progress and completion notifications. The event handlers are typically custom delegate types that utilize event argument types that are or that are derived from ProgressChangedEventArgs and AsyncCompletedEventArgs.

Wrapping an EAP implementation is more involved, as the pattern itself involves much more variation and less structure than the APM pattern. To demonstrate, we’ll wrap the DownloadStringAsync method. DownloadStringAsync accepts a Uri, raises the DownloadProgressChanged event while downloading in order to report multiple statistics on progress, and raises the DownloadStringCompleted event when done. The final result is a string containing the contents of the page at the specified Uri.

public static Task DownloadStringAsync(Uri url)

{

var tcs = new TaskCompletionSource();



var wc = new WebClient();

wc.DownloadStringCompleted += (s,e) =>

{

if (e.Error != null) tcs.TrySetException(e.Error);



else if (e.Cancelled) tcs.TrySetCanceled();

else tcs.TrySetResult(e.Result);

};

wc.DownloadStringAsync(url);



return tcs.Task;

}

Tasks and WaitHandles

From WaitHandles to Tasks


While not an asynchronous pattern per-se, advanced developers may find themselves utilizing WaitHandles and the ThreadPool’s RegisterWaitForSingleObject method to be notified asynchronously when a WaitHandle is set. We can wrap RegisterWaitForSingleObject to enable a task-based alternative to any synchronous wait on a WaitHandle:

public static Task WaitOneAsync(this WaitHandle waitHandle)

{

if (waitHandle == null) throw new ArgumentNullException("waitHandle");


var tcs = new TaskCompletionSource();

var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,

delegate { tcs.TrySetResult(true); }, null, -1, true);

var t = tcs.Task;

t.ContinueWith(_ => rwh.Unregister(null));

return t;

}
With a method like this in hand, we can utilize existing WaitHandle implementations in asynchronous methods. For example, consider the need to throttle the number of asynchronous operations executing at any particular time. For this, we can utilize a System.Threading.Semaphore. By initializing the semaphore’s count to N, waiting on the semaphore any time we want to perform an operation, and releasing the semaphore when we’re done with an operation, we can throttle to N the number of operations that run concurrently.

static Semaphore m_throttle = new Semaphore(N, N);


static async Task DoOperation()

{

await m_throttle.WaitOneAsync();



… // do work

m_throttle.ReleaseOne();

}
Using techniques as those demonstrated in this document’s previous section on building data structures on top of Task, it is similarly possible to build an asynchronous semaphore that does not rely on WaitHandles and instead works completely in terms of Task. In fact, the SemaphoreSlim type in .NET 4.5 exposes a WaitAsync method that enables this:

For example, the aforementioned BufferBlock type from System.Threading.Tasks.Dataflow.dll may be used towards a similar end:

static SemaphoreSlim m_throttle = new SemaphoreSlim(N, N);
static async Task DoOperation()

{

await m_throttle.WaitAsync();



… // do work

m_throttle.Release ();

}

From Tasks to WaitHandles


As previously mentioned, the Task class implements IAsyncResult, and its IAsyncResult implementation exposes an AsyncWaitHandle property which returns a WaitHandle that will be set when the Task completes. As such, getting a WaitHandle for a Task is accomplished as follows:

WaitHandle wh = ((IAsyncResult)task).AsyncWaitHandle;



Case Study: CopyToAsync


The ability to copy one stream to another is a useful and common operation. The Stream.CopyTo instance method was added in .NET 4 to accommodate scenarios that require this functionality, such as downloading the data at a specified URL:

public static byte[] DownloadData(string url)

{

using(var request = WebRequest.Create(url))



using(var response = request.GetResponse())

using(var responseStream = response.GetResponseStream())

using(var result = new MemoryStream())

{

responseStream.CopyTo(result);



return result.ToArray();

}

}


We would like to be able to implement a method like the above with the Task-based Asynchronous Pattern so as to improve responsiveness and scalability. We might attempt to do so as follows:

public static async Task DownloadDataAsync(string url)

{

using(var request = WebRequest.Create(url))



{

return await Task.Run(() =>

{

using(var response = request.GetResponse())



using(var responseStream = response.GetResponseStream())

using(var result = new MemoryStream())

{

responseStream.CopyTo(result);



return result.ToArray();

}

}



}

}
This implementation would improve responsiveness if utilized, for example, from a UI thread, as it offloads from the calling thread the work of downloading the data from the network stream and copying it to the memory stream which will ultimately be used to yield the downloaded data as an array. However, this implementation does not help with scalability, as it’s still performing synchronous I/O and blocking a ThreadPool thread in the process while waiting for data to be downloaded. Instead, we would like to be able to write the following function:

public static async Task DownloadDataAsync(string url)

{

using(var request = WebRequest.Create(url))



using(var response = await request.GetResponseAsync())

using(var responseStream = response.GetResponseStream())

using(var result = new MemoryStream())

{

await responseStream.CopyToAsync(result);



return result.ToArray();

}

}


Unfortunately, while Stream has a synchronous CopyTo method, in .NET 4 it lacks an asynchronous CopyToAsync method. We will now walk through providing such an implementation.

A synchronous CopyTo method could be implemented as follows:

public static void CopyTo(this Stream source, Stream destination)

{

var buffer = new byte[0x1000];



int bytesRead;

while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)

{

destination.Write(buffer, 0, bytesRead);



}

}
To provide an asynchronous implementation of CopyTo, utilizing the compiler’s ability to implement the TAP, we can modify this implementation slightly:

public static async Task CopyToAsync(this Stream source, Stream destination)

{

var buffer = new byte[0x1000];



int bytesRead;

while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)

{

await destination.WriteAsync(buffer, 0, bytesRead);



}

}
Here, we changed the return type from void to Task, we utilized ReadAsync instead of Read and WriteAsync instead of Write, and we prefixed the calls to ReadAsync and WriteAsync with the await contextual keyword. Following the pattern, we also renamed our method by appending “Async” as a suffix. The ReadAsync and WriteAsync don’t exist in .NET 4, but they could be implemented with one statement based on Task.Factory.FromAsync as described in the “Tasks and the Asynchronous Programming Model” section of this docment:

public static Task ReadAsync(

this Stream source, byte [] buffer, int offset, int count)

{

return Task.Factory.FromAsync(source.BeginRead, source.EndRead,



buffer, offset, count, null);

}
public static Task WriteAsync(

this Stream destination, byte [] buffer, int offset, int count)

{

return Task.Factory.FromAsync(



destination.BeginWrite, destination.EndWrite,

buffer, offset, count, null);

}
With these methods in hand, we can now successfully implement the CopyToAsync method. We can also optionally support cancellation in the method by adding a CancellationToken that will, for example, be monitored during the copy after every read and write pair (if ReadAsync and/or WriteAsync supported cancellation, the CancellationToken could also be threaded into those calls):

public static async Task CopyToAsync(

this Stream source, Stream destination,

CancellationToken cancellationToken)

{

var buffer = new byte[0x1000];



int bytesRead;

while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)

{

await destination.WriteAsync(buffer, 0, bytesRead);



cancellationToken.ThrowIfCancellationRequested();

}

}


(Note that such cancellation could also be useful in a synchronous implementation of CopyTo, and the ability to pass in a CancellationToken enables this. Approaches that would rely on a cancelable object being returned from the method would receive that object too late, since by the time the synchronous call completed, there would be nothing left to cancel.)

We could also add support for progress notification, including how much data has thus far been copied:

public static async Task CopyToAsync(

this Stream source, Stream destination,

CancellationToken cancellationToken,

IProgress progress)

{

var buffer = new byte[0x1000];



int bytesRead;

long totalRead = 0;

while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)

{

await destination.WriteAsync(buffer, 0, bytesRead);



cancellationToken.ThrowIfCancellationRequested();

totalRead += bytesRead;

progress.Report(totalRead);

}

}


With this method in hand, we can now fully implement our DownloadDataAsync method, including now adding in cancellation and progress support:

public static async Task DownloadDataAsync(

string url,

CancellationToken cancellationToken,

IProgress progress)

{

using(var request = WebRequest.Create(url))



using(var response = await request.GetResponseAsync())

using(var responseStream = response.GetResponseStream())

using(var result = new MemoryStream())

{

await responseStream.CopyToAsync(



result, cancellationToken, progress);

return result.ToArray();

}

}
Further optimizations are also possible for our CopyToAsync method. For example, if we were to use two buffers instead of one, we could be writing the previously read data while reading in the next piece of data, thus overlapping latencies if both the read and the write are utilizing asynchronous I/O:



public static async Task CopyToAsync(this Stream source, Stream destination)

{

int i = 0;



var buffers = new [] { new byte[0x1000], new byte[0x1000] };

Task writeTask = null;

while(true)

{

var readTask = source.ReadAsync(buffers[i], 0, buffers[i].Length))>0;



if (writeTask != null) await Task.WhenAll(readTask, writeTask);

int bytesRead = await readTask;

if (bytesRead == 0) break;

writeTask = destination.WriteAsync(buffers[i], 0, bytesRead);

i ^= 1; // swap buffers

}

}


Another optimization is to eliminate unnecessary context switches. As mentioned earlier in this document, by default awaiting on a Task will transition back to the SynchronizationContext that was current when the await began. In the case of the CopyToAsync implementation, there’s no need to employ such transitions, since we’re not manipulating any UI state. We can take advantage of the Task.ConfigureAwait method to disable this automatic switch. For simplicity, changes are shown on the original asynchronous implementation from above:

public static Task CopyToAsync(this Stream source, Stream destination)

{

var buffer = new byte[0x1000];



int bytesRead;

while((bytesRead = await

source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)

{

await destination.WriteAsync(buffer, 0, bytesRead)



.ConfigureAwait(false);

}

}






Поделитесь с Вашими друзьями:
1   2   3   4   5   6   7


База данных защищена авторским правом ©obuch.info 2019
отнасят до администрацията

    Начална страница