Automatic Temporal Tables Schema Management With PostgreSQL


Vedran Bilopavlović

Cover photo: the Neretva river, Mostar (photo: VB)

In my previous article, I explored a simple way to automatically create and maintain temporal tables with historical data using nothing but PostgreSQL table triggers.

You can read that article, but the full solution is really simple: it requires just a bit of dynamic SQL and a trigger, and that is it. The entire script that creates everything needed is provided there.

For every history-enabled or temporal table, the system will create a new table with the same name in a special history schema that will:

  • Have an identical column structure,
  • Have no keys or constraints,
  • Have one additional column at the end called _data_valid_to (the leading underscore minimizes potential name collisions), which holds the timestamp up to which that record version was valid.
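For example, for a temporal table public.test(id int, t text), the previous approach would create a history table roughly like this (just a sketch; the schema and column names here are assumptions):

create table history.test (
    id int,
    t text,
    _data_valid_to timestamptz
);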

And that’s it. It works well.

However, there is a big problem with this approach: It doesn’t support schema changes well (or rather, at all).

That means if we alter the schema of our temporal table, we won’t be able to do any more updates or deletes on that table because the history table is out of sync, and the history insert will fail.

So, whenever we alter the temporal table, we must also apply an identical alteration to the history table.
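For example (a sketch, still assuming a history table named history.test as above):

-- altering the temporal table...
alter table public.test add column i int;
-- ...means the history table needs the identical change, or the next history insert fails
alter table history.test add column i int;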

PostgreSQL has an interesting feature called event triggers that could do this kind of work for us automatically. Event triggers, unlike standard table triggers, are global event handlers that run on schema changes (any DDL command), which makes them a natural fit for keeping the history schema in sync.

However, I would advise against using them to mirror ALTER TABLE commands onto history tables.

Any ALTER TABLE command holds a table lock until it finishes. Depending on the kind of change, this can be a write lock (add column, drop column) or even a read-and-write lock (change column type).

It is in our best interest to minimize the number of ALTER TABLE executions in a database. Sometimes they are inevitable, but at least we can avoid additional locking on history tables when we don’t have to.

Furthermore, data type changes are particularly problematic: not only do they require a type conversion of the entire table, but the very nature of history tables is such that older records may have one type and newer records another, as the schema changes over time.

The solution is obvious: JSON.

However, we can still use event triggers: not to alter history tables, but to record each schema change into a separate schema history table, and then establish a relation between the data history and the schema history.

Sounds good?

The entire solution is below in one big installable script.

You can run it in your database, and it will create a new schema called temporal_support with a couple of support functions for the entire system, which we will demonstrate afterward.

create schema temporal_support;

-- central lookup for the names and suffixes used throughout the system
create function temporal_support._consts(_key text)
returns text
language sql
immutable
parallel safe
as
$$
select case _key
when 'trigger_function_name' then 'temporal_support._temporal_table_after_trigger'
when 'trigger_name' then 'temporal_table_after_delete_or_update_trigger'
when 'history_schema_suffix' then '__history'
when 'schema_table_suffix' then '__schema_history'
else null
end;
$$;

-- executes a dynamic SQL command: placeholders like %name$I or %name$s are supplied
-- as name/value pairs and translated into positional format() arguments
create function temporal_support._exec(
_command text,
variadic _args text[]
)
returns void
language plpgsql as
$$
declare
_names text[] = array[]::text[];
_values text[] = array[]::text[];
i int;
begin
for i in 1..array_length(_args, 1) by 2 loop
_names = array_append(_names, _args[i]);
_values = array_append(_values, _args[i+1]);
_command = replace(_command, '%' || _args[i], '%' || array_length(_names, 1));
end loop;
_command = format(_command, variadic _values);
raise info '%', replace(_command, ' ', '');
execute _command;
end;
$$;

create function temporal_support._table_exists(
_schema text,
_name text
)
returns boolean
language sql as
$$
select exists(
select 1
from information_schema.tables
where
table_schema = _schema
and table_name = _name
and table_type = 'BASE TABLE'
);
$$;

create function temporal_support._temporal_table_exists(
_schema text,
_name text
)
returns boolean
language sql as
$$
select exists(
select 1
from information_schema.tables tbl
join information_schema.triggers tr
on tbl.table_schema = tr.event_object_schema and tbl.table_name = tr.event_object_table and tr.trigger_name = temporal_support._consts('trigger_name')
where
table_schema = _schema
and table_name = _name
and table_type = 'BASE TABLE'
limit 1
);
$$;

create function temporal_support._temporal_exists(
_schema text,
_name text
)
returns boolean
language sql as
$$
select exists(
select 1
from information_schema.triggers
where
event_object_schema = _schema
and event_object_table = _name
and trigger_name = temporal_support._consts('trigger_name')
limit 1
);
$$;

create function temporal_support._schema_exists(_name text) returns boolean language sql as 'select exists(select 1 from pg_namespace where nspname = _name);';

-- captures the current column definitions of a temporal table into its schema history
-- table, but only when they differ from the last recorded version
create or replace function temporal_support._update_schema_history(
_schema text,
_table text
)
returns void
language plpgsql as
$$
declare
_record record;
_history_schema text;
_schema_table text;
_schema_data text[];

_history_schema_suffix constant text = temporal_support._consts('history_schema_suffix');
_schema_table_suffix constant text = temporal_support._consts('schema_table_suffix');
begin
_history_schema = _schema || _history_schema_suffix;
_schema_table = _table || _schema_table_suffix;

execute format(
$sql$select * from %1$I.%2$I where schema_id = (select max(schema_id) from %1$I.%2$I);$sql$,
_history_schema, _schema_table
) into _record;

_schema_data = (
select
array_agg(
trim(concat(
quote_ident(c.column_name),
' ',
(c.udt_schema || '.' || c.udt_name)::regtype::text,
' ',
case when c.is_nullable = 'NO' then 'NOT NULL ' else '' end,
case
when c.is_generated <> 'NEVER' then 'GENERATED ' || c.is_generated || ' AS ' || c.generation_expression
when c.column_default is not null then 'DEFAULT ' || c.column_default
end
)) order by c.ordinal_position
)
from
information_schema.columns c
where
c.table_schema = _schema
and c.table_name = _table
);

if _record.data = _schema_data then
return;
end if;

execute format(
$sql$insert into %1$I.%2$I (data) values ($1);$sql$,
_history_schema, _schema_table
) using _schema_data;
end;
$$;

-- row trigger (after delete or update): saves the OLD row as JSON into the history
-- table, tagged with the latest recorded schema version
create function temporal_support._temporal_table_after_trigger()
returns trigger
security definer
language plpgsql as
$$
declare
_history_schema text;
_table text;
_schema_table text;

_history_schema_suffix constant text = temporal_support._consts('history_schema_suffix');
_schema_table_suffix constant text = temporal_support._consts('schema_table_suffix');
begin
_history_schema = TG_TABLE_SCHEMA::text || _history_schema_suffix;
_table = TG_TABLE_NAME::text;
_schema_table = _table || _schema_table_suffix;

execute format(
$sql$insert into %1$I.%2$I (data, schema_id) select $1, (select max(schema_id) from %1$I.%3$I);$sql$,
_history_schema,
_table,
_schema_table
) using row_to_json(old.*);

return null;
end;
$$;

-- row trigger that blocks any update or delete on history tables
create function temporal_support._temporal_history_before_trigger()
returns trigger
security definer
language plpgsql as
$$
begin
raise exception '%', format('Table %1$I.%2$I is a history table and cannot be modified (updated or deleted)', TG_TABLE_SCHEMA, TG_TABLE_NAME);
end;
$$;

-- event trigger (sql_drop): prevents dropping a history table that is still active and
-- cleans up history tables when their source table is dropped
create function temporal_support._sql_drop_event_trigger()
returns event_trigger
security definer
language plpgsql as
$$
declare
_record record;
_schema text;
_history_schema text;
_schema_table_name text;
_trigger_name constant text = 'temporal_support.table_after_trigger';
_history_schema_suffix constant text = temporal_support._consts('history_schema_suffix');
_schema_table_suffix constant text = temporal_support._consts('schema_table_suffix');
begin
for _record in (
select object_type, schema_name, object_name from pg_event_trigger_dropped_objects()
)
loop
if _record.object_type = 'table' then

if _record.schema_name like '%' || _history_schema_suffix then -- table is in history schema
_schema = split_part(_record.schema_name, _history_schema_suffix, 1);
if temporal_support._temporal_table_exists(_schema, _record.object_name) then
-- prevent dropping table with active history
raise exception 'Cannot drop active history for table %.%', _schema, _record.object_name;
end if;
else -- normal table
-- delete any history table still remaining

_history_schema = _record.schema_name || _history_schema_suffix;
if temporal_support._table_exists(_history_schema, _record.object_name) then
perform temporal_support._exec(
$sql$drop table if exists %history_schema$I.%table_name$I;$sql$,
'history_schema', _history_schema,
'table_name', _record.object_name
);
end if;
_schema_table_name = _record.object_name || _schema_table_suffix;
if temporal_support._table_exists(_history_schema, _schema_table_name) then
perform temporal_support._exec(
$sql$drop table if exists %history_schema$I.%schema_table_name$I;$sql$,
'history_schema', _history_schema,
'schema_table_name', _schema_table_name
);
end if;

end if; -- end table is in history schema
end if; -- end table
end loop;
end;
$$;
create event trigger sql_drop_event_trigger on sql_drop execute function temporal_support._sql_drop_event_trigger();

-- event trigger (ddl_command_end): records a new schema version whenever a temporal
-- table is altered
create function temporal_support._ddl_command_end_event_trigger()
returns event_trigger
security definer
language plpgsql as
$$
declare
_record record;
_table_name text;
begin
for _record in (
select object_type, command_tag, schema_name, object_identity from pg_event_trigger_ddl_commands ()
)
loop
if _record.object_type = 'table' and _record.command_tag = 'ALTER TABLE' then
_table_name = trim('"' from split_part(_record.object_identity, '.', 2));
if temporal_support._temporal_exists(_record.schema_name, _table_name) then
perform temporal_support._update_schema_history(_record.schema_name, _table_name);
end if;
end if;
end loop;
end;
$$;
create event trigger ddl_command_end_event_trigger
on ddl_command_end
when tag in ('ALTER TABLE')
execute function temporal_support._ddl_command_end_event_trigger();

--
-- function that initializes temporal support for argument tables
--
create function temporal_support.init_temporal_tables(
variadic _tables text[]
)
returns void
security definer
language plpgsql as
$$
declare
_record record;
_history_schema text;
_schema_table text;

_trigger_func constant text = temporal_support._consts('trigger_function_name');
_trigger_name constant text = temporal_support._consts('trigger_name');
_history_schema_suffix constant text = temporal_support._consts('history_schema_suffix');
_schema_table_suffix constant text = temporal_support._consts('schema_table_suffix');
begin
for _record in (
select table_schema, table_name
from
information_schema.tables tbl
left join information_schema.triggers tr
on tbl.table_schema = tr.event_object_schema and tbl.table_name = tr.event_object_table and tr.trigger_name = _trigger_name
where
-- filter out tables that already have trigger
tr.event_object_table is null
-- schema.table in array or just table in array for public
and (
(table_schema <> 'public' and table_schema || '.' || table_name = any(_tables))
or
(table_schema = 'public' and table_name = any(_tables))
)
and table_type = 'BASE TABLE'
)
loop
_history_schema = _record.table_schema || _history_schema_suffix;
_schema_table = _record.table_name || _schema_table_suffix;

if not temporal_support._schema_exists(_history_schema) then
perform temporal_support._exec('create schema %history_schema$I',
'history_schema', _history_schema
);
end if;

if not temporal_support._table_exists(_history_schema, _schema_table) then
perform temporal_support._exec(
$sql$
create table %history_schema$I.%schema_table_name$I (
schema_id int not null generated always as identity primary key,
data text[] not null,
timestamp timestamptz not null default now()
);
create trigger temporal_history_before_trigger before delete or update on %history_schema$I.%schema_table_name$I
for each row execute function temporal_support._temporal_history_before_trigger();
$sql$,
'history_schema', _history_schema,
'schema_table_name', _schema_table
);
end if;

if not temporal_support._table_exists(_history_schema, _record.table_name) then
perform temporal_support._exec(
$sql$
create table %history_schema$I.%table_name$I (
clock_ts timestamptz not null default clock_timestamp() primary key,
tran_ts timestamptz not null default now(),
data jsonb not null,
schema_id int not null references %history_schema$I.%schema_table_name$I(schema_id)
);
create trigger temporal_history_before_trigger before delete or update on %history_schema$I.%table_name$I
for each row execute function temporal_support._temporal_history_before_trigger();
$sql$,
'history_schema', _history_schema,
'table_name', _record.table_name,
'schema_table_name', _schema_table
);
end if;

perform temporal_support._exec(
$sql$
create trigger %trigger_name$I after delete or update on %table_schema$I.%table_name$I for each row execute function %trigger_function_name$s();
$sql$,
'trigger_name', _trigger_name,
'trigger_function_name', _trigger_func,
'table_schema', _record.table_schema,
'table_name', _record.table_name
);

perform temporal_support._update_schema_history(_record.table_schema, _record.table_name);
end loop;
end;
$$;

--
-- function that deinitializes (removes) temporal support for argument tables
--
create function temporal_support.remove_temporal_tables(
_drop_history boolean,
variadic _tables text[]
)
returns void
security definer
language plpgsql as
$$
declare
_record record;
_history_schema text;
_schema_table text;

_trigger_name constant text = temporal_support._consts('trigger_name');
_history_schema_suffix constant text = temporal_support._consts('history_schema_suffix');
_schema_table_suffix constant text = temporal_support._consts('schema_table_suffix');
begin
for _record in (
select distinct table_schema, table_name
from
information_schema.tables tbl
join information_schema.triggers tr
on tbl.table_schema = tr.event_object_schema and tbl.table_name = tr.event_object_table and tr.trigger_name = _trigger_name
where
-- schema.table in array or just table in array for public
(
(table_schema <> 'public' and table_schema || '.' || table_name = any(_tables))
or
(table_schema = 'public' and table_name = any(_tables))
)
and table_type = 'BASE TABLE'
)
loop
_history_schema = _record.table_schema || _history_schema_suffix;

-- remove trigger
perform temporal_support._exec(
$sql$
drop trigger if exists %trigger_name$I on %table_schema$I.%table_name$I;
$sql$,
'trigger_name', _trigger_name,
'table_schema', _record.table_schema,
'table_name', _record.table_name
);

if _drop_history is true then
-- remove history table
_schema_table = _record.table_name || _schema_table_suffix;
perform temporal_support._exec(
$sql$
drop table if exists %history_schema$I.%table_name$I;
drop table if exists %history_schema$I.%schema_table_name$I;
$sql$,
'history_schema', _history_schema,
'table_name', _record.table_name,
'schema_table_name', _schema_table
);
end if;
end loop;
end;
$$;

1) Create temporal support with the script above:

Note that you can always drop the temporal_support schema when you no longer need this subsystem in your database.
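For example, removing the whole subsystem could look like this (a sketch; remove temporal support from your tables first):

drop event trigger if exists ddl_command_end_event_trigger;
drop event trigger if exists sql_drop_event_trigger;
drop schema if exists temporal_support cascade;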

2) Create a test table:
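The exact table doesn’t matter; for this walkthrough, assume something minimal like this (the definition here is an assumption):

create table test (
    id int not null generated always as identity primary key,
    t text
);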

3) Initialize the temporal support for this table:
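Initialization is a single call; tables in the public schema can be passed by table name alone:

select temporal_support.init_temporal_tables('test');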


Note that you can see every executed command in the messages pane (they are emitted with raise info).

As we can see, the temporal initialization will create:

  • A new schema for the historical data of tables from the public schema, called public__history (the suffix is configurable),
  • A new table for the data history: public__history.test,
  • A new table for the schema history: public__history.test__schema_history.

From this point on, everything is automatic and handled by triggers.


The schema history table will contain an initial schema for the table test:
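You can inspect it directly (the names follow the pattern described above):

select * from public__history.test__schema_history;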

The schema definition is in the text array field called data, which will contain the exact same column definitions in the order of creation. Note that, in this version, constraints, indexes, sequences, and triggers are not recorded. Constraints may be added in later versions.

The data history table is still empty.

4) Make some history
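Here is a sketch of some data changes against the assumed test table:

insert into test (t) values ('one');
insert into test (t) values ('two');
insert into test (t) values ('three');

begin;
update test set t = 'one updated' where id = 1;
update test set t = 'two updated' where id = 2;
delete from test where id = 3;
commit;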

Note that the inserts run as separate autocommitted statements, while the updates and the delete are wrapped in a single explicit transaction. That is for a reason, as we will see. Let’s see the data history now:
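Querying the history table directly, using the naming shown earlier:

select clock_ts, tran_ts, data, schema_id from public__history.test order by clock_ts;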


  • clock_ts is the exact timestamp of the data change. It is also a primary key since it is always unique.
  • tran_ts is the transaction timestamp of the transaction that made the change. You can see that all three records have the same transaction timestamp, since those changes were made inside a single transaction (see the data changes above).
  • data is the JSON representation of the record.
  • schema_id is a foreign key to the schema version table.

5) Schema changes:

Let’s do this:
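Something like this, in a single transaction (the type of the new column is an assumption):

begin;
alter table test alter column t set not null;
alter table test add column i int;
commit;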

We’ve set column t to not null and added a new column i. Let’s see what the schema history says:


Excellent. As we can see, the timestamps for the last two changes are the same because, obviously, they ran together in the same transaction.

6) New changes:
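For example, two more data changes against the assumed test table, now under the new schema:

update test set i = 1 where id = 1;
delete from test where id = 2;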

Let’s see what we have now:
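The same history query as before; the interesting columns now are data and schema_id:

select data, schema_id from public__history.test order by clock_ts;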


As we can see, those two changes refer to the latest schema version (schema_id = 3), which is correct.

This system now handles schema changes correctly, records all data and schema changes, and avoids the extra table locking that mirroring ALTER TABLE commands onto history tables would cause.

We can de-initialize temporal support with this call:
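For the example table, dropping the history objects as well:

select temporal_support.remove_temporal_tables(true, 'test');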


Note that the first Boolean parameter controls whether the history objects are dropped as well.

And that’s it; if you need it, you can copy the code and tweak or customize it further for your own needs.

It will run very fast with minimal overhead.

It also installs extra triggers that prevent you from modifying or dropping a history table while it is still the active history for a temporal-enabled table; you must de-initialize temporal support for that table first. If you drop the temporal table itself, its active history is dropped as well. So the system is always consistent and doesn’t leave garbage behind.

The only limitation that I’m aware of is that the routine that records schema changes is not parallel-safe, so if you run many parallel ALTER TABLE commands, it may record the same schema change twice, but it won’t fail.

But you shouldn’t be running multiple ALTER TABLE commands on the same table in parallel anyhow (I don’t think it’s even possible, due to the locking ALTER TABLE takes), so it’s hardly a limitation at all.

Anyway, that’s all I have for now. My temporal system for PostgreSQL is now perfect, and you can use it if you want.