TimeMachine 이벤트 관리 구현 가이드
개요
본 문서는 TimeMachine 도메인의 이벤트 관리 시스템 구현에 대한 상세 가이드를 제공합니다. 시간 변경, 데이터 롤백 등 TimeMachine 내 주요 상태 변화 시 발생하는 이벤트의 발행, 구독, 처리 및 모니터링 방법을 설명합니다.
이 시스템은 중앙화된 분산 이벤트 시스템 아키텍처 (/architecture/event-system.md) 가이드라인을 따릅니다.
⚠️ 중요: 중복 처리 방지, 멱등성 보장, 표준 이벤트 구조 등 공통 가이드라인은 [
/architecture/event-system.md)를 참조하세요.
아키텍처 개요
TimeMachine 이벤트 관리 시스템은 중앙 EventBridgeService와 GCP Pub/Sub을 활용하여 구현됩니다.
주요 컴포넌트
- Event Publisher 서비스 (예:
UserTimeEventPublisher,RollbackEventPublisher): 도메인 로직에 의해 호출되어, 표준 형식의 이벤트 객체를 생성하고 로컬EventEmitter의'event'채널로 발행합니다. (GCP Pub/Sub 직접 호출 안 함) - EventBridgeService (외부/코어): 로컬
'event'채널을 구독하여 모든 이벤트를 중앙 GCP Pub/Sub 토픽 (events)으로 전달합니다. - Event Repository (
EventHistoryRepository): 발행 시도된 이벤트의 이력 및 상태(성공, 실패, 재시도 등)를 데이터베이스에 저장하고 조회합니다. - Event Subscribers (예:
EventsController내 로직,@OnEvent핸들러): 다른 서비스 또는 TimeMachine 내부에서 중앙 토픽으로부터 Push된 이벤트를 수신 컨트롤러를 통해 받아 로컬 이벤트로 변환하거나, 특정 로컬 이벤트를@OnEvent데코레이터로 구독하여 처리합니다. - Event Retry Service: 실패한 이벤트의 재시도를 관리합니다. (필요시 구현)
- Admin API: 이벤트 이력 조회 및 실패한 이벤트 재시도 관리 기능을 제공합니다. (필요시 구현)
TimeMachine 특화 이벤트 타입
1. 시간 변경 이벤트
interface TimeChangedEvent {
type: 'time-machine.time.changed';
eventId: string;
timestamp: string;
source: 'time-machine-service';
payload: {
userId?: string; // 사용자별 시간 변경인 경우
previousTime: string; // 이전 시간 (ISO format)
newTime: string; // 새로운 시간 (ISO format)
timeType: 'GLOBAL' | 'USER_SPECIFIC';
reason?: string; // 변경 사유
requestedBy: string; // 요청자 ID
};
}
2. 롤백 관련 이벤트
interface RollbackStartedEvent {
type: 'time-machine.rollback.started';
eventId: string;
timestamp: string;
source: 'time-machine-service';
payload: {
rollbackId: string;
targetTime: string; // 롤백 목표 시간
affectedDomains: string[]; // 영향받는 도메인 목록
reason: string;
requestedBy: string;
};
}
interface RollbackCompletedEvent {
type: 'time-machine.rollback.completed';
eventId: string;
timestamp: string;
source: 'time-machine-service';
payload: {
rollbackId: string;
completedAt: string;
status: 'SUCCESS' | 'PARTIAL_SUCCESS' | 'FAILED';
domainResults: Array<{
domain: string;
status: 'SUCCESS' | 'FAILED';
errorMessage?: string;
}>;
};
}
구현 가이드
1. TimeMachine 이벤트 발행자 구현
// 파일 경로: libs/feature/time-machine/src/lib/application/services/user-time-event-publisher.service.ts
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { v4 as uuidv4 } from 'uuid';
import * as os from 'os';
import { LoggerService } from '@core/logging';
@Injectable()
export class UserTimeEventPublisher {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly logger: LoggerService,
) {
this.logger.setContext(UserTimeEventPublisher.name);
}
async publishTimeChanged(
userId: string,
previousTime: Date,
newTime: Date,
reason?: string,
requestedBy?: string,
): Promise<void> {
const event = {
type: 'time-machine.time.changed',
eventId: uuidv4(),
timestamp: new Date().toISOString(),
source: 'time-machine-service',
containerId: os.hostname(),
payload: {
userId,
previousTime: previousTime.toISOString(),
newTime: newTime.toISOString(),
timeType: 'USER_SPECIFIC' as const,
reason,
requestedBy,
},
};
this.logger.debug('Publishing time changed event', {
eventId: event.eventId,
userId,
newTime: event.payload.newTime
});
// 로컬 'event' 채널로 발행 - EventBridgeService가 Pub/Sub으로 전달
this.eventEmitter.emit('event', event);
}
async publishRollbackStarted(
rollbackId: string,
targetTime: Date,
affectedDomains: string[],
reason: string,
requestedBy: string,
): Promise<void> {
const event = {
type: 'time-machine.rollback.started',
eventId: uuidv4(),
timestamp: new Date().toISOString(),
source: 'time-machine-service',
containerId: os.hostname(),
payload: {
rollbackId,
targetTime: targetTime.toISOString(),
affectedDomains,
reason,
requestedBy,
},
};
this.logger.log('Publishing rollback started event', {
eventId: event.eventId,
rollbackId,
affectedDomains
});
this.eventEmitter.emit('event', event);
}
// 기타 TimeMachine 이벤트 발행 메서드들...
}
2. 이벤트 저장소 구현 (EventHistoryRepository)
📋 참고: 중복 처리 방지 로직은 (
/architecture/event-system.md)를 참조하세요.
// 파일 경로: libs/feature/time-machine/src/lib/infrastructure/repositories/event-history.repository.ts
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@core/database';
import { Prisma, TimeChangeEvent as PrismaTimeChangeEvent } from '@prisma/client';
import { LoggerService } from '@core/logging';
// TimeMachine 특화 EventHistory 타입
export interface TimeMachineEventHistory {
id: string;
eventId: string;
eventType: string;
payload: any;
status: string; // 'PENDING', 'PUBLISHED', 'DELIVERY_FAILED', 'PROCESSED', 'PROCESSING_FAILED', 'RETRY_PENDING'
publishedAt?: Date;
processedAt?: Date;
errorMessage?: string;
retryCount?: number;
lastRetryAt?: Date;
messageId?: string;
createdAt: Date;
// TimeMachine 특화 필드들
rollbackId?: string; // 롤백 관련 이벤트인 경우
affectedUserId?: string; // 사용자별 시간 변경인 경우
}
@Injectable()
export class EventHistoryRepository {
constructor(
private readonly prisma: PrismaService,
private readonly logger: LoggerService,
) {
this.logger.setContext(EventHistoryRepository.name);
}
async createEventHistory(data: Partial<TimeMachineEventHistory>): Promise<PrismaTimeChangeEvent> {
this.logger.debug('Creating TimeMachine event history', {
eventId: data.eventId,
eventType: data.eventType
});
const createInput: Prisma.TimeChangeEventCreateInput = {
eventId: data.eventId,
eventType: data.eventType,
status: data.status || 'PENDING',
payload: data.payload || Prisma.DbNull,
rollbackId: data.rollbackId,
affectedUserId: data.affectedUserId,
// ... 기타 필드 매핑
};
return this.prisma.timeChangeEvent.create({ data: createInput });
}
async findByRollbackId(rollbackId: string): Promise<PrismaTimeChangeEvent[]> {
this.logger.debug('Finding events by rollback ID', { rollbackId });
return this.prisma.timeChangeEvent.findMany({
where: { rollbackId },
orderBy: { createdAt: 'asc' },
});
}
async findByUserId(userId: string): Promise<PrismaTimeChangeEvent[]> {
this.logger.debug('Finding events by user ID', { userId });
return this.prisma.timeChangeEvent.findMany({
where: { affectedUserId: userId },
orderBy: { createdAt: 'desc' },
take: 50, // 최근 50개만
});
}
// 기본 CRUD 메서드들은 공통 구현과 동일...
async updateEventHistory(eventId: string, data: Partial<TimeMachineEventHistory>): Promise<PrismaTimeChangeEvent | null> {
// 구현 생략 (이전과 동일)
}
async findById(eventId: string): Promise<PrismaTimeChangeEvent | null> {
// 구현 생략 (이전과 동일)
}
}
3. TimeMachine 이벤트 핸들러 구현
// 파일 경로: libs/feature/time-machine/src/lib/application/handlers/time-change.handler.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { LoggerService } from '@core/logging';
@Injectable()
export class TimeChangeEventHandler {
constructor(private readonly logger: LoggerService) {
this.logger.setContext(TimeChangeEventHandler.name);
}
@OnEvent('time-machine.time.changed')
async handleTimeChanged(payload: any): Promise<void> {
try {
const { userId, previousTime, newTime, timeType } = payload;
this.logger.log('Processing time changed event', {
userId,
timeType,
newTime
});
// TimeMachine 내부 로직 처리
// 예: 캐시 무효화, 내부 상태 업데이트 등
if (timeType === 'USER_SPECIFIC') {
await this.handleUserSpecificTimeChange(userId, newTime);
} else {
await this.handleGlobalTimeChange(newTime);
}
} catch (error) {
this.logger.error('Failed to handle time changed event', error, { payload });
}
}
@OnEvent('time-machine.rollback.started')
async handleRollbackStarted(payload: any): Promise<void> {
try {
const { rollbackId, targetTime, affectedDomains } = payload;
this.logger.log('Processing rollback started event', {
rollbackId,
affectedDomains
});
// 롤백 시작 처리 로직
await this.initializeRollbackProcess(rollbackId, targetTime, affectedDomains);
} catch (error) {
this.logger.error('Failed to handle rollback started event', error, { payload });
}
}
private async handleUserSpecificTimeChange(userId: string, newTime: string): Promise<void> {
// 사용자별 시간 변경 처리 로직
this.logger.debug('Handling user-specific time change', { userId, newTime });
}
private async handleGlobalTimeChange(newTime: string): Promise<void> {
// 전역 시간 변경 처리 로직
this.logger.debug('Handling global time change', { newTime });
}
private async initializeRollbackProcess(
rollbackId: string,
targetTime: string,
affectedDomains: string[]
): Promise<void> {
// 롤백 프로세스 초기화 로직
this.logger.debug('Initializing rollback process', { rollbackId, targetTime });
}
}
4. 이벤트 재시도 서비스 구현
// 파일 경로: libs/feature/time-machine/src/lib/application/services/event-retry.service.ts
import { Injectable } from '@nestjs/common';
import { EventHistoryRepository, TimeMachineEventHistory } from '../../infrastructure/repositories/event-history.repository';
import { LoggerService } from '@core/logging';
@Injectable()
export class EventRetryService {
constructor(
private readonly eventHistoryRepository: EventHistoryRepository,
private readonly logger: LoggerService,
) {
this.logger.setContext(EventRetryService.name);
}
async retryFailedRollbackEvents(rollbackId: string, maxRetries: number = 5): Promise<number> {
this.logger.log('Retrying failed events for rollback', { rollbackId });
const rollbackEvents = await this.eventHistoryRepository.findByRollbackId(rollbackId);
const failedEvents = rollbackEvents.filter(event =>
['DELIVERY_FAILED', 'PROCESSING_FAILED'].includes(event.status) &&
(event.retryCount || 0) < maxRetries
);
let successCount = 0;
for (const event of failedEvents) {
try {
await this.retrySingleEvent(event);
successCount++;
} catch (error) {
this.logger.error('Failed to retry event', error, { eventId: event.eventId });
}
}
this.logger.log('Completed rollback event retry', {
rollbackId,
totalAttempts: failedEvents.length,
successCount
});
return successCount;
}
private async retrySingleEvent(eventHistory: TimeMachineEventHistory): Promise<void> {
await this.eventHistoryRepository.updateEventHistory(eventHistory.eventId, {
status: 'RETRY_PENDING',
retryCount: (eventHistory.retryCount || 0) + 1,
lastRetryAt: new Date(),
});
this.logger.debug('Marked TimeMachine event for retry', {
eventId: eventHistory.eventId,
rollbackId: eventHistory.rollbackId
});
}
}
5. 관리자 API 구현
// 파일 경로: apps/dta-wide-api/src/app/time-machine/controllers/event.controller.ts
import { Controller, Get, Post, Query, Param, UseGuards } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import { RolesGuard, Roles } from '@feature/iam';
import { EventHistoryRepository } from '@feature/time-machine';
import { EventRetryService } from '@feature/time-machine';
@Controller('time-machine/events')
@UseGuards(AuthGuard('jwt'), RolesGuard)
export class TimeMachineEventController {
constructor(
private readonly eventHistoryRepository: EventHistoryRepository,
private readonly eventRetryService: EventRetryService,
) {}
@Get('rollback/:rollbackId')
@Roles('ADMIN', 'TESTER')
async getRollbackEvents(@Param('rollbackId') rollbackId: string) {
const events = await this.eventHistoryRepository.findByRollbackId(rollbackId);
return {
rollbackId,
events: events.map(event => ({
eventId: event.eventId,
eventType: event.eventType,
status: event.status,
retryCount: event.retryCount,
createdAt: event.createdAt.toISOString(),
errorMessage: event.errorMessage,
})),
};
}
@Get('user/:userId')
@Roles('ADMIN', 'TESTER')
async getUserTimeEvents(@Param('userId') userId: string) {
const events = await this.eventHistoryRepository.findByUserId(userId);
return {
userId,
events: events.map(event => ({
eventId: event.eventId,
eventType: event.eventType,
status: event.status,
createdAt: event.createdAt.toISOString(),
payload: event.payload,
})),
};
}
@Post('rollback/:rollbackId/retry')
@Roles('ADMIN', 'TESTER')
async retryRollbackEvents(@Param('rollbackId') rollbackId: string) {
const successCount = await this.eventRetryService.retryFailedRollbackEvents(rollbackId);
return {
rollbackId,
retriedCount: successCount,
timestamp: new Date().toISOString(),
};
}
}
구현 가이드라인
1. TimeMachine 특화 이벤트 설계 원칙
- 시간 컨텍스트 포함: 모든 이벤트에 시간 변경/롤백 관련 컨텍스트 정보 포함
- 롤백 추적성: 롤백 관련 이벤트는
rollbackId로 연관성 추적 - 사용자별 구분: 사용자별 시간 변경과 전역 시간 변경 명확히 구분
- 도메인 간 영향 명시: 다른 도메인에 영향을 주는 이벤트는 영향 범위 명시
2. 이벤트 처리 지침
📋 참고: 멱등성, 트랜잭션 관리, 오류 처리 등 공통 지침은 [
/architecture/event-system.md]를 참조하세요.
- 시간 순서 보장: 시간 변경 이벤트는 발생 순서대로 처리되도록 설계
- 롤백 상태 관리: 롤백 진행 상태를 이벤트를 통해 추적 및 관리
- 데이터 일관성: 시간 변경 시 관련 데이터의 일관성 유지
3. 모니터링 방안
- 시간 변경 이력: 모든 시간 변경 이벤트의 상세 이력 유지
- 롤백 진행 상황: 롤백 프로세스의 단계별 진행 상황 모니터링
- 도메인별 영향도: 시간 변경이 각 도메인에 미치는 영향도 추적
배포 및 운영 가이드
1. GCP Pub/Sub 설정
📋 참고: Pub/Sub 설정은 [
/architecture/event-system.md]의 공통 가이드라인을 따릅니다.
TimeMachine은 중앙 events 토픽을 사용하며, 별도 토픽 생성이 불필요합니다.
2. TimeMachine 특화 성능 최적화
- 롤백 이벤트 배치 처리 최적화
- 시간 이력 데이터 파티셔닝 (날짜별, 사용자별)
- 사용자별 시간 변경 이벤트 인덱싱 최적화
3. 장애 복구 전략
- 롤백 프로세스 중단 시 복구 절차
- 시간 변경 실패 시 rollback 메커니즘
- 부분적 롤백 성공 시 정합성 검증 및 복구
테스트 전략
1. TimeMachine 특화 테스트
- 시간 변경 이벤트 발행 및 처리 테스트
- 롤백 프로세스 전체 플로우 테스트
- 사용자별/전역 시간 변경 시나리오 테스트
2. 통합 테스트
- TimeMachine → 다른 도메인 이벤트 전파 테스트
- 다른 도메인 → TimeMachine 롤백 이벤트 수신 테스트
3. 성능 테스트
- 대량 사용자 동시 시간 변경 처리 성능
- 롤백 시 대량 이벤트 처리 성능
API 명세
TimeMachine 이벤트 관리 API의 상세 명세는 TimeMachine API 문서를 참조하세요.