http://msdn.microsoft.com/en-us/library/dd460684(VS.100).aspx
[This documentation is for preview only, and is subject to change in later releases. Blank topics are included as placeholders.]
The following examples show how to use the thread-safe System.Collections.Concurrent.BlockingCollection<T> collection class to implement the producer-consumer pattern. In this pattern, one or more producer threads add items to a collection while other consumer threads remove items from it. The examples also show how the producer thread signals to consumers that it has finished adding items.
namespace ProducerConsumer
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Threading;

    class Program
    {
        // Limit the collection size to 2000 items at any given time.
        const int upperLimit = 2000;
        static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);

        // Variables for diagnostic output only.
        static Stopwatch sw = new Stopwatch();
        static int additions = 0;
        static int subtractions = 0;

        static void Main(string[] args)
        {
            // Start the stopwatch.
            sw.Start();

            // Queue the producer work item.
            ThreadPool.QueueUserWorkItem(new WaitCallback(RunProducer));

            // Queue the consumer work item.
            ThreadPool.QueueUserWorkItem(new WaitCallback(RunConsumer));
            // ThreadPool.QueueUserWorkItem(new WaitCallback(RunConsumer2));

            // Keep the console window open while the
            // consumer thread completes its output.
            Console.ReadKey();
        }

        static void RunProducer(Object stateInfo)
        {
            for (int i = 0; i < 100; i++)
            {
                long ticks = sw.ElapsedTicks;

                // Uncomment this line to see interleaved additions and subtractions.
                // Console.WriteLine("adding tick value {0}. item# {1}", ticks, additions);

                // Add blocks if the collection already holds upperLimit items.
                collection.Add(ticks);

                // Counter for demonstration purposes only.
                additions++;

                // For demonstration purposes, uncomment this line to
                // slow down the producer thread without sleeping.
                // Thread.SpinWait(100000);
            }

            // Important: tell consumers that no more items will be added.
            collection.CompleteAdding();
            Console.WriteLine("Done adding: {0} items", additions);
        }

        static void RunConsumer(Object stateInfo)
        {
            // GetConsumingEnumerable returns an enumerable that removes each
            // item as it is returned. The loop blocks while the collection is
            // empty and ends after CompleteAdding has been called and the
            // last item has been removed.
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming tick value {0} : item# {1} ",
                    item.ToString("D18"), subtractions++);
            }

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                additions, subtractions, collection.Count);
            sw.Stop();

            Console.WriteLine("Press any key to exit");
        }

        static void RunConsumer2(Object stateInfo)
        {
            // Count may be zero while the producer is still adding items.
            // IsCompleted becomes true only after CompleteAdding has been
            // called and the collection is empty, so it is the only
            // condition the loop needs to test.
            while (!collection.IsCompleted)
            {
                long ticks;

                // Wait up to 30 milliseconds for an item.
                if (collection.TryTake(out ticks, 30))
                {
                    Console.WriteLine("Consuming {0} : {1} ",
                        ticks.ToString("D18"), subtractions++);
                }
                else
                {
                    // Do something else useful before trying again.
                    Console.WriteLine("Doing something useful here");
                }
            }

            Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
                additions, subtractions, collection.Count);
        }
    }
}
When it is created with a bounded capacity, System.Collections.Concurrent.BlockingCollection<T> limits the maximum number of items that are in the collection at any time. It blocks consumer threads if no items are available, and it blocks producer threads if the collection is full. This example uses a foreach loop with the BlockingCollection<T>.GetConsumingEnumerable method in the consuming thread. This type of loop blocks if the collection is empty, and it ends only after the producer has called CompleteAdding and the remaining items have been removed. In this example blocking is not a concern because the producer thread adds items faster than they can be consumed. The second example in this topic shows how a consumer thread can perform other work if the collection is empty.
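The blocking foreach loop described above can be sketched in isolation. The following is a minimal illustration, not part of the original sample: the class name ConsumingEnumerableSketch and the helper ProduceAndSum are hypothetical, and the sketch uses Task.Factory.StartNew instead of the thread pool calls in the main listing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class ConsumingEnumerableSketch
{
    // Hypothetical helper: one task produces 1..itemCount while the
    // calling thread consumes and sums the items.
    public static long ProduceAndSum(int itemCount, int boundedCapacity)
    {
        using (var queue = new BlockingCollection<int>(boundedCapacity))
        {
            Task producer = Task.Factory.StartNew(() =>
            {
                try
                {
                    for (int i = 1; i <= itemCount; i++)
                    {
                        // Blocks while the collection is full.
                        queue.Add(i);
                    }
                }
                finally
                {
                    // Without this call the foreach loop below never ends.
                    queue.CompleteAdding();
                }
            });

            long sum = 0;

            // Blocks while the collection is empty. The loop ends once
            // adding is complete and the last item has been removed.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                sum += item;
            }

            producer.Wait();
            return sum;
        }
    }

    static void Main()
    {
        // 1 + 2 + ... + 100 = 5050
        Console.WriteLine(ProduceAndSum(100, 10));
    }
}
```

Calling CompleteAdding in a finally block is a design choice of this sketch: it releases the consumer even if the producer fails partway through.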
The producer thread reads the ElapsedTicks property of a Stopwatch instance and adds that value to the collection. The consumer thread removes items from the collection and simply prints the value to the console. (The call to WriteLine slows down the consumer thread significantly. Remember, this is just an example.) The producer in this listing adds only 100 items, so the 2000-item bound is never reached. If the producer ran for much longer and no upper bound were specified in the System.Collections.Concurrent.BlockingCollection<T> constructor, the collection would keep growing and could eventually consume all available memory. To see this for yourself, remove the argument from the constructor, raise the loop count in RunProducer to a very large value, press F5, and view the process in Task Manager.
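The effect of the upper bound can be observed without blocking a thread by using TryAdd, which returns false immediately when the collection is full. The following is a small sketch under that assumption: the class name BoundedCapacitySketch and the helper CountAcceptedAdds are hypothetical and do not appear in the original sample.

```csharp
using System;
using System.Collections.Concurrent;

static class BoundedCapacitySketch
{
    // Hypothetical helper: attempts a number of non-blocking adds and
    // returns how many the bounded collection accepted.
    public static int CountAcceptedAdds(int boundedCapacity, int attempts)
    {
        using (var queue = new BlockingCollection<long>(boundedCapacity))
        {
            int accepted = 0;
            for (int i = 0; i < attempts; i++)
            {
                // TryAdd returns false right away when the collection is
                // full. Add would block here until a consumer removed an item.
                if (queue.TryAdd(i))
                {
                    accepted++;
                }
            }
            return accepted;
        }
    }

    static void Main()
    {
        // Nothing is consumed, so only the first 3 adds succeed.
        Console.WriteLine(CountAcceptedAdds(3, 10)); // prints 3
    }
}
```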
The following example shows how to consume the collection in such a way that the consuming thread can perform other work if the collection is empty. This is useful in scenarios where the producer is generally slower than the consumer thread.
static void RunConsumer2(Object stateInfo)
{
    // Count may be zero while the producer is still adding items.
    // IsCompleted becomes true only after CompleteAdding has been
    // called and the collection is empty, so it is the only
    // condition the loop needs to test.
    while (!collection.IsCompleted)
    {
        long ticks;

        // Wait up to 30 milliseconds for an item.
        if (collection.TryTake(out ticks, 30))
        {
            Console.WriteLine("Consuming {0} : {1} ",
                ticks.ToString("D18"), subtractions++);
        }
        else
        {
            // Do something else useful before trying again.
            Console.WriteLine("Doing something useful here");
        }
    }

    Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
        additions, subtractions, collection.Count);
}
To compile and run this code, copy the method into the Program class of the previous example if it is not already there. Then comment out the QueueUserWorkItem call that queues RunConsumer, and uncomment the call that queues RunConsumer2. Next, uncomment the Thread.SpinWait line in the RunProducer method to simulate a slow producer thread.
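As a rough check that polling on IsCompleted drains every item, the following sketch fills a collection, marks it complete, and counts the items removed with TryTake. The class name CompletionStateSketch and the helper DrainAfterCompletion are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Concurrent;

static class CompletionStateSketch
{
    // Hypothetical helper: adds itemCount items, marks the collection
    // complete, and returns the number of items a polling loop removes.
    public static int DrainAfterCompletion(int itemCount)
    {
        using (var queue = new BlockingCollection<int>())
        {
            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
            }

            // After this call IsAddingCompleted is true, but IsCompleted
            // stays false until the collection is also empty.
            queue.CompleteAdding();

            int taken = 0;
            while (!queue.IsCompleted)
            {
                int item;

                // Wait up to 30 milliseconds for an item.
                if (queue.TryTake(out item, 30))
                {
                    taken++;
                }
            }
            return taken;
        }
    }

    static void Main()
    {
        Console.WriteLine(DrainAfterCompletion(5)); // prints 5
    }
}
```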