Optimized Parallel ForEach .NET 3.5

While talking to a coworker about optimizing some frequently hit, relatively resource-intensive code, the topic of multi-threaded programming came up. We discussed the performance implications and whether there would be much benefit, since the threads would all be writing to the same variable and resource contention would likely slow things down more than single-threading. While it might not fit the situation we were discussing at the time, I was still interested, since I’ve done some multi-threaded programming and have no qualms with it.

But since we had more tasks than time, I decided to look at what was out there already and see if any of it could be leveraged. Our code had to target .NET 3.5 since it was running within a SharePoint 2007 context, so the new .NET 4 features were not available to us. A quick search turned up this happy class (retrieved from http://blogs.lessthandot.com/index.php/WebDev/ServerProgramming/a-simple-parallel-foreach-implementation-5):

    class Countdown : IDisposable
    {
        private readonly ManualResetEvent done;
        private readonly int total;
        private volatile int current;

        public Countdown(int total)
        {
            this.total = total;
            current = total;
            done = new ManualResetEvent(false);
        }

        public void Signal()
        {
            lock (done)
            {
                if (current > 0 && --current == 0)
                    done.Set();
            }
        }

        public void Wait()
        {
            done.WaitOne();
        }

        public void Dispose()
        {
            ((IDisposable)done).Dispose();
        }
    }
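To make the mechanics concrete, here is a minimal sketch of exercising Countdown directly (the item count and the work done per item are invented for illustration):

```csharp
using System;
using System.Threading;

class CountdownDemo
{
    static void Main()
    {
        const int taskCount = 4;
        using (var cd = new Countdown(taskCount))
        {
            for (int i = 0; i < taskCount; i++)
            {
                int captured = i; // capture the loop variable for the closure
                ThreadPool.QueueUserWorkItem(x =>
                {
                    Console.WriteLine("Working on item " + captured);
                    cd.Signal(); // decrement; the last signal releases Wait()
                });
            }
            cd.Wait(); // block the main thread until all items have signaled
        }
    }
}
```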

A quick review of the code revealed it did exactly what I needed: a class that tracks how many tasks I want to fire off, decrements with each Signal(), and provides the ability to block my main thread until all the tasks finish. Using this, I wrote a couple of extension methods (based on code from the site mentioned above):

public static void ParallelForEach<T>(this IEnumerable<T> enumerable, Action<T> action)
{
    IList<T> lst = enumerable as IList<T>;
    if (null != lst)
        OptimizedParallelizedForEach(lst, action);
    else
        ParallelizeForEach(enumerable, action, enumerable.Count());
}

public static void ParallelForEach<T>(this ICollection enumerable, Action<T> action)
{
    IList lst = enumerable as IList;
    if (null != lst)
        OptimizedParallelizedForEach(lst, action);
    else
        ParallelizeForEach(enumerable, action, enumerable.Count);
}

These extensions let both generic and non-generic collections execute an action on each of their items. You’ll note the code tries to figure out if the data can be cast to IList<T> / IList, since those expose a native Count member and don’t rely on iterating the full collection to get the element count the way the Enumerable.Count() extension can. (If you reflect into that .NET code you’ll see the Count() extension method does something similar, checking whether the type can report its size without iterating and counting.) Skipping that extra pass over the data is a nice little performance boost.
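With the extensions in scope, calling code stays as simple as a regular foreach. A quick usage sketch (the list contents and the work are invented):

```csharp
using System;
using System.Collections.Generic;

class ParallelForEachDemo
{
    static void Main()
    {
        var items = new List<string> { "alpha", "bravo", "charlie" };

        // List<T> implements IList<T>, so this takes the optimized
        // indexed path rather than the enumerator-based one.
        items.ParallelForEach(item =>
            Console.WriteLine("Processing " + item.ToUpper()));
    }
}
```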

Unoptimized iteration:

private static void ParallelizeForEach<T>(IEnumerable enumerable,
                                          Action<T> action, int count)
{
    using (var cd = new Countdown(count))
    {
        foreach (T current in enumerable)
        {
            T captured = current;
            ThreadPool.QueueUserWorkItem(x =>
            {
                action.Invoke(captured);
                cd.Signal();
            });
        }
        cd.Wait();
    }
}

Optimized iteration:

private static void OptimizedParallelizedForEach<T>(IList enumerable, Action<T> action)
{
    using (var cd = new Countdown(enumerable.Count))
    {
        int count = enumerable.Count;
        for (int i = 0; i < count; i++)
        {
            T captured = (T)enumerable[i];
            ThreadPool.QueueUserWorkItem(x =>
            {
                action.Invoke(captured);
                cd.Signal();
            });
        }
        cd.Wait();
    }
}

private static void OptimizedParallelizedForEach<T>(IList<T> enumerable,
                                                    Action<T> action)
{
    using (var cd = new Countdown(enumerable.Count))
    {
        int count = enumerable.Count;
        for (int i = 0; i < count; i++)
        {
        T captured = enumerable[i];
            ThreadPool.QueueUserWorkItem(x =>
            {
                action.Invoke(captured);
                cd.Signal();
            });
        }
        cd.Wait();
    }
}

You’ll see I have two Optimized methods. That’s because IList and IList<T> each have their own indexing operator ([]), and even though the bodies are identical, I couldn’t find a way to combine them without getting complaints about ambiguous method calls. So I split them out.
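One way the duplication could be folded together is to route both overloads through a private core that takes a count and an indexer delegate, so the ambiguity is resolved at the wrapper rather than in the loop. This is an untested sketch, and the ParallelizeIndexed name is my own invention:

```csharp
// Both overloads delegate to one core; the indexer is abstracted behind
// a Func<int, T> so IList and IList<T> can share the loop body.
private static void ParallelizeIndexed<T>(int count, Func<int, T> getItem,
                                          Action<T> action)
{
    using (var cd = new Countdown(count))
    {
        for (int i = 0; i < count; i++)
        {
            T captured = getItem(i); // resolve the item before queueing
            ThreadPool.QueueUserWorkItem(x =>
            {
                action.Invoke(captured);
                cd.Signal();
            });
        }
        cd.Wait();
    }
}

// The overloads become thin wrappers with no duplicated loop:
private static void OptimizedParallelizedForEach<T>(IList list, Action<T> action)
{
    ParallelizeIndexed(list.Count, i => (T)list[i], action);
}

private static void OptimizedParallelizedForEach<T>(IList<T> list, Action<T> action)
{
    ParallelizeIndexed(list.Count, i => list[i], action);
}
```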

The place I’ve put this to good use is in my previous post, where I iterate over all the events on a type and, if an event is of the right type, attach a listener to it. Since the loop iterations require no interaction with each other, this was a great candidate for threading in my opinion. This code executes at the startup of a Silverlight application, running over 44 events of which 42 need listeners attached. When I switched from a simple foreach to for and then to ParallelForEach, I saw a total reduction of 1 second from load to render with no other changes. Attaching the debugger, I was able to verify all the events wired up and the listener method fired 100% of the time it should and 0% when it shouldn’t.

So, if you were curious how I was handling my threading in the last post, this is it!

Ha det bra!
-Erik

6 responses to “Optimized Parallel ForEach .NET 3.5”

  1. Pingback: ParallelForEach with Result .NET 3.5 | Erik Noren's Tech Stuff

  2. Pingback: Using SyncRoot for Multithreaded Applications | Erik Noren's Tech Stuff

  3. Nice article Erik,

    I don’t see the usage of “total” member in Countdown class.. am I missing something here ? Also, why lock(done) instead of Interlocked.Decrement ?

    Zohrab.

  4. Honestly that Countdown class was pulled from another web source as noted in the article. I hadn’t really taken a look at making changes. You’re right in your observations though.

  5. Pingback: Limiting Concurrent Threads with Semaphores | Erik Noren's Tech Stuff

  6. deadlocks in WaitOne() on zero sized inputs btw
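For anyone applying the fixes the commenters suggest (an initially signaled event for zero-sized input, and Interlocked.Decrement in place of the lock), a hardened variant might look like this. This is my own sketch, not the original class from the article:

```csharp
using System;
using System.Threading;

class SafeCountdown : IDisposable
{
    private readonly ManualResetEvent done;
    private int current;

    public SafeCountdown(int total)
    {
        current = total;
        // Start signaled when there is nothing to wait for, so Wait()
        // returns immediately instead of deadlocking on zero-sized input.
        done = new ManualResetEvent(total == 0);
    }

    public void Signal()
    {
        // Interlocked avoids taking a full lock just to decrement a counter.
        if (Interlocked.Decrement(ref current) == 0)
            done.Set();
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}
```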
