본문으로 건너뛰기

이벤트 시스템 코어 모듈

개요

이벤트 시스템 코어 모듈은 DTA-WIDE 시스템의 분산 이벤트 처리를 담당하는 핵심 모듈입니다. 이 모듈은 서비스 간 통신, 도메인 간 통합, 비동기 작업 처리 등 다양한 용도로 사용됩니다.

참고: 이벤트 기반 아키텍처의 설계 원칙과 패턴은 이벤트 기반 아키텍처 설계 가이드를 참조하세요.

구현 경로

이벤트 시스템 코어 모듈은 다음 경로에 구현되어 있습니다:

자세한 구현 방법과 프로젝트 구조는 이벤트 시스템 구현 가이드를 참조하세요.

주요 기능

  • 도메인 간 이벤트 기반 통신
  • Cloud Run 환경에서의 분산 이벤트 처리
  • 이벤트 중복 처리 방지 (멱등성 보장)
  • 이벤트 필터링 및 라우팅
  • 이벤트 기반 워크플로우 실행
  • 서비스 간 느슨한 결합(Loose Coupling) 지원

참고: DTA-WIDE는 Cloud Run 기반의 스케일 아웃 아키텍처를 사용하므로, 단일 프로세스 내 이벤트 에미터 대신 중앙화된 Pub/Sub 기반 이벤트 시스템을 사용합니다. 이를 통해 제로 스케일링 환경에서도 안정적인 이벤트 전달을 보장합니다.

아키텍처

모듈 구조

@Module({
imports: [RedisModule, ConfigModule],
controllers: [EventsController],
providers: [EventBridgeService, EventDeDuplicationService],
exports: [EventBridgeService],
})
export class EventsModule {}

주요 컴포넌트

  • EventsController: 이벤트 수신 엔드포인트
  • EventBridgeService: 로컬 이벤트를 글로벌 이벤트로 변환
  • EventDeDuplicationService: 이벤트 중복 처리 방지
  • EventRegistryService: 도메인별 이벤트 타입 스키마 검증기 등록 관리

설정

환경 변수

EVENT_PUBSUB_PROJECT_ID=your-gcp-project-id
EVENT_PUBSUB_TOPIC=events
EVENT_PUBSUB_SUBSCRIPTION=events-subscription

기본 설정값

  • 이벤트 처리 TTL: 86400초 (24시간)
  • 최대 재시도 횟수: 5회
  • 재시도 지연: 500ms ~ 5000ms (지수 백오프)

사용 가이드

모듈 임포트

import { EventsModule } from '@core/events';

@Module({
imports: [EventsModule],
// ...
})
export class YourModule {}

이벤트 발행

import { UserEventTypes } from '../constants/event-types';

constructor(private readonly eventEmitter: EventEmitter2) {}

async updateUserProfile(userId: number, profileData: UpdateProfileDto): Promise<void> {
// 1. 비즈니스 로직 처리
await this.userRepository.updateProfile(userId, profileData);

// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: UserEventTypes.PROFILE_UPDATED, // 'user.profile.updated'
payload: {
userId,
updatedFields: Object.keys(profileData)
},
timestamp: new Date().toISOString()
});
}

이벤트 구독

import { Injectable, OnModuleInit } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '../constants/event-types';

@Injectable()
export class UserEventListener implements OnModuleInit {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly userService: UserService,
private readonly logger: LoggerService
) {}

onModuleInit() {
// 관심 있는 이벤트 구독
this.eventEmitter.on(UserEventTypes.CREATED, this.handleUserCreated.bind(this));
this.eventEmitter.on(UserEventTypes.PROFILE_UPDATED, this.handleUserProfileUpdated.bind(this));
}

private async handleUserCreated(payload: any) {
try {
this.logger.debug(`Processing ${UserEventTypes.CREATED} event`, { userId: payload.userId });
// 이벤트 처리 로직
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.CREATED} event`, { error });
}
}

private async handleUserProfileUpdated(payload: any) {
try {
this.logger.debug(`Processing ${UserEventTypes.PROFILE_UPDATED} event`, { userId: payload.userId });
// 이벤트 처리 로직
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.PROFILE_UPDATED} event`, { error });
}
}
}

도메인별 활용 예시

1. 사용자 도메인 (User Domain)

사용자 정보 변경 시 다른 도메인에 알림을 전달하여 관련 데이터를 업데이트합니다.

import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '../constants/event-types';

@Injectable()
export class UserService {
constructor(
private readonly userRepository: UserRepository,
private readonly eventEmitter: EventEmitter2
) {}

async createUser(userData: CreateUserDto): Promise<UserDto> {
// 1. 사용자 생성
const user = await this.userRepository.create(userData);

// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: UserEventTypes.CREATED, // 'user.created'
payload: {
userId: user.id,
email: user.email,
displayName: user.displayName,
role: user.role
},
timestamp: new Date().toISOString()
});

return UserDto.fromEntity(user);
}

async updateUser(userId: number, updateData: UpdateUserDto): Promise<void> {
// 1. 사용자 업데이트
await this.userRepository.update(userId, updateData);

// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: UserEventTypes.UPDATED, // 'user.updated'
payload: {
userId,
updatedFields: Object.keys(updateData)
},
timestamp: new Date().toISOString()
});
}
}

2. 인증 도메인 (Auth Domain)

로그인/로그아웃 이벤트를 발행하여 감사 로그 생성 및 보안 모니터링에 활용합니다.

import { Injectable, UnauthorizedException } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { AuthEventTypes } from '../constants/event-types';

@Injectable()
export class AuthService {
constructor(
private readonly userRepository: UserRepository,
private readonly tokenService: TokenService,
private readonly eventEmitter: EventEmitter2
) {}

async login(credentials: LoginDto): Promise<AuthResponseDto> {
// 1. 사용자 검증
const user = await this.userRepository.findByEmail(credentials.email);
if (!user || !await this.validatePassword(credentials.password, user.password)) {
throw new UnauthorizedException('Invalid credentials');
}

// 2. 토큰 생성
const tokens = await this.tokenService.generateTokens(user);

// 3. 이벤트 발행
this.eventEmitter.emit('event', {
type: AuthEventTypes.LOGIN, // 'auth.user.login'
payload: {
userId: user.id,
email: user.email,
ipAddress: credentials.ipAddress,
userAgent: credentials.userAgent,
deviceId: credentials.deviceId,
sessionId: tokens.sessionId
},
timestamp: new Date().toISOString()
});

return tokens;
}

async logout(userId: number, sessionId: string): Promise<void> {
// 1. 토큰 무효화
await this.tokenService.revokeSession(sessionId);

// 2. 이벤트 발행
this.eventEmitter.emit('event', {
type: AuthEventTypes.LOGOUT, // 'auth.user.logout'
payload: {
userId,
sessionId,
reason: 'USER_INITIATED'
},
timestamp: new Date().toISOString()
});
}
}

3. 감사 도메인 (Audit Domain)

이벤트 리스너를 통해 다양한 시스템 이벤트에 대한 감사 로그를 자동으로 생성합니다.

import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '@user/constants/event-types';
import { AuthEventTypes } from '@auth/constants/event-types';
import { IAMEventTypes } from '@iam/constants/event-types';

@Injectable()
export class AuditEventListener implements OnModuleInit {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly auditService: AuditService,
private readonly logger: Logger
) {}

onModuleInit() {
// 사용자 관련 이벤트 구독
this.eventEmitter.on(UserEventTypes.CREATED, this.handleUserCreated.bind(this));
this.eventEmitter.on(UserEventTypes.UPDATED, this.handleUserUpdated.bind(this));
this.eventEmitter.on(AuthEventTypes.LOGIN, this.handleUserLogin.bind(this));
this.eventEmitter.on(AuthEventTypes.LOGOUT, this.handleUserLogout.bind(this));

// IAM 관련 이벤트 구독
this.eventEmitter.on(IAMEventTypes.PERMISSION_GRANTED, this.handleIamEvent.bind(this));
this.eventEmitter.on(IAMEventTypes.PERMISSION_REVOKED, this.handleIamEvent.bind(this));

this.logger.log('Audit event listeners initialized');
}

private async handleUserLogin(payload: any) {
try {
// 로그인 감사 로그 생성
await this.auditService.createAuditLog({
eventType: 'USER_LOGIN',
source: 'AUTH_SERVICE',
userId: payload.userId,
sessionId: payload.sessionId,
metadata: {
ipAddress: payload.ipAddress,
userAgent: payload.userAgent,
deviceId: payload.deviceId
}
});
} catch (error) {
this.logger.error('Error creating audit log for user login', { error });
}
}

// 기타 이벤트 핸들러...
}

4. 접근 코드 도메인 (Access Code Domain)

사용자 등록 및 치료 시작 이벤트에 응답하여 접근 코드를 자동으로 생성합니다.

import { Injectable, OnModuleInit, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '@user/constants/event-types';
import { TreatmentEventTypes } from '@treatment/constants/event-types';

@Injectable()
export class AccessCodeEventListener implements OnModuleInit {
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly accessCodeService: AccessCodeService,
private readonly logger: Logger
) {}

onModuleInit() {
// 이벤트 구독 설정
this.eventEmitter.on(UserEventTypes.CREATED, this.handleUserCreated.bind(this));
this.eventEmitter.on(TreatmentEventTypes.STARTED, this.handleTreatmentStarted.bind(this));
this.eventEmitter.on(TreatmentEventTypes.ENDED, this.handleTreatmentEnded.bind(this));

this.logger.log('AccessCode event listeners initialized');
}

private async handleUserCreated(payload: any) {
try {
// 새 사용자를 위한 접근 코드 생성
await this.accessCodeService.generateInitialAccessCode(
payload.userId,
payload.role
);
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.CREATED} event`, { error });
}
}

private async handleTreatmentStarted(payload: any) {
try {
// 치료 시작 시 접근 코드 상태 업데이트
await this.accessCodeService.handleTreatmentStart(
payload.userId,
payload.treatmentId,
payload.treatmentPeriod
);
} catch (error) {
this.logger.error(`Error handling ${TreatmentEventTypes.STARTED} event`, { error });
}
}

private async handleTreatmentEnded(payload: any) {
try {
// 치료 종료 시 접근 코드 상태 업데이트
await this.accessCodeService.handleTreatmentEnd(
payload.userId,
payload.treatmentId
);
} catch (error) {
this.logger.error(`Error handling ${TreatmentEventTypes.ENDED} event`, { error });
}
}
}

보안

접근 제어

  • Pub/Sub IAM 권한 제한
  • API 엔드포인트 인증
  • 이벤트 발행자 검증

데이터 보호

  • 이벤트 페이로드 암호화 (필요 시)
  • 민감 정보 필터링
  • 이벤트 접근 로깅

모니터링

메트릭

  • 이벤트 발행/수신 건수
  • 이벤트 처리 시간
  • 실패율 및 재시도 횟수
  • 처리 지연 시간

알림 설정

  • 이벤트 처리 실패
  • 비정상적 지연
  • 중복 이벤트 비율 증가
  • 데드레터 큐 메시지 발생

에러 처리

주요 에러 유형

  • 이벤트 발행 실패
  • 이벤트 수신 실패
  • 이벤트 처리 타임아웃
  • 이벤트 형식 오류

에러 로깅

this.logger.error('Failed to process event', {
module: 'EventsService',
method: 'handleUserCreated',
error: error.message,
eventId: payload.eventId,
eventType: payload.type
});

모범 사례

이벤트 설계

  1. 명확한 이벤트 네이밍 규칙 적용 ({domain}.{entity}.{action} 형식)
  2. 이벤트에 충분한 컨텍스트 정보 포함
  3. 이벤트 스키마 버전 관리 고려
  4. 멱등성을 위한 고유 이벤트 ID 포함
  5. 각 도메인은 자신의 이벤트 타입을 정의하고 관리

성능 최적화

  1. 처리 불필요한 이벤트 조기 필터링
  2. 비용이 높은 작업은 배치 처리 고려
  3. 처리 시간 모니터링 및 최적화
  4. 중복 이벤트 효율적 관리

장애 대응

  1. 재시도 정책 구현
  2. 데드레터 큐 설정
  3. 서킷 브레이커 패턴 고려
  4. 장애 복구 자동화

관련 문서

TBD

변경 이력

버전날짜작성자변경 내용
0.1.02025-04-02bok@weltcorp.com최초 작성