i've just started learning the C# language and this is my first program to use threads.
I don't understand why the consumer thread doesn't get called( only after the producer has stopped)and when it gets called it doesn't consume anything, also I've noticed that the majority of produced elements are of the same values.
Here is the full code:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace Sem_example_luci
{
class Semaphore
{
private Object oLock;
private int iResourceAccesses;
private Boolean isBinary;
// private LinkedList<Thread> vWaitingTh;
private Semaphore()
{
this.oLock = new Object();
//this.vWaitingTh = new LinkedList<Thread>();
}
public Semaphore(int initialSemaphoreCount)
: this()
{
if (initialSemaphoreCount == 1)
this.isBinary = true;
this.iResourceAccesses = initialSemaphoreCount;
}
public void increase(Int32 iIncrement)
{
lock (this.oLock)
{
this.iResourceAccesses += iIncrement;
// Console.WriteLine("Semaphore incremented");
if (this.isBinary)
{
Monitor.Pulse(this.oLock);
}
}
}
public void decrease(Int32 iDecrement)
{
lock (this.oLock)
{
while(this.iResourceAccesses == 0 && this.isBinary)
{
//add thread to queue
Monitor.Wait(this.oLock);
}
//Console.WriteLine("semaphore decremented!");
this.iResourceAccesses -= iDecrement;
// Monitor.Pulse(oLock);
}
}
class ElementaryItem
//this class represents what gets produced and consumed
//the element will be consumed only when the "priotity"
//of the element equals that of the current priority
{
public static Int16[] priorities = { 0, 1, 2, 3, 4 };
private Int32 ID, priority;
public static Int32 getRandomPriority()
{
Random rand = new Random();
Int32 temp = (priorities[priorities.Length - 1] - priorities[0] + 1);
return rand.Next() % temp + priorities[0];
}
public ElementaryItem()
{
Random rand = new Random();
this.ID = rand.Next(0, Int16.MaxValue);
this.priority = rand.Next(priorities[0]-1,priorities[priorities.Length-1]+1);
}
public ElementaryItem(ElementaryItem o)
{
this.ID = o.ID;
this.priority = o.priority;
}
public int getID()
{
return this.ID;
}
public int getPriority()
{
return this.priority;
}
public String toString()
{
return "Element ID: " + this.ID + "\t with priority:" + this.priority;
}
}
class MyQueue
//this class holds the information, it's common for the producer and consumer
{
private Int32 MaxNumberOfElements;
private List<ElementaryItem> items;//the key will be the priority
private static int currentPriority;
public void putItemIntoQueue(ElementaryItem item)
{
this.items.Add(item);
Console.WriteLine("Added in queue the item:"+item.toString());
this.MaxNumberOfElements--;
Console.WriteLine("new number of elements = " + this.MaxNumberOfElements);
}
public static void updatePriority(){
while (true)
{
//generate a random priority
MyQueue.currentPriority = (MyQueue.currentPriority+1) % ElementaryItem.priorities.Length;//we need the next priority
Console.WriteLine("Now serving priority level:{0}", MyQueue.currentPriority);
Thread.Sleep(10000);
}
}
public ElementaryItem getItemFromQueue()
{
ElementaryItem retValue = null;
for (int i = 0; i < this.items.Count; i++)
{
ElementaryItem temp = this.items[i];
if (temp.getPriority() == MyQueue.currentPriority || i==this.items.Count)
{
retValue = new ElementaryItem(temp);//copy the element
this.items.RemoveAt(i);//remove the original
break;
}//else to prevent queue overflow we disregard the priority condition
//and we just take the last visited element
}
// Console.WriteLine("Remainig elements" + this.items.Count());
return retValue;
}
public MyQueue(Int32 maxSize)
{
this.MaxNumberOfElements = maxSize;//the maximum numeber of elements we want to produce
this.items = new System.Collections.Generic.List<ElementaryItem>();
MyQueue.currentPriority = ElementaryItem.getRandomPriority();
Thread priorityUpdate = new Thread(MyQueue.updatePriority);
priorityUpdate.Start();
}
//this function
public bool endConditionAchieved()
{
return this.MaxNumberOfElements == 0;
}
}
class Entity
{
public enum EntityType { Producer, Consumer };
private EntityType eType;
private Semaphore emptyCount;//we keep track, using the semaphore, of the number of empty spaces
private Semaphore fullCount;//and the number of elements currently on the queue
private Semaphore useQueue;//binary semaphore
private MyQueue elementQueue;
public Entity(EntityType e, Semaphore emptyCount, Semaphore fullCount,Semaphore useQueue,MyQueue elementQueue)
{
this.eType = e;
this.emptyCount = emptyCount;
this.fullCount = fullCount;
this.useQueue = useQueue;
this.elementQueue = elementQueue;
}
public static void Worker(Object ent)
{
((Entity)ent).action();
}
public void action()
{
if (this.eType == EntityType.Producer)
{
Console.WriteLine("Thread producerr fired!");
this.produce();
}
else
{
Console.WriteLine("Thread Consumer fired!");
this.consume();
}
}
private void produce()
{
while (!this.elementQueue.endConditionAchieved())
{
// Thread.Sleep(10000);
this.emptyCount.decrease(1);
//Console.WriteLine("emptyCount decreased");
this.useQueue.decrease(1);
// Console.WriteLine("useQueue decreased");
this.elementQueue.putItemIntoQueue(new ElementaryItem());
// Console.WriteLine("Element added to queue");
this.useQueue.increase(1);
// Console.WriteLine("useQueue increased");
this.fullCount.increase(1);
// Console.WriteLine("fullCount increased");
}
}
private void consume()
{
Console.WriteLine("Consumer before while");
//while (!this.elementQueue.endConditionAchieved())
while(true)
{
// Console.WriteLine("in the while loop");
this.fullCount.decrease(1);
// Console.WriteLine("fullCount decremented ");
this.useQueue.decrease(1);
// Console.WriteLine("useQueue decremented");
ElementaryItem item = this.elementQueue.getItemFromQueue();
if (item != null)
{
Console.WriteLine("Consumed item:"+ item.toString());
}
// Console.WriteLine("Element removed from queue!");
this.useQueue.increase(1);
// Console.WriteLine(" useQueue incremented");
this.emptyCount.increase(1);
// Console.WriteLine(" emptyCount incremented");
}
}
}
static void Main(string[] args)
{
int noElements = 10;
int noProducers = 1;
int noConsumers = 2;
Semaphore emptyCount, fullCount, useQueue;
emptyCount = new Semaphore(noElements);
fullCount = new Semaphore(0);
useQueue = new Semaphore(1);
MyQueue elemQueue = new MyQueue(noElements);
Entity[] producers = new Entity[noProducers];
Entity[] consumers = new Entity[noConsumers];
for(int i=0;i<producers.Length;i++){
producers[i] = new Entity(Entity.EntityType.Producer,emptyCount,fullCount,useQueue,elemQueue);
Thread th = new Thread(Entity.Worker);
th.Start(producers[i]);
}
for(int i=0;i<consumers.Length;i++){
consumers[i] = new Entity(Entity.EntityType.Consumer,emptyCount,fullCount,useQueue,elemQueue);
Thread th = new Thread(Entity.Worker);
th.Start(consumers[i]);
}
Console.ReadKey();//to view the results
}
}
}