PLT-NFR-011 Distributed Transaction Implementation Guide
Ensuring Data Integrity with the Saga Pattern and Outbox Pattern
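📋 Table of Contents
- Overview
- Architecture Design
- Implementation Solution
- Saga Pattern Implementation
- Outbox Pattern Implementation
- Compensating Transactions
- Step-by-Step Implementation
- Monitoring System
- Testing Strategy
- Troubleshooting
- Operations Guide
- Next Steps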
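1. Overview
1.1 PLT-NFR-011 Requirement
The storage layer must guarantee data integrity through ACID transactions and constraints, and multi-resource operations must provide either a consistent transaction boundary or compensating transactions.
Business Impact (DTx Platform)
- Data consistency: user data and related records stay consistent with each other
- Medical regulatory compliance: maintaining patient data integrity to meet regulatory requirements
- System reliability: data stays consistent even under partial failures
- Recoverability: automatic recovery and compensation when something fails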
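1.2 Current State
✅ Supported
- Cloud SQL ACID: ACID transactions within a single database
- Event system: asynchronous message processing on Pub/Sub
- Data rollback: TimeMachine's distributed rollback system
❌ Not yet implemented
- Distributed transactions: atomicity guarantees across multiple services
- Saga Pattern: management of long-running business processes
- Outbox Pattern: atomicity between database changes and event publication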
1.3 Implementation Scope
| Environment | Transaction timeout | Retry count | State retention | Monitoring |
|---|---|---|---|---|
| Dev | 5 min | 3 | 7 days | Basic |
| Stage | 15 min | 5 | 30 days | Detailed |
| Prod | 60 min | 10 | 90 days | Full |
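The per-environment values in the table could be kept in code as a typed lookup, so timeouts, retry limits and retention are read from one place. A minimal sketch; the `SagaEnvConfig` shape and the `getSagaConfig` helper are hypothetical names, and falling back to `dev` for unknown values is an assumption that mirrors the `process.env.DEPLOY_ENV || 'dev'` default used in the code later in this guide.

```typescript
// Hypothetical typed view of the per-environment settings in table 1.3.
type DeployEnv = "dev" | "stage" | "prod";

interface SagaEnvConfig {
  transactionTimeoutMinutes: number; // saga timeout before it is treated as stuck
  maxRetries: number; // retry budget
  stateRetentionDays: number; // how long saga state rows are kept
  monitoringLevel: "basic" | "detailed" | "full";
}

const DEPLOY_ENVS: readonly DeployEnv[] = ["dev", "stage", "prod"];

const SAGA_ENV_CONFIG: Record<DeployEnv, SagaEnvConfig> = {
  dev: { transactionTimeoutMinutes: 5, maxRetries: 3, stateRetentionDays: 7, monitoringLevel: "basic" },
  stage: { transactionTimeoutMinutes: 15, maxRetries: 5, stateRetentionDays: 30, monitoringLevel: "detailed" },
  prod: { transactionTimeoutMinutes: 60, maxRetries: 10, stateRetentionDays: 90, monitoringLevel: "full" },
};

// Checks against an explicit list so prototype keys such as "toString" are never matched.
function isDeployEnv(value: string): value is DeployEnv {
  return (DEPLOY_ENVS as readonly string[]).includes(value);
}

// Unknown or missing environment names fall back to the dev settings (assumption).
function getSagaConfig(env: string | undefined): SagaEnvConfig {
  const key = (env ?? "dev").toLowerCase();
  return isDeployEnv(key) ? SAGA_ENV_CONFIG[key] : SAGA_ENV_CONFIG.dev;
}
```

For example, `getSagaConfig(process.env.DEPLOY_ENV).transactionTimeoutMinutes` could be passed to `findTimedOutTransactions`.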
1.4 Related Documents
2. Architecture Design
2.1 Distributed Transaction Patterns
Saga Pattern (Orchestration)
Saga Orchestrator
│
▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Step 1 │───▶│ Step 2 │───▶│ Step 3 │
│(Local Txn) │ │(Local Txn) │ │(Local Txn) │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│Compensation │ │Compensation │ │Compensation │
│ Handler │ │ Handler │ │ Handler │
└─────────────┘ └─────────────┘ └─────────────┘
Outbox Pattern
Business Transaction:
┌────────────────────────┐
│ 1. Business Data Save │
│ 2. Outbox Event Save │ ◄── single DB transaction
└────────────────────────┘
│
▼
┌────────────────────────┐
│ Outbox Publisher │ ◄── separate process
│ (BigQuery → Pub/Sub) │
└────────────────────────┘
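Note: the outbox guarantee holds only when the outbox row is written in the same database transaction as the business data, which means in the same database (Cloud SQL here). A BigQuery insert cannot join a Cloud SQL transaction, so with a BigQuery-only outbox a failure between the Cloud SQL commit and the BigQuery insert can lose the event.
2.2 DTx Platform Saga Patterns
User Registration Saga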
interface UserRegistrationSaga {
steps: [
'create_user', // create the user account
'initialize_profile', // initialize the profile
'assign_plan', // assign a plan
'setup_notifications' // configure notifications
];
compensations: [
'delete_user',
'cleanup_profile',
'revoke_plan',
'disable_notifications'
];
}
Sleep Log Processing Saga
interface SleepLogProcessingSaga {
steps: [
'validate_sleep_log', // validate the sleep log
'save_sleep_data', // store the sleep data
'trigger_analysis', // start analysis processing
'calculate_scores', // compute scores
'send_notification' // notify the user of the results
];
compensations: [
'mark_invalid',
'delete_sleep_data',
'cancel_analysis',
'remove_scores',
'cancel_notification'
];
}
2.3 System Architecture
3. Implementation Solution
3.1 Core Components
Saga Orchestrator
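import { Injectable } from '@nestjs/common';
// SagaType, SagaStep, SagaResult, SagaExecutionException and the injected services are project types (definitions not shown)

@Injectable()
export class SagaOrchestrator {
  constructor(
    private readonly sagaStateRepository: SagaStateRepository,
    private readonly stepExecutor: SagaStepExecutor,
    private readonly compensationHandler: CompensationHandler
  ) {}

  async executeSaga<T>(
    sagaType: SagaType,
    initialData: T,
    steps: SagaStep[]
  ): Promise<SagaResult> {
    const transaction = await this.sagaStateRepository.createSagaTransaction(sagaType, steps, initialData);
    const txId = transaction.transactionId;
    // Steps that finished successfully, each with its compensation bound to that step's own result
    const completed: SagaStep[] = [];
    try {
      for (const step of steps) {
        // SagaStepExecutor records IN_PROGRESS / FAILED and increments completed_steps
        const result = await this.stepExecutor.executeStep(txId, step, initialData);
        completed.push({ ...step, compensation: () => step.compensation(result) });
      }
      await this.sagaStateRepository.updateStep(txId, {
        status: 'COMPLETED',
        completedAt: new Date(),
        updatedAt: new Date()
      });
      return { status: 'COMPLETED', transactionId: txId };
    } catch (error) {
      const failedStep = steps[completed.length]?.name ?? 'unknown';
      // Compensate only the steps that completed; CompensationHandler runs them in reverse order
      await this.compensationHandler.executeCompensation(txId, failedStep, completed);
      throw new SagaExecutionException(error);
    }
  }
}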
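Stripped of NestJS and BigQuery, the orchestration loop reduces to: run the steps in order, remember what each returned, and on failure undo only the completed steps, newest first, handing each compensation the result of its own step. The sketch below illustrates that under stated assumptions (`SagaContext`, `runSaga` and the outcome shape are hypothetical names; persistence and logging are omitted); it is not the platform's orchestrator.

```typescript
// Minimal in-memory sketch of orchestration-based saga execution.
interface SagaContext<TInput> {
  input: TInput;
  results: Record<string, unknown>; // step name -> value returned by that step's handler
}

interface SagaStep<TInput> {
  name: string;
  handler: (ctx: SagaContext<TInput>) => Promise<unknown>;
  // Receives the value its own handler returned, so it knows exactly what to undo.
  compensation: (result: unknown) => Promise<void>;
}

type SagaOutcome =
  | { status: "COMPLETED"; results: Record<string, unknown> }
  | { status: "COMPENSATED"; failedStep: string; error: Error; compensationFailures: string[] };

async function runSaga<TInput>(input: TInput, steps: SagaStep<TInput>[]): Promise<SagaOutcome> {
  const ctx: SagaContext<TInput> = { input, results: {} };
  const completed: SagaStep<TInput>[] = [];

  for (const step of steps) {
    try {
      ctx.results[step.name] = await step.handler(ctx);
      completed.push(step);
    } catch (err) {
      // Undo only the steps that actually completed, newest first.
      const compensationFailures: string[] = [];
      for (const done of [...completed].reverse()) {
        try {
          await done.compensation(ctx.results[done.name]);
        } catch {
          compensationFailures.push(done.name); // keep going; report for manual follow-up
        }
      }
      return {
        status: "COMPENSATED",
        failedStep: step.name,
        error: err instanceof Error ? err : new Error(String(err)),
        compensationFailures,
      };
    }
  }
  return { status: "COMPLETED", results: ctx.results };
}
```

The shared `results` map lets later steps read earlier outputs (for example a profile step reading the user created by `create_user`), and continuing past a failed compensation keeps one broken undo from blocking the others.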
Outbox Event Publisher
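import { Injectable } from '@nestjs/common';

@Injectable()
export class OutboxEventPublisher {
  private readonly maxRetries = 5;

  constructor(
    private readonly outboxRepository: OutboxRepository,
    private readonly pubSubService: PubSubService
  ) {}

  async publishEvents(): Promise<void> {
    const pendingEvents = await this.outboxRepository.findPendingEvents();
    for (const event of pendingEvents) {
      try {
        await this.pubSubService.publish(event.eventType, event.payload);
        // If this update fails after a successful publish, the event is sent again (at-least-once delivery)
        await this.outboxRepository.markAsPublished(event.id);
      } catch (error) {
        await this.outboxRepository.incrementRetryCount(event.id);
        // event.retryCount was read before this attempt, so add 1 for the failure just recorded
        if (event.retryCount + 1 >= this.maxRetries) {
          await this.handleFailedEvent(event);
        }
      }
    }
  }
}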
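The retry bookkeeping is easy to get off by one, because `retryCount` on the loaded event is the value from before the current failure. A minimal in-memory sketch of one relay pass that increments first and compares the incremented value, so an event is attempted at most `maxRetries` times before it is marked `FAILED` for dead-letter handling. `relayOnce` and the record shape are hypothetical; a real relay would persist these updates.

```typescript
type OutboxStatus = "PENDING" | "PUBLISHED" | "FAILED";

interface OutboxRecord {
  eventId: string;
  eventType: string;
  payload: unknown;
  status: OutboxStatus;
  retryCount: number;
}

type Publish = (topic: string, message: unknown) => Promise<void>;

// One pass over the outbox: publish every PENDING record, counting failures.
async function relayOnce(events: OutboxRecord[], publish: Publish, maxRetries: number): Promise<void> {
  for (const event of events) {
    if (event.status !== "PENDING") continue;
    try {
      await publish(event.eventType, { eventId: event.eventId, payload: event.payload });
      event.status = "PUBLISHED";
    } catch {
      event.retryCount += 1; // count this failure before deciding
      if (event.retryCount >= maxRetries) {
        event.status = "FAILED"; // hand off to dead-letter handling / manual inspection
      }
    }
  }
}
```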
3.2 Data Model
Saga Transaction (BigQuery)
CREATE TABLE `distributed_transactions.saga_transactions` (
transaction_id STRING NOT NULL,
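saga_type STRING NOT NULL,
environment STRING NOT NULL, -- dev / stage / prod; required by CLUSTER BY and written by SagaStateRepository
status STRING NOT NULL, -- STARTED, IN_PROGRESS, COMPLETED, FAILED, COMPENSATING, COMPENSATED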
current_step STRING,
total_steps INT64 NOT NULL,
completed_steps INT64 NOT NULL,
user_id STRING,
correlation_id STRING,
payload JSON,
error_details JSON,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY environment, saga_type, status;
Outbox Events (BigQuery)
CREATE TABLE `distributed_transactions.outbox_events` (
event_id STRING NOT NULL,
transaction_id STRING NOT NULL,
event_type STRING NOT NULL,
aggregate_id STRING NOT NULL,
environment STRING NOT NULL, -- written by OutboxService.createEvent
payload JSON NOT NULL,
status STRING NOT NULL, -- PENDING, PUBLISHED, FAILED
retry_count INT64 NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL,
published_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY status, event_type;
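The `status` column only stays meaningful if every writer respects a fixed life cycle. One possible transition table, derived from the flow in sections 3 to 6 (each step moves the saga to IN_PROGRESS, a failing step marks it FAILED, compensation goes through COMPENSATING to COMPENSATED); letting COMPENSATING fall back to FAILED so an operator can retry is an assumption. `ALLOWED_TRANSITIONS`, `assertTransition` and `isTerminal` are hypothetical names.

```typescript
// Hypothetical life cycle for saga_transactions.status.
type SagaStatus =
  | "STARTED"
  | "IN_PROGRESS"
  | "COMPLETED"
  | "FAILED"
  | "COMPENSATING"
  | "COMPENSATED";

const ALLOWED_TRANSITIONS: Record<SagaStatus, readonly SagaStatus[]> = {
  STARTED: ["IN_PROGRESS", "FAILED"],
  IN_PROGRESS: ["IN_PROGRESS", "COMPLETED", "FAILED"], // IN_PROGRESS -> IN_PROGRESS when the next step starts
  FAILED: ["COMPENSATING"],
  COMPENSATING: ["COMPENSATED", "FAILED"], // back to FAILED if a compensation fails (assumption)
  COMPLETED: [],
  COMPENSATED: [],
};

// Throws if a writer tries to move the saga along an edge that is not in the table.
function assertTransition(from: SagaStatus, to: SagaStatus): void {
  if (!ALLOWED_TRANSITIONS[from].includes(to)) {
    throw new Error(`Illegal saga status transition: ${from} -> ${to}`);
  }
}

// Terminal statuses have no outgoing transitions.
function isTerminal(status: SagaStatus): boolean {
  return ALLOWED_TRANSITIONS[status].length === 0;
}
```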
4. Saga Pattern Implementation
4.1 Saga Definitions
User Registration Saga Implementation
// src/sagas/user-registration.saga.ts
@Injectable()
export class UserRegistrationSaga {
constructor(
private readonly userService: UserService,
private readonly profileService: ProfileService,
private readonly planService: PlanService,
private readonly notificationService: NotificationService,
private readonly sagaOrchestrator: SagaOrchestrator
) {}
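async execute(registrationData: UserRegistrationDto): Promise<string> {
// The orchestrator passes the original input to every handler, so the later steps
// read the created user from this variable instead of from their argument
let user!: User;
const sagaSteps: SagaStep[] = [
{
name: 'create_user',
handler: async (data: UserRegistrationDto) => {
user = await this.createUser(data);
return user;
},
compensation: this.deleteUser.bind(this)
},
{
name: 'initialize_profile',
handler: () => this.initializeProfile(user),
compensation: this.cleanupProfile.bind(this)
},
{
name: 'assign_plan',
handler: () => this.assignPlan(user),
compensation: this.revokePlan.bind(this)
},
{
name: 'setup_notifications',
handler: () => this.setupNotifications(user),
compensation: this.disableNotifications.bind(this)
}
];
const result = await this.sagaOrchestrator.executeSaga(
'user_registration',
registrationData,
sagaSteps
);
// Return only the transaction id, matching AuthController and the unit test in 9.1
return result.transactionId;
}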
// Step implementations
private async createUser(data: UserRegistrationDto): Promise<User> {
return await this.userService.createUser({
email: data.email,
accessCode: data.accessCode,
language: data.language
});
}
private async initializeProfile(user: User): Promise<UserProfile> {
return await this.profileService.createProfile({
userId: user.id,
timezone: user.timezone,
onboardingCompleted: false
});
}
private async assignPlan(user: User): Promise<UserPlan> {
return await this.planService.assignPlan({
userId: user.id,
planId: 'plan.therapeutic', // default plan
startDate: new Date()
});
}
private async setupNotifications(user: User): Promise<NotificationSettings> {
return await this.notificationService.createSettings({
userId: user.id,
preferences: {
sleepReminders: true,
weeklyReports: true,
systemUpdates: false
}
});
}
// Compensating transactions (each receives the result of its own step)
private async deleteUser(user: User): Promise<void> {
await this.userService.deleteUser(user.id);
}
private async cleanupProfile(profile: UserProfile): Promise<void> {
await this.profileService.deleteProfile(profile.userId);
}
private async revokePlan(plan: UserPlan): Promise<void> {
await this.planService.revokePlan(plan.userId);
}
private async disableNotifications(settings: NotificationSettings): Promise<void> {
await this.notificationService.deleteSettings(settings.userId);
}
}
4.2 Saga Step Executor
Step execution and state management
// src/sagas/saga-step-executor.service.ts
@Injectable()
export class SagaStepExecutor {
constructor(
private readonly sagaStateRepository: SagaStateRepository,
private readonly outboxService: OutboxService,
private readonly logger: Logger
) {}
async executeStep(
transactionId: string,
step: SagaStep,
data: any
): Promise<any> {
this.logger.log(`Executing step: ${step.name} for transaction: ${transactionId}`);
try {
// 1. Record that the step has started
await this.sagaStateRepository.updateStep(transactionId, {
currentStep: step.name,
status: 'IN_PROGRESS',
updatedAt: new Date()
});
// 2. Run the step itself
const result = await step.handler(data);
// 3. On success, create an Outbox event
await this.outboxService.createEvent({
transactionId,
eventType: `saga.step.completed`,
aggregateId: transactionId,
payload: {
stepName: step.name,
result: this.sanitizeResult(result),
timestamp: new Date().toISOString()
}
});
// 4. Record step completion
await this.sagaStateRepository.incrementCompletedSteps(transactionId);
this.logger.log(`Step completed: ${step.name} for transaction: ${transactionId}`);
return result;
} catch (error) {
this.logger.error(`Step failed: ${step.name} for transaction: ${transactionId}`, error);
// On failure, mark the saga FAILED and keep the error details
await this.sagaStateRepository.updateStep(transactionId, {
status: 'FAILED',
errorDetails: {
stepName: step.name,
error: error instanceof Error ? error.message : String(error),
timestamp: new Date().toISOString()
},
updatedAt: new Date()
});
throw error;
}
}
private sanitizeResult(result: any): any {
// Remove sensitive top-level fields (passwords, tokens, etc.)
if (typeof result === 'object' && result !== null) {
const sanitized = { ...result };
delete sanitized.password;
delete sanitized.token;
delete sanitized.accessToken;
return sanitized;
}
return result;
}
}
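`sanitizeResult` only removes top-level keys, so a token nested inside, say, `result.session.accessToken` would still reach the outbox payload. A recursive variant, as a sketch; `deepSanitize` is a hypothetical name, and the set of sensitive key names is an assumption that should follow the platform's own data classification.

```typescript
// Key names are compared case-insensitively; extend this set to match the data classification.
const SENSITIVE_KEYS = new Set(["password", "token", "accesstoken", "refreshtoken"]);

// Returns a redacted copy; the input object is never mutated.
function deepSanitize(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map(deepSanitize);
  }
  if (value instanceof Date) {
    return value; // keep Dates as-is instead of turning them into empty objects
  }
  if (typeof value === "object" && value !== null) {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value)) {
      if (!SENSITIVE_KEYS.has(key.toLowerCase())) {
        out[key] = deepSanitize(inner);
      }
    }
    return out;
  }
  return value; // primitives pass through unchanged
}
```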
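4.3 Saga State Management
State repository implementation. If `BigQueryService.insert` uses BigQuery's legacy streaming insert API (`tabledata.insertAll`), recently streamed rows cannot be changed with UPDATE or DELETE while they are still in the streaming buffer, so the UPDATE-based transitions below can fail shortly after a saga is created; check the current BigQuery DML limitations before relying on this design.
// src/sagas/repositories/saga-state.repository.ts
import { Injectable } from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';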
@Injectable()
export class SagaStateRepository {
constructor(
private readonly bigQueryService: BigQueryService
) {}
async createSagaTransaction(
sagaType: string,
steps: SagaStep[],
initialData: any
): Promise<SagaTransaction> {
const transaction: SagaTransaction = {
transactionId: uuidv4(),
sagaType,
status: 'STARTED',
currentStep: null,
totalSteps: steps.length,
completedSteps: 0,
environment: process.env.DEPLOY_ENV || 'dev',
userId: this.extractUserId(initialData),
correlationId: this.extractCorrelationId(initialData),
payload: this.sanitizePayload(initialData),
createdAt: new Date(),
updatedAt: new Date()
};
await this.bigQueryService.insert(
'distributed_transactions.saga_transactions',
[transaction]
);
return transaction;
}
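async updateStep(
transactionId: string,
updates: Partial<SagaTransaction>
): Promise<void> {
// SET only the columns that were provided; binding undefined values would otherwise
// overwrite existing data with NULL or break query parameter type inference
const assignments: string[] = [];
const params: Record<string, unknown> = { transactionId };
if (updates.status !== undefined) {
assignments.push('status = @status');
params.status = updates.status;
}
if (updates.currentStep !== undefined) {
assignments.push('current_step = @currentStep');
params.currentStep = updates.currentStep;
}
if (updates.errorDetails !== undefined) {
// error_details is a JSON column, so parse the serialized string parameter
assignments.push('error_details = PARSE_JSON(@errorDetails)');
params.errorDetails = JSON.stringify(updates.errorDetails);
}
if (updates.completedAt !== undefined) {
assignments.push('completed_at = TIMESTAMP(@completedAt)');
params.completedAt = updates.completedAt.toISOString();
}
assignments.push('updated_at = TIMESTAMP(@updatedAt)');
params.updatedAt = (updates.updatedAt ?? new Date()).toISOString();
const updateQuery = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
SET ${assignments.join(', ')}
WHERE transaction_id = @transactionId
`;
await this.bigQueryService.query(updateQuery, params);
}
// Called by SagaStepExecutor after each successful step
async incrementCompletedSteps(transactionId: string): Promise<void> {
const query = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
SET completed_steps = completed_steps + 1, updated_at = CURRENT_TIMESTAMP()
WHERE transaction_id = @transactionId
`;
await this.bigQueryService.query(query, { transactionId });
}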
async findRunningTransactions(): Promise<SagaTransaction[]> {
const query = `
SELECT *
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE status IN ('STARTED', 'IN_PROGRESS')
AND created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY created_at DESC
`;
const [rows] = await this.bigQueryService.query(query);
return rows;
}
async findTimedOutTransactions(timeoutMinutes: number): Promise<SagaTransaction[]> {
const query = `
SELECT *
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE status IN ('STARTED', 'IN_PROGRESS')
AND created_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL ${timeoutMinutes} MINUTE)
ORDER BY created_at ASC
`;
const [rows] = await this.bigQueryService.query(query);
return rows;
}
}
5. Outbox Pattern Implementation
5.1 Outbox Service
Event creation and publishing
// src/outbox/outbox.service.ts
@Injectable()
export class OutboxService {
constructor(
private readonly bigQueryService: BigQueryService,
private readonly pubSubService: PubSubService,
private readonly logger: Logger
) {}
async createEvent(eventData: CreateOutboxEventDto): Promise<void> {
const event: OutboxEvent = {
eventId: uuidv4(),
transactionId: eventData.transactionId,
eventType: eventData.eventType,
aggregateId: eventData.aggregateId,
payload: eventData.payload,
status: 'PENDING',
retryCount: 0,
environment: process.env.DEPLOY_ENV || 'dev',
createdAt: new Date()
};
await this.bigQueryService.insert(
'distributed_transactions.outbox_events',
[event]
);
this.logger.log(`Outbox event created: ${event.eventId} for transaction: ${event.transactionId}`);
}
async publishPendingEvents(): Promise<void> {
const pendingEvents = await this.findPendingEvents();
this.logger.log(`Found ${pendingEvents.length} pending events to publish`);
for (const event of pendingEvents) {
try {
await this.publishEvent(event);
} catch (error) {
this.logger.error(`Failed to publish event: ${event.eventId}`, error);
await this.handleFailedEvent(event, error instanceof Error ? error : new Error(String(error)));
}
}
}
private async publishEvent(event: OutboxEvent): Promise<void> {
const topicName = this.getTopicName(event.eventType);
await this.pubSubService.publish(topicName, {
eventId: event.eventId,
transactionId: event.transactionId,
eventType: event.eventType,
aggregateId: event.aggregateId,
payload: event.payload,
timestamp: event.createdAt.toISOString()
});
await this.markEventAsPublished(event.eventId);
this.logger.log(`Event published: ${event.eventId} to topic: ${topicName}`);
}
private async findPendingEvents(): Promise<OutboxEvent[]> {
const query = `
SELECT *
FROM \`${process.env.PROJECT_ID}.distributed_transactions.outbox_events\`
WHERE status = 'PENDING'
AND retry_count < 5
AND created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY created_at ASC
LIMIT 100
`;
const [rows] = await this.bigQueryService.query(query);
return rows;
}
private async markEventAsPublished(eventId: string): Promise<void> {
const updateQuery = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.outbox_events\`
SET
status = 'PUBLISHED',
published_at = CURRENT_TIMESTAMP()
WHERE event_id = @eventId
`;
await this.bigQueryService.query(updateQuery, { eventId });
}
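private async handleFailedEvent(event: OutboxEvent, error: Error): Promise<void> {
const updateQuery = `
UPDATE \`${process.env.PROJECT_ID}.distributed_transactions.outbox_events\`
SET retry_count = retry_count + 1
WHERE event_id = @eventId
`;
await this.bigQueryService.query(updateQuery, { eventId: event.eventId });
// event.retryCount was read before this attempt, so add 1 for the failure just recorded.
// findPendingEvents stops selecting events once retry_count reaches 5; dead-letter at the same
// limit, otherwise an event would stop being retried without ever reaching the DLQ
if (event.retryCount + 1 >= 5) {
await this.moveToDeadLetterQueue(event, error);
}
}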
private getTopicName(eventType: string): string {
const environment = process.env.DEPLOY_ENV || 'dev';
const eventTypeParts = eventType.split('.');
const sagaType = eventTypeParts[1]; // saga.step.completed -> step
return `saga-${sagaType}-${environment}`;
}
}
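Marking an event PUBLISHED only after `publish` succeeds means a crash or a failed status update in between republishes the same `eventId`, and Pub/Sub itself also delivers at least once, so consumers need to be idempotent. A minimal consumer-side deduplication sketch; `IdempotentConsumer` is a hypothetical name, and the in-memory `Set` stands in for a durable store such as a processed-events table, which is an assumption.

```typescript
interface OutboxMessage {
  eventId: string;
  eventType: string;
  payload: unknown;
}

class IdempotentConsumer {
  private readonly processed = new Set<string>();
  private readonly handle: (msg: OutboxMessage) => Promise<void>;

  constructor(handle: (msg: OutboxMessage) => Promise<void>) {
    this.handle = handle;
  }

  // Returns true if the message was processed now, false if its eventId was already handled.
  // Not safe against two deliveries of the same id running concurrently; a durable store
  // would need a unique constraint or conditional insert for that.
  async consume(msg: OutboxMessage): Promise<boolean> {
    if (this.processed.has(msg.eventId)) {
      return false;
    }
    await this.handle(msg);
    this.processed.add(msg.eventId); // record only after success so a failed attempt can be retried
    return true;
  }
}
```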
5.2 Outbox Publisher (Scheduler)
Periodic event publishing
// src/outbox/outbox-publisher.scheduler.ts
@Injectable()
export class OutboxPublisherScheduler {
private readonly logger = new Logger(OutboxPublisherScheduler.name);
constructor(
private readonly outboxService: OutboxService
) {}
@Cron('*/30 * * * * *') // runs every 30 seconds
async publishEvents(): Promise<void> {
try {
await this.outboxService.publishPendingEvents();
} catch (error) {
this.logger.error('Failed to publish outbox events', error);
}
}
@Cron('0 */5 * * * *') // runs every 5 minutes
async cleanupOldEvents(): Promise<void> {
try {
await this.outboxService.cleanupPublishedEvents();
this.logger.log('Outbox cleanup completed');
} catch (error) {
this.logger.error('Failed to cleanup outbox events', error);
}
}
}
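With a 30-second cron, a slow publishing pass can still be running when the next tick fires. A small in-process guard that skips overlapping runs, as a sketch (`NonOverlappingRunner` is a hypothetical name). It does not coordinate multiple instances, for example several Cloud Run containers each running the scheduler; that would need a lease or a per-row claim in the outbox store, which is outside this sketch.

```typescript
class NonOverlappingRunner {
  private running = false;

  // Runs the task unless a previous run is still in progress; returns whether it ran.
  async run(task: () => Promise<void>): Promise<boolean> {
    if (this.running) {
      return false; // previous pass still in progress; skip this tick
    }
    this.running = true;
    try {
      await task();
      return true;
    } finally {
      this.running = false; // released even if the task throws
    }
  }
}
```

A scheduler method could then call `this.runner.run(() => this.outboxService.publishPendingEvents())`.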
6. Compensating Transactions
6.1 Compensation Handler
Automatic compensation
// src/sagas/compensation-handler.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';

@Injectable()
export class CompensationHandler {
  constructor(
    private readonly sagaStateRepository: SagaStateRepository,
    private readonly compensationLogRepository: CompensationLogRepository,
    private readonly logger: Logger
  ) {}

  async executeCompensation(
    transactionId: string,
    failedStep: string,
    completedSteps: SagaStep[]
  ): Promise<void> {
    this.logger.log(`Starting compensation for transaction: ${transactionId} (failed step: ${failedStep})`);
    // Move the saga to COMPENSATING
    await this.sagaStateRepository.updateStep(transactionId, {
      status: 'COMPENSATING',
      updatedAt: new Date()
    });
    // Compensate completed steps in reverse order; copy first so the caller's array is not mutated
    let allSucceeded = true;
    for (const step of [...completedSteps].reverse()) {
      const succeeded = await this.executeCompensationStep(transactionId, step);
      allSucceeded = allSucceeded && succeeded;
    }
    // Report COMPENSATED only if every compensation succeeded; otherwise mark the saga FAILED
    // so it is listed for manual compensation (which requires idempotent compensations)
    await this.sagaStateRepository.updateStep(transactionId, {
      status: allSucceeded ? 'COMPENSATED' : 'FAILED',
      ...(allSucceeded ? { completedAt: new Date() } : {}),
      updatedAt: new Date()
    });
    this.logger.log(`Compensation finished for transaction: ${transactionId} (all succeeded: ${allSucceeded})`);
  }

  // Returns true on success. Failures are logged but not rethrown,
  // because the remaining steps must still be compensated.
  private async executeCompensationStep(
    transactionId: string,
    step: SagaStep
  ): Promise<boolean> {
    const compensationId = uuidv4();
    try {
      this.logger.log(`Executing compensation for step: ${step.name}`);
      // Record that the compensation has started
      await this.compensationLogRepository.createLog({
        compensationId,
        transactionId,
        compensationType: step.name,
        stepName: step.name,
        status: 'EXECUTING',
        executedAt: new Date()
      });
      // Run the compensating action (it must already carry the data it needs, e.g. bound by the caller)
      await step.compensation();
      // Record success
      await this.compensationLogRepository.updateLog(compensationId, {
        status: 'SUCCESS',
        executionDetails: {
          message: `Compensation for ${step.name} completed successfully`,
          timestamp: new Date().toISOString()
        }
      });
      this.logger.log(`Compensation completed for step: ${step.name}`);
      return true;
    } catch (error) {
      const err = error instanceof Error ? error : new Error(String(error));
      this.logger.error(`Compensation failed for step: ${step.name}`, err.stack);
      // Record the failure
      await this.compensationLogRepository.updateLog(compensationId, {
        status: 'FAILED',
        errorDetails: {
          error: err.message,
          stack: err.stack,
          timestamp: new Date().toISOString()
        }
      });
      return false;
    }
  }
}
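Compensations often fail for transient reasons (a timeout, a briefly unavailable service), so a bounded retry before recording FAILED can reduce manual work. A sketch of retry with exponential backoff; `retryWithBackoff` is a hypothetical helper, the `sleep` function is injected so the logic can be tested without timers, and the attempt count and base delay are assumptions (the retry counts in table 1.3 are one possible source). Retrying is only safe when the compensation is idempotent.

```typescript
type Sleep = (ms: number) => Promise<void>;

// Calls fn up to `attempts` times, waiting base, 2x base, 4x base, ... between attempts,
// and rethrows the last error if every attempt fails.
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  attempts: number,
  baseDelayMs: number,
  sleep: Sleep
): Promise<T> {
  if (attempts < 1) {
    throw new Error("attempts must be >= 1");
  }
  let lastError: unknown;
  for (let attempt = 0; attempt < attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < attempts - 1) {
        await sleep(baseDelayMs * 2 ** attempt); // no wait after the final attempt
      }
    }
  }
  throw lastError;
}
```

In production, `sleep` could be `(ms) => new Promise((resolve) => setTimeout(resolve, ms))`.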
6.2 Manual Compensation
Admin interface
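// src/admin/saga-admin.controller.ts
import {
  BadRequestException,
  Controller,
  Get,
  NotFoundException,
  Param,
  Post,
  UseGuards
} from '@nestjs/common';

@Controller('admin/sagas')
@UseGuards(AdminGuard)
export class SagaAdminController {
  constructor(
    private readonly sagaStateRepository: SagaStateRepository,
    private readonly compensationHandler: CompensationHandler,
    // Used by getTransactionStatus, so it has to be injected
    private readonly compensationLogRepository: CompensationLogRepository,
    // Hypothetical: resolves a saga_type to its step definitions, because the stored row
    // only holds a completed_steps count, not the step handlers themselves
    private readonly sagaRegistry: SagaRegistry
  ) {}

  @Get('failed')
  async getFailedTransactions(): Promise<SagaTransaction[]> {
    return await this.sagaStateRepository.findFailedTransactions();
  }

  @Post(':transactionId/compensate')
  async manualCompensation(
    @Param('transactionId') transactionId: string
  ): Promise<{ message: string }> {
    const transaction = await this.sagaStateRepository.findById(transactionId);
    if (!transaction) {
      throw new NotFoundException('Transaction not found');
    }
    if (transaction.status !== 'FAILED') {
      throw new BadRequestException('Only failed transactions can be compensated');
    }
    // completedSteps is a count, so rebuild the step list for executeCompensation.
    // Step results are not persisted: these compensations must work from the stored
    // payload/user_id and must be idempotent.
    const completedSteps = this.sagaRegistry.getCompletedSteps(
      transaction.sagaType,
      transaction.completedSteps
    );
    await this.compensationHandler.executeCompensation(
      transactionId,
      transaction.currentStep ?? 'unknown',
      completedSteps
    );
    return { message: 'Manual compensation executed' };
  }

  @Get(':transactionId/status')
  async getTransactionStatus(
    @Param('transactionId') transactionId: string
  ): Promise<SagaTransactionStatus> {
    const transaction = await this.sagaStateRepository.findById(transactionId);
    if (!transaction) {
      throw new NotFoundException('Transaction not found');
    }
    const compensationLogs = await this.compensationLogRepository.findByTransactionId(transactionId);
    return {
      transaction,
      compensationLogs,
      canRetry: transaction.status === 'FAILED',
      canCompensate: transaction.status === 'FAILED' && transaction.completedSteps > 0
    };
  }
}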
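The stored row only records how many steps completed, while the compensation handler needs the step definitions themselves. A hypothetical registry keyed by `saga_type` can bridge that gap, assuming steps always run in their declared order so that the first `completed_steps` entries are exactly the completed ones. It does not recover step results, which are not persisted, so compensations invoked this way must work from data in the stored row. `SagaRegistry` and `RegisteredStep` are illustrative names.

```typescript
interface RegisteredStep {
  name: string;
  compensation: (result?: unknown) => Promise<void>;
}

class SagaRegistry {
  private readonly definitions = new Map<string, RegisteredStep[]>();

  register(sagaType: string, steps: RegisteredStep[]): void {
    this.definitions.set(sagaType, steps);
  }

  // completedSteps mirrors the completed_steps column of saga_transactions.
  getCompletedSteps(sagaType: string, completedSteps: number): RegisteredStep[] {
    const steps = this.definitions.get(sagaType);
    if (!steps) {
      throw new Error(`Unknown saga type: ${sagaType}`);
    }
    if (!Number.isInteger(completedSteps) || completedSteps < 0 || completedSteps > steps.length) {
      throw new Error(`Invalid completed step count ${completedSteps} for ${sagaType}`);
    }
    return steps.slice(0, completedSteps); // declared order; the handler reverses it
  }
}
```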
7. Step-by-Step Implementation
7.1 Overview: Distributed Transaction System Roadmap
A six-phase execution plan for implementing Saga Pattern-based distributed transactions on the DTx platform.
7.2 Phase 1: Infrastructure Setup and Deployment
1.1 Deploy the Terraform module
# Deploy the distributed transaction infrastructure to Dev
cd infrastructure/terragrunt/dev/distributed-transactions
terragrunt init
terragrunt plan
terragrunt apply
terragrunt output
1.2 Verify the deployment
# Check the BigQuery dataset
bq ls --project_id=dta-cloud-de-dev distributed_transactions
# Check the Pub/Sub topics
gcloud pubsub topics list --filter="name:saga-" --project=dta-cloud-de-dev
# Check the monitoring dashboard (-raw prints the URL without surrounding quotes)
echo "Dashboard: $(terragrunt output -raw monitoring_dashboard_url)"
7.3 Phase 2: Core Service Implementation
2.1 Install required packages
cd apps/dta-wide-api
# Install the distributed transaction packages
npm install uuid @google-cloud/bigquery @google-cloud/pubsub
npm install @nestjs/schedule @nestjs/cqrs
# Type definitions
npm install -D @types/uuid
2.2 Create the module structure
# Create the distributed transaction module directories
mkdir -p src/distributed-transactions/{sagas,outbox,compensation}
mkdir -p src/distributed-transactions/{repositories,services,dto}
# Create the base files
touch src/distributed-transactions/distributed-transactions.module.ts
touch src/distributed-transactions/sagas/saga-orchestrator.service.ts
touch src/distributed-transactions/outbox/outbox.service.ts
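2.3 Base module configuration
// src/distributed-transactions/distributed-transactions.module.ts
import { Logger, Module } from '@nestjs/common';
import { CqrsModule } from '@nestjs/cqrs';
// Project providers (SagaOrchestrator, SagaStepExecutor, ...) are imported from their own files; paths omitted

@Module({
  // ScheduleModule.forRoot() is registered once in the root AppModule (see 4.2 below), not in this feature module
  imports: [CqrsModule],
  providers: [
    SagaOrchestrator,
    SagaStepExecutor,
    SagaStateRepository,
    OutboxService,
    OutboxPublisherScheduler,
    CompensationHandler,
    CompensationLogRepository,
    BigQueryService,
    PubSubService,
    Logger // several services inject Logger through their constructors
  ],
  exports: [SagaOrchestrator, OutboxService]
})
export class DistributedTransactionsModule {}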
7.4 Phase 3: Saga Implementation
3.1 User registration Saga
Apply the Saga pattern to the actual business logic:
// src/sagas/user-registration.saga.ts
@Injectable()
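export class UserRegistrationSaga {
constructor(
private readonly userService: UserService,
private readonly sagaOrchestrator: SagaOrchestrator
) {}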
async execute(data: UserRegistrationDto): Promise<string> {
const sagaSteps: SagaStep[] = [
{
name: 'create_user',
handler: async () => {
return await this.userService.createUser(data);
},
compensation: async (user: User) => {
await this.userService.deleteUser(user.id);
}
},
// additional steps...
];
const result = await this.sagaOrchestrator.executeSaga(
'user_registration',
data,
sagaSteps
);
return result.transactionId;
}
}
3.2 Controller integration
// src/auth/auth.controller.ts
@Controller('auth')
export class AuthController {
constructor(
private readonly userRegistrationSaga: UserRegistrationSaga
) {}
@Post('register')
async register(@Body() dto: UserRegistrationDto): Promise<{ transactionId: string; message: string }> {
const transactionId = await this.userRegistrationSaga.execute(dto);
return {
transactionId,
message: 'User registration initiated'
};
}
}
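7.5 Phase 4: Outbox Pattern Implementation
4.1 Apply the Outbox to business logic
// src/users/user.service.ts
@Injectable()
export class UserService {
constructor(
private readonly prisma: PrismaService, // the project's Prisma client provider (assumed name)
private readonly outboxService: OutboxService
) {}
async createUser(data: CreateUserDto): Promise<User> {
// sagaTransactionId is saga metadata, not a user column, so keep it out of the Prisma create
const { sagaTransactionId, ...userData } = data;
return await this.prisma.$transaction(async (tx) => {
// 1. Create the user
const user = await tx.user.create({ data: userData });
// 2. Create the Outbox event.
// Caution: OutboxService writes to BigQuery, so this call is NOT part of the Prisma transaction.
// If this call throws, the user insert is rolled back; if the transaction rolls back later,
// the event remains. A strict outbox writes the event through `tx` into a table in the same
// Cloud SQL database.
await this.outboxService.createEvent({
transactionId: sagaTransactionId,
eventType: 'user.created',
aggregateId: user.id,
payload: { userId: user.id, email: user.email }
});
return user;
});
}
}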
4.2 Enable the event publishing scheduler
// src/app.module.ts
@Module({
imports: [
DistributedTransactionsModule,
ScheduleModule.forRoot(),
// other modules
]
})
export class AppModule {}
7.6 Phase 5: Monitoring and Verification
5.1 Basic distributed transaction test
# Call the user registration API
curl -X POST "https://dta-wide-api-dev.run.app/auth/register" \
-H "Content-Type: application/json" \
-d '{
"email": "test@example.com",
"accessCode": "TEST123",
"language": "ko"
}'
5.2 Check transaction status
# Check the Saga state in BigQuery
bq query --use_legacy_sql=false --project_id=dta-cloud-de-dev '
SELECT
transaction_id,
saga_type,
status,
current_step,
completed_steps,
total_steps,
created_at
FROM `dta-cloud-de-dev.distributed_transactions.saga_transactions`
WHERE created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY created_at DESC'
5.3 Check Outbox events
# Check the Outbox event publishing status
bq query --use_legacy_sql=false --project_id=dta-cloud-de-dev '
SELECT
event_type,
status,
COUNT(*) as count,
AVG(retry_count) as avg_retries
FROM `dta-cloud-de-dev.distributed_transactions.outbox_events`
WHERE created_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY event_type, status
ORDER BY event_type, status'
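7.7 Phase 6: Failure Scenario Testing
6.1 Trigger an intentional failure
// Saga that fails on purpose, for testing
@Injectable()
export class TestFailureSaga {
constructor(private readonly sagaOrchestrator: SagaOrchestrator) {}
async execute(): Promise<string> {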
const sagaSteps: SagaStep[] = [
{
name: 'step_1_success',
handler: async () => ({ result: 'success' }),
compensation: async () => console.log('Compensating step 1')
},
{
name: 'step_2_fail',
handler: async () => {
throw new Error('Intentional failure for testing');
},
compensation: async () => console.log('Compensating step 2')
}
];
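// executeSaga compensates the completed step_1_success and then rethrows,
// so callers should expect a SagaExecutionException here
const result = await this.sagaOrchestrator.executeSaga('test_failure', {}, sagaSteps);
return result.transactionId;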
}
}
6.2 Verify the compensating transactions
# Check the compensation logs
bq query --use_legacy_sql=false --project_id=dta-cloud-de-dev '
SELECT
compensation_id,
transaction_id,
compensation_type,
status,
execution_details,
executed_at
FROM `dta-cloud-de-dev.distributed_transactions.compensation_logs`
WHERE executed_at > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY executed_at DESC'
8. Monitoring System
8.1 Key Metrics
Distributed transaction SLI/SLO
-- Transaction success-rate SLI
WITH transaction_metrics AS (
SELECT
TIMESTAMP_TRUNC(created_at, HOUR) as time_window,
COUNT(*) as total_transactions,
COUNTIF(status = 'COMPLETED') as successful_transactions,
COUNTIF(status = 'FAILED') as failed_transactions,
COUNTIF(status = 'COMPENSATED') as compensated_transactions
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
AND environment = 'prod'
GROUP BY time_window
)
SELECT
time_window,
total_transactions,
successful_transactions,
SAFE_DIVIDE(successful_transactions, total_transactions) * 100 as success_rate_percent,
failed_transactions,
compensated_transactions
FROM transaction_metrics
ORDER BY time_window DESC;
Transaction duration analysis
-- P50/P95 transaction duration per saga type.
-- PERCENTILE_CONT is an analytic (window) function and cannot be mixed with GROUP BY and COUNT(*),
-- so use the aggregate APPROX_QUANTILES instead (approximate percentiles)
SELECT
saga_type,
APPROX_QUANTILES(TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND), 100)[OFFSET(95)] as p95_duration_seconds,
APPROX_QUANTILES(TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND), 100)[OFFSET(50)] as p50_duration_seconds,
COUNT(*) as transaction_count
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
AND status IN ('COMPLETED', 'FAILED', 'COMPENSATED')
GROUP BY saga_type
ORDER BY p95_duration_seconds DESC;
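To sanity-check the percentiles on a small exported sample, an exact nearest-rank percentile is easy to compute client-side. A sketch; BigQuery's approximate or interpolating percentile functions can return slightly different values, so small differences are expected. `percentileNearestRank` is a hypothetical helper.

```typescript
// Exact nearest-rank percentile: the smallest sample value such that at least p% of the sample is <= it.
function percentileNearestRank(values: number[], p: number): number {
  if (values.length === 0) {
    throw new Error("percentile of an empty sample is undefined");
  }
  if (!(p > 0 && p <= 100)) {
    throw new Error("p must be in (0, 100]");
  }
  const sorted = [...values].sort((a, b) => a - b); // numeric sort on a copy
  // 1-based rank; multiply before dividing so integer p gives an exact result
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[rank - 1];
}
```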
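8.2 Dashboard Configuration
Distributed transaction overview dashboard. Both tiles read Cloud Monitoring custom metrics that the services would have to write; a `bigquery_dataset` resource filter cannot express a saga success rate. The metric name `custom.googleapis.com/saga_transactions/success_rate` (a 0 to 1 ratio, so it lines up with the 0.999 threshold) and its `environment` metric label are assumptions.
{
"displayName": "Distributed Transaction Operations Dashboard ({ENVIRONMENT})",
"mosaicLayout": {
"tiles": [
{
"width": 6,
"height": 4,
"widget": {
"title": "Transaction success rate (hourly)",
"xyChart": {
"dataSets": [
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/saga_transactions/success_rate\"",
"aggregation": {
"alignmentPeriod": "3600s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["metric.labels.environment"]
}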
}
}
}
],
"thresholds": [
{
"value": 0.999,
"color": "RED",
"direction": "BELOW",
"label": "SLO Target (99.9%)"
}
]
}
}
},
{
"width": 6,
"height": 4,
"xPos": 6,
"widget": {
"title": "Average transaction duration",
"xyChart": {
"dataSets": [
{
"timeSeriesQuery": {
"timeSeriesFilter": {
"filter": "metric.type=\"custom.googleapis.com/saga_transactions/duration\"",
"aggregation": {
"alignmentPeriod": "300s",
"perSeriesAligner": "ALIGN_MEAN",
"crossSeriesReducer": "REDUCE_MEAN",
"groupByFields": ["metric.labels.saga_type"]
}
}
}
}
]
}
}
}
]
}
}
8.3 Alert Policies
Transaction failure-rate alert
// src/monitoring/saga-monitoring.service.ts
@Injectable()
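export class SagaMonitoringService {
constructor(
private readonly bigQueryService: BigQueryService,
private readonly alertService: AlertService // the project's alerting client (assumed type name)
) {}
@Cron('*/5 * * * *') // runs every 5 minutes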
async checkTransactionHealth(): Promise<void> {
const metrics = await this.calculateSagaMetrics();
if (metrics.failureRate > 0.01) { // above 1%
await this.alertService.sendAlert({
severity: 'WARNING',
title: 'Distributed transaction failure rate increased',
description: `Current failure rate: ${(metrics.failureRate * 100).toFixed(2)}%`,
metrics
});
}
if (metrics.avgDurationMinutes > 30) { // above 30 minutes
await this.alertService.sendAlert({
severity: 'CRITICAL',
title: 'Distributed transaction duration too high',
description: `Average duration: ${metrics.avgDurationMinutes.toFixed(1)} min`,
metrics
});
}
}
private async calculateSagaMetrics(): Promise<SagaMetrics> {
const query = `
SELECT
COUNT(*) as total,
COUNTIF(status = 'FAILED') as failed,
AVG(TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND)) / 60 as avg_duration_minutes
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
`;
const [rows] = await this.bigQueryService.query(query);
const result = rows[0];
return {
total: result.total,
failed: result.failed,
failureRate: result.total > 0 ? result.failed / result.total : 0,
avgDurationMinutes: result.avg_duration_minutes || 0
};
}
}
9. Testing Strategy
9.1 Unit Tests
Saga step tests
// src/sagas/__tests__/user-registration.saga.spec.ts
describe('UserRegistrationSaga', () => {
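let saga: UserRegistrationSaga;
let mockSagaOrchestrator: jest.Mocked<SagaOrchestrator>;
beforeEach(() => {
// executeSaga is mocked, so the domain services are never called and empty stubs suffice
mockSagaOrchestrator = {
executeSaga: jest.fn()
} as unknown as jest.Mocked<SagaOrchestrator>;
saga = new UserRegistrationSaga(
{} as UserService,
{} as ProfileService,
{} as PlanService,
{} as NotificationService,
mockSagaOrchestrator
);
});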
describe('execute', () => {
it('should complete user registration successfully', async () => {
// Given
const registrationData = {
email: 'test@example.com',
accessCode: 'TEST123',
language: 'ko'
};
mockSagaOrchestrator.executeSaga.mockResolvedValue({
status: 'COMPLETED',
transactionId: 'test-transaction-id'
});
// When
const result = await saga.execute(registrationData);
// Then
expect(result).toBe('test-transaction-id');
expect(mockSagaOrchestrator.executeSaga).toHaveBeenCalledWith(
'user_registration',
registrationData,
expect.arrayContaining([
expect.objectContaining({ name: 'create_user' }),
expect.objectContaining({ name: 'initialize_profile' })
])
);
});
it('should handle saga execution failure', async () => {
// Given
const registrationData = { email: 'test@example.com', accessCode: 'TEST123', language: 'ko' };
mockSagaOrchestrator.executeSaga.mockRejectedValue(
new SagaExecutionException('Step failed')
);
// When & Then
await expect(saga.execute(registrationData)).rejects.toThrow(SagaExecutionException);
});
});
});
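9.2 Integration Tests
Full Saga flow test
// src/sagas/__tests__/integration/saga-integration.spec.ts
import { INestApplication } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import request from 'supertest';
describe('Saga Integration Tests', () => {
let app: INestApplication;
let bigQueryService: BigQueryService;
beforeAll(async () => {
// Set up the test application
const module = await Test.createTestingModule({
imports: [AppModule]
}).compile();
app = module.createNestApplication();
// Without this assignment bigQueryService stays undefined in the assertions
bigQueryService = module.get(BigQueryService);
await app.init();
});
afterAll(async () => {
await app.close();
});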
describe('User Registration Saga', () => {
it('should complete full user registration flow', async () => {
// Given
const registrationData = {
email: 'integration-test@example.com',
accessCode: 'INT-TEST-123',
language: 'ko'
};
// When
const response = await request(app.getHttpServer())
.post('/auth/register')
.send(registrationData)
.expect(201);
const transactionId = response.body.transactionId;
// Wait for saga completion
await new Promise(resolve => setTimeout(resolve, 5000));
// Then
// query() resolves to [rows], like the other BigQueryService calls in this guide
const [rows] = await bigQueryService.query(`
SELECT status, completed_steps, total_steps
FROM \`${process.env.PROJECT_ID}.distributed_transactions.saga_transactions\`
WHERE transaction_id = @transactionId
`, { transactionId });
expect(rows[0]).toEqual(
expect.objectContaining({
status: 'COMPLETED',
completed_steps: 4,
total_steps: 4
})
);
});
});
});
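9.3 Load Tests
Concurrent transaction processing test
// src/sagas/__tests__/load/saga-load.spec.ts
// Assumes `app` is initialized as in the integration test above and `request` is imported from supertest
describe('Saga Load Tests', () => {
it('should handle concurrent saga executions', async () => {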
const concurrentRequests = 50;
const promises = [];
for (let i = 0; i < concurrentRequests; i++) {
promises.push(
request(app.getHttpServer())
.post('/auth/register')
.send({
email: `load-test-${i}@example.com`,
accessCode: `LOAD-${i}`
})
);
}
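const results = await Promise.allSettled(promises);
// supertest resolves for any HTTP status, so count only fulfilled requests that returned 201
const successful = results.filter(
(r) => r.status === 'fulfilled' && r.value.status === 201
).length;
expect(successful).toBeGreaterThanOrEqual(Math.ceil(concurrentRequests * 0.95)); // at least 95% success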
});
});
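10. Troubleshooting
10.1 Common Issues
Issue 1: Saga transaction stuck in "IN_PROGRESS"
Symptoms:
- The transaction has been "IN_PROGRESS" for hours
- It does not move past its last recorded step
Diagnosis:
# Find stuck transactions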
bq query --use_legacy_sql=false '
SELECT
transaction_id,
saga_type,
current_step,
completed_steps,
total_steps,
TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), updated_at, MINUTE) as stuck_minutes
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE status = "IN_PROGRESS"
AND updated_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 MINUTE)
ORDER BY stuck_minutes DESC'
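Resolution:
- Check the application logs for errors raised by the step in question
- Check for a Pub/Sub message backlog: subscriber health and message processing
- Retry manually from the admin interface
- Verify the automatic timeout settings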
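The diagnostic query can also back a periodic watchdog. A client-side sketch of the same rule applied to rows that have already been fetched: a saga counts as stuck when it is still STARTED or IN_PROGRESS (the statuses the daily check in 11.1 uses) and its `updated_at` is older than the threshold. `SagaRow` and `findStuckSagas` are hypothetical names.

```typescript
interface SagaRow {
  transactionId: string;
  status: string;
  currentStep: string | null;
  updatedAt: Date;
}

type StuckSaga = SagaRow & { stuckMinutes: number };

const ACTIVE_STATUSES = new Set(["STARTED", "IN_PROGRESS"]);

// Returns active sagas not updated for more than thresholdMinutes, longest-stuck first.
function findStuckSagas(rows: SagaRow[], now: Date, thresholdMinutes: number): StuckSaga[] {
  const thresholdMs = thresholdMinutes * 60000;
  return rows
    .filter((row) => ACTIVE_STATUSES.has(row.status))
    .filter((row) => now.getTime() - row.updatedAt.getTime() > thresholdMs)
    .map((row) => ({
      ...row,
      stuckMinutes: Math.floor((now.getTime() - row.updatedAt.getTime()) / 60000),
    }))
    .sort((a, b) => b.stuckMinutes - a.stuckMinutes);
}
```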
Issue 2: Outbox event publishing failures
Symptoms:
- "PENDING" events keep accumulating in the Outbox table
- Retry counts reach the maximum
Diagnosis:
# Find failing Outbox events
bq query --use_legacy_sql=false '
SELECT
event_type,
COUNT(*) as failed_count,
AVG(retry_count) as avg_retries,
STRING_AGG(DISTINCT SUBSTR(JSON_VALUE(payload, "$.error"), 1, 100)) as error_samples
FROM `{PROJECT_ID}.distributed_transactions.outbox_events`
WHERE status = "PENDING"
AND retry_count >= 3
GROUP BY event_type
ORDER BY failed_count DESC'
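Resolution:
- Pub/Sub topic state: verify the topic exists and the publisher has permission
- Network connectivity: check connectivity to the Pub/Sub service
- Message format: validate the payload
- Manual republish: publish the events manually with a script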
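Issue 3: Compensating transaction failures
Symptoms:
- The original transaction failed and its compensation failed as well
- Data is left in an inconsistent state
Diagnosis:
# Analyze compensation failure patterns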
bq query --use_legacy_sql=false '
SELECT
c.compensation_type,
COUNT(*) as total_compensations,
COUNTIF(c.status = "FAILED") as failed_compensations,
SAFE_DIVIDE(COUNTIF(c.status = "FAILED"), COUNT(*)) * 100 as failure_rate_percent
FROM `{PROJECT_ID}.distributed_transactions.compensation_logs` c
WHERE c.executed_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY c.compensation_type
HAVING failure_rate_percent > 5
ORDER BY failure_rate_percent DESC'
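Resolution:
- Verify idempotency: make sure the compensation logic is idempotent
- Manual data cleanup: work with the DBA to restore consistency by hand
- Harden the compensation logic: make compensations more robust against partial and repeated execution
- Alerting: alert immediately when a compensation fails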
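Idempotency here means that running a compensation twice, or running it when the forward step never took effect, leaves the same state as running it once. A sketch against an in-memory store; `InMemoryPlanStore` and its `revokePlan` are hypothetical stand-ins, not the `PlanService` API from section 4.1.

```typescript
type PlanState = "ACTIVE" | "REVOKED";

// Hypothetical in-memory stand-in for the plan store.
class InMemoryPlanStore {
  readonly plans = new Map<string, PlanState>();

  // Returns true if this call changed state, false if there was nothing to undo.
  // Either way the end state is "no active plan", so repeated calls are safe.
  revokePlan(userId: string): boolean {
    const current = this.plans.get(userId);
    if (current !== "ACTIVE") {
      return false; // already revoked, or the forward step never took effect
    }
    this.plans.set(userId, "REVOKED");
    return true;
  }
}
```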
10.2 Performance Optimization
BigQuery query optimization
-- Always include a partition filter
SELECT *
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE DATE(created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) -- 파티션 필터
AND environment = 'prod' -- 클러스터 필터
AND status = 'FAILED'
ORDER BY created_at DESC
LIMIT 100;
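Outbox batch processing optimization
import { Injectable } from '@nestjs/common';
import { groupBy } from 'lodash';
interface OutboxEvent { eventType: string; payload: unknown; }
@Injectable()
export class OptimizedOutboxPublisher {
async publishEventsBatch(): Promise<void> {
// Batch size is configurable per environment; fall back to 50 if unset or invalid
const parsed = Number.parseInt(process.env.OUTBOX_BATCH_SIZE ?? '50', 10);
const batchSize = Number.isInteger(parsed) && parsed > 0 ? parsed : 50;
const events = await this.findPendingEvents(batchSize);
// Group by event type and publish each type as a single batch
const eventsByType = groupBy(events, 'eventType');
for (const [eventType, typeEvents] of Object.entries(eventsByType)) {
await this.publishEventsBatchByType(eventType, typeEvents);
}
}
// Querying the outbox table and publishing to Pub/Sub are omitted from this excerpt
private async findPendingEvents(limit: number): Promise<OutboxEvent[]> {
throw new Error(`findPendingEvents(${limit}) is not shown in this excerpt`);
}
private async publishEventsBatchByType(eventType: string, events: OutboxEvent[]): Promise<void> {
throw new Error(`publishEventsBatchByType(${eventType}, ${events.length}) is not shown in this excerpt`);
}
}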
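In the sequential loop above, an exception from one event type stops every type after it. A dependency-free sketch of an alternative that publishes each type's batch independently, assuming a hypothetical `PublishBatch` callback in place of `publishEventsBatchByType`:

```typescript
// Minimal event shape for this sketch.
interface OutboxEvent {
  eventType: string;
  payload: string;
}

// Hypothetical stand-in for publishEventsBatchByType.
type PublishBatch = (eventType: string, events: OutboxEvent[]) => Promise<void>;

// Dependency-free equivalent of lodash groupBy(events, 'eventType').
function groupByEventType(events: OutboxEvent[]): Map<string, OutboxEvent[]> {
  const groups = new Map<string, OutboxEvent[]>();
  for (const event of events) {
    const bucket = groups.get(event.eventType);
    if (bucket) {
      bucket.push(event);
    } else {
      groups.set(event.eventType, [event]);
    }
  }
  return groups;
}

// Publishes every type's batch concurrently. Promise.allSettled waits for all
// batches, so one failing type does not block the others. Returns the event
// types whose batch failed so the caller can leave those events PENDING.
async function publishGrouped(events: OutboxEvent[], publishBatch: PublishBatch): Promise<string[]> {
  const groups = Array.from(groupByEventType(events).entries());
  const results = await Promise.allSettled(groups.map(([type, batch]) => publishBatch(type, batch)));
  return groups.filter((_, i) => results[i].status === 'rejected').map(([type]) => type);
}
```

Publishing types concurrently gives up ordering across event types; if consumers depend on cross-type ordering, keeping the sequential loop and catching errors per type is the safer variant.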
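11. Operations guide
11.1 Daily operations checklist
#!/bin/bash
# scripts/daily-saga-check.sh
# Fail fast if PROJECT_ID is unset instead of querying a malformed table path
: "${PROJECT_ID:?PROJECT_ID must be set}"
echo "=== Distributed transaction daily check ==="
echo "Check time: $(date)"
# 1. Check for stuck transactions
echo "📊 Stuck transactions (no update for over 1 hour)"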
bq query --use_legacy_sql=false --format=table '
SELECT
COUNT(*) as stuck_transactions,
STRING_AGG(DISTINCT saga_type) as affected_saga_types
FROM `'$PROJECT_ID'.distributed_transactions.saga_transactions`
WHERE status IN ("STARTED", "IN_PROGRESS")
AND updated_at < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)'
# 2. Check failure rates
echo "🚨 Failure rate by Saga type (last 24 hours)"
bq query --use_legacy_sql=false --format=table '
SELECT
saga_type,
COUNT(*) as total,
COUNTIF(status = "FAILED") as failed,
ROUND(SAFE_DIVIDE(COUNTIF(status = "FAILED"), COUNT(*)) * 100, 2) as failure_rate_percent
FROM `'$PROJECT_ID'.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY saga_type
ORDER BY failure_rate_percent DESC'
# 3. Check Outbox backlog
echo "📬 Outbox event backlog (last 24 hours)"
bq query --use_legacy_sql=false --format=table '
SELECT
status,
COUNT(*) as event_count,
AVG(retry_count) as avg_retries
FROM `'$PROJECT_ID'.distributed_transactions.outbox_events`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY status'
echo "✅ Daily check complete"
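The daily script only prints tables; turning the failure-rate result into an alert needs a threshold. A sketch, assuming rows from the second query parsed into a hypothetical `SagaDailyStats` shape and a 5% threshold borrowed from the compensation query in section 10.1 (an assumption, not a documented SLO):

```typescript
// One row of the daily failure-rate query (hypothetical field names).
interface SagaDailyStats {
  sagaType: string;
  total: number;
  failed: number;
}

interface FlaggedSaga {
  sagaType: string;
  rate: number;
}

// Mirrors ROUND(SAFE_DIVIDE(failed, total) * 100, 2): null when total is 0.
function failureRatePercent(stats: SagaDailyStats): number | null {
  if (stats.total === 0) return null;
  return Math.round((stats.failed / stats.total) * 10_000) / 100;
}

// Saga types whose failure rate exceeds thresholdPercent, worst first.
// The 5% default is an assumption borrowed from the compensation query in 10.1.
function sagaTypesOverThreshold(rows: SagaDailyStats[], thresholdPercent = 5): FlaggedSaga[] {
  const flagged: FlaggedSaga[] = [];
  for (const row of rows) {
    const rate = failureRatePercent(row);
    if (rate !== null && rate > thresholdPercent) flagged.push({ sagaType: row.sagaType, rate });
  }
  return flagged.sort((a, b) => b.rate - a.rate);
}
```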
11.2 Weekly performance analysis
-- weekly-saga-performance.sql
-- Weekly distributed transaction performance analysis
-- 1. Performance summary by Saga type
WITH weekly_performance AS (
SELECT
saga_type,
COUNT(*) as total_transactions,
COUNTIF(status = 'COMPLETED') as successful,
COUNTIF(status = 'FAILED') as failed,
COUNTIF(status = 'COMPENSATED') as compensated,
AVG(completed_steps) as avg_completed_steps,
AVG(TIMESTAMP_DIFF(COALESCE(completed_at, updated_at), created_at, SECOND)) as avg_duration_seconds
FROM `{PROJECT_ID}.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
GROUP BY saga_type
)
SELECT
saga_type,
total_transactions,
ROUND(SAFE_DIVIDE(successful, total_transactions) * 100, 2) as success_rate_percent,
ROUND(SAFE_DIVIDE(failed, total_transactions) * 100, 2) as failure_rate_percent,
ROUND(SAFE_DIVIDE(compensated, total_transactions) * 100, 2) as compensation_rate_percent,
ROUND(avg_completed_steps, 1) as avg_completed_steps,
ROUND(avg_duration_seconds / 60, 1) as avg_duration_minutes
FROM weekly_performance
ORDER BY total_transactions DESC;
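11.3 Monthly optimization review
#!/bin/bash
# scripts/monthly-saga-optimization.sh
: "${PROJECT_ID:?PROJECT_ID must be set}"
echo "=== Distributed transaction monthly optimization review ==="
# Transaction volume by environment (input for cost analysis)
echo "💰 Transaction volume by environment (last 30 days)"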
bq query --use_legacy_sql=false '
SELECT
environment,
COUNT(*) as monthly_transactions,
COUNT(DISTINCT DATE(created_at)) as active_days,
ROUND(COUNT(*) / COUNT(DISTINCT DATE(created_at)), 0) as avg_daily_transactions,
ROUND(AVG(total_steps), 1) as avg_saga_complexity
FROM `'$PROJECT_ID'.distributed_transactions.saga_transactions`
WHERE created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
GROUP BY environment
ORDER BY monthly_transactions DESC'
echo
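echo "📊 Optimization recommendations:"
echo " 1. Review Saga patterns with high failure rates"
echo " 2. Adjust timeouts for long-running transactions"
echo " 3. Tune the Outbox batch size"
echo " 4. Improve compensation transaction performance"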
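12. Next steps
12.1 Short-term improvements (1-2 weeks)
- Performance optimization
  - Tune the Outbox batch size
  - Optimize BigQuery query performance
  - Fine-tune Saga timeouts per environment
- Monitoring enhancements
  - Build a real-time performance dashboard
  - Implement automatic recovery mechanisms
  - Define detailed alerting policies
- Developer experience
  - Establish Saga development guidelines
  - Build debugging tools
  - Implement test helper functions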
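12.2 Mid-term plan (1-2 months)
- Advanced patterns
  - Parallel Saga step execution
  - Dynamic Saga composition
  - Conditional step execution
- Stronger regulatory compliance
  - Audit trails for medical device software
  - Governance policy for distributed transactions
  - Stronger personal data protection
- Operations automation
  - Automatic recovery system
  - Predictive failure detection
  - Automated capacity planning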
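12.3 Long-term vision (3-6 months)
- AI/ML-based optimization
  - Analysis of Saga execution patterns
  - Automatic performance tuning
  - Intelligent compensation strategies
- Multi-cloud support
  - Cross-cloud distributed transactions
  - Hybrid environment support
  - Disaster recovery scenarios
- Platform expansion
  - Support for other teams and projects
  - Saga-as-a-Service offering
  - A standardized distributed transaction platform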
📚 Related documents
Document version: 1.0.0
Last updated: 2025-01-22
Approved by: DTA-Wide Platform Team
Next review: 2025-02-22
Change history
| Version | Date | Author | Changes |
|---|---|---|---|
| 0.1.0 | 2025-08-13 | bok@weltcorp.com | Initial draft |