Throttled processing of multiple asynchronous tasks in coding
A Twitterizer developer emailed me because he was pulling data for multiple users, all on background threads, and noticed that Twitter had started rejecting his requests. I suggested that this was possibly a spam countermeasure in the Twitter API and that he needed to throttle his requests. He had no idea how to do that. Seeing an excellent challenge, I jumped into Visual Studio and created the following console application.
namespace ThrottledThreads
{
    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Threading;

    class Program
    {
        // Configuration stuff
        static int maxThreads = 10000;       // total number of workers to queue
        static int maxActiveThreads = 4;     // maximum number of workers running at once
        static int maxWaitTime = 10 * 1000;  // upper bound, in milliseconds, for the simulated work
        static bool verbose = false;

        // The rest doesn't need to be modified.
        public static int currentlyActive = 0;
        public static Queue<BackgroundWorker> threadQueue = new Queue<BackgroundWorker>();
        public static AutoResetEvent resetEvent = new AutoResetEvent(false);
        static object currentlyActiveLock = new object();
        static Random rnd = new Random();

        /// <summary>
        /// The entry point for the application
        /// </summary>
        /// <param name="args">The args.</param>
        static void Main(string[] args)
        {
            Console.WriteLine("[Main] Queueing threads");

            // Add the background threads to the queue.
            for (int i = 0; i < maxThreads; i++)
            {
                BackgroundWorker worker = new BackgroundWorker();
                worker.DoWork += new DoWorkEventHandler(backgroundWorker_DoWork);
                worker.RunWorkerCompleted += new RunWorkerCompletedEventHandler(backgroundWorker_RunWorkerCompleted);
                threadQueue.Enqueue(worker);
            }

            int threadNumber = 1;
            Console.WriteLine("[Main] Entering main loop.");

            // Loop until the queue is drained and every started worker has completed.
            while (true)
            {
                lock (currentlyActiveLock)
                {
                    if (threadQueue.Count == 0 && currentlyActive == 0)
                        break;

                    // Fill the free slots without exceeding maxActiveThreads
                    // or dequeuing from an empty queue.
                    while (currentlyActive < maxActiveThreads && threadQueue.Count > 0)
                    {
                        Console.WriteLine("[Main] Running {0}", threadNumber);
                        threadQueue.Dequeue().RunWorkerAsync(threadNumber);
                        currentlyActive++;
                        threadNumber++;
                    }
                }

                if (verbose)
                    Console.WriteLine("[Main] Waiting ...");

                // Block until a worker signals that it has completed.
                resetEvent.WaitOne();
            }

            Console.WriteLine("[Main] All threads completed");
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        }

        /// <summary>
        /// Handles the DoWork event of the backgroundWorker control.
        /// </summary>
        /// <param name="sender">The source of the event.</param>
        /// <param name="e">The <see cref="System.ComponentModel.DoWorkEventArgs"/> instance containing the event data.</param>
        private static void backgroundWorker_DoWork(object sender, DoWorkEventArgs e)
        {
            if (verbose)
                Console.WriteLine("[{0}] Starting", e.Argument);

            int time;

            // Random is not thread-safe, so serialize access from the worker threads.
            lock (rnd)
            {
                time = rnd.Next(500, maxWaitTime);
            }

            // This is where our actual work is done. For the example, we'll just sleep for a while.
            Thread.Sleep(time);

            Console.WriteLine("[{0}] Finished in {1:0.00}s", e.Argument, time / 1000d);
            e.Result = e.Argument;
        }

        /// <summary>
        /// Handles the RunWorkerCompleted event of the backgroundWorker control.
        /// </summary>
        /// <param name="sender">The source of the event.</param>
        /// <param name="e">The <see cref="System.ComponentModel.RunWorkerCompletedEventArgs"/> instance containing the event data.</param>
        private static void backgroundWorker_RunWorkerCompleted(
            object sender, RunWorkerCompletedEventArgs e)
        {
            lock (currentlyActiveLock)
            {
                currentlyActive--;
            }

            Console.WriteLine("[{0}] Completed", e.Result);

            // Signal that the thread has finished and a new thread can be processed.
            resetEvent.Set();
        }
    }
}
The code uses the generic Queue<T> class to queue a large set of BackgroundWorker objects. The main loop starts a fixed number of workers (set by maxActiveThreads), then blocks on an AutoResetEvent until one of them completes. When a worker completes, it decrements the active count and signals the event, which wakes the main loop so it can start the next worker from the queue. It's pretty cool to watch.
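For comparison, the same "run at most N at a time" idea can also be sketched with SemaphoreSlim and tasks instead of BackgroundWorker and AutoResetEvent. This is not the approach used in the application above, only a minimal sketch of an alternative; the names ThrottleSketch and RunThrottledAsync are hypothetical, and Task.Delay stands in for the real work (such as a Twitter API call).

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleSketch
{
    // Runs taskCount simulated jobs with at most maxConcurrent in flight.
    // Returns the highest number of jobs observed running at the same time.
    public static async Task<int> RunThrottledAsync(int taskCount, int maxConcurrent)
    {
        using (var gate = new SemaphoreSlim(maxConcurrent))
        {
            int active = 0;
            int peak = 0;
            object counterLock = new object();

            var tasks = Enumerable.Range(1, taskCount).Select(async id =>
            {
                // Wait for a free slot; this is the throttle.
                await gate.WaitAsync();
                try
                {
                    lock (counterLock)
                    {
                        active++;
                        if (active > peak) peak = active;
                    }

                    // Placeholder for the real work.
                    await Task.Delay(50 + (id % 5) * 10);

                    lock (counterLock)
                    {
                        active--;
                    }
                }
                finally
                {
                    // Free the slot so the next queued job can start.
                    gate.Release();
                }
            }).ToList(); // ToList forces the lazy query to run, which starts every job.

            await Task.WhenAll(tasks);
            return peak;
        }
    }

    static void Main()
    {
        int peak = RunThrottledAsync(20, 4).GetAwaiter().GetResult();
        Console.WriteLine("Peak concurrency: {0} (limit 4)", peak);
    }
}
```

The semaphore plays the role of the currentlyActive counter and the reset event combined: WaitAsync blocks a job until a slot is free, and Release hands that slot to the next waiting job.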