Hello everyone, I've encounterred a problem that I cannot seem to overcome. For the program to compile there are a few steps necessary to take: from the website Click Here on the download page, one would need to download and install:
- MPI.NET SDK
- Microsoft Compute Cluster Pack SDK(this one is the one that I used,this needs to be installed in the x86 version of Program Files,this was the only solution that I found to be able to install it);
- Also there is a tutorial for installing the previous programs and a small introduction to the MPI program model (in C#)
Now follows the code that I've written:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using MPI;
using System.Threading;
namespace _PRO__MAPC_MPI_Chat
{
class Program
{
static void Main(string[] args)
{
using (new MPI.Environment(ref args))
{
int producersNo = 1;// (Communicator.world.Size - 1) / 2;
// Console.WriteLine("number of producers :{0}",producersNo);
Intracommunicator comm = Communicator.world;
Random rand = new Random();
if (comm.Rank == 0)
{
// program for rank 0; in this case is the server/queue process
Queue<string> que = new Queue<string>();
const int MAXSIZE = 10;
string msg = null;
string response = null;
new Thread(() =>
{
while (true)
{
//get the message from entity
msg = comm.Receive<string>(Communicator.anySource, 0);
if ((msg == null) && (1 < msg.Length))
{
Console.WriteLine("Process Rank 0 Ended!");
break;
}
// Console.WriteLine("Queue process received a message !");
lock (que)
{
if (que.Count() != MAXSIZE)
{
que.Enqueue(msg);
Monitor.PulseAll(que);
}
else
{
Console.WriteLine("Queue is full discarding received message: {0}", msg);
}
}
}
}).Start();
new Thread(() =>
{
while (true)
{
//get the message from Queue to the destination
lock (que)
{
while (que.Count == 0)
{
Monitor.Wait(que);
}
response = que.Dequeue();
}
if (response != null)
comm.Send<string>(response, rand.Next(producersNo, comm.Size), 0);
else
Console.WriteLine("response is null!");
}
}).Start();
}
else // not rank 0(not the central node)
{
int timeToSleepProducer = 900;
int timeToSleepConsumer = 1000;
Console.WriteLine(" Process Rank:{0} started", comm.Rank);
{
new Thread(() =>
{
//consume the received message
string received = null;
while (true)
{
received = comm.Receive<string>(0, 0);
if ((received == null) && (1 < received.Length))
{
Console.WriteLine("Program Ended!");
break;
}
Thread.Sleep(timeToSleepProducer);
Console.WriteLine("msg: \"{0}\" got consumed by {1}", received, Communicator.world.Rank);
}
}
).Start();
}
// else
{
//execute producer code(create the messages)
// new Thread(() =>{
string msg = null;
while (true)
{
msg = DateTime.Now.ToString();
//msg = Console.ReadLine();
comm.Send<string>(msg, 0, 0);
if ((msg == null) && (1 < msg.Length))
{
Console.WriteLine("Program Ended!");
break;
}
Thread.Sleep(timeToSleepConsumer);
Console.WriteLine("msg: \"{0}\" produced by {1}", msg, Communicator.world.Rank);
}
//}).Start();
}
}
}
}
}
}
The most frequent error(and the stack trace) that I'm getting is:
Unhandled Exception: System.AccessViolationException: Attempted to read or write
protected memory. This is often an indication that other memory is corrupt.
at MPI.Unsafe.MPI_Send(IntPtr buf, Int32 count, Int32 datatype, Int32 dest, I
nt32 tag, Int32 comm)
at MPI.Communicator.Send[T](T value, Int32 dest, Int32 tag)
at MPI.Communicator.Send[T](T value, Int32 dest, Int32 tag)
at _PRO__MAPC_MPI_Chat.Program.<>c__DisplayClass5.<Main>b__1() in E:\Programm
ing\Projects\MAPC_Proj\MAPC_Proj\Program.cs:line 68
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C
ontextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()
If it matters: I'm using Visual Studio 2008.