Thread Safe Library Update
As always, my life revolves around threading, so I have a few classes that I use frequently for multithreaded work. Fortunately, .NET 4.0 will include much of this functionality in PFX (the Parallel Extensions), but for now, we have this.
ThreadSafeLibrary
AutoProcessingQueue: This is a queue that, as you add items to it, automatically begins dispatching on a separate thread and calls a specified delegate to process the items.
Batch: This class is similar to a queue, except that it lets you do work in batches. Set the maximum and minimum sizes and start adding items to it. When it reaches a certain size, it will attempt to close the batch and fire an event that lets you wire in code for processing the data in the batch. It also has a timer that can be enabled if you'd prefer to close the batch at a certain interval. This is great for database calls and bulk inserts. Say I want to send a series of messages, but I don't want to send them one at a time; I would like to send them when the batch reaches 100 items, or every 5 seconds. I use this class a lot for messaging.
Dispatcher: This is the class that does the heavy lifting for the majority of things in this library. You give it a maximum number of threads to work with, and as you add workers to its queue, it will begin spawning threads to process them (carefully reusing them whenever possible).
ResourcePool: The dispatcher uses this internally to store and manage its threads, though it can be used for any object type. One case where I use this frequently is pooling connections to web services. To save the time required to open and close connections constantly, I set up a pool, give it a threshold (so as not to flood the service I'm calling), and let it get the resource for me via the delegate I provide.
ThreadSafeDictionary<K,V>: The classic thread-safe implementation of a dictionary, with several added methods to support atomic operations.
ThreadSafeQueue<T>: The classic thread-safe implementation of a queue, with several added methods to support atomic operations.
ThreadSafeHashSet<T>: The classic thread-safe implementation of a hash set, with several added methods to support atomic operations.
ThreadSafeStackList<T>: This is similar to a stack, without a lot of the limitations. It supports multi-level peek, unwind, etc.
AsyncBlock:
This is a class that lets you wrap async work into a serial action for the sake of simplicity. If you have a function that needs to run numerous asynchronous calls and return only when they have all finished, this is your class.
You can use it like so:
public class AsyncSample
{
    private Dispatcher Dispatcher;

    public AsyncSample()
    {
        //init a dispatcher that uses 5 threads and doesn't carry execution context
        Dispatcher = new Dispatcher(5, false);
    }

    public void ExecuteParallel<T>(IList<T> work)
    {
        var workers = ConstructParallelWorkers(work);
        //the async block will pause this thread until all workers finish
        using (AsyncBlock block = new AsyncBlock(this.Dispatcher, 10))
        {
            foreach (var worker in workers)
            {
                block.AddToQueue(worker);
            }
        }
    }

    private IList<AnyWorkerBee<T>> ConstructParallelWorkers<T>(IList<T> work)
    {
        var workers = new List<AnyWorkerBee<T>>();
        foreach (var workUnit in work)
            workers.Add(new AnyWorkerBee<T>() { WorkUnit = workUnit });
        return workers;
    }
}
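To give a feel for how a block like this can hold the calling thread until the work is done, here is a minimal sketch. It is not the library's AsyncBlock: WaitAllBlock, Demo, SumInParallel, and the use of the built-in ThreadPool in place of the Dispatcher are all assumptions made for illustration. The idea is a pending counter plus a wait handle, with Dispose() doing the waiting, which mirrors the way the using block in the sample doesn't exit until every worker has finished.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for AsyncBlock (not the library's implementation).
// Work runs on the built-in ThreadPool; Dispose() blocks until it has all finished.
public sealed class WaitAllBlock : IDisposable
{
    private readonly ManualResetEvent _done = new ManualResetEvent(false);

    // Starts at 1 to account for the block itself, so the count cannot
    // reach zero while work is still being queued.
    private int _pending = 1;

    public void AddToQueue(Action work)
    {
        Interlocked.Increment(ref _pending);
        ThreadPool.QueueUserWorkItem(_ =>
        {
            // Note: an exception thrown by work() is not handled in this sketch.
            try { work(); }
            finally { Release(); }
        });
    }

    private void Release()
    {
        // Whoever brings the count to zero signals the waiting thread.
        if (Interlocked.Decrement(ref _pending) == 0)
            _done.Set();
    }

    public void Dispose()
    {
        Release();        // give up the block's own count
        _done.WaitOne();  // wait for every queued item to finish
        _done.Dispose();
    }
}

public static class Demo
{
    public static int SumInParallel(int count)
    {
        int total = 0;
        using (var block = new WaitAllBlock())
        {
            for (int i = 1; i <= count; i++)
            {
                int n = i; // copy the loop variable so each closure gets its own value
                block.AddToQueue(() => Interlocked.Add(ref total, n));
            }
        } // Dispose() runs here and returns only after all items have completed

        return total;
    }

    public static void Main()
    {
        Console.WriteLine(SumInParallel(10)); // prints 55
    }
}
```

Starting the counter at 1 is the one design choice worth calling out: without it, a fast worker could finish and signal the event before the rest of the items had even been queued.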