November 2008 - Posts

More Threading Goodness!

Threading - I write about it a lot, even though i would consider myself somewhat of a rookie when it comes to threading complexities.

One of the challenges we often find when trying to incorporate threading into applications is managing work units, thread pools, and queues.  What I've ended up doing is writing a few helpers to accommodate just these things.

To start, we need to manage a pool of available thread resources.  This pool needs to have a limit, and needs to be able to generate threads on the fly if necessary.  However, the need for resource pools is not limited to threading.  Consider the scenario where you have a web service of some sort, being WSE, FTP HTTP whatever.  There is a lot of overhead in opening and handshaking for these connections.  If I had say a list of 100k files I needed to upload, and wanted to do that asynchronously, I wouldn't want to throw away the connection after every upload.  So I need to do simple connection pooling.  But I don't want to open 10 connections immediately if I am only going to need 2 either, so this pool needs to be dynamic. 

To even start writing a pool, we need some sort of queuing class for resource storage, and in the threaded world, this needs to be thread safe.

So a simple wrapper may look something like this:

public class ThreadSafeQueue<T>
    {
        //This is the internal queue that we are wrapping
        Queue<T> queue = new Queue<T>();
 
        [NonSerialized]
        ReaderWriterLockSlim objLock = Locks.GetLockInstance(LockRecursionPolicy.NoRecursion); //setup the lock;
 
        public int Count
        {
            get
            {
                using (new ReadOnlyLock(this.objLock))
                {
                    return this.queue.Count;
                }
            }
        }
 
        public void Clear()
        {
            using (new WriteLock(this.objLock))
            {
                this.queue.Clear(); ;
            }
        }
 
        public bool Contains(T item)
        {
            using (new ReadOnlyLock(this.objLock))
            {
                return this.queue.Contains(item);
            }
        }
 
        public bool TryDequeue(out T obj)
        {
            using (new ReadLock(this.objLock))
            {
                if (this.queue.Count != 0)
                {
                    obj = this.Dequeue();
                    return true;
                }
            }
 
            obj = default(T);
            return false;
        }
 
        public T Dequeue()
        {
            using (new WriteLock(this.objLock))
            {
                return this.queue.Dequeue();
            }
        }
 
        public void Enqueue(T item)
        {
            using (new WriteLock(this.objLock))
            {
                this.queue.Enqueue(item);
            }
        }
 
        public T Peek()
        {
            using (new ReadOnlyLock(this.objLock))
            {
                return this.queue.Peek();
            }
        }
    }


 
Then we can move on to the Resource Pool, which we want to be generic so that it can serve up any kind of resource, be it a thread, a web service connection, a widget of any sort.

public class ResourcePool<T> : IDisposable
        where T : class
    {
 
        public event EventHandler CreatedResource;
        public delegate T CreateResourceDelegate();
        public delegate bool CanReuseResourceDelegate(T obj);
 
        public ResourcePool(CreateResourceDelegate creatorDelegate, CanReuseResourceDelegate checkerDelegate)
        {
            this.MaxResourceCount = 10;
            this.IterationTimeout = 5;
            this.MaxRetry = 100;
            this.Creator = creatorDelegate;
            this.CheckResource = checkerDelegate;
        }
 
        private CreateResourceDelegate Creator { get; set; }
 
        private CanReuseResourceDelegate CheckResource { get; set; }
 
        /// <summary>
        /// Retreives a resource from the pool.
        /// </summary>
        /// <returns></returns>
        public T Get()
        {
            T obj = null;
            try
            {
                int tries = 0;
                while (obj == null && (this.MaxRetry == 0 || tries++ <= this.MaxRetry))
                {
                    if (Queue.Count == 0)
                    {
                        if (this.MaxResourceCount == 0 || (this.MaxResourceCount > this.ActiveResourceCount))
                        {
                            if (this.Creator == null)
                                throw new NullReferenceException("Unable to create new Resource Instance");
 
                            obj = this.Creator();
                            FireEvent(ref this.CreatedResource);
                        }
                        else
                        {
                            Thread.Sleep(this.IterationTimeout); // wait for one to become available if we can't create one.
                        }
                    }
                    else
                    {
                        obj = Queue.Dequeue();
                    }
                }
            }
            finally
            {
                Interlocked.Increment(ref this._ActiveResourceCount);
            }
 
            if (obj == null)
                throw new NullReferenceException("Unable to retrieve Resource Instance");
 
            return obj;
        }
 
        /// <summary>
        /// Returns a resource to the pool.
        /// </summary>
        /// <param name="obj"></param>
        public void Release(T obj)
        {
            try
            {
                if (this.MaxResourceCount == 0 || Queue.Count < this.MaxResourceCount)
                {
                    if (this.CheckResource == null || this.CheckResource(obj))
                    {
                        Queue.Enqueue(obj);
                    }
                }
            }
            finally
            {
               Interlocked.Decrement(ref this._ActiveResourceCount); //decrement counter so we know we can reuse the resource.
            }
        }
 
        private int _ActiveResourceCount;
         
        /// <summary>
        /// This is the maximum number of resources the pool will hold and subsequently attempt to create.  Default is 10.
        /// Set to 0 enables no limit.
        /// </summary>
        public int MaxResourceCount { get; set; }
 
        /// <summary>
        /// This is the maximum number of times the pool will look for or attempt to create a resource.  Default is 100.
        /// Set to 0 enables no limit.
        /// </summary>
        public int MaxRetry { get; set; }
 
        /// <summary>
        /// This is the length of time(in milliseconds) that the pool will wait between tries to get a resource. Default is 5ms.
        /// </summary>
        public int IterationTimeout { get; set; }
 
        private ThreadSafeQueue<T> Queue
        {
            get { return this._Queue; }
        }readonly ThreadSafeQueue<T> _Queue = new ThreadSafeQueue<T>();
 
        private void FireEvent(ref EventHandler targetEvent)
        {
            Eventing.FireEvent(ref targetEvent, this);
        }
 
        public void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.Queue.Clear();
            }
        }
 
        public void Dispose()
        {
            this.Dispose(true);
        }
    }


 
 
I know I am tearing through this quickly, but it's a lot to explain.
 
Now we need to have some sort of managed work unit that the thread can work on.  We want it's work unit to be generic and should be easily derived from:
 

public abstract class WorkerBee
    {
        public WorkerBee() { }
 
        public abstract void Start();
    }
 
    public abstract class WorkerBee<T> : WorkerBee
    {
        public T WorkUnit { get; set; }
    }
 
    /// <summary>
    /// This worker can be used as it.  Subscribe to the WorkBegan and WorkFinished events to inject logic.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class AnyWorkerBee<T> : WorkerBee<T>
    {
        public delegate void WorkerEventHandler(WorkerBee<T> sender);
        public event WorkerEventHandler WorkBegan;
        public event WorkerEventHandler WorkFinished;
 
        protected virtual void OnWorkBegan()
        {
            if (this.WorkBegan != null)
                this.WorkBegan(this);
        }
 
        protected virtual void OnWorkFinished()
        {
            if (this.WorkFinished != null)
                this.WorkFinished(this);
        }
 
        protected virtual void WorkStart()
        {
            this.OnWorkBegan();
        }
 
        protected virtual void WorkFinish()
        {
            this.OnWorkBegan();
        }
 
        public virtual void DoWork() { }
 
        public override void Start()
        {
            this.WorkStart();
            this.DoWork();
            this.WorkFinish();
        }
    }


 
Now we can look at the actual dispatcher, we need some way to add work to a queue, have it spawn a thread if it has any left, and then execute the work.  One of the tricky things was to make sure the dispatcher thread was not ALWAYS running.  It would start itself up anytime work was added, continue running and waiting until it's work queue was empty, then shut down.  Every subsequent addition to the queue would ensure that the dispatcher was still running.
 

 
public class Dispatcher : IDisposable
    {
        public Dispatcher(int maxThreadCount)
        {
            this.Threads = new ResourcePool<Thread>(
                new ResourcePool<Thread>.CreateResourceDelegate(this.CreateThread),
                new ResourcePool<Thread>.CanReuseResourceDelegate(this.ValidateThread));
 
            this.Threads.MaxRetry = 0;
 
            this.WorkQueue = new ThreadSafeQueue<WorkerBee>();
 
            this.Threads.MaxResourceCount = maxThreadCount;
            this.EnsureDispatch(false); // setup the dispatcher
        }
 
        /// <summary>
        /// Call this to stop the dispatcher
        /// </summary>
        public void Abort()
        {
            this.AbortDispatch = true;
        }
 
        /// <summary>
        /// Call this to pause the dispatcher
        /// </summary>
        public void Pause()
        {
            this.AbortDispatch = true;
        }
 
        /// <summary>
        /// Call this to resume the dispatcher
        /// </summary>
        public void Resume()
        {
            if (this.AbortDispatch)
            {
                this.AbortDispatch = false;
                this.EnsureDispatch();
            }
        }
 
        /// <summary>
        /// Set this to prevent the dispatcher from spawning threads.
        /// </summary>
        private bool AbortDispatch
        {
            get;
            set;
        }
 
        /// <summary>
        /// This is the thread that will dispatch work to other threads.
        /// </summary>
        private Thread DispatchThread
        {
            get;
            set;
        }object _DispatcherLock = new Object();
 
        /// <summary>
        /// This will make sure that the Dispatcher is active and running.
        /// </summary>
        private void EnsureDispatch()
        {
            this.EnsureDispatch(true);
        }
 
        /// <summary>
        /// This will make sure that the Dispatcher is active and running.
        /// </summary>
        private void EnsureDispatch(bool start)
        {
            lock (this._DispatcherLock)
            {
                if (this.DispatchThread == null || this.DispatchThread.ThreadState == ThreadState.Stopped)
                {
                    this.DispatchThread = new Thread(new ThreadStart(this.FireThread));
                    this.DispatchThread.IsBackground = true;
                }
 
                if (start && ((this.DispatchThread.ThreadState | ThreadState.Unstarted) == this.DispatchThread.ThreadState))
                    this.DispatchThread.Start();
            }
        }
 
        /// <summary>
        /// This is a required callback for the Resource pool.
        /// </summary>
        /// <param name="obj"></param>
        /// <returns></returns>
        private bool ValidateThread(Thread obj)
        {
            return false; //never reuse threads
        }
 
        /// <summary>
        /// This is the method that the resource pool will call to get a new thread to work with.
        /// </summary>
        /// <returns></returns>
        private Thread CreateThread()
        {
            return new Thread(new ParameterizedThreadStart(this.DispatchWork));
        }
 
        /// <summary>
        /// The function starts the work on a new thread.
        /// </summary>
        private void DispatchWork(object state)
        {
            try
            {
                WorkerBee work = state as WorkerBee;
 
                if (work != null)
                    work.Start();
            }
            finally
            {
                //Give the thread back.
                this.Threads.Release(Thread.CurrentThread);
            }
        }
 
        /// <summary>
        /// This puts work on the queue and starts the dispatcher
        /// </summary>
        /// <param name="obj"></param>
        public void AddToQueue(WorkerBee obj)
        {
            this.WorkQueue.Enqueue(obj);
            this.EnsureDispatch();
        }
 
        private int GetQueueLength()
        {
            return this.WorkQueue.Count;
        }
 
        /// <summary>
        /// This is the callback that the dispatcher thread calls.
        /// It will initialize a new thread from the pool and start work
        /// </summary>
        private void FireThread()
        {
            WorkerBee work = null;
            while (!AbortDispatch && this.WorkQueue.TryDequeue(out work))
            {
                if (work != null)
                {
                    Thread thread = this.Threads.Get();
                    
                    if (!AbortDispatch) // check again
                        thread.Start(work);
                    else
                        this.Threads.Release(Thread.CurrentThread);
                }
            }
        }
 
        /// <summary>
        /// This is the queue of stuff to be done.
        /// </summary>
        private ThreadSafeQueue<WorkerBee> WorkQueue
        {
            get;
            set;
        }
 
        /// <summary>
        /// Simple resource pool filled with threads
        /// </summary>
        private ResourcePool<Thread> Threads
        {
            get;
            set;
        }
 
        public void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.WorkQueue.Clear();
                this.Threads.Dispose();
            }
        }
 
        public void Dispose()
        {
            this.Dispose(true);
        }
    }

ThreadingDemo.zip


Download the code and test project and try it for yourself.  I will say that this is fairly experimental, although I have been using it for quite a few things and have experienced no issues.  One of the major things I have used it for is the cache scavenger in my Factory caching model and it seems to work great.

 

Posted by Brian Rudolph | with no comments
Filed under: ,