C#

BlockingCollection in C# – Introduction and Examples

BlockingCollection is a collection class which ensures thread-safety. Multiple threads can add and remove objects in BlockingCollection concurrently.

It implements the producer-consumer pattern. In this pattern, there are two threads one is called producer and other is called consumer. Both threads share a common collection class to exchange data between them. BlockingCollection can be used as the collection class. Producer thread generates the data and consumer thread is consuming the data. We set the maximum limit of collection class. Producer cannot add new objects more than maximum limit and consumer cannot remove data from an empty collection class.

BlockingCollection has two features which differentiate it from other concurrent classes.

  1. Bounding
  2. Blocking

Both features help us to implement producer-consumer pattern.

Bounding means we can set the maximum number of objects that we can store in the collection. When a producer thread reaches BlockingCollection maximum limit, it is blocked to add new objects. In the blocked stage, thread goes in the sleep mode. It will unblock when consumer thread remove item from the collection.

When collection class is empty, the consumer thread is blocked until the producer thread adds new item.

At the end, producer thread calls the CompleteAdding method. CompleteAdding set the IsCompleted property to true. A consumer thread monitors the IsCompleted property to know that there are no more items to add.

Create BlockingCollection Instance

Below is the syntax of creating BlockingCollection

BlockingCollection<int> bCollection = new BlockingCollection<int>(10);

We can also create BlockingCollection with no max limit then it will take any number of items.

Default Collection Type

By default, BlockingCollection use ConcurrentQueue as its collection class. We can also provide another collection class. But we can pass only those collection classes which implements the IProducerConsumerCollection<T> interface. ConcurrentStack and ConcurrentBag implements the IProducerConsumerCollection<T> interface. We can also define our own collection class which implements the IProducerConsumerCollection<T> interface and pass that class to BlockingCollection constructor. Below is the example of passing ConcurrentBag to BlockingCollection which has maximum limit of 10 objects.

BlockingCollection<int> bCollection = new BlockingCollection<int>(new ConcurrentBag<int>(), 10);

Add New Item

There are two methods to add new items.

  1. Add
  2. TryAdd

Add Method

Add method takes a single parameter. This method is blocked when maximum limit is reached. Below is the example of Add method.

BlockingCollection<int> bCollection = new BlockingCollection<int>(boundedCapacity: 2);
bCollection.Add(1);
bCollection.Add(2);

In the above example, we have created BlockingCollection with maximum capacity of 2 items. When we try to add third item, it will block until any item is not removed from the collection.

TryAdd Method

For solving the above problem, we have a different add method with takes a timeout value. If the add operation is not completed within the timespan value then TryAdd method returns with false value.

Below is the example of TryAdd method.

BlockingCollection<int> bCollection = new BlockingCollection<int>(boundedCapacity: 2);
bCollection.Add(1);
bCollection.Add(2);

if (bCollection.TryAdd(3, TimeSpan.FromSeconds(1)))
{
    Console.WriteLine("Item added");
}
else
{
    Console.WriteLine("Item does not added");
}

We have defined bounded capacity of 2 in the constructor. When we try to add third object, it will wait for 1 second and returns with false value.

Remove Items

BlockingCollection provides two methods for removing an item.

  1. Take
  2. TryTake

Take Method

Take method removes an item from the collection. Take method is blocked when the collection is empty. It’ll unblock when any item is added by other thread.

BlockingCollection<int> bCollection = new BlockingCollection<int>(boundedCapacity: 3);
bCollection.Add(1);
bCollection.Add(2);

int item = bCollection.Take();
Console.WriteLine(item); //returns 1

TryTake Method

TryTake method also removes an item and returns item in the out parameter. This method also takes a timeout parameter. If the collection is empty then this method will wait for time specify in timeout parameter. If new item is not added within the timeout value, then it returns false.

BlockingCollection<int> bCollection = new BlockingCollection<int>(boundedCapacity: 2);
bCollection.Add(1);
bCollection.Add(2);

int item = bCollection.Take();
item = bCollection.Take();

if (bCollection.TryTake(out item, TimeSpan.FromSeconds(1)))
{
    Console.WriteLine(item);
}
else
{
    Console.WriteLine("No item removed");
}

CompleteAdding method and IsCompleted Property

Producer thread call the CompleteAdding method. It marks the BlockingCollection instance that it will not add any more items. CompleteAdding method mark the IsAddingCompleted property to true.

IsCompleted property is used by consumer threads. It returns true when IsAddingCompleted is true and the BlockingCollection is empty. That means when IsCompleted is true there are no items in the collection and other producer threads will not add any new item.

Below is the example of CompleteAdding method and IsCompleted property.

static void Main(string[] args)
{
    BlockingCollection<int> bCollection = new BlockingCollection<int>(boundedCapacity: 10);

    Task producerThread = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; ++i)
            {
                bCollection.Add(i);
            }

            bCollection.CompleteAdding();
        });

    Task consumerThread = Task.Factory.StartNew(() =>
        {
            while (!bCollection.IsCompleted)
            {
                int item = bCollection.Take();
                Console.WriteLine(item);
            }
        });

    Task.WaitAll(producerThread, consumerThread);
}

In the above example, we have created two threads one is producer and other is consumer thread. Producer thread adds items into the BlockingCollection. After adding it will call the CompleteAdding method that will mark that collection class that it will not add any more items.

Consumer thread put a condition in while loop. In the loop, it checks the IsCompleted property. Loop runs until the IsCompleted property returns true. We remove one item at a time using the Take method and print into the console.

BlockingCollection in the Foreach loop

BlockingCollection provides a GetConsumingEnumerable() method. This method returns IEnumerable<T> so that we can use that method in the foreach loop. This method returns items as soon as item is available in the collection.

This method has a blocking feature. It will block the foreach loop when the collection is empty. A foreach loop ends when the producer thread calls the CompleteAdding method. Below is the example.

static void Main(string[] args)
{
    BlockingCollection<int> bCollection = new BlockingCollection<int>(boundedCapacity: 10);
    Task producerThread = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; ++i)
            {
                Thread.Sleep(TimeSpan.FromSeconds(1));
                bCollection.Add(i);
            }

            bCollection.CompleteAdding();
        });

    foreach (int item in bCollection.GetConsumingEnumerable())
    {
        Console.WriteLine(item);
    }

    //Output:
    // 0
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6
    // 7
    // 8 
    // 9
}

In the above example, a producer thread is adding items. It will take 1 second before adding items into the collection. GetConsumingEnumerable method wait until the CompleteAdded method is called.

Work with Multiple Producers and Consumers

Sometime, we have multiple producers and consumers threads. BlockingCollection gives some static methods to work with multiple collections. Below are the methods.

  1. AddToAny
  2. TryAddToAny
  3. TakeFromAny
  4. TryTakeFromAny

All three methods are static. They take an array of BlockingCollection as parameter. AddToAny and TryAddToAny add item into any of the BlockingCollection array item. TryTakeFromAny also takes an array of BlockingCollection and try removes the item from any of the array item.

Below is the example of TryTakeFromAny.

static void Main(string[] args)
{
    BlockingCollection<int>[] producers = new BlockingCollection[3];
    producers[0] = new BlockingCollection<int>(boundedCapacity: 10);
    producers[1] = new BlockingCollection<int>(boundedCapacity: 10);
    producers[2] = new BlockingCollection<int>(boundedCapacity: 10);

    Task t1 = Task.Factory.StartNew(() =>
        {
            for (int i = 1; i <= 10; ++i)
            {
                producers[0].Add(i);
                Thread.Sleep(100);
            }
            producers[0].CompleteAdding();
        });

    Task t2 = Task.Factory.StartNew(() =>
    {
        for (int i = 11; i <= 20; ++i)
        {
            producers[1].Add(i);
            Thread.Sleep(150);
        }
        producers[1].CompleteAdding();
    });

    Task t3 = Task.Factory.StartNew(() =>
    {
        for (int i = 21; i <= 30; ++i)
        {
            producers[2].Add(i);
            Thread.Sleep(250);
        }
        producers[2].CompleteAdding();
    });

    while (!producers[0].IsCompleted ||
        !producers[1].IsCompleted ||
        !producers[2].IsCompleted)
    {
        int item;
        BlockingCollection<int>.TryTakeFromAny(producers, out item, TimeSpan.FromSeconds(1));
        if (item != default(int))
        {
            Console.WriteLine(item);
        }
    }
}

In the above example, we have used three producer threads in the array. We started three threads all are adding new items into the BlockingCollection array. In the last while loop, we are using TryTakeFromAny to remove a single item from any of the BlockingCollection array and print it to the console.

Here is a MSDN example of creating pipeline using BlockingCollection 

Final Words

BlockingCollection is a thread-safe collection class. It is an implementation of producer-consumer pattern. It provides features bounding and blocking to support producer-consumer pattern. It is only concurrent collection class which support bounding and blocking features.