본문으로 건너뛰기

Outbox Pattern 구현 가이드

데이터베이스 변경과 이벤트 발행의 원자성 보장

📋 목차

  1. Outbox Pattern 개념
  2. DTx 플랫폼 적용
  3. 구현 아키텍처
  4. 단계별 구현
  5. 성능 최적화
  6. 모니터링
  7. 문제 해결

1. Outbox Pattern 개념

1.1 해결하는 문제

기존 문제: 데이터베이스 저장과 이벤트 발행이 별도의 트랜잭션

// ❌ 문제 있는 코드
async createUser(userData: CreateUserDto): Promise<User> {
// 1. DB에 사용자 저장
const user = await this.userRepository.save(userData);

// 2. 이벤트 발행 (별도 트랜잭션!)
await this.eventPublisher.publish('user.created', user);
// 만약 여기서 실패하면? 사용자는 생성되었지만 이벤트는 발행되지 않음!

return user;
}

Outbox Pattern 해결책: 단일 트랜잭션 내에서 데이터와 이벤트를 함께 저장

// ✅ Outbox Pattern 적용
async createUser(userData: CreateUserDto): Promise<User> {
return await this.prisma.$transaction(async (tx) => {
// 1. DB에 사용자 저장
const user = await tx.user.create({ data: userData });

// 2. 같은 트랜잭션에서 Outbox 이벤트 저장
await tx.outboxEvent.create({
data: {
eventType: 'user.created',
aggregateId: user.id,
payload: JSON.stringify(user),
status: 'PENDING'
}
});

return user;
});
// 별도 프로세스가 Outbox 테이블을 폴링하여 이벤트 발행
}

1.2 핵심 구성 요소


2. DTx 플랫폼 적용

2.1 적용 시나리오

사용자 등록 시 Outbox 적용

// 사용자 생성 + 프로필 초기화 이벤트
async registerUser(data: UserRegistrationDto): Promise<User> {
return await this.prisma.$transaction(async (tx) => {
// 1. 사용자 계정 생성
const user = await tx.user.create({
data: {
email: data.email,
accessCode: data.accessCode,
language: data.language
}
});

// 2. Outbox 이벤트들 생성 (단일 트랜잭션)
const events = [
{
eventType: 'user.created',
aggregateId: user.id,
payload: JSON.stringify({ userId: user.id, email: user.email }),
sagaTransactionId: data.sagaTransactionId
},
{
eventType: 'profile.initialization.requested',
aggregateId: user.id,
payload: JSON.stringify({ userId: user.id, timezone: data.timezone }),
sagaTransactionId: data.sagaTransactionId
}
];

for (const event of events) {
await tx.outboxEvent.create({ data: event });
}

return user;
});
}

수면 로그 처리 시 Outbox 적용

async processSleepLog(sleepData: CreateSleepLogDto): Promise<SleepLog> {
return await this.prisma.$transaction(async (tx) => {
// 1. 수면 로그 저장
const sleepLog = await tx.sleepLog.create({
data: {
userId: sleepData.userId,
bedtime: sleepData.bedtime,
wakeTime: sleepData.wakeTime,
quality: sleepData.quality
}
});

// 2. 분석 처리 이벤트 생성
await tx.outboxEvent.create({
data: {
eventType: 'sleep.analysis.requested',
aggregateId: sleepLog.id,
payload: JSON.stringify({
sleepLogId: sleepLog.id,
userId: sleepData.userId,
duration: sleepData.duration
}),
sagaTransactionId: sleepData.sagaTransactionId
}
});

return sleepLog;
});
}

2.2 이벤트 타입 정의

// src/outbox/types/outbox-events.types.ts
export interface OutboxEventPayload {
// 사용자 관련 이벤트
'user.created': {
userId: string;
email: string;
language: string;
};

'user.profile.initialized': {
userId: string;
profileId: string;
timezone: string;
};

// 수면 관련 이벤트
'sleep.log.created': {
sleepLogId: string;
userId: string;
bedtime: string;
wakeTime: string;
};

'sleep.analysis.completed': {
sleepLogId: string;
userId: string;
sleepScore: number;
recommendations: string[];
};

// 설문 관련 이벤트
'questionnaire.response.submitted': {
responseId: string;
userId: string;
questionnaireType: string;
totalScore: number;
};
}

export type OutboxEventType = keyof OutboxEventPayload;

3. 구현 아키텍처

3.1 데이터베이스 스키마

Prisma 스키마 정의

// prisma/schema.prisma
model OutboxEvent {
id String @id @default(cuid())
eventType String // 이벤트 타입
aggregateId String // 애그리게이트 ID
payload Json // 이벤트 페이로드
status OutboxStatus @default(PENDING)
retryCount Int @default(0)
sagaTransactionId String? // 연관된 Saga 트랜잭션 ID
correlationId String? // 요청 추적 ID

createdAt DateTime @default(now())
publishedAt DateTime?

@@map("outbox_events")
@@index([status, createdAt])
@@index([eventType, status])
}

enum OutboxStatus {
PENDING
PUBLISHED
FAILED
}

BigQuery 스키마 (장기 저장)

CREATE TABLE `distributed_transactions.outbox_events` (
event_id STRING NOT NULL,
transaction_id STRING,
event_type STRING NOT NULL,
aggregate_id STRING NOT NULL,
payload JSON NOT NULL,
status STRING NOT NULL,
retry_count INT64 DEFAULT 0,
correlation_id STRING,
environment STRING NOT NULL,
created_at TIMESTAMP NOT NULL,
published_at TIMESTAMP,
error_details JSON
)
PARTITION BY DATE(created_at)
CLUSTER BY status, event_type, environment;

3.2 핵심 서비스 구조

// src/outbox/outbox.module.ts
@Module({
imports: [
PrismaModule,
PubSubModule,
ScheduleModule.forRoot()
],
providers: [
OutboxService,
OutboxPublisher,
OutboxCleanupService,
OutboxMetricsService
],
exports: [OutboxService]
})
export class OutboxModule {}

4. 단계별 구현

4.1 Outbox 서비스 구현

기본 Outbox 서비스

// src/outbox/outbox.service.ts
@Injectable()
export class OutboxService {
constructor(
private readonly prisma: PrismaService,
private readonly logger: Logger
) {}

async createEvent<T extends OutboxEventType>(
eventType: T,
aggregateId: string,
payload: OutboxEventPayload[T],
options?: {
sagaTransactionId?: string;
correlationId?: string;
}
): Promise<void> {
await this.prisma.outboxEvent.create({
data: {
eventType,
aggregateId,
payload: payload as any,
sagaTransactionId: options?.sagaTransactionId,
correlationId: options?.correlationId || this.generateCorrelationId(),
status: 'PENDING'
}
});

this.logger.log(`Outbox event created: ${eventType} for aggregate: ${aggregateId}`);
}

async createMultipleEvents(
events: Array<{
eventType: OutboxEventType;
aggregateId: string;
payload: any;
sagaTransactionId?: string;
correlationId?: string;
}>
): Promise<void> {
const eventData = events.map(event => ({
eventType: event.eventType,
aggregateId: event.aggregateId,
payload: event.payload,
sagaTransactionId: event.sagaTransactionId,
correlationId: event.correlationId || this.generateCorrelationId(),
status: 'PENDING' as const
}));

await this.prisma.outboxEvent.createMany({
data: eventData
});

this.logger.log(`${events.length} outbox events created`);
}

async findPendingEvents(limit: number = 100): Promise<OutboxEvent[]> {
return await this.prisma.outboxEvent.findMany({
where: {
status: 'PENDING',
retryCount: { lt: 5 }
},
orderBy: { createdAt: 'asc' },
take: limit
});
}

async markAsPublished(eventId: string): Promise<void> {
await this.prisma.outboxEvent.update({
where: { id: eventId },
data: {
status: 'PUBLISHED',
publishedAt: new Date()
}
});
}

async incrementRetryCount(eventId: string): Promise<void> {
await this.prisma.outboxEvent.update({
where: { id: eventId },
data: { retryCount: { increment: 1 } }
});
}

private generateCorrelationId(): string {
return `${Date.now()}-${Math.random().toString(36).substring(2)}`;
}
}

4.2 Outbox Publisher 구현

스케줄러 기반 이벤트 발행

// src/outbox/outbox-publisher.service.ts
@Injectable()
export class OutboxPublisher {
constructor(
private readonly outboxService: OutboxService,
private readonly pubSubService: PubSubService,
private readonly metricsService: OutboxMetricsService,
private readonly logger: Logger
) {}

@Cron('*/10 * * * * *') // 10초마다 실행
async publishPendingEvents(): Promise<void> {
const startTime = Date.now();

try {
const pendingEvents = await this.outboxService.findPendingEvents(50);

if (pendingEvents.length === 0) {
return;
}

this.logger.log(`Publishing ${pendingEvents.length} pending events`);

const results = await Promise.allSettled(
pendingEvents.map(event => this.publishEvent(event))
);

const successful = results.filter(r => r.status === 'fulfilled').length;
const failed = results.filter(r => r.status === 'rejected').length;

// 메트릭 수집
await this.metricsService.recordPublishMetrics({
processed: pendingEvents.length,
successful,
failed,
duration: Date.now() - startTime
});

this.logger.log(`Event publishing completed: ${successful} successful, ${failed} failed`);

} catch (error) {
this.logger.error('Failed to publish outbox events', error);
await this.metricsService.recordError('publish_batch_error', error);
}
}

private async publishEvent(event: OutboxEvent): Promise<void> {
try {
const topicName = this.getTopicName(event.eventType);

const messageData = {
eventId: event.id,
eventType: event.eventType,
aggregateId: event.aggregateId,
payload: event.payload,
sagaTransactionId: event.sagaTransactionId,
correlationId: event.correlationId,
timestamp: event.createdAt.toISOString()
};

await this.pubSubService.publish(topicName, messageData, {
// 메시지 순서 보장을 위한 ordering key
orderingKey: event.aggregateId
});

await this.outboxService.markAsPublished(event.id);

this.logger.debug(`Event published: ${event.eventType} (${event.id})`);

} catch (error) {
this.logger.error(`Failed to publish event: ${event.id}`, error);

await this.outboxService.incrementRetryCount(event.id);

if (event.retryCount >= 4) { // 5회 실패 시 DLQ로 이동
await this.moveToDeadLetterQueue(event, error);
}

throw error;
}
}

private getTopicName(eventType: string): string {
const environment = process.env.DEPLOY_ENV || 'dev';

// 이벤트 타입에 따른 토픽 매핑
const topicMappings: Record<string, string> = {
'user.created': `saga-user-registration-${environment}`,
'user.profile.initialized': `saga-user-registration-${environment}`,
'sleep.log.created': `saga-sleep-log-processing-${environment}`,
'sleep.analysis.completed': `saga-sleep-log-processing-${environment}`,
'questionnaire.response.submitted': `saga-questionnaire-flow-${environment}`
};

return topicMappings[eventType] || `saga-default-${environment}`;
}

private async moveToDeadLetterQueue(event: OutboxEvent, error: Error): Promise<void> {
try {
const dlqTopicName = `saga-dlq-${process.env.DEPLOY_ENV || 'dev'}`;

await this.pubSubService.publish(dlqTopicName, {
originalEvent: event,
error: {
message: error.message,
stack: error.stack,
timestamp: new Date().toISOString()
},
failureReason: 'max_retries_exceeded'
});

await this.outboxService.markAsFailed(event.id);

this.logger.warn(`Event moved to DLQ: ${event.id} after ${event.retryCount + 1} failures`);

} catch (dlqError) {
this.logger.error(`Failed to move event to DLQ: ${event.id}`, dlqError);
}
}
}

4.3 비즈니스 서비스 통합

사용자 서비스에 Outbox 적용

// src/users/users.service.ts
@Injectable()
export class UsersService {
constructor(
private readonly prisma: PrismaService,
private readonly outboxService: OutboxService
) {}

async createUser(
userData: CreateUserDto,
sagaTransactionId?: string
): Promise<User> {
return await this.prisma.$transaction(async (tx) => {
// 1. 사용자 생성
const user = await tx.user.create({
data: {
email: userData.email,
accessCode: userData.accessCode,
language: userData.language || 'ko',
timezone: userData.timezone || 'Asia/Seoul'
}
});

// 2. Outbox 이벤트 생성 (같은 트랜잭션)
await tx.outboxEvent.create({
data: {
eventType: 'user.created',
aggregateId: user.id,
payload: {
userId: user.id,
email: user.email,
language: user.language
},
sagaTransactionId,
correlationId: this.extractCorrelationId()
}
});

return user;
});
}

async updateUserProfile(
userId: string,
profileData: UpdateProfileDto,
sagaTransactionId?: string
): Promise<UserProfile> {
return await this.prisma.$transaction(async (tx) => {
// 1. 프로필 업데이트
const profile = await tx.userProfile.upsert({
where: { userId },
create: {
userId,
...profileData
},
update: profileData
});

// 2. 프로필 업데이트 이벤트
await tx.outboxEvent.create({
data: {
eventType: 'user.profile.updated',
aggregateId: userId,
payload: {
userId,
profileId: profile.id,
changes: profileData
},
sagaTransactionId
}
});

return profile;
});
}

private extractCorrelationId(): string {
// Request context에서 correlation ID 추출
const correlationId = AsyncLocalStorage.getStore()?.correlationId;
return correlationId || `auto-${Date.now()}`;
}
}

4.4 메트릭 및 모니터링

Outbox 메트릭 서비스

// src/outbox/outbox-metrics.service.ts
@Injectable()
export class OutboxMetricsService {
constructor(
private readonly bigQueryService: BigQueryService,
private readonly logger: Logger
) {}

async recordPublishMetrics(metrics: {
processed: number;
successful: number;
failed: number;
duration: number;
}): Promise<void> {
const metricsData = {
timestamp: new Date().toISOString(),
environment: process.env.DEPLOY_ENV || 'dev',
metric_type: 'outbox_publish_batch',
processed_count: metrics.processed,
successful_count: metrics.successful,
failed_count: metrics.failed,
duration_ms: metrics.duration,
success_rate: metrics.processed > 0 ? metrics.successful / metrics.processed : 0
};

await this.bigQueryService.insert(
'distributed_transactions.outbox_metrics',
[metricsData]
);
}

async recordError(errorType: string, error: Error): Promise<void> {
const errorData = {
timestamp: new Date().toISOString(),
environment: process.env.DEPLOY_ENV || 'dev',
error_type: errorType,
error_message: error.message,
error_stack: error.stack?.substring(0, 1000) // 1KB 제한
};

await this.bigQueryService.insert(
'distributed_transactions.outbox_errors',
[errorData]
);
}

@Cron('0 */5 * * * *') // 5분마다 실행
async collectOutboxMetrics(): Promise<void> {
try {
const metrics = await this.calculateOutboxHealth();

await this.bigQueryService.insert(
'distributed_transactions.outbox_health',
[metrics]
);

} catch (error) {
this.logger.error('Failed to collect outbox metrics', error);
}
}

private async calculateOutboxHealth(): Promise<any> {
const [pendingCount, failedCount, oldestPending] = await Promise.all([
this.prisma.outboxEvent.count({ where: { status: 'PENDING' } }),
this.prisma.outboxEvent.count({ where: { status: 'FAILED' } }),
this.prisma.outboxEvent.findFirst({
where: { status: 'PENDING' },
orderBy: { createdAt: 'asc' },
select: { createdAt: true }
})
]);

const oldestPendingMinutes = oldestPending
? Math.floor((Date.now() - oldestPending.createdAt.getTime()) / 60000)
: 0;

return {
timestamp: new Date().toISOString(),
environment: process.env.DEPLOY_ENV || 'dev',
pending_events_count: pendingCount,
failed_events_count: failedCount,
oldest_pending_minutes: oldestPendingMinutes,
health_status: this.calculateHealthStatus(pendingCount, failedCount, oldestPendingMinutes)
};
}

private calculateHealthStatus(
pendingCount: number,
failedCount: number,
oldestPendingMinutes: number
): string {
if (failedCount > 100 || oldestPendingMinutes > 60) {
return 'CRITICAL';
} else if (pendingCount > 500 || oldestPendingMinutes > 30) {
return 'WARNING';
} else {
return 'HEALTHY';
}
}
}

5. 성능 최적화

5.1 배치 처리 최적화

이벤트 타입별 배치 처리

@Injectable()
export class OptimizedOutboxPublisher {
async publishEventsBatch(): Promise<void> {
const events = await this.outboxService.findPendingEvents(100);

// 이벤트 타입별로 그룹화
const eventsByType = this.groupEventsByType(events);

// 각 타입별로 병렬 처리
await Promise.all(
Object.entries(eventsByType).map(([eventType, events]) =>
this.publishEventTypeGroup(eventType, events)
)
);
}

private groupEventsByType(events: OutboxEvent[]): Record<string, OutboxEvent[]> {
return events.reduce((groups, event) => {
const type = event.eventType;
if (!groups[type]) {
groups[type] = [];
}
groups[type].push(event);
return groups;
}, {} as Record<string, OutboxEvent[]>);
}

private async publishEventTypeGroup(
eventType: string,
events: OutboxEvent[]
): Promise<void> {
const topicName = this.getTopicName(eventType);

// 배치로 발행 (Pub/Sub 배치 API 사용)
const messages = events.map(event => ({
data: JSON.stringify({
eventId: event.id,
eventType: event.eventType,
aggregateId: event.aggregateId,
payload: event.payload,
timestamp: event.createdAt.toISOString()
}),
orderingKey: event.aggregateId,
attributes: {
eventType: event.eventType,
correlationId: event.correlationId || ''
}
}));

await this.pubSubService.publishBatch(topicName, messages);

// 일괄 상태 업데이트
await this.prisma.outboxEvent.updateMany({
where: { id: { in: events.map(e => e.id) } },
data: { status: 'PUBLISHED', publishedAt: new Date() }
});
}
}

5.2 데이터베이스 최적화

인덱스 최적화

-- Outbox 테이블 인덱스 최적화
CREATE INDEX CONCURRENTLY idx_outbox_events_pending
ON outbox_events (status, created_at)
WHERE status = 'PENDING';

CREATE INDEX CONCURRENTLY idx_outbox_events_retry
ON outbox_events (retry_count, status, created_at)
WHERE status = 'PENDING';

CREATE INDEX CONCURRENTLY idx_outbox_events_cleanup
ON outbox_events (status, published_at)
WHERE status = 'PUBLISHED';

파티셔닝 전략

-- 월간 파티셔닝 (대용량 처리용)
CREATE TABLE outbox_events_y2025m01 PARTITION OF outbox_events
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE outbox_events_y2025m02 PARTITION OF outbox_events
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

6. 모니터링

6.1 Outbox 상태 대시보드

BigQuery 모니터링 쿼리

-- Outbox 이벤트 처리 현황
WITH outbox_metrics AS (
SELECT
DATE(created_at) as date,
event_type,
status,
COUNT(*) as event_count,
AVG(retry_count) as avg_retries,
MAX(CASE WHEN status = 'PENDING'
THEN TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), created_at, MINUTE)
ELSE 0 END) as max_pending_minutes
FROM `{PROJECT_ID}.distributed_transactions.outbox_events`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY date, event_type, status
)
SELECT
date,
event_type,
SUM(CASE WHEN status = 'PUBLISHED' THEN event_count ELSE 0 END) as published_count,
SUM(CASE WHEN status = 'PENDING' THEN event_count ELSE 0 END) as pending_count,
SUM(CASE WHEN status = 'FAILED' THEN event_count ELSE 0 END) as failed_count,
ROUND(AVG(avg_retries), 2) as avg_retry_count,
MAX(max_pending_minutes) as max_pending_minutes
FROM outbox_metrics
GROUP BY date, event_type
ORDER BY date DESC, event_type;

6.2 알림 정책

Outbox 적체 알림

@Injectable()
export class OutboxAlertService {
@Cron('*/2 * * * *') // 2분마다 체크
async checkOutboxHealth(): Promise<void> {
const healthStatus = await this.outboxMetricsService.getHealthStatus();

if (healthStatus.status === 'CRITICAL') {
await this.sendCriticalAlert(healthStatus);
} else if (healthStatus.status === 'WARNING') {
await this.sendWarningAlert(healthStatus);
}
}

private async sendCriticalAlert(status: OutboxHealthStatus): Promise<void> {
await this.alertService.send({
severity: 'CRITICAL',
title: 'Outbox 이벤트 처리 중단',
message: `
Outbox 이벤트 처리에 심각한 문제가 발생했습니다.

상세 정보:
- 대기 중인 이벤트: ${status.pendingCount}
- 실패한 이벤트: ${status.failedCount}
- 가장 오래된 대기 이벤트: ${status.oldestPendingMinutes}분 전

즉시 확인이 필요합니다.
`,
channels: ['slack', 'email', 'sms']
});
}
}

7. 문제 해결

7.1 일반적인 문제

문제 1: Outbox 이벤트 적체

증상: PENDING 상태 이벤트가 계속 쌓임

진단:

# 적체된 이벤트 확인
bq query --use_legacy_sql=false '
SELECT
event_type,
COUNT(*) as pending_count,
MIN(created_at) as oldest_event,
AVG(retry_count) as avg_retries
FROM `{PROJECT_ID}.distributed_transactions.outbox_events`
WHERE status = "PENDING"
GROUP BY event_type
ORDER BY pending_count DESC'

해결책:

  1. Publisher 프로세스 상태 확인
  2. Pub/Sub 토픽 상태 및 권한 확인
  3. 데이터베이스 연결 상태 확인
  4. 배치 크기 조정 (더 작은 배치로 처리)

문제 2: 중복 이벤트 발행

증상: 같은 이벤트가 여러 번 발행됨

진단:

-- 중복 이벤트 조회
SELECT
event_type,
aggregate_id,
COUNT(*) as duplicate_count,
STRING_AGG(id) as event_ids
FROM `{PROJECT_ID}.distributed_transactions.outbox_events`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY event_type, aggregate_id, JSON_EXTRACT(payload, '$.userId')
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC;

해결책:

  1. 멱등성 키 구현: 이벤트에 고유 키 추가
  2. 중복 제거 로직: Publisher에서 중복 체크
  3. 트랜잭션 격리 수준 확인: 동시성 문제 해결

7.2 성능 문제 해결

Outbox 테이블 크기 관리

@Injectable()
export class OutboxCleanupService {
@Cron('0 2 * * *') // 매일 새벽 2시
async cleanupOldEvents(): Promise<void> {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - 30); // 30일 이전

// 발행된 이벤트 삭제
const deleteResult = await this.prisma.outboxEvent.deleteMany({
where: {
status: 'PUBLISHED',
publishedAt: { lt: cutoffDate }
}
});

this.logger.log(`Cleaned up ${deleteResult.count} old outbox events`);

// BigQuery로 아카이브 (선택적)
await this.archiveToBigQuery(cutoffDate);
}

private async archiveToMigrate(cutoffDate: Date): Promise<void> {
const eventsToArchive = await this.prisma.outboxEvent.findMany({
where: {
status: 'PUBLISHED',
publishedAt: { lt: cutoffDate }
},
take: 1000 // 배치 처리
});

if (eventsToArchive.length > 0) {
await this.bigQueryService.insert(
'distributed_transactions.outbox_events_archive',
eventsToArchive
);
}
}
}

📚 다음 단계

  1. 성능 모니터링 대시보드 구축
  2. 자동 복구 메커니즘 구현
  3. 이벤트 스키마 버전 관리
  4. 배치 처리 크기 동적 조정

문서 버전: 1.0.0
최종 업데이트: 2025-08-13 관련 문서: 분산 트랜잭션 구현 가이드