이벤트 시스템 구현 가이드
개요
본 문서는 DTA-WIDE 시스템의 분산 이벤트 처리 시스템 구현에 관한 상세 가이드를 제공합니다. 이벤트 시스템은 Cloud Run 환경에서 스케일 아웃된 컨테이너 간 안정적인 이벤트 전달을 보장하는 핵심 인프라입니다.
프로젝트 구조
이벤트 시스템 코어 모듈은 다음 경로에 구현되어 있습니다:
libs/
core/
events/
src/
bridge/
event-bridge.service.ts # 로컬 이벤트를 글로벌 이벤트로 변환
controllers/
events.controller.ts # 이벤트 수신 엔드포인트
services/
event-deduplication.service.ts # 이벤트 중복 처리 방지
event-schema.service.ts # 이벤트 스키마 검증
interfaces/
event.interface.ts # 이벤트 관련 인터페이스 정의
constants/
event-types.ts # 시스템 전체 이벤트 타입 정의
utils/
payload-sanitizer.ts # 이벤트 페이로드 처리 유틸리티
events.module.ts # 이벤트 모듈 정의
주요 컴포넌트 구현
1. 이벤트 모듈 (EventsModule)
// libs/core/events/src/events.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { RedisModule } from '@core/redis';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { EventsController } from './controllers/events.controller';
import { EventBridgeService } from './bridge/event-bridge.service';
import { EventDeDuplicationService } from './services/event-deduplication.service';
import { EventSchemaService } from './services/event-schema.service';
@Module({
imports: [
ConfigModule,
RedisModule,
EventEmitterModule.forRoot({
// 글로벌 설정
wildcard: true,
delimiter: '.',
maxListeners: 20,
verboseMemoryLeak: true,
}),
],
controllers: [EventsController],
providers: [
EventBridgeService,
EventDeDuplicationService,
EventSchemaService,
],
exports: [EventBridgeService],
})
export class EventsModule {}
2. 이벤트 인터페이스 정의
// libs/core/events/src/interfaces/event.interface.ts
export interface BaseEvent {
type: string; // 이벤트 타입 (예: 'user-created', 'order-completed')
payload: any; // 이벤트 페이로드 (이벤트 타입에 따라 달라짐)
timestamp: string; // ISO 형식 타임스탬프
eventId?: string; // 고유 이벤트 ID (선택적)
version?: string; // 이벤트 스키마 버전 (선택적)
source?: string; // 이벤트 소스 (서비스/모듈 이름)
containerId?: string; // 컨테이너 ID (자기 참조 탐지용)
}
export interface PubsubMessage {
message: {
attributes: Record<string, string>;
data: string; // Base64 인코딩된 이벤트 데이터
messageId: string; // Pub/Sub 메시지 ID
publishTime: string; // Pub/Sub 발행 시간
};
subscription: string; // 구독 이름
}
3. 이벤트 브릿지 서비스
// libs/core/events/src/bridge/event-bridge.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { ConfigService } from '@nestjs/config';
import { PubSub } from '@google-cloud/pubsub';
import { v4 as uuidv4 } from 'uuid';
import * as os from 'os';
import { BaseEvent } from '../interfaces/event.interface';
import { EventSchemaService } from '../services/event-schema.service';
@Injectable()
export class EventBridgeService {
private readonly pubSubClient: PubSub;
private readonly logger = new Logger(EventBridgeService.name);
private readonly topicName: string;
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly configService: ConfigService,
private readonly schemaService: EventSchemaService,
) {
// 초기화
this.pubSubClient = new PubSub({
projectId: this.configService.get('EVENT_PUBSUB_PROJECT_ID'),
});
this.topicName = this.configService.get('EVENT_PUBSUB_TOPIC', 'events');
// 'event' 채널 리스닝 설정
this.setupEventListener();
}
private setupEventListener(): void {
// 단일 'event' 채널을 구독하여 모든 이벤트를 GCP Pub/Sub으로 브릿징
this.eventEmitter.on('event', async (event: BaseEvent) => {
try {
// 스키마 검증
this.schemaService.validateEventSchema(event);
// 추가 정보 보강
const enrichedEvent = this.enrichEvent(event);
// GCP Pub/Sub으로 발행
await this.publishToPubSub(enrichedEvent);
} catch (error) {
this.logger.error(`Failed to bridge event: ${error.message}`, {
eventType: event.type,
error,
});
}
});
this.logger.log('Event bridge initialized');
}
private enrichEvent(event: BaseEvent): BaseEvent {
return {
...event,
eventId: event.eventId || uuidv4(),
timestamp: event.timestamp || new Date().toISOString(),
source: event.source || this.configService.get('SERVICE_NAME', 'unknown'),
containerId: event.containerId || os.hostname(),
version: event.version || '1.0',
};
}
private async publishToPubSub(event: BaseEvent): Promise<string> {
try {
const dataBuffer = Buffer.from(JSON.stringify(event));
const messageId = await this.pubSubClient.topic(this.topicName).publish(dataBuffer);
this.logger.debug(`Event published to Pub/Sub: ${event.type}`, {
eventId: event.eventId,
messageId,
});
return messageId;
} catch (error) {
this.logger.error(`Pub/Sub publish error: ${error.message}`, {
eventType: event.type,
error,
});
throw error;
}
}
}
4. 이벤트 컨트롤러 (중앙 수신 엔드포인트)
// libs/core/events/src/controllers/events.controller.ts
import {
Controller,
Post,
Body,
Logger,
HttpException,
HttpStatus,
Req,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Request } from 'express';
import { PubsubMessage, BaseEvent } from '../interfaces/event.interface';
import { EventDeDuplicationService } from '../services/event-deduplication.service';
import * as os from 'os';
@Controller('api/events')
export class EventsController {
private readonly logger = new Logger(EventsController.name);
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly deduplicationService: EventDeDuplicationService,
) {}
@Post()
async handlePubSubEvent(
@Body() pubsubMessage: PubsubMessage,
@Req() request: Request,
): Promise<void> {
try {
// Pub/Sub 요청 검증
this.validatePubSubRequest(request);
// 메시지 디코딩
const event = this.decodePubSubMessage(pubsubMessage);
const { type: eventType, eventId, containerId } = event;
if (!eventType) {
this.logger.warn('Event type missing in message');
return;
}
// 중복 이벤트 확인
if (await this.deduplicationService.isDuplicate(eventId)) {
this.logger.debug(`Skipping duplicate event: ${eventId}`);
return;
}
// 자신이 발행한 이벤트는 무시 (선택적)
if (containerId === os.hostname()) {
this.logger.debug(`Skipping self-originated event: ${eventId}`);
return;
}
// 이벤트를 로컬 이벤트 버스로 발행
this.logger.debug(`Emitting local event: ${eventType}`, { eventId });
this.eventEmitter.emit(eventType, event.payload);
// 처리 완료 기록
await this.deduplicationService.markAsProcessed(eventId);
} catch (error) {
this.logger.error(`Error processing Pub/Sub event: ${error.message}`, { error });
throw new HttpException('Event processing failed', HttpStatus.INTERNAL_SERVER_ERROR);
}
}
private validatePubSubRequest(request: Request): void {
// JWT 토큰 검증, 헤더 검사 등
// 실제 구현에서는 더 강력한 검증 적용
}
private decodePubSubMessage(pubsubMessage: PubsubMessage): BaseEvent {
try {
// Base64로 인코딩된 데이터 디코딩
const data = Buffer.from(pubsubMessage.message.data, 'base64').toString('utf-8');
return JSON.parse(data) as BaseEvent;
} catch (error) {
this.logger.error(`Failed to decode Pub/Sub message: ${error.message}`);
throw new HttpException('Invalid message format', HttpStatus.BAD_REQUEST);
}
}
}
5. 중복 이벤트 처리 서비스
// libs/core/events/src/services/event-deduplication.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { RedisService } from '@core/redis';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class EventDeDuplicationService {
private readonly logger = new Logger(EventDeDuplicationService.name);
private readonly ttl: number;
private readonly keyPrefix: string;
constructor(
private readonly redisService: RedisService,
private readonly configService: ConfigService,
) {
this.ttl = this.configService.get('EVENT_DEDUPLICATION_TTL', 86400); // 24시간 기본값
this.keyPrefix = 'events:processed:';
}
async isDuplicate(eventId: string): Promise<boolean> {
if (!eventId) {
this.logger.warn('Missing eventId for deduplication check');
return false;
}
const key = `${this.keyPrefix}${eventId}`;
const exists = await this.redisService.exists(key);
return exists === 1;
}
async markAsProcessed(eventId: string): Promise<void> {
if (!eventId) {
this.logger.warn('Missing eventId for marking as processed');
return;
}
const key = `${this.keyPrefix}${eventId}`;
await this.redisService.set(key, '1', this.ttl);
this.logger.debug(`Marked event as processed: ${eventId}`);
}
}
6. 이벤트 스키마 검증 서비스
// libs/core/events/src/services/event-schema.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { BaseEvent } from '../interfaces/event.interface';
@Injectable()
export class EventSchemaService {
private readonly logger = new Logger(EventSchemaService.name);
// 기본 스키마 검증
validateEventSchema(event: BaseEvent): boolean {
if (!event) {
throw new Error('Event object is required');
}
if (!event.type) {
throw new Error('Event type is required');
}
if (!event.payload) {
throw new Error('Event payload is required');
}
// 이벤트 타입별 특수 검증 (필요 시)
this.validateEventTypeSpecificRules(event);
return true;
}
private validateEventTypeSpecificRules(event: BaseEvent): void {
// 이벤트 타입에 따른 특별 규칙 검증
switch (event.type) {
case 'user-created':
this.validateUserCreatedEvent(event.payload);
break;
case 'order-completed':
this.validateOrderCompletedEvent(event.payload);
break;
// 기타 이벤트 타입
}
}
private validateUserCreatedEvent(payload: any): void {
if (!payload.userId) {
throw new Error('userId is required for user-created events');
}
}
private validateOrderCompletedEvent(payload: any): void {
if (!payload.orderId) {
throw new Error('orderId is required for order-completed events');
}
}
}
7. 이벤트 타입 상수 정의
각 도메인은 자신만의 이벤트 타입을 정의하고, 네이밍 컨벤션을 통해 충돌을 방지합니다.
이벤트 타입은 {domain}.{entity}.{action} 패턴을 따릅니다.
// 사용자 도메인의 이벤트 타입 예시
// libs/user/src/constants/event-types.ts
export enum UserEventTypes {
CREATED = 'user.created',
UPDATED = 'user.updated',
DELETED = 'user.deleted',
PROFILE_UPDATED = 'user.profile.updated',
}
// 인증 도메인의 이벤트 타입 예시
// libs/auth/src/constants/event-types.ts
export enum AuthEventTypes {
LOGIN = 'auth.user.login',
LOGOUT = 'auth.user.logout',
PASSWORD_CHANGED = 'auth.user.password.changed',
PASSWORD_RESET_REQUESTED = 'auth.user.password.reset.requested',
}
// 주문 도메인의 이벤트 타입 예시
// libs/order/src/constants/event-types.ts
export enum OrderEventTypes {
CREATED = 'order.created',
UPDATED = 'order.updated',
COMPLETED = 'order.completed',
CANCELLED = 'order.cancelled',
}
// 치료 도메인의 이벤트 타입 예시
// libs/treatment/src/constants/event-types.ts
export enum TreatmentEventTypes {
STARTED = 'treatment.started',
UPDATED = 'treatment.updated',
ENDED = 'treatment.ended',
PAUSED = 'treatment.paused',
RESUMED = 'treatment.resumed',
}
이벤트 core 모듈은 이벤트 타입을 직접 정의하지 않고, 스키마 검증 기능을 제공합니다:
// libs/core/events/src/services/event-registry.service.ts
import { Injectable, Logger } from '@nestjs/common';
@Injectable()
export class EventRegistryService {
private readonly logger = new Logger(EventRegistryService.name);
private readonly eventSchemaValidators: Map<string, (payload: any) => boolean> = new Map();
/**
* 도메인별 이벤트 타입에 대한 스키마 검증기 등록
*/
registerEventValidator(
eventType: string,
validator: (payload: any) => boolean
): void {
if (this.eventSchemaValidators.has(eventType)) {
this.logger.warn(`이벤트 검증기가 이미 등록됨: ${eventType}`);
}
this.eventSchemaValidators.set(eventType, validator);
this.logger.log(`이벤트 검증기 등록 완료: ${eventType}`);
}
/**
* 등록된 스키마 검증기로 특정 이벤트 타입 검증
*/
validateEventPayload(eventType: string, payload: any): boolean {
const validator = this.eventSchemaValidators.get(eventType);
if (!validator) {
this.logger.debug(`${eventType}에 대한 검증기 없음, 기본 검증 진행`);
return true; // 기본적으로 성공 처리 또는 공통 검증 로직
}
return validator(payload);
}
/**
* 등록된 모든 이벤트 타입 목록 조회
*/
getRegisteredEventTypes(): string[] {
return Array.from(this.eventSchemaValidators.keys());
}
}
이벤트 스키마 검증 서비스는 다음과 같이 수정됩니다:
// libs/core/events/src/services/event-schema.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { BaseEvent } from '../interfaces/event.interface';
import { EventRegistryService } from './event-registry.service';
@Injectable()
export class EventSchemaService {
private readonly logger = new Logger(EventSchemaService.name);
constructor(private readonly eventRegistryService: EventRegistryService) {}
// 기본 스키마 검증
validateEventSchema(event: BaseEvent): boolean {
if (!event) {
throw new Error('Event object is required');
}
if (!event.type) {
throw new Error('Event type is required');
}
if (!event.payload) {
throw new Error('Event payload is required');
}
// 등록된 검증기로 이벤트 페이로드 검증
this.eventRegistryService.validateEventPayload(event.type, event.payload);
return true;
}
}
이 접근 방식은 다음과 같은 장점이 있습니다:
- 도메인 경계 명확화: 각 도메인은 자신의 이벤트만 정의하고 관리합니다.
- 결합도 감소: 이벤트 core 모듈은 특정 도메인 이벤트에 의존하지 않습니다.
- 확장성 향상: 새로운 도메인이 추가되어도 core 모듈 수정이 필요 없습니다.
- 이벤트 충돌 방지: 네이밍 컨벤션으로 이벤트 타입 충돌을 방지합니다.
도메인별 구현 가이드
1. 이벤트 발행 (모든 도메인)
모든 도메인은 다음 패턴을 사용하여 이벤트를 발행합니다.
// 어떤 도메인의 서비스 예시
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '../constants/event-types';
import { v4 as uuidv4 } from 'uuid';
@Injectable()
export class UserService {
constructor(
private readonly userRepository: UserRepository,
private readonly eventEmitter: EventEmitter2,
) {}
async createUser(userData: CreateUserDto): Promise<User> {
// 1. 비즈니스 로직 실행
const user = await this.userRepository.create(userData);
// 2. 이벤트 발행 (단일 'event' 채널 사용)
this.eventEmitter.emit('event', {
type: UserEventTypes.CREATED, // 'user.created'
payload: {
userId: user.id,
email: user.email,
role: user.role,
// 민감 정보 제외
},
timestamp: new Date().toISOString(),
});
return user;
}
}
2. 이벤트 구독 (각 도메인의 리스너)
각 도메인은 독립적인 이벤트 리스너를 구현하여 관심 있는 이벤트를 처리합니다.
// 어떤 도메인의 이벤트 리스너 예시
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { UserEventTypes } from '@user/constants/event-types';
import { AuthEventTypes } from '@auth/constants/event-types';
@Injectable()
export class YourDomainEventListener implements OnModuleInit {
private readonly logger = new Logger(YourDomainEventListener.name);
constructor(
private readonly eventEmitter: EventEmitter2,
private readonly yourDomainService: YourDomainService,
) {}
onModuleInit() {
// 관심 있는 이벤트만 구독
this.setupUserEventListeners();
this.setupAuthEventListeners();
this.logger.log('YourDomain event listeners initialized');
}
private setupUserEventListeners() {
// 사용자 생성 이벤트 구독
this.eventEmitter.on(
UserEventTypes.CREATED, // 'user.created'
this.handleUserCreated.bind(this)
);
// 사용자 업데이트 이벤트 구독
this.eventEmitter.on(
UserEventTypes.UPDATED, // 'user.updated'
this.handleUserUpdated.bind(this)
);
}
private setupAuthEventListeners() {
// 로그인 이벤트 구독
this.eventEmitter.on(
AuthEventTypes.LOGIN, // 'auth.user.login'
this.handleUserLogin.bind(this)
);
}
private async handleUserCreated(payload: any) {
try {
this.logger.debug(`Processing ${UserEventTypes.CREATED} event for: ${payload.userId}`);
// 이벤트 처리 로직
await this.yourDomainService.processNewUser(payload);
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.CREATED} event: ${error.message}`, { error });
}
}
private async handleUserUpdated(payload: any) {
try {
this.logger.debug(`Processing ${UserEventTypes.UPDATED} event for: ${payload.userId}`);
// 이벤트 처리 로직
await this.yourDomainService.syncUserChanges(payload);
} catch (error) {
this.logger.error(`Error handling ${UserEventTypes.UPDATED} event: ${error.message}`, { error });
}
}
private async handleUserLogin(payload: any) {
try {
this.logger.debug(`Processing ${AuthEventTypes.LOGIN} event for: ${payload.userId}`);
// 이벤트 처리 로직
await this.yourDomainService.trackUserActivity(payload);
} catch (error) {
this.logger.error(`Error handling ${AuthEventTypes.LOGIN} event: ${error.message}`, { error });
}
}
}
배포 및 설정 가이드
1. Google Cloud Pub/Sub 설정
먼저 GCP에서 필요한 토픽과 구독을 생성합니다.
# 이벤트 토픽 생성
gcloud pubsub topics create events
# Push 구독 생성
gcloud pubsub subscriptions create events-subscription \
--topic=events \
--push-endpoint=https://your-service.run.app/api/events \
--ack-deadline=20 \
--message-retention-duration=7d \
--min-retry-delay=10s \
--max-retry-delay=600s \
--push-auth-service-account=cloud-run-invoker@your-project.iam.gserviceaccount.com
2. Cloud Run 서비스 배포
적절한 환경 변수 및 권한으로 Cloud Run 서비스를 배포합니다.
gcloud run deploy your-service \
--image=gcr.io/your-project/your-service:latest \
--platform=managed \
--region=us-central1 \
--service-account=service-account@your-project.iam.gserviceaccount.com \
--set-env-vars="EVENT_PUBSUB_PROJECT_ID=your-project,EVENT_PUBSUB_TOPIC=events" \
--min-instances=0 \
--max-instances=10
3. IAM 권한 설정
필요한 IAM 권한을 설정합니다.
# Pub/Sub 서비스 계정에 Cloud Run 호출 권한 부여
gcloud projects add-iam-policy-binding your-project \
--member=serviceAccount:cloud-run-invoker@your-project.iam.gserviceaccount.com \
--role=roles/run.invoker
# Cloud Run 서비스 계정에 Pub/Sub 게시 권한 부여
gcloud projects add-iam-policy-binding your-project \
--member=serviceAccount:service-account@your-project.iam.gserviceaccount.com \
--role=roles/pubsub.publisher
테스트 가이드
1. 단위 테스트
// EventBridgeService 단위 테스트 예시
describe('EventBridgeService', () => {
let service: EventBridgeService;
let eventEmitter: MockEventEmitter;
let pubSubClient: MockPubSub;
let schemaService: MockSchemaService;
beforeEach(async () => {
// 모의 객체 설정
eventEmitter = { on: jest.fn() };
pubSubClient = { topic: jest.fn(() => ({ publish: jest.fn() })) };
schemaService = { validateEventSchema: jest.fn() };
service = new EventBridgeService(
eventEmitter as any,
{ get: jest.fn() } as any,
schemaService as any,
);
(service as any).pubSubClient = pubSubClient;
});
it('should subscribe to the event channel on initialization', () => {
// setupEventListener가 호출되었는지 확인
expect(eventEmitter.on).toHaveBeenCalledWith('event', expect.any(Function));
});
// 추가 테스트 케이스
});
2. 통합 테스트
// EventsController 통합 테스트 예시
describe('EventsController (Integration)', () => {
let app: INestApplication;
let eventEmitter: EventEmitter2;
let deduplicationService: EventDeDuplicationService;
beforeAll(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [EventsModule],
}).compile();
app = moduleFixture.createNestApplication();
await app.init();
eventEmitter = app.get<EventEmitter2>(EventEmitter2);
deduplicationService = app.get<EventDeDuplicationService>(EventDeDuplicationService);
});
it('/api/events (POST) should process valid Pub/Sub messages', async () => {
// 모의 이벤트 구독 설정
const eventHandler = jest.fn();
eventEmitter.on('user-created', eventHandler);
// 중복 검사 모의 설정
jest.spyOn(deduplicationService, 'isDuplicate').mockResolvedValue(false);
jest.spyOn(deduplicationService, 'markAsProcessed').mockResolvedValue();
// 모의 Pub/Sub 메시지 생성
const mockPubSubMessage = {
message: {
data: Buffer.from(JSON.stringify({
type: 'user-created',
payload: { userId: '123' },
eventId: 'test-event-id',
})).toString('base64'),
messageId: 'test-message-id',
publishTime: new Date().toISOString(),
attributes: {},
},
subscription: 'test-subscription',
};
// 요청 실행
await request(app.getHttpServer())
.post('/api/events')
.send(mockPubSubMessage)
.expect(201);
// 이벤트 핸들러가 호출되었는지 확인
expect(eventHandler).toHaveBeenCalledWith({ userId: '123' });
});
afterAll(async () => {
await app.close();
});
});
모범 사례
1. 이벤트 페이로드 설계
- 이벤트 식별자를 명확히 포함 (userId, orderId 등)
- 필수 정보만 포함하고 과도한 정보는 제외
- 민감 정보 필터링 (비밀번호, 토큰 등)
- 필드 네이밍 일관성 유지 (camelCase 권장)
2. 에러 처리 패턴
- 모든 이벤트 핸들러에 try-catch 적용
- 이벤트 처리 실패 시 다른 서비스에 영향 주지 않도록 격리
- 오류 발생 시 명확한 로깅 및 모니터링
- 치명적 오류 발생 시에만 예외 전파
3. 로깅 표준화
이벤트 시스템의 로깅은 다음 패턴을 따르세요:
// 이벤트 발행 로깅
this.logger.info('Event published', {
eventType: event.type,
eventId: event.eventId,
source: serviceName,
});
// 이벤트 수신 로깅
this.logger.info('Event received', {
eventType: event.type,
eventId: event.eventId,
receivedAt: new Date().toISOString(),
});
// 이벤트 처리 오류 로깅
this.logger.error('Event processing failed', {
eventType: event.type,
eventId: event.eventId,
error: error.message,
stack: error.stack,
});
변경 이력
| 버전 | 날짜 | 작성자 | 변경 내용 |
|---|---|---|---|
| 0.1.0 | 2025-04-11 | bok@weltcorp.com | 최초 작성 |