WEBKT

Salesforce Platform Events 高级应用:解耦大规模异步处理,提升系统弹性和扩展性

12 0 0 0

为什么选择 Platform Events 解耦复杂异步任务?

场景实战:使用 Batch Apex 和 Platform Events 实现数据清洗

设计高效且可靠的 Platform Events

限制与注意事项

结论:拥抱事件驱动,构建弹性未来

在复杂的 Salesforce 应用场景中,尤其是处理海量数据或涉及多个系统交互时,同步处理往往会遇到性能瓶颈和 governor limits 挑战。异步处理是必然选择,而 Salesforce Platform Events 提供了一种强大且灵活的事件驱动架构 (Event-Driven Architecture, EDA) 机制,特别适合解耦复杂的大规模异步流程,显著提升系统的弹性和可扩展性。

想象一下这样一个场景:你需要对数百万条联系人记录进行数据清洗。这个清洗过程可能涉及多个步骤:

  1. 验证地址有效性:调用外部地址验证服务。
  2. 标准化电话号码格式:根据区域规则统一格式。
  3. 查找并合并重复记录:基于特定规则识别潜在重复项。
  4. 更新相关业务机会:如果联系人信息变更,可能需要更新关联的业务机会状态或备注。
  5. 通知外部营销系统:将清洗后的数据同步到外部营销平台。
  6. 记录审计日志:详细记录清洗过程和结果。

如果试图在一个单一的 Batch Apex 作业中完成所有这些步骤,代码会变得极其复杂、难以维护,并且很容易因为某个步骤的失败或超时导致整个批次处理失败。更糟糕的是,不同步骤可能由不同的团队负责,或者需要不同的处理速度和资源。

这时候,Platform Events 就闪亮登场了。

为什么选择 Platform Events 解耦复杂异步任务?

相比于传统的 Future 方法、Queueable Apex 或单一的 Batch Apex,Platform Events 在处理这类复杂、多步骤的异步场景时具有明显优势:

  1. 高度解耦 (Decoupling):事件发布者(Publisher)和订阅者(Subscriber)之间没有直接依赖。发布者只需定义好事件契约(事件结构)并发布事件,不关心谁来处理、如何处理。订阅者也只关心自己感兴趣的事件,独立执行逻辑。
    • 思考:这意味着你可以独立地添加、修改或移除订阅者逻辑,而无需改动发布者或其他订阅者。例如,未来需要增加一个新的“同步到数据仓库”的步骤,只需创建一个新的订阅者即可,对现有流程零影响。
  2. 可扩展性 (Scalability):多个不同的订阅者可以并行处理同一个事件。Salesforce 平台会自动处理事件分发和订阅者触发。对于需要大量计算或调用外部系统的步骤,可以将其拆分为独立的订阅者,避免相互阻塞。
  3. 弹性与韧性 (Resilience)
    • 隔离失败:某个订阅者的处理失败不会影响其他订阅者对同一事件的处理。
    • 重试机制:Platform Events 具有内置的持久性和 Replay ID 机制(特别是 High-Volume Events 保留 72 小时),允许订阅者在失败后从上次停止的地方重新处理事件(需要自行实现重试逻辑)。
  4. 多样的订阅者类型:Platform Events 可以被多种类型的订阅者消费:
    • Apex Triggers:用于执行复杂的、需要编程逻辑的处理。
    • Flows (Platform Event-Triggered Flow):用于声明式地处理事件,适合简单到中等复杂度的自动化逻辑。
    • External Systems (via CometD / Pub/Sub API):允许外部应用程序实时订阅 Salesforce 事件,实现系统集成。
    • Lightning Web Components (via empApi module):在前端实时响应事件。

场景实战:使用 Batch Apex 和 Platform Events 实现数据清洗

让我们回到数据清洗的例子,看看如何用 Platform Events 构建一个更优的解决方案。

架构设计:

  1. 定义 Platform Event: 创建一个名为 Data_Cleansing_Request__e 的 Platform Event。包含关键字段,例如:
    • Record_ID__c (Text): 需要清洗的记录 ID (如 Contact ID)。
    • Cleansing_Step__c (Picklist/Text): 可选字段,用于指示特定的清洗阶段或触发特定订阅者(如果需要更细粒度的控制)。
    • Source_System__c (Text): 触发清洗的来源(例如 'Batch Job', 'Manual Trigger')。
    • Additional_Context__c (Text/Long Text Area): 传递少量必要的上下文信息,避免订阅者重复查询。
  2. 发布者 (Publisher) - Batch Apex: 创建一个 Batch Apex 类,其 execute 方法查询需要清洗的记录,然后为每条(或每批)记录发布 Data_Cleansing_Request__e 事件。
  3. 订阅者 (Subscribers):
    • 地址验证订阅者 (Apex Trigger): 订阅 Data_Cleansing_Request__e,获取 Record_ID__c,查询 Contact 记录,调用外部地址验证 API,并将结果写回 Contact 记录或记录到自定义日志对象。
    • 电话号码标准化订阅者 (Flow): 创建一个 Platform Event-Triggered Flow,当 Data_Cleansing_Request__e 事件发生时触发。Flow 获取 Record_ID__c,查询 Contact 记录,使用 Flow 的内置函数或调用 Invocable Apex Action 来标准化电话号码,并更新记录。
    • 重复记录检测订阅者 (Apex Trigger): 可能需要更复杂的逻辑,订阅事件后,基于 Record_ID__c 和其他信息,执行重复规则匹配,并可能发布另一个事件(如 Potential_Duplicate_Found__e)或创建 Task。
    • 业务机会更新订阅者 (Apex Trigger/Flow): 订阅事件,查询相关业务机会并根据逻辑进行更新。
    • 外部营销系统同步订阅者 (External App / Middleware): 外部系统通过 Pub/Sub API 或 CometD 订阅 Data_Cleansing_Request__e,获取 Record_ID__c,然后通过 Salesforce API 查询最新的联系人信息并同步。
    • 审计日志订阅者 (Apex Trigger): 订阅事件,记录哪个记录被触发了清洗流程。

发布者实现 (Batch Apex 示例片段):

public class BatchDataCleanser implements Database.Batchable<sObject>, Database.Stateful, Database.RaisesPlatformEvents {

    public String query;
    private Integer recordsProcessed = 0;

    public BatchDataCleanser(String q) {
        this.query = q;
    }

    public Database.QueryLocator start(Database.BatchableContext BC) {
        return Database.getQueryLocator(query);
    }

    public void execute(Database.BatchableContext BC, List<Contact> scope) {
        List<Data_Cleansing_Request__e> eventsToPublish = new List<Data_Cleansing_Request__e>();

        for (Contact c : scope) {
            // 可以添加逻辑判断是否真的需要清洗
            if (needsCleansing(c)) { 
                Data_Cleansing_Request__e event = new Data_Cleansing_Request__e(
                    Record_ID__c = c.Id,
                    Source_System__c = 'Batch Job',
                    // 可以传递少量关键信息,避免订阅者查询
                    Additional_Context__c = JSON.serialize(new Map<String, String>{'LastName' => c.LastName, 'Email' => c.Email})
                );
                eventsToPublish.add(event);
            }
        }

        if (!eventsToPublish.isEmpty()) {
            // 发布事件
            // Database.SaveResult[] results = EventBus.publish(eventsToPublish);
            List<Database.SaveResult> results = EventBus.publish(eventsToPublish);

            // 可选:检查发布结果
            for (Database.SaveResult sr : results) {
                if (!sr.isSuccess()) {
                    for (Database.Error err : sr.getErrors()) {
                        System.debug('Error publishing event: ' + err.getStatusCode() + ' - ' + err.getMessage());
                        // 添加错误处理逻辑,例如记录失败的记录ID
                    }
                } else {
                    recordsProcessed++;
                }
            }
        }
    }

    public void finish(Database.BatchableContext BC) {
        System.debug('BatchDataCleanser finished. Total events published attempts (may include failures): ' + recordsProcessed);
        // 可以发送完成通知等
    }

    private Boolean needsCleansing(Contact c) {
        // 实现具体的判断逻辑,例如检查地址是否为空,电话格式是否不规范等
        return true; // 示例:假设都需要清洗
    }
}

订阅者实现 (Apex Trigger 示例片段):

trigger AddressValidationSubscriber on Data_Cleansing_Request__e (after insert) {

    // 最佳实践:将主要逻辑放在 Handler 类中
    AddressValidationHandler.handleEvents(Trigger.new);

}

public class AddressValidationHandler {

    @future // 使用 @future 或 Queueable 处理可能长时间运行的调用
    public static void handleEvents(List<Data_Cleansing_Request__e> events) {
        Set<Id> contactIds = new Set<Id>();
        Map<Id, Data_Cleansing_Request__e> eventMap = new Map<Id, Data_Cleansing_Request__e>();

        for (Data_Cleansing_Request__e event : events) {
            // 防止空指针,尽管 Record_ID__c 应该是必须的
            if (String.isNotBlank(event.Record_ID__c)) {
                contactIds.add(event.Record_ID__c);
                // 如果需要事件中的其他信息,可以存入 Map
                eventMap.put(event.Record_ID__c, event);
            }
        }

        if (contactIds.isEmpty()) {
            return;
        }

        // 批量查询联系人信息
        List<Contact> contactsToUpdate = [SELECT Id, MailingStreet, MailingCity, MailingState, MailingPostalCode, MailingCountry, Is_Address_Validated__c FROM Contact WHERE Id IN :contactIds];
        List<Contact> updatedContacts = new List<Contact>();

        for (Contact c : contactsToUpdate) {
            // 模拟调用外部地址验证服务
            // ExternalAddressService.ValidationResult result = ExternalAddressService.validate(c.MailingStreet, c.MailingCity, ...);
            Boolean isValid = callExternalValidationService(c); // 模拟调用

            if (isValid) {
                c.Is_Address_Validated__c = true;
                // 可能还需要更新地址字段为标准化后的值
                updatedContacts.add(c);
            } else {
                // 处理验证失败的情况,例如记录日志或创建 Task
                System.debug('Address validation failed for Contact ID: ' + c.Id);
                // 可以在这里实现重试逻辑,或者发布另一个“处理失败”事件
            }
        }

        if (!updatedContacts.isEmpty()) {
            try {
                update updatedContacts;
            } catch (DmlException e) {
                System.debug('Error updating contacts after address validation: ' + e.getMessage());
                // 重要的错误处理:记录失败的 ID,可能需要手动介入或触发重试
                // 避免吞噬异常,确保问题可见
            }
        }
    }

    // 模拟外部服务调用
    private static Boolean callExternalValidationService(Contact c) {
        // 实际应用中会是 HTTP Callout
        // 注意:如果在这里进行 Callout,handleEvents 方法必须是 @future 或 Queueable
        // 模拟随机成功/失败
        return Math.random() > 0.2;
    }
}

关键考量:

  • 幂等性 (Idempotency): 订阅者逻辑应该设计成幂等的。由于事件可能因为重试或其他原因被重复传递,处理逻辑需要能够安全地重复执行而不产生副作用。例如,在更新记录前检查状态,或者使用唯一的事务 ID 来防止重复处理。
  • 错误处理与重试: Apex Trigger 默认没有自动重试机制。如果订阅者逻辑失败(例如调用外部服务超时或 DML 异常),该事件的特定订阅者处理会失败。你需要:
    • 捕获异常: 在 Trigger Handler 中使用 try-catch 块。
    • 记录错误: 详细记录错误信息和失败的事件数据(ReplayId 非常重要)。
    • 实现重试:
      • 简单场景:可以在 catch 块中将任务放入 Queueable Apex 进行延时重试。
      • 复杂场景:可以记录失败信息到自定义对象,由另一个调度任务定期检查并重试。或者利用 Platform Cache 存储重试状态。
      • 毒丸消息 (Poison Pill Message): 设计一个最大重试次数,防止某个格式错误或持续失败的事件无限阻塞重试队列。
  • 异步执行限制: 从 Platform Event Trigger 调用 @futureQueueable Apex 时,需要注意它们共享异步执行限制。确保你的整体架构不会轻易超出限制。

设计高效且可靠的 Platform Events

要充分发挥 Platform Events 的威力,需要精心设计事件本身和处理流程。

  1. 事件载荷 (Payload) 设计: 这是艺术与科学的结合。
    • 平衡上下文与大小: Platform Event 消息体有大小限制(当前为 1MB)。
      • 最小化载荷: 只包含必要标识符(如 Record_ID__c)。订阅者接收到事件后,再根据 ID 查询所需数据。
        • 优点: 事件小,发布快,不易超限。
        • 缺点: 订阅者需要额外 SOQL 查询,增加数据库负载和处理时间。如果发布后、订阅者处理前,源记录被修改或删除,订阅者获取到的数据可能不是事件发生时的状态(这可能是好事也可能是坏事,取决于业务需求)。
      • 丰富载荷: 包含足够多的上下文信息,让订阅者无需额外查询即可处理。
        • 优点: 订阅者处理更快,减少 SOQL 查询。可以传递事件发生时的“快照”数据。
        • 缺点: 事件体积增大,可能接近或超过 1MB 限制,尤其是在批量发布时。发布者需要做更多工作来组装数据。
    • 建议: 对于需要传递复杂结构化数据的情况,可以将数据序列化为 JSON 字符串放入 Long Text Area 类型的字段。但仍需注意总大小。
    • 思考: “我传递的这个字段,订阅者是否真的总是需要?它是否可以通过 Record ID 轻易查询到?传递它带来的便利是否值得增加事件大小和发布者的复杂性?”
  2. 选择事件类型:Standard vs. High-Volume:
    • Standard Events: 发布行为类似于 DML 操作(事务性),发布后立即对订阅者可见。事件保留 24 小时。有更严格的发布限制。
    • High-Volume Events: 设计用于高吞吐量场景。发布是异步的(调用 EventBus.publish 后不保证立即持久化,但通常很快)。事件保留 72 小时,这对于实现可靠的重试和恢复至关重要。发布限制宽松得多。
    • 选择: 对于需要解耦、可能需要重试、或者发布量较大的大规模异步处理(如我们的数据清洗场景),强烈推荐使用 High-Volume Events
  3. 发布确认与可靠性: EventBus.publish() 方法返回 Database.SaveResult[]。检查 isSuccess() 来确认事件是否已成功提交到事件总线(对于 High-Volume,这意味着平台已接受请求,将在稍后异步持久化)。虽然不能保证 100% 不丢失(极端平台故障下),但这提供了很高的可靠性。对于关键业务,务必检查发布结果并记录失败。
  4. 利用 Replay ID: 每个发布的事件都有一个唯一的、递增的 ReplayId。订阅者可以(也应该)记录自己成功处理的最后一个 ReplayId。如果订阅者重启或从失败中恢复,它可以从这个 ReplayId 之后开始重新订阅和处理事件,避免遗漏或重复处理(配合幂等性设计)。
    • Apex Trigger: 在 Trigger.New 中的事件对象上可以访问 ReplayId
    • 外部订阅者: CometD/Pub/Sub API 明确支持基于 ReplayId 的订阅。

限制与注意事项

  • 发布限制: High-Volume Events 虽然限制宽松,但仍有每小时/每天的发布配额。在设计 Batch Apex 或其他发布逻辑时,要估算峰值事件量,确保不超过限制。可以通过调整 Batch Size、增加处理间隔等方式缓解。
  • 订阅者处理: Apex Trigger 订阅者运行在自己的事务中,并受制于所有 Apex Governor Limits。如果处理逻辑复杂或需要大量 DML/SOQL,很容易超限。这就是为什么通常推荐在 Trigger 中保持最小逻辑,并将重量级处理委托给 @futureQueueable Apex
  • 交付顺序: Salesforce 不保证 Platform Events 按发布的顺序交付给所有订阅者,甚至不保证单个订阅者在不同事务中按顺序接收它们。如果你的业务逻辑强依赖于事件顺序,Platform Events 可能不是最佳选择,或者你需要在订阅者端实现复杂的排序和状态管理逻辑。
  • 事务边界: 每个事件由其订阅者在一个独立的事务中处理。这意味着如果一个事件有 3 个 Apex Trigger 订阅者,会触发 3 个独立的事务。这提供了隔离性,但也意味着不能轻易实现跨多个订阅者逻辑的原子性操作。如果需要原子性,可能需要重新设计流程,或者引入更复杂的协调机制(例如,状态机模式,用自定义对象跟踪整体进度)。
  • 测试: 测试 Platform Event 触发的逻辑需要特殊考虑。使用 Test.startTest()Test.stopTest() 包围你的事件发布代码,然后在 Test.stopTest() 之后,使用 Test.getEventBus().deliver() 来强制同步交付事件,以便你的断言可以验证订阅者逻辑是否按预期执行。

结论:拥抱事件驱动,构建弹性未来

Salesforce Platform Events 是构建现代化、可扩展、高弹性 Salesforce 应用的关键工具。通过将复杂的大规模异步处理(如数据清洗、系统集成通知、复杂状态同步等)分解为发布者和多个独立订阅者的事件驱动模式,你可以:

  • 简化单个组件的逻辑,提高代码可维护性。
  • 增强系统的容错能力,单个环节的失败不影响整体流程。
  • 提升处理吞吐量和可扩展性,轻松应对不断增长的数据和业务需求。

虽然 Platform Events 的设计和实施需要仔细考虑事件结构、可靠性机制、错误处理和限制,但其带来的解耦、弹性和可扩展性优势,对于构建健壮的企业级 Salesforce 解决方案而言,是无价的。

下次当你面对一个需要跨多个步骤、涉及大量数据或需要与外部系统交互的复杂异步需求时,认真思考一下:Platform Events 是否能让你的架构更上一层楼?

Salesforce架构师老王 Salesforce Platform Events异步处理事件驱动架构Salesforce开发数据清洗

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/8943