ParallelForEach with Result .NET 3.5

In an earlier post I described how I wrote a couple of extension methods to give me a reasonably safe way to iterate over the objects in a generic enumerable and execute actions on the items across multiple threads. Since writing it I’ve used the methods to great success.

I did the first implementation using actions because they have no return type, and I figured delegates of this type would lend themselves very simply to a multithreaded design. Today, however, I came across a need to apply length rules to objects and receive a collection of the results (essentially those that passed the test). I could have used a predicate over the iterated items and returned those that evaluated to true, but instead I went with an implementation using Func, which gives me the flexibility to return pieces of information from within the iterated objects rather than the objects themselves.

I used my previous ParallelForEach methods as a starting point. Then I replaced the Action<T> parameter with Func<T, TResult> and gave the extension method a return type of IEnumerable<TResult>. This is because I will be collecting the results from each thread and returning them.

public static IEnumerable<TResult> ParallelForEach<T, TResult>(
                                      this IEnumerable<T> enumerable,
                                      Func<T, TResult> func)
{
   return ParallelForEach(enumerable, func, enumerable.Count());
}

The method body is essentially the same as before too, except that each work item now captures the result of the call and adds it to a shared list.

public static IEnumerable<TResult> ParallelForEach<T, TResult>(
                                      IEnumerable<T> enumerable,
                                      Func<T, TResult> func,
                                      int count)
{
   List<TResult> results = new List<TResult>(count);

   using (var cd = new Countdown(count))
   {
      foreach (T current in enumerable)
      {
         T captured = current;
         ThreadPool.QueueUserWorkItem(x =>
         {
            TResult result = func(captured);
            results.ThreadSafeAdd(result);
            cd.Signal();
         });
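      }

      // Block until every work item has signalled; otherwise the list could
      // be returned (and cd disposed) while workers are still running.
      // Assumes the Countdown class from the previous post exposes a blocking Wait().
      cd.Wait();
   }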

   return results;
}
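For readers without the earlier post to hand, here is a rough sketch of what a countdown helper of this kind could look like. It is a hypothetical stand-in, not the Countdown class from that post: the name SimpleCountdown, its Wait method and its internals are all assumptions made for illustration. The idea is simply that each work item signals once and the calling thread blocks until the count reaches zero.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the Countdown class from the earlier post;
// the real class may differ in both names and behaviour.
public sealed class SimpleCountdown : IDisposable
{
    private readonly ManualResetEvent done = new ManualResetEvent(false);
    private int remaining;

    public SimpleCountdown(int count)
    {
        remaining = count;
        // Nothing to wait for when the sequence is empty.
        if (count == 0) done.Set();
    }

    // Called once by each work item when it finishes.
    public void Signal()
    {
        if (Interlocked.Decrement(ref remaining) == 0) done.Set();
    }

    // Blocks the caller until every work item has signalled.
    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        done.Close();
    }
}
```

With a helper shaped like this, the calling thread would need to call Wait() inside the using block, after the foreach, so that the results list is complete before it is returned.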

To keep the list from being modified by multiple threads at the same time, I added an extension method that does a thread-safe add by locking on the list’s SyncRoot object (exposed through the non-generic ICollection interface in System.Collections). I only bothered implementing the add operation because that’s all I’ll be using from my threads. Once all the values are together I’ll be returning the object to be read and manipulated from a single thread. (See previous post for the Countdown class and explanation of it.)

public static class ListThreadSafeExtensions
{
   public static void ThreadSafeAdd<T>(this List<T> list, T value)
   {
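       // SyncRoot is only exposed via the non-generic ICollection interface.
       // This is an ordinary monitor lock, not a spin lock.
       object syncRoot = ((ICollection)list).SyncRoot;

       lock(syncRoot)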
       {
           list.Add(value);
       }
   }
}
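To see the lock earning its keep, a small stress test along these lines can help. This is only a sketch: the SyncRootDemo class and AddFromManyThreads method are hypothetical names, and the completion tracking uses a ManualResetEvent with Interlocked rather than the Countdown class, purely so the snippet stands on its own.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

public static class SyncRootDemo
{
    // Queues `count` adds on the thread pool and returns the final list size.
    public static int AddFromManyThreads(int count)
    {
        var list = new List<int>();
        if (count == 0) return list.Count;
        int remaining = count;

        using (var done = new ManualResetEvent(false))
        {
            for (int i = 0; i < count; i++)
            {
                int captured = i;
                ThreadPool.QueueUserWorkItem(state =>
                {
                    // Same locking pattern as ThreadSafeAdd above.
                    lock (((ICollection)list).SyncRoot)
                    {
                        list.Add(captured);
                    }
                    // The last work item to finish releases the waiting thread.
                    if (Interlocked.Decrement(ref remaining) == 0) done.Set();
                });
            }
            done.WaitOne();
        }
        return list.Count;
    }
}
```

With the lock in place the returned size should always equal the number of queued adds. List<T> makes no thread-safety guarantee for concurrent writers, so removing the lock can lose items or throw.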

Now I call the code in a very similar manner as before:

var events = objectWithEvents.GetType().GetEvents();

//Previous way (void)
events.ParallelForEach(evt => evt.AddEventHandler(...));

//New addition (IEnumerable<TResult>)
IEnumerable<bool> isMulticast = events.ParallelForEach(evt => evt.IsMulticast);
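One thing to keep in mind is that results are added in whatever order the work items happen to finish, so a bare list of flags can't be matched back to the events by position. A possible way around that, sketched here, is to have the Func return the identifying piece of information alongside the value. The Sample and EventReport classes are hypothetical, and LINQ's Select is used as a sequential stand-in so the snippet runs without the extension methods; the same delegate could be handed to ParallelForEach instead.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public class Sample
{
    // Hypothetical event, present only so reflection has something to find.
    public event EventHandler Changed;

    public void Raise()
    {
        EventHandler handler = Changed;
        if (handler != null) handler(this, EventArgs.Empty);
    }
}

public static class EventReport
{
    // Pairs each event's name with its flag so every result stays
    // identifiable regardless of the order it was collected in.
    public static readonly Func<EventInfo, KeyValuePair<string, bool>> Describe =
        evt => new KeyValuePair<string, bool>(evt.Name, evt.IsMulticast);

    public static List<KeyValuePair<string, bool>> Run()
    {
        EventInfo[] events = typeof(Sample).GetEvents();
        // Select is a sequential stand-in for ParallelForEach here.
        return events.Select(Describe).ToList();
    }
}
```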

You can of course add as many extension methods as you like to implement the various Func overloads (.NET 3.5 defines them for up to four input parameters plus a return value). I only needed the one.
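An extra overload doesn't necessarily need its own threading code. One possible approach, sketched below, is to fix the additional argument up front and forward the resulting single-parameter delegate to the existing method. The FuncBinding class and its Bind helper are hypothetical names introduced for illustration, not part of the original extension methods.

```csharp
using System;

public static class FuncBinding
{
    // Fixes the second argument so a two-parameter Func can be passed
    // wherever a Func<T, TResult> is expected.
    public static Func<T, TResult> Bind<T, TArg, TResult>(
        Func<T, TArg, TResult> func, TArg arg)
    {
        return item => func(item, arg);
    }
}

public static class FuncBindingExample
{
    // A length rule with a configurable minimum, bound to a minimum of 3.
    public static readonly Func<string, bool> LongEnough =
        FuncBinding.Bind<string, int, bool>((s, min) => s.Length >= min, 3);
}
```

A two-parameter ParallelForEach overload could then be a one-liner that calls Bind and passes the result, along with the enumerable, to the single-parameter version.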

Your friendly neighborhood coder,
-Erik

[Update 11/1/2010 – Linked SyncRoot to MSDN doc about it.]
