"""Trace a small libpq pipeline through psycopg's pure-Python ``pq`` wrapper.

The script connects using an empty connection string (so connection
parameters come from the environment / libpq defaults), turns on libpq
tracing to stderr, enters pipeline mode, queues ``BEGIN`` via ``send_query``
and ``SELECT 1`` via ``send_query_params`` followed by a pipeline sync, and
then compares the status of each result received with the expected one.
Mismatches are reported on stderr.
"""

import os

# Must be set before psycopg is imported: the pq implementation is selected
# at import time, and the debug wrapper used below needs the Python one.
os.environ["PSYCOPG_IMPL"] = "python"

import sys
import logging
from select import select

from psycopg import pq
from psycopg.pq._debug import PGconnDebug

# Emit the "psycopg.debug" log records as bare messages.
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("psycopg.debug")
logger.setLevel(logging.INFO)

assert pq.__impl__ == "python", "please export 'PSYCOPG_IMPL=python'"


def wait_read(conn: pq.abc.PGconn) -> None:
    """Block until the connection socket is readable.

    Polls the socket with ``select`` up to five times, one second each.

    Raises:
        Exception: if the socket never became readable in that time.
    """
    for _ in range(5):
        readable, _writable, _errored = select([conn.socket], [], [], 1.0)
        if readable:
            return
    raise Exception("likely blocked")


print(f"libpq version: {pq.version()}", file=sys.stderr)

conn: pq.abc.PGconn = PGconnDebug.connect(b"")
conn.nonblocking = 1

# Send the libpq protocol trace to stderr, without timestamps.
trace_flags = pq.Trace.REGRESS_MODE | pq.Trace.SUPPRESS_TIMESTAMPS
conn.trace(sys.stderr.fileno())
conn.set_trace_flags(trace_flags)

conn.enter_pipeline_mode()

# Alternative noted by the original author as working instead of the
# send_query() call below:
# conn.send_query_params(b"BEGIN", None)
conn.send_query(b"BEGIN")
conn.send_query_params(b"SELECT 1", None)
conn.pipeline_sync()
conn.flush()

# One expected status per queued item, in the order they were sent.
for expected_status in (
    pq.ExecStatus.COMMAND_OK,
    pq.ExecStatus.TUPLES_OK,
    pq.ExecStatus.PIPELINE_SYNC,
):
    # Drain every result of the current item; a null result ends it.
    while True:
        # Wait for and consume input until a result can be fetched.
        while conn.is_busy():
            wait_read(conn)
            conn.consume_input()

        res = conn.get_result()
        if not res:
            break
        if res.status != expected_status:
            print(f"expected {expected_status.name}, got {res}", file=sys.stderr)