数据审计是一个跟踪表内容随时间变化的系统。PostgreSQL 具有一组强大的功能,我们可以利用这些功能在 150 行 SQL 中创建通用审计解决方案。
审计对于历史分析特别有用。
为了证明这一点,设想你有一个用户表,跟踪用户何时在线。你可以添加一个状态列,它可以有两个值之一:在线和离线。
你将如何跟踪一个用户在整个月中在线的时间?
一个审计系统会用时间戳来跟踪每一个变化,因此你可以测量每个时间戳之间的差异,并将它们汇总到整个月。
我们的审计解决方案的目标是。
为了证明我们正在努力实现的目标,下面的例子显示了我们在博文结束时的情况。
-- create a table create table public.members( id int primary key, name text not null );
-- Enable auditing on the new table select audit.enable_tracking('public.members');
|
制作一些记录来审计:
-- create a new record insert into public.members(id, name) values (1, 'foo');
-- edit the record update public.members set name = 'bar' where id = 1;
-- delete the record delete from public.members;
|
审查审计日志:
select * from audit.record_history
|
id | record_id | old_record_id | op | ts | table_oid | table_schema | table_name | record | old_record ----+--------------------------------------+--------------------------------------+--------+-------------------------------------+-----------+--------------+------------+--------------------------+-------------------------- 2 | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | | INSERT | Mon Feb 28 18:13:52.698511 2022 PST | 16452 | public | members | {"id": 1, "name": "foo"} | 3 | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | UPDATE | Mon Feb 28 18:13:52.698511 2022 PST | 16452 | public | members | {"id": 1, "name": "bar"} | {"id": 1, "name": "foo"} 4 | | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | DELETE | Mon Feb 28 18:13:52.698511 2022 PST | 16452 | public | members | | {"id": 1, "name": "bar"} (3 rows)
|
注意我们的record_id和old_record_id在我们更新行的时候保持不变,所以我们可以很容易地查询到单行的历史记录。
以下得到上述结果的具体步骤:
首先,我们将创建一个单独的模式audit来命名我们的审计实体。
create schema if not exists audit;
接下来,我们需要一个表来跟踪插入、更新和删除。
传统上,审计表的模式反映了被审计的表,并附加了一些元数据列,如提交的时间戳。该解决方案存在一些维护挑战:
- 对表启用审计需要数据库迁移
- 当源表的模式改变时,审计表的模式也必须改变
因此,我们将依靠 PostgreSQL 的无模式JSONB数据类型将每条记录的数据存储在单个列中。这种方法的另一个好处是允许我们将多个表的审计历史存储在一个审计表中。create table audit.record_version( id bigserial primary key, -- auditing metadata record_id uuid, -- identifies a new record by it's table + primary key old_record_id uuid, -- ^ op varchar(8) not null, -- INSERT/UPDATE/DELETE/TRUNCATE ts timestamptz not null default now(), -- table identifiers table_oid oid not null, -- pg internal id for a table table_schema name not null, -- audited table's schema name e.g. 'public' table_name name not null, -- audited table's table name e.g. 'account' -- record data record jsonb, -- contents of the new record old_record jsonb -- previous record contents (for UPDATE/DELETE) );
|
如果查询太慢,审计日志对我们没有多大帮助!我们认为有 2 种查询模式是审计系统的赌注:时间范围内对表的更改:
对于时间片,我们需要ts列上的索引。由于该表是仅追加的并且该ts列由插入日期填充,因此我们的值ts自然是按升序排列的。
PostgreSQL 的内置BRIN 索引可以利用值和物理位置之间的相关性来生成一个索引,该索引在规模上比默认值(BTREE 索引)小数百倍,并且查找时间更快。
-- index ts for time range filtering create index record_version_ts on audit.record_version using brin(ts);
|
对于表过滤,我们包含了一个table_oid跟踪 PostgreSQL 内部数字表标识符的列。我们可以向该列添加索引而不是table_schemaandtable_name列,从而最小化索引大小并提供更好的性能。
-- index table_oid for table filtering create index record_version_table_oid on audit.record_version using btree(table_oid);
|
随着时间的推移对记录的更改:
存储每一行数据的缺点之一jsonb是基于列值的过滤变得非常低效。如果我们想快速查找一行的历史记录,我们需要为每一行提取和索引一个唯一标识符。
对于全局唯一标识符,我们将使用以下结构:
[table_oid, primary_key_value_1, primary_key_value_2, ...]
并将该数组散列为 UUID v5 以获得有效的可索引 UUID 类型,以识别对数据更改具有鲁棒性的行。
我们将使用一个实用函数来查找记录的主键列名:
create or replace function audit.primary_key_columns(entity_oid oid) returns text[] stable security definer language sql as $$ -- Looks up the names of a table's primary key columns select coalesce( array_agg(pa.attname::text order by pa.attnum), array[]::text[] ) column_names from pg_index pi join pg_attribute pa on pi.indrelid = pa.attrelid and pa.attnum = any(pi.indkey) where indrelid = $1 and indisprimary $$;
|
另一个使用table_oid和主键,将结果转换为记录的 UUID。
create or replace function audit.to_record_id( entity_oid oid, pkey_cols text[], rec jsonb ) returns uuid stable language sql as $$ select case when rec is null then null -- if no primary key exists, use a random uuid when pkey_cols = array[]::text[] then uuid_generate_v4() else ( select uuid_generate_v5( 'fd62bc3d-8d6e-43c2-919c-802ba3762271', ( jsonb_build_array(to_jsonb($1)) || jsonb_agg($3 ->> key_) )::text ) from unnest($2) x(key_) ) end $$;
|
最后,我们对包含这些唯一标识符的record_id和列进行索引,以便快速查询。old_record_id
-- index record_id for fast searching create index record_version_record_id on audit.record_version(record_id) where record_id is not null;
-- index old_record_id for fast searching create index record_version_old_record_id on audit.record_version(record_id) where old_record_id is not null;
|
现在我们有一个我们的审计数据的家,我们相信它可以被有效地查询。现在我们如何填充它?
我们需要在最终用户不对其事务进行任何更改的情况下填充审计表。因此,我们将设置一个触发器以在数据更改时触发。在这种情况下,我们将为每个插入/更新/删除的行触发一次触发器。create or replace function audit.insert_update_delete_trigger() returns trigger security definer language plpgsql as $$ declare pkey_cols text[] = audit.primary_key_columns(TG_RELID); record_jsonb jsonb = to_jsonb(new); record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, record_jsonb); old_record_jsonb jsonb = to_jsonb(old); old_record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb); begin
insert into audit.record_version( record_id, old_record_id, op, table_oid, table_schema, table_name, record, old_record ) select record_id, old_record_id, TG_OP, TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME, record_jsonb, old_record_jsonb;
return coalesce(new, old); end; $$;
|
最后,我们将在一个干净的、幂等的、面向用户的 API 后面结束触发器的创建和删除过程。
我们将公开的用于对表启用审计的 API 是
select audit.enable_tracking('<schema>.<table>'::regclass);
并禁用跟踪
select audit.disable_tracking('<schema>.<table>'::regclass);
在幕后,这些函数根据请求的表注册我们的审计触发器。
create or replace function audit.enable_tracking(regclass) returns void volatile security definer language plpgsql as $$ declare statement_row text = format(' create trigger audit_i_u_d before insert or update or delete on %I for each row execute procedure audit.insert_update_delete_trigger();', $1 );
pkey_cols text[] = audit.primary_key_columns($1); begin if pkey_cols = array[]::text[] then raise exception 'Table % can not be audited because it has no primary key', $1; end if;
if not exists(select 1 from pg_trigger where tgrelid = $1 and tgname = 'audit_i_u_d') then execute statement_row; end if; end; $$;
create or replace function audit.disable_tracking(regclass) returns void volatile security definer language plpgsql as $$ declare statement_row text = format( 'drop trigger if exists audit_i_u_d on %I;', $1 ); begin execute statement_row; end; $$;
|
性能
审计表总是会降低插入、更新和删除的吞吐量。在吞吐量低于每秒 1000 次写入的情况下,开销通常可以忽略不计。对于写入频率较高的表,请考虑使用pgAudit之类的工具记录 SQL 之外的更改。
总结
对于在 PostgreSQL 中进行审计的交钥匙解决方案,我们已经将此脚本打包到一个扩展中,并带有一些额外的好处,比如TRUNCATE支持。在https://github.com/supabase/supa_audit查看。