C#

ConcurrentBag in C# – Introduction and Examples

ConcurrentBag is one of the thread-safe collection classes introduced in .NET 4.0. ConcurrentBag allows you to store objects in unordered way. Contrary to ConcurrentDictionary class, it allows you to store duplicate objects.

ConcurrentBag allows multiple threads to store the objects. It is optimized for scenarios where same thread act as producer and consumer. That means same thread is adding and retrieving the data.

For example, there are two threads Thread1 and Thread2. Thread1 added four objects 1,2,3,4. Thread2 added three objects 5,6,7. After both threads added the data, Thread1 starts retrieving the data. As Thread1 added 1,2,3,4 objects these items get preferences over 5,6,7. After Thread1 retrieves all four items then Thread1 goes to retrieve Thread2 inserted data 5,6,7. We’ll look into this with example in later section. First we learn how we can create, add and retrieve items from ConcurrentBag.

Create ConcurrentBag Instance

Below is the syntax for creating ConcurrentBag instance.

ConcurrentBag<int> bag = new ConcurrentBag<int>();

We can also initialize ConcurrentBag with a existing collection. We can pass any collection in the parameter and it will copy all the collection objects into its own list. Below is the example.

List<int> ints = new List<int>();
ints.Add(1);
ints.Add(2);
ints.Add(3);
ints.Add(4);

ConcurrentBag<int> bag = new ConcurrentBag<int>(ints);
int count = bag.Count; //returns 4

Add New Items

To add new items, there is a Add method. Below is the example.

ConcurrentBag<int> bag = new ConcurrentBag<int>();
bag.Add(1);
bag.Add(2);

In the above example, we have added two items 1 and 2 into the bag.

Add Multiple Items

ConcurrentBag does not provide any AddRange method, we have to manually call Add method for each item.

int[] arr = { 1, 2, 3 };

ConcurrentBag<int> bag = new ConcurrentBag<int>();
foreach(var item in arr)
{
    bag.Add(item);
}

Count All Items

It provides a Count property which returns the total number of items in the Bag.

ConcurrentBag<int> bag = new ConcurrentBag<int>();
bag.Add(1);
bag.Add(2);

int count = bag.Count; //returns 2

Retrieve Items

To retrieve an item, ConcurrentBag provides two methods.

  1. TryPeek
  2. TryTake

TryPeek method returns one item from the bag but it does not remove item from the bag. It returns true if it successfully retrieve item otherwise returns false. It returns the item in the out parameter as shown in below example.

ConcurrentBag<int> bag = new ConcurrentBag<int>();
bag.Add(1);
bag.Add(2);

int item;
bool isSucess = bag.TryPeek(out item); //isSuccess=True, item = 2
isSuccess = bag.TryPeek(out item); //isSuccess=True, item = 2
isSuccess = bag.TryPeek(out item); //isSuccess=True, item = 2

As TryPeek method does not remove item from the bag, it returns the same last item again and again.

TryTake method returns one item from the bag and removes that item from the bag. It returns true if it successfully retrieve item otherwise false.

ConcurrentBag<int> bag = new ConcurrentBag<int>();
bag.Add(1);
bag.Add(2);

int item;
bool isSuccess = bag.TryTake(out item); //isSuccess=True, item = 2
isSuccess = bag.TryTake(out item); //isSuccess=True, item = 1

Retrieve all items

As ConcurrentBag implements the IEnumerable interface, we can enumerate over it in foreach loop. Enumeration is thread-safe in ConcurrentBag. Below is the example.

static void Main(string[] args)
{
    ConcurrentBag<int> bag = new ConcurrentBag<int>();

    Task t1 = Task.Factory.StartNew(() =>
        {
            for (int i = 1; i < 10; ++i)
            {
                bag.Add(i);
                Thread.Sleep(200);
            }
        });

    Task t2 = Task.Factory.StartNew(() =>
        {
            int i = 0;
            while (i != 4)
            {
                foreach (var item in bag)
                {
                    Console.WriteLine(i + "-" + item);
                    Thread.Sleep(200);
                }
                i++;
                Thread.Sleep(200);
            }

        });

    Task.WaitAll(t1, t2);
}

In the above example, we have created two threads. One thread is adding new items into the bag. Second thread is calling foreach loop. Both threads are working at the same time, and second thread calling foreach loop again and again. Second thread does not throw an exception if any new item is added during the enumeration.

Producer/Consumer approach

As I have explained earlier, ConcurrentBag is preferable in scenarios where same thread is both producer and the consumer. ConcurrentBag maintains a local queue for each thread that access it, and when the same thread is retrieving items, it gives priority to those items that are in same thread queue.

ConcurrentBag is an implementation of Work stealing algorithm.

You can read more about work stealing algorithm here. Below is the example.

static void Main(string[] args)
{
    ConcurrentBag<int> bag = new ConcurrentBag<int>();
    AutoResetEvent autoEvent1 = new AutoResetEvent(false);

    Task t1 = Task.Factory.StartNew(() =>
    {
        for (int i = 1; i <= 4; ++i)
        {
            bag.Add(i);
        }
        //wait for second thread to add its items
        autoEvent1.WaitOne();

        while (bag.IsEmpty == false)
        {
            int item;
            if (bag.TryTake(out item))
            {
                Console.WriteLine(item);
            }
        }
    });


    Task t2 = Task.Factory.StartNew(() =>
        {
            for (int i = 5; i <= 7; ++i)
            {
                bag.Add(i);
            }
            autoEvent1.Set();
        });

    t1.Wait();

    //Output
    // 4
    // 3
    // 2
    // 1
    // 5
    // 6
    // 7
}

In this example, when both threads t1 and t2 completes adding items, then Thread1 starts retrieving the items. In the bag 5,6,7 are added after the 1,2,3,4, but as Thread1 is accessing the item 1,2,3,4 get preferences. When t1 local queue is empty then it goes to another thread local queue to steal the items.

Final Words

ConcurrentBag is a thread-safe collection class. It allows you to stores items list in unordered way and it also allows you to store duplicate items. ConcurrentBag supports both Producer/Consumer problem and Work stealing algorithm.