PostgreSQL 性能优化:手撸一个高性能行级触发器扩展
为什么需要高性能触发器?
设计思路
代码实现 (C 语言)
1. 定义扩展
2. 编写 C 代码
3. 编译和安装
4. 使用扩展
总结
你好,我是那个喜欢折腾的程序员老王。
咱们今天来聊聊 PostgreSQL 里的触发器。你肯定用过触发器,这玩意儿在数据变更时自动执行一些操作,挺方便的。但,你有没有遇到过这种情况:数据批量更新时,触发器导致性能急剧下降?特别是行级触发器,每改一行数据就触发一次,简直是性能杀手!
别担心,今天咱们就来一起手撸一个 PostgreSQL 扩展,实现一个高性能的行级触发器。通过批量缓存变更数据、异步写入等方式,大幅减少触发器带来的开销,让你的数据库飞起来!
为什么需要高性能触发器?
在深入代码之前,咱们先来聊聊为什么需要高性能触发器。PostgreSQL 原生的触发器在处理大量数据变更时,性能瓶颈主要体现在以下几个方面:
- 频繁的上下文切换: 行级触发器为每一行数据的变更都会触发一次,这意味着数据库需要在 PostgreSQL 主进程和触发器函数执行环境之间频繁切换。这种切换是有开销的,尤其是在高并发场景下。
- 同步执行: 默认情况下,触发器是同步执行的。也就是说,在触发器函数执行完成之前,数据变更操作会被阻塞。如果触发器函数执行时间较长,会严重影响数据库的整体性能。
- 缺乏批量处理: 原生触发器没有批量处理机制,每次只处理一行数据。如果触发器函数内部需要执行一些复杂的操作,比如写入其他表、调用外部服务等,那么每一行数据都会触发一次这些操作,效率极低。
为了解决这些问题,我们需要一个高性能的触发器。这个触发器应该具备以下特性:
- 批量处理: 将多次数据变更合并成一批进行处理,减少触发器函数的调用次数。
- 异步执行: 将触发器函数的执行与数据变更操作分离,避免阻塞主进程。
- 可配置: 提供灵活的配置选项,允许用户根据实际需求调整触发器的行为。
设计思路
我们的高性能触发器扩展将采用以下设计思路:
- 共享内存缓冲区: 使用 PostgreSQL 的共享内存机制,创建一个缓冲区用于存储变更的数据。所有的数据变更操作都会先将变更数据写入缓冲区,而不是直接触发触发器函数。
- 后台工作进程 (Background Worker): 创建一个后台工作进程,负责从缓冲区读取变更数据,并批量执行触发器函数。后台工作进程与 PostgreSQL 主进程异步执行,不会阻塞数据变更操作。
- 信号机制: 当缓冲区中的数据达到一定数量或一定时间间隔后,后台工作进程会向 PostgreSQL 主进程发送信号,通知主进程有新的数据需要处理。主进程收到信号后,会唤醒后台工作进程。
- 自定义触发器函数: 提供一个自定义的触发器函数,用于替换原生的触发器函数。这个自定义函数会将变更数据写入共享内存缓冲区,而不是直接执行用户定义的逻辑。
- 控制函数: 提供创建/销毁 触发器, 启动/停止 后台工作进程的控制函数. 便于使用.
代码实现 (C 语言)
下面,我们将逐步实现这个高性能触发器扩展。为了简化代码,我们只实现核心功能,并省略一些错误处理和边界检查。
1. 定义扩展
首先,我们需要定义一个 PostgreSQL 扩展。创建一个名为 pg_hll_trigger
的目录,并在其中创建以下文件:
pg_hll_trigger.control
:
# pg_hll_trigger extension comment = 'High-performance row-level trigger for PostgreSQL' default_version = '1.0' module_pathname = '$libdir/pg_hll_trigger' relocatable = true
pg_hll_trigger--1.0.sql
:
-- 定义共享内存结构 CREATE TYPE hll_trigger_data AS ( tg_op text, tg_table_name text, new_row jsonb, old_row jsonb ); -- 创建控制函数 CREATE FUNCTION hll_trigger_create(trigger_name text, table_name text, function_name text, buffer_size int DEFAULT 100, flush_interval int DEFAULT 10) RETURNS void AS '$libdir/pg_hll_trigger', 'hll_trigger_create' LANGUAGE C STRICT; CREATE FUNCTION hll_trigger_drop(trigger_name text) RETURNS void AS '$libdir/pg_hll_trigger', 'hll_trigger_drop' LANGUAGE C STRICT; CREATE FUNCTION hll_trigger_start() RETURNS void AS '$libdir/pg_hll_trigger','hll_trigger_start' LANGUAGE C STRICT; CREATE FUNCTION hll_trigger_stop() RETURNS void AS '$libdir/pg_hll_trigger','hll_trigger_stop' LANGUAGE C STRICT; -- 内部使用的触发器函数 CREATE FUNCTION hll_trigger_handler() RETURNS trigger AS '$libdir/pg_hll_trigger', 'hll_trigger_handler' LANGUAGE C STRICT;
2. 编写 C 代码
创建一个名为 pg_hll_trigger.c
的文件,并编写以下 C 代码:
#include "postgres.h" #include "fmgr.h" #include "funcapi.h" #include "executor/spi.h" #include "commands/trigger.h" #include "utils/memutils.h" #include "utils/guc.h" #include "miscadmin.h" #include "storage/ipc.h" #include "storage/shmem.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "postmaster/bgworker.h" #include "access/xact.h" #include <time.h> PG_MODULE_MAGIC; // 共享内存结构 typedef struct { LWLock lock; // 轻量级锁,用于保护缓冲区 int buffer_size; // 缓冲区大小(记录条数) int flush_interval; // 刷新间隔(秒) int count; // 当前缓冲区中的记录数 time_t last_flush; // 上次刷新的时间 char data[FLEXIBLE_ARRAY_MEMBER]; // 存储触发器数据的数组. hll_trigger_data } SharedBuffer; static SharedBuffer *shared_buffer = NULL; static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static bool hll_trigger_worker_running = false; // 获取共享内存中记录的结构体 static inline hll_trigger_data* get_hll_trigger_data(int index) { return (hll_trigger_data*)(shared_buffer->data + index * sizeof(hll_trigger_data)); } void hll_trigger_start(void); void hll_trigger_stop(void); // 共享内存初始化 static void hll_trigger_shmem_startup(void); // 后台工作进程主函数 void hll_trigger_main(Datum main_arg); // 信号处理函数 static void hll_trigger_sigterm(SIGNAL_ARGS); // 扩展初始化函数 void _PG_init(void) { if (!process_shared_preload_libraries_in_progress) { return; } RequestAddinShmemSpace(sizeof(SharedBuffer)); RequestNamedLWLockTranche("hll_trigger", 1); prev_shmem_startup_hook = shmem_startup_hook; shmem_startup_hook = hll_trigger_shmem_startup; BackgroundWorker worker = { .bgw_name = "HLL Trigger Worker", .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, .bgw_start_time = BgWorkerStart_ConsistentState, .bgw_restart_time = BGW_NEVER_RESTART, // 发生错误不自动重启, 简化流程. .bgw_main = hll_trigger_main, .bgw_notify_pid = 0 }; RegisterBackgroundWorker(&worker); } // 共享内存初始化 static void hll_trigger_shmem_startup(void) { bool found; if (prev_shmem_startup_hook) { prev_shmem_startup_hook(); } shared_buffer = (SharedBuffer *)ShmemInitStruct("HLL Trigger Buffer", sizeof(SharedBuffer), &found); if (!found) { LWLockInitialize(&shared_buffer->lock, 0); shared_buffer->buffer_size = 100; // 默认缓冲区大小 shared_buffer->flush_interval = 10; // 默认刷新间隔 shared_buffer->count = 0; shared_buffer->last_flush = time(NULL); } } PG_FUNCTION_INFO_V1(hll_trigger_create); Datum hll_trigger_create(PG_FUNCTION_ARGS) { text *trigger_name_text = PG_GETARG_TEXT_P(0); text *table_name_text = PG_GETARG_TEXT_P(1); text *function_name_text = PG_GETARG_TEXT_P(2); int32 buffer_size = PG_GETARG_INT32(3); int32 flush_interval = PG_GETARG_INT32(4); char *trigger_name = text_to_cstring(trigger_name_text); char *table_name = text_to_cstring(table_name_text); char *function_name = text_to_cstring(function_name_text); if(!hll_trigger_worker_running){ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("hll_trigger worker is not running, please call hll_trigger_start()"))); } // 检查是否已经存在同名触发器 if (SPI_connect() != SPI_OK_CONNECT) { ereport(ERROR, (errmsg("SPI_connect failed"))); } char query[512]; snprintf(query, sizeof(query), "SELECT 1 FROM pg_trigger WHERE tgname = '%s'", trigger_name); if (SPI_execute(query, true, 0) != SPI_OK_SELECT) { ereport(ERROR, (errmsg("SPI_execute failed"))); } if(SPI_processed > 0 ) { ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("trigger \"%s\" already exists", trigger_name))); } // 修改共享内存配置 LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE); shared_buffer->buffer_size = buffer_size; shared_buffer->flush_interval = flush_interval; LWLockRelease(&shared_buffer->lock); // 创建触发器 snprintf(query, sizeof(query), "CREATE TRIGGER %s AFTER INSERT OR UPDATE OR DELETE ON %s FOR EACH ROW EXECUTE PROCEDURE hll_trigger_handler()", trigger_name, table_name); if (SPI_execute(query, false, 0) != SPI_OK_UTILITY) { ereport(ERROR, (errmsg("SPI_execute failed"))); } //记录function_name, 在后台进程中使用. SPI_finish(); PG_RETURN_VOID(); } PG_FUNCTION_INFO_V1(hll_trigger_drop); Datum hll_trigger_drop(PG_FUNCTION_ARGS) { text *trigger_name_text = PG_GETARG_TEXT_P(0); char *trigger_name = text_to_cstring(trigger_name_text); if (SPI_connect() != SPI_OK_CONNECT) { ereport(ERROR, (errmsg("SPI_connect failed"))); } // 删除触发器 char query[256]; snprintf(query, sizeof(query), "DROP TRIGGER IF EXISTS %s ON %s", trigger_name,quote_ident("table_xxx")); //quote_ident 防止sql注入 if (SPI_execute(query, false, 0) != SPI_OK_UTILITY) { ereport(ERROR, (errmsg("SPI_execute failed"))); } SPI_finish(); PG_RETURN_VOID(); } PG_FUNCTION_INFO_V1(hll_trigger_handler); Datum hll_trigger_handler(PG_FUNCTION_ARGS) { TriggerData *trigdata = (TriggerData *) fcinfo->context; HeapTuple new_row = trigdata->tg_newtuple; HeapTuple old_row = trigdata->tg_trigtuple; if (!CALLED_AS_TRIGGER(fcinfo)) { ereport(ERROR, (errmsg("not called as trigger"))); } // 获取操作类型 char *tg_op; switch (trigdata->tg_event & TRIGGER_EVENT_OPMASK) { case TRIGGER_EVENT_INSERT: tg_op = "INSERT"; break; case TRIGGER_EVENT_UPDATE: tg_op = "UPDATE"; break; case TRIGGER_EVENT_DELETE: tg_op = "DELETE"; break; default: tg_op = "UNKNOWN"; } LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE); // 检查缓冲区是否已满 if (shared_buffer->count >= shared_buffer->buffer_size) { LWLockRelease(&shared_buffer->lock); //这里可以根据需要选择是否阻塞. 简化处理,直接返回. ereport(WARNING,(errmsg("hll_trigger buffer is full, discarding trigger data"))); return PointerGetDatum(NULL); } // 将数据写入缓冲区 hll_trigger_data* trigger_data = get_hll_trigger_data(shared_buffer->count); trigger_data->tg_op = pstrdup(tg_op); trigger_data->tg_table_name = pstrdup(SPI_getrelname(trigdata->tg_relation)); if (new_row) { trigger_data->new_row = SPI_rowtojsonb(new_row); } else { trigger_data->new_row = NULL; } if (old_row) { trigger_data->old_row = SPI_rowtojsonb(old_row); } else { trigger_data->old_row = NULL; } shared_buffer->count++; LWLockRelease(&shared_buffer->lock); return PointerGetDatum(NULL); } // 后台工作进程主函数 void hll_trigger_main(Datum main_arg) { pqsignal(SIGTERM, hll_trigger_sigterm); BackgroundWorkerUnblockSignals(); BackgroundWorkerInitializeConnection("postgres", NULL,0); while (true) { CHECK_FOR_INTERRUPTS(); // 检查是否需要退出 if (IsBackgroundWorkerShutdownRequested()) { proc_exit(0); } LWLockAcquire(&shared_buffer->lock, LW_SHARED); int count = shared_buffer->count; time_t last_flush = shared_buffer->last_flush; LWLockRelease(&shared_buffer->lock); // 检查是否需要刷新缓冲区 if (count > 0 && (count >= shared_buffer->buffer_size || (time(NULL) - last_flush) >= shared_buffer->flush_interval)) { // 复制数据 hll_trigger_data *data_copy = (hll_trigger_data *) palloc(count * sizeof(hll_trigger_data)); LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE); for(int i=0; i< count; i++){ data_copy[i].tg_op = pstrdup(get_hll_trigger_data(i)->tg_op); data_copy[i].tg_table_name = pstrdup(get_hll_trigger_data(i)->tg_table_name); data_copy[i].new_row = get_hll_trigger_data(i)->new_row ? jsonb_copy(get_hll_trigger_data(i)->new_row): NULL; data_copy[i].old_row = get_hll_trigger_data(i)->old_row ? jsonb_copy(get_hll_trigger_data(i)->old_row) : NULL; } shared_buffer->count = 0; shared_buffer->last_flush = time(NULL); LWLockRelease(&shared_buffer->lock); // 处理数据 (这里只是简单的打印, 应该调用用户定义的函数) if (SPI_connect() != SPI_OK_CONNECT) { ereport(ERROR, (errmsg("SPI_connect failed"))); } for (int i = 0; i < count; i++) { ereport(LOG,( errmsg("table: %s, op: %s, new: %s, old: %s", data_copy[i].tg_table_name, data_copy[i].tg_op, data_copy[i].new_row ? TextDatumGetCString(JsonbToCString(NULL,data_copy[i].new_row, -1)): "null", data_copy[i].old_row ? TextDatumGetCString(JsonbToCString(NULL,data_copy[i].old_row, -1)) : "null" ) )); //TODO: 调用用户函数. //pfree(data_copy[i]); } SPI_finish(); pfree(data_copy); } // 休眠一段时间 pg_usleep(1000000L); // 1 second } } // 信号处理函数, 处理关闭信号. static void hll_trigger_sigterm(SIGNAL_ARGS) { int save_errno = errno; SetLatch(MyLatch); if(MyProc) { MyProc->procLatch = save_errno; } } PG_FUNCTION_INFO_V1(hll_trigger_start); Datum hll_trigger_start(PG_FUNCTION_ARGS) { if(hll_trigger_worker_running) { ereport(WARNING, (errmsg("hll_trigger worker already running"))); PG_RETURN_VOID(); } BackgroundWorker worker = { .bgw_name = "HLL Trigger Worker", .bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, .bgw_start_time = BgWorkerStart_ConsistentState, .bgw_restart_time = BGW_NEVER_RESTART, // 发生错误不自动重启, 简化流程. .bgw_main = hll_trigger_main, .bgw_notify_pid = 0 }; RegisterBackgroundWorker(&worker); hll_trigger_worker_running = true; PG_RETURN_VOID(); } PG_FUNCTION_INFO_V1(hll_trigger_stop); Datum hll_trigger_stop(PG_FUNCTION_ARGS) { //TODO: 向后台进程发送关闭信号. hll_trigger_worker_running = false; PG_RETURN_VOID(); }
3. 编译和安装
要编译这个扩展,你需要安装 PostgreSQL 开发包,并确保 pg_config
在你的 PATH 中。然后,你可以使用以下命令编译和安装扩展:
make make install
4. 使用扩展
安装扩展后,你可以在 PostgreSQL 中使用以下步骤创建和使用高性能触发器:
- 加载扩展:
CREATE EXTENSION pg_hll_trigger;
- 启动后台工作进程:
SELECT hll_trigger_start();
- 创建触发器:
-- 假设你有一个名为 my_table 的表,和一个名为 my_trigger_function 的触发器函数 CREATE TABLE my_table (id serial, data text); CREATE OR REPLACE FUNCTION my_trigger_function() RETURNS trigger AS $$ BEGIN -- 这里是你的触发器逻辑 RAISE NOTICE 'Trigger function called for table % with operation %', TG_TABLE_NAME, TG_OP; IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN RAISE NOTICE 'NEW: %', row_to_json(NEW); END IF; IF TG_OP = 'DELETE' OR TG_OP = 'UPDATE' THEN RAISE NOTICE 'OLD: %', row_to_json(OLD); END IF; RETURN NULL; END; $$ LANGUAGE plpgsql; -- 使用 hll_trigger_create 创建高性能触发器, 1000条 或 60s 触发 SELECT hll_trigger_create('my_hll_trigger', 'my_table', 'my_trigger_function',1000, 60);
- 测试触发器:
-- 插入一些数据 INSERT INTO my_table (data) VALUES ('data1'), ('data2'), ('data3'); -- 更新一些数据 UPDATE my_table SET data = 'updated_data' WHERE id = 1; -- 删除一些数据 DELETE FROM my_table WHERE id = 2; -- 查看日志输出, 确认触发器是否被批量执行
- 停止后台进程
SELECT hll_trigger_stop();
- 删除触发器:
SELECT hll_trigger_drop('my_hll_trigger');
总结
通过以上步骤,我们就实现了一个简单的 PostgreSQL 高性能行级触发器扩展。这个扩展通过共享内存缓冲区、后台工作进程和自定义触发器函数,实现了批量处理和异步执行,从而减少了触发器带来的性能开销。
当然,这只是一个简单的示例,还有很多可以优化和完善的地方,比如:
- 更精细的锁机制: 使用更精细的锁机制,减少锁竞争,提高并发性能。
- 错误处理: 添加更完善的错误处理机制,提高扩展的健壮性。
- 动态注册/注销后台进程: 根据需要动态注册和注销后台进程,减少资源占用。
- 支持更多触发器事件: 支持更多的触发器事件,比如
TRUNCATE
等。 - 用户自定义函数调用: 目前只是打印日志, 需要修改代码, 调用用户创建触发器时传入的函数.
- 完善 hll_trigger_stop 函数: 停止后台工作进程, 避免异常.
希望这个示例能帮助你更好地理解 PostgreSQL 触发器的工作原理,并为你的数据库性能优化提供一些思路。记住,性能优化是一个持续的过程,需要不断地分析和调整。如果你有任何问题或建议,欢迎随时交流!