Async SelectMany implementation
























$begingroup$


Async SelectMany takes an enumeration of tasks (length unknown), each of which returns another enumeration of tasks (length also unknown, and likely different for each one), and returns the results from the second dimension as soon as they are ready. I named it FoldAsync to be descriptive, but I'm open to suggestions.



Imagine that each task is identified by a number and runs for that many seconds before returning (1a and 1b are two distinct one-second tasks). A two-dimensional set of tasks, where the first number on each row is the duration of the first-dimension task, could be:



1a : 6  11 16
17 : 33 1b
22 : 3  4  5
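
For anyone who wants to reproduce the timings, the grid above could be simulated roughly like this. This is only a sketch: `ExampleWork`, `Timed`, `Row`, `Build` and the `unitMs` scale factor are names made up for illustration and are not part of the implementation below. It assumes the second-dimension tasks start when their first-dimension task completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ExampleWork
{
    // Hypothetical helper: "runs" for the given number of time units, then returns its label.
    // unitMs scales one "second" of the example down so the demo finishes quickly.
    public static async Task<string> Timed(string label, int units, int unitMs)
    {
        await Task.Delay(units * unitMs);
        return label;
    }

    // A first-dimension task: waits for its own duration, then starts its inner tasks.
    public static async Task<IEnumerable<Task<string>>> Row(
        int units, int unitMs, params (string Label, int Units)[] inner)
    {
        await Task.Delay(units * unitMs);
        // ToList starts the inner tasks now instead of lazily on enumeration.
        return inner.Select(i => Timed(i.Label, i.Units, unitMs)).ToList();
    }

    // The three rows of the example grid.
    public static List<Task<IEnumerable<Task<string>>>> Build(int unitMs)
    {
        return new List<Task<IEnumerable<Task<string>>>>
        {
            Row(1, unitMs, ("6", 6), ("11", 11), ("16", 16)),
            Row(17, unitMs, ("33", 33), ("1b", 1)),
            Row(22, unitMs, ("3", 3), ("4", 4), ("5", 5)),
        };
    }
}
```

Calling `ExampleWork.Build(1000)` would give the real one-second scale; a smaller value keeps the same relative ordering as long as the delays stay well apart.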


Different ways of nesting Task.WhenAll could result in the main thread iterating through them like so:



     Depth first          Breadth first
time : task id        time : task id
   1 : 1a                1 : 1a
   7 : 6                17 : 17
  12 : 11               22 : 22
  17 : 16               23 : 1b
  17 : 17               25 : 3
  18 : 1b               26 : 4
  50 : 33               27 : 5
  50 : 22               28 : 6
  50 : 3                33 : 11
  50 : 4                38 : 16
  50 : 5                50 : 33


Async SelectMany accomplishes:



time : task id
   1 : 1a
   7 : 6
  12 : 11
  17 : (16, 17)
  17 : (16, 17)
  18 : 1b
  22 : 22
  25 : 3
  26 : 4
  27 : 5
  50 : 33


Task.WhenAny can obtain the same results, but you'd have to maintain a single list holding both the Task<IEnumerable<Task<T>>> and the Task<T> instances (which you then have to cast back), and spend O(n^2) time repeatedly scanning it. It's workable, but I like cool code instead.
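
To make the comparison concrete, the Task.WhenAny approach might look something like the sketch below. `WhenAnyFold` and `InCompletionOrder` are illustrative names only. The sketch collects everything into a list instead of streaming, and it silently drops faulted or canceled first-dimension tasks, so it is not equivalent to the implementation that follows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyFold
{
    // One mixed list of pending tasks, rescanned on every completion.
    // Each round costs O(n) (WhenAny plus Remove), so the whole thing is roughly O(n^2).
    public static async Task<List<Task<T>>> InCompletionOrder<T>(
        IEnumerable<Task<IEnumerable<Task<T>>>> outer)
    {
        // Both dimensions live here as plain Task, which is why a cast is needed later.
        var pending = new List<Task>(outer);
        var results = new List<Task<T>>();

        while (pending.Count > 0)
        {
            Task finished = await Task.WhenAny(pending);
            pending.Remove(finished); // linear scan

            if (finished is Task<IEnumerable<Task<T>>> row)
            {
                // A first-dimension task finished: start watching its inner tasks.
                if (row.Status == TaskStatus.RanToCompletion)
                {
                    pending.AddRange(row.Result);
                }
                // Faulted or canceled rows are simply dropped in this sketch.
            }
            else
            {
                // A second-dimension task finished: hand it back still wrapped.
                results.Add((Task<T>)finished);
            }
        }

        return results;
    }
}
```

The type test and the cast are the awkward part mentioned above: once both dimensions share one `List<Task>`, the only way to tell them apart is by runtime type.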



The following is my implementation (with 800% too many comments). The /// documentation comments are messy because I couldn't figure out how to display nested generics inside them. I tried to manage cancellation and exceptions in a sane manner. I'm looking for comments, critiques, and hopefully improvements.



using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace WhereIWork.Utilities.Async
{
    /// <summary>
    /// A helper class containing the <see cref="FoldAsync{TResult}(IEnumerable{Task{IEnumerable{Task{TResult}}}})"/> extension method that is an async equivalent of <see cref="System.Linq.Enumerable"/>.SelectMany.
    /// Also provides an overload taking a <see cref="CancellationToken"/>
    /// </summary>
    /// <remarks>
    /// If the sizes of your dimensions are known, use <see cref="InterleavingExtensions.Interleaved{TResult}"/> instead.
    /// <see cref="FoldAsync{TResult}(IEnumerable{Task{IEnumerable{Task{TResult}}}})"/> blocks, because it cannot know the Count of its results until all tasks in the first dimension are complete.
    /// </remarks>
    public static class AsyncEnumerable
    {
        /// <summary>
        /// An asynchronous version of <see cref="System.Linq.Enumerable"/>.SelectMany.
        /// Will return results in the order they are completed and not the order from the two-dimensional <see cref="IEnumerable"/>.
        /// </summary>
        /// <typeparam name="TResult">The type of the expected results.</typeparam>
        /// <param name="tasks">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
        /// <returns>An <see cref="IEnumerable{Task}"/> which will block as its results are iterated.</returns>
        public static IEnumerable<Task<TResult>> FoldAsync<TResult>(this IEnumerable<Task<IEnumerable<Task<TResult>>>> tasks)
        {
            return new TaskFolder<TResult>(tasks);
        }

        /// <summary>
        /// An asynchronous version of <see cref="System.Linq.Enumerable"/>.SelectMany.
        /// Will return results in the order they are completed and not the order from the two-dimensional <see cref="IEnumerable"/>
        /// </summary>
        /// <typeparam name="TResult">The type of the expected results.</typeparam>
        /// <param name="tasks">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
        /// <param name="token">The cancellation token to be passed into every <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; and <see cref="Task{TResult}"/>.</param>
        /// <returns>An <see cref="IEnumerable{Task}"/> which will block as its results are iterated.</returns>
        public static IEnumerable<Task<TResult>> FoldAsync<TResult>(this IEnumerable<Task<IEnumerable<Task<TResult>>>> tasks, CancellationToken token)
        {
            return new TaskFolder<TResult>(tasks, token);
        }
    }

    /// <summary>
    /// Implements <see cref="System.Linq.Enumerable"/>.SelectMany(<see cref="IEnumerable"/>&lt;<see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt;&gt;,
    /// Func&lt;<see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt;,
    /// <see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt;)
    /// <br/>The advantage of this class is that we process results in the order they are ready, without waiting for tasks that are listed earlier in the initial enumeration.
    /// </summary>
    /// <typeparam name="TResult">The common type of the result of all <see cref="Task{TResult}"/> being folded.</typeparam>
    // ReSharper disable once InheritdocConsiderUsage
    internal sealed class TaskFolder<TResult> : IEnumerable<Task<TResult>>
    {
        /// <summary>
        /// The collection to which we will post results as they are ready.
        /// The <see cref="IEnumerator{TResult}"/> returned by this class comes from this collection.
        /// </summary>
        private readonly BlockingCollection<Task<TResult>> _collection = new BlockingCollection<Task<TResult>>();

        private readonly CancellationToken _token;

        /// <summary>
        /// The number of active tasks.
        /// </summary>
        private int _taskCount;

        /// <summary>
        /// The current state of this <see cref="TaskFolder{TResult}"/>
        /// </summary>
        private int _folderState = (int)FolderState.Initial;

        /// <summary>
        /// The possible states for a <see cref="TaskFolder{TResult}"/>.
        /// </summary>
        [Flags]
        private enum FolderState
        {
            /// <summary>
            /// The initial state
            /// </summary>
            Initial = 0,
            /// <summary>
            /// All tasks have been completed
            /// </summary>
            Task = 1,
            /// <summary>
            /// The <see cref="IEnumerator"/>&lt;<see cref="Task{TResult}"/>&gt; has been requested.
            /// </summary>
            Fetching = 2,
            /// <inheritdoc cref="Task"/>
            /// <inheritdoc cref="Fetching"/>
            TaskFetching = 3,
            /// <summary>
            /// The <see cref="IEnumerator"/>&lt;<see cref="Task{TResult}"/>&gt; from <see cref="_collection"/> has been obtained.
            /// </summary>
            Fetched = 4,
            /// <inheritdoc cref="Task"/>
            /// <inheritdoc cref="Fetched"/>
            TaskFetched = 5,
            /// <inheritdoc cref="Task"/>
            /// <inheritdoc cref="Fetched"/>
            /// <summary>
            /// The underlying <see cref="BlockingCollection{T}"/> has been disposed.
            /// </summary>
            TaskFetchedDisposed = 13
        }

        /// <summary>
        /// Creates a new TaskFolder and initiates the process of listening for completions.
        /// </summary>
        /// <param name="twoDimensionalWork">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
        // ReSharper disable once InheritdocConsiderUsage
        internal TaskFolder(IEnumerable<Task<IEnumerable<Task<TResult>>>> twoDimensionalWork) : this(twoDimensionalWork, CancellationToken.None)
        {
        }

        /// <summary>
        /// Creates a new TaskFolder with a cancellation token, and initiates the process of listening for completions.
        /// </summary>
        /// <param name="twoDimensionalWork">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
        /// <param name="cancellationToken">The cancellation token to be passed into every <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; and <see cref="Task{TResult}"/>.</param>
        internal TaskFolder(IEnumerable<Task<IEnumerable<Task<TResult>>>> twoDimensionalWork, CancellationToken cancellationToken)
        {
            // This initial increment represents the need to add the continuation to all first dimension tasks before we complete the BlockingCollection.
            Interlocked.Increment(ref _taskCount);

            _token = cancellationToken;
            foreach (var outerTask in twoDimensionalWork)
            {
                if (_token.IsCancellationRequested)
                {
                    AdvanceTaskCompletion();
                    Dispose();
                    throw new OperationCanceledException(_token);
                }

                // Increment first, then any decrementing in OuterContinuation will necessarily not be premature.
                Interlocked.Increment(ref _taskCount);

                // As this is an example of Dynamic Task Parallelism, we do not await the result of ContinueWith
                // Because we manage completion using _taskCount, we do not need to track the continuation to guarantee its completion.
                // Do not use an overload without the TaskScheduler parameter. http://blog.stephencleary.com/2015/01/a-tour-of-task-part-7-continuations.html
                outerTask.ContinueWith(OuterContinuation, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
            }

            DecrementTaskCounter();
        }

        /// <summary>
        /// Called when a first dimension <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; is complete.
        /// If there was an exception, it will be returned as a <see cref="Task{TResult}"/> in the enumeration.
        /// </summary>
        /// <remarks>
        /// Not async since we have another method of tracking the completion of newly created tasks.
        /// </remarks>
        /// <param name="task">The first dimension <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; that has completed.</param>
        private void OuterContinuation(Task<IEnumerable<Task<TResult>>> task)
        {
            if (IsCanceled())
            {
                return;
            }

            // In both the faulted and canceled states, there are no second dimension Task<TResult>, and so we ignore _taskCount.
            // If the task has faulted, add an exception to the results.
            if (task.IsFaulted)
            {
                Debug.Assert(task.Exception != null, "If a task is faulted, it should have an exception.");
                _collection.Add(Task.FromException<TResult>(task.Exception), _token);
                DecrementTaskCounter();
                return;
            }

            // If the task was cancelled, add a cancellation to the results. And it won't have children tasks to deal with.
            if (task.IsCanceled || _token.IsCancellationRequested)
            {
                _collection.Add(Task.FromCanceled<TResult>(_token), _token);
                DecrementTaskCounter();
                return;
            }

            // By exclusion, the task has completed and we can safely use the Result without worrying about blocking.
            Debug.Assert(task.Status == TaskStatus.RanToCompletion, "The continuation should not be called when the Task is not complete");
            foreach (var innerTask in task.Result)
            {
                if (IsCanceled())
                {
                    return;
                }

                // Increment first, then any decrementing in InnerContinuation won't cause _taskCount to go to 0
                Interlocked.Increment(ref _taskCount);

                // As this is an example of Dynamic Task Parallelism, we do not await the result of ContinueWith
                // Because we manage completion using _taskCount, we do not need to track the continuation to guarantee its completion.
                // Do not use an overload without the TaskScheduler parameter. http://blog.stephencleary.com/2015/01/a-tour-of-task-part-7-continuations.html
                innerTask.ContinueWith(InnerContinuation, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
            }

            // Decrement at the end, ensuring that all second dimension Task<TResult> have contributed to the counter.
            DecrementTaskCounter();
        }

        /// <summary>
        /// Called when a result is ready to be added to <see cref="_collection"/> and returned as part of this <see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;
        /// </summary>
        /// <remarks>
        /// Not async since we do not use the result of the <see cref="Task{TResult}"/> and simply return it still wrapped.
        /// </remarks>
        /// <param name="task">The next result to return to the consumer.</param>
        private void InnerContinuation(Task<TResult> task)
        {
            if (IsCanceled())
            {
                return;
            }

            // No special handling for various TaskStatus, these are returned to the user as part of the task's result.
            _collection.Add(task, _token);
            DecrementTaskCounter();
        }

        /// <summary>
        /// Checks if <see cref="_token"/> has <see cref="CancellationToken.IsCancellationRequested"/> set to true.
        /// If so, will call <see cref="AdvanceTaskCompletion"/> to stop processing further results.
        /// </summary>
        /// <returns>
        /// <see cref="_token"/>.IsCancellationRequested
        /// </returns>
        private bool IsCanceled()
        {
            if (!_token.IsCancellationRequested)
            {
                return false;
            }

            // Pretend all the tasks are done, we won't be adding more results.
            _taskCount = 0;
            AdvanceTaskCompletion();

            return true;
        }

        /// <summary>
        /// Calls <see cref="BlockingCollection{T}.CompleteAdding"/> to tell it we have finished processing <see cref="Task{TResult}"/>.
        /// Sets the <see cref="FolderState.Task"/> flag on <see cref="_folderState"/> to indicate all <see cref="Task{TResult}"/> have been completed.
        /// </summary>
        private void AdvanceTaskCompletion()
        {
            // If this has already been run, ignore it.
            if ((_folderState & (int)FolderState.Task) != 0)
            {
                return;
            }

            // Completed or Canceled both mean we won't be adding anything new to the collection
            // If we're the first to advance to Task complete
            // CompareExchange returns the previous value, so each result is compared against the state we expected to replace.
            // ReSharper disable once InvertIf
            if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.Task, (int)FolderState.Initial) == (int)FolderState.Initial ||
                Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetching, (int)FolderState.Fetching) == (int)FolderState.Fetching ||
                Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetched, (int)FolderState.Fetched) == (int)FolderState.Fetched)
            {
                _collection.CompleteAdding();

                // If we have Fetched and completed the Tasks we can dispose the collection early.
                if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetchedDisposed, (int)FolderState.TaskFetched) == (int)FolderState.TaskFetched)
                {
                    _collection.Dispose();
                }
            }
        }

        /// <summary>
        /// Decrements <see cref="_taskCount"/>, and if all the tasks have completed, will advance the <see cref="FolderState"/>.
        /// </summary>
        private void DecrementTaskCounter()
        {
            if (Interlocked.Decrement(ref _taskCount) > 0)
            {
                return;
            }

            AdvanceTaskCompletion();
        }

        /// <summary>
        /// Returns an <see cref="IEnumerator"/> which will provide <see cref="Task{TResult}"/> in the order they have completed, not the order they were supplied.
        /// </summary>
        /// <returns>A blocking <see cref="IEnumerator"/>&lt;<see cref="Task{TResult}"/>&gt;.</returns>
        // ReSharper disable once InheritdocConsiderUsage
        public IEnumerator<Task<TResult>> GetEnumerator()
        {
            // Allow only one thread to advance into Fetching
            // ReSharper disable once InvertIf
            if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.Fetching, (int)FolderState.Initial) == (int)FolderState.Initial ||
                Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetching, (int)FolderState.Task) == (int)FolderState.Task)
            {
                var result = _collection.GetConsumingEnumerable(_token).GetEnumerator();

                // Advance to Fetched - No need to check initial state since we're the only thread that can be in here.
                // Still interlocked so we don't interfere with advancing Task
                Interlocked.CompareExchange(ref _folderState, (int)FolderState.Fetched, (int)FolderState.Fetching);
                Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetched, (int)FolderState.TaskFetching);

                // If we have Fetched and completed the Tasks we can dispose
                if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetchedDisposed, (int)FolderState.TaskFetched) == (int)FolderState.TaskFetched)
                {
                    _collection.Dispose();
                }

                return result;
            }

            Dispose();
            throw new InvalidOperationException("The enumerator has already been fetched. Cannot Enumerate this object twice.");
        }

        /// <inheritdoc />
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        /// <summary>
        /// Dispose of the underlying <see cref="BlockingCollection{T}"/> and set the <see cref="FolderState"/> to <see cref="FolderState.TaskFetchedDisposed"/>
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _folderState, (int)FolderState.TaskFetchedDisposed) != (int)FolderState.TaskFetchedDisposed)
            {
                _collection.Dispose();
            }
        }
    }
}
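
Since the state machine and cancellation handling make the core idea hard to see, here is a stripped-down sketch of just the counting technique: increment before attaching each continuation, decrement when it finishes, and complete the collection when the count reaches zero. `MiniFold`, `Fold` and `Release` are illustrative names, not part of the code above. The sketch deliberately ignores cancellation, drops faulted or canceled first-dimension tasks, and never disposes the collection.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class MiniFold
{
    public static IEnumerable<Task<T>> Fold<T>(IEnumerable<Task<IEnumerable<Task<T>>>> outer)
    {
        var queue = new BlockingCollection<Task<T>>();

        // Starts at 1: a guard that is held until every outer continuation is attached,
        // so the count cannot reach zero while we are still enumerating.
        int pending = 1;

        void Release()
        {
            if (Interlocked.Decrement(ref pending) == 0)
            {
                queue.CompleteAdding();
            }
        }

        foreach (var row in outer)
        {
            // Count the row before attaching, so its continuation cannot release early.
            Interlocked.Increment(ref pending);
            row.ContinueWith(r =>
            {
                if (r.Status == TaskStatus.RanToCompletion)
                {
                    foreach (var inner in r.Result)
                    {
                        // Same rule for the second dimension.
                        Interlocked.Increment(ref pending);
                        inner.ContinueWith(t => { queue.Add(t); Release(); },
                            CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
                    }
                }

                // The row itself is done once all its inner tasks have been counted.
                Release();
            }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
        }

        Release(); // drop the guard
        return queue.GetConsumingEnumerable();
    }
}
```

Enumerating the result blocks until the next task completes, and ends once every counted task has been released.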




















$endgroup$












  • $begingroup$
    The words Cancellation, cancelled, and cancelling have lost all meaning and are probably spelled a variety of ways with one or two l's. <see cref"en.wikipedia.org/wiki/Semantic_satiation" />
    $endgroup$
    – Jean-Bernard Pellerin
    2 hours ago


















0












$begingroup$


Async SelectMany takes an enumeration (length unknown) of tasks, each of which returns another enumeration (length unknown and likely different from one another), and returns results from the second dimension as soon as they are ready. I used the name FoldAsync to make it descriptive, but I'm open to suggestions.



Imagine a task, identified with a number, runs for that many seconds then returns. The 2d array (first # being the time for the first dimension task) of tasks could be



1a  : 6     11  16  
17 : 33 1b
22 : 3 4 5


Different ways of nesting Task.WhenAll could result in the main thread iterating through them like so:



    Depth first             Breadth first
time : task id time : task id
1 : 1a 1 : 1a
7 : 6 17 : 17
12 : 11 22 : 22
17 : 16 23 : 1b
17 : 17 25 : 3
18 : 1b 26 : 4
50 : 33 27 : 5
50 : 22 28 : 6
50 : 3 33 : 11
50 : 4 38 : 16
50 : 5 50 : 33


Async SelectMany accomplishes:



time    :   task id
1 : 1a
7 : 6
12 : 11
17 : (16, 17)
17 : (16, 17)
18 : 1b
22 : 22
25 : 3
26 : 4
27 : 5
50 : 33


Task.WhenAny can obtain the same results, but you'd have to maintain a list of both Task<IEnumerable<Task<T>>>, and Task<T> which you're now casting, and spend O(n^2) iterating through it. It's workable, but I like cool code instead.



The following is my implementation (with 800% too many comments). The /// documentation comments are messy because I couldn't figure out how to display nested generics inside. I tried to manage cancellation and Exceptions in a sane manner. I'm looking for comments, critiques, and hopefully improvements.



using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace WhereIWork.Utilities.Async
{
/// <summary>
/// A helper class containing the <see cref="FoldAsync{TResult}(IEnumerable{Task{IEnumerable{Task{TResult}}}})"/> extension method that is an async equivalent of <see cref="System.Linq.Enumerable"/>.SelectMany.
/// Also provides an overload taking a <see cref="CancellationToken"/>
/// </summary>
/// <remarks>
/// If the size of your dimensions are known, use <see cref="InterleavingExtensions.Interleaved{TResult}"/> instead.
/// <see cref="FoldAsync{TResult}(IEnumerable{Task{IEnumerable{Task{TResult}}}})"/> blocks, because it cannot know the Count of its results until the all tasks in the first dimension are complete.
/// </remarks>
public static class AsyncEnumerable
{
/// <summary>
/// An asynchronous version of <see cref="System.Linq.Enumerable"/>.SelectMany.
/// Will return results in the order they are completed and not the order from the two-dimensional <see cref="IEnumerable"/>.
/// </summary>
/// <typeparam name="TResult">The type of the expected results.</typeparam>
/// <param name="tasks">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
/// <returns>An <see cref="IEnumerable{Task}"/> which will block as its results are iterated.</returns>
public static IEnumerable<Task<TResult>> FoldAsync<TResult>(this IEnumerable<Task<IEnumerable<Task<TResult>>>> tasks)
{
return new TaskFolder<TResult>(tasks);
}

/// <summary>
/// An asynchronous version of <see cref="System.Linq.Enumerable"/>.SelectMany.
/// Will return results in the order they are completed and not the order from the two-dimensional <see cref="IEnumerable"/>
/// </summary>
/// <typeparam name="TResult">The type of the expected results.</typeparam>
/// <param name="tasks">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
/// <param name="token">The cancellation token to be passed into every <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; and <see cref="Task{TResult}"/>.</param>
/// <returns>An <see cref="IEnumerable{Task}"/> which will block as its results are iterated.</returns>
public static IEnumerable<Task<TResult>> FoldAsync<TResult>(this IEnumerable<Task<IEnumerable<Task<TResult>>>> tasks, CancellationToken token)
{
return new TaskFolder<TResult>(tasks, token);
}
}

/// <summary>
/// Implements <see cref="System.Linq.Enumerable"/>.SelectMany(<see cref="IEnumerable"/>&lt;<see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt;&gt;,
/// Func&lt;<see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt;,
/// <see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt;)
/// <br/>The advantage of this class is that we process results in the order they are ready, without waiting for tasks that are listed earlier in the initial enumeration.
/// </summary>
/// <typeparam name="TResult">The common type of the result of all <see cref="Task{TResult}"/> being folded.</typeparam>
// ReSharper disable once InheritdocConsiderUsage
internal sealed class TaskFolder<TResult> : IEnumerable<Task<TResult>>
{
/// <summary>
/// The collection to which we will post results as they are ready.
/// The <see cref="IEnumerator{TResult}"/> returned by this class comes from this collection.
/// </summary>
private readonly BlockingCollection<Task<TResult>> _collection = new BlockingCollection<Task<TResult>>();

private readonly CancellationToken _token;

/// <summary>
/// The number of active tasks.
/// </summary>
private int _taskCount;

/// <summary>
/// The current state of this <see cref="TaskFolder{TResult}"/>
/// </summary>
private int _folderState = (int)FolderState.Initial;

/// <summary>
/// The possible states for a <see cref="TaskFolder{TResult}"/>.
/// </summary>
[Flags]
private enum FolderState
{
/// <summary>
/// The initial state
/// </summary>
Initial = 0,
/// <summary>
/// All tasks have been completed
/// </summary>
Task = 1,
/// <summary>
/// The <see cref="IEnumerator"/>&lt;<see cref="Task{TResult}"/>&gt; has been requested.
/// </summary>
Fetching = 2,
/// <inheritdoc cref="Task"/>
/// <inheritdoc cref="Fetching"/>
TaskFetching = 3,
/// <summary>
/// The <see cref="IEnumerator"/>&lt;<see cref="Task{TResult}"/>&gt; from <see cref="_collection"/> has been obtained.
/// </summary>
Fetched = 4,
/// <inheritdoc cref="Task"/>
/// <inheritdoc cref="Fetched"/>
TaskFetched = 5,
/// <inheritdoc cref="Task"/>
/// <inheritdoc cref="Fetched"/>
/// <summary>
/// The underlying <see cref="BlockingCollection{T}"/> has been disposed.
/// </summary>
TaskFetchedDisposed = 13
}

/// <summary>
/// Creates a new TaskFolder and initiates the process of listening for completions.
/// </summary>
/// <param name="twoDimensionalWork">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
// ReSharper disable once InheritdocConsiderUsage
internal TaskFolder(IEnumerable<Task<IEnumerable<Task<TResult>>>> twoDimensionalWork) : this(twoDimensionalWork, CancellationToken.None)
{
}

/// <summary>
/// Creates a new TaskFolder with a cancellation token, and initiates the process of listening for completions.
/// </summary>
/// <param name="twoDimensionalWork">The two dimensional <see cref="IEnumerable"/> of <see cref="Task{TResult}"/> that will be folded into one dimension.</param>
/// <param name="cancellationToken">The cancellation token to be passed into every <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; and <see cref="Task{TResult}"/>.</param>
internal TaskFolder(IEnumerable<Task<IEnumerable<Task<TResult>>>> twoDimensionalWork, CancellationToken cancellationToken)
{
// This initial increment represents the need to add the continuation to all first dimension tasks before we complete the BlockingCollection.
Interlocked.Increment(ref _taskCount);

_token = cancellationToken;
foreach (var outerTask in twoDimensionalWork)
{
if (_token.IsCancellationRequested)
{
AdvanceTaskCompletion();
Dispose();
throw new OperationCanceledException(_token);
}

// Increment first, then any decrementing in OuterContinuation will necessarily not be premature.
Interlocked.Increment(ref _taskCount);

// As this is an example of Dynamic Task Parallelism, we do not await the result of ContinueWith
// Because we manage completion using _taskCount, we do not need to track the continuation to guarantee its completion.
// Do not use an overload without the TaskScheduler parameter. http://blog.stephencleary.com/2015/01/a-tour-of-task-part-7-continuations.html
outerTask.ContinueWith(OuterContinuation, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}

DecrementTaskCounter();
}

/// <summary>
/// Called when a first dimension <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; is complete.
/// If there was an exception, it will be returned as a <see cref="Task{TResult}"/> in the enumeration.
/// </summary>
/// <remarks>
/// Not async since we have another method of tracking the completion of newly created tasks.
/// </remarks>
/// <param name="task">The first dimension <see cref="Task"/>&lt;<see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;&gt; that has completed.</param>
private void OuterContinuation(Task<IEnumerable<Task<TResult>>> task)
{
if (IsCanceled())
{
return;
}

// In both the faulted and canceled states, there are no second dimension Task<TResult>, and so we ignore _taskCount.
// If the task has faulted, add an exception to the results.
if (task.IsFaulted)
{
Debug.Assert(task.Exception != null, "If a task if faulted, it should have an exception.");
_collection.Add(Task.FromException<TResult>(task.Exception), _token);
DecrementTaskCounter();
return;
}

// If the task was cancelled, add a cancellation to the results. And it won't have children tasks to deal with.
if (task.IsCanceled || _token.IsCancellationRequested)
{
_collection.Add(Task.FromCanceled<TResult>(_token), _token);
DecrementTaskCounter();
return;
}

// By exclusion, the task has completed and we can safely use the Result without worrying about blocking.
Debug.Assert(task.Status == TaskStatus.RanToCompletion, "The continuation should not be called when the Task is not complete");
foreach (var innerTask in task.Result)
{
if (IsCanceled())
{
return;
}

// Increment first, then any decrementing in InnerContinuation won't cause _taskCount to go to 0
Interlocked.Increment(ref _taskCount);

// As this is an example of Dynamic Task Parallelism, we do not await the result of ContinueWith
// Because we manage completion using _taskCount, we do not need to track the continuation to guarantee its completion.
// Do not use an overload without the TaskScheduler parameter. http://blog.stephencleary.com/2015/01/a-tour-of-task-part-7-continuations.html
innerTask.ContinueWith(InnerContinuation, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
}

// Decrement at the end, ensuring that all second dimension Task<TResult> have contributed to the counter.
DecrementTaskCounter();
}

/// <summary>
/// Called when a result is ready to be added to <see cref="_collection"/> and returned as part of this <see cref="IEnumerable"/>&lt;<see cref="Task{TResult}"/>&gt;
/// </summary>
/// <remarks>
/// Not async since we do not use the result of the <see cref="Task{TResult}"/> and simply return it still wrapped.
/// </remarks>
/// <param name="task">The next result to return to the consumer.</param>
private void InnerContinuation(Task<TResult> task)
{
if (IsCanceled())
{
return;
}

// No special handling for various TaskStatus, these are returned to the user as part of the task's result.
_collection.Add(task, _token);
DecrementTaskCounter();
}

/// <summary>
/// Checks if <see cref="_token"/> has <see cref="CancellationToken.IsCancellationRequested"/> set to true.
/// If so, will call <see cref="AdvanceTaskCompletion"/> to stop processing further results.
/// </summary>
/// <returns>
/// <see cref="_token"/>.IsCancellationRequested
/// </returns>
private bool IsCanceled()
{
if (!_token.IsCancellationRequested)
{
return false;
}

// Pretend all the tasks are done, we won't be adding more results.
_taskCount = 0;
AdvanceTaskCompletion();

return true;
}

/// <summary>
/// Calls <see cref="BlockingCollection{T}.CompleteAdding"/> to tell it we have finished processing <see cref="Task{TResult}"/>.
/// Sets the <see cref="FolderState.Task"/> flag on <see cref="_folderState"/> to indicate all <see cref="Task{TResult}"/> have been completed.
/// </summary>
private void AdvanceTaskCompletion()
{
//If this has already been run, ignore it.
if ((_folderState & (int)FolderState.Task) != 0)
{
return;
}

// Completed or Canceled both mean we won't be adding anything new to the collection
// If we're the first to advance to Task complete
// ReSharper disable once InvertIf
if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.Task, (int)FolderState.Initial) == (int)FolderState.Initial ||
Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetching, (int)FolderState.Fetching) == (int)FolderState.Fetching ||
Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetched, (int)FolderState.Fetched) == (int)FolderState.TaskFetched)
{
_collection.CompleteAdding();

// If we have Fetched and completed the Tasks we can dispose the collection early.
if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetchedDisposed, (int)FolderState.TaskFetched) == (int)FolderState.TaskFetched)
{
_collection.Dispose();
}
}
}

/// <summary>
/// Decrements <see cref="_taskCount"/>, and if all the tasks have completed, will advance the <see cref="FolderState"/>.
/// </summary>
private void DecrementTaskCounter()
{
if (Interlocked.Decrement(ref _taskCount) > 0)
{
return;
}

AdvanceTaskCompletion();
}

/// <summary>
/// Returns an <see cref="IEnumerator"/> which will provide <see cref="Task{TResult}"/> in the order they have completed, not the order they were supplied.
/// </summary>
/// <returns>A blocking <see cref="IEnumerator"/>&lt;<see cref="Task{TResult}"/>&gt;.</returns>
// ReSharper disable once InheritdocConsiderUsage
public IEnumerator<Task<TResult>> GetEnumerator()
{
// Allow only one thread to advance into Fetching
// ReSharper disable once InvertIf
if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.Fetching, (int)FolderState.Initial) == (int)FolderState.Initial ||
Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetching, (int)FolderState.Task) == (int)FolderState.Task)
{
var result = _collection.GetConsumingEnumerable(_token).GetEnumerator();

// Advance to Fetched - No need to check initial state since we're the only thread that can be in here.
// Still interlocked so we don't interfere with advancing Task
Interlocked.CompareExchange(ref _folderState, (int)FolderState.Fetched, (int)FolderState.Fetching);
Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetched, (int)FolderState.TaskFetching);

// If we have Fetched and completed the Tasks we can dispose
if (Interlocked.CompareExchange(ref _folderState, (int)FolderState.TaskFetchedDisposed, (int)FolderState.TaskFetched) == (int)FolderState.TaskFetched)
{
_collection.Dispose();
}

return result;
}

Dispose();
throw new InvalidOperationException("The enumerator has already been fetched. Cannot Enumerate this object twice.");
}

/// <inheritdoc />
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

/// <summary>
/// Dispose of the underlying <see cref="BlockingCollection{T}"/> and set the <see cref="FolderState"/> to <see cref="FolderState.TaskFetchedDisposed"/>
/// </summary>
public void Dispose()
{
if (Interlocked.Exchange(ref _folderState, (int)FolderState.TaskFetchedDisposed) != (int)FolderState.TaskFetchedDisposed)
{
_collection.Dispose();
}
}
}
}
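To make the counting scheme easier to follow in isolation, here is a minimal sketch of just that part. It is not the implementation above: it leaves out the state machine, cancellation and disposal, and it silently drops faulted or cancelled first dimension tasks. The names `MiniFold`, `Fold`, `Demo`, `After` and `Row` are made up for the illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MiniFold
{
    // Simplified stand-in for FoldAsync (hypothetical): no cancellation, no state machine, no disposal.
    public static IEnumerable<Task<T>> Fold<T>(IEnumerable<Task<IEnumerable<Task<T>>>> outer)
    {
        var results = new BlockingCollection<Task<T>>();

        // Starts at 1: a guard count held until every outer continuation has been attached.
        var pending = 1;

        void Release()
        {
            // Whoever brings the counter to zero knows nothing else can be added.
            if (Interlocked.Decrement(ref pending) == 0)
            {
                results.CompleteAdding();
            }
        }

        foreach (var outerTask in outer)
        {
            // Count the outer task before attaching, so its continuation cannot release early.
            Interlocked.Increment(ref pending);
            outerTask.ContinueWith(t =>
            {
                // Assumption for this sketch: faulted or cancelled outer tasks are ignored.
                if (t.Status == TaskStatus.RanToCompletion)
                {
                    foreach (var inner in t.Result)
                    {
                        Interlocked.Increment(ref pending);
                        inner.ContinueWith(done =>
                        {
                            results.Add(done);
                            Release();
                        }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
                    }
                }

                // Released only after all inner tasks of this row have been counted.
                Release();
            }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default);
        }

        // Drop the guard count taken at the top.
        Release();
        return results.GetConsumingEnumerable();
    }
}

public static class Demo
{
    // A task that completes after the given delay and returns that delay.
    public static async Task<int> After(int ms)
    {
        await Task.Delay(ms);
        return ms;
    }

    // A first dimension task: waits, then starts and returns its second dimension tasks.
    public static async Task<IEnumerable<Task<int>>> Row(int ms, params int[] inner)
    {
        await Task.Delay(ms);
        return inner.Select(delay => After(delay)).ToList();
    }

    public static void Main()
    {
        var work = new[] { Row(100, 600, 1100), Row(300, 50) };
        foreach (var task in MiniFold.Fold(work))
        {
            Console.WriteLine(task.Result);
        }
    }
}
```

With these delays the results should arrive as 50, 600, 1100: the second row finishes later than the first, but its only inner task completes before either inner task of the first row.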









$endgroup$












  • The words Cancellation, cancelled, and cancelling have lost all meaning and are probably spelled a variety of ways with one or two l's. <see cref"en.wikipedia.org/wiki/Semantic_satiation" />
    – Jean-Bernard Pellerin, 2 hours ago


























c# asynchronous async-await






edited 3 mins ago by t3chb0t

asked 2 hours ago by Jean-Bernard Pellerin

  • The words Cancellation, cancelled, and cancelling have lost all meaning and are probably spelled a variety of ways with one or two l's. <see cref="en.wikipedia.org/wiki/Semantic_satiation" />
    – Jean-Bernard Pellerin, 2 hours ago































