Limiting Concurrent Threads with Semaphores

It’s been some time since I posted my early attempts at making parallel execution of threads easy when working with collections in .NET Framework prior to version 4. It might seem out of date, but believe it or not, a lot of developers still can’t take advantage of the new runtime. For me this is due to our strong dependence on SharePoint, which unfortunately still requires the 2.0 runtime.

With this limitation firmly in place, I continue to refine my ability to work with threads.

If you’ve seen my past attempts, you’ve seen a focus on executing Action delegates on an IEnumerable. My thinking was that this would be the easiest way to gain some execution speed when a unit of work needs to be repeated for every item in a collection and there is no accumulation or interaction of values. That continues to be my focus here.

My first attempt used a bit of code I scavenged from the web. It used a Countdown class to keep track of the number of items that needed processing and blocked until all of them were processed. The actual delegates were just tossed onto the ThreadPool and left to execute.

My second attempt came about while trying to create a minimalist implementation of LINQ style operators for the .NET Micro Framework. This was particularly tricky since the Micro Framework is missing a lot of features available in the full framework. Not the least of which was the ThreadPool!

Through a few iterations and a lot of back and forth with a community member from the Netduino forums, I reached a workable solution using Thread objects. In this iteration the Countdown class was tossed aside to remove the potential requirement to iterate an IEnumerable twice – once to get the count for the Countdown object and again to perform the actual queueing of threads. In its place are a few integers to count each Thread queued. After each thread finishes there’s a bit of code to increment a second counter and check to see if we’ve finished all the work. A ManualResetEvent was used to make the ParallelForEach call a blocking call that would only return after each thread had completed.

In my latest iteration I needed a bit more granularity for how many threads I had running at any one time. Someone might ask: “Isn’t that what the Thread Pool is designed to optimize?” Indeed it is. The Thread Pool is tuned to optimize the number of running threads on a computer. A computer. But my requirements involved more than just the local computer.

In fact my first attempt at parallelizing the code did use the ThreadPool to manage the optimal number of running threads. While monitoring my application during testing I noticed upwards of 30 threads running concurrently. The ThreadPool determined that the threads were spending a long time waiting on network resources and fired off more threads to take advantage of the extra CPU cycles. Though this finished more quickly than the very first single-threaded attempt, it was still sub-optimal for my needs. Why? The network resource wasn’t the real limiting factor. My code was gathering filesystem attributes on a remote server share, generating representative objects, and calculating some totals. Having even a dozen threads running concurrently meant the remote I/O was thrashing and lowering throughput.

This doesn’t mean running in parallel was a bad choice: the object creation and calculations with the file info I retrieve take time. With disk access being so slow, I need to make sure I’m not wasting any time, and the disks need to be working the whole time! I decided I just needed a small tweak to my extension method to limit how many threads I dump on the ThreadPool at once. As a first shot I decided to try 3 threads. This is a completely off-the-wall guess and I hope to tune it over time as I get real-world numbers.

But how can we do this?
Semaphores!

What’s a semaphore? I’ve seen a lot of ways to conceptualize a semaphore – a bouncer, a fire marshal, a queue – but no matter how you visualize it, you use it to rate limit. When you construct a new Semaphore object, you specify how many requests can happen at once. You call WaitOne before starting your work and Release when you’re done. The call to WaitOne completes immediately if the semaphore isn’t full, or it blocks until a spot opens up. The call to Release lets the semaphore know you’ve finished your work and are ready to move on.
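Stripped to its essentials, the pattern looks like this (the names here are purely illustrative):

// Admit at most two callers at a time; both slots start open.
Semaphore gate = new Semaphore(2, 2);

void DoThrottledWork()
{
   gate.WaitOne();       // returns immediately or blocks until a slot opens
   try
   {
      // ... the rate-limited work happens here ...
   }
   finally
   {
      gate.Release();    // hand the slot to the next waiter
   }
}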

So what does this look like?

For me it was quick to implement because I already had an extension method I used to queue up work on the ThreadPool. The problem is I can’t just call WaitOne and Release on either side of the ThreadPool queue call. QueueUserWorkItem is a non-blocking call! I’d end up throwing all the work onto the ThreadPool at once. Instead I put the calls into the body of the queued delegates themselves. Et voila!
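To make the difference concrete, here’s a sketch of both placements (sem, captured, and delegateAction as in the full method below):

// Wrong: QueueUserWorkItem returns immediately, so WaitOne and Release
// run back-to-back on the calling thread and nothing is throttled.
sem.WaitOne();
ThreadPool.QueueUserWorkItem(state => delegateAction(captured));
sem.Release();

// Right: the wait and the release travel with the work itself.
ThreadPool.QueueUserWorkItem(state =>
{
   sem.WaitOne();
   try { delegateAction(captured); }
   finally { sem.Release(); }
});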

But what about cross-thread access? Semaphores are meant to be used from different threads! How else do you expect threads to synchronize? There is a downside though: semaphore objects don’t enforce thread identity. Each thread is responsible for calling a matched pair of WaitOne and Release. If your WaitOne and Release calls don’t pair up, you can fill your semaphore and potentially end up blocked forever without running all your threads. If you call Release more times than WaitOne, you’ll get a SemaphoreFullException! So be careful!
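A trivial example of that second failure mode:

Semaphore sem = new Semaphore(1, 1);   // one slot, currently open

sem.Release();   // throws SemaphoreFullException: the count is already at its maximum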

Here’s a quick (untested) reimplementation of what I’m using on a work project. I think I got all the correct parts in all the right places. It at least compiles with a target of .NET 3.5!

using System;
using System.Collections.Generic;
using System.Threading;

// Extension methods have to live in a static class; the name is arbitrary.
public static class ParallelExtensions
{
   public static void ParallelForEachThreadPool<T>(
      this IEnumerable<T> items,
      Action<T> delegateAction,
      int millisecondTimeout = -1)
   {
      ManualResetEvent mre = new ManualResetEvent(false);

      // At most 3 work items get to run concurrently.
      Semaphore sem = new Semaphore(3, 3);

      // Start the pending count at 1 for the main thread itself, so the
      // event can't be set before we've finished queueing every item,
      // even if the workers race ahead of the foreach loop.
      int pending = 1;

      foreach (T item in items)
      {
         Interlocked.Increment(ref pending);

         // Capture a fresh local so each closure sees its own item.
         T captured = item;

         ThreadPool.QueueUserWorkItem(state =>
         {
            // Block until one of the 3 slots opens up.
            sem.WaitOne();

            try
            {
               delegateAction(captured);
            }
            finally
            {
               sem.Release();

               if (Interlocked.Decrement(ref pending) == 0)
                  mre.Set();
            }
         });
      }

      // Release the main thread's count. If all the work already
      // finished, pending drops to zero here and we skip the wait.
      if (Interlocked.Decrement(ref pending) != 0)
         mre.WaitOne(millisecondTimeout);
   }
}
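To give a feel for the call site, here’s a hypothetical usage; the share path and the per-file work are stand-ins for my real project, and it assumes a using System.IO directive:

// Inspect every file on a remote share, but never touch
// more than three files at once.
string[] paths = Directory.GetFiles(@"\\server\share");

paths.ParallelForEachThreadPool(path =>
{
   FileInfo info = new FileInfo(path);
   Console.WriteLine("{0}: {1} bytes", info.Name, info.Length);
});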

Addendum (5/29/2012 10:30am ET):

It’s important to note this method will still queue work items on the ThreadPool, and they will be started, but they’ll start blocking almost right away. This is significant if your IEnumerable will queue a lot of work: you could potentially fill up the ThreadPool and starve out others who are more sane in their requests for threads. Additionally, you could cause the ThreadPool to create many threads to handle all the requests, wasting system resources even though you’re limiting execution to n threads running concurrently.

If this is the case, you should use the Thread object directly and only start a thread when the semaphore has an open slot. I might tackle this fully in a future post. The upside to using Threads directly is greater control over priority, whether it’s a background thread, and so on.
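In the meantime, here’s a rough, untested sketch of the shape it might take (completion signaling is omitted; it would work just like the ManualResetEvent above):

// Sketch only: WaitOne runs on the queueing thread, so a Thread object
// isn't even created until one of the slots opens up.
foreach (T item in items)
{
   sem.WaitOne();
   T captured = item;

   Thread worker = new Thread(() =>
   {
      try { delegateAction(captured); }
      finally { sem.Release(); }
   });
   worker.IsBackground = true;   // one of the knobs you gain over the ThreadPool
   worker.Start();
}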

Throttled,
-Erik
