import os
os.environ["PSYCOPG_IMPL"] = "python"
import sys
import logging
from select import select
from psycopg import pq
from psycopg.pq._debug import PGconnDebug
# Print log records as bare messages (no level/name prefix), INFO and above.
logging.basicConfig(level=logging.INFO, format="%(message)s")

# NOTE(review): "psycopg.debug" is presumably the logger PGconnDebug writes
# to -- confirm against psycopg.pq._debug.
logger = logging.getLogger("psycopg.debug")
logger.setLevel(logging.INFO)

# This script needs the pure-Python pq implementation (PSYCOPG_IMPL is set at
# the top of this file, before psycopg is imported). Use an explicit check
# rather than `assert`, so that it is not stripped when running `python -O`.
if pq.__impl__ != "python":
    raise RuntimeError("please export 'PSYCOPG_IMPL=python'")
def wait_read(
    conn: "pq.abc.PGconn", *, attempts: int = 5, timeout: float = 1.0
) -> None:
    """Wait until the connection socket is readable, or give up.

    Poll ``conn.socket`` with ``select()`` up to *attempts* times, waiting at
    most *timeout* seconds each time, and return as soon as the socket is
    reported readable.

    Args:
        conn: Object whose ``socket`` attribute is the descriptor to poll.
        attempts: Maximum number of ``select()`` calls before giving up.
        timeout: Seconds to wait in each ``select()`` call.

    Raises:
        TimeoutError: If the socket is not readable after all the attempts
            (with the defaults, after about 5 seconds).
    """
    for _ in range(attempts):
        readable, _, _ = select([conn.socket], [], [], timeout)
        if readable:
            return
    raise TimeoutError("likely blocked")
# Report the libpq version in use on stderr, where the trace is also written.
print(f"libpq version: {pq.version()}", file=sys.stderr)

# Connect through the debug wrapper with an empty conninfo string.
# NOTE(review): presumably libpq then falls back to its defaults / PG*
# environment variables -- confirm.
conn: pq.abc.PGconn = PGconnDebug.connect(b"")
conn.nonblocking = 1

# Write the libpq trace to stderr's file descriptor, with the REGRESS_MODE
# and SUPPRESS_TIMESTAMPS trace flags set.
conn.trace(sys.stderr.fileno())
conn.set_trace_flags(pq.Trace.REGRESS_MODE | pq.Trace.SUPPRESS_TIMESTAMPS)

# Enter pipeline mode, queue two queries followed by a sync, then flush.
# The order of these calls is the point of the script: do not reorder.
conn.enter_pipeline_mode()
# BEGIN is sent with send_query() here. Per the original author's note,
# sending it with send_query_params() would work instead:
# conn.send_query_params(b"BEGIN", None)
conn.send_query(b"BEGIN")
conn.send_query_params(b"SELECT 1", None)
conn.pipeline_sync()
conn.flush()
# Fetch the results of the pipeline, one group per expected status: keep
# reading results until get_result() returns a false value, reporting on
# stderr every result whose status differs from the expected one.
#
# NOTE(review): the nesting was reconstructed from a source whose indentation
# was lost; the input-draining loop is assumed to run only when the
# connection reports busy -- confirm against the original script.
for expected_status in (
    pq.ExecStatus.COMMAND_OK,
    pq.ExecStatus.TUPLES_OK,
    pq.ExecStatus.PIPELINE_SYNC,
):
    while True:
        if conn.is_busy():
            # Wait for the socket, then keep consuming input (waiting again
            # before each further read) until the connection is not busy.
            wait_read(conn)
            conn.consume_input()
            while conn.is_busy():
                wait_read(conn)
                conn.consume_input()

        res = conn.get_result()
        if not res:
            # No more results in this group: move to the next expected status.
            break
        if res.status != expected_status:
            print(f"expected {expected_status.name}, got {res}", file=sys.stderr)