NestJS 日志进阶:自定义 Winston Transport 实现日志与消息队列/数据库集成
为啥要自定义日志 Transport?
场景分析:日志,你的系统健康“晴雨表”
动手实践:自定义 Winston Transport
1. 准备工作
2. 创建自定义 Transport 类
3. 在 NestJS 中使用自定义 Transport
4. 在服务中使用日志
进阶玩法:更灵活的日志处理
1. 日志格式化
2. 日志级别
3. 错误处理
4. 异步处理
5. 日志轮转
6. 日志聚合和分析
总结一下
扩展:RabbitMQ 和数据库
兄弟们,今天咱们聊聊 NestJS 里的日志处理,特别是如何把日志玩出花来,跟各种消息队列(Kafka、RabbitMQ)还有数据库无缝对接。别担心,咱们一步步来,保证你能听懂,还能上手操作。
为啥要自定义日志 Transport?
NestJS 默认的日志记录方式,通常就是简单地把日志输出到控制台。这在开发阶段调试挺方便,但到了生产环境,你肯定不希望所有日志都一股脑儿堆在控制台里,对吧?
这时候,就需要更强大的日志处理工具了。Winston 就是一个非常流行的 Node.js 日志库,它提供了各种 Transport,可以把日志发送到不同的地方,比如文件、数据库、甚至各种云服务。
而 NestJS 也贴心地集成了 Winston,但有时候,Winston 自带的 Transport 可能还不能完全满足你的需求。比如说,你想把日志发送到 Kafka 或者 RabbitMQ 这样的消息队列,或者你想把日志存到 MongoDB 这样的数据库里,这时候,就需要自定义 Transport 了。
场景分析:日志,你的系统健康“晴雨表”
在深入代码之前,咱们先来设想几个场景,看看自定义 Transport 到底能帮咱们解决什么问题:
场景一:电商平台的订单日志。想象一下,你正在开发一个电商平台,用户的每一次下单操作,都需要记录详细的日志,包括订单号、商品信息、用户信息、支付状态等等。这些日志,你可能需要:
- 存储到数据库里,方便后续的查询和分析。
- 发送到消息队列,触发其他的业务流程,比如通知仓库发货、给用户发送短信等等。
- 实时监控订单日志,一旦发现异常,立即报警。
场景二:微服务架构的分布式日志。如果你的系统采用了微服务架构,那么日志就会分散在各个服务中。这时候,你就需要一个统一的日志收集和管理系统,把各个服务的日志汇集起来,方便排查问题。
场景三:安全审计日志。对于一些安全性要求比较高的系统,比如金融系统,你需要记录用户的每一次操作,以及系统的每一次安全事件。这些日志需要长期保存,并且不能被篡改。
在这些场景下,自定义 Winston Transport 就能派上大用场了。你可以根据自己的需求,把日志发送到最合适的地方,实现日志的灵活管理和利用。
动手实践:自定义 Winston Transport
好了,说了这么多,咱们开始动手吧!咱们一步步来实现一个自定义的 Winston Transport,把日志发送到 Kafka。
1. 准备工作
首先,确保你已经安装了 NestJS 和相关的依赖:
npm install --save @nestjs/common @nestjs/core winston winston-transport kafkajs
这里,我们安装了 winston
和 winston-transport
这两个库,它们是 Winston 的核心。kafkajs
是一个 Kafka 的客户端库,用于连接 Kafka。
2. 创建自定义 Transport 类
接下来,咱们创建一个自定义的 Transport 类,继承自 winston-transport
:
// kafka-transport.ts import * as Transport from 'winston-transport'; import { Kafka, Producer, KafkaConfig, ProducerConfig } from 'kafkajs'; interface KafkaTransportOptions extends Transport.TransportStreamOptions { kafkaConfig: KafkaConfig; producerConfig?: ProducerConfig; topic: string; } export class KafkaTransport extends Transport { private producer: Producer; private topic: string; constructor(opts: KafkaTransportOptions) { super(opts); const kafka = new Kafka(opts.kafkaConfig); this.producer = kafka.producer(opts.producerConfig); this.topic = opts.topic; this.connect(); } async connect() { await this.producer.connect(); } async log(info: any, callback: () => void) { setImmediate(() => { this.emit('logged', info); }); try { await this.producer.send({ topic: this.topic, messages: [{ value: JSON.stringify(info) }], }); } catch (error) { console.error('Error sending log to Kafka:', error); } callback(); } async close() { await this.producer.disconnect(); } }
这个 KafkaTransport
类,主要做了这么几件事:
- 构造函数:接收 Kafka 的配置信息(
kafkaConfig
)、生产者配置(producerConfig
)、以及要发送到的 topic(topic
)。 - connect 方法:连接到 Kafka。
- log 方法:这是核心方法,每当有新的日志产生时,Winston 就会调用这个方法。在这个方法里,我们把日志信息序列化成 JSON 字符串,然后发送到 Kafka。
- close 方法:断开与 Kafka 的连接。
3. 在 NestJS 中使用自定义 Transport
有了自定义的 Transport,咱们就可以在 NestJS 中使用它了。咱们创建一个 LoggerModule
,来配置 Winston:
// logger.module.ts import { Module } from '@nestjs/common'; import { WinstonModule } from 'nest-winston'; import * as winston from 'winston'; import { KafkaTransport } from './kafka-transport'; @Module({ imports: [ WinstonModule.forRoot({ transports: [ new winston.transports.Console({ format: winston.format.combine( winston.format.timestamp(), winston.format.ms(), winston.format.printf(({ level, message, timestamp, ms, context, ...meta }) => { let msg = `${timestamp} [${level}] [${context}] ${message}`; if(Object.keys(meta).length) { msg += ` ${JSON.stringify(meta)}`; } msg +=` ${ms}`; return msg; }), ), }), new KafkaTransport({ level: 'info', kafkaConfig: { clientId: 'my-app', brokers: ['localhost:9092'], }, topic: 'my-app-logs', }), ], }), ], }) export class LoggerModule {}
在这个 LoggerModule
里,我们使用 WinstonModule.forRoot
方法来配置 Winston。transports
数组里,可以配置多个 Transport,这里我们配置了两个:
winston.transports.Console
:这是 Winston 自带的控制台 Transport,用于在控制台输出日志。KafkaTransport
:这就是咱们刚刚创建的自定义 Transport,用于把日志发送到 Kafka。
注意,在 KafkaTransport
的配置中,我们指定了 Kafka 的连接信息(brokers
)和 topic(my-app-logs
)。
4. 在服务中使用日志
配置好了 LoggerModule,咱们就可以在服务中使用日志了。NestJS 提供了 LoggerService
接口,你可以通过依赖注入的方式,在你的服务中使用它:
// app.service.ts import { Injectable, LoggerService } from '@nestjs/common'; import { Inject } from '@nestjs/common'; @Injectable() export class AppService { constructor(@Inject(LoggerService) private logger: LoggerService) {} doSomething() { this.logger.log('Doing something...', AppService.name); // ... } }
在这个例子中,我们在 AppService
的构造函数中,注入了 LoggerService
。然后,就可以使用 this.logger.log
、this.logger.error
等方法来记录日志了。 第一个参数是日志消息, 第二个参数是context, 通常使用类名。
当你调用 this.logger.log
方法时,Winston 就会把日志信息传递给所有配置的 Transport,包括咱们的 KafkaTransport
。KafkaTransport
会把日志发送到 Kafka,同时 Console
也会在控制台输出。
进阶玩法:更灵活的日志处理
上面的例子,只是一个简单的演示。在实际应用中,你可能还需要考虑更多的问题,比如:
1. 日志格式化
Winston 提供了强大的日志格式化功能。你可以使用 winston.format
模块,来定制日志的输出格式。比如,你可以添加时间戳、日志级别、上下文信息等等:
// logger.module.ts winston.format.combine( winston.format.timestamp(), winston.format.json() )
上面这个例子,把时间戳加到日志中,并把日志格式化成 JSON 字符串。
2. 日志级别
Winston 支持多种日志级别,比如 error
、warn
、info
、debug
等等。你可以根据日志的严重程度,选择不同的级别。在配置 Transport 时,你也可以指定 Transport 处理的最低日志级别:
new KafkaTransport({ level: 'info', // 只处理 info 级别及以上的日志 // ... })
3. 错误处理
在自定义 Transport 中,你需要考虑错误处理。比如,如果 Kafka 连接失败,或者发送消息失败,你应该怎么处理?你可以选择重试、记录错误日志、或者抛出异常。
4. 异步处理
为了不阻塞主线程,建议在 Transport 中使用异步操作。比如,在 log
方法中,你可以使用 async/await
来发送消息到 Kafka。
5. 日志轮转
如果你的日志量很大,你可能需要考虑日志轮转(log rotation)。Winston 本身不提供日志轮转功能,但你可以使用一些第三方的库,比如 winston-daily-rotate-file
,来实现日志轮转。
6. 日志聚合和分析
把日志发送到 Kafka 或者数据库,只是第一步。你还需要对日志进行聚合和分析。你可以使用一些开源工具,比如 ELK Stack(Elasticsearch、Logstash、Kibana),来实现日志的搜索、分析和可视化。
总结一下
今天,咱们一起学习了如何在 NestJS 中自定义 Winston Transport,把日志发送到 Kafka。这只是一个开始,你可以根据自己的需求,定制更强大的日志处理方案。记住,日志是系统的“眼睛”,用好日志,可以帮助你更好地了解系统的运行状态,及时发现和解决问题。
希望这篇文章对你有帮助!如果你有任何问题,或者想分享你的经验,欢迎在评论区留言,咱们一起交流学习!
扩展:RabbitMQ 和数据库
除了 Kafka,你还可以使用类似的方法,把日志发送到 RabbitMQ 或者数据库。只需要创建相应的 Transport 类,实现 log
方法即可。
- RabbitMQ:你可以使用
amqplib
这个库来连接 RabbitMQ。 - 数据库:你可以使用各种数据库的客户端库,比如
mongoose
(MongoDB)、pg
(PostgreSQL)等等。