CQRS Implementation Patterns
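This document describes concrete implementation methods and best practices for the CQRS (Command Query Responsibility Segregation) architecture pattern used on the DTA Wide platform.
Table of Contents
- Overview
- Core Principles
- Command Handlers
- Query Handlers
- Repository Pattern
- Domain-Specific Error Handling
- DTO and Validation Pattern
- Controller Responsibilities
- Time Management with TimeMachine
- Testing Strategies
1. Overview
What is CQRS?
CQRS is an architecture pattern that separates a system's read (Query) operations from its write (Command) operations. This provides:
- Clear separation of responsibilities: data-modification and data-retrieval logic are managed independently
- Scalability: reads and writes can be scaled independently
- Maintainability: each operation can use a model optimized for it
CQRS in DTA Wide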
All business logic must be implemented in libs/ using CQRS handlers:
- The Apps layer (apps/*/) uses only CommandBus and QueryBus
- The Libs layer (libs/feature/*/) implements the actual business logic
- Controllers dispatch Commands/Queries and do not call services directly
2. Core Principles
| Aspect | Command | Query |
|---|---|---|
| Purpose | State change (Create, Update, Delete) | Data retrieval (Read) |
| Return value | Created ID or DTO | DTO / ReadModel |
| Transaction | Required | Optional (a Read Replica may be used) |
| Events | May publish Domain Events | Must not publish |
Command Handlers
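- Orchestrate the business workflow: do not implement domain logic directly
- Delegate to domain entities and services: business rules are handled in the domain layer
- Coordinate repository operations: manage data persistence
- Return a Result DTO: do not return domain entities directly
- Handle cross-cutting concerns: transactions, validation, event publishing, and so on
Query Handlers
- Orchestrate data retrieval: do not implement business logic
- Delegate to repositories and Read Models: keep data-access logic separate
- Apply presentation logic: filtering, sorting, pagination
- Return a Result DTO: do not return domain entities directly
- Apply a caching strategy: performance optimization
Usage Pattern
- Apps use only CommandBus and QueryBus
- Other Libs use domain logic through Commands/Queries
- Controllers dispatch Commands/Queries and do not call services directly
- No direct service injection: use the CQRS pattern only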
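The dispatch rule in this list can be pictured with a dependency-free sketch. `SimpleCommandBus`, `register`, and the in-file `CreateUserCommand` and `CreateUserHandler` are hypothetical stand-ins, not the `@nestjs/cqrs` API. The only point is that a caller hands a message object to a bus and never holds a reference to a handler or service.

```typescript
// Hypothetical stand-ins for illustration; the real project uses CommandBus from @nestjs/cqrs.
interface Handler<C, R> {
  execute(command: C): Promise<R>;
}

type CommandClass<C> = new (...args: any[]) => C;

class SimpleCommandBus {
  // Handlers are looked up by the command's constructor.
  private readonly handlers = new Map<Function, Handler<unknown, unknown>>();

  register<C, R>(type: CommandClass<C>, handler: Handler<C, R>): void {
    this.handlers.set(type, handler as Handler<unknown, unknown>);
  }

  async execute<R>(command: object): Promise<R> {
    const handler = this.handlers.get(command.constructor);
    if (!handler) {
      throw new Error(`No handler registered for ${command.constructor.name}`);
    }
    return (await handler.execute(command)) as R;
  }
}

// A command is a plain, immutable message describing the intent.
class CreateUserCommand {
  constructor(public readonly email: string, public readonly name: string) {}
}

class CreateUserHandler implements Handler<CreateUserCommand, { id: string }> {
  async execute(command: CreateUserCommand): Promise<{ id: string }> {
    // Real handlers would orchestrate domain objects and repositories here.
    return { id: `user:${command.email}` };
  }
}

const bus = new SimpleCommandBus();
bus.register(CreateUserCommand, new CreateUserHandler());
```

A caller in the apps layer would then only write `bus.execute(new CreateUserCommand(email, name))`, which is the same shape as the controller examples later in this document.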
3. Command Handlers
Command Handler Implementation Rules
- Inject the Repository into the handler through its interface (DI)
- Create domain objects through the Aggregate's factory method
- Publish Domain Events to the EventBus after saving
Correct Implementation Pattern
// ✅ Correct: Domain entity with business logic
export class User {
constructor(public readonly id: string, public readonly email: string, private _name: string, public readonly createdAt: Date, private _updatedAt: Date) {}
// Domain logic in entity
updateName(newName: string): void {
if (!newName || newName.trim().length < 2) {
throw new InvalidUserNameException(newName);
}
this._name = newName.trim();
this._updatedAt = new Date();
}
get name(): string {
return this._name;
}
get updatedAt(): Date {
return this._updatedAt;
}
}
// Result DTO (returned by application services)
export class UserCreatedResultDto {
constructor(public readonly id: string, public readonly email: string, public readonly name: string, public readonly createdAt: string) {}
}
// Command Handler - orchestrates, doesn't implement business logic
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
constructor(private readonly unitOfWork: UnitOfWork, private readonly eventBus: EventBus) {}
async execute(command: CreateUserCommand): Promise<UserCreatedResultDto> {
return await this.unitOfWork.execute(async (repositories) => {
// 1. Check business rules (delegate to domain)
const existingUser = await repositories.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}
// 2. Create domain entity (business logic in domain)
const user = User.create(command.email, command.name); // Factory method with validation
// 3. Persist through repository
const savedUser = await repositories.userRepository.save(user);
// 4. Publish domain event
const event = new UserCreatedEvent(savedUser.id, savedUser.email, savedUser.name);
this.eventBus.publish(event);
// 5. Return DTO, not domain entity
return new UserCreatedResultDto(savedUser.id, savedUser.email, savedUser.name, savedUser.createdAt.toISOString());
});
}
}
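The handler above calls `User.create(...)`, but the entity listing does not show that factory. A minimal sketch of what it might look like follows. The extra `id` and `now` parameters are assumptions, made so the factory stays deterministic and does not read the system clock. The body of `InvalidUserNameException` is also an assumption, since the document only references that class by name.

```typescript
// Hypothetical exception body; the document references this class without defining it.
class InvalidUserNameException extends Error {
  constructor(value: string) {
    super(`Invalid user name: "${value}"`);
    Object.setPrototypeOf(this, new.target.prototype); // keeps instanceof working on ES5 targets
    this.name = 'InvalidUserNameException';
  }
}

class User {
  constructor(
    public readonly id: string,
    public readonly email: string,
    private _name: string,
    public readonly createdAt: Date,
    private _updatedAt: Date,
  ) {}

  // Factory method: validates the input before constructing the entity.
  // `id` and `now` are passed in (assumption) instead of being generated here.
  static create(id: string, email: string, name: string, now: Date): User {
    const trimmed = name.trim();
    if (trimmed.length < 2) {
      throw new InvalidUserNameException(name);
    }
    return new User(id, email, trimmed, now, now);
  }

  get name(): string {
    return this._name;
  }

  get updatedAt(): Date {
    return this._updatedAt;
  }
}
```

Keeping the length rule inside `create` means the handler never has to repeat it, which is the delegation the handler comments describe.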
Incorrect Patterns
// ❌ Incorrect: Application service implementing business logic directly
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
async execute(command: CreateUserCommand): Promise<User> {
// Wrong: business logic in application layer
if (!command.email.includes('@')) {
throw new Error('Invalid email');
}
// Wrong: returning domain entity
return new User(generateId(), command.email, command.name);
}
}
// ❌ Incorrect: Direct service injection in apps
@Controller()
export class UsersController {
constructor(private readonly usersService: UsersService) {} // Wrong!
}
4. Query Handlers
Query Handler Implementation Rules
- Use a ReadRepository or a dedicated ReadModel
- Do not publish Domain Events
- Do not include state-changing logic
Correct Implementation Pattern
// Query Handler - orchestrates data retrieval
@QueryHandler(GetUserByIdQuery)
export class GetUserByIdHandler {
constructor(private readonly userRepository: UserRepository) {}
async execute(query: GetUserByIdQuery): Promise<UserDetailDto> {
// 1. Delegate to repository
const user = await this.userRepository.findById(query.userId);
if (!user) {
throw new UserNotFoundException(query.userId);
}
// 2. Convert to DTO (presentation logic)
return new UserDetailDto(user.id, user.email, user.name, user.createdAt.toISOString(), user.updatedAt.toISOString());
}
}
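`UserDetailDto` is used by the query handler but not defined in this document. The sketch below infers its fields from the constructor call in the handler. The `UserLike` interface and the `fromUser` helper are hypothetical additions that keep the entity-to-DTO conversion in one place.

```typescript
// Minimal structural view of the domain entity, assumed for this sketch.
interface UserLike {
  id: string;
  email: string;
  name: string;
  createdAt: Date;
  updatedAt: Date;
}

class UserDetailDto {
  constructor(
    public readonly id: string,
    public readonly email: string,
    public readonly name: string,
    public readonly createdAt: string, // ISO 8601 string, not a Date
    public readonly updatedAt: string, // ISO 8601 string, not a Date
  ) {}

  // Hypothetical helper: converts a domain entity into the serializable DTO.
  static fromUser(user: UserLike): UserDetailDto {
    return new UserDetailDto(
      user.id,
      user.email,
      user.name,
      user.createdAt.toISOString(),
      user.updatedAt.toISOString(),
    );
  }
}
```

Because the DTO holds only strings, it serializes to JSON without exposing entity methods or `Date` instances to API consumers.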
Usage in the Controller
@Controller('users')
export class UsersController {
constructor(private readonly commandBus: CommandBus, private readonly queryBus: QueryBus) {}
@Post()
async createUser(@Body() dto: CreateUserDto): Promise<UserCreatedResultDto> {
const command = new CreateUserCommand(dto.email, dto.name);
return this.commandBus.execute(command); // Returns DTO
}
@Get(':id')
async getUser(@Param('id') id: string): Promise<UserDetailDto> {
const query = new GetUserByIdQuery(id);
return this.queryBus.execute(query); // Returns DTO
}
}
5. Repository Pattern
Repository Interface (Domain Layer)
// ✅ Correct - Repository interface in domain layer
// libs/feature/user/src/lib/domain/repositories/user.repository.interface.ts
export interface UserRepository {
findById(id: string): Promise<User | null>;
findByEmail(email: string): Promise<User | null>;
save(user: User): Promise<User>;
delete(id: string): Promise<void>;
findManyByIds(ids: string[]): Promise<User[]>;
}
Domain Entity
// Domain entity
// libs/feature/user/src/lib/domain/entities/user.entity.ts
export class User {
constructor(public readonly id: string, public readonly email: string, public name: string, public readonly createdAt: Date, public updatedAt: Date) {}
updateName(newName: string): void {
this.name = newName;
this.updatedAt = new Date();
}
}
Infrastructure Implementation
// Infrastructure implementation
// libs/feature/user/src/lib/infrastructure/repositories/prisma-user.repository.ts
@Injectable()
export class PrismaUserRepository implements UserRepository {
constructor(private readonly prisma: PrismaService) {}
async findById(id: string): Promise<User | null> {
const userData = await this.prisma.user.findUnique({ where: { id } });
return userData ? this.toDomainEntity(userData) : null;
}
async save(user: User): Promise<User> {
const userData = await this.prisma.user.upsert({
where: { id: user.id },
create: {
id: user.id,
email: user.email,
name: user.name,
createdAt: user.createdAt,
updatedAt: user.updatedAt,
},
update: {
name: user.name,
updatedAt: user.updatedAt,
},
});
return this.toDomainEntity(userData);
}
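// findByEmail, delete, and findManyByIds are omitted here for brevity; UserRepository requires them as well
// Maps the persistence record to a domain entity so Prisma types never leave this layer
private toDomainEntity(userData: any): User {
return new User(userData.id, userData.email, userData.name, userData.createdAt, userData.updatedAt);
}
}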
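Because handlers depend only on the `UserRepository` interface, the Prisma implementation can be swapped for an in-memory one in tests. The following is a minimal sketch of such a substitute. `InMemoryUserRepository` is a hypothetical name and is not part of the codebase described here. The `User` and `UserRepository` declarations are repeated so the block is self-contained.

```typescript
class User {
  constructor(
    public readonly id: string,
    public readonly email: string,
    public name: string,
    public readonly createdAt: Date,
    public updatedAt: Date,
  ) {}
}

interface UserRepository {
  findById(id: string): Promise<User | null>;
  findByEmail(email: string): Promise<User | null>;
  save(user: User): Promise<User>;
  delete(id: string): Promise<void>;
  findManyByIds(ids: string[]): Promise<User[]>;
}

// Hypothetical test double: stores entities in a Map keyed by id.
class InMemoryUserRepository implements UserRepository {
  private readonly users = new Map<string, User>();

  async findById(id: string): Promise<User | null> {
    return this.users.get(id) ?? null;
  }

  async findByEmail(email: string): Promise<User | null> {
    return Array.from(this.users.values()).find((u) => u.email === email) ?? null;
  }

  async save(user: User): Promise<User> {
    this.users.set(user.id, user); // insert or overwrite, like an upsert
    return user;
  }

  async delete(id: string): Promise<void> {
    this.users.delete(id);
  }

  async findManyByIds(ids: string[]): Promise<User[]> {
    // Unknown ids are skipped rather than returned as gaps.
    return ids
      .map((id) => this.users.get(id))
      .filter((u): u is User => u !== undefined);
  }
}
```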
Transaction Handling
Option 1: Unit of Work Pattern (Clean Architecture)
export interface UnitOfWork {
execute<T>(work: (repositories: RepositoryContext) => Promise<T>): Promise<T>;
}
export interface RepositoryContext {
userRepository: UserRepository;
// other repositories...
}
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
constructor(private readonly unitOfWork: UnitOfWork) {}
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
return await this.unitOfWork.execute(async (repositories) => {
const existingUser = await repositories.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}
const user = new User(generateId(), command.email, command.name, new Date(), new Date());
const savedUser = await repositories.userRepository.save(user);
return new UserCreatedResult(savedUser);
});
}
}
// Infrastructure implementation of UnitOfWork
@Injectable()
export class PrismaUnitOfWork implements UnitOfWork {
constructor(private readonly prisma: PrismaService) {}
async execute<T>(work: (repositories: RepositoryContext) => Promise<T>): Promise<T> {
return this.prisma.$transaction(async (tx) => {
const repositories: RepositoryContext = {
userRepository: new PrismaUserRepository(tx),
// other repositories with transaction context...
};
return await work(repositories);
});
}
}
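The commit-or-rollback contract of `UnitOfWork` can be illustrated without a database. The sketch below is an in-memory stand-in, not the Prisma implementation: `InMemoryUnitOfWork`, `UserRecord`, `UserStore`, and `count` are hypothetical names, and the repository surface is reduced to two methods. It applies changes to a draft copy and only promotes the draft when the work function resolves.

```typescript
interface UserRecord {
  id: string;
  email: string;
  name: string;
}

// Reduced repository surface, assumed for this sketch.
interface UserStore {
  findByEmail(email: string): Promise<UserRecord | null>;
  save(user: UserRecord): Promise<UserRecord>;
}

interface RepositoryContext {
  userRepository: UserStore;
}

interface UnitOfWork {
  execute<T>(work: (repositories: RepositoryContext) => Promise<T>): Promise<T>;
}

class InMemoryUnitOfWork implements UnitOfWork {
  private committed = new Map<string, UserRecord>();

  async execute<T>(work: (repositories: RepositoryContext) => Promise<T>): Promise<T> {
    // Work on a copy so a thrown error leaves the committed state untouched.
    const draft = new Map(this.committed);
    const repositories: RepositoryContext = {
      userRepository: {
        findByEmail: async (email) =>
          Array.from(draft.values()).find((u) => u.email === email) ?? null,
        save: async (user) => {
          draft.set(user.id, user);
          return user;
        },
      },
    };
    const result = await work(repositories); // a rejection skips the commit below
    this.committed = draft; // commit: promote the draft
    return result;
  }

  // Number of committed records, exposed for assertions in tests.
  count(): number {
    return this.committed.size;
  }
}
```

This stand-in does not handle concurrent `execute` calls, so it is only suitable for sequential unit tests.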
Option 2: Transactional Decorator (Pragmatic Approach)
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
constructor(private readonly userRepository: UserRepository) {}
@Transactional()
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
const existingUser = await this.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}
const user = new User(generateId(), command.email, command.name, new Date(), new Date());
const savedUser = await this.userRepository.save(user);
return new UserCreatedResult(savedUser);
}
}
Module Configuration
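@Module({
providers: [
{
// TypeScript interfaces are erased at runtime, so the repository is registered under a string token.
// Consumers must request it with @Inject('UserRepository') on the constructor parameter.
provide: 'UserRepository',
useClass: PrismaUserRepository,
},
],
})
export class FeatureUserModule {}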
Patterns to Avoid
// ❌ Incorrect: Repository returning Prisma models directly
async findById(id: string): Promise<PrismaUser> { ... } // Wrong!
// ❌ Incorrect: Direct Prisma transaction in Command Handler
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
return this.prisma.$transaction(async (tx) => { ... }); // Wrong! Infrastructure concern
}
// ❌ Incorrect: Transaction in repository
async saveWithTransaction(user: User): Promise<User> {
return this.prisma.$transaction(async (tx) => { ... }); // Wrong layer!
}
6. Domain-Specific Error Handling
Defining Domain Exceptions
// ✅ Correct - Domain-specific exceptions
// libs/feature/user/src/lib/domain/exceptions/
export class UserNotFoundException extends DomainException {
constructor(userId: string) {
super(`User with ID ${userId} not found`, 'USER_NOT_FOUND', { userId });
}
}
export class UserEmailAlreadyExistsException extends DomainException {
constructor(email: string) {
super(`User with email ${email} already exists`, 'USER_EMAIL_EXISTS', { email });
}
}
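Both exceptions extend `DomainException`, which this document does not define. A minimal sketch of a compatible base class follows, assuming the three constructor arguments seen in the `super(...)` calls above are a message, a machine-readable code, and a details object. The `Object.setPrototypeOf` line is a defensive measure for ES5 compilation targets.

```typescript
// Assumed shape of the base class, inferred from the super(message, code, details) calls.
abstract class DomainException extends Error {
  constructor(
    message: string,
    public readonly code: string,
    public readonly details: Record<string, unknown> = {},
  ) {
    super(message);
    Object.setPrototypeOf(this, new.target.prototype); // keeps instanceof working on ES5 targets
    this.name = new.target.name; // e.g. "UserNotFoundException"
  }
}

class UserNotFoundException extends DomainException {
  constructor(userId: string) {
    super(`User with ID ${userId} not found`, 'USER_NOT_FOUND', { userId });
  }
}
```

Carrying `code` and `details` on the exception is what lets the controller build a structured error response without parsing the message text.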
Usage in a Command Handler
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
const existingUser = await this.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}
// ...
}
}
Mapping to HTTP Responses in the Controller
@Controller('users')
export class UsersController {
@Post()
async createUser(@Body() dto: CreateUserDto) {
try {
const command = new CreateUserCommand(dto.email, dto.name);
return await this.commandBus.execute(command);
} catch (error) {
if (error instanceof UserEmailAlreadyExistsException) {
throw new ConflictException({
message: error.message,
code: error.code,
details: error.details,
});
}
// Map other domain exceptions...
throw error;
}
}
}
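Repeating the same `try/catch` in every controller method becomes noisy as the number of domain exceptions grows. One option is to keep the code-to-status mapping in a single pure function, sketched below. `toHttpError`, `STATUS_BY_CODE`, and the fallback status of 400 are assumptions made for illustration. In a NestJS application this kind of mapping is commonly centralized in an exception filter, which is not shown here.

```typescript
// Assumed base class shape; see the domain exception definitions in this section.
class DomainException extends Error {
  constructor(
    message: string,
    public readonly code: string,
    public readonly details: Record<string, unknown> = {},
  ) {
    super(message);
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

// Hypothetical lookup table from domain error code to HTTP status.
const STATUS_BY_CODE: Record<string, number> = {
  USER_NOT_FOUND: 404,
  USER_EMAIL_EXISTS: 409,
};

interface HttpErrorResponse {
  status: number;
  body: { message: string; code: string; details: Record<string, unknown> };
}

// Returns a structured HTTP error for domain exceptions, or null so the caller can rethrow.
function toHttpError(error: unknown): HttpErrorResponse | null {
  if (!(error instanceof DomainException)) {
    return null;
  }
  const status = STATUS_BY_CODE[error.code] ?? 400; // 400 fallback is an assumption
  return {
    status,
    body: { message: error.message, code: error.code, details: error.details },
  };
}
```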
Patterns to Avoid
// ❌ Incorrect: Generic errors
throw new Error('Something went wrong'); // Never do this!
throw new HttpException('User not found', 404); // Wrong layer!
7. DTO and Validation Pattern
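DTO Location
- Define DTOs only in the apps/ layer: do not define them in libs/
- Use class-validator decorators: input validation
- Use @ApiProperty decorators: Swagger documentation
- DTOs are the API contract: they are not domain models
DTO Example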
// ✅ Correct - DTOs in apps/dta-wide-api/src/app/users/dto/
import { ApiProperty } from '@nestjs/swagger';
import { IsEmail, IsString, MinLength, IsOptional } from 'class-validator';
export class CreateUserDto {
@ApiProperty({
description: 'User email address',
example: 'user@example.com',
})
@IsEmail({}, { message: 'Invalid email format' })
email: string;
@ApiProperty({
description: 'User full name',
example: 'John Doe',
})
@IsString()
@MinLength(2, { message: 'Name must be at least 2 characters' })
name: string;
@ApiProperty({
description: 'User timezone',
example: 'Europe/Berlin',
required: false,
})
@IsOptional()
@IsString()
timezone?: string;
}
export class UpdateUserDto {
@ApiProperty({
description: 'User full name',
example: 'Jane Doe',
required: false,
})
@IsOptional()
@IsString()
@MinLength(2)
name?: string;
@ApiProperty({
description: 'User timezone',
example: 'America/New_York',
required: false,
})
@IsOptional()
@IsString()
timezone?: string;
}
Usage in the Controller
@Controller('users')
export class UsersController {
@Post()
@ApiOperation({ summary: 'Create new user' })
@ApiResponse({ status: 201, description: 'User created successfully' })
async createUser(@Body() dto: CreateUserDto) {
const command = new CreateUserCommand(dto.email, dto.name, dto.timezone);
return this.commandBus.execute(command);
}
}
Patterns to Avoid
// ❌ Incorrect - DTOs in libs
// libs/feature/user/src/lib/dto/create-user.dto.ts - Wrong location!
8. Controller Responsibilities
Controller Role
- Request validation: validate input DTOs with ValidationPipe
- Response formatting: convert Result DTOs into appropriate HTTP responses
- REST conventions: follow RESTful API design principles
- API documentation: document endpoints with Swagger decorators
Correct Controller Pattern
// ✅ Correct controller pattern
@Controller('users')
@ApiTags('Users')
export class UsersController {
constructor(private readonly commandBus: CommandBus, private readonly queryBus: QueryBus) {}
@Post()
@HttpCode(HttpStatus.CREATED)
@ApiOperation({ summary: 'Create a new user' })
@ApiResponse({
status: 201,
description: 'User created successfully',
type: UserCreatedResultDto,
})
@ApiResponse({
status: 409,
description: 'User with email already exists',
})
async createUser(@Body() dto: CreateUserDto): Promise<UserCreatedResultDto> {
try {
const command = new CreateUserCommand(dto.email, dto.name, dto.timezone);
return await this.commandBus.execute(command);
} catch (error) {
// Map domain exceptions to HTTP responses
if (error instanceof UserEmailAlreadyExistsException) {
throw new ConflictException({
message: error.message,
code: error.code,
details: error.details,
});
}
throw error;
}
}
@Get(':id')
@ApiOperation({ summary: 'Get user by ID' })
@ApiParam({ name: 'id', description: 'User ID' })
@ApiResponse({
status: 200,
description: 'User found',
type: UserDetailDto,
})
@ApiResponse({
status: 404,
description: 'User not found',
})
async getUser(@Param('id') id: string): Promise<UserDetailDto> {
try {
const query = new GetUserByIdQuery(id);
return await this.queryBus.execute(query);
} catch (error) {
if (error instanceof UserNotFoundException) {
throw new NotFoundException({
message: error.message,
code: error.code,
details: error.details,
});
}
throw error;
}
}
}
Patterns to Avoid
// ❌ Incorrect: Business logic in controller
@Post()
async createUser(@Body() dto: CreateUserDto) {
// Wrong: validation logic in controller
if (!dto.email.includes('@')) {
throw new BadRequestException('Invalid email');
}
// Wrong: direct repository access
const user = await this.userRepository.save(dto); // Wrong!
}
// ❌ Incorrect: Missing Swagger documentation
@Get(':id')
async getUser(@Param('id') id: string) { // Missing @ApiOperation, @ApiResponse
return this.queryBus.execute(new GetUserByIdQuery(id));
}
9. Time Management with TimeMachine
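Core Principles
- Never use system time directly: new Date() and Date.now() are prohibited
- Use the TimeMachine service: for all time-related operations
- Per-user virtual time: each user manages an independent virtual time
- TimeMachine integration is mandatory: required in every domain that needs time information
Incorrect Pattern - Direct Time Usage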
// ❌ Incorrect - Direct system time usage
export class CreateUserHandler {
async execute(command: CreateUserCommand): Promise<UserCreatedResultDto> {
const user = new User(
generateId(),
command.email,
command.name,
new Date(), // Wrong! Never use system time directly
new Date() // Wrong! Never use system time directly
);
return await this.userRepository.save(user);
}
}
Correct Pattern - Using the TimeMachine Service
// ✅ Correct - TimeMachine service usage
export class CreateUserHandler {
constructor(
private readonly unitOfWork: UnitOfWork,
private readonly timeMachineService: TimeMachineService // Inject TimeMachine
) {}
async execute(command: CreateUserCommand): Promise<UserCreatedResultDto> {
return await this.unitOfWork.execute(async (repositories) => {
// Get current virtual time for user (or system time if TimeMachine disabled)
const currentTime = await this.timeMachineService.getCurrentTime(command.userId);
const user = new User(
generateId(),
command.email,
command.name,
currentTime, // Use TimeMachine time
currentTime // Use TimeMachine time
);
const savedUser = await repositories.userRepository.save(user);
return new UserCreatedResultDto(savedUser.id, savedUser.email, savedUser.name, savedUser.createdAt.toISOString());
});
}
}
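🚨 CRITICAL: The systemCurrentTime Parameter Pattern
Whenever a Query or Command Handler uses time, it must follow the pattern below. This rule is stricter than the preceding handler example: the handler receives the time as a parameter and does not call TimeMachineService itself.
Correct Pattern - systemCurrentTime Parameter
// Query/Command class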
export class GetDataQuery implements IQuery {
constructor(
public readonly userId: string,
public readonly systemCurrentTime: Date, // 🚨 Required! Never create the time directly
public readonly otherParams: string
) {}
}
// Handler class
@QueryHandler(GetDataQuery)
export class GetDataHandler {
constructor(
private readonly repository: DataRepository // 🚨 Do not inject TimeMachineService!
) {}
async execute(query: GetDataQuery): Promise<DataResult> {
// ✅ Use the systemCurrentTime passed in on the query
const result = await this.repository.getData(
query.userId,
query.systemCurrentTime // Use the passed-in time as-is
);
return result;
}
}
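The listing above shows the handler side only. A sketch of the calling side follows, under the assumption that the boundary layer resolves the time once and passes it down. `TimeSource` is a hypothetical interface mirroring the `getCurrentTime(userId)` call used earlier in this section, and `GetActiveItemsQuery`, `GetActiveItemsHandler`, `Item`, and `getActiveItems` are illustrative names that do not come from the codebase.

```typescript
// Hypothetical shape of the time provider, mirroring getCurrentTime(userId).
interface TimeSource {
  getCurrentTime(userId: string): Promise<Date>;
}

class GetActiveItemsQuery {
  constructor(
    public readonly userId: string,
    public readonly systemCurrentTime: Date, // resolved by the caller, never by the handler
  ) {}
}

interface Item {
  id: string;
  expiresAt: Date;
}

class GetActiveItemsHandler {
  constructor(private readonly items: Item[]) {}

  // Deterministic: the result depends only on the query, not on the wall clock.
  async execute(query: GetActiveItemsQuery): Promise<string[]> {
    const now = query.systemCurrentTime.getTime();
    return this.items.filter((i) => i.expiresAt.getTime() > now).map((i) => i.id);
  }
}

// Boundary layer: resolves the time once and passes it into the query.
async function getActiveItems(
  userId: string,
  time: TimeSource,
  handler: GetActiveItemsHandler,
): Promise<string[]> {
  const systemCurrentTime = await time.getCurrentTime(userId);
  return handler.execute(new GetActiveItemsQuery(userId, systemCurrentTime));
}
```

Because the handler never reads a clock, replaying the same query with a past `systemCurrentTime` reproduces the result for that point in time, which is what makes backtesting possible.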
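Strictly Prohibited Patterns
// 🚨 Never do this - it makes SOL BackTest impossible!
const currentTime = new Date(); // ❌ Creates the time directly
const now = Date.now(); // ❌ Reads the current time directly
const time = await this.timeMachineService.getCurrentTime(userId); // ❌ Calls TimeMachine from a handler
Scope
- Every Query Handler - systemCurrentTime parameter required
- Every Command Handler - systemCurrentTime parameter required
- Every Repository method - currentTime parameter required
- Every Domain Service - currentTime parameter required
Purpose and Rationale
- SOL BackTest support: prediction models can be validated against a past point in time
- TimeMachine testing: supports scenario tests based on virtual time
- Time consistency: guarantees a single entry point for all time handling
- Test isolation: each test can set its own time independently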
10. Testing Strategies
Unit Testing Command Handlers
describe('CreateUserHandler', () => {
let handler: CreateUserHandler;
let unitOfWork: jest.Mocked<UnitOfWork>;
let eventBus: jest.Mocked<EventBus>;
beforeEach(() => {
unitOfWork = createMock<UnitOfWork>();
eventBus = createMock<EventBus>();
handler = new CreateUserHandler(unitOfWork, eventBus);
});
it('should create user successfully', async () => {
const command = new CreateUserCommand('test@example.com', 'Test User');
const mockUser = new User('user-id', command.email, command.name, new Date(), new Date());
unitOfWork.execute.mockImplementation(async (work) => {
const mockRepos = {
userRepository: {
findByEmail: jest.fn().mockResolvedValue(null),
save: jest.fn().mockResolvedValue(mockUser),
},
};
return work(mockRepos as any);
});
const result = await handler.execute(command);
expect(result).toBeInstanceOf(UserCreatedResultDto);
expect(result.email).toBe(command.email);
expect(eventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
userId: mockUser.id,
email: mockUser.email,
})
);
});
it('should throw exception if email already exists', async () => {
const command = new CreateUserCommand('existing@example.com', 'Test User');
const existingUser = new User('existing-id', command.email, 'Existing', new Date(), new Date());
unitOfWork.execute.mockImplementation(async (work) => {
const mockRepos = {
userRepository: {
findByEmail: jest.fn().mockResolvedValue(existingUser),
},
};
return work(mockRepos as any);
});
await expect(handler.execute(command)).rejects.toThrow(UserEmailAlreadyExistsException);
});
});
Integration Testing with CQRS
describe('User CQRS Integration', () => {
let app: INestApplication;
let commandBus: CommandBus;
let queryBus: QueryBus;
beforeAll(async () => {
const moduleRef = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleRef.createNestApplication();
await app.init();
commandBus = moduleRef.get<CommandBus>(CommandBus);
queryBus = moduleRef.get<QueryBus>(QueryBus);
});
it('should create and retrieve user', async () => {
// Create user via command
const createCommand = new CreateUserCommand('integration@test.com', 'Integration Test');
const createResult = await commandBus.execute(createCommand);
expect(createResult).toBeInstanceOf(UserCreatedResultDto);
// Retrieve user via query
const getQuery = new GetUserByIdQuery(createResult.id);
const getResult = await queryBus.execute(getQuery);
expect(getResult).toBeInstanceOf(UserDetailDto);
expect(getResult.email).toBe(createCommand.email);
expect(getResult.name).toBe(createCommand.name);
});
});
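Related Documents
- Distributed Event CQRS Pattern - distributed, event-based CQRS integration
- Event System Architecture - event system architecture
- Domain-Driven Design Principles - DDD principles
- API Implementation Guidelines - API implementation guide
- Testing Guidelines - testing strategy
Summary Checklist
Command Handlers
- Delegate business logic to the domain layer
- Manage data persistence through the Repository
- Return a Result DTO (not a domain entity)
- Publish domain events
- Use a Unit of Work or the Transactional decorator
Query Handlers
- Delegate data retrieval to the Repository
- Return a Result DTO (not a domain entity)
- Apply a caching strategy (when needed)
- Contain presentation logic only
Repository
- Define the interface in the domain layer
- Implement it in the infrastructure layer
- Return domain objects (not Prisma models)
- Handle transactions in the Command Handler
Error Handling
- Define domain-specific exceptions
- Map them to HTTP responses in the Controller
- Return structured error responses
DTOs
- Define them only in the Apps layer
- Use class-validator
- Document them with Swagger
Time Management
- Use the systemCurrentTime parameter
- Do not create the time inside a handler
- Call the TimeMachine service only from the Controller/Service layer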