From f445b293e0dbf31ae531167f90493b9acae65543 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Tue, 9 Dec 2014 22:57:53 +0100 Subject: [PATCH 3/4] gapless sequence v4 --- contrib/gapless_seq/Makefile | 65 ++++ contrib/gapless_seq/expected/concurrency.out | 31 ++ contrib/gapless_seq/expected/gapless_seq.out | 145 ++++++++ contrib/gapless_seq/gapless_seq--1.0.sql | 71 ++++ contrib/gapless_seq/gapless_seq.c | 490 +++++++++++++++++++++++++++ contrib/gapless_seq/gapless_seq.control | 6 + contrib/gapless_seq/gapless_seq.sgml | 22 ++ contrib/gapless_seq/specs/concurrency.spec | 29 ++ contrib/gapless_seq/sql/gapless_seq.sql | 61 ++++ 9 files changed, 920 insertions(+) create mode 100644 contrib/gapless_seq/Makefile create mode 100644 contrib/gapless_seq/expected/concurrency.out create mode 100644 contrib/gapless_seq/expected/gapless_seq.out create mode 100644 contrib/gapless_seq/gapless_seq--1.0.sql create mode 100644 contrib/gapless_seq/gapless_seq.c create mode 100644 contrib/gapless_seq/gapless_seq.control create mode 100644 contrib/gapless_seq/gapless_seq.sgml create mode 100644 contrib/gapless_seq/specs/concurrency.spec create mode 100644 contrib/gapless_seq/sql/gapless_seq.sql diff --git a/contrib/gapless_seq/Makefile b/contrib/gapless_seq/Makefile new file mode 100644 index 0000000..63b51c4 --- /dev/null +++ b/contrib/gapless_seq/Makefile @@ -0,0 +1,65 @@ +# contrib/gapless_seq/Makefile + +MODULE_big = gapless_seq +OBJS = gapless_seq.o +PG_CPPFLAGS = -I$(libpq_srcdir) + +EXTENSION = gapless_seq +DATA = gapless_seq--1.0.sql + +EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output ./isolation_output + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/gapless_seq +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + +check: regresscheck isolationcheck + +submake-regress: + $(MAKE) -C $(top_builddir)/src/test/regress all + +submake-isolation: + $(MAKE) -C $(top_builddir)/src/test/isolation all + +submake-gapless_seq: + $(MAKE) -C $(top_builddir)/contrib/gapless_seq + +REGRESSCHECKS=gapless_seq + +regresscheck: all | submake-regress submake-gapless_seq + $(MKDIR_P) regression_output + $(pg_regress_check) \ + --temp-install=./tmp_check \ + --extra-install=contrib/gapless_seq \ + --outputdir=./regression_output \ + $(REGRESSCHECKS) + +regresscheck-install-force: | submake-regress submake-gapless_seq + $(pg_regress_installcheck) \ + --extra-install=contrib/gapless_seq \ + $(REGRESSCHECKS) + +ISOLATIONCHECKS=concurrency + +isolationcheck: all | submake-isolation submake-gapless_seq + $(MKDIR_P) isolation_output + $(pg_isolation_regress_check) \ + --extra-install=contrib/gapless_seq \ + --outputdir=./isolation_output \ + $(ISOLATIONCHECKS) + +isolationcheck-install-force: all | submake-isolation submake-gapless_seq + $(pg_isolation_regress_installcheck) \ + --extra-install=contrib/gapless_seq \ + $(ISOLATIONCHECKS) + +PHONY: submake-gapless_seq submake-regress check \ + regresscheck regresscheck-install-force \ + isolationcheck isolationcheck-install-force diff --git a/contrib/gapless_seq/expected/concurrency.out b/contrib/gapless_seq/expected/concurrency.out new file mode 100644 index 0000000..ec6a098 --- /dev/null +++ b/contrib/gapless_seq/expected/concurrency.out @@ -0,0 +1,31 @@ +Parsed test spec with 3 sessions + +starting permutation: s1_begin s1_nextval s2_begin s2_nextval s1_commit s2_commit +step s1_begin: BEGIN; +step s1_nextval: SELECT nextval('test_gapless'::regclass); +nextval + +1 +step s2_begin: BEGIN; +step s2_nextval: SELECT nextval('test_gapless'::regclass); +step s1_commit: COMMIT; +step s2_nextval: <... completed> +nextval + +2 +step s2_commit: COMMIT; + +starting permutation: s3_begin s3_nextval s2_begin s2_nextval s3_rollback s2_commit +step s3_begin: BEGIN; +step s3_nextval: SELECT nextval('test_gapless'::regclass); +nextval + +1 +step s2_begin: BEGIN; +step s2_nextval: SELECT nextval('test_gapless'::regclass); +step s3_rollback: ROLLBACK; +step s2_nextval: <... completed> +nextval + +1 +step s2_commit: COMMIT; diff --git a/contrib/gapless_seq/expected/gapless_seq.out b/contrib/gapless_seq/expected/gapless_seq.out new file mode 100644 index 0000000..c2dd1be --- /dev/null +++ b/contrib/gapless_seq/expected/gapless_seq.out @@ -0,0 +1,145 @@ +CREATE EXTENSION gapless_seq; +CREATE SEQUENCE test_gapless USING gapless; +SELECT nextval('test_gapless'::regclass); + nextval +--------- + 1 +(1 row) + +BEGIN; + SELECT nextval('test_gapless'::regclass); + nextval +--------- + 2 +(1 row) + + SELECT nextval('test_gapless'::regclass); + nextval +--------- + 3 +(1 row) + + SELECT nextval('test_gapless'::regclass); + nextval +--------- + 4 +(1 row) + +ROLLBACK; +SELECT nextval('test_gapless'::regclass); + nextval +--------- + 2 +(1 row) + +CREATE SEQUENCE test_alter_seq USING local; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 1 +(1 row) + +ALTER SEQUENCE test_alter_seq USING gapless; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 2 +(1 row) + +BEGIN; + SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 3 +(1 row) + +ROLLBACK; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 3 +(1 row) + +BEGIN; + SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 4 +(1 row) + + SAVEPOINT mysp; + SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 5 +(1 row) + + ROLLBACK TO SAVEPOINT mysp; + SAVEPOINT mysp2; + SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 5 +(1 row) + + RELEASE SAVEPOINT mysp2; + SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 6 +(1 row) + +COMMIT; +ALTER SEQUENCE test_alter_seq RESTART 100 USING local; +SELECT nextval('test_alter_seq'::regclass); + nextval +--------- + 100 +(1 row) + +-- check dump/restore +SELECT pg_sequence_get_state('test_gapless'); + pg_sequence_get_state +---------------------------- + {last_value,2,is_called,t} +(1 row) + +SELECT pg_sequence_set_state('test_gapless', pg_sequence_get_state('test_gapless')); + pg_sequence_set_state +----------------------- + +(1 row) + +SELECT pg_sequence_get_state('test_gapless'); + pg_sequence_get_state +---------------------------- + {last_value,2,is_called,t} +(1 row) + +-- check that event trigger works correctly +SELECT last_value FROM gapless_seq.seqam_gapless_values ORDER BY seqid; + last_value +------------ + 2 + 6 +(2 rows) + +DROP SEQUENCE test_gapless; +SELECT last_value FROM gapless_seq.seqam_gapless_values ORDER BY seqid; + last_value +------------ +(0 rows) + +CREATE SEQUENCE test_gapless USING gapless; +-- should fail due to deps +DROP ACCESS METHOD FOR SEQUENCES gapless; +ERROR: cannot drop sequence access method gapless because extension gapless_seq requires it +HINT: You can drop extension gapless_seq instead. +-- likewise +DROP EXTENSION gapless_seq; +ERROR: cannot drop extension gapless_seq because other objects depend on it +DETAIL: sequence test_gapless depends on sequence access method gapless +HINT: Use DROP ... CASCADE to drop the dependent objects too. +-- success +DROP EXTENSION gapless_seq CASCADE; +NOTICE: drop cascades to sequence test_gapless diff --git a/contrib/gapless_seq/gapless_seq--1.0.sql b/contrib/gapless_seq/gapless_seq--1.0.sql new file mode 100644 index 0000000..8553e81 --- /dev/null +++ b/contrib/gapless_seq/gapless_seq--1.0.sql @@ -0,0 +1,71 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION gapless_seq" to load this file. \quit + +CREATE OR REPLACE FUNCTION seqam_gapless_init(OID, INTERNAL, BYTEA, INTERNAL, INTERNAL) +RETURNS VOID +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_alloc(INTERNAL, INTERNAL, BIGINT, INTERNAL) +RETURNS BIGINT +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_setval(INTERNAL, INTERNAL, BIGINT) +RETURNS VOID +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_get_state(INTERNAL, INTERNAL, INTERNAL, INTERNAL) +RETURNS INT +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE OR REPLACE FUNCTION seqam_gapless_set_state(INTERNAL, INTERNAL, INTERNAL, INTERNAL, INT) +RETURNS VOID +LANGUAGE C +STABLE STRICT +AS 'MODULE_PATHNAME' +; + +CREATE TABLE seqam_gapless_values ( + seqid oid PRIMARY KEY, + is_called bool NOT NULL, + last_value bigint NOT NULL +); + +CREATE OR REPLACE FUNCTION gapless_seq_clean_sequence_value() +RETURNS event_trigger +LANGUAGE plpgsql +AS $$ +BEGIN + -- just delete all the value data that don't have corresponding + -- gapless sequence (either DELETEd or ALTERed to different AM) + DELETE FROM gapless_seq.seqam_gapless_values WHERE seqid NOT IN ( + SELECT oid FROM pg_class WHERE relam = ( + SELECT oid FROM pg_seqam WHERE seqamname = 'gapless' + ) + ); +END; +$$; + +CREATE EVENT TRIGGER gapless_seq_clean_sequence_value ON sql_drop + WHEN TAG IN ('DROP SEQUENCE') + EXECUTE PROCEDURE gapless_seq_clean_sequence_value(); + +CREATE ACCESS METHOD FOR SEQUENCES gapless AS ( + reloptions = seqam_local_reloptions, /* reloptions parser is same as local (no reloptions) */ + init = seqam_gapless_init, /* init new gapless sequence */ + alloc = seqam_gapless_alloc, /* logs and returns each value... slow */ + setval = seqam_gapless_setval, /* setval support */ + getstate = seqam_gapless_get_state, /* pgdump support */ + setstate = seqam_gapless_set_state /* pgdump support */ +); diff --git a/contrib/gapless_seq/gapless_seq.c b/contrib/gapless_seq/gapless_seq.c new file mode 100644 index 0000000..bc3e439 --- /dev/null +++ b/contrib/gapless_seq/gapless_seq.c @@ -0,0 +1,490 @@ +/*------------------------------------------------------------------------- + * + * gapless_seq.c + * + * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * contrib/gapless_seq/gapless_seq.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/genam.h" +#include "access/htup_details.h" +#include "access/seqam.h" +#include "access/transam.h" +#include "access/xact.h" +#include "catalog/indexing.h" +#include "catalog/namespace.h" +#include "catalog/pg_type.h" +#include "commands/sequence.h" +#include "funcapi.h" +#include "nodes/makefuncs.h" +#include "storage/procarray.h" +#include "storage/lmgr.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/int8.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" + + +PG_MODULE_MAGIC; + +/*------------------------------------------------------------ + * + * Sequence Access Manager = Gapless functions + * + *------------------------------------------------------------ + */ +extern Datum seqam_gapless_init(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_alloc(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_setval(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_get_state(PG_FUNCTION_ARGS); +extern Datum seqam_gapless_set_state(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(seqam_gapless_init); +PG_FUNCTION_INFO_V1(seqam_gapless_alloc); +PG_FUNCTION_INFO_V1(seqam_gapless_setval); +PG_FUNCTION_INFO_V1(seqam_gapless_get_state); +PG_FUNCTION_INFO_V1(seqam_gapless_set_state); + +typedef struct FormGaplessSequence +{ + uint32 xid; +} FormGaplessSequence; + +typedef struct FormGaplessValue +{ + Oid seqid; + bool is_called; + int64 last_value; +} FormGaplessValue; + +#define GAPLESS_SEQ_NAMESPACE "gapless_seq" +#define VALUES_TABLE_NAME "seqam_gapless_values" +#define VALUES_TABLE_COLUMNS 3 + +static FormData_pg_sequence *wait_for_sequence(SequenceHandle *seqh, + TransactionId local_xid); +static Relation open_values_rel(void); +static HeapTuple get_last_value_tup(Relation rel, Oid seqid); +static void set_last_value_tup(Relation rel, Oid seqid, int64 last_value, + bool is_called, HeapTuple oldtuple); + + +/* + * seqam_gapless_init() + * + * Initialize gapless sequence + * + * TODO: more checks + */ +Datum +seqam_gapless_init(PG_FUNCTION_ARGS) +{ + Oid seqrelid = PG_GETARG_OID(0); + List *seqoptions = (List *) PG_GETARG_POINTER(1); + Datum *values = (Datum *) PG_GETARG_POINTER(3); + bool *nulls = (bool *) PG_GETARG_POINTER(4); + bool found_restart; + int64 start_value, + last_value, + min_value, + max_value; + Relation valrel; + HeapTuple tuple = NULL; + TransactionId local_xid = GetTopTransactionId(); + FormGaplessSequence *gapless_seq; + + /* Get the new value to use as starting point. */ + start_value = DatumGetInt64(values[SEQ_COL_STARTVAL - 1]); + start_value = sequence_get_restart_value(seqoptions, start_value, + &found_restart); + + /* Load current value if this is existing sequence. */ + if (seqrelid != InvalidOid) + { + valrel = open_values_rel(); + tuple = get_last_value_tup(valrel, seqrelid); + } + + /* + * If this is new sequence or restart was provided or if there is + * no previous stored value for the sequence we use the starting value + * otherwise we use the stored value. + */ + if (seqrelid == InvalidOid || found_restart || !HeapTupleIsValid(tuple)) + last_value = start_value; + else + { + FormGaplessValue *v = (FormGaplessValue *) GETSTRUCT(tuple); + last_value = v->last_value; + } + + /* Validate the min/max against the starting point. */ + min_value = DatumGetInt64(values[SEQ_COL_MINVALUE - 1]); + max_value = DatumGetInt64(values[SEQ_COL_MAXVALUE - 1]); + sequence_check_range(last_value, min_value, max_value); + + /* + * If this is existing sequence with new RESTART value we should update + * our state to that value. + */ + if (seqrelid != InvalidOid && found_restart) + set_last_value_tup(valrel, seqrelid, last_value, false, tuple); + + /* Now we are done with values relation, but keep the lock. */ + if (seqrelid != InvalidOid) + heap_close(valrel, NoLock); + + /* Update the xid info */ + if (nulls[SEQ_COL_AMDATA - 1]) + { + struct varlena *vl = palloc0(VARHDRSZ + sizeof(FormGaplessSequence)); + SET_VARSIZE(vl, VARHDRSZ + sizeof(FormGaplessSequence)); + nulls[SEQ_COL_AMDATA - 1] = false; + values[SEQ_COL_AMDATA - 1] = PointerGetDatum(vl); + } + + gapless_seq = (FormGaplessSequence *) + VARDATA_ANY(DatumGetByteaP(values[SEQ_COL_AMDATA - 1])); + + gapless_seq->xid = UInt32GetDatum(local_xid); + + PG_RETURN_VOID(); +} + +/* + * seqam_gapless_alloc() + * + * Allocate new value for gapless sequence. + */ +Datum +seqam_gapless_alloc(PG_FUNCTION_ARGS) +{ + Relation seqrel = (Relation) PG_GETARG_POINTER(0); + SequenceHandle *seqh = (SequenceHandle*) PG_GETARG_POINTER(1); + /* we ignore nreguested as gapless sequence can't do caching */ + int64 *last = (int64 *) PG_GETARG_POINTER(3); + int64 result; + Oid seqrelid = RelationGetRelid(seqrel); + Relation valrel; + HeapTuple tuple; + FormData_pg_sequence *seq; + FormGaplessSequence *gapless_seq; + TransactionId local_xid = GetTopTransactionId(); + + /* Wait until the sequence is locked by us. */ + seq = wait_for_sequence(seqh, local_xid); + gapless_seq = (FormGaplessSequence *) VARDATA_ANY(&seq->amdata); + + /* Read the last value from our transactional table (if any). */ + valrel = open_values_rel(); + tuple = get_last_value_tup(valrel, seqrelid); + + /* Last value found get next value. */ + if (HeapTupleIsValid(tuple)) + { + FormGaplessValue *v = (FormGaplessValue *) GETSTRUCT(tuple); + result = v->last_value; + + if (v->is_called) + (void) sequence_increment(seqrel, &result, 1, seq->min_value, + seq->max_value, + seq->increment_by, + seq->is_cycled, true); + } + else /* No last value, start from beginning. */ + result = seq->start_value; + + /* + * Insert or update the last value tuple. + */ + set_last_value_tup(valrel, seqrelid, result, true, tuple); + + /* Now we are done with values relation, but keep the lock. */ + heap_close(valrel, NoLock); + + /* + * If current tx is different fron the last one, + * update the sequence tuple as well. + * + * We don't need to WAL log the update as the only thing we save to + * sequence tuple is the active transaction id and we know that in case of + * crash the transaction id will not be active so it's ok to lose the + * update. + */ + if (gapless_seq->xid != local_xid) + { + gapless_seq->xid = local_xid; + sequence_save_tuple(seqh, NULL, false); + } + + *last = result; + PG_RETURN_INT64(result); +} + +/* + * seqam_gapless_setval() + * + * Setval support (we don't allow setval on gapless) + */ +Datum +seqam_gapless_setval(PG_FUNCTION_ARGS) +{ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("setval() is not supported for gapless sequences"))); + + PG_RETURN_VOID(); +} + +/* + * seqam_gapless_get_state() + * + * Dump state of a gapless sequence (for pg_dump) + */ +Datum +seqam_gapless_get_state(PG_FUNCTION_ARGS) +{ + Relation seqrel = (Relation) PG_GETARG_POINTER(0); + SequenceHandle *seqh = (SequenceHandle *) PG_GETARG_POINTER(1); + char ***out_keys = (char ***) PG_GETARG_POINTER(2); + char ***out_values = (char ***) PG_GETARG_POINTER(3); + char **keys; + char **values; + Oid seqrelid = RelationGetRelid(seqrel); + Relation valrel; + HeapTuple tuple; + int64 last_value; + bool is_called; + + /* + * Get the last value from the values table, if not found use start_value + * from the sequence definition. + */ + valrel = open_values_rel(); + tuple = get_last_value_tup(valrel, seqrelid); + heap_close(valrel, RowExclusiveLock); + + if (HeapTupleIsValid(tuple)) + { + FormGaplessValue *v = (FormGaplessValue *) GETSTRUCT(tuple); + last_value = v->last_value; + is_called = v->is_called; + } + else + { + FormData_pg_sequence *seq = (FormData_pg_sequence *) + GETSTRUCT(sequence_read_tuple(seqh)); + last_value = seq->start_value; + is_called = false; + } + + keys = palloc(2 * sizeof(char *)); + values = palloc(2 * sizeof(char *)); + + keys[0] = "last_value"; + values[0] = DatumGetCString(DirectFunctionCall1(int8out, + Int64GetDatum(last_value))); + + keys[1] = "is_called"; + values[1] = DatumGetCString(DirectFunctionCall1(boolout, + BoolGetDatum(is_called))); + + *out_keys = keys; + *out_values = values; + + PG_RETURN_INT32(2); +} + +/* + * seqam_gapless_set_state() + * + * Restore previously dumpred state of gapless sequence + */ +Datum +seqam_gapless_set_state(PG_FUNCTION_ARGS) +{ + Relation seqrel = (Relation) PG_GETARG_POINTER(0); + SequenceHandle *seqh = (SequenceHandle*) PG_GETARG_POINTER(1); + char **keys = (char **) PG_GETARG_POINTER(2); + char **values = (char **) PG_GETARG_POINTER(3); + int count = PG_GETARG_INT32(4); + Oid seqrelid = RelationGetRelid(seqrel); + Relation valrel; + HeapTuple tuple; + int64 last_value = 0; + bool is_called = false; + int i; + FormData_pg_sequence *seq; + FormGaplessSequence *gapless_seq; + TransactionId local_xid = GetTopTransactionId(); + + if (count != 2) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("gapless sequence has to be array of two keys and two values"))); + + for (i = 0; i < count; i++) + { + if (pg_strcasecmp(keys[i], "last_value") == 0) + last_value = DatumGetInt64(DirectFunctionCall1(int8in, + CStringGetDatum(values[i]))); + else if (pg_strcasecmp(keys[i], "is_called") == 0) + is_called = DatumGetBool(DirectFunctionCall1(boolin, + CStringGetDatum(values[i]))); + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid state key \"%s\" for gapless sequence", + keys[i]))); + } + + /* Wait until the sequence is locked by us. */ + seq = wait_for_sequence(seqh, local_xid); + gapless_seq = (FormGaplessSequence *) VARDATA_ANY(&seq->amdata); + + /* Read the last value from our transactional table (if any). */ + valrel = open_values_rel(); + tuple = get_last_value_tup(valrel, seqrelid); + + /* + * Insert or update the last value tuple. + */ + set_last_value_tup(valrel, seqrelid, last_value, is_called, tuple); + + /* Now we are done with values relation, but keep the lock. */ + heap_close(valrel, NoLock); + + /* Save to updated sequence. */ + gapless_seq->xid = local_xid; + sequence_save_tuple(seqh, NULL, true); + + PG_RETURN_VOID(); +} + +/* + * Lock the sequence for current transaction. + */ +static FormData_pg_sequence * +wait_for_sequence(SequenceHandle *seqh, TransactionId local_xid) +{ + FormData_pg_sequence *seq = (FormData_pg_sequence *) GETSTRUCT(sequence_read_tuple(seqh)); + FormGaplessSequence *gapless_seq = (FormGaplessSequence *) VARDATA_ANY(&seq->amdata); + + /* + * Read and lock the sequence for our transaction, there can't be any + * concurrent transactions accessing the sequence at the same time. + */ + while (gapless_seq->xid != local_xid && + TransactionIdIsInProgress(gapless_seq->xid)) + { + /* + * Release tuple to avoid dead locks and wait for the concurrent tx + * to finish. + */ + sequence_release_tuple(seqh); + XactLockTableWait(gapless_seq->xid, NULL, NULL, XLTW_None); + /* Reread the sequence. */ + seq = (FormData_pg_sequence *) GETSTRUCT(sequence_read_tuple(seqh)); + } + + return seq; +} + +/* + * Open the relation used for storing last value in RowExclusive lock mode. + */ +static Relation +open_values_rel(void) +{ + RangeVar *rv; + Oid valrelid; + Relation valrel; + + rv = makeRangeVar(GAPLESS_SEQ_NAMESPACE, VALUES_TABLE_NAME, -1); + valrelid = RangeVarGetRelid(rv, RowExclusiveLock, false); + valrel = heap_open(valrelid, RowExclusiveLock); + + return valrel; +} + +/* + * Read the last value tuple from the values table. + * + * Can return NULL if tuple is not found. + */ +static HeapTuple +get_last_value_tup(Relation rel, Oid seqid) +{ + ScanKey key; + SysScanDesc scan; + HeapTuple tuple; + + key = (ScanKey) palloc(sizeof(ScanKeyData) * 1); + + ScanKeyInit(&key[0], + 1, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(seqid) + ); + + /* FIXME: should use index */ + scan = systable_beginscan(rel, 0, true, NULL, 1, key); + tuple = systable_getnext(scan); + if (HeapTupleIsValid(tuple)) + tuple = heap_copytuple(tuple); + systable_endscan(scan); + + return tuple; +} + +/* + * Insert or update the last value tuple. + * + * The write access to the table must be serialized by the wait_for_sequence + * so that we don't have to have any retry scheme here. +*/ +static void +set_last_value_tup(Relation rel, Oid seqid, int64 last_value, bool is_called, HeapTuple oldtuple) +{ + bool nulls[VALUES_TABLE_COLUMNS]; + Datum values[VALUES_TABLE_COLUMNS]; + TupleDesc tupDesc = RelationGetDescr(rel); + HeapTuple tuple; + + if (!HeapTupleIsValid(oldtuple)) + { + memset(nulls, false, VALUES_TABLE_COLUMNS * sizeof(bool)); + values[0] = ObjectIdGetDatum(seqid); + values[1] = BoolGetDatum(is_called); + values[2] = Int64GetDatum(last_value); + + tuple = heap_form_tuple(tupDesc, values, nulls); + simple_heap_insert(rel, tuple); + } + else + { + bool replaces[VALUES_TABLE_COLUMNS]; + + replaces[0] = false; + replaces[1] = true; + replaces[2] = true; + + nulls[1] = false; + nulls[2] = false; + values[1] = BoolGetDatum(is_called); + values[2] = Int64GetDatum(last_value); + + tuple = heap_modify_tuple(oldtuple, tupDesc, values, nulls, replaces); + simple_heap_update(rel, &tuple->t_self, tuple); + } + + CatalogUpdateIndexes(rel, tuple); +} diff --git a/contrib/gapless_seq/gapless_seq.control b/contrib/gapless_seq/gapless_seq.control new file mode 100644 index 0000000..85da739 --- /dev/null +++ b/contrib/gapless_seq/gapless_seq.control @@ -0,0 +1,6 @@ +# Gapless sequence extension +comment = 'Gapless Sequence AM' +default_version = '1.0' +module_pathname = '$libdir/gapless_seq' +relocatable = false +schema = gapless_seq diff --git a/contrib/gapless_seq/gapless_seq.sgml b/contrib/gapless_seq/gapless_seq.sgml new file mode 100644 index 0000000..3eb536f --- /dev/null +++ b/contrib/gapless_seq/gapless_seq.sgml @@ -0,0 +1,22 @@ + + + gapless sequence + + + gapless_seq + + + + gapless_seq provides a sequence implementation that never + has gaps in the sequence of values it produces, even after rollback or + a crash. + + + + The consequence of this capability is that every nextval() request + writes a WAL record recording the latest state of the sequence. + This could be very costly and is not recommended for general + usage except in specific applications that require this feature. + + + diff --git a/contrib/gapless_seq/specs/concurrency.spec b/contrib/gapless_seq/specs/concurrency.spec new file mode 100644 index 0000000..3a0cc57 --- /dev/null +++ b/contrib/gapless_seq/specs/concurrency.spec @@ -0,0 +1,29 @@ +setup +{ + CREATE EXTENSION IF NOT EXISTS gapless_seq; + DROP SEQUENCE IF EXISTS test_gapless; + CREATE SEQUENCE test_gapless USING gapless; +} + +teardown +{ + DROP SEQUENCE test_gapless; +} + +session "s1" +step "s1_begin" { BEGIN; } +step "s1_nextval" { SELECT nextval('test_gapless'::regclass); } +step "s1_commit" { COMMIT; } + +session "s2" +step "s2_begin" { BEGIN; } +step "s2_nextval" { SELECT nextval('test_gapless'::regclass); } +step "s2_commit" { COMMIT; } + +session "s3" +step "s3_begin" { BEGIN; } +step "s3_nextval" { SELECT nextval('test_gapless'::regclass); } +step "s3_rollback" { ROLLBACK; } + +permutation "s1_begin" "s1_nextval" "s2_begin" "s2_nextval" "s1_commit" "s2_commit" +permutation "s3_begin" "s3_nextval" "s2_begin" "s2_nextval" "s3_rollback" "s2_commit" diff --git a/contrib/gapless_seq/sql/gapless_seq.sql b/contrib/gapless_seq/sql/gapless_seq.sql new file mode 100644 index 0000000..1913947 --- /dev/null +++ b/contrib/gapless_seq/sql/gapless_seq.sql @@ -0,0 +1,61 @@ +CREATE EXTENSION gapless_seq; + +CREATE SEQUENCE test_gapless USING gapless; + +SELECT nextval('test_gapless'::regclass); + +BEGIN; + SELECT nextval('test_gapless'::regclass); + SELECT nextval('test_gapless'::regclass); + SELECT nextval('test_gapless'::regclass); +ROLLBACK; + +SELECT nextval('test_gapless'::regclass); + +CREATE SEQUENCE test_alter_seq USING local; + +SELECT nextval('test_alter_seq'::regclass); + +ALTER SEQUENCE test_alter_seq USING gapless; + +SELECT nextval('test_alter_seq'::regclass); + +BEGIN; + SELECT nextval('test_alter_seq'::regclass); +ROLLBACK; + +SELECT nextval('test_alter_seq'::regclass); + +BEGIN; + SELECT nextval('test_alter_seq'::regclass); + SAVEPOINT mysp; + SELECT nextval('test_alter_seq'::regclass); + ROLLBACK TO SAVEPOINT mysp; + SAVEPOINT mysp2; + SELECT nextval('test_alter_seq'::regclass); + RELEASE SAVEPOINT mysp2; + SELECT nextval('test_alter_seq'::regclass); +COMMIT; + +ALTER SEQUENCE test_alter_seq RESTART 100 USING local; + +SELECT nextval('test_alter_seq'::regclass); + +-- check dump/restore +SELECT pg_sequence_get_state('test_gapless'); +SELECT pg_sequence_set_state('test_gapless', pg_sequence_get_state('test_gapless')); +SELECT pg_sequence_get_state('test_gapless'); + +-- check that event trigger works correctly +SELECT last_value FROM gapless_seq.seqam_gapless_values ORDER BY seqid; +DROP SEQUENCE test_gapless; +SELECT last_value FROM gapless_seq.seqam_gapless_values ORDER BY seqid; + +CREATE SEQUENCE test_gapless USING gapless; + +-- should fail due to deps +DROP ACCESS METHOD FOR SEQUENCES gapless; +-- likewise +DROP EXTENSION gapless_seq; +-- success +DROP EXTENSION gapless_seq CASCADE; -- 1.9.1