Timer + Producer-Consumer
I am new to threading and am trying to learn the different concepts.
Right now I am implementing a Producer/Consumer pattern driven by a Timer thread. The problem is that I don't know how to check whether all producer and consumer threads have finished their work before the Timer thread schedules its next tick, or how to dispose of all the producer and consumer threads created for one tick before the next tick starts.
I would like to ask for your help and guidance on how to approach this.
Here is my sample code:
public class WorkerThread
{
    public BlockingQueue<Item> collection = new BlockingQueue<Item>(100);
    private Timer TimerThread { get; set; }

    public void ThreadTimer()
    {
        this.TimerThread = new Timer(new TimerCallback(StartMonitor), null, 500, Timeout.Infinite);
    }

    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        List<Thread> consumers = new List<Thread>();
        for (int i = 0; i < 1; i++)
        {
            producers.Add(new Thread(() => RunProducers(this.collection)));
        }
        //TODO: Start all producer threads...
        for (int i = 0; i < 2; i++)
        {
            consumers.Add(new Thread(() => RunConsumers(this.collection)));
        }
        //TODO: Start all consumer threads...
        //TODO: Let Thread wait until all worker threads are done
        //TODO: Dispose Threads
        TimerThread.Change(5000, Timeout.Infinite);
    }

    public void RunProducers(BlockingQueue<Item> collection)
    {
        List<Item> lsItems = CreateListOfItems();
        foreach (var item in lsItems)
        {
            collection.Add(item);
        }
    }

    public void RunConsumers(BlockingQueue<Item> collection)
    {
        while (true)
        {
            Item item = collection.Take();
            Console.WriteLine("Processed[{0}] : {1}", item.ID, item.Name);
            //Thread.Sleep(100);
        }
    }

    public List<Item> CreateListOfItems()
    {
        List<Item> lsItems = new List<Item>();
        for (int i = 0; i <= 9999; i++)
        {
            lsItems.Add(new Item() { ID = i, Name = "Item[" + i + "]" });
        }
        return lsItems;
    }
}
BlockingCollection implementation (since our environment is .NET 3.5, we can't use libraries from higher versions):
public class BlockingQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int MaxSize;
    public bool closing;

    public BlockingQueue(int maxSize)
    {
        this.MaxSize = maxSize;
    }

    public void Add(T item)
    {
        lock (queue)
        {
            while (queue.Count >= this.MaxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                Monitor.PulseAll(queue);
            }
        }
    }

    public T Take()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == MaxSize - 1)
            {
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }

    public void Close()
    {
        lock (queue)
        {
            closing = true;
            Monitor.PulseAll(queue);
        }
    }

    public bool TryDequeue(out T value)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                if (closing)
                {
                    value = default(T);
                    return false;
                }
                Monitor.Wait(queue);
            }
            value = queue.Dequeue();
            if (queue.Count == MaxSize - 1)
            {
                Monitor.PulseAll(queue);
            }
            return true;
        }
    }
}
c# multithreading
You'll have to get rid of the timer, that is not going to end well. As-is it keeps creating more consumer threads that never end, you have to wait a bit too long to watch it blow up your program. In the snippet it is only necessary as a band-aid for the wonky producer code. A sensible thing to do is to push an item into the queue whenever it becomes available in whatever real code produces items. If that needs to be simulated with a timer then so be it.
– Hans Passant
Nov 8 at 8:40
I just updated my code in my local environment a while ago while benchmarking, and yes, it does create a lot of consumer threads. Right now I am trying to have the Timer thread create only a producer thread that queues items on every tick.
– Jonathan Daniel
Nov 8 at 8:58
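The suggestion in the comments above (long-lived consumers, with the timer only pushing items) could look roughly like the sketch below. It is only an illustration under stated assumptions: the class name TickingPipeline, the batch size of 10 and the int items are hypothetical and not part of the original code, and it sticks to APIs available in .NET 3.5.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch: consumers are created once; each timer tick only enqueues work.
public class TickingPipeline
{
    private readonly Queue<int> queue = new Queue<int>();
    private readonly List<Thread> consumers = new List<Thread>();
    private bool closing;
    private int processed;
    private int nextId;

    // Atomic read of the number of items handled so far.
    public int Processed { get { return Interlocked.CompareExchange(ref processed, 0, 0); } }

    // Creates and starts the consumer threads exactly once.
    public void Start(int consumerCount)
    {
        for (int i = 0; i < consumerCount; i++)
        {
            Thread t = new Thread(Consume);
            t.IsBackground = true;
            consumers.Add(t);
            t.Start();
        }
    }

    // Timer callback: enqueues one batch (10 items, an arbitrary choice) and returns.
    public void Tick(object state)
    {
        lock (queue)
        {
            for (int i = 0; i < 10; i++) queue.Enqueue(nextId++);
            Monitor.PulseAll(queue);
        }
    }

    private void Consume()
    {
        while (true)
        {
            lock (queue)
            {
                while (queue.Count == 0)
                {
                    if (closing) return;   // queue drained and closed: thread ends
                    Monitor.Wait(queue);
                }
                queue.Dequeue();
            }
            Interlocked.Increment(ref processed); // stand-in for real processing
        }
    }

    // Lets the consumers drain what is left, then waits for them to exit.
    public void Stop()
    {
        lock (queue) { closing = true; Monitor.PulseAll(queue); }
        foreach (Thread t in consumers) t.Join();
    }
}
```

With this shape a periodic timer such as new Timer(pipeline.Tick, null, 500, 5000) never has to wait for or dispose worker threads, because no new threads are created per tick.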
edited Nov 8 at 6:07
Uwe Keim
27.3k30128210
asked Nov 8 at 5:56
Jonathan Daniel
285
1 Answer
You can just check the IsAlive property of each worker thread. The code is not very elegant, but it works, provided the threads are actually started and the consumers eventually return:
public void StartMonitor(object state)
{
    List<Thread> producers = new List<Thread>();
    List<Thread> consumers = new List<Thread>();
    for (int i = 0; i < 1; i++)
    {
        producers.Add(new Thread(() => RunProducers(this.collection)));
    }
    //TODO: Start all producer threads...
    for (int i = 0; i < 2; i++)
    {
        consumers.Add(new Thread(() => RunConsumers(this.collection)));
    }
    //TODO: Start all consumer threads...
    // Wait until all worker threads are done, polling every 50 ms
    List<Thread> to_check = new List<Thread>(producers);
    to_check.AddRange(consumers);
    while (true)
    {
        Thread.Sleep(50);
        List<Thread> is_alive = new List<Thread>();
        foreach (Thread t in to_check)
            if (t.IsAlive)
                is_alive.Add(t);
        if (is_alive.Count == 0)
            break;
        to_check = is_alive;
    }
    //TODO: Dispose Threads
    TimerThread.Change(5000, Timeout.Infinite);
}
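As an alternative to polling IsAlive, Thread.Join blocks until a given thread has finished. The sketch below is a minimal illustration, not the original author's code; the JoinExample class and RunAndWait method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class JoinExample
{
    // Starts `count` threads running `work` and blocks until all of them have finished.
    public static void RunAndWait(int count, ThreadStart work)
    {
        List<Thread> threads = new List<Thread>();
        for (int i = 0; i < count; i++)
        {
            Thread t = new Thread(work);
            threads.Add(t);
            t.Start();
        }
        foreach (Thread t in threads)
            t.Join(); // returns once that thread's delegate has returned
    }
}
```

Like the IsAlive loop, this only returns if every worker eventually returns, so a consumer that loops on while(true) would still block it forever.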
Or, perhaps a somewhat better way, count the threads that are still running:
private int _counter; // number of worker threads that have not finished yet

public void StartMonitor(object state)
{
    List<Thread> producers = new List<Thread>();
    List<Thread> consumers = new List<Thread>();
    // Set the count before any thread starts, so the wait loop cannot see 0 too early.
    _counter = 1 + 2; // 1 producer + 2 consumers
    for (int i = 0; i < 1; i++)
    {
        producers.Add(new Thread(() => { try { RunProducers(this.collection); } finally { Interlocked.Decrement(ref _counter); } }));
    }
    //TODO: Start all producer threads...
    for (int i = 0; i < 2; i++)
    {
        consumers.Add(new Thread(() => { try { RunConsumers(this.collection); } finally { Interlocked.Decrement(ref _counter); } }));
    }
    //TODO: Start all consumer threads...
    // Wait until all worker threads are done (atomic read of the counter)
    while (Interlocked.CompareExchange(ref _counter, 0, 0) > 0)
        Thread.Sleep(50);
    //TODO: Dispose Threads
    TimerThread.Change(5000, Timeout.Infinite);
}
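On .NET 3.5 the counter can also be combined with a ManualResetEvent so that the waiting thread blocks instead of sleeping in a loop. This is a hedged sketch of a small stand-in for CountdownEvent (which only exists from .NET 4.0); the Countdown class and its members are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical helper: a minimal stand-in for CountdownEvent, usable on .NET 3.5.
public sealed class Countdown
{
    private int remaining;
    private readonly ManualResetEvent done = new ManualResetEvent(false);

    public Countdown(int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException("count");
        remaining = count;
    }

    // Called once by each worker when it finishes; the last caller releases the waiter.
    public void Signal()
    {
        if (Interlocked.Decrement(ref remaining) == 0)
            done.Set();
    }

    // Blocks without polling until Signal has been called `count` times.
    public void Wait()
    {
        done.WaitOne();
    }
}
```

Each worker would call Signal() in a finally block, and StartMonitor would call Wait() before rescheduling the timer.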
And to avoid Sleep() you can use the Barrier class (note that Barrier was added in .NET 4.0, so it is not available on .NET 3.5):
public void StartMonitor(object state)
{
    List<Thread> producers = new List<Thread>();
    List<Thread> consumers = new List<Thread>();
    int producer_cnt = 1,
        consumer_cnt = 2;
    // One participant per worker thread, plus one for this monitoring thread
    Barrier b = new Barrier(producer_cnt + consumer_cnt + 1);
    try
    {
        for (int i = 0; i < producer_cnt; i++)
        {
            producers.Add(new Thread(() => { try { RunProducers(this.collection); } finally { b.SignalAndWait(); } }));
        }
        //TODO: Start all producer threads...
        for (int i = 0; i < consumer_cnt; i++)
        {
            consumers.Add(new Thread(() => { try { RunConsumers(this.collection); } finally { b.SignalAndWait(); } }));
        }
        //TODO: Start all consumer threads...
    }
    finally
    {
        // Blocks here until every worker thread has reached the barrier
        b.SignalAndWait();
    }
    //TODO: Dispose Threads
    TimerThread.Change(5000, Timeout.Infinite);
}
edited Nov 8 at 8:45
answered Nov 8 at 8:06
Alexander Kiselev
386113