WEBKT

PostgreSQL 性能优化:手撸一个高性能行级触发器扩展

34 0 0 0

为什么需要高性能触发器?

设计思路

代码实现 (C 语言)

1. 定义扩展

2. 编写 C 代码

3. 编译和安装

4. 使用扩展

总结

你好,我是那个喜欢折腾的程序员老王。

咱们今天来聊聊 PostgreSQL 里的触发器。你肯定用过触发器,这玩意儿在数据变更时自动执行一些操作,挺方便的。但,你有没有遇到过这种情况:数据批量更新时,触发器导致性能急剧下降?特别是行级触发器,每改一行数据就触发一次,简直是性能杀手!

别担心,今天咱们就来一起手撸一个 PostgreSQL 扩展,实现一个高性能的行级触发器。通过批量缓存变更数据、异步写入等方式,大幅减少触发器带来的开销,让你的数据库飞起来!

为什么需要高性能触发器?

在深入代码之前,咱们先来聊聊为什么需要高性能触发器。PostgreSQL 原生的触发器在处理大量数据变更时,性能瓶颈主要体现在以下几个方面:

  1. 频繁的上下文切换: 行级触发器为每一行数据的变更都会触发一次,这意味着数据库需要在 PostgreSQL 主进程和触发器函数执行环境之间频繁切换。这种切换是有开销的,尤其是在高并发场景下。
  2. 同步执行: 默认情况下,触发器是同步执行的。也就是说,在触发器函数执行完成之前,数据变更操作会被阻塞。如果触发器函数执行时间较长,会严重影响数据库的整体性能。
  3. 缺乏批量处理: 原生触发器没有批量处理机制,每次只处理一行数据。如果触发器函数内部需要执行一些复杂的操作,比如写入其他表、调用外部服务等,那么每一行数据都会触发一次这些操作,效率极低。

为了解决这些问题,我们需要一个高性能的触发器。这个触发器应该具备以下特性:

  • 批量处理: 将多次数据变更合并成一批进行处理,减少触发器函数的调用次数。
  • 异步执行: 将触发器函数的执行与数据变更操作分离,避免阻塞主进程。
  • 可配置: 提供灵活的配置选项,允许用户根据实际需求调整触发器的行为。

设计思路

我们的高性能触发器扩展将采用以下设计思路:

  1. 共享内存缓冲区: 使用 PostgreSQL 的共享内存机制,创建一个缓冲区用于存储变更的数据。所有的数据变更操作都会先将变更数据写入缓冲区,而不是直接触发触发器函数。
  2. 后台工作进程 (Background Worker): 创建一个后台工作进程,负责从缓冲区读取变更数据,并批量执行触发器函数。后台工作进程与 PostgreSQL 主进程异步执行,不会阻塞数据变更操作。
  3. 信号机制: 当缓冲区中的数据达到一定数量或一定时间间隔后,后台工作进程会向 PostgreSQL 主进程发送信号,通知主进程有新的数据需要处理。主进程收到信号后,会唤醒后台工作进程。
  4. 自定义触发器函数: 提供一个自定义的触发器函数,用于替换原生的触发器函数。这个自定义函数会将变更数据写入共享内存缓冲区,而不是直接执行用户定义的逻辑。
  5. 控制函数: 提供创建/销毁 触发器, 启动/停止 后台工作进程的控制函数. 便于使用.

代码实现 (C 语言)

下面,我们将逐步实现这个高性能触发器扩展。为了简化代码,我们只实现核心功能,并省略一些错误处理和边界检查。

1. 定义扩展

首先,我们需要定义一个 PostgreSQL 扩展。创建一个名为 pg_hll_trigger 的目录,并在其中创建以下文件:

  • pg_hll_trigger.control:
# pg_hll_trigger extension
comment = 'High-performance row-level trigger for PostgreSQL'
default_version = '1.0'
module_pathname = '$libdir/pg_hll_trigger'
relocatable = true
  • pg_hll_trigger--1.0.sql:
-- 定义共享内存结构
CREATE TYPE hll_trigger_data AS (
tg_op text,
tg_table_name text,
new_row jsonb,
old_row jsonb
);
-- 创建控制函数
CREATE FUNCTION hll_trigger_create(trigger_name text, table_name text, function_name text, buffer_size int DEFAULT 100, flush_interval int DEFAULT 10) RETURNS void
AS '$libdir/pg_hll_trigger', 'hll_trigger_create'
LANGUAGE C STRICT;
CREATE FUNCTION hll_trigger_drop(trigger_name text) RETURNS void
AS '$libdir/pg_hll_trigger', 'hll_trigger_drop'
LANGUAGE C STRICT;
CREATE FUNCTION hll_trigger_start() RETURNS void
AS '$libdir/pg_hll_trigger','hll_trigger_start'
LANGUAGE C STRICT;
CREATE FUNCTION hll_trigger_stop() RETURNS void
AS '$libdir/pg_hll_trigger','hll_trigger_stop'
LANGUAGE C STRICT;
-- 内部使用的触发器函数
CREATE FUNCTION hll_trigger_handler() RETURNS trigger
AS '$libdir/pg_hll_trigger', 'hll_trigger_handler'
LANGUAGE C STRICT;

2. 编写 C 代码

创建一个名为 pg_hll_trigger.c 的文件,并编写以下 C 代码:

#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "executor/spi.h"
#include "commands/trigger.h"
#include "utils/memutils.h"
#include "utils/guc.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/shmem.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "postmaster/bgworker.h"
#include "access/xact.h"
#include <time.h>
PG_MODULE_MAGIC;
// 共享内存结构
typedef struct {
LWLock lock; // 轻量级锁,用于保护缓冲区
int buffer_size; // 缓冲区大小(记录条数)
int flush_interval; // 刷新间隔(秒)
int count; // 当前缓冲区中的记录数
time_t last_flush; // 上次刷新的时间
char data[FLEXIBLE_ARRAY_MEMBER]; // 存储触发器数据的数组. hll_trigger_data
} SharedBuffer;
static SharedBuffer *shared_buffer = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static bool hll_trigger_worker_running = false;
// 获取共享内存中记录的结构体
static inline hll_trigger_data* get_hll_trigger_data(int index) {
return (hll_trigger_data*)(shared_buffer->data + index * sizeof(hll_trigger_data));
}
void hll_trigger_start(void);
void hll_trigger_stop(void);
// 共享内存初始化
static void hll_trigger_shmem_startup(void);
// 后台工作进程主函数
void hll_trigger_main(Datum main_arg);
// 信号处理函数
static void hll_trigger_sigterm(SIGNAL_ARGS);
// 扩展初始化函数
void _PG_init(void) {
if (!process_shared_preload_libraries_in_progress) {
return;
}
RequestAddinShmemSpace(sizeof(SharedBuffer));
RequestNamedLWLockTranche("hll_trigger", 1);
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = hll_trigger_shmem_startup;
BackgroundWorker worker = {
.bgw_name = "HLL Trigger Worker",
.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
.bgw_start_time = BgWorkerStart_ConsistentState,
.bgw_restart_time = BGW_NEVER_RESTART, // 发生错误不自动重启, 简化流程.
.bgw_main = hll_trigger_main,
.bgw_notify_pid = 0
};
RegisterBackgroundWorker(&worker);
}
// 共享内存初始化
static void hll_trigger_shmem_startup(void) {
bool found;
if (prev_shmem_startup_hook) {
prev_shmem_startup_hook();
}
shared_buffer = (SharedBuffer *)ShmemInitStruct("HLL Trigger Buffer", sizeof(SharedBuffer), &found);
if (!found) {
LWLockInitialize(&shared_buffer->lock, 0);
shared_buffer->buffer_size = 100; // 默认缓冲区大小
shared_buffer->flush_interval = 10; // 默认刷新间隔
shared_buffer->count = 0;
shared_buffer->last_flush = time(NULL);
}
}
PG_FUNCTION_INFO_V1(hll_trigger_create);
Datum hll_trigger_create(PG_FUNCTION_ARGS) {
text *trigger_name_text = PG_GETARG_TEXT_P(0);
text *table_name_text = PG_GETARG_TEXT_P(1);
text *function_name_text = PG_GETARG_TEXT_P(2);
int32 buffer_size = PG_GETARG_INT32(3);
int32 flush_interval = PG_GETARG_INT32(4);
char *trigger_name = text_to_cstring(trigger_name_text);
char *table_name = text_to_cstring(table_name_text);
char *function_name = text_to_cstring(function_name_text);
if(!hll_trigger_worker_running){
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("hll_trigger worker is not running, please call hll_trigger_start()")));
}
// 检查是否已经存在同名触发器
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errmsg("SPI_connect failed")));
}
char query[512];
snprintf(query, sizeof(query), "SELECT 1 FROM pg_trigger WHERE tgname = '%s'", trigger_name);
if (SPI_execute(query, true, 0) != SPI_OK_SELECT) {
ereport(ERROR, (errmsg("SPI_execute failed")));
}
if(SPI_processed > 0 ) {
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("trigger \"%s\" already exists", trigger_name)));
}
// 修改共享内存配置
LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE);
shared_buffer->buffer_size = buffer_size;
shared_buffer->flush_interval = flush_interval;
LWLockRelease(&shared_buffer->lock);
// 创建触发器
snprintf(query, sizeof(query),
"CREATE TRIGGER %s AFTER INSERT OR UPDATE OR DELETE ON %s FOR EACH ROW EXECUTE PROCEDURE hll_trigger_handler()",
trigger_name, table_name);
if (SPI_execute(query, false, 0) != SPI_OK_UTILITY) {
ereport(ERROR, (errmsg("SPI_execute failed")));
}
//记录function_name, 在后台进程中使用.
SPI_finish();
PG_RETURN_VOID();
}
PG_FUNCTION_INFO_V1(hll_trigger_drop);
Datum hll_trigger_drop(PG_FUNCTION_ARGS) {
text *trigger_name_text = PG_GETARG_TEXT_P(0);
char *trigger_name = text_to_cstring(trigger_name_text);
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errmsg("SPI_connect failed")));
}
// 删除触发器
char query[256];
snprintf(query, sizeof(query), "DROP TRIGGER IF EXISTS %s ON %s", trigger_name,quote_ident("table_xxx")); //quote_ident 防止sql注入
if (SPI_execute(query, false, 0) != SPI_OK_UTILITY) {
ereport(ERROR, (errmsg("SPI_execute failed")));
}
SPI_finish();
PG_RETURN_VOID();
}
PG_FUNCTION_INFO_V1(hll_trigger_handler);
Datum hll_trigger_handler(PG_FUNCTION_ARGS) {
TriggerData *trigdata = (TriggerData *) fcinfo->context;
HeapTuple new_row = trigdata->tg_newtuple;
HeapTuple old_row = trigdata->tg_trigtuple;
if (!CALLED_AS_TRIGGER(fcinfo)) {
ereport(ERROR, (errmsg("not called as trigger")));
}
// 获取操作类型
char *tg_op;
switch (trigdata->tg_event & TRIGGER_EVENT_OPMASK) {
case TRIGGER_EVENT_INSERT:
tg_op = "INSERT";
break;
case TRIGGER_EVENT_UPDATE:
tg_op = "UPDATE";
break;
case TRIGGER_EVENT_DELETE:
tg_op = "DELETE";
break;
default:
tg_op = "UNKNOWN";
}
LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE);
// 检查缓冲区是否已满
if (shared_buffer->count >= shared_buffer->buffer_size) {
LWLockRelease(&shared_buffer->lock);
//这里可以根据需要选择是否阻塞. 简化处理,直接返回.
ereport(WARNING,(errmsg("hll_trigger buffer is full, discarding trigger data")));
return PointerGetDatum(NULL);
}
// 将数据写入缓冲区
hll_trigger_data* trigger_data = get_hll_trigger_data(shared_buffer->count);
trigger_data->tg_op = pstrdup(tg_op);
trigger_data->tg_table_name = pstrdup(SPI_getrelname(trigdata->tg_relation));
if (new_row) {
trigger_data->new_row = SPI_rowtojsonb(new_row);
} else {
trigger_data->new_row = NULL;
}
if (old_row) {
trigger_data->old_row = SPI_rowtojsonb(old_row);
} else {
trigger_data->old_row = NULL;
}
shared_buffer->count++;
LWLockRelease(&shared_buffer->lock);
return PointerGetDatum(NULL);
}
// 后台工作进程主函数
void hll_trigger_main(Datum main_arg) {
pqsignal(SIGTERM, hll_trigger_sigterm);
BackgroundWorkerUnblockSignals();
BackgroundWorkerInitializeConnection("postgres", NULL,0);
while (true) {
CHECK_FOR_INTERRUPTS();
// 检查是否需要退出
if (IsBackgroundWorkerShutdownRequested()) {
proc_exit(0);
}
LWLockAcquire(&shared_buffer->lock, LW_SHARED);
int count = shared_buffer->count;
time_t last_flush = shared_buffer->last_flush;
LWLockRelease(&shared_buffer->lock);
// 检查是否需要刷新缓冲区
if (count > 0 && (count >= shared_buffer->buffer_size || (time(NULL) - last_flush) >= shared_buffer->flush_interval)) {
// 复制数据
hll_trigger_data *data_copy = (hll_trigger_data *) palloc(count * sizeof(hll_trigger_data));
LWLockAcquire(&shared_buffer->lock, LW_EXCLUSIVE);
for(int i=0; i< count; i++){
data_copy[i].tg_op = pstrdup(get_hll_trigger_data(i)->tg_op);
data_copy[i].tg_table_name = pstrdup(get_hll_trigger_data(i)->tg_table_name);
data_copy[i].new_row = get_hll_trigger_data(i)->new_row ? jsonb_copy(get_hll_trigger_data(i)->new_row): NULL;
data_copy[i].old_row = get_hll_trigger_data(i)->old_row ? jsonb_copy(get_hll_trigger_data(i)->old_row) : NULL;
}
shared_buffer->count = 0;
shared_buffer->last_flush = time(NULL);
LWLockRelease(&shared_buffer->lock);
// 处理数据 (这里只是简单的打印, 应该调用用户定义的函数)
if (SPI_connect() != SPI_OK_CONNECT) {
ereport(ERROR, (errmsg("SPI_connect failed")));
}
for (int i = 0; i < count; i++) {
ereport(LOG,(
errmsg("table: %s, op: %s, new: %s, old: %s",
data_copy[i].tg_table_name,
data_copy[i].tg_op,
data_copy[i].new_row ? TextDatumGetCString(JsonbToCString(NULL,data_copy[i].new_row, -1)): "null",
data_copy[i].old_row ? TextDatumGetCString(JsonbToCString(NULL,data_copy[i].old_row, -1)) : "null"
)
));
//TODO: 调用用户函数.
//pfree(data_copy[i]);
}
SPI_finish();
pfree(data_copy);
}
// 休眠一段时间
pg_usleep(1000000L); // 1 second
}
}
// 信号处理函数, 处理关闭信号.
static void hll_trigger_sigterm(SIGNAL_ARGS) {
int save_errno = errno;
SetLatch(MyLatch);
if(MyProc) {
MyProc->procLatch = save_errno;
}
}
PG_FUNCTION_INFO_V1(hll_trigger_start);
Datum hll_trigger_start(PG_FUNCTION_ARGS) {
if(hll_trigger_worker_running) {
ereport(WARNING, (errmsg("hll_trigger worker already running")));
PG_RETURN_VOID();
}
BackgroundWorker worker = {
.bgw_name = "HLL Trigger Worker",
.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
.bgw_start_time = BgWorkerStart_ConsistentState,
.bgw_restart_time = BGW_NEVER_RESTART, // 发生错误不自动重启, 简化流程.
.bgw_main = hll_trigger_main,
.bgw_notify_pid = 0
};
RegisterBackgroundWorker(&worker);
hll_trigger_worker_running = true;
PG_RETURN_VOID();
}
PG_FUNCTION_INFO_V1(hll_trigger_stop);
Datum hll_trigger_stop(PG_FUNCTION_ARGS) {
//TODO: 向后台进程发送关闭信号.
hll_trigger_worker_running = false;
PG_RETURN_VOID();
}

3. 编译和安装

要编译这个扩展,你需要安装 PostgreSQL 开发包,并确保 pg_config 在你的 PATH 中。然后,你可以使用以下命令编译和安装扩展:

make
make install

4. 使用扩展

安装扩展后,你可以在 PostgreSQL 中使用以下步骤创建和使用高性能触发器:

  1. 加载扩展:
CREATE EXTENSION pg_hll_trigger;
  1. 启动后台工作进程:
SELECT hll_trigger_start();
  1. 创建触发器:
-- 假设你有一个名为 my_table 的表,和一个名为 my_trigger_function 的触发器函数
CREATE TABLE my_table (id serial, data text);
CREATE OR REPLACE FUNCTION my_trigger_function() RETURNS trigger AS $$
BEGIN
-- 这里是你的触发器逻辑
RAISE NOTICE 'Trigger function called for table % with operation %', TG_TABLE_NAME, TG_OP;
IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN
RAISE NOTICE 'NEW: %', row_to_json(NEW);
END IF;
IF TG_OP = 'DELETE' OR TG_OP = 'UPDATE' THEN
RAISE NOTICE 'OLD: %', row_to_json(OLD);
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- 使用 hll_trigger_create 创建高性能触发器, 1000条 或 60s 触发
SELECT hll_trigger_create('my_hll_trigger', 'my_table', 'my_trigger_function',1000, 60);
  1. 测试触发器:
-- 插入一些数据
INSERT INTO my_table (data) VALUES ('data1'), ('data2'), ('data3');
-- 更新一些数据
UPDATE my_table SET data = 'updated_data' WHERE id = 1;
-- 删除一些数据
DELETE FROM my_table WHERE id = 2;
-- 查看日志输出, 确认触发器是否被批量执行
  1. 停止后台进程
SELECT hll_trigger_stop();
  1. 删除触发器:
SELECT hll_trigger_drop('my_hll_trigger');

总结

通过以上步骤,我们就实现了一个简单的 PostgreSQL 高性能行级触发器扩展。这个扩展通过共享内存缓冲区、后台工作进程和自定义触发器函数,实现了批量处理和异步执行,从而减少了触发器带来的性能开销。

当然,这只是一个简单的示例,还有很多可以优化和完善的地方,比如:

  • 更精细的锁机制: 使用更精细的锁机制,减少锁竞争,提高并发性能。
  • 错误处理: 添加更完善的错误处理机制,提高扩展的健壮性。
  • 动态注册/注销后台进程: 根据需要动态注册和注销后台进程,减少资源占用。
  • 支持更多触发器事件: 支持更多的触发器事件,比如 TRUNCATE 等。
  • 用户自定义函数调用: 目前只是打印日志, 需要修改代码, 调用用户创建触发器时传入的函数.
  • 完善 hll_trigger_stop 函数: 停止后台工作进程, 避免异常.

希望这个示例能帮助你更好地理解 PostgreSQL 触发器的工作原理,并为你的数据库性能优化提供一些思路。记住,性能优化是一个持续的过程,需要不断地分析和调整。如果你有任何问题或建议,欢迎随时交流!

爱折腾的老王 PostgreSQL触发器性能优化

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/7705