From ab8e7fb0f6b96b645a34f81979d7f3cade55c231 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Tue, 9 Dec 2014 22:57:53 +0100 Subject: [PATCH 3/3] gapless sequence v2 --- contrib/gapless_seq/Makefile | 65 +++++++ contrib/gapless_seq/expected/concurrency.out | 31 +++ contrib/gapless_seq/expected/gapless_seq.out | 81 ++++++++ contrib/gapless_seq/gapless_seq--1.0.sql | 57 ++++++ contrib/gapless_seq/gapless_seq.c | 274 +++++++++++++++++++++++++++ contrib/gapless_seq/gapless_seq.control | 6 + contrib/gapless_seq/gapless_seq.sgml | 22 +++ contrib/gapless_seq/specs/concurrency.spec | 29 +++ contrib/gapless_seq/sql/gapless_seq.sql | 38 ++++ 9 files changed, 603 insertions(+) create mode 100644 contrib/gapless_seq/Makefile create mode 100644 contrib/gapless_seq/expected/concurrency.out create mode 100644 contrib/gapless_seq/expected/gapless_seq.out create mode 100644 contrib/gapless_seq/gapless_seq--1.0.sql create mode 100644 contrib/gapless_seq/gapless_seq.c create mode 100644 contrib/gapless_seq/gapless_seq.control create mode 100644 contrib/gapless_seq/gapless_seq.sgml create mode 100644 contrib/gapless_seq/specs/concurrency.spec create mode 100644 contrib/gapless_seq/sql/gapless_seq.sql diff --git a/contrib/gapless_seq/Makefile b/contrib/gapless_seq/Makefile new file mode 100644 index 0000000..63b51c4 --- /dev/null +++ b/contrib/gapless_seq/Makefile @@ -0,0 +1,65 @@ +# contrib/gapless_seq/Makefile + +MODULE_big = gapless_seq +OBJS = gapless_seq.o +PG_CPPFLAGS = -I$(libpq_srcdir) + +EXTENSION = gapless_seq +DATA = gapless_seq--1.0.sql + +EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output ./isolation_output + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/gapless_seq +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +check: regresscheck isolationcheck + +submake-regress: + $(MAKE) -C $(top_builddir)/src/test/regress all + +submake-isolation: + $(MAKE) -C $(top_builddir)/src/test/isolation all + +submake-gapless_seq: + $(MAKE) -C $(top_builddir)/contrib/gapless_seq + +REGRESSCHECKS=gapless_seq + +regresscheck: all | submake-regress submake-gapless_seq + $(MKDIR_P) regression_output + $(pg_regress_check) \ + --temp-install=./tmp_check \ + --extra-install=contrib/gapless_seq \ + --outputdir=./regression_output \ + $(REGRESSCHECKS) + +regresscheck-install-force: | submake-regress submake-gapless_seq + $(pg_regress_installcheck) \ + --extra-install=contrib/gapless_seq \ + $(REGRESSCHECKS) + +ISOLATIONCHECKS=concurrency + +isolationcheck: all | submake-isolation submake-gapless_seq + $(MKDIR_P) isolation_output + $(pg_isolation_regress_check) \ + --extra-install=contrib/gapless_seq \ + --outputdir=./isolation_output \ + $(ISOLATIONCHECKS) + +isolationcheck-install-force: all | submake-isolation submake-gapless_seq + $(pg_isolation_regress_installcheck) \ + --extra-install=contrib/gapless_seq \ + $(ISOLATIONCHECKS) + +PHONY: submake-gapless_seq submake-regress check \ + regresscheck regresscheck-install-force \ + isolationcheck isolationcheck-install-force diff --git a/contrib/gapless_seq/expected/concurrency.out b/contrib/gapless_seq/expected/concurrency.out new file mode 100644 index 0000000..ec6a098 --- /dev/null +++ b/contrib/gapless_seq/expected/concurrency.out @@ -0,0 +1,31 @@ +Parsed test spec with 3 sessions + +starting permutation: s1_begin s1_nextval s2_begin s2_nextval s1_commit s2_commit +step s1_begin: BEGIN; +step s1_nextval: SELECT nextval('test_gapless'::regclass); +nextval + +1 +step s2_begin: BEGIN; +step s2_nextval: SELECT nextval('test_gapless'::regclass); +step s1_commit: COMMIT; +step s2_nextval: <... completed> +nextval + +2 +step s2_commit: COMMIT; + +starting permutation: s3_begin s3_nextval s2_begin s2_nextval s3_rollback s2_commit +step s3_begin: BEGIN; +step s3_nextval: SELECT nextval('test_gapless'::regclass); +nextval + +1 +step s2_begin: BEGIN; +step s2_nextval: SELECT nextval('test_gapless'::regclass); +step s3_rollback: ROLLBACK; +step s2_nextval: <... completed> +nextval + +1 +step s2_commit: COMMIT; diff --git a/contrib/gapless_seq/expected/gapless_seq.out b/contrib/gapless_seq/expected/gapless_seq.out new file mode 100644 index 0000000..903ed30 --- /dev/null +++ b/contrib/gapless_seq/expected/gapless_seq.out @@ -0,0 +1,81 @@ +CREATE EXTENSION gapless_seq; +CREATE SEQUENCE test_gapless USING gapless; +SELECT nextval('test_gapless'::regclass); + nextval +--------- + 1 +(1 row) + +BEGIN; + SELECT nextval('test_gapless'::regclass); + nextval +--------- + 2 +(1 row) + + SELECT nextval('test_gapless'::regclass); + nextval +--------- + 3 +(1 row) + + SELECT nextval('test_gapless'::regclass); + nextval +--------- + 4 +(1 row) + +ROLLBACK; +SELECT nextval('test_gapless'::regclass); + nextval +--------- + 2 +(1 row) + +CREATE SEQUENCE test_alter_seq USING local; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 1 +(1 row) + +ALTER SEQUENCE test_alter_seq USING gapless; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 1 +(1 row) + +BEGIN; + SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 2 +(1 row) + +ROLLBACK; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 2 +(1 row) + +ALTER SEQUENCE test_alter_seq START 100 USING local; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 100 +(1 row) + +-- should fail due to deps +DROP ACCESS METHOD FOR SEQUENCES gapless; +ERROR: cannot drop sequence access method gapless because extension gapless_seq requires it +HINT: You can drop extension gapless_seq instead. +-- likewise +DROP EXTENSION gapless_seq; +ERROR: cannot drop extension gapless_seq because other objects depend on it +DETAIL: sequence test_gapless depends on sequence access method gapless +HINT: Use DROP ... CASCADE to drop the dependent objects too. +-- success +DROP EXTENSION gapless_seq CASCADE; +NOTICE: drop cascades to sequence test_gapless diff --git a/contrib/gapless_seq/gapless_seq--1.0.sql b/contrib/gapless_seq/gapless_seq--1.0.sql new file mode 100644 index 0000000..8fd46e1 --- /dev/null +++ b/contrib/gapless_seq/gapless_seq--1.0.sql @@ -0,0 +1,57 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION gapless_seq" to load this file. \quit + +CREATE OR REPLACE FUNCTION seqam_gapless_extracols(INTERNAL) +RETURNS INTERNAL +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_init(INTERNAL, INTERNAL, BOOLEAN, INTERNAL, INTERNAL) +RETURNS VOID +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_alloc(INTERNAL, INTERNAL, BIGINT, INTERNAL) +RETURNS BIGINT +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_setval(INTERNAL, INTERNAL, BIGINT) +RETURNS VOID +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_dump(INTERNAL, INTERNAL) +RETURNS CSTRING +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_restore(INTERNAL, INTERNAL, CSTRING) +RETURNS VOID +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +DELETE FROM pg_catalog.pg_seqam +WHERE seqamname = 'gapless'; + +CREATE ACCESS METHOD FOR SEQUENCES gapless ( + extracolumns = seqam_gapless_extracols, /* extra columns for gapless sequence */ + reloptions = seqam_local_reloptions, /* reloptions parser is same as local (no reloptions) */ + init = seqam_gapless_init, /* init new gapless sequence */ + alloc = seqam_gapless_alloc, /* logs and returns each value... slow */ + setval = seqam_gapless_setval, /* setval support */ + dump = seqam_gapless_dump, /* pgdump support */ + restore = seqam_gapless_restore /* pgdump support */ +); diff --git a/contrib/gapless_seq/gapless_seq.c b/contrib/gapless_seq/gapless_seq.c new file mode 100644 index 0000000..2e3a27a --- /dev/null +++ b/contrib/gapless_seq/gapless_seq.c @@ -0,0 +1,274 @@ +/*------------------------------------------------------------------------- + * + * gapless_seq.c + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * contrib/gapless_seq/gapless_seq.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/seqam.h" +#include "access/transam.h" +#include "access/xact.h" +#include "catalog/pg_type.h" +#include "commands/sequence.h" +#include "funcapi.h" +#include "storage/procarray.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/int8.h" + + +PG_MODULE_MAGIC; + +/*------------------------------------------------------------ + * + * Sequence Access Manager = Gapless functions + * + *------------------------------------------------------------ + */ +extern Datum seqam_gapless_extracols(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_init(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_alloc(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_setval(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_dump(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_restore(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(seqam_gapless_extracols); +PG_FUNCTION_INFO_V1(seqam_gapless_init); +PG_FUNCTION_INFO_V1(seqam_gapless_alloc); +PG_FUNCTION_INFO_V1(seqam_gapless_setval); +PG_FUNCTION_INFO_V1(seqam_gapless_dump); +PG_FUNCTION_INFO_V1(seqam_gapless_restore); + + +typedef struct FormGaplessSequence +{ + FormData_pg_sequence seq; + int64 tx_start_value; + int64 tx_last_value; + uint32 xid; +} FormGaplessSequence; + +SeqAMColumnData seqam_gapless_cols[] = { + {"tx_start_value", INT8OID, -1, true}, + {"tx_last_value", INT8OID, -1, true}, + {"xid", XIDOID, -1, true} +}; +#define SEQAM_GAPLESS_EXTRACOLS_CNT 3 +#define SEQAM_GAPLESS_COL_TXSTARTVAL SEQ_COL_LASTCOL + 1 +#define SEQAM_GAPLESS_COL_TXLASTVAL SEQ_COL_LASTCOL + 2 +#define SEQAM_GAPLESS_COL_XID SEQ_COL_LASTCOL + 3 + + +/* + * seqam_gapless_extracols() + * + * Get definitions for extra columns needed by a gapless sequence + */ +Datum +seqam_gapless_extracols(PG_FUNCTION_ARGS) +{ + List *ret = NIL; + int i; + + for (i = 0; i < SEQAM_GAPLESS_EXTRACOLS_CNT; i++) + ret = lappend(ret, &seqam_gapless_cols[i]); + + PG_RETURN_POINTER(ret); +} + +/* + * seqam_gapless_init() + * + * Initialize gapless sequence + * + * TODO: more checks + */ +Datum +seqam_gapless_init(PG_FUNCTION_ARGS) +{ + List *params = (List *)(PG_ARGISNULL(0) ? NULL : PG_GETARG_POINTER(0)); + bool is_init = PG_GETARG_BOOL(2); + Datum *values = (Datum *)PG_GETARG_POINTER(3); + bool *nulls = (bool *)PG_GETARG_POINTER(4); + + ListCell *param; + + foreach(param, params) + { + DefElem *defel = (DefElem *) lfirst(param); + if (strcmp(defel->defname, "restart") == 0) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("RESTART is not supported for gapless sequences"))); + break; + } + } + + if (is_init) + { + values[SEQAM_GAPLESS_COL_TXSTARTVAL - 1] = values[SEQ_COL_STARTVAL - 1]; + values[SEQAM_GAPLESS_COL_TXLASTVAL - 1] = values[SEQ_COL_STARTVAL - 1]; + values[SEQAM_GAPLESS_COL_XID - 1] = UInt32GetDatum(InvalidTransactionId); + + nulls[SEQAM_GAPLESS_COL_TXSTARTVAL - 1] = false; + nulls[SEQAM_GAPLESS_COL_TXLASTVAL - 1] = false; + nulls[SEQAM_GAPLESS_COL_XID - 1] = false; + } + + PG_RETURN_VOID(); +} + +/* + * seqam_gapless_alloc() + * + * Allocate new value for gapless sequence. + */ +Datum +seqam_gapless_alloc(PG_FUNCTION_ARGS) +{ + Relation seqrel = (Relation) PG_GETARG_POINTER(0); + SequenceHandle *seqh = (SequenceHandle*) PG_GETARG_POINTER(1); + /* we ignore nreguested as gapless sequence can't do caching */ + int64 *last = (int64 *) PG_GETARG_POINTER(3); + int64 result; + FormGaplessSequence *seq; + TransactionId local_xid = GetTopTransactionId(); + + /* + * Read and lock the sequence for our transaction, there can't be any + * concurrent transactions accessing the sequence at the same time. + */ + seq = (FormGaplessSequence *) GETSTRUCT(sequence_read_tuple(seqh)); + while (seq->xid != local_xid && TransactionIdIsInProgress(seq->xid)) + { + /* + * Release tuple to avoid dead locks and wait for the concurrent tx + * to finish. + */ + sequence_release_tuple(seqh); + XactLockTableWait(seq->xid, NULL, NULL, XLTW_None); + /* Reread the sequence. */ + seq = (FormGaplessSequence *) GETSTRUCT(sequence_read_tuple(seqh)); + } + + /* + * Only increment if this is same transaction which consumed the sequence + * previously (i.e. we are doing multiple nextval calls in same + * transaction) or if the transaction that previously consumed this + * sequence committed sucessfully. + */ + if (seq->xid == local_xid || + TransactionIdDidCommit(seq->xid)) + { + result = seq->tx_last_value; + (void) sequence_increment(seqrel, &result, 1, seq->seq.min_value, + seq->seq.max_value, seq->seq.increment_by, + seq->seq.is_cycled, true); + } + else + { + result = seq->tx_start_value; + } + + if (seq->xid == local_xid) + { + seq->tx_last_value = result; + } + else + { + /* + * Remember current xid and starting value of the sequence in this + * transaction so that in case the transaction rolls back the + * subsequent nextval call will know what is the real first unallocated + * value. + */ + seq->tx_last_value = seq->tx_start_value = result; + seq->xid = local_xid; + } + + *last = result; + + sequence_save_tuple(seqh, NULL, true); + + PG_RETURN_INT64(result); +} + +/* + * seqam_gapless_setval() + * + * Setval support (we don't allow setval on gapless) + */ +Datum +seqam_gapless_setval(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("setval() is not supported for gapless sequences"))); + + PG_RETURN_VOID(); +} + +/* + * seqam_gapless_dump() + * + * Dump state of a gapless sequence (for pg_dump) + * + * Format is: '' + * + * TODO: locking + */ +Datum +seqam_gapless_dump(PG_FUNCTION_ARGS) +{ + SequenceHandle *seqh = (SequenceHandle*) PG_GETARG_POINTER(1); + FormGaplessSequence *seq; + char buf[19 + 1]; /* int64 + null byte */ + char *result; + + + seq = (FormGaplessSequence *) GETSTRUCT(sequence_read_tuple(seqh)); + + pg_lltoa(seq->tx_last_value, buf); + result = pstrdup(buf); + + PG_RETURN_CSTRING(result); +} + +/* + * seqam_gapless_restore() + * + * Restore previously dumpred state of gapless sequence + * + * Format is: '' + * + * TODO: locking +*/ +Datum +seqam_gapless_restore(PG_FUNCTION_ARGS) +{ + SequenceHandle *seqh = (SequenceHandle*) PG_GETARG_POINTER(1); + char *state = PG_GETARG_CSTRING(2); + FormGaplessSequence *seq; + int64 last_value; + + scanint8(state, false, &last_value); + + seq = (FormGaplessSequence *) GETSTRUCT(sequence_read_tuple(seqh)); + + seq->tx_last_value = last_value; + seq->xid = InvalidTransactionId; + + sequence_save_tuple(seqh, NULL, true); + + PG_RETURN_VOID(); +} diff --git a/contrib/gapless_seq/gapless_seq.control b/contrib/gapless_seq/gapless_seq.control new file mode 100644 index 0000000..326601b --- /dev/null +++ b/contrib/gapless_seq/gapless_seq.control @@ -0,0 +1,6 @@ +# Gapless sequence extension +comment = 'Gapless Sequence AM' +default_version = '1.0' +module_pathname = '$libdir/gapless_seq' +relocatable = false +schema = pg_catalog diff --git a/contrib/gapless_seq/gapless_seq.sgml b/contrib/gapless_seq/gapless_seq.sgml new file mode 100644 index 0000000..3eb536f --- /dev/null +++ b/contrib/gapless_seq/gapless_seq.sgml @@ -0,0 +1,22 @@ + + + gapless sequence + + + gapless_seq + + + + gapless_seq provides a sequence implementation that never + has gaps in the sequence of values it produces, even after rollback or + a crash. + + + + The consequence of this capability is that every nextval() request + writes a WAL record recording the latest state of the sequence. + This could be very costly and is not recommended for general + usage except in specific applications that require this feature. + + + diff --git a/contrib/gapless_seq/specs/concurrency.spec b/contrib/gapless_seq/specs/concurrency.spec new file mode 100644 index 0000000..3a0cc57 --- /dev/null +++ b/contrib/gapless_seq/specs/concurrency.spec @@ -0,0 +1,29 @@ +setup +{ + CREATE EXTENSION IF NOT EXISTS gapless_seq; + DROP SEQUENCE IF EXISTS test_gapless; + CREATE SEQUENCE test_gapless USING gapless; +} + +teardown +{ + DROP SEQUENCE test_gapless; +} + +session "s1" +step "s1_begin" { BEGIN; } +step "s1_nextval" { SELECT nextval('test_gapless'::regclass); } +step "s1_commit" { COMMIT; } + +session "s2" +step "s2_begin" { BEGIN; } +step "s2_nextval" { SELECT nextval('test_gapless'::regclass); } +step "s2_commit" { COMMIT; } + +session "s3" +step "s3_begin" { BEGIN; } +step "s3_nextval" { SELECT nextval('test_gapless'::regclass); } +step "s3_rollback" { ROLLBACK; } + +permutation "s1_begin" "s1_nextval" "s2_begin" "s2_nextval" "s1_commit" "s2_commit" +permutation "s3_begin" "s3_nextval" "s2_begin" "s2_nextval" "s3_rollback" "s2_commit" diff --git a/contrib/gapless_seq/sql/gapless_seq.sql b/contrib/gapless_seq/sql/gapless_seq.sql new file mode 100644 index 0000000..2ea191e --- /dev/null +++ b/contrib/gapless_seq/sql/gapless_seq.sql @@ -0,0 +1,38 @@ +CREATE EXTENSION gapless_seq; + +CREATE SEQUENCE test_gapless USING gapless; + +SELECT nextval('test_gapless'::regclass); + +BEGIN; + SELECT nextval('test_gapless'::regclass); + SELECT nextval('test_gapless'::regclass); + SELECT nextval('test_gapless'::regclass); +ROLLBACK; + +SELECT nextval('test_gapless'::regclass); + +CREATE SEQUENCE test_alter_seq USING local; + +SELECT nextval('test_alter_seq'::regclass); + +ALTER SEQUENCE test_alter_seq USING gapless; + +SELECT nextval('test_alter_seq'::regclass); + +BEGIN; + SELECT nextval('test_alter_seq'::regclass); +ROLLBACK; + +SELECT nextval('test_alter_seq'::regclass); + +ALTER SEQUENCE test_alter_seq START 100 USING local; + +SELECT nextval('test_alter_seq'::regclass); + +-- should fail due to deps +DROP ACCESS METHOD FOR SEQUENCES gapless; +-- likewise +DROP EXTENSION gapless_seq; +-- success +DROP EXTENSION gapless_seq CASCADE; -- 1.9.1