I have written a program, and the problem starts when execution reaches the condition that triggers the following block:
// Excerpt of the engine-unavailable branch; the same statements appear inside the
// receive loop of pubsubPublish() in the full listing below.
status = myPubSubSubscribe.Receive(fsReceivedTopic, myBufferSubscribe, myHeader, 0);
if (status == PS_ENGINE_UNAVAIL)
{
printf("Unable to connect to the pubsub engine: %s\n", PSPubSub::Status2String(status));
myBufferPublish.FreeBuffer();
// NOTE(review): every explicit destructor call below acts on an object that is
// destroyed again automatically. In the full listing myBufferPublish,
// myPubSubPublish and myBufferSubscribe are automatic locals (destroyed when
// their scope ends) and myPubSubSubscribe is a file-scope global that
// pubsubConnect() calls Connect() on again after this function returns.
// Destroying an object twice, or using it after its destructor has run, is
// undefined behaviour.
myBufferPublish.~PSBuffer();
myPubSubPublish.~PSPubSub();
myBufferSubscribe.~PSBuffer();
myPubSubSubscribe.~PSPubSub();
return;
}
In this part I'm checking whether my connection to the publishing engine is still alive. If it is not, I go back to pubsubConnect() to wait until the engine comes back up, re-establish the connection, and start over in the main loop. I think I have a memory leak, because when I watch this process in Task Manager on the workstation, its memory usage goes up by some 100-200 KB every time this occurs. Any advice as to how I can fix this? As you can see, I've tried to destroy certain objects in different parts of my code, to no avail. Please help! :( (This program needs to run continuously on the workstation because I'm using it to monitor the health of other workstations, and another program writes that data to our database.) Here is the entire code:
// This program subscribes to specific topics, shortens the topic string and publishes the shorter topic name to the PubSub engine.
#include "pch.h"
#include "vfc/utility/primitives/TFlexString.h"
#include "libPubSub.h"
#include "libconfig.h"
#include "Interop.OASySDNA.Common.h"
#include <stdio.h>
#include <sstream>
#include "stdafx.h"
#include <iostream>
#include <cstring>
#include <new>
#include <string>
#include <Windows.H>
#using <mscorlib.dll>
using namespace System;
using namespace System::Collections;
using namespace std;
// Forward declarations for the functions defined after main().
void hideConsoleWindow();
void pubsubConnect();
void pubsubSubscribe();
TFlexString getSystemName();
void pubsubPublish();
// File-scope subscriber connection: pubsubConnect() calls Connect() on it and
// pubsubPublish() calls Receive() on it.
PSPubSub myPubSubSubscribe;
// Most recent status returned by the subscriber's Connect() / Receive() calls.
PSstatus status;
// Entry point: repeats the connect -> subscribe -> publish cycle forever.
// pubsubPublish() returning is what starts the next cycle.
int main()
{
    // hideConsoleWindow();
    while (true)
    {
        pubsubConnect();
        pubsubSubscribe();
        pubsubPublish();
    }

    return 0;
}
void hideConsoleWindow()
{
// Hide the console window *********************************************************************************************
HWND hWnd = GetConsoleWindow();
ShowWindow( hWnd, SW_HIDE );
}
// Connects the file-scope subscriber (myPubSubSubscribe) to the PubSub engine.
//
// Calls Connect() and then checks IsConnected(); while the subscriber does not
// report PS_OK the function logs the failure, sleeps 10 seconds and tries again.
// It returns only once IsConnected() reports PS_OK. The status returned by the
// last Connect() call is left in the global `status`.
void pubsubConnect()
{
    bool connected = false;
    do
    {
        status = myPubSubSubscribe.Connect(PSPubSub::ReceiveEnum::PSRT_BLOCKING, "Healthmonitor", 256);

        // Evaluate the connection state once per attempt so the message printed
        // and the loop-exit decision can never disagree.
        connected = (myPubSubSubscribe.IsConnected() == PS_OK);
        if (connected)
        {
            // The original format strings ended in "%\n", which is an invalid
            // conversion specification for printf; the stray '%' is removed.
            printf("Connection to PubSub engine established\n");
        }
        else
        {
            printf("Failed to connect to PubSub engine (%s)...retrying in 10 seconds...\n",
                   PSPubSub::Status2String(status));
            Sleep(10000);
        }
    } while (!connected);
}
// Subscription step, called from main() between pubsubConnect() and pubsubPublish().
// The body is empty in this listing, so this call currently does nothing.
void pubsubSubscribe()
{
}
void pubsubPublish()
{
PSPubSub myPubSubPublish;
PSstatus statusPublish = PS_ENGINE_UNAVAIL;
PSBuffer myBufferPublish;
TFlexString fsSystemName;
// TFlexString fsPublishTopicNYCENG1;
TFlexString fsPublishTopicNYCXOS1;
TFlexString fsPublishTopicNYCXOS2;
// Get the system name
if(cfg_getMySystemName(fsSystemName) == OAS_FALSE)
{
printf("Unable to get system name\n");
}
fsPublishTopicNYCXOS1 = getSystemName();
fsPublishTopicNYCXOS1 << ".healthstate.nycxos1";
fsPublishTopicNYCXOS2 = getSystemName();
fsPublishTopicNYCXOS2 << ".healthstate.nycxos2";
// Attach to PubSub system ***********************************************************************************************
statusPublish = myPubSubPublish.Connect(PSPubSub::ReceiveEnum::PSRT_BLOCKING, "Healthmonitor", 256);
if (statusPublish == PS_ENGINE_UNAVAIL)
{
printf("Unable to connect to the pubsub engine: %s\n", PSPubSub::Status2String(status));
myBufferPublish.FreeBuffer();
myBufferPublish.~PSBuffer();
myPubSubPublish.~PSPubSub();
myPubSubSubscribe.~PSPubSub();
return;
}
if(statusPublish != PS_OK)
{
printf("Unable to connect Publisher to the pubsub engine %s\n");
}
if (myPubSubPublish.IsConnected() == PS_OK)
{
printf("Publishing Connection to PubSub engine established %\n");
}
else
{
printf("Publishing Connection to PubSub engine FAILED %\n");
}
// Receive Data *******************************************************
for(;;)
{
PSBuffer myBufferSubscribe(1024);
PSheaderData myHeader;
TFlexString fsReceivedTopic;
TFlexString fsMyString;
TFlexString fsPublishMyStringNYCXOS1;
TFlexString fsPublishMyStringNYCXOS2;
myBufferSubscribe.ClearBuffer();
status = myPubSubSubscribe.Receive(fsReceivedTopic, myBufferSubscribe, myHeader, 0);
if (status == PS_ENGINE_UNAVAIL)
{
printf("Unable to connect to the pubsub engine: %s\n", PSPubSub::Status2String(status));
myBufferPublish.FreeBuffer();
myBufferPublish.~PSBuffer();
myPubSubPublish.~PSPubSub();
myBufferSubscribe.~PSBuffer();
myPubSubSubscribe.~PSPubSub();
return;
}
if (status != PS_OK)
{
printf(fsReceivedTopic);
printf("\n");
printf("Error in receiving data. %s\n", myPubSubSubscribe.Status2String(status));
string topicReceived = fsReceivedTopic; // "nyc.common.healthmon.nycxos15.monitors.processmonitor"
string stnName = "nycxos";
cout << topicReceived; // "nyc.common.healthmon.nycxos15.monitors.processmonitor"
printf("\n");
int topicReceivedPos = 0;
int topicReceivedSize = 0;
string subTopicReceived;
topicReceivedPos = topicReceived.find(stnName); // 21
cout << topicReceivedPos << endl; // 21
topicReceivedSize = topicReceived.size();
subTopicReceived = topicReceived.substr(topicReceivedPos, topicReceivedSize); // "nycxos15.monitors.processmonitor"
cout << subTopicReceived; // "nycxos15.monitors.processmonitor"
printf("\n");
string xosStationName;
int xosStationNameLength = 0;
xosStationNameLength = subTopicReceived.find_first_of('.');
cout << xosStationNameLength;
printf("\n");
xosStationName = subTopicReceived.substr(0, xosStationNameLength);
cout << xosStationName;
printf("\n");
TFlexString fsPublishTopicError;
TFlexString fsPublishMyStringError = "error";
fsPublishTopicError = getSystemName();
fsPublishTopicError << ".healthstate.";
fsPublishTopicError << xosStationName.c_str();
printf(fsPublishTopicError);
printf("\n");
myBufferPublish.ClearBuffer();
myBufferPublish.Write(&fsPublishMyStringError);
statusPublish = myPubSubPublish.Publish(fsPublishTopicError, myBufferPublish); // Check status of publisher
if(statusPublish != PS_OK)
{
printf("Publish failed: %s\n");
}
// exit(-1);
}
if (myHeader == PS_INTEG_DATA)
{
printf("This was an integrity update message\n");
}
if((myBufferSubscribe.Read(&fsMyString)) == OAS_FALSE)
{
printf("Value not read \n");
myBufferSubscribe.~PSBuffer();
// exit(-1);
}
if (fsReceivedTopic.DoesContainString("nycxos1.healthstate") == OAS_TRUE)
{
printf(fsReceivedTopic);
printf("\n");
cout << "NYCXOS1 Healthstate is: " << fsMyString << endl;
// PUBLISH NYCXOS1
fsPublishMyStringNYCXOS1 = fsMyString;
myBufferPublish.ClearBuffer(); // Flush buffer
myBufferPublish.Write(&fsPublishMyStringNYCXOS1); // Publish value
statusPublish = myPubSubPublish.Publish(fsPublishTopicNYCXOS1, myBufferPublish); // Check status of publisher
if(statusPublish != PS_OK)
{
printf("Publish failed: %s\n");
}
cout << "Published Value of NYCXOS1 is: " << fsPublishMyStringNYCXOS1 << endl;
// END PUBLISH NYCXOS1
}
if (fsReceivedTopic.DoesContainString("nycxos2.healthstate") == OAS_TRUE)
{
printf(fsReceivedTopic);
printf("\n");
cout << "NYCXOS2 Healthstate is: " << fsMyString << endl;
// PUBLISH NYCXOS2
fsPublishMyStringNYCXOS2 = fsMyString;
myBufferPublish.ClearBuffer(); // Flush buffer
myBufferPublish.Write(&fsPublishMyStringNYCXOS2); // Publish value
statusPublish = myPubSubPublish.Publish(fsPublishTopicNYCXOS2, myBufferPublish); // Check status of publisher
if(statusPublish != PS_OK)
{
printf("Publish failed: %s\n");
}
cout << "Published Value of NYCXOS2 is: " << fsPublishMyStringNYCXOS2 << endl;
// END PUBLISH NYCXOS2
}
}
myBufferPublish.FreeBuffer();
myBufferPublish.~PSBuffer();
myPubSubPublish.~PSPubSub();
myPubSubSubscribe.~PSPubSub();
}
// Returns the local system name used as the prefix of the published topics.
//
// The body in the original listing was empty, so control flowed off the end of
// a function with a non-void return type (undefined behaviour). This version
// always returns a value, obtained through cfg_getMySystemName() the same way
// pubsubPublish() calls it.
// NOTE(review): assumes cfg_getMySystemName() stores the name in its argument
// and leaves it usable when it reports OAS_FALSE — confirm against libconfig.
TFlexString getSystemName()
{
    TFlexString systemName;
    if (cfg_getMySystemName(systemName) == OAS_FALSE)
    {
        printf("Unable to get system name\n");
    }
    return systemName;
}