이벤트 시스템 코어 모듈
개요
이벤트 시스템 코어 모듈은 DTA-WIDE 시스템의 분산 이벤트 처리를 담당하는 핵심 모듈입니다. 이 모듈은 서비스 간 통신, 도메인 간 통합, 비동기 작업 처리 등 다양한 용도로 사용됩니다.
참고: 이벤트 기반 아키텍처의 설계 원칙과 패턴은 이벤트 기반 아키텍처 설계 가이드를 참조하세요.
구현 경로
이벤트 시스템 코어 모듈은 다음 경로에 구현되어 있습니다:
자세한 구현 방법과 프로젝트 구조는 이벤트 시스템 구현 가이드를 참조하세요.
주요 기능
- 도메인 간 이벤트 기반 통신
- Cloud Run 환경에서의 분산 이벤트 처리
- 이벤트 중복 처리 방지 (멱등성 보장)
- 이벤트 필터링 및 라우팅
- 이벤트 기반 워크플로우 실행
- 서비스 간 느슨한 결합(Loose Coupling) 지원
참고: DTA-WIDE는 Cloud Run 기반의 스케일 아웃 아키텍처를 사용하므로, 단일 프로세스 내 이벤트 에미터 대신 중앙화된 Pub/Sub 기반 이벤트 시스템을 사용합니다. 이를 통해 제로 스케일링 환경에서도 안정적인 이벤트 전달을 보장합니다.
아키텍처
모듈 구조
@Module({
imports: [RedisModule, ConfigModule],
controllers: [EventsController],
providers: [EventBridgeService, EventDeDuplicationService],
exports: [EventBridgeService],
})
export class EventsModule {}
주요 컴포넌트
EventsController: 이벤트 수신 엔드포인트EventBridgeService: 로컬 이벤트를 글로벌 이벤트로 변환EventDeDuplicationService: 이벤트 중복 처리 방지EventRegistryService: 도메인별 이벤트 타입 스키마 검증기 등록 관리
설정
환경 변수
EVENT_PUBSUB_PROJECT_ID=your-gcp-project-id
EVENT_PUBSUB_TOPIC=events
EVENT_PUBSUB_SUBSCRIPTION=events-subscription
기본 설정값
- 이벤트 처리 TTL: 86400초 (24시간)
- 최대 재시도 횟수: 5회
- 재시도 지연: 500ms ~ 5000ms (지수 백오프)
사용 가이드
모듈 임포트
import { EventsModule } from '@core/events';
@Module({
imports: [EventsModule],
// ...
})
export class YourModule {}
이벤트 발행
import { UserEventTypes } from '../constants/event-types';
constructor(private readonly eventEmitter: EventEmitter2) {}
async updateUserProfile(userId: number, profileData: UpdateProfileDto): Promise<void> {
// 1. 비즈니스 로직 처리
await this.userRepository.updateProfile(userId, profileData);
// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: UserEventTypes.PROFILE_UPDATED, // 'user.profile.updated'
payload: {
userId,
updatedFields: Object.keys(profileData)
},
timestamp: new Date().toISOString()
});
}
이벤트 구독
import { Injectable, OnModuleInit } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '../constants/event-types';
@Injectable()
export class UserEventListener implements OnModuleInit {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly userService: UserService,
private readonly logger: LoggerService
) {}
onModuleInit() {
// 관심 있는 이벤트 구독
this.eventEmitter.on(UserEventTypes.CREATED, this.handleUserCreated.bind(this));
this.eventEmitter.on(UserEventTypes.PROFILE_UPDATED, this.handleUserProfileUpdated.bind(this));
}
private async handleUserCreated(payload: any) {
try {
this.logger.debug(`Processing ${UserEventTypes.CREATED} event`, { userId: payload.userId });
// 이벤트 처리 로직
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.CREATED} event`, { error });
}
}
private async handleUserProfileUpdated(payload: any) {
try {
this.logger.debug(`Processing ${UserEventTypes.PROFILE_UPDATED} event`, { userId: payload.userId });
// 이벤트 처리 로직
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.PROFILE_UPDATED} event`, { error });
}
}
}
도메인별 활용 예시
1. 사용자 도메인 (User Domain)
사용자 정보 변경 시 다른 도메인에 알림을 전달하여 관련 데이터를 업데이트합니다.
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '../constants/event-types';
@Injectable()
export class UserService {
constructor(
private readonly userRepository: UserRepository,
private readonly eventEmitter: EventEmitter2
) {}
async createUser(userData: CreateUserDto): Promise<UserDto> {
// 1. 사용자 생성
const user = await this.userRepository.create(userData);
// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: UserEventTypes.CREATED, // 'user.created'
payload: {
userId: user.id,
email: user.email,
displayName: user.displayName,
role: user.role
},
timestamp: new Date().toISOString()
});
return UserDto.fromEntity(user);
}
async updateUser(userId: number, updateData: UpdateUserDto): Promise<void> {
// 1. 사용자 업데이트
await this.userRepository.update(userId, updateData);
// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: UserEventTypes.UPDATED, // 'user.updated'
payload: {
userId,
updatedFields: Object.keys(updateData)
},
timestamp: new Date().toISOString()
});
}
}
2. 인증 도메인 (Auth Domain)
로그인/로그아웃 이벤트를 발행하여 감사 로그 생성 및 보안 모니터링에 활용합니다.
import { Injectable, UnauthorizedException } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { AuthEventTypes } from '../constants/event-types';
@Injectable()
export class AuthService {
constructor(
private readonly userRepository: UserRepository,
private readonly tokenService: TokenService,
private readonly eventEmitter: EventEmitter2
) {}
async login(credentials: LoginDto): Promise<AuthResponseDto> {
// 1. 사용자 검증
const user = await this.userRepository.findByEmail(credentials.email);
if (!user || !await this.validatePassword(credentials.password, user.password)) {
throw new UnauthorizedException('Invalid credentials');
}
// 2. 토큰 생성
const tokens = await this.tokenService.generateTokens(user);
// 3. 이벤트 발행
this.eventEmitter.emit('event', {
type: AuthEventTypes.LOGIN, // 'auth.user.login'
payload: {
userId: user.id,
email: user.email,
ipAddress: credentials.ipAddress,
userAgent: credentials.userAgent,
deviceId: credentials.deviceId,
sessionId: tokens.sessionId
},
timestamp: new Date().toISOString()
});
return tokens;
}
async logout(userId: number, sessionId: string): Promise<void> {
// 1. 토큰 무효화
await this.tokenService.revokeSession(sessionId);
// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: AuthEventTypes.LOGOUT, // 'auth.user.logout'
payload: {
userId,
sessionId,
reason: 'USER_INITIATED'
},
timestamp: new Date().toISOString()
});
}
}
3. 감사 도메인 (Audit Domain)
이벤트 리스너를 통해 다양한 시스템 이벤트에 대한 감사 로그를 자동으로 생성합니다.
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '@user/constants/event-types';
import { AuthEventTypes } from '@auth/constants/event-types';
import { IAMEventTypes } from '@iam/constants/event-types';
@Injectable()
export class AuditEventListener implements OnModuleInit {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly auditService: AuditService,
private readonly logger: Logger
) {}
onModuleInit() {
// 사용자 관련 이벤트 구독
this.eventEmitter.on(UserEventTypes.CREATED, this.handleUserCreated.bind(this));
this.eventEmitter.on(UserEventTypes.UPDATED, this.handleUserUpdated.bind(this));
this.eventEmitter.on(AuthEventTypes.LOGIN, this.handleUserLogin.bind(this));
this.eventEmitter.on(AuthEventTypes.LOGOUT, this.handleUserLogout.bind(this));
// IAM 관련 이벤트 구독
this.eventEmitter.on(IAMEventTypes.PERMISSION_GRANTED, this.handleIamEvent.bind(this));
this.eventEmitter.on(IAMEventTypes.PERMISSION_REVOKED, this.handleIamEvent.bind(this));
this.logger.log('Audit event listeners initialized');
}
private async handleUserLogin(payload: any) {
try {
// 로그인 감사 로그 생성
await this.auditService.createAuditLog({
eventType: 'USER_LOGIN',
source: 'AUTH_SERVICE',
userId: payload.userId,
sessionId: payload.sessionId,
metadata: {
ipAddress: payload.ipAddress,
userAgent: payload.userAgent,
deviceId: payload.deviceId
}
});
} catch (error) {
this.logger.error('Error creating audit log for user login', { error });
}
}
// 기타 이벤트 핸들러...
}
4. 접근 코드 도메인 (Access Code Domain)
사용자 등록 및 치료 시작 이벤트에 응답하여 접근 코드를 자동으로 생성합니다.
import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '@user/constants/event-types';
import { TreatmentEventTypes } from '@treatment/constants/event-types';
@Injectable()
export class AccessCodeEventListener implements OnModuleInit {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly accessCodeService: AccessCodeService,
private readonly logger: Logger
) {}
onModuleInit() {
// 이벤트 구독 설정
this.eventEmitter.on(UserEventTypes.CREATED, this.handleUserCreated.bind(this));
this.eventEmitter.on(TreatmentEventTypes.STARTED, this.handleTreatmentStarted.bind(this));
this.eventEmitter.on(TreatmentEventTypes.ENDED, this.handleTreatmentEnded.bind(this));
this.logger.log('AccessCode event listeners initialized');
}
private async handleUserCreated(payload: any) {
try {
// 새 사용자를 위한 접근 코드 생성
await this.accessCodeService.generateInitialAccessCode(
payload.userId,
payload.role
);
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.CREATED} event`, { error });
}
}
private async handleTreatmentStarted(payload: any) {
try {
// 치료 시작 시 접근 코드 상태 업데이트
await this.accessCodeService.handleTreatmentStart(
payload.userId,
payload.treatmentId,
payload.treatmentPeriod
);
} catch (error) {
this.logger.error(`Error handling ${TreatmentEventTypes.STARTED} event`, { error });
}
}
private async handleTreatmentEnded(payload: any) {
try {
// 치료 종료 시 접근 코드 상태 업데이트
await this.accessCodeService.handleTreatmentEnd(
payload.userId,
payload.treatmentId
);
} catch (error) {
this.logger.error(`Error handling ${TreatmentEventTypes.ENDED} event`, { error });
}
}
}
보안
접근 제어
- Pub/Sub IAM 권한 제한
- API 엔드포인트 인증
- 이벤트 발행자 검증
데이터 보호
- 이벤트 페이로드 암호화 (필요 시)
- 민감 정보 필터링
- 이벤트 접근 로깅
모니터링
메트릭
- 이벤트 발행/수신 건수
- 이벤트 처리 시간
- 실패율 및 재시도 횟수
- 처리 지연 시간
알림 설정
- 이벤트 처리 실패
- 비정상적 지연
- 중복 이벤트 비율 증가
- 데드레터 큐 메시지 발생
에러 처리
주요 에러 유형
- 이벤트 발행 실패
- 이벤트 수신 실패
- 이벤트 처리 타임아웃
- 이벤트 형식 오류
에러 로깅
this.logger.error('Failed to process event', {
module: 'EventsService',
method: 'handleUserCreated',
error: error.message,
eventId: payload.eventId,
eventType: payload.type
});
모범 사례
이벤트 설계
- 명확한 이벤트 네이밍 규칙 적용 (
{domain}.{entity}.{action}형식) - 이벤트에 충분한 컨텍스트 정보 포함
- 이벤트 스키마 버전 관리 고려
- 멱등성을 위한 고유 이벤트 ID 포함
- 각 도메인은 자신의 이벤트 타입을 정의하고 관리
성능 최적화
- 처리 불필요한 이벤트 조기 필터링
- 비용이 높은 작업은 배치 처리 고려
- 처리 시간 모니터링 및 최적화
- 중복 이벤트 효율적 관리
장애 대응
- 재시도 정책 구현
- 데드레터 큐 설정
- 서킷 브레이커 패턴 고려
- 장애 복구 자동화
관련 문서
TBD
변경 이력
| 버전 | 날짜 | 작성자 | 변경 내용 |
|---|---|---|---|
| 0.1.0 | 2025-04-02 | bok@weltcorp.com | 최초 작성 |