
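PLT-NFR-011 Distributed Transaction Implementation Guide

Ensuring Data Integrity with the Saga Pattern & Outbox Pattern

📋 Table of Contents

  1. Overview
  2. Architecture Design
  3. Implementation Solution
  4. Saga Pattern Implementation
  5. Outbox Pattern Implementation
  6. Compensating Transactions
  7. Step-by-Step Implementation
  8. Monitoring System
  9. Testing Strategy
  10. Troubleshooting
  11. Operations Guide
  12. Next Steps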

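1. Overview

1.1 PLT-NFR-011 Requirement

The storage layer must guarantee data integrity through ACID transactions and constraints. Multi-resource operations must provide either a consistent transaction boundary or compensating transactions.

Business impact (DTx platform)

  • Data consistency: keep user data and its related records consistent with each other
  • Medical regulatory compliance: meet regulatory requirements by preserving patient data integrity
  • System reliability: keep data consistent even when an operation partially fails
  • Recoverability: recover automatically and run compensation when a failure occurs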

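1.2 Current State

✅ Supported

  • Cloud SQL ACID: ACID transactions within a single database
  • Event system: asynchronous messaging based on Pub/Sub
  • Data rollback: TimeMachine's distributed rollback system

❌ Not yet implemented

  • Distributed transactions: guaranteed atomicity across multiple services
  • Saga Pattern: management of long-running business processes
  • Outbox Pattern: atomicity between database changes and event publication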

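1.3 Implementation Scope

| Environment | Transaction timeout | Retries | State retention | Monitoring |
|---|---|---|---|---|
| Dev | 5 min | 3 | 7 days | Basic |
| Stage | 15 min | 5 | 30 days | Detailed |
| Prod | 60 min | 10 | 90 days | Full |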
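The per-environment values in the table could live in one typed constant. That way the timeout, retry and retention settings are not hard-coded in several places. This is a minimal sketch. `DTX_CONFIG`, `getDtxConfig` and the field names are hypothetical. The fallback to `dev` mirrors the `DEPLOY_ENV || 'dev'` default used in the code later in this guide.

```typescript
// Hypothetical typed view of the per-environment table in 1.3.
type DeployEnv = 'dev' | 'stage' | 'prod';

interface DtxEnvConfig {
  transactionTimeoutMinutes: number;
  maxRetries: number;
  stateRetentionDays: number;
  monitoring: 'basic' | 'detailed' | 'full';
}

const DTX_CONFIG: Record<DeployEnv, DtxEnvConfig> = {
  dev: { transactionTimeoutMinutes: 5, maxRetries: 3, stateRetentionDays: 7, monitoring: 'basic' },
  stage: { transactionTimeoutMinutes: 15, maxRetries: 5, stateRetentionDays: 30, monitoring: 'detailed' },
  prod: { transactionTimeoutMinutes: 60, maxRetries: 10, stateRetentionDays: 90, monitoring: 'full' },
};

// Resolve the config for a DEPLOY_ENV value; unknown or missing values fall back to 'dev'.
// hasOwnProperty avoids matching inherited keys such as 'toString'.
function getDtxConfig(env: string | undefined): DtxEnvConfig {
  const isKnown = env !== undefined && Object.prototype.hasOwnProperty.call(DTX_CONFIG, env);
  return DTX_CONFIG[isKnown ? (env as DeployEnv) : 'dev'];
}
```

A caller would then read, for example, `getDtxConfig(process.env.DEPLOY_ENV).maxRetries` instead of repeating the literal retry limit.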

1.4 Related Documents


2. Architecture Design

2.1 Distributed Transaction Patterns

Saga Pattern (orchestration style)

Saga Orchestrator
        │
        ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Step 1    │───▶│   Step 2    │───▶│   Step 3    │
│ (Local Txn) │    │ (Local Txn) │    │ (Local Txn) │
└─────────────┘    └─────────────┘    └─────────────┘
       │                  │                  │
       ▼                  ▼                  ▼
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│Compensation │    │Compensation │    │Compensation │
│   Handler   │    │   Handler   │    │   Handler   │
└─────────────┘    └─────────────┘    └─────────────┘
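The orchestration flow in the diagram can be sketched in memory. Steps run in order. When one fails, the compensations of the steps that already completed run in reverse order. This is a minimal illustration, not the `SagaOrchestrator` from section 3. `Step`, `runSaga` and the log format are hypothetical. A compensation that throws simply propagates in this sketch, whereas the `CompensationHandler` in section 6 logs such a failure and continues.

```typescript
// One saga step: a forward action and the action that undoes it.
interface Step<Ctx> {
  name: string;
  run: (ctx: Ctx) => Promise<void>;
  compensate: (ctx: Ctx) => Promise<void>;
}

type SagaOutcome =
  | { status: 'COMPLETED'; log: string[] }
  | { status: 'COMPENSATED'; failedStep: string; log: string[] };

// Run steps in order; on the first failure, undo the completed steps newest-first.
async function runSaga<Ctx>(steps: Step<Ctx>[], ctx: Ctx): Promise<SagaOutcome> {
  const log: string[] = [];
  const completed: Step<Ctx>[] = [];

  for (const step of steps) {
    try {
      await step.run(ctx);
    } catch {
      log.push(`failed:${step.name}`);
      // The failed step is not compensated: only steps that finished are undone.
      for (const done of [...completed].reverse()) {
        await done.compensate(ctx); // simplified: a throwing compensation aborts the loop
        log.push(`compensated:${done.name}`);
      }
      return { status: 'COMPENSATED', failedStep: step.name, log };
    }
    completed.push(step);
    log.push(`done:${step.name}`);
  }
  return { status: 'COMPLETED', log };
}
```

Consider a failure in the third step of `create_user → initialize_profile → assign_plan → setup_notifications`. It undoes `initialize_profile` and then `create_user`, and never starts `setup_notifications`.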

Outbox Pattern

Business Transaction:
┌────────────────────────┐
│ 1. Business Data Save  │
│ 2. Outbox Event Save   │ ◄── single DB transaction
└────────────────────────┘
             │
             ▼
┌────────────────────────┐
│    Outbox Publisher    │ ◄── separate process
│ (BigQuery → Pub/Sub)   │
└────────────────────────┘
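The essential property of the Outbox pattern has two parts. First, the business row and the outbox row commit or roll back together. Second, publishing happens later, from committed outbox rows only.

The in-memory sketch below simulates this. `transaction` stands in for a real single-database transaction, such as a Prisma `$transaction`. All names are hypothetical. The guarantee only holds when both rows are written in the same database transaction.

```typescript
interface OutboxRow {
  eventId: string;
  eventType: string;
  payload: unknown;
  published: boolean;
}

interface Db {
  users: Map<string, { id: string; email: string }>;
  outbox: OutboxRow[];
}

// Simulated single-database transaction: mutate a copy and swap it in only if fn succeeds.
function transaction(db: Db, fn: (tx: Db) => void): void {
  const tx: Db = { users: new Map(db.users), outbox: db.outbox.map((row) => ({ ...row })) };
  fn(tx); // if fn throws, nothing below runs and db is left untouched
  db.users = tx.users;
  db.outbox = tx.outbox;
}

// Business write + outbox write in the same transaction.
function createUser(db: Db, id: string, email: string): void {
  transaction(db, (tx) => {
    tx.users.set(id, { id, email });
    tx.outbox.push({ eventId: `evt-${id}`, eventType: 'user.created', payload: { userId: id }, published: false });
  });
}

// Relay ("Outbox Publisher"): runs separately, publishes pending rows, then marks them.
function relay(db: Db, publish: (row: OutboxRow) => void): number {
  let count = 0;
  for (const row of db.outbox) {
    if (row.published) continue;
    publish(row); // if this throws, the row stays pending and is retried on the next run
    row.published = true;
    count++;
  }
  return count;
}
```

A failed transaction leaves neither the user nor its event behind. A second relay run publishes nothing new.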

2.2 DTx Platform Sagas

User registration saga

interface UserRegistrationSaga {
steps: [
'create_user', // create the user account
'initialize_profile', // initialize the profile
'assign_plan', // assign a plan
'setup_notifications' // configure notifications
];
compensations: [
'delete_user',
'cleanup_profile',
'revoke_plan',
'disable_notifications'
];
}

Sleep log processing saga

interface SleepLogProcessingSaga {
steps: [
'validate_sleep_log', // validate the sleep log
'save_sleep_data', // persist the sleep data
'trigger_analysis', // start the analysis
'calculate_scores', // compute scores
'send_notification' // notify the user of the results
];
compensations: [
'mark_invalid',
'delete_sleep_data',
'cancel_analysis',
'remove_scores',
'cancel_notification'
];
}
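The interfaces above keep the step and compensation lists in parallel. Given that layout, the compensations to run after a failure follow mechanically: undo the steps before the failed one, newest first. This sketch assumes, as the orchestrator does, that a step which throws leaves nothing to undo. `compensationPlan` is a hypothetical helper.

```typescript
// Compensations to run when the step at `failedIndex` fails, given parallel
// step/compensation lists. The failed step itself is assumed to have committed
// nothing, so only the steps before it are undone, most recent first.
function compensationPlan(
  steps: readonly string[],
  compensations: readonly string[],
  failedIndex: number,
): string[] {
  if (steps.length !== compensations.length) {
    throw new Error('each step needs exactly one compensation');
  }
  if (!Number.isInteger(failedIndex) || failedIndex < 0 || failedIndex >= steps.length) {
    throw new RangeError(`invalid step index: ${failedIndex}`);
  }
  return compensations.slice(0, failedIndex).reverse();
}
```

For the sleep log saga, suppose `calculate_scores` (index 3) fails. The plan is then `cancel_analysis`, `delete_sleep_data`, `mark_invalid`. If `validate_sleep_log` fails, there is nothing to compensate.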

2.3 System Architecture


3. Implementation Solution

3.1 Core Components

Saga Orchestrator

@Injectable()
export class SagaOrchestrator {
async executeSaga<T>(
sagaType: SagaType,
initialData: T,
steps: SagaStep[]
): Promise<SagaResult> {
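const transaction = await this.createSagaTransaction(sagaType, steps, initialData);
// Steps that finished, with their results; compensations run over these in reverse order
const completed: Array<{ step: SagaStep; result: unknown }> = [];

try {
for (const step of steps) {
const result = await this.executeStep(transaction.id, step, initialData);
completed.push({ step, result });
await this.markStepCompleted(transaction.id, step.name);
}

await this.markSagaCompleted(transaction.id);
return { status: 'COMPLETED', transactionId: transaction.id };

} catch (error) {
// Only completed steps are compensated; the step that threw is not
await this.initiateCompensation(transaction.id, completed, error);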
throw new SagaExecutionException(error);
}
}
}

Outbox Event Publisher

@Injectable()
export class OutboxEventPublisher {
async publishEvents(): Promise<void> {
const pendingEvents = await this.outboxRepository.findPendingEvents();

for (const event of pendingEvents) {
try {
await this.pubSubService.publish(event.eventType, event.payload);
await this.outboxRepository.markAsPublished(event.id);
} catch (error) {
await this.outboxRepository.incrementRetryCount(event.id);
// event.retryCount was read before the increment above, so compare retryCount + 1
if (event.retryCount + 1 >= this.maxRetries) {
await this.handleFailedEvent(event);
}
}
}
}
}
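The retry bookkeeping can be separated into a pure function, which makes it easy to test. `event.retryCount` is read before the increment, so the limit check has to use the count after this attempt. This sketch assumes at most 5 attempts, matching the `retry_count < 5` filter in `findPendingEvents` in 5.1. `afterPublishAttempt` is hypothetical.

```typescript
type OutboxStatus = 'PENDING' | 'PUBLISHED' | 'FAILED';

interface OutboxRetryState {
  status: OutboxStatus;
  retryCount: number;
}

// Next state of an outbox row after one publish attempt. `retryCount` is the
// value read before the attempt; the limit is checked against the incremented value.
function afterPublishAttempt(
  retryCount: number,
  succeeded: boolean,
  maxRetries: number,
): OutboxRetryState {
  if (succeeded) {
    return { status: 'PUBLISHED', retryCount };
  }
  const next = retryCount + 1;
  return { status: next >= maxRetries ? 'FAILED' : 'PENDING', retryCount: next };
}
```

With `maxRetries = 5`, an event that keeps failing is attempted exactly five times and then leaves the PENDING backlog as FAILED.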

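3.2 Data Model

The saga and Outbox state below lives in BigQuery and is updated with DML `UPDATE` statements. BigQuery does not allow `UPDATE`, `DELETE` or `MERGE` on rows written recently through the legacy streaming API (`tabledata.insertAll`, which the Node.js client's `table.insert()` uses). If `BigQueryService.insert` streams rows this way, a state update issued shortly after the insert can fail.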

Saga Transaction (BigQuery)

CREATE TABLE `distributed_transactions.saga_transactions` (
transaction_id STRING NOT NULL,
saga_type STRING NOT NULL,
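status STRING NOT NULL, -- STARTED, IN_PROGRESS, COMPLETED, FAILED, COMPENSATING, COMPENSATED
current_step STRING,
total_steps INT64 NOT NULL,
completed_steps INT64 NOT NULL,
environment STRING NOT NULL, -- dev / stage / prod; written by SagaStateRepository, used by CLUSTER BY
user_id STRING,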
correlation_id STRING,
payload JSON,
error_details JSON,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY environment, saga_type, status;

Outbox Events (BigQuery)

CREATE TABLE `distributed_transactions.outbox_events` (
event_id STRING NOT NULL,
transaction_id STRING NOT NULL,
event_type STRING NOT NULL,
aggregate_id STRING NOT NULL,
payload JSON NOT NULL,
status STRING NOT NULL, -- PENDING, PUBLISHED, FAILED
environment STRING NOT NULL, -- written by OutboxService.createEvent
retry_count INT64 NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL,
published_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY status, event_type;
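The `status` values used across this guide form a small state machine. The transition table below is an assumption pieced together from the code in sections 4 and 6. For example, a saga whose compensation fails is assumed to return to FAILED so that it can be compensated manually. `canTransition` and `isTerminal` are hypothetical helpers that could guard a status change before an `UPDATE` is issued.

```typescript
type SagaStatus =
  | 'STARTED'
  | 'IN_PROGRESS'
  | 'COMPLETED'
  | 'FAILED'
  | 'COMPENSATING'
  | 'COMPENSATED';

// Assumed transition table (derived from this guide, not a documented contract).
const TRANSITIONS: Record<SagaStatus, readonly SagaStatus[]> = {
  STARTED: ['IN_PROGRESS', 'FAILED'],
  IN_PROGRESS: ['IN_PROGRESS', 'COMPLETED', 'FAILED'], // IN_PROGRESS -> IN_PROGRESS: next step starts
  FAILED: ['COMPENSATING'],
  COMPENSATING: ['COMPENSATED', 'FAILED'], // back to FAILED if a compensation fails
  COMPLETED: [],
  COMPENSATED: [],
};

function canTransition(from: SagaStatus, to: SagaStatus): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1;
}

function isTerminal(status: SagaStatus): boolean {
  return TRANSITIONS[status].length === 0;
}
```

In BigQuery the same guard could go into the `UPDATE`'s `WHERE` clause. For example, add `AND status = 'FAILED'` when moving a saga to COMPENSATING, so that a concurrent writer cannot move it backwards.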

4. Saga Pattern Implementation

4.1 Saga Definitions

User registration saga implementation

// src/sagas/user-registration.saga.ts
@Injectable()
export class UserRegistrationSaga {
constructor(
private readonly userService: UserService,
private readonly profileService: ProfileService,
private readonly planService: PlanService,
private readonly notificationService: NotificationService,
private readonly sagaOrchestrator: SagaOrchestrator
) {}

async execute(registrationData: UserRegistrationDto): Promise<SagaResult> {
// The orchestrator passes initialData to every handler, so the user created
// in step 1 is captured here for the later steps, which need a User
let user!: User;

const sagaSteps: SagaStep[] = [
{
name: 'create_user',
handler: async (data: UserRegistrationDto) => (user = await this.createUser(data)),
compensation: this.deleteUser.bind(this)
},
{
name: 'initialize_profile',
handler: () => this.initializeProfile(user),
compensation: this.cleanupProfile.bind(this)
},
{
name: 'assign_plan',
handler: () => this.assignPlan(user),
compensation: this.revokePlan.bind(this)
},
{
name: 'setup_notifications',
handler: () => this.setupNotifications(user),
compensation: this.disableNotifications.bind(this)
}
];

return await this.sagaOrchestrator.executeSaga(
'user_registration',
registrationData,
sagaSteps
);
}

// Step implementations
private async createUser(data: UserRegistrationDto): Promise<User> {
return await this.userService.createUser({
email: data.email,
accessCode: data.accessCode,
language: data.language
});
}

private async initializeProfile(user: User): Promise<UserProfile> {
return await this.profileService.createProfile({
userId: user.id,
timezone: user.timezone,
onboardingCompleted: false
});
}

private async assignPlan(user: User): Promise<UserPlan> {
return await this.planService.assignPlan({
userId: user.id,
planId: 'plan.therapeutic', // default plan
startDate: new Date()
});
}

private async setupNotifications(user: User): Promise<NotificationSettings> {
return await this.notificationService.createSettings({
userId: user.id,
preferences: {
sleepReminders: true,
weeklyReports: true,
systemUpdates: false
}
});
}

// Compensating transactions
private async deleteUser(user: User): Promise<void> {
await this.userService.deleteUser(user.id);
}

private async cleanupProfile(profile: UserProfile): Promise<void> {
await this.profileService.deleteProfile(profile.userId);
}

private async revokePlan(plan: UserPlan): Promise<void> {
await this.planService.revokePlan(plan.userId);
}

private async disableNotifications(settings: NotificationSettings): Promise<void> {
await this.notificationService.deleteSettings(settings.userId);
}
}

4.2 Saga Step Executor

Step execution and state management

// src/sagas/saga-step-executor.service.ts
@Injectable()
export class SagaStepExecutor {
constructor(
private readonly sagaStateRepository: SagaStateRepository,
private readonly outboxService: OutboxService,
private readonly logger: Logger
) {}

async executeStep(
transactionId: string,
step: SagaStep,
data: any
): Promise<any> {
this.logger.log(`Executing step: ${step.name} for transaction: ${transactionId}`);

try {
// 1. Record that the step has started
await this.sagaStateRepository.updateStep(transactionId, {
currentStep: step.name,
status: 'IN_PROGRESS',
updatedAt: new Date()
});

// 2. Run the step itself
const result = await step.handler(data);

// 3. On success, create an Outbox event
await this.outboxService.createEvent({
transactionId,
eventType: `saga.step.completed`,
aggregateId: transactionId,
payload: {
stepName: step.name,
result: this.sanitizeResult(result),
timestamp: new Date().toISOString()
}
});

// 4. Record that the step completed
await this.sagaStateRepository.incrementCompletedSteps(transactionId);

this.logger.log(`Step completed: ${step.name} for transaction: ${transactionId}`);
return result;

} catch (error) {
this.logger.error(`Step failed: ${step.name} for transaction: ${transactionId}`, error);

// On failure, update the saga state
await this.sagaStateRepository.updateStep(transactionId, {
status: 'FAILED',
errorDetails: {
stepName: step.name,
error: error.message,
timestamp: new Date().toISOString()
},
updatedAt: new Date()
});

throw error;
}
}

private sanitizeResult(result: any): any {
// Remove sensitive fields (passwords, tokens, etc.)
if (typeof result === 'object' && result !== null) {
const sanitized = { ...result };
delete sanitized.password;
delete sanitized.token;
delete sanitized.accessToken;
return sanitized;
}
return result;
}
}
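`sanitizeResult` only removes top-level keys. A nested field such as `result.session.accessToken` would therefore still reach the Outbox payload. The sketch below is a recursive variant. `deepSanitize` and `SENSITIVE_KEYS` are hypothetical names, and the key list reuses the three keys above.

```typescript
// Keys stripped at any depth (same list as sanitizeResult).
const SENSITIVE_KEYS = new Set(['password', 'token', 'accessToken']);

// Return a copy of `value` with sensitive keys removed from nested objects and arrays.
function deepSanitize(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(deepSanitize);
  }
  if (value instanceof Date) {
    return value; // keep dates as-is instead of turning them into {}
  }
  if (typeof value === 'object' && value !== null) {
    const out: Record<string, unknown> = {};
    for (const [key, child] of Object.entries(value)) {
      if (!SENSITIVE_KEYS.has(key)) {
        out[key] = deepSanitize(child);
      }
    }
    return out;
  }
  return value;
}
```

An explicit allow-list of fields per step result would be stricter than this deny-list. The trade-off is that each new step needs its own list.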

4.3 Saga State Management

State repository implementation

// src/sagas/repositories/saga-state.repository.ts
@Injectable()
export class SagaStateRepository {
constructor(
private readonly bigQueryService: BigQueryService
) {}

async createSagaTransaction(
sagaType: string,
steps: SagaStep[],
initialData: any
): Promise<SagaTransaction> {
const transaction: SagaTransaction = {
transactionId: uuidv4(),
sagaType,
status: 'STARTED',
currentStep: null,
totalSteps: steps.length,
completedSteps: 0,
environment: process.env.DEPLOY_ENV || 'dev',
userId: this.extractUserId(initialData),
correlationId: this.extractCorrelationId(initialData),
payload: this.sanitizePayload(initialData),
createdAt: new Date(),
updatedAt: new Date()
};

await this.bigQueryService.insert(
'distributed_transactions.saga_transactions',
[transaction]
);

return transaction;
}

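async updateStep(
transactionId: string,
updates: Partial<SagaTransaction>
): Promise<void> {
// Only the fields present in `updates` are written, so a status-only update
// does not overwrite current_step or error_details with NULL
const assignments: string[] = ['updated_at = CURRENT_TIMESTAMP()'];
const params: Record<string, string | number> = { transactionId };

if (updates.status !== undefined) {
assignments.push('status = @status');
params.status = updates.status;
}
if (updates.currentStep != null) {
assignments.push('current_step = @currentStep');
params.currentStep = updates.currentStep;
}
if (updates.errorDetails !== undefined) {
// error_details is a JSON column
assignments.push('error_details = PARSE_JSON(@errorDetails)');
params.errorDetails = JSON.stringify(updates.errorDetails);
}
if (updates.completedAt !== undefined) {
assignments.push('completed_at = TIMESTAMP_MILLIS(@completedAtMillis)');
params.completedAtMillis = updates.completedAt.getTime();
}

const updateQuery = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
SET ${assignments.join(', ')}
WHERE transaction_id = @transactionId
`;

await this.bigQueryService.query(updateQuery, params);
}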

async findRunningTransactions(): Promise<SagaTransaction[]> {
const query = `
SELECT *
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE status IN ('STARTED', 'IN_PROGRESS')
AND created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY created_at DESC
`;

const [rows] = await this.bigQueryService.query(query);
return rows;
}

async findTimedOutTransactions(timeoutMinutes: number): Promise<SagaTransaction[]> {
const query = `
SELECT *
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE status IN ('STARTED', 'IN_PROGRESS')
AND created_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ${timeoutMinutes} MINUTE)
ORDER BY created_at ASC
`;

const [rows] = await this.bigQueryService.query(query);
return rows;
}
}
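`findTimedOutTransactions` interpolates `timeoutMinutes` into the SQL text. Passing it as a query parameter keeps the SQL static. Only the table path, which BigQuery cannot parameterize, is interpolated, and only after validation.

This is a sketch. `buildTimedOutQuery` is hypothetical. The project-id pattern follows Google Cloud's project ID rules: 6 to 30 characters of lowercase letters, digits and hyphens, starting with a letter and not ending with a hyphen. The result is meant to be passed to `bigQueryService.query(query, params)`, as the repository does elsewhere.

```typescript
interface ParameterizedQuery {
  query: string;
  params: Record<string, string | number>;
}

// Hypothetical helper: timed-out lookup with @timeoutMinutes as a query parameter.
function buildTimedOutQuery(projectId: string, timeoutMinutes: number): ParameterizedQuery {
  if (!Number.isInteger(timeoutMinutes) || timeoutMinutes <= 0) {
    throw new RangeError(`timeoutMinutes must be a positive integer, got ${timeoutMinutes}`);
  }
  // Table paths cannot be parameters, so validate the project id before interpolating it.
  if (!/^[a-z][a-z0-9-]{4,28}[a-z0-9]$/.test(projectId)) {
    throw new Error(`unexpected project id: ${projectId}`);
  }
  const query = `
    SELECT transaction_id, saga_type, status, current_step, created_at
    FROM \`${projectId}.distributed_transactions.saga_transactions\`
    WHERE status IN ('STARTED', 'IN_PROGRESS')
      AND created_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @timeoutMinutes MINUTE)
    ORDER BY created_at ASC`;
  return { query, params: { timeoutMinutes } };
}
```

The timeout itself would come from the per-environment values in 1.3: 5, 15 or 60 minutes.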

5. Outbox Pattern Implementation

5.1 Outbox Service

Event creation and publishing

// src/outbox/outbox.service.ts
@Injectable()
export class OutboxService {
constructor(
private readonly bigQueryService: BigQueryService,
private readonly pubSubService: PubSubService,
private readonly logger: Logger
) {}

async createEvent(eventData: CreateOutboxEventDto): Promise<void> {
const event: OutboxEvent = {
eventId: uuidv4(),
transactionId: eventData.transactionId,
eventType: eventData.eventType,
aggregateId: eventData.aggregateId,
payload: eventData.payload,
status: 'PENDING',
retryCount: 0,
environment: process.env.DEPLOY_ENV || 'dev',
createdAt: new Date()
};

await this.bigQueryService.insert(
'distributed_transactions.outbox_events',
[event]
);

this.logger.log(`Outbox event created: ${event.eventId} for transaction: ${event.transactionId}`);
}

async publishPendingEvents(): Promise<void> {
const pendingEvents = await this.findPendingEvents();

this.logger.log(`Found ${pendingEvents.length} pending events to publish`);

for (const event of pendingEvents) {
try {
await this.publishEvent(event);
} catch (error) {
this.logger.error(`Failed to publish event: ${event.eventId}`, error);
await this.handleFailedEvent(event, error);
}
}
}

private async publishEvent(event: OutboxEvent): Promise<void> {
const topicName = this.getTopicName(event.eventType);

await this.pubSubService.publish(topicName, {
eventId: event.eventId,
transactionId: event.transactionId,
eventType: event.eventType,
aggregateId: event.aggregateId,
payload: event.payload,
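timestamp: new Date(event.createdAt).toISOString()
});

await this.markEventAsPublished(event.eventId);
this.logger.log(`Event published: ${event.eventId} to topic: ${topicName}`);
}

private async findPendingEvents(): Promise<OutboxEvent[]> {
// Alias snake_case columns to the camelCase OutboxEvent fields used in this class,
// and return created_at as an ISO-8601 string so publishEvent can parse it
const query = `
SELECT
event_id AS eventId,
transaction_id AS transactionId,
event_type AS eventType,
aggregate_id AS aggregateId,
payload,
status,
retry_count AS retryCount,
FORMAT_TIMESTAMP('%Y-%m-%dT%H:%M:%E3SZ', created_at) AS createdAt
FROM \`${process.env.PROJECT_ID}.distributed_transactions.outbox_events\`
WHERE status = 'PENDING'
AND retry_count < 5
AND created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY created_at ASC
LIMIT 100
`;

const [rows] = await this.bigQueryService.query(query);
return rows;
}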

private async markEventAsPublished(eventId: string): Promise<void> {
const updateQuery = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.outbox_events\`
SET
status = 'PUBLISHED',
published_at = CURRENT_TIMESTAMP()
WHERE event_id = @eventId
`;

await this.bigQueryService.query(updateQuery, { eventId });
}

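private async handleFailedEvent(event: OutboxEvent, error: Error): Promise<void> {
// Must match the `retry_count < 5` filter in findPendingEvents
const maxRetries = 5;

// Increment the retry count and mark the event FAILED once the limit is reached,
// so it stops being picked up as PENDING (SET expressions see the pre-update row)
const updateQuery = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.outbox_events\`
SET
retry_count = retry_count + 1,
status = IF(retry_count + 1 >= @maxRetries, 'FAILED', status)
WHERE event_id = @eventId
`;

await this.bigQueryService.query(updateQuery, { eventId: event.eventId, maxRetries });

// event.retryCount was read before the increment above, so compare retryCount + 1.
// Rows with retry_count >= 5 are never returned as pending, so checking the old value would never fire
if (event.retryCount + 1 >= maxRetries) {
await this.moveToDeadLetterQueue(event, error);
}
}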

private getTopicName(eventType: string): string {
const environment = process.env.DEPLOY_ENV || 'dev';
const eventTypeParts = eventType.split('.');
const sagaType = eventTypeParts[1]; // second segment, e.g. saga.step.completed -> step

return `saga-${sagaType}-${environment}`;
}
}

5.2 Outbox Publisher (Scheduler)

Periodic event publishing

// src/outbox/outbox-publisher.scheduler.ts
@Injectable()
export class OutboxPublisherScheduler {
private readonly logger = new Logger(OutboxPublisherScheduler.name);

constructor(
private readonly outboxService: OutboxService
) {}

@Cron('*/30 * * * * *') // every 30 seconds (six-field cron expression with seconds)
async publishEvents(): Promise<void> {
try {
await this.outboxService.publishPendingEvents();
} catch (error) {
this.logger.error('Failed to publish outbox events', error);
}
}

@Cron('0 */5 * * * *') // every 5 minutes (cleanupPublishedEvents is not shown in 5.1)
async cleanupOldEvents(): Promise<void> {
try {
await this.outboxService.cleanupPublishedEvents();
this.logger.log('Outbox cleanup completed');
} catch (error) {
this.logger.error('Failed to cleanup outbox events', error);
}
}
}
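Delivery through this Outbox is at-least-once, so the same event can be published more than once:

  • `publish` succeeds but `markEventAsPublished` fails.
  • Two application instances run the 30-second job over the same PENDING rows.
  • Pub/Sub subscriptions themselves deliver at least once, unless exactly-once delivery is enabled.

Consumers should therefore deduplicate on the `eventId` that `publishEvent` puts in every message. This is a sketch. `IdempotentConsumer` is hypothetical and keeps its dedup set in memory. A real consumer would need a durable store shared by all instances.

```typescript
interface OutboxMessage {
  eventId: string;
  eventType: string;
  payload: unknown;
}

// Consumer-side deduplication keyed on eventId (in-memory for illustration only).
class IdempotentConsumer {
  private readonly seen = new Set<string>();
  private readonly handle: (msg: OutboxMessage) => void;

  constructor(handle: (msg: OutboxMessage) => void) {
    this.handle = handle;
  }

  // Returns true if the message was handled, false if it was a duplicate.
  receive(msg: OutboxMessage): boolean {
    if (this.seen.has(msg.eventId)) {
      return false;
    }
    this.handle(msg); // if this throws, the id is not recorded, so a redelivery is retried
    this.seen.add(msg.eventId);
    return true;
  }
}
```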

6. Compensating Transactions

6.1 Compensation Handler

Automatic compensation

// src/sagas/compensation-handler.service.ts
@Injectable()
export class CompensationHandler {
constructor(
private readonly sagaStateRepository: SagaStateRepository,
private readonly compensationLogRepository: CompensationLogRepository,
private readonly logger: Logger
) {}

async executeCompensation(
transactionId: string,
failedStep: string,
completedSteps: Array<{ step: SagaStep; result: unknown }>
): Promise<void> {
this.logger.log(`Starting compensation for transaction: ${transactionId} (failed step: ${failedStep})`);

// Move the saga to COMPENSATING
await this.sagaStateRepository.updateStep(transactionId, {
status: 'COMPENSATING',
updatedAt: new Date()
});

// Compensate completed steps in reverse order (copy first: reverse() mutates in place)
const stepsToCompensate = [...completedSteps].reverse();
let allSucceeded = true;

for (const { step, result } of stepsToCompensate) {
const succeeded = await this.executeCompensationStep(transactionId, step, result);
allSucceeded = allSucceeded && succeeded;
}

// COMPENSATED only if every compensation succeeded; otherwise back to FAILED,
// which keeps the saga eligible for manual compensation (6.2)
await this.sagaStateRepository.updateStep(transactionId, {
status: allSucceeded ? 'COMPENSATED' : 'FAILED',
...(allSucceeded ? { completedAt: new Date() } : {}),
updatedAt: new Date()
});

this.logger.log(`Compensation ${allSucceeded ? 'completed' : 'finished with failures'} for transaction: ${transactionId}`);
}

private async executeCompensationStep(
transactionId: string,
step: SagaStep,
result: unknown
): Promise<boolean> {
const compensationId = uuidv4();

try {
this.logger.log(`Executing compensation for step: ${step.name}`);

// Record that the compensation has started
await this.compensationLogRepository.createLog({
compensationId,
transactionId,
compensationType: step.name,
stepName: step.name,
status: 'EXECUTING',
executedAt: new Date()
});

// Run the compensating action with the original step's result
await step.compensation(result);

// Record success
await this.compensationLogRepository.updateLog(compensationId, {
status: 'SUCCESS',
executionDetails: {
message: `Compensation for ${step.name} completed successfully`,
timestamp: new Date().toISOString()
}
});

this.logger.log(`Compensation completed for step: ${step.name}`);
return true;

} catch (error) {
this.logger.error(`Compensation failed for step: ${step.name}`, error);

// Record failure
await this.compensationLogRepository.updateLog(compensationId, {
status: 'FAILED',
errorDetails: {
error: error.message,
stack: error.stack,
timestamp: new Date().toISOString()
}
});

// A failed compensation is logged and reported to the caller instead of thrown,
// because the compensations of the remaining steps must still be attempted
return false;
}
}
}

6.2 Manual Compensation

Admin interface

// src/admin/saga-admin.controller.ts
@Controller('admin/sagas')
@UseGuards(AdminGuard)
export class SagaAdminController {
constructor(
private readonly sagaStateRepository: SagaStateRepository,
private readonly compensationHandler: CompensationHandler,
private readonly compensationLogRepository: CompensationLogRepository
) {}

@Get('failed')
async getFailedTransactions(): Promise<SagaTransaction[]> {
return await this.sagaStateRepository.findFailedTransactions();
}

@Post(':transactionId/compensate')
async manualCompensation(
@Param('transactionId') transactionId: string
): Promise<{ message: string }> {
const transaction = await this.sagaStateRepository.findById(transactionId);

if (!transaction) {
throw new NotFoundException('Transaction not found');
}

if (transaction.status !== 'FAILED') {
throw new BadRequestException('Only failed transactions can be compensated');
}

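// Run compensation manually.
// NOTE: completed_steps is stored as a count (INT64), and step handlers and results
// are not persisted, while executeCompensation expects the completed step definitions.
// They have to be rebuilt from the saga definition for transaction.sagaType before this call.
await this.compensationHandler.executeCompensation(
transactionId,
transaction.currentStep,
transaction.completedSteps
);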

return { message: 'Manual compensation initiated' };
}

@Get(':transactionId/status')
async getTransactionStatus(
@Param('transactionId') transactionId: string
): Promise<SagaTransactionStatus> {
const transaction = await this.sagaStateRepository.findById(transactionId);
if (!transaction) {
throw new NotFoundException('Transaction not found');
}
const compensationLogs = await this.compensationLogRepository.findByTransactionId(transactionId);

return {
transaction,
compensationLogs,
canRetry: transaction.status === 'FAILED',
canCompensate: transaction.status === 'FAILED' && transaction.completedSteps > 0
};
}
}

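7. Step-by-Step Implementation

7.1 Overview: Roadmap for Building the Distributed Transaction System

This is a six-step execution plan for implementing Saga Pattern-based distributed transactions on the DTx platform.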

7.2 Step 1: Infrastructure Setup and Deployment

1.1 Deploy the Terraform module

# Deploy the distributed transaction infrastructure to the Dev environment
cd infrastructure/terragrunt/dev/distributed-transactions

terragrunt init
terragrunt plan
terragrunt apply
terragrunt output

1.2 Verify the deployment

# List the tables in the BigQuery dataset
bq ls --project_id=dta-cloud-de-dev distributed_transactions

# Check the Pub/Sub topics
gcloud pubsub topics list --filter="name:saga-" --project=dta-cloud-de-dev

# Print the monitoring dashboard URL (-raw prints the value without quotes)
echo "Dashboard: $(terragrunt output -raw monitoring_dashboard_url)"

7.3 Step 2: Core Service Implementation

2.1 Install the required packages

cd apps/dta-wide-api

# Packages used by the distributed transaction module
npm install uuid @google-cloud/bigquery @google-cloud/pubsub
npm install @nestjs/schedule @nestjs/cqrs

# Type definitions
npm install -D @types/uuid

2.2 Create the module structure

# Create the distributed transaction module directories
mkdir -p src/distributed-transactions/{sagas,outbox,compensation}
mkdir -p src/distributed-transactions/{repositories,services,dto}

# Create the initial files
touch src/distributed-transactions/distributed-transactions.module.ts
touch src/distributed-transactions/sagas/saga-orchestrator.service.ts
touch src/distributed-transactions/outbox/outbox.service.ts

2.3 Base module configuration

// src/distributed-transactions/distributed-transactions.module.ts
@Module({
imports: [
ScheduleModule.forRoot(),
CqrsModule
],
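providers: [
SagaOrchestrator,
SagaStepExecutor,
SagaStateRepository,
OutboxService,
OutboxPublisherScheduler,
CompensationHandler,
CompensationLogRepository, // injected by CompensationHandler
Logger, // injected by SagaStepExecutor, OutboxService and CompensationHandler
BigQueryService,
PubSubService
],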
exports: [
SagaOrchestrator,
OutboxService
]
})
export class DistributedTransactionsModule {}

7.4 Step 3: Saga Implementation

3.1 Implement the user registration saga

Apply the Saga pattern to the actual business logic:

// src/sagas/user-registration.saga.ts
@Injectable()
export class UserRegistrationSaga {
constructor(
private readonly userService: UserService,
private readonly sagaOrchestrator: SagaOrchestrator
) {}

async execute(data: UserRegistrationDto): Promise<string> {
const sagaSteps: SagaStep[] = [
{
name: 'create_user',
handler: async () => {
return await this.userService.createUser(data);
},
compensation: async (user: User) => {
await this.userService.deleteUser(user.id);
}
},
// additional steps...
];

const result = await this.sagaOrchestrator.executeSaga(
'user_registration',
data,
sagaSteps
);

return result.transactionId;
}
}

3.2 Controller integration

// src/auth/auth.controller.ts
@Controller('auth')
export class AuthController {
constructor(
private readonly userRegistrationSaga: UserRegistrationSaga
) {}

@Post('register')
async register(@Body() dto: UserRegistrationDto): Promise<{ transactionId: string; message: string }> {
const transactionId = await this.userRegistrationSaga.execute(dto);

return {
transactionId,
message: 'User registration initiated'
};
}
}

7.5 Step 4: Outbox Pattern Implementation

4.1 Apply the Outbox to business logic

// src/users/user.service.ts
@Injectable()
export class UserService {
async createUser(data: CreateUserDto): Promise<User> {
return await this.prisma.$transaction(async (tx) => {
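// 1. Create the user
const user = await tx.user.create({ data });

// 2. Create the Outbox event
// NOTE: outboxService.createEvent writes to BigQuery, which is outside this Prisma
// (Cloud SQL) transaction, so this write is NOT atomic with the user insert.
// For a true Outbox, the event row must be written through `tx` into a table in
// the same Cloud SQL database and relayed to Pub/Sub from there.
await this.outboxService.createEvent({
transactionId: data.sagaTransactionId,
eventType: 'user.created',
aggregateId: user.id,
payload: { userId: user.id, email: user.email }
});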

return user;
});
}
}

4.2 Enable the event publishing scheduler

// src/app.module.ts
@Module({
imports: [
DistributedTransactionsModule,
ScheduleModule.forRoot(),
// other modules
]
})
export class AppModule {}

7.6 Step 5: Monitoring and Verification

5.1 Basic distributed transaction test

# Call the user registration API
curl -X POST "https://dta-wide-api-dev.run.app/auth/register" \
-H "Content-Type: application/json" \
-d '{
"email": "test@example.com",
"accessCode": "TEST123",
"language": "ko"
}'

5.2 Check the transaction state

# Check the saga state in BigQuery
bq query --use_legacy_sql=false --project_id=dta-cloud-de-dev '
SELECT
transaction_id,
saga_type,
status,
current_step,
completed_steps,
total_steps,
created_at
FROM `dta-cloud-de-dev.distributed_transactions.saga_transactions`
WHERE created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY created_at DESC'

5.3 Check the Outbox events

# Check the Outbox event publishing status
bq query --use_legacy_sql=false --project_id=dta-cloud-de-dev '
SELECT
event_type,
status,
COUNT(*) as count,
AVG(retry_count) as avg_retries
FROM `dta-cloud-de-dev.distributed_transactions.outbox_events`
WHERE created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY event_type, status
ORDER BY event_type, status'

7.7 Step 6: Failure Scenario Tests

6.1 Trigger an intentional failure

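// Saga that fails on purpose, for testing
@Injectable()
export class TestFailureSaga {
constructor(private readonly sagaOrchestrator: SagaOrchestrator) {}

// step_2_fail always throws, so executeSaga is expected to compensate step_1_success
// and reject with SagaExecutionException; the return below is only reached on success
async execute(): Promise<string> {
const sagaSteps: SagaStep[] = [
{
name: 'step_1_success',
handler: async () => ({ result: 'success' }),
compensation: async () => console.log('Compensating step 1')
},
{
name: 'step_2_fail',
handler: async () => {
throw new Error('Intentional failure for testing');
},
compensation: async () => console.log('Compensating step 2')
}
];

const result = await this.sagaOrchestrator.executeSaga('test_failure', {}, sagaSteps);
return result.transactionId;
}
}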

6.2 Verify the compensating transactions

# Check the compensation logs
bq query --use_legacy_sql=false --project_id=dta-cloud-de-dev '
SELECT
compensation_id,
transaction_id,
compensation_type,
status,
execution_details,
executed_at
FROM `dta-cloud-de-dev.distributed_transactions.compensation_logs`
WHERE executed_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY executed_at DESC'

8. Monitoring System

8.1 Key Metrics

Distributed transaction SLIs/SLOs

-- Transaction success rate SLI (hourly, prod)
WITH transaction_metrics AS (
SELECT
TIMESTAMP_TRUNC(created_at, HOUR) as time_window,
COUNT(*) as total_transactions,
COUNTIF(status = 'COMPLETED') as successful_transactions,
COUNTIF(status = 'FAILED') as failed_transactions,
COUNTIF(status = 'COMPENSATED') as compensated_transactions
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
AND environment = 'prod'
GROUP BY time_window
)
SELECT
time_window,
total_transactions,
successful_transactions,
SAFE_DIVIDE(successful_transactions, total_transactions) * 100 as success_rate_percent,
failed_transactions,
compensated_transactions
FROM transaction_metrics
ORDER BY time_window DESC;

Transaction duration analysis

-- P95/P50 transaction processing time per saga type
-- (APPROX_QUANTILES is an aggregate function, so it can be combined with GROUP BY)
SELECT
saga_type,
APPROX_QUANTILES(
TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND), 100
)[OFFSET(95)] as p95_duration_seconds,
APPROX_QUANTILES(
TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND), 100
)[OFFSET(50)] as p50_duration_seconds,
COUNT(*) as transaction_count
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
AND status IN ('COMPLETED', 'FAILED', 'COMPENSATED')
GROUP BY saga_type
ORDER BY p95_duration_seconds DESC;
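BigQuery computes the percentiles in the query above. To sanity-check them on a handful of durations, a nearest-rank percentile is easy to compute by hand. This is only one of several percentile definitions. Interpolating (`PERCENTILE_CONT`) and approximate (`APPROX_QUANTILES`) implementations can return slightly different values on the same data, so compare with some tolerance. `nearestRankPercentile` is a hypothetical helper.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of all
// samples are less than or equal to it.
function nearestRankPercentile(samples: readonly number[], p: number): number {
  if (samples.length === 0) {
    throw new RangeError('no samples');
  }
  if (!(p > 0 && p <= 100)) {
    throw new RangeError(`p must be in (0, 100], got ${p}`);
  }
  const sorted = [...samples].sort((a, b) => a - b);
  const rank = Math.ceil((p / 100) * sorted.length); // 1-based rank
  return sorted[rank - 1];
}
```

For durations of 1 to 20 seconds, the rank is ceil(0.95 × 20) = 19, so P95 is 19 seconds.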

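8.2 Dashboard Configuration

Distributed transaction overview dashboard

Note: the first tile filters on `resource.type="bigquery_dataset"` with `ALIGN_RATE`, which does not by itself produce a transaction success rate. For the 99.9% SLO threshold to be meaningful, the tile needs a success-rate metric, for example a custom metric published alongside `custom.googleapis.com/saga_transactions/duration`.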

{
"displayName": "Distributed Transaction Operations Dashboard ({ENVIRONMENT})",
"mosaicLayout": {
"tiles": [
{
"width": 6,
"height": 4,
"widget": {
"title": "Transaction success rate (hourly)",
"xyChart": {
"dataSets": [
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "resource.type=\"bigquery_dataset\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_RATE",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["resource.labels.environment"]
}
}
}
}
],
"thresholds": [
{
"value": 0.999,
"color": "RED",
"direction": "BELOW",
"label": "SLO Target (99.9%)"
}
]
}
}
},
{
"width": 6,
"height": 4,
"xPos": 6,
"widget": {
"title": "Average transaction processing time",
"xyChart": {
"dataSets": [
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/saga_transactions/duration\"",
"aggregation": {
"alignmentPeriod": "300s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["resource.labels.saga_type"]
}
}
}
}
]
}
}
}
]
}
}

8.3 Alert Policies

Transaction failure rate alert

// src/monitoring/saga-monitoring.service.ts
@Injectable()
export class SagaMonitoringService {
constructor(
private readonly bigQueryService: BigQueryService,
private readonly alertService: AlertService
) {}

@Cron('*/5 * * * *') // every 5 minutes
async checkTransactionHealth(): Promise<void> {
const metrics = await this.calculateSagaMetrics();

if (metrics.failureRate > 0.01) { // above 1%
await this.alertService.sendAlert({
severity: 'WARNING',
title: 'Distributed transaction failure rate increased',
description: `Current failure rate: ${(metrics.failureRate * 100).toFixed(2)}%`,
metrics
});
}

if (metrics.avgDurationMinutes > 30) { // above 30 minutes
await this.alertService.sendAlert({
severity: 'CRITICAL',
title: 'Distributed transaction processing time too high',
description: `Average processing time: ${metrics.avgDurationMinutes.toFixed(1)} min`,
metrics
});
}
}
}

private async calculateSagaMetrics(): Promise<SagaMetrics> {
const query = `
SELECT
COUNT(*) as total,
COUNTIF(status = 'FAILED') as failed,
AVG(TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND)) / 60 as avg_duration_minutes
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
`;

const [rows] = await this.bigQueryService.query(query);
const result = rows[0];

return {
total: result.total,
failed: result.failed,
failureRate: result.total > 0 ? result.failed / result.total : 0,
avgDurationMinutes: result.avg_duration_minutes || 0
};
}
}
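The alert decision in `checkTransactionHealth` can be pulled out into a pure function. The thresholds can then be unit-tested without BigQuery or the alert service. This sketch uses the same thresholds: a failure rate above 1% raises WARNING, and an average above 30 minutes raises CRITICAL. `evaluateSagaAlerts` and its types are hypothetical.

```typescript
interface SagaMetrics {
  total: number;
  failed: number;
  failureRate: number; // 0..1
  avgDurationMinutes: number;
}

interface SagaAlert {
  severity: 'WARNING' | 'CRITICAL';
  title: string;
  description: string;
}

// Pure alert decision; defaults mirror checkTransactionHealth (1% failures, 30 minutes).
function evaluateSagaAlerts(
  metrics: SagaMetrics,
  maxFailureRate = 0.01,
  maxAvgDurationMinutes = 30,
): SagaAlert[] {
  const alerts: SagaAlert[] = [];
  if (metrics.failureRate > maxFailureRate) {
    alerts.push({
      severity: 'WARNING',
      title: 'Distributed transaction failure rate increased',
      description: `Current failure rate: ${(metrics.failureRate * 100).toFixed(2)}%`,
    });
  }
  if (metrics.avgDurationMinutes > maxAvgDurationMinutes) {
    alerts.push({
      severity: 'CRITICAL',
      title: 'Distributed transaction processing time too high',
      description: `Average processing time: ${metrics.avgDurationMinutes.toFixed(1)} min`,
    });
  }
  return alerts;
}
```

At low traffic, a single failure can push the rate over 1%. A minimum `total` before alerting may reduce noise.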

9. Testing Strategy

9.1 Unit Tests

Saga step tests

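// src/sagas/__tests__/user-registration.saga.spec.ts
import { Test } from '@nestjs/testing';

describe('UserRegistrationSaga', () => {
let saga: UserRegistrationSaga;
let mockUserService: jest.Mocked<UserService>;
let mockSagaOrchestrator: jest.Mocked<SagaOrchestrator>;

beforeEach(async () => {
// Mock setup: replace the saga's dependencies with jest mocks
mockUserService = {
createUser: jest.fn(),
deleteUser: jest.fn()
} as unknown as jest.Mocked<UserService>;
mockSagaOrchestrator = {
executeSaga: jest.fn()
} as unknown as jest.Mocked<SagaOrchestrator>;

const module = await Test.createTestingModule({
providers: [
UserRegistrationSaga,
{ provide: UserService, useValue: mockUserService },
{ provide: SagaOrchestrator, useValue: mockSagaOrchestrator }
]
}).compile();

saga = module.get(UserRegistrationSaga);
});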

describe('execute', () => {
it('should complete user registration successfully', async () => {
// Given
const registrationData = {
email: 'test@example.com',
accessCode: 'TEST123'
};

mockSagaOrchestrator.executeSaga.mockResolvedValue({
status: 'COMPLETED',
transactionId: 'test-transaction-id'
});

// When
const result = await saga.execute(registrationData);

// Then
expect(result).toBe('test-transaction-id');
expect(mockSagaOrchestrator.executeSaga).toHaveBeenCalledWith(
'user_registration',
registrationData,
expect.arrayContaining([
expect.objectContaining({ name: 'create_user' }),
expect.objectContaining({ name: 'initialize_profile' })
])
);
});

it('should handle saga execution failure', async () => {
// Given
const registrationData = { email: 'test@example.com' };
mockSagaOrchestrator.executeSaga.mockRejectedValue(
new SagaExecutionException('Step failed')
);

// When & Then
await expect(saga.execute(registrationData)).rejects.toThrow(SagaExecutionException);
});
});
});

9.2 Integration Tests

End-to-end saga flow test

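// src/sagas/__tests__/integration/saga-integration.spec.ts
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import request from 'supertest';

describe('Saga Integration Tests', () => {
let app: INestApplication;
let bigQueryService: BigQueryService;

beforeAll(async () => {
// Set up the test application
const module = await Test.createTestingModule({
imports: [AppModule]
}).compile();

app = module.createNestApplication();
await app.init();
bigQueryService = module.get(BigQueryService);
});

afterAll(async () => {
await app.close();
});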

describe('User Registration Saga', () => {
it('should complete full user registration flow', async () => {
// Given
const registrationData = {
email: 'integration-test@example.com',
accessCode: 'INT-TEST-123'
};

// When
const response = await request(app.getHttpServer())
.post('/auth/register')
.send(registrationData)
.expect(201);

const transactionId = response.body.transactionId;

// Wait for saga completion
await new Promise(resolve => setTimeout(resolve, 5000));

// Then
// query() resolves to [rows], as in the repository code
const [rows] = await bigQueryService.query(`
SELECT status, completed_steps, total_steps
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE transaction_id = @transactionId
`, { transactionId });

expect(rows[0]).toEqual(
expect.objectContaining({
status: 'COMPLETED',
completed_steps: 4,
total_steps: 4
})
);
});
});
});

9.3 Load Tests

Concurrent transaction test

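// src/sagas/__tests__/load/saga-load.spec.ts
// Assumes the same imports and beforeAll/afterAll app setup as the integration test in 9.2
describe('Saga Load Tests', () => {
let app: INestApplication; // initialized in beforeAll (omitted)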
it('should handle concurrent saga executions', async () => {
const concurrentRequests = 50;
const promises = [];

for (let i = 0; i < concurrentRequests; i++) {
promises.push(
request(app.getHttpServer())
.post('/auth/register')
.send({
email: `load-test-${i}@example.com`,
accessCode: `LOAD-${i}`
})
);
}

const results = await Promise.allSettled(promises);
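// supertest resolves for any HTTP status, so only 201 responses count as successes
const successful = results.filter(
r => r.status === 'fulfilled' && r.value.status === 201
).length;

expect(successful).toBeGreaterThanOrEqual(concurrentRequests * 0.95); // at least 95% success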
});
});

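10. Troubleshooting

10.1 Common Issues

Issue 1: Saga transaction stuck in "IN_PROGRESS"

Symptoms:

  • The transaction has been "IN_PROGRESS" for hours
  • It makes no progress past its last recorded step

Diagnosis:

# Find stuck transactions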
bq query --use_legacy_sql=false '
SELECT
transaction_id,
saga_type,
current_step,
completed_steps,
total_steps,
TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), updated_at, MINUTE) as stuck_minutes
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE status = "IN_PROGRESS"
AND updated_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 MINUTE)
ORDER BY stuck_minutes DESC'

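Resolution:

  1. Check the application logs: look for errors in the execution of that step
  2. Pub/Sub message backlog: check subscriber health and message processing
  3. Manual retry: restart the saga from the admin interface
  4. Timeout handling: verify the automatic timeout settings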

Issue 2: Outbox event publishing fails

Symptoms:

  • "PENDING" events keep accumulating in the Outbox table
  • Retry counts reach the maximum

Diagnosis:

# Find Outbox events that keep failing to publish
bq query --use_legacy_sql=false '
SELECT
event_type,
COUNT(*) as failed_count,
AVG(retry_count) as avg_retries,
STRING_AGG(DISTINCT SUBSTR(JSON_VALUE(payload, "$.error"), 1, 100)) as error_samples
FROM `{PROJECT_ID}.distributed_transactions.outbox_events`
WHERE status = "PENDING"
AND retry_count >= 3
GROUP BY event_type
ORDER BY failed_count DESC'

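Resolution:

  1. Pub/Sub topic: confirm that the topic exists and that the publisher has permission to publish to it
  2. Network connectivity: check connectivity to the Pub/Sub service
  3. Message format: validate the payload
  4. Manual republish: republish the events with a script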

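Issue 3: Compensating transaction fails

Symptoms:

  • The original transaction failed, and its compensating transaction failed as well
  • Data is left in an inconsistent state

Diagnosis:

# Analyze compensation failure patterns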
bq query --use_legacy_sql=false '
SELECT
c.compensation_type,
COUNT(*) as total_compensations,
COUNTIF(c.status = "FAILED") as failed_compensations,
SAFE_DIVIDE(COUNTIF(c.status = "FAILED"), COUNT(*)) * 100 as failure_rate_percent
FROM `{PROJECT_ID}.distributed_transactions.compensation_logs` c
WHERE c.executed_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY c.compensation_type
HAVING failure_rate_percent > 5
ORDER BY failure_rate_percent DESC'

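Resolution:

  1. Verify idempotency: confirm that the compensation logic can safely run more than once
  2. Clean up data manually: work with the DBA to restore data consistency
  3. Harden the compensation logic: make handlers robust to partial failures and retries
  4. Configure alerts: notify immediately when a compensation fails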
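Idempotency (step 1) is what makes it safe to retry a failed compensation. A minimal sketch of one common approach: record which (transaction, step) pairs have been compensated and skip them on repeat calls. The `IdempotentCompensator` class is hypothetical, and the in-memory `Set` stands in for a durable record such as a row in `compensation_logs`; a production version would need that record persisted and written atomically so it holds across processes.

```typescript
// Hypothetical identifier of one compensation: saga transaction + step name
interface CompensationRequest {
  transactionId: string;
  step: string;
}

type CompensationAction = (req: CompensationRequest) => Promise<void>;

// Wraps a compensation action so that repeated calls for the same
// (transactionId, step) pair run it to success at most once.
class IdempotentCompensator {
  // In-memory stand-in for a durable compensation record
  private readonly completed = new Set<string>();
  private readonly action: CompensationAction;

  constructor(action: CompensationAction) {
    this.action = action;
  }

  async compensate(req: CompensationRequest): Promise<'executed' | 'skipped'> {
    const key = `${req.transactionId}:${req.step}`;
    if (this.completed.has(key)) {
      return 'skipped'; // already compensated: a retry is a no-op
    }
    // If the action throws, the key is not recorded, so a later retry runs it again
    await this.action(req);
    this.completed.add(key);
    return 'executed';
  }
}
```

This sketch does not guard against two concurrent calls for the same key; with a durable store, a unique constraint on the key is one way to close that gap.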

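10.2 Performance Optimization

BigQuery query optimization

-- Always include a partition filter, and select only the columns you need
-- (BigQuery on-demand queries are billed by the columns scanned)
SELECT transaction_id, saga_type, current_step, status, created_at
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE DATE(created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) -- partition filter
AND environment = 'prod' -- cluster filter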
AND status = 'FAILED'
ORDER BY created_at DESC
LIMIT 100;

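Outbox batch processing optimization

import { Injectable } from '@nestjs/common';
import { groupBy } from 'lodash';

@Injectable()
export class OptimizedOutboxPublisher {
  async publishEventsBatch(): Promise<void> {
    // Batch size from OUTBOX_BATCH_SIZE; fall back to 50 if unset or not a positive integer
    const parsed = Number.parseInt(process.env.OUTBOX_BATCH_SIZE ?? '50', 10);
    const batchSize = Number.isNaN(parsed) || parsed <= 0 ? 50 : parsed;
    const events = await this.findPendingEvents(batchSize);

    // Group events by type so that each type is published as one batch
    const eventsByType = groupBy(events, 'eventType');

    for (const [eventType, typeEvents] of Object.entries(eventsByType)) {
      await this.publishEventsBatchByType(eventType, typeEvents);
    }
  }

  // findPendingEvents() (reads PENDING rows from the outbox table) and
  // publishEventsBatchByType() (publishes one type's events to Pub/Sub)
  // are implemented elsewhere in this class and omitted here.
}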
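`publishEventsBatchByType` is not shown above. One plausible shape for the step in front of it is to group the pending events by type and then split each group into fixed-size chunks, so that no single publish call carries more than a chosen number of messages. A minimal sketch in plain TypeScript (no lodash); `planPublishBatches`, `chunk`, and the `maxPerCall` limit are hypothetical names, and a suitable chunk size would need to be checked against the Pub/Sub client's own batching settings.

```typescript
// Hypothetical minimal event shape (only the fields this sketch needs)
interface OutboxEvent {
  id: string;
  eventType: string;
}

// Groups events by eventType, keeping their original order within each group
function groupByEventType(events: OutboxEvent[]): Map<string, OutboxEvent[]> {
  const groups = new Map<string, OutboxEvent[]>();
  for (const event of events) {
    const bucket = groups.get(event.eventType);
    if (bucket) {
      bucket.push(event);
    } else {
      groups.set(event.eventType, [event]);
    }
  }
  return groups;
}

// Splits items into consecutive chunks of at most `size` elements
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Builds the publish plan: one [eventType, events] entry per publish call
function planPublishBatches(
  events: OutboxEvent[],
  maxPerCall: number,
): Array<[string, OutboxEvent[]]> {
  const plan: Array<[string, OutboxEvent[]]> = [];
  groupByEventType(events).forEach((typeEvents, eventType) => {
    for (const part of chunk(typeEvents, maxPerCall)) {
      plan.push([eventType, part]);
    }
  });
  return plan;
}
```

Because order is preserved within each type, events of the same type are still published in the order they were written to the outbox.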

11. Operations Guide

11.1 Daily Operations Checklist

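#!/bin/bash
# scripts/daily-saga-check.sh
# PROJECT_ID must be set; exit early with a clear message otherwise
: "${PROJECT_ID:?PROJECT_ID must be set}"

echo "=== Distributed transaction daily check ==="
echo "Check time: $(date)"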

# 1. Stuck transactions
echo "📊 Stuck transactions (no update for more than 1 hour)"
bq query --use_legacy_sql=false --format=pretty '
SELECT
COUNT(*) as stuck_transactions,
STRING_AGG(DISTINCT saga_type) as affected_saga_types
FROM `'$PROJECT_ID'.distributed_transactions.saga_transactions`
WHERE status IN ("STARTED", "IN_PROGRESS")
AND updated_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)'

# 2. Failure rate
echo "🚨 Failure rate over the last 24 hours"
bq query --use_legacy_sql=false --format=pretty '
SELECT
saga_type,
COUNT(*) as total,
COUNTIF(status = "FAILED") as failed,
ROUND(SAFE_DIVIDE(COUNTIF(status = "FAILED"), COUNT(*)) * 100, 2) as failure_rate_percent
FROM `'$PROJECT_ID'.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY saga_type
ORDER BY failure_rate_percent DESC'

# 3. Outbox backlog
echo "📬 Outbox event backlog"
bq query --use_legacy_sql=false --format=pretty '
SELECT
status,
COUNT(*) as event_count,
AVG(retry_count) as avg_retries
FROM `'$PROJECT_ID'.distributed_transactions.outbox_events`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY status'

echo "✅ Daily check complete"

11.2 Weekly Performance Analysis

-- weekly-saga-performance.sql
-- Weekly distributed transaction performance analysis

-- Performance summary per saga type
WITH weekly_performance AS (
SELECT
saga_type,
COUNT(*) as total_transactions,
COUNTIF(status = 'COMPLETED') as successful,
COUNTIF(status = 'FAILED') as failed,
COUNTIF(status = 'COMPENSATED') as compensated,
AVG(completed_steps) as avg_completed_steps,
AVG(TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND)) as avg_duration_seconds
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY saga_type
)
SELECT
saga_type,
total_transactions,
ROUND(SAFE_DIVIDE(successful, total_transactions) * 100, 2) as success_rate_percent,
ROUND(SAFE_DIVIDE(failed, total_transactions) * 100, 2) as failure_rate_percent,
ROUND(SAFE_DIVIDE(compensated, total_transactions) * 100, 2) as compensation_rate_percent,
ROUND(avg_completed_steps, 1) as avg_completed_steps,
ROUND(avg_duration_seconds / 60, 1) as avg_duration_minutes
FROM weekly_performance
ORDER BY total_transactions DESC;
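When validating this report, for example in a unit test over fixture rows, it can help to reproduce its arithmetic outside BigQuery. A minimal sketch that recomputes the success rate, failure rate, average completed steps, and average duration per saga type; the `SagaRow` shape is an assumption about how `saga_transactions` rows are mapped into application code, and `summarizeBySagaType` is a hypothetical name.

```typescript
type SagaStatus = 'STARTED' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'COMPENSATED';

// Hypothetical mapping of the saga_transactions columns used by the query
interface SagaRow {
  sagaType: string;
  status: SagaStatus;
  completedSteps: number;
  createdAt: Date;
  updatedAt: Date;
  completedAt: Date | null;
}

interface SagaTypeSummary {
  sagaType: string;
  totalTransactions: number;
  successRatePercent: number;
  failureRatePercent: number;
  avgCompletedSteps: number;
  avgDurationMinutes: number;
}

// Rounds to `digits` decimals; matches ROUND for the non-negative values here
function round(value: number, digits: number): number {
  const factor = 10 ** digits;
  return Math.round(value * factor) / factor;
}

function summarizeBySagaType(rows: SagaRow[]): SagaTypeSummary[] {
  // GROUP BY saga_type
  const groups = new Map<string, SagaRow[]>();
  rows.forEach((row) => {
    const group = groups.get(row.sagaType) ?? [];
    group.push(row);
    groups.set(row.sagaType, group);
  });

  const summaries: SagaTypeSummary[] = [];
  groups.forEach((group, sagaType) => {
    const total = group.length;
    const count = (status: SagaStatus) => group.filter((r) => r.status === status).length;
    const avg = (f: (r: SagaRow) => number) => group.reduce((sum, r) => sum + f(r), 0) / total;
    // COALESCE(completed_at, updated_at) - created_at, truncated to whole seconds
    const durationSeconds = (r: SagaRow) =>
      Math.trunc(((r.completedAt ?? r.updatedAt).getTime() - r.createdAt.getTime()) / 1000);

    summaries.push({
      sagaType,
      totalTransactions: total,
      successRatePercent: round((count('COMPLETED') / total) * 100, 2),
      failureRatePercent: round((count('FAILED') / total) * 100, 2),
      avgCompletedSteps: round(avg((r) => r.completedSteps), 1),
      avgDurationMinutes: round(avg(durationSeconds) / 60, 1),
    });
  });

  // ORDER BY total_transactions DESC
  return summaries.sort((a, b) => b.totalTransactions - a.totalTransactions);
}
```

Durations are truncated to whole seconds to follow `TIMESTAMP_DIFF(..., SECOND)`, so fixture-based expectations should agree with the query's output for the same rows.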

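11.3 Monthly Optimization Review

#!/bin/bash
# scripts/monthly-saga-optimization.sh
: "${PROJECT_ID:?PROJECT_ID must be set}"

echo "=== Distributed transaction monthly optimization review ==="

# Transaction volume per environment (input for the cost review)
echo "💰 Transaction volume by environment (cost drivers)"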
bq query --use_legacy_sql=false '
SELECT
environment,
COUNT(*) as monthly_transactions,
COUNT(DISTINCT DATE(created_at)) as active_days,
ROUND(COUNT(*) / COUNT(DISTINCT DATE(created_at)), 0) as avg_daily_transactions,
ROUND(AVG(total_steps), 1) as avg_saga_complexity
FROM `'$PROJECT_ID'.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY environment
ORDER BY monthly_transactions DESC'

echo
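echo "📊 Optimization recommendations:"
echo " 1. Revisit saga types with high failure rates"
echo " 2. Adjust timeouts for long-running transactions"
echo " 3. Tune the Outbox batch size"
echo " 4. Improve compensation transaction performance"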

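12. Next Steps

12.1 Short-Term Improvements (1-2 weeks)

  1. Performance optimization

    • Tune the Outbox batch size
    • Optimize BigQuery query performance
    • Fine-tune saga timeout values per environment
  2. Stronger monitoring

    • Build a real-time performance dashboard
    • Implement automatic recovery mechanisms
    • Define detailed alerting policies
  3. Developer experience

    • Establish saga development guidelines
    • Build debugging tools
    • Implement test helper functions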

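12.2 Mid-Term Plan (1-2 months)

  1. Advanced patterns

    • Parallel saga execution
    • Dynamic saga composition
    • Conditional step execution
  2. Stronger regulatory compliance

    • Audit trails for medical device software
    • Governance policies for distributed transactions
    • Stronger personal data protection
  3. Operations automation

    • Automatic recovery system
    • Predictive failure detection
    • Automated capacity planning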

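12.3 Long-Term Vision (3-6 months)

  1. AI/ML-based optimization

    • Analysis of saga execution patterns
    • Automatic performance tuning
    • Intelligent compensation strategies
  2. Multi-cloud support

    • Cross-cloud distributed transactions
    • Hybrid environment support
    • Disaster recovery scenarios
  3. Platform expansion

    • Support for other teams and projects
    • Saga-as-a-Service offering
    • A standardized distributed transaction platform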

📚 Related Documents


Document version: 1.0.0
Last updated: 2025-01-22
Approved by: DTA-Wide Platform Team
Next scheduled review: 2025-02-22

Change History

Version | Date | Author | Changes
0.1.0 | 2025-08-13 | bok@weltcorp.com | Initial draft