淡蓝十七 發表於 2026-3-4 10:59:00

PipelinR:在Java中实现优雅的CQRS架构

<blockquote>
<p>使用中介者模式轻松实现命令查询职责分离,构建高内聚、低耦合的应用系统</p>
</blockquote>
<hr>
<h1 id="一知识点回顾">一、知识点回顾</h1>
<h2 id="1-什么是cqrs">1. 什么是CQRS?</h2>
<p>CQRS是Command Query Responsibility Segregation的缩写,一般称作命令查询职责分离。从字面意思理解,就是将命令(写入)和查询(读取)的责任划分到不同的模型中。</p>
<p>对比一下常用的 CRUD 模式(创建-读取-更新-删除),通常我们会让用户界面与负责所有四种操作的数据存储交互。而 CQRS 则将这些操作分成两种模式,一种用于查询(又称 "R"),另一种用于命令(又称 "CUD")。</p>
<p><img src="https://qiniu-cdn.zhaorong.pro/images/CQRS-Diagram-e1598922649719.png" alt="" loading="lazy"></p>
<h2 id="2-cqrs的作用是什么">2. CQRS的作用是什么?</h2>
<p>CQRS将系统的写操作(命令)和读操作(查询)分离到不同的模型和数据存储中,从而实现读写分离,提高系统的性能、可扩展性和安全性,并使复杂业务逻辑(写端)和高效查询(读端)各自得到优化,降低系统复杂性。它允许为写操作设计严谨的领域模型,为读操作设计简单、只关注查询效率的数据模型(如专用视图或报表数据库),并可通过事件等机制保持最终一致性。</p>
<h2 id="3-cqrs-的优点">3. CQRS 的优点</h2>
<ul>
<li><strong>独立缩放。</strong> CQRS 使读取模型和写入模型能够独立缩放。 此方法可帮助最大程度地减少锁争用并提高负载下的系统性能。</li>
<li><strong>优化的数据架构。</strong> 读取操作可以使用针对查询进行优化的模式。 写入操作使用针对更新优化的模式。</li>
<li><strong>安全性。</strong> 通过分隔读取和写入,可以确保只有适当的域实体或操作有权对数据执行写入操作。</li>
<li><strong>关注点分离。</strong> 分离读取和写入责任会导致更简洁、更易于维护的模型。 写入端通常处理复杂的业务逻辑。 读取端可以保持简单且专注于查询效率。</li>
<li><strong>更简单的查询。</strong> 在读取数据库中存储具体化视图时,应用程序可以在查询时避免复杂的联接。</li>
</ul>
<h1 id="二关于pipelinr">二、关于PipelinR</h1>
<p>项目地址</p>
<blockquote>
<p>https://github.com/sizovs/PipelinR</p>
</blockquote>
<p>项目开发者在Github的介绍不多,关键是最后一句话:<strong>It's similar to a popular MediatR .NET library.</strong> 意思就是这个项目是参考着一个叫<em>MediatR</em>的.net库写的。关于MediatR我之前有两篇文章专门介绍过。</p>
<p>PipelinR(包括MediatR)提供了一种CQRS的实现方式,基于中介者模式实现进程内消息传递,用于解耦应用中的各个组件,支持请求/响应(一对一,有返回值)和发布/订阅(一对多,无返回值)两种消息模式。它们在内部提供管道行为 (Pipeline Behaviors),用于在消息处理前后插入自定义逻辑,如日志、验证、异常处理等。</p>
<blockquote>
<p>需要提醒的是,PipelinR并不是一个完整的CQRS框架,它只是一个中介者模式的具体实现方式,将调用方和处理方进行了解耦,而这种模式恰好可以用来在一个单体应用(或者是微服务的服务内部)中实现简单的CQRS。</p>
</blockquote>
<h1 id="三依赖安装和配置">三、依赖安装和配置</h1>
<h2 id="1-maven安装">1. Maven安装</h2>
<pre><code class="language-xml">&lt;dependency&gt;
&lt;groupId&gt;net.sizovs&lt;/groupId&gt;
&lt;artifactId&gt;pipelinr&lt;/artifactId&gt;
&lt;version&gt;0.11&lt;/version&gt;
&lt;/dependency&gt;
</code></pre>
<h2 id="2-gradle安装">2. Gradle安装</h2>
<pre><code>dependencies {
    compile 'net.sizovs:pipelinr:0.11'
}
</code></pre>
<h2 id="在spring项目中配置pipelinr">在Spring项目中配置PipelinR</h2>
<pre><code class="language-java">@Configuration
public class PipelinrConfiguration {

    @Bean
    Pipeline pipeline(ObjectProvider&lt;Command.Handler&gt; commandHandlers, ObjectProvider&lt;Notification.Handler&gt; notificationHandlers, ObjectProvider&lt;Command.Middleware&gt; middlewares) {
      return new Pipelinr()
          .with(commandHandlers::stream)
          .with(notificationHandlers::stream)
          .with(middlewares::orderedStream);
    }
}
</code></pre>
<h1 id="四核心组件">四、核心组件</h1>
<ul>
<li><code>Pipeline/Pipelinr</code>:Pipeline是消息和处理器之间的中介者,调用方向Pipeline发送消息,Pipeline收到消息后通过注册到Pipeline的中间件进行层层传递并最终抵达匹配的消息处理器进行处理。Pipelinr是Pipeline的默认实现。</li>
<li><code>Command&lt;R&gt;</code>:用于约定请求/响应模式的消息类型,泛型参数R是返回值的类型,如果不需要返回值,可以将R指定为Voidy。</li>
<li><code>Notification</code>:用于约定发布/订阅模式的消息类型,没有返回值,消息可以有多个处理器。</li>
<li><code>Middleware</code>:管道中间件,Command和Notification都定义了各自的中间件接口。Pipeline接收到的消息,在到达最终的处理器之前,会经过所有注册到Pipeline的中间。可以使用Middleware实现诸如日志记录、数据验证、开启事务等一系列操作。</li>
</ul>
<h1 id="五请求响应模式实现">五、请求/响应模式实现</h1>
<p>请求/响应模式需要用到Command接口。</p>
<h2 id="1-定义command">1. 定义Command</h2>
<p>Command代表一个请求,需要实现<code>net.sizovs.pipelinr.Command</code>接口。泛型参数指定返回值类型。</p>
<pre><code class="language-java">// 定义一个创建用户的命令
public class CreateUserCommand implements Command&lt;UserResponse&gt; {
    private String username;
    private String email;
   
    public CreateUserCommand(String username, String email) {
      this.username = username;
      this.email = email;
    }
   
    public String getUsername() {
      return username;
    }
   
    public String getEmail() {
      return email;
    }
}

// 返回值类型
public class UserResponse {
    private Long userId;
    private String username;
    private String email;
   
    public UserResponse(Long userId, String username, String email) {
      this.userId = userId;
      this.username = username;
      this.email = email;
    }
   
    // getters
}
</code></pre>
<h2 id="2-定义command-handler">2. 定义Command Handler</h2>
<p>创建该Command对应的处理器,实现<code>net.sizovs.pipelinr.Command.Handler</code>接口。</p>
<pre><code class="language-java">@Component
public class CreateUserCommandHandler implements Command.Handler&lt;CreateUserCommand, UserResponse&gt; {
   
    @Autowired
    private UserRepository userRepository;
   
    @Override
    public UserResponse handle(CreateUserCommand command) {
      // 业务逻辑处理
      User user = new User();
      user.setUsername(command.getUsername());
      user.setEmail(command.getEmail());
      
      User savedUser = userRepository.save(user);
      
      return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
    }
}
</code></pre>
<h2 id="3-在业务代码中使用">3. 在业务代码中使用</h2>
<p>通过注入Pipeline实例,发送Command并获取响应。</p>
<pre><code class="language-java">@Service
public class UserService {
   
    @Autowired
    private Pipeline pipeline;
   
    public UserResponse createUser(String username, String email) {
      CreateUserCommand command = new CreateUserCommand(username, email);
      UserResponse response = pipeline.send(command);
      return response;
    }
}
</code></pre>
<h2 id="4-添加command中间件">4. 添加Command中间件</h2>
<p>中间件可以在Command处理前后执行一些操作,如验证、日志、事务管理等。</p>
<pre><code class="language-java">@Component
public class LoggingMiddleware implements Command.Middleware {
   
    private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
   
    @Override
    public &lt;R, C extends Command&lt;R&gt;&gt; R invoke(C command, Chain&lt;R&gt; chain) {
      logger.info("Executing command: {}", command.getClass().getSimpleName());
      try {
            R result = chain.proceed(command);
            logger.info("Command executed successfully");
            return result;
      } catch (Exception e) {
            logger.error("Command execution failed", e);
            throw e;
      }
    }
}

@Component
public class ValidationMiddleware implements Command.Middleware {
   
    @Autowired
    private Validator validator;
   
    @Override
    public &lt;R, C extends Command&lt;R&gt;&gt; R invoke(C command, Chain&lt;R&gt; chain) {
      Set&lt;ConstraintViolation&lt;C&gt;&gt; violations = validator.validate(command);
      if (!violations.isEmpty()) {
            throw new ConstraintViolationException("Validation failed", violations);
      }
      return chain.proceed(command);
    }
}

@Component
@Order(1) // 指定中间件执行顺序
public class TransactionMiddleware implements Command.Middleware {
   
    @Autowired
    private PlatformTransactionManager transactionManager;
   
    @Override
    public &lt;R, C extends Command&lt;R&gt;&gt; R invoke(C command, Chain&lt;R&gt; chain) {
      TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());
      try {
            R result = chain.proceed(command);
            transactionManager.commit(status);
            return result;
      } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
      }
    }
}
</code></pre>
<h1 id="六发布订阅模式实现">六、发布/订阅模式实现</h1>
<p>发布/订阅模式使用Notification接口,用于一对多的消息分发,没有返回值。</p>
<h2 id="1-定义notification">1. 定义Notification</h2>
<p>Notification代表一个事件通知,需要实现<code>net.sizovs.pipelinr.Notification</code>接口。</p>
<pre><code class="language-java">// 定义一个用户创建成功的事件通知
public class UserCreatedNotification implements Notification {
    private Long userId;
    private String username;
    private String email;
    private LocalDateTime createdTime;
   
    public UserCreatedNotification(Long userId, String username, String email) {
      this.userId = userId;
      this.username = username;
      this.email = email;
      this.createdTime = LocalDateTime.now();
    }
   
    // getters
}
</code></pre>
<h2 id="2-定义notification-handler">2. 定义Notification Handler</h2>
<p>Notification可以有多个处理器,每个处理器实现<code>net.sizovs.pipelinr.Notification.Handler</code>接口。</p>
<pre><code class="language-java">@Component
public class SendWelcomeEmailHandler implements Notification.Handler&lt;UserCreatedNotification&gt; {
   
    private static final Logger logger = LoggerFactory.getLogger(SendWelcomeEmailHandler.class);
   
    @Autowired
    private EmailService emailService;
   
    @Override
    public void handle(UserCreatedNotification notification) {
      logger.info("Sending welcome email to user: {}", notification.getUsername());
      emailService.sendWelcomeEmail(notification.getEmail(), notification.getUsername());
    }
}

@Component
public class LogUserCreationHandler implements Notification.Handler&lt;UserCreatedNotification&gt; {
   
    private static final Logger logger = LoggerFactory.getLogger(LogUserCreationHandler.class);
   
    @Autowired
    private UserAuditLogRepository auditLogRepository;
   
    @Override
    public void handle(UserCreatedNotification notification) {
      logger.info("Logging user creation: {}", notification.getUsername());
      UserAuditLog auditLog = new UserAuditLog();
      auditLog.setUserId(notification.getUserId());
      auditLog.setOperation("CREATE");
      auditLog.setTimestamp(notification.getCreatedTime());
      auditLogRepository.save(auditLog);
    }
}

@Component
public class UpdateUserStatisticsHandler implements Notification.Handler&lt;UserCreatedNotification&gt; {
   
    private static final Logger logger = LoggerFactory.getLogger(UpdateUserStatisticsHandler.class);
   
    @Autowired
    private UserStatisticsRepository statisticsRepository;
   
    @Override
    public void handle(UserCreatedNotification notification) {
      logger.info("Updating statistics for new user: {}", notification.getUsername());
      UserStatistics stats = statisticsRepository.findOrCreate();
      stats.incrementTotalUsers();
      statisticsRepository.save(stats);
    }
}
</code></pre>
<h2 id="3-发送notification">3. 发送Notification</h2>
<p>在Command处理完成后,可以发送Notification通知所有相关的处理器。</p>
<pre><code class="language-java">@Component
public class CreateUserCommandHandler implements Command.Handler&lt;CreateUserCommand, UserResponse&gt; {
   
    @Autowired
    private UserRepository userRepository;
   
    @Autowired
    private Pipeline pipeline;
   
    @Override
    public UserResponse handle(CreateUserCommand command) {
      // 业务逻辑处理
      User user = new User();
      user.setUsername(command.getUsername());
      user.setEmail(command.getEmail());
      
      User savedUser = userRepository.save(user);
      
      // 发送事件通知
      UserCreatedNotification notification = new UserCreatedNotification(
            savedUser.getId(),
            savedUser.getUsername(),
            savedUser.getEmail()
      );
      pipeline.send(notification);
      
      return new UserResponse(savedUser.getId(), savedUser.getUsername(), savedUser.getEmail());
    }
}
</code></pre>
<h2 id="4-添加notification中间件">4. 添加Notification中间件</h2>
<p>类似Command,Notification也支持中间件。</p>
<pre><code class="language-java">@Component
public class NotificationLoggingMiddleware implements Notification.Middleware {
   
    private static final Logger logger = LoggerFactory.getLogger(NotificationLoggingMiddleware.class);
   
    @Override
    public &lt;N extends Notification&gt; void invoke(N notification, Chain chain) {
      logger.info("Publishing notification: {}", notification.getClass().getSimpleName());
      try {
            chain.proceed(notification);
            logger.info("Notification published successfully");
      } catch (Exception e) {
            logger.error("Notification publishing failed", e);
            throw e;
      }
    }
}

@Component
public class NotificationErrorHandlingMiddleware implements Notification.Middleware {
   
    private static final Logger logger = LoggerFactory.getLogger(NotificationErrorHandlingMiddleware.class);
   
    @Override
    public &lt;N extends Notification&gt; void invoke(N notification, Chain chain) {
      try {
            chain.proceed(notification);
      } catch (Exception e) {
            logger.error("Error handling notification: {}", notification.getClass().getSimpleName(), e);
            // 可以选择吞掉异常或重新抛出,取决于业务需求
            // throw e;
      }
    }
}
</code></pre>
<h1 id="七总结">七、总结</h1>
<h2 id="核心收获">核心收获</h2>
<p>通过本文的介绍,我们了解了如何在Java应用中使用PipelinR框架实现CQRS模式。核心要点总结如下:</p>
<h3 id="1-cqrs的价值">1. CQRS的价值</h3>
<ul>
<li><strong>读写分离</strong>:通过Command处理写操作,Notification处理事件响应,实现职责的明确划分</li>
<li><strong>独立优化</strong>:读端和写端可以独立优化,不同的数据模型适应不同的场景需求</li>
<li><strong>系统解耦</strong>:中介者模式解耦了调用方和处理方,提高了系统的可维护性和可扩展性</li>
</ul>
<h3 id="2-pipelinr的核心特性">2. PipelinR的核心特性</h3>
<ul>
<li><strong>轻量级实现</strong>:相比完整的CQRS框架,PipelinR更轻便,学习成本低</li>
<li><strong>灵活的管道机制</strong>:通过中间件可以方便地植入横切关注点(如日志、验证、事务等)</li>
<li><strong>支持两种消息模式</strong>:Command用于请求/响应,Notification用于发布/订阅</li>
</ul>
<h3 id="3-最佳实践建议">3. 最佳实践建议</h3>
<ul>
<li><strong>合理使用中间件</strong>:通过@Order注解控制中间件执行顺序,但要避免中间件层级过多导致性能问题</li>
<li><strong>异常处理</strong>:根据场景选择合适的异常处理策略,Notification可考虑不中断其他处理器的错误隔离</li>
<li><strong>事件驱动设计</strong>:充分利用Notification实现事件驱动架构,解耦不同的业务流程</li>
<li><strong>代码组织</strong>:按照Command、Handler、Middleware的划分方式组织代码,保持结构清晰</li>
</ul>
<h2 id="实施建议">实施建议</h2>
<h3 id="适用场景">适用场景</h3>
<ul>
<li>中等复杂度的业务系统,需要良好的代码结构和可维护性</li>
<li>业务逻辑相对复杂,需要事件驱动的系统设计</li>
<li>团队具备良好的DDD设计理念和架构意识</li>
</ul>
<h3 id="注意事项">注意事项</h3>
<ul>
<li><strong>学习曲线</strong>:虽然PipelinR本身简单,但要理解CQRS的设计理念需要一定时间</li>
<li><strong>适度使用</strong>:CQRS不是银弹,过度设计会增加系统复杂度,要根据实际需求决定是否引入</li>
<li><strong>团队协作</strong>:CQRS的有效实施对团队的整体架构意识和编码规范要求较高</li>
<li><strong>性能考虑</strong>:虽然使用了中介者模式会引入少量额外开销,但对大多数应用来说可以忽略不计</li>
</ul>
<h2 id="结论">结论</h2>
<p>PipelinR提供了一种轻量级、简洁的CQRS实现方案。它特别适合那些想要在不过度复杂化系统的前提下,引入DDD思想和事件驱动设计的项目。通过合理运用Command和Notification,结合恰当的中间件设计,开发者可以构建出高内聚、低耦合、易于维护和扩展的应用系统。</p>
<p>关键是要把握好"度"——既要充分发挥CQRS和PipelinR的优势,又要避免为了追求"高大上"的架构而过度设计,最终的目标是为业务的快速迭代和长期维护提供支撑。</p><br><br>
来源:https://www.cnblogs.com/zhaorong/p/19503542
頁: [1]
查看完整版本: PipelinR:在Java中实现优雅的CQRS架构