CQRS Implementation Patterns

This document describes how the DTA Wide platform implements the CQRS (Command Query Responsibility Segregation) architecture pattern, together with recommended practices.

Table of Contents

  1. Overview
  2. Core Principles
  3. Command Handlers
  4. Query Handlers
  5. Repository Pattern
  6. Domain-Specific Error Handling
  7. DTO and Validation Pattern
  8. Controller Responsibilities
  9. Time Management with TimeMachine
  10. Testing Strategies

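1. Overview

What is CQRS?

CQRS is an architecture pattern that separates a system's read (Query) operations from its write (Command) operations. This provides:

  • Clear separation of responsibilities: data-modification and data-retrieval logic are managed independently
  • Scalability: reads and writes can be scaled independently
  • Maintainability: each kind of operation can use a model optimized for it

CQRS in DTA Wide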

All business logic must be implemented in libs/ using CQRS handlers:

  • The Apps layer (apps/*/) uses only the CommandBus and QueryBus
  • The Libs layer (libs/feature/*/) contains the actual business logic
  • Controllers dispatch Commands/Queries and never call services directly

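2. Core Principles

| Aspect | Command | Query |
| --- | --- | --- |
| Purpose | State change (Create, Update, Delete) | Data retrieval (Read) |
| Return value | Generated ID or DTO | DTO / ReadModel |
| Transaction | Required | Optional (a read replica can be used) |
| Events | May publish Domain Events | Must not publish events |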

Command Handlers

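  • Orchestrate business workflows: do not implement domain logic directly
  • Delegate to domain entities and services: business rules are enforced in the domain layer
  • Coordinate repository operations: manage data persistence
  • Return a Result DTO: never return a domain entity directly
  • Handle cross-cutting concerns: transactions, validation, event publishing, and so on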

Query Handlers

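  • Orchestrate data retrieval: do not implement business logic
  • Delegate to repositories and read models: keep data-access logic separate
  • Apply presentation logic: filtering, sorting, pagination
  • Return a Result DTO: never return a domain entity directly
  • Apply a caching strategy: performance optimization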

Usage Pattern

  • Apps use only the CommandBus and QueryBus
  • Other Libs use domain logic through Commands/Queries
  • Controllers dispatch Commands/Queries and never call services directly
  • No direct service injection: use the CQRS pattern only
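
The dispatch rule above can be illustrated without any framework. The sketch below is a hypothetical, dependency-free stand-in for a command bus, not the CommandBus that the controllers in this document inject. `SimpleCommandBus`, the `type` discriminator and the handler body are names and choices invented for illustration.

```typescript
// Hypothetical, framework-free stand-in for a command bus. Illustration only.
interface Command {
  readonly type: string;
}

type Handler<C extends Command, R> = (command: C) => Promise<R>;

class SimpleCommandBus {
  private readonly handlers = new Map<string, Handler<any, any>>();

  // Each command type maps to exactly one handler.
  register<C extends Command, R>(type: C['type'], handler: Handler<C, R>): void {
    if (this.handlers.has(type)) {
      throw new Error(`Handler already registered for ${type}`);
    }
    this.handlers.set(type, handler);
  }

  // Callers only know the command; they never see the handler or its services.
  execute<R>(command: Command): Promise<R> {
    const handler = this.handlers.get(command.type);
    if (!handler) {
      return Promise.reject(new Error(`No handler for ${command.type}`));
    }
    return handler(command);
  }
}

interface CreateUserCommand extends Command {
  readonly type: 'CreateUser';
  readonly email: string;
  readonly name: string;
}

const bus = new SimpleCommandBus();
bus.register<CreateUserCommand, { id: string }>('CreateUser', async (command) => {
  // A real handler would delegate to the domain layer; this one only echoes.
  return { id: `user-for-${command.email}` };
});
```

Because the caller depends only on the bus and the command shape, the handler and everything behind it can change without touching the apps layer, which is the point of forbidding direct service injection.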

3. Command Handlers

Command Handler Implementation Rules

  1. Inject the Repository into the handler through its interface (DI)
  2. Create domain objects through the Aggregate's factory method
  3. After saving, publish Domain Events to the EventBus

Correct Implementation Pattern

// ✅ Correct: Domain entity with business logic
export class User {
constructor(public readonly id: string, public readonly email: string, private _name: string, public readonly createdAt: Date, private _updatedAt: Date) {}

// Domain logic in entity
updateName(newName: string): void {
if (!newName || newName.trim().length < 2) {
throw new InvalidUserNameException(newName);
}
this._name = newName.trim();
this._updatedAt = new Date();
}

get name(): string {
return this._name;
}
get updatedAt(): Date {
return this._updatedAt;
}
}

// Result DTO (returned by application services)
export class UserCreatedResultDto {
constructor(public readonly id: string, public readonly email: string, public readonly name: string, public readonly createdAt: string) {}
}

// Command Handler - orchestrates, doesn't implement business logic
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
constructor(private readonly unitOfWork: UnitOfWork, private readonly eventBus: EventBus) {}

async execute(command: CreateUserCommand): Promise<UserCreatedResultDto> {
return await this.unitOfWork.execute(async (repositories) => {
// 1. Check business rules (delegate to domain)
const existingUser = await repositories.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}

// 2. Create domain entity (business logic in domain)
const user = User.create(command.email, command.name); // Factory method with validation

// 3. Persist through repository
const savedUser = await repositories.userRepository.save(user);

// 4. Publish domain event
const event = new UserCreatedEvent(savedUser.id, savedUser.email, savedUser.name);
this.eventBus.publish(event);

// 5. Return DTO, not domain entity
return new UserCreatedResultDto(savedUser.id, savedUser.email, savedUser.name, savedUser.createdAt.toISOString());
});
}
}
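
The handler above calls `User.create(...)`, but the entity shown does not define it. A minimal sketch of such a factory follows. It is an assumption about what the factory might look like, not the project's actual code: the `now` parameter, the local `generateId` counter and the simplified exception class are all hypothetical. Passing `now` in keeps the entity free of direct clock access.

```typescript
// Hypothetical id generator; the document's generateId() is not shown.
let nextId = 0;
const generateId = (): string => `user-${++nextId}`;

// Simplified stand-in for the document's domain exception.
class InvalidUserNameException extends Error {
  constructor(value: string) {
    super(`Invalid user name: "${value}"`);
    this.name = 'InvalidUserNameException';
  }
}

class User {
  // Private constructor: new users can only be built through the factory.
  private constructor(
    public readonly id: string,
    public readonly email: string,
    private _name: string,
    public readonly createdAt: Date,
    private _updatedAt: Date,
  ) {}

  // Factory method: validation lives here, so no caller can skip it.
  // `now` is supplied by the caller instead of being read from the system clock.
  static create(email: string, name: string, now: Date): User {
    const trimmed = name.trim();
    if (trimmed.length < 2) {
      throw new InvalidUserNameException(name);
    }
    return new User(generateId(), email, trimmed, now, now);
  }

  get name(): string {
    return this._name;
  }

  get updatedAt(): Date {
    return this._updatedAt;
  }
}
```

With this shape the handler's step 2 would read `User.create(command.email, command.name, currentTime)`, where `currentTime` comes from the command.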

Incorrect Patterns

// ❌ Incorrect: Application service implementing business logic directly
@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
async execute(command: CreateUserCommand): Promise<User> {
// Wrong: business logic in application layer
if (!command.email.includes('@')) {
throw new Error('Invalid email');
}

// Wrong: returning domain entity
return new User(generateId(), command.email, command.name);
}
}

// ❌ Incorrect: Direct service injection in apps
@Controller()
export class UsersController {
constructor(private readonly usersService: UsersService) {} // Wrong!
}

4. Query Handlers

Query Handler Implementation Rules

  1. Use a ReadRepository or a dedicated ReadModel
  2. Never publish Domain Events
  3. Never include state-changing logic

Correct Implementation Pattern

// Query Handler - orchestrates data retrieval
@QueryHandler(GetUserByIdQuery)
export class GetUserByIdHandler {
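// UserRepository is an interface (erased at runtime), so it is injected by the 'UserRepository' token
constructor(@Inject('UserRepository') private readonly userRepository: UserRepository) {}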

async execute(query: GetUserByIdQuery): Promise<UserDetailDto> {
// 1. Delegate to repository
const user = await this.userRepository.findById(query.userId);
if (!user) {
throw new UserNotFoundException(query.userId);
}

// 2. Convert to DTO (presentation logic)
return new UserDetailDto(user.id, user.email, user.name, user.createdAt.toISOString(), user.updatedAt.toISOString());
}
}
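
The rules above mention a ReadRepository, and the core principles list pagination as presentation logic for query handlers. The sketch below shows one way these could fit together. `UserReadRepository`, `ListUsersQuery`, `ListUsersHandler` and the page-size cap of 100 are hypothetical names and values, and the in-memory class only stands in for a real read model.

```typescript
// Read-side DTO: a flat, serializable shape with no domain behaviour.
interface UserListItemDto {
  readonly id: string;
  readonly email: string;
  readonly name: string;
}

interface Page<T> {
  readonly items: T[];
  readonly total: number;
}

// Hypothetical read-only port: it exposes no save/delete, so a query handler
// that depends on it has no way to change state.
interface UserReadRepository {
  list(offset: number, limit: number): Promise<Page<UserListItemDto>>;
}

class InMemoryUserReadRepository implements UserReadRepository {
  constructor(private readonly rows: readonly UserListItemDto[]) {}

  async list(offset: number, limit: number): Promise<Page<UserListItemDto>> {
    // Sort a copy by email so that pages are stable between calls.
    const sorted = this.rows.slice().sort((a, b) => a.email.localeCompare(b.email));
    return { items: sorted.slice(offset, offset + limit), total: sorted.length };
  }
}

class ListUsersQuery {
  constructor(public readonly page: number, public readonly pageSize: number) {}
}

class ListUsersHandler {
  constructor(private readonly users: UserReadRepository) {}

  async execute(query: ListUsersQuery): Promise<Page<UserListItemDto>> {
    // Clamp the paging inputs: page is 1-based, pageSize is between 1 and 100.
    const page = Math.max(1, Math.floor(query.page));
    const pageSize = Math.min(100, Math.max(1, Math.floor(query.pageSize)));
    return this.users.list((page - 1) * pageSize, pageSize);
  }
}
```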

Usage in a Controller

@Controller('users')
export class UsersController {
constructor(private readonly commandBus: CommandBus, private readonly queryBus: QueryBus) {}

@Post()
async createUser(@Body() dto: CreateUserDto): Promise<UserCreatedResultDto> {
const command = new CreateUserCommand(dto.email, dto.name);
return this.commandBus.execute(command); // Returns DTO
}

@Get(':id')
async getUser(@Param('id') id: string): Promise<UserDetailDto> {
const query = new GetUserByIdQuery(id);
return this.queryBus.execute(query); // Returns DTO
}
}

5. Repository Pattern

Repository Interface (Domain Layer)

// ✅ Correct - Repository interface in domain layer
// libs/feature/user/src/lib/domain/repositories/user.repository.interface.ts
export interface UserRepository {
findById(id: string): Promise<User | null>;
findByEmail(email: string): Promise<User | null>;
save(user: User): Promise<User>;
delete(id: string): Promise<void>;
findManyByIds(ids: string[]): Promise<User[]>;
}

Domain Entity

// Domain entity
// libs/feature/user/src/lib/domain/entities/user.entity.ts
export class User {
constructor(public readonly id: string, public readonly email: string, public name: string, public readonly createdAt: Date, public updatedAt: Date) {}

updateName(newName: string): void {
this.name = newName;
this.updatedAt = new Date();
}
}

Infrastructure Implementation

// Infrastructure implementation
// libs/feature/user/src/lib/infrastructure/repositories/prisma-user.repository.ts
@Injectable()
export class PrismaUserRepository implements UserRepository {
constructor(private readonly prisma: PrismaService) {}

async findById(id: string): Promise<User | null> {
const userData = await this.prisma.user.findUnique({ where: { id } });
return userData ? this.toDomainEntity(userData) : null;
}

async save(user: User): Promise<User> {
const userData = await this.prisma.user.upsert({
where: { id: user.id },
create: {
id: user.id,
email: user.email,
name: user.name,
createdAt: user.createdAt,
updatedAt: user.updatedAt,
},
update: {
name: user.name,
updatedAt: user.updatedAt,
},
});
return this.toDomainEntity(userData);
}

// findByEmail, delete and findManyByIds are omitted here; the interface requires them

private toDomainEntity(userData: any): User {
return new User(userData.id, userData.email, userData.name, userData.createdAt, userData.updatedAt);
}
}
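
Because handlers depend on the `UserRepository` interface rather than on Prisma, the same port can be backed by an in-memory implementation in tests. The sketch below is one such test double. `InMemoryUserRepository` and the `copy` helper are hypothetical, and the `User` class is repeated only to keep the example self-contained.

```typescript
class User {
  constructor(
    public readonly id: string,
    public readonly email: string,
    public name: string,
    public readonly createdAt: Date,
    public updatedAt: Date,
  ) {}
}

interface UserRepository {
  findById(id: string): Promise<User | null>;
  findByEmail(email: string): Promise<User | null>;
  save(user: User): Promise<User>;
  delete(id: string): Promise<void>;
  findManyByIds(ids: string[]): Promise<User[]>;
}

// Defensive copy so callers cannot mutate stored rows through a returned object.
const copy = (u: User): User =>
  new User(u.id, u.email, u.name, new Date(u.createdAt.getTime()), new Date(u.updatedAt.getTime()));

// Hypothetical test double: same port as the Prisma implementation, backed by an array.
class InMemoryUserRepository implements UserRepository {
  private rows: User[] = [];

  async findById(id: string): Promise<User | null> {
    const row = this.rows.find((r) => r.id === id);
    return row ? copy(row) : null;
  }

  async findByEmail(email: string): Promise<User | null> {
    const row = this.rows.find((r) => r.email === email);
    return row ? copy(row) : null;
  }

  // Upsert semantics, mirroring the upsert used by the Prisma implementation.
  async save(user: User): Promise<User> {
    this.rows = this.rows.filter((r) => r.id !== user.id).concat(copy(user));
    return copy(user);
  }

  async delete(id: string): Promise<void> {
    this.rows = this.rows.filter((r) => r.id !== id);
  }

  async findManyByIds(ids: string[]): Promise<User[]> {
    return this.rows.filter((r) => ids.indexOf(r.id) !== -1).map((r) => copy(r));
  }
}
```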

Transaction Handling

Option 1: Unit of Work Pattern (Clean Architecture)

export interface UnitOfWork {
execute<T>(work: (repositories: RepositoryContext) => Promise<T>): Promise<T>;
}

export interface RepositoryContext {
userRepository: UserRepository;
// other repositories...
}

@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
constructor(private readonly unitOfWork: UnitOfWork) {}

async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
return await this.unitOfWork.execute(async (repositories) => {
const existingUser = await repositories.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}

const user = new User(generateId(), command.email, command.name, new Date(), new Date());

const savedUser = await repositories.userRepository.save(user);
return new UserCreatedResult(savedUser);
});
}
}

// Infrastructure implementation of UnitOfWork
@Injectable()
export class PrismaUnitOfWork implements UnitOfWork {
constructor(private readonly prisma: PrismaService) {}

async execute<T>(work: (repositories: RepositoryContext) => Promise<T>): Promise<T> {
return this.prisma.$transaction(async (tx) => {
const repositories: RepositoryContext = {
// Assumes PrismaUserRepository also accepts the transaction client (tx), not only PrismaService
userRepository: new PrismaUserRepository(tx),
// other repositories with transaction context...
};
return await work(repositories);
});
}
}
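
The contract that matters in the Unit of Work above is that writes become visible only if the whole `work` callback succeeds. The following in-memory sketch demonstrates that contract in isolation. It is an illustration under simplifying assumptions: a hypothetical key-value store replaces the `RepositoryContext`, and concurrent calls are not coordinated (the last one to finish wins).

```typescript
interface KeyValueStore {
  get(key: string): string | undefined;
  set(key: string, value: string): void;
}

interface UnitOfWork {
  execute<T>(work: (store: KeyValueStore) => Promise<T>): Promise<T>;
}

// Hypothetical in-memory unit of work: writes go to a staging copy and replace
// the committed state only when `work` resolves.
class InMemoryUnitOfWork implements UnitOfWork {
  private committed = new Map<string, string>();

  async execute<T>(work: (store: KeyValueStore) => Promise<T>): Promise<T> {
    const staged = new Map<string, string>(this.committed);
    const result = await work({
      get: (key) => staged.get(key),
      set: (key, value) => {
        staged.set(key, value);
      },
    });
    // Reached only if `work` did not throw; otherwise `staged` is simply discarded.
    this.committed = staged;
    return result;
  }

  // Reads the committed state, i.e. what other callers would observe.
  read(key: string): string | undefined {
    return this.committed.get(key);
  }
}
```

A double like this lets handler tests assert rollback behaviour without a database, while the Prisma-backed implementation provides the same guarantee through a real transaction.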

Option 2: Transactional Decorator (Pragmatic Approach)

@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
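// UserRepository is an interface (erased at runtime), so it is injected by the 'UserRepository' token
constructor(@Inject('UserRepository') private readonly userRepository: UserRepository) {}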

@Transactional()
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
const existingUser = await this.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}

const user = new User(generateId(), command.email, command.name, new Date(), new Date());

const savedUser = await this.userRepository.save(user);
return new UserCreatedResult(savedUser);
}
}

Module Configuration

@Module({
providers: [
{
provide: 'UserRepository',
useClass: PrismaUserRepository,
},
],
})
export class FeatureUserModule {}

Patterns to Avoid

// ❌ Incorrect: Repository returning Prisma models directly
async findById(id: string): Promise<PrismaUser> { ... } // Wrong!

// ❌ Incorrect: Direct Prisma transaction in Command Handler
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
return this.prisma.$transaction(async (tx) => { ... }); // Wrong! Infrastructure concern
}

// ❌ Incorrect: Transaction in repository
async saveWithTransaction(user: User): Promise<User> {
return this.prisma.$transaction(async (tx) => { ... }); // Wrong layer!
}

6. Domain-Specific Error Handling

Defining Domain Exceptions

// ✅ Correct - Domain-specific exceptions
// libs/feature/user/src/lib/domain/exceptions/
export class UserNotFoundException extends DomainException {
constructor(userId: string) {
super(`User with ID ${userId} not found`, 'USER_NOT_FOUND', { userId });
}
}

export class UserEmailAlreadyExistsException extends DomainException {
constructor(email: string) {
super(`User with email ${email} already exists`, 'USER_EMAIL_EXISTS', { email });
}
}
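
The exceptions above extend a `DomainException` base class that is not shown. A minimal sketch of what it might look like follows. The constructor shape `(message, code, details)` is inferred from the `super(...)` calls above; everything else is an assumption.

```typescript
// Hypothetical base class for domain exceptions.
abstract class DomainException extends Error {
  protected constructor(
    message: string,
    public readonly code: string,
    public readonly details: Record<string, unknown> = {},
  ) {
    super(message);
    // Restore the prototype chain so `instanceof` also works when compiling to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = new.target.name;
  }
}

class UserNotFoundException extends DomainException {
  constructor(userId: string) {
    super(`User with ID ${userId} not found`, 'USER_NOT_FOUND', { userId });
  }
}
```

Keeping a stable `code` next to the human-readable message lets the layer that maps errors to HTTP responses branch on the code instead of parsing message text.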

Usage in a Command Handler

@CommandHandler(CreateUserCommand)
export class CreateUserHandler {
async execute(command: CreateUserCommand): Promise<UserCreatedResult> {
const existingUser = await this.userRepository.findByEmail(command.email);
if (existingUser) {
throw new UserEmailAlreadyExistsException(command.email);
}
// ...
}
}

Mapping to HTTP Responses in the Controller

@Controller('users')
export class UsersController {
@Post()
async createUser(@Body() dto: CreateUserDto) {
try {
const command = new CreateUserCommand(dto.email, dto.name);
return await this.commandBus.execute(command);
} catch (error) {
if (error instanceof UserEmailAlreadyExistsException) {
throw new ConflictException({
message: error.message,
code: error.code,
details: error.details,
});
}
// Map other domain exceptions...
throw error;
}
}
}
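
Repeating the same try/catch in every controller method gets noisy as the number of domain exceptions grows. One possible alternative is to centralize the mapping in a single function, which could then be called from a shared NestJS exception filter. Only the framework-free mapping is sketched here; `toHttpErrorBody` and the status table are hypothetical, and the filter wiring is left out.

```typescript
// Hypothetical lookup from domain error codes to HTTP status codes.
const HTTP_STATUS_BY_DOMAIN_CODE: ReadonlyMap<string, number> = new Map<string, number>([
  ['USER_NOT_FOUND', 404],
  ['USER_EMAIL_EXISTS', 409],
]);

interface DomainErrorLike {
  message: string;
  code: string;
  details?: Record<string, unknown>;
}

interface HttpErrorBody {
  statusCode: number;
  message: string;
  code: string;
  details: Record<string, unknown>;
}

// Structural check, so the mapper does not need to import every exception class.
function isDomainErrorLike(error: unknown): error is DomainErrorLike {
  return (
    typeof error === 'object' &&
    error !== null &&
    typeof (error as { code?: unknown }).code === 'string' &&
    typeof (error as { message?: unknown }).message === 'string'
  );
}

// Returns null for anything that is not a known domain error,
// so the caller can rethrow the original error unchanged.
function toHttpErrorBody(error: unknown): HttpErrorBody | null {
  if (!isDomainErrorLike(error)) {
    return null;
  }
  const statusCode = HTTP_STATUS_BY_DOMAIN_CODE.get(error.code);
  if (statusCode === undefined) {
    return null;
  }
  return { statusCode, message: error.message, code: error.code, details: error.details ?? {} };
}
```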

Patterns to Avoid

// ❌ Incorrect: Generic errors
throw new Error('Something went wrong'); // Never do this!
throw new HttpException('User not found', 404); // Wrong layer!

7. DTO and Validation Pattern

DTO Location

  • Define DTOs only in the apps/ layer: never in libs/
  • Use class-validator decorators: input validation
  • Use @ApiProperty decorators: Swagger documentation
  • DTOs are the API contract: they are not domain models

DTO Example

// ✅ Correct - DTOs in apps/dta-wide-api/src/app/users/dto/
import { ApiProperty } from '@nestjs/swagger';
import { IsEmail, IsString, MinLength, IsOptional } from 'class-validator';

export class CreateUserDto {
@ApiProperty({
description: 'User email address',
example: 'user@example.com',
})
@IsEmail({}, { message: 'Invalid email format' })
email: string;

@ApiProperty({
description: 'User full name',
example: 'John Doe',
})
@IsString()
@MinLength(2, { message: 'Name must be at least 2 characters' })
name: string;

@ApiProperty({
description: 'User timezone',
example: 'Europe/Berlin',
required: false,
})
@IsOptional()
@IsString()
timezone?: string;
}

export class UpdateUserDto {
@ApiProperty({
description: 'User full name',
example: 'Jane Doe',
required: false,
})
@IsOptional()
@IsString()
@MinLength(2)
name?: string;

@ApiProperty({
description: 'User timezone',
example: 'America/New_York',
required: false,
})
@IsOptional()
@IsString()
timezone?: string;
}

Usage in a Controller

@Controller('users')
export class UsersController {
@Post()
@ApiOperation({ summary: 'Create new user' })
@ApiResponse({ status: 201, description: 'User created successfully' })
async createUser(@Body() dto: CreateUserDto) {
const command = new CreateUserCommand(dto.email, dto.name, dto.timezone);
return this.commandBus.execute(command);
}
}

Patterns to Avoid

// ❌ Incorrect - DTOs in libs
// libs/feature/user/src/lib/dto/create-user.dto.ts - Wrong location!

8. Controller Responsibilities

Role of the Controller

  • Request validation: validate input DTOs with ValidationPipe
  • Response formatting: convert the Result DTO into an appropriate HTTP response
  • REST conventions: follow RESTful API design principles
  • API documentation: document endpoints with Swagger decorators

Correct Controller Pattern

// ✅ Correct controller pattern
@Controller('users')
@ApiTags('Users')
export class UsersController {
constructor(private readonly commandBus: CommandBus, private readonly queryBus: QueryBus) {}

@Post()
@HttpCode(HttpStatus.CREATED)
@ApiOperation({ summary: 'Create a new user' })
@ApiResponse({
status: 201,
description: 'User created successfully',
type: UserCreatedResultDto,
})
@ApiResponse({
status: 409,
description: 'User with email already exists',
})
async createUser(@Body() dto: CreateUserDto): Promise<UserCreatedResultDto> {
try {
const command = new CreateUserCommand(dto.email, dto.name, dto.timezone);
return await this.commandBus.execute(command);
} catch (error) {
// Map domain exceptions to HTTP responses
if (error instanceof UserEmailAlreadyExistsException) {
throw new ConflictException({
message: error.message,
code: error.code,
details: error.details,
});
}
throw error;
}
}

@Get(':id')
@ApiOperation({ summary: 'Get user by ID' })
@ApiParam({ name: 'id', description: 'User ID' })
@ApiResponse({
status: 200,
description: 'User found',
type: UserDetailDto,
})
@ApiResponse({
status: 404,
description: 'User not found',
})
async getUser(@Param('id') id: string): Promise<UserDetailDto> {
try {
const query = new GetUserByIdQuery(id);
return await this.queryBus.execute(query);
} catch (error) {
if (error instanceof UserNotFoundException) {
throw new NotFoundException({
message: error.message,
code: error.code,
details: error.details,
});
}
throw error;
}
}
}

Patterns to Avoid

// ❌ Incorrect: Business logic in controller
@Post()
async createUser(@Body() dto: CreateUserDto) {
// Wrong: validation logic in controller
if (!dto.email.includes('@')) {
throw new BadRequestException('Invalid email');
}

// Wrong: direct repository access
const user = await this.userRepository.save(dto); // Wrong!
}

// ❌ Incorrect: Missing Swagger documentation
@Get(':id')
async getUser(@Param('id') id: string) { // Missing @ApiOperation, @ApiResponse
return this.queryBus.execute(new GetUserByIdQuery(id));
}

9. Time Management with TimeMachine

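Core Principles

  • Never use system time directly: new Date() and Date.now() are prohibited
  • Use the TimeMachine service: for all time-related operations
  • Per-user virtual time: each user has an independent virtual time
  • TimeMachine integration is mandatory: in every domain that needs time information

Incorrect Pattern - Direct Time Usage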

// ❌ Incorrect - Direct system time usage
export class CreateUserHandler {
async execute(command: CreateUserCommand): Promise<UserCreatedResultDto> {
const user = new User(
generateId(),
command.email,
command.name,
new Date(), // Wrong! Never use system time directly
new Date() // Wrong! Never use system time directly
);
return await this.userRepository.save(user);
}
}

Correct Pattern - Using the TimeMachine Service

// ✅ Correct - TimeMachine service usage
export class CreateUserHandler {
constructor(
private readonly unitOfWork: UnitOfWork,
private readonly timeMachineService: TimeMachineService // Inject TimeMachine
) {}

async execute(command: CreateUserCommand): Promise<UserCreatedResultDto> {
return await this.unitOfWork.execute(async (repositories) => {
// Get current virtual time for user (or system time if TimeMachine disabled)
const currentTime = await this.timeMachineService.getCurrentTime(command.userId);

const user = new User(
generateId(),
command.email,
command.name,
currentTime, // Use TimeMachine time
currentTime // Use TimeMachine time
);

const savedUser = await repositories.userRepository.save(user);
return new UserCreatedResultDto(savedUser.id, savedUser.email, savedUser.name, savedUser.createdAt.toISOString());
});
}
}

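🚨 CRITICAL: The systemCurrentTime Parameter Pattern

Every Query/Command Handler that uses time must follow the pattern below. Under this rule the handler receives the time through the Query/Command object and does not call TimeMachineService itself, which is stricter than the handler example above.

Correct Pattern - systemCurrentTime Parameter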

// Query/Command class
export class GetDataQuery implements IQuery {
constructor(
public readonly userId: string,
public readonly systemCurrentTime: Date, // 🚨 Required! Never create the time directly
public readonly otherParams: string
) {}
}

// Handler class
@QueryHandler(GetDataQuery)
export class GetDataHandler {
constructor(
private readonly repository: DataRepository // 🚨 Do not inject TimeMachineService!
) {}

async execute(query: GetDataQuery): Promise<DataResult> {
// ✅ Use the systemCurrentTime passed in through the query
const result = await this.repository.getData(
query.userId,
query.systemCurrentTime // Use the received time as-is
);
return result;
}
}

Strictly Prohibited Patterns

// 🚨 Never do this - it makes SOL BackTest impossible!
const currentTime = new Date(); // ❌ Creating the time directly
const now = Date.now(); // ❌ Reading the current time directly
const time = await this.timeMachineService.getCurrentTime(userId); // ❌ Calling TimeMachine from a handler

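Scope

  • Every Query Handler - systemCurrentTime parameter required
  • Every Command Handler - systemCurrentTime parameter required
  • Every Repository method - currentTime parameter required
  • Every Domain Service - currentTime parameter required

Purpose and Rationale

  1. SOL BackTest support: prediction models can be validated against a past point in time
  2. TimeMachine testing: supports scenario tests based on virtual time
  3. Time consistency: guarantees a single entry point for all time handling
  4. Test isolation: each test can set its own time independently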
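
The rationale above can be made concrete with a small sketch. When the time is part of the query, running the same handler with a past `systemCurrentTime` reproduces what the system would have returned at that moment, which is what a backtest needs. The `Measurement` type, `GetRecentMeasurementsQuery` and its handler are hypothetical and are not part of the DTA Wide codebase.

```typescript
interface Measurement {
  readonly userId: string;
  readonly recordedAt: Date;
  readonly value: number;
}

class GetRecentMeasurementsQuery {
  constructor(
    public readonly userId: string,
    public readonly systemCurrentTime: Date, // resolved by the caller, never inside the handler
    public readonly windowDays: number,
  ) {}
}

const MS_PER_DAY = 24 * 60 * 60 * 1000;

class GetRecentMeasurementsHandler {
  // A plain array stands in for a repository to keep the sketch short.
  constructor(private readonly rows: readonly Measurement[]) {}

  async execute(query: GetRecentMeasurementsQuery): Promise<number[]> {
    // The window is (now - windowDays, now], where "now" is whatever the caller passed in.
    const end = query.systemCurrentTime.getTime();
    const start = end - query.windowDays * MS_PER_DAY;
    return this.rows
      .filter((row) => row.userId === query.userId)
      .filter((row) => row.recordedAt.getTime() > start && row.recordedAt.getTime() <= end)
      .map((row) => row.value);
  }
}
```

Since the handler never reads a clock, two calls with the same query always return the same result, and rows recorded after `systemCurrentTime` are invisible to it.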

10. Testing Strategies

Unit Testing Command Handlers

describe('CreateUserHandler', () => {
let handler: CreateUserHandler;
let unitOfWork: jest.Mocked<UnitOfWork>;
let eventBus: jest.Mocked<EventBus>;

beforeEach(() => {
unitOfWork = createMock<UnitOfWork>();
eventBus = createMock<EventBus>();
handler = new CreateUserHandler(unitOfWork, eventBus);
});

it('should create user successfully', async () => {
const command = new CreateUserCommand('test@example.com', 'Test User');
const mockUser = new User('user-id', command.email, command.name, new Date(), new Date());

unitOfWork.execute.mockImplementation(async (work) => {
const mockRepos = {
userRepository: {
findByEmail: jest.fn().mockResolvedValue(null),
save: jest.fn().mockResolvedValue(mockUser),
},
};
return work(mockRepos as any);
});

const result = await handler.execute(command);

expect(result).toBeInstanceOf(UserCreatedResultDto);
expect(result.email).toBe(command.email);
expect(eventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
userId: mockUser.id,
email: mockUser.email,
})
);
});

it('should throw exception if email already exists', async () => {
const command = new CreateUserCommand('existing@example.com', 'Test User');
const existingUser = new User('existing-id', command.email, 'Existing', new Date(), new Date());

unitOfWork.execute.mockImplementation(async (work) => {
const mockRepos = {
userRepository: {
findByEmail: jest.fn().mockResolvedValue(existingUser),
},
};
return work(mockRepos as any);
});

await expect(handler.execute(command)).rejects.toThrow(UserEmailAlreadyExistsException);
});
});

Integration Testing with CQRS

describe('User CQRS Integration', () => {
let app: INestApplication;
let commandBus: CommandBus;
let queryBus: QueryBus;

beforeAll(async () => {
const moduleRef = await Test.createTestingModule({
imports: [AppModule],
}).compile();

app = moduleRef.createNestApplication();
await app.init();

commandBus = moduleRef.get<CommandBus>(CommandBus);
queryBus = moduleRef.get<QueryBus>(QueryBus);
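});

afterAll(async () => {
await app.close(); // release the application and its connections after the suite
});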

it('should create and retrieve user', async () => {
// Create user via command
const createCommand = new CreateUserCommand('integration@test.com', 'Integration Test');
const createResult = await commandBus.execute(createCommand);

expect(createResult).toBeInstanceOf(UserCreatedResultDto);

// Retrieve user via query
const getQuery = new GetUserByIdQuery(createResult.id);
const getResult = await queryBus.execute(getQuery);

expect(getResult).toBeInstanceOf(UserDetailDto);
expect(getResult.email).toBe(createCommand.email);
expect(getResult.name).toBe(createCommand.name);
});
});

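Summary Checklist

Command Handlers

  • Delegate business logic to the domain layer
  • Manage data persistence through repositories
  • Return a Result DTO (not a domain entity)
  • Publish domain events
  • Use a Unit of Work or the Transactional decorator

Query Handlers

  • Delegate data retrieval to repositories
  • Return a Result DTO (not a domain entity)
  • Apply a caching strategy (when needed)
  • Contain presentation logic only

Repository

  • Define the interface in the domain layer
  • Implement it in the infrastructure layer
  • Return domain objects (not Prisma models)
  • Handle transactions at the Command Handler level

Error Handling

  • Define domain-specific exceptions
  • Map them to HTTP responses in the controller
  • Return structured error responses

DTOs

  • Define them only in the Apps layer
  • Use class-validator
  • Document them with Swagger

Time Management

  • Use the systemCurrentTime parameter
  • Never create the time inside a handler
  • Call the TimeMachine service only from the Controller/Service layer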