// Standard C++ headers #include #include #include // PostgreSQL libpq headers #include "libpq-fe.h" #include "libpq/libpq-fs.h" // Intel TBB header for threading #include #include #include #if defined(_WIN32) # include # define sleep Sleep #else # include #endif class TestException: public std::exception { std::string m_what; public: TestException(const std::string& what) : m_what(what) { } ~TestException() throw() { } virtual const char* what() const throw() { return "My exception happened"; } }; // Flag used to control thread exit static bool terminate = false; // This class does an insert into the test table struct Inserter { void operator()() { // Establish a connection PGconn* conn = PQconnectdb("user=postgres password=1234"); // Insert data into table indefinitely int i=0; while(!terminate) { // SQL Statement std::stringstream insert; insert << "INSERT INTO tmp (value) VALUES (" << i%250 << ");"; std::string insertStr = insert.str(); const char* c_str = insertStr.c_str(); // Execute query PGresult* res=PQexec(conn,c_str); if (PQresultStatus(res) == PGRES_FATAL_ERROR) { std::cerr << "Error in inserting data:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn); PQclear(res); PQfinish(conn); throw TestException( "Inserter::PQexec() failed." ); } PQclear(res); // Increment index i++; } PQfinish(conn); } }; struct Queryer { void operator()() { // Establish a connection PGconn* conn = PQconnectdb("user=postgres password=1234"); // Retrieve data from the test table indefinitely int i=1; while (!terminate) { // SQL statement - read the top 1000 values off `tmp' std::stringstream query; query << "SELECT * FROM tmp WHERE id > (SELECT last_value - 1000 FROM tmp_id_seq);"; std::string queryStr = query.str(); const char* c_str = queryStr.c_str(); // Execute query PGresult* res=PQexec(conn, c_str); if (PQresultStatus(res) == PGRES_FATAL_ERROR) { std::cerr << "Error in searching data:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn); PQclear(res); PQfinish(conn); throw TestException( "Queryer::PQexec() failed." ); } PQclear(res); // Increment index i++; } PQfinish(conn); } }; int main(int argc, char * argv[]) { int sleep_time = 0; if (argc > 1) { sleep_time = atoi(argv[1]); } std::cerr << std::string("Libpq is ") + (PQisthreadsafe() ? "" : "not ") + "threadsafe" << std::endl; // Establish a connection PGconn* conn = PQconnectdb("user=postgres password=1234"); // Create the test table std::cout << "Creating table...\n"; PGresult* res=PQexec(conn,"DROP TABLE IF EXISTS tmp; CREATE TABLE tmp (id SERIAL8 PRIMARY KEY,value INT);"); if (PQresultStatus(res) == PGRES_FATAL_ERROR) { std::cerr << "Error in Creating table:\nError code: " << PQresStatus(PQresultStatus(res)) << "Error Message: " << PQerrorMessage(conn); PQclear(res); PQfinish(conn); return 1; } // Clear and close the current connection PQclear(res); PQfinish(conn); // Launch thread that does INSERT std::cout << "Starting table filling thread...\n"; Inserter inserter; tbb::tbb_thread inserter_thread(inserter); // Launch thread that does SELECT ...This thread causes memory in postgres.exe to slowly go up indefinitely std::cout << "Starting table searching thread...\n"; Queryer selector; tbb::tbb_thread selector_thread(selector); // run for timer if (sleep_time) { sleep( sleep_time ); terminate = true; } inserter_thread.join(); selector_thread.join(); // and terminate return 0; }