I have the following socket server code, where every connection gets its own thread. Each connection thread (the producer) receives data and puts it into a queue for the database processor (the consumer). This works fine, but once the queue builds up it can hang or take hours to clear, because there are many producers and only a single consumer. Is it possible to have one consumer per producer, meaning each message goes into its own queue, gets processed, and that's it? What changes can I make to handle this, or must I build a thread pool?
public class cServer {
private LinkedBlockingQueue<String> databaseQueue = new LinkedBlockingQueue<String>();
class ConnectionHandler implements Runnable {
private Socket receivedSocketConn1;
ConnectionHandler(Socket receivedSocketConn1) {
this.receivedSocketConn1=receivedSocketConn1;
}
// gets data from an inbound connection and queues it for database update
public void run() { // etc
// you already have most of this in your existing code.
// it uses the shared databaseQueue variable to queue message Strings
BufferedWriter w = null;
BufferedReader r = null;
String message="";
try {
PrintStream out = System.out;
BufferedWriter fout = null;
w = new BufferedWriter(new OutputStreamWriter(receivedSocketConn1.getOutputStream()));
r = new BufferedReader(new InputStreamReader(receivedSocketConn1.getInputStream()));
int m = 0, count=0;
int nextChar=0;
while ((nextChar=r.read()) != -1)
{
message += (char) nextChar;
if (nextChar == '*')
{
databaseQueue.add(message);
message="";
}
}
}
catch (IOException ex)
{
System.out.println("MyError:IOException has been caught in the main first try");
ex.printStackTrace(System.out);
}
finally
{
try
{
if ( w != null )
{
w.close();
}
else
{
System.out.println("MyError:w is null in finally close");
}
}
catch(IOException ex){
System.out.println("MyError:IOException has been caught in w in finally close");
ex.printStackTrace(System.out);
}
}
}
}
class DatabaseProcessor implements Runnable {
// updates database with data queued by ConnectionHandler
Connection dbconn = null;
Statement stmt = null;
Statement stmt1 = null;
Statement stmt2 = null;
Date connCreated = null;
public void run()
{ // this is just like the QueueProcessor example I gave you
// open database connection
//createConnection();
while (true)
{
try
{
int count=0;
String message = "";
message = databaseQueue.take();
if (message.equals(null)) {
System.out.println("QueueProcessor is shutting down");
break; // exit while loop, ends run() method
}
System.out.println("Message taken from queue is :"+message);
}
catch (Exception e)
{
e.printStackTrace();
}
}//while true
//closeConnection();
}//run
}
public static void main(String[] args) {
new cServer();
}
cServer() { // default constructor
new Thread(new DatabaseProcessor()).start();
try
{
final ServerSocket serverSocketConn = new ServerSocket(5000);
while (true)
{
try
{
Socket socketConn1 = serverSocketConn.accept();
new Thread(new ConnectionHandler(socketConn1)).start();
}
catch(Exception e)
{
System.out.println("MyError:Socket Accepting has been caught in main loop."+e.toString());
e.printStackTrace(System.out);
}
}
}
catch (Exception e)
{
System.out.println("MyError:Socket Conn has been caught in main loop."+e.toString());
e.printStackTrace(System.out);
//System.exit(0);
}
databaseQueue.add(null);
}
}
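For the many-producers, one-consumer bottleneck described above, one option short of a queue per producer is to let several consumer threads take from the same shared queue. The following is only a minimal sketch under stated assumptions: the class `ConsumerPoolSketch`, the method `drain`, and the `POISON` sentinel are hypothetical names, and the `handled.add(...)` call stands in for the real database update. A sentinel object is used for shutdown because `LinkedBlockingQueue` rejects `null` elements with a `NullPointerException`.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

public class ConsumerPoolSketch {
    // Hypothetical shutdown marker; compared by identity, so it can never
    // collide with a real message that happens to contain the same text.
    static final String POISON = new String("__SHUTDOWN__");

    // Starts `consumers` worker threads on one shared queue, then asks them
    // to stop once everything queued before the call has been taken.
    static List<String> drain(BlockingQueue<String> queue, int consumers)
            throws InterruptedException {
        List<String> handled = new CopyOnWriteArrayList<>();
        Thread[] workers = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take(); // blocks until data arrives
                        if (message == POISON) {
                            break;                     // this worker is done
                        }
                        handled.add(message);          // placeholder for the database update
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        // One sentinel per consumer, queued behind the real messages (FIFO).
        for (int i = 0; i < consumers; i++) {
            queue.put(POISON);
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 20; i++) {
            queue.put("msg" + i + "*");
        }
        List<String> handled = drain(queue, 4);
        System.out.println("handled " + handled.size() + " messages");
    }
}
```

In a real server the sentinels would be queued at shutdown rather than immediately, and each worker would probably hold its own database connection, since a single shared connection may become the next bottleneck.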
I am trying to change the read loop in ConnectionHandler to this:
if (nextChar == '*')
{
Thread t = new Thread(new DatabaseProcessor(message));
t.start();
//databaseQueue.add(message);
message="";
}
Next, I changed the consumer to this. I need this because the queue gets very congested and stalls my whole operation, and whenever I have to stop and restart the application I lose all the data in the queue. So my plan is that the socket thread receives the data, creates a new thread to process it, and later closes the socket. Is this viable?
class DatabaseProcessor extends Thread {
private String data;
public DatabaseProcessor(String data) {
this.data = data;
}
// updates database with data passed in by ConnectionHandler
Connection dbconn = null;
Statement stmt = null;
Statement stmt1 = null;
Statement stmt2 = null;
Date connCreated = null;
public void run()
{ // this is just like the QueueProcessor example I gave you
// open database connection
//createConnection();
while (true)
{
try
{
int count=0;
String message = "";
//message = databaseQueue.take();
//if (message.equals(null)) {
// System.out.println("QueueProcessor is shutting down");
//break; // exit while loop, ends run() method
//}
System.out.println("Message taken from queue is :"+data);
}
catch (Exception e)
{
e.printStackTrace();
}
}//while true
//closeConnection();
}//run
}
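Creating one thread per message, as in the change above, puts no upper limit on the number of threads when messages arrive faster than the database can absorb them. A hedged alternative is a fixed-size `ThreadPoolExecutor` with a bounded backlog. In the sketch below, `BoundedPoolSketch`, `newDatabasePool`, `handleMessage`, and the pool sizes are illustrative assumptions, and `handleMessage` only counts messages where the real code would update the database. `CallerRunsPolicy` makes the submitting thread run the task itself when the backlog is full, which slows the socket readers down instead of letting work pile up.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolSketch {
    // Counts processed messages; stands in for real database work.
    static final AtomicInteger processed = new AtomicInteger();

    // Fixed number of worker threads plus a bounded backlog of waiting tasks.
    static ThreadPoolExecutor newDatabasePool(int workers, int backlog) {
        return new ThreadPoolExecutor(
                workers, workers,                        // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,               // keep-alive (unused when core == max)
                new ArrayBlockingQueue<Runnable>(backlog),
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure when the backlog is full
    }

    // Hypothetical per-message handler; the database update would go here.
    static void handleMessage(String data) {
        processed.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newDatabasePool(4, 100);
        for (int i = 0; i < 1000; i++) {
            final String message = "msg" + i + "*";
            // Would replace `new Thread(new DatabaseProcessor(message)).start()`.
            pool.execute(() -> handleMessage(message));
        }
        pool.shutdown();                                 // accept no new tasks, finish queued ones
        pool.awaitTermination(30, TimeUnit.SECONDS);
        System.out.println("processed " + processed.get());
    }
}
```

Note that each task here handles exactly one message and then returns; a `while (true)` loop inside the task would keep its worker thread busy forever and the pool would stop making progress.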