// Standard C++ headers
#include
#include
#include
// PostgreSQL libpq headers
#include "libpq-fe.h"
#include "libpq/libpq-fs.h"
// Intel TBB header for threading
#include
#include
#include
#if defined(_WIN32)
# include
# define sleep Sleep
#else
# include
#endif
class TestException: public std::exception
{
std::string m_what;
public:
TestException(const std::string& what) : m_what(what) { }
~TestException() throw() { }
virtual const char* what() const throw()
{
return "My exception happened";
}
};
// Flag used to control thread exit
static bool terminate = false;
// This class does an insert into the test table
struct Inserter
{
void operator()()
{
// Establish a connection
PGconn* conn = PQconnectdb("user=postgres password=1234");
// Insert data into table indefinitely
int i=0;
while(!terminate)
{
// SQL Statement
std::stringstream insert;
insert << "INSERT INTO tmp (value) VALUES (" << i%250 << ");";
std::string insertStr = insert.str();
const char* c_str = insertStr.c_str();
// Execute query
PGresult* res=PQexec(conn,c_str);
if (PQresultStatus(res) == PGRES_FATAL_ERROR)
{
std::cerr << "Error in inserting data:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn);
PQclear(res);
PQfinish(conn);
throw TestException( "Inserter::PQexec() failed." );
}
PQclear(res);
// Increment index
i++;
}
PQfinish(conn);
}
};
struct Queryer
{
void operator()()
{
// Establish a connection
PGconn* conn = PQconnectdb("user=postgres password=1234");
// Retrieve data from the test table indefinitely
int i=1;
while (!terminate)
{
// SQL statement - read the top 1000 values off `tmp'
std::stringstream query;
query << "SELECT * FROM tmp WHERE id > (SELECT last_value - 1000 FROM tmp_id_seq);";
std::string queryStr = query.str();
const char* c_str = queryStr.c_str();
// Execute query
PGresult* res=PQexec(conn, c_str);
if (PQresultStatus(res) == PGRES_FATAL_ERROR)
{
std::cerr << "Error in searching data:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn);
PQclear(res);
PQfinish(conn);
throw TestException( "Queryer::PQexec() failed." );
}
PQclear(res);
// Increment index
i++;
}
PQfinish(conn);
}
};
int main(int argc, char * argv[])
{
int sleep_time = 0;
if (argc > 1) {
sleep_time = atoi(argv[1]);
}
std::cerr << std::string("Libpq is ") + (PQisthreadsafe() ? "" : "not ") + "threadsafe" << std::endl;
// Establish a connection
PGconn* conn = PQconnectdb("user=postgres password=1234");
// Create the test table
std::cout << "Creating table...\n";
PGresult* res=PQexec(conn,"DROP TABLE IF EXISTS tmp; CREATE TABLE tmp (id SERIAL8 PRIMARY KEY,value INT);");
if (PQresultStatus(res) == PGRES_FATAL_ERROR)
{
std::cerr << "Error in Creating table:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn);
PQclear(res);
PQfinish(conn);
return 1;
}
// Clear and close the current connection
PQclear(res);
PQfinish(conn);
// Launch thread that does INSERT
std::cout << "Starting table filling thread...\n";
Inserter inserter;
tbb::tbb_thread inserter_thread(inserter);
// Launch thread that does SELECT ...This thread causes memory in postgres.exe to slowly go up indefinitely
std::cout << "Starting table searching thread...\n";
Queryer selector;
tbb::tbb_thread selector_thread(selector);
// run for timer
if (sleep_time) {
sleep( sleep_time );
terminate = true;
}
inserter_thread.join();
selector_thread.join();
// and terminate
return 0;
}