본문으로 건너뛰기

CoreLoggingModule 구현 가이드

1. 개요

CoreLoggingModule은 애플리케이션 전반의 로깅과 메트릭 수집을 담당하는 핵심 모듈입니다. 이 문서는 CoreLoggingModule의 구현 상세와 사용 방법을 설명합니다.

1.1 주요 기능

  1. 구조화된 로깅

    • JSON 형식의 로그 데이터
    • 컨텍스트 기반 로깅
    • 상관 관계 ID 추적
    • 메타데이터 지원
  2. 로그 레벨 관리

    • DEBUG: 개발 및 디버깅용 상세 로그
    • INFO: 일반적인 작업 흐름 로그
    • WARN: 잠재적 문제 경고
    • ERROR: 오류 및 예외 상황
    • METRIC: 성능 및 메트릭 데이터
  3. BigQuery 통합

    • 실시간 로그 저장
    • 파티셔닝 및 클러스터링
    • 로그 보존 정책
    • 접근 제어
  4. 모니터링 및 알림

    • 실시간 에러 알림
    • 메트릭 기반 알림
    • Slack 통합
    • 이메일 알림

1.2 프로젝트 구조

CoreLoggingModule은 모든 서비스와 모듈에서 공통으로 사용되는 핵심 기능이므로 libs/ 디렉토리 아래에 위치합니다.

libs/
core/
logging/
src/
interfaces/
log-entry.interface.ts
metric-entry.interface.ts
services/
logger.service.ts
metrics-logger.service.ts
log-publisher.service.ts
alert.service.ts
logging-resource.service.ts
logging.module.ts
index.ts
README.md
tsconfig.json

모든 로깅 관련 코드는 libs/core/logging 디렉토리에 구현하고, 필요한 서비스에서 import { LoggingModule } from '@libs/core/logging';와 같이 임포트하여 사용합니다. 이를 통해 로깅 구현의 일관성을 보장하고 중앙 집중식 로그 수집 및 분석이 가능해집니다.

2. 아키텍처

2.1 컴포넌트 구조

2.2 데이터 흐름

  1. 리소스 초기화

  2. 로그 생성 및 처리

  3. 알림 처리

2.2 컴포넌트 설명

MetricsLoggerService

메트릭 수집을 전담하는 서비스입니다. 다음과 같은 기능을 제공합니다:

  • Prometheus 형식의 메트릭 생성
  • 다양한 메트릭 타입 지원 (gauge, counter, histogram)
  • 태그 기반 메트릭 분류
  • 배치 처리를 통한 성능 최적화
  • 자동 단위 변환 및 버킷 설정

3. 환경 변수 설정

3.1 Pub/Sub Topic 설정

// Pub/Sub Topic 설정
LOGGING_PUBSUB_SYSTEM_LOGS_TOPIC="projects/dta-cloud-de-dev/topics/system-logs"
LOGGING_PUBSUB_ERROR_LOGS_TOPIC="projects/dta-cloud-de-dev/topics/error-logs"
LOGGING_PUBSUB_METRIC_LOGS_TOPIC="projects/dta-cloud-de-dev/topics/metric-logs"

// Pub/Sub Subscription 설정 (BigQuery Write)
LOGGING_PUBSUB_SYSTEM_LOGS_SUBSCRIPTION="projects/dta-cloud-de-dev/subscriptions/system-logs-to-bigquery"
LOGGING_PUBSUB_ERROR_LOGS_SUBSCRIPTION="projects/dta-cloud-de-dev/subscriptions/error-logs-to-bigquery"
LOGGING_PUBSUB_METRIC_LOGS_SUBSCRIPTION="projects/dta-cloud-de-dev/subscriptions/metric-logs-to-bigquery"

3.2 BigQuery 설정

// BigQuery 프로젝트 및 데이터셋 설정
LOGGING_BIGQUERY_PROJECT_ID="dta-cloud-de-dev"
LOGGING_BIGQUERY_DATASET_ID="logging"
LOGGING_BIGQUERY_SYSTEM_LOGS_TABLE="system_logs"
LOGGING_BIGQUERY_ERROR_LOGS_TABLE="error_logs"
LOGGING_BIGQUERY_METRIC_LOGS_TABLE="metric_logs"

// 로그 보존 기간 설정 (일)
LOGGING_RETENTION_SYSTEM_DAYS="30"
LOGGING_RETENTION_ERROR_DAYS="90"
LOGGING_RETENTION_METRIC_DAYS="365"
LOGGING_RETENTION_BACKUP_DAYS="730"

// 접근 권한 설정
LOGGING_ACCESS_ROLES_SYSTEM="logging.viewer,logging.admin"
LOGGING_ACCESS_ROLES_ERROR="logging.viewer,logging.admin,monitoring.viewer"
LOGGING_ACCESS_ROLES_METRIC="monitoring.viewer,monitoring.admin"

3.3 알림 설정

// Slack 알림 설정
LOGGING_ALERT_SLACK_WEBHOOK_URL="https://hooks.slack.com/..."
LOGGING_ALERT_SLACK_CHANNEL="#alerts"

// 이메일 알림 설정
LOGGING_ALERT_EMAIL_FROM="alerts@weltcorp.com"
LOGGING_ALERT_EMAIL_TO="team@weltcorp.com"

4. Pub/Sub Topic 구성

4.1 Topic 구조

Topic 이름설명대상 테이블
system-logs일반 시스템 로그system_logs
error-logs에러 및 예외 로그error_logs
metric-logs메트릭 데이터metric_logs

4.2 메시지 스키마

interface SystemLogMessage {
timestamp: string;
service: string;
environment: string;
level: string;
message: string;
correlationId?: string;
userId?: string;
metadata?: {
[key: string]: any;
test_run_id?: string; // AI Agent 테스트 실행 ID (선택 사항)
};
}

interface ErrorLogMessage extends SystemLogMessage {
stackTrace: string;
errorCode: string;
}

interface MetricLogMessage {
timestamp: string;
service: string;
environment: string;
metricName: string;
value: number;
unit: string;
labels?: Record<string, string>;
metadata?: {
[key: string]: any;
test_run_id?: string; // AI Agent 테스트 실행 ID (선택 사항)
};
}

4.3 Subscription 설정

각 Topic의 Subscription은 BigQuery Write API를 사용하여 해당하는 테이블에 직접 데이터를 저장합니다.

# system-logs-to-bigquery subscription 설정 예시
name: projects/dta-cloud-de-dev/subscriptions/system-logs-to-bigquery
topic: projects/dta-cloud-de-dev/topics/system-logs
bigquery_config:
table: projects/dta-cloud-de-dev/datasets/logging/tables/system_logs
write_metadata: true
drop_unknown_fields: false

4.4 메시지 처리 정책

interface MessageProcessingPolicy {
// 재시도 설정
retry: {
minimum_backoff: string; // "10s"
maximum_backoff: string; // "600s"
max_delivery_attempts: number; // 5
};

// 데드레터 설정
dead_letter: {
topic: string;
max_delivery_attempts: number;
};

// 만료 설정
expiration: {
ttl: string; // "24h"
action: "retry" | "dead_letter" | "drop";
};
}

5. BigQuery 스키마

5.1 시스템 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.system_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
level STRING,
message STRING,
correlationId STRING,
userId STRING,
metadata JSON,
) PARTITION BY DATE(timestamp)
CLUSTER BY service, level;

5.2 에러 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.error_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
level STRING,
message STRING,
correlationId STRING,
userId STRING,
stackTrace STRING,
errorCode STRING,
metadata JSON,
) PARTITION BY DATE(timestamp)
CLUSTER BY service, errorCode;

5.3 메트릭 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.metric_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
metricName STRING,
value FLOAT64,
unit STRING,
labels JSON,
metadata JSON,
) PARTITION BY DATE(timestamp)
CLUSTER BY service, metricName;

5.4 HTTP API 요청 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.api_request_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
request_id STRING,
method STRING,
path STRING,
query_params JSON,
user_id STRING,
client_ip STRING,
user_agent STRING,
status_code INT64,
response_time INT64,
request_size INT64,
response_size INT64,
correlation_id STRING,
request_headers JSON,
response_headers JSON,
error_details JSON,
metadata JSON
) PARTITION BY DATE(timestamp)
CLUSTER BY method, path, status_code;

5.5 감사 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.audit_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
event_type STRING,
source STRING,
actor JSON,
target JSON,
action STRING,
details JSON,
metadata JSON,
status STRING,
correlation_id STRING,
request_id STRING
) PARTITION BY DATE(timestamp)
CLUSTER BY event_type, source, status;

5.6 IAM 감사 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.iam_audit_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
event_category STRING,
event_type STRING,
severity STRING,
actor JSON,
target JSON,
action JSON,
changes JSON,
metadata JSON
) PARTITION BY DATE(timestamp)
CLUSTER BY event_category, event_type, severity;

5.7 인프라 로그 테이블

CREATE TABLE IF NOT EXISTS `{project}.{dataset}.infrastructure_logs` (
timestamp TIMESTAMP,
service STRING,
environment STRING,
component STRING,
event_type STRING,
resource_type STRING,
resource_id STRING,
action STRING,
status STRING,
details JSON,
metadata JSON
) PARTITION BY DATE(timestamp)
CLUSTER BY component, event_type, status;

6. 로그 정책

6.1 보존 기간 정책

로그 유형보존 기간백업 보존 기간설명
시스템 로그30일2년일반 시스템 작동 로그
에러 로그90일2년오류 및 예외 로그
메트릭 로그365일2년성능 및 메트릭 데이터
HTTP API 요청 로그90일2년API 요청/응답 로그
감사 로그365일5년비즈니스 중요 이벤트 로그
IAM 감사 로그365일5년IAM 작업 감사 로그
인프라 로그90일2년인프라 변경 로그

6.2 접근 권한 정책

로그 유형역할권한설명
시스템 로그logging.viewer읽기로그 조회만 가능
logging.admin읽기/쓰기로그 관리 권한
에러 로그logging.viewer읽기로그 조회만 가능
logging.admin읽기/쓰기로그 관리 권한
monitoring.viewer읽기모니터링 목적 조회
메트릭 로그monitoring.viewer읽기메트릭 조회
monitoring.admin읽기/쓰기메트릭 관리
HTTP API 요청 로그logging.viewer읽기로그 조회만 가능
logging.admin읽기/쓰기로그 관리 권한
security.viewer읽기보안 모니터링
감사 로그audit.viewer읽기감사 로그 조회
audit.admin읽기/쓰기감사 로그 관리
compliance.viewer읽기규정 준수 모니터링
IAM 감사 로그iam.audit.viewer읽기IAM 감사 로그 조회
iam.audit.admin읽기/쓰기IAM 감사 로그 관리
security.admin읽기보안 관리
인프라 로그infra.viewer읽기인프라 로그 조회
infra.admin읽기/쓰기인프라 로그 관리
devops.viewer읽기DevOps 모니터링

6.3 로그 수집 정책

interface LogCollectionPolicy {
// 샘플링 설정
sampling: {
enabled: boolean;
rate: number; // 0.0 ~ 1.0
};

// 필터링 설정
filtering: {
excludePatterns: string[];
includePaths: string[];
minLevel: LogLevel;
};

// 데이터 보강
enrichment: {
addTimestamp: boolean;
addServiceInfo: boolean;
addCorrelationId: boolean;
};
}

// 기본 정책
const defaultPolicy: LogCollectionPolicy = {
sampling: {
enabled: true,
rate: 1.0
},
filtering: {
excludePatterns: [
'health-check',
'metrics-probe'
],
includePaths: [
'/api/**'
],
minLevel: LogLevel.INFO
},
enrichment: {
addTimestamp: true,
addServiceInfo: true,
addCorrelationId: true
}
};

6.4 백업 정책

interface LogBackupPolicy {
// 백업 스케줄링
schedule: {
frequency: 'daily' | 'weekly' | 'monthly';
time: string; // HH:mm
timezone: string;
};

// 백업 대상
destination: {
bucket: string;
path: string;
format: 'json' | 'avro' | 'parquet';
};

// 압축 설정
compression: {
enabled: boolean;
algorithm: 'gzip' | 'snappy';
level: number;
};

// 보존 설정
retention: {
days: number;
minBackups: number;
maxBackups: number;
};
}

// 기본 정책
const defaultBackupPolicy: LogBackupPolicy = {
schedule: {
frequency: 'daily',
time: '02:00',
timezone: 'UTC'
},
destination: {
bucket: 'log-backups',
path: '${service}/${date}',
format: 'parquet'
},
compression: {
enabled: true,
algorithm: 'snappy',
level: 9
},
retention: {
days: 730, // 2년
minBackups: 30,
maxBackups: 100
}
};

6.5 정책 적용

@Injectable()
class LogPolicyService {
// 보존 정책 적용
async applyRetentionPolicy() {
// BigQuery 테이블별 보존 기간 설정
// 오래된 데이터 자동 삭제 설정
}

// 백업 정책 적용
async applyBackupPolicy() {
// 정기 백업 작업 스케줄링
// 백업 데이터 압축 및 저장
// 오래된 백업 정리
}

// 접근 정책 적용
async applyAccessPolicy() {
// IAM 역할 및 권한 설정
// 사용자 그룹 매핑
// 접근 로그 기록
}
}

7. 운영 가이드

7.1 로그 모니터링

7.1.1 BigQuery 콘솔 접근

  1. GCP 콘솔 접속
  2. 프로젝트 선택
  3. BigQuery 데이터셋 선택
  4. 로그 테이블 조회

7.1.2 유용한 쿼리

  1. 최근 에러 모니터링
SELECT
timestamp,
service,
message,
errorCode,
metadata
FROM `{project}.{dataset}.error_logs`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
ORDER BY timestamp DESC
LIMIT 100;
  1. 도메인별 로그 수 집계
SELECT
service,
level,
COUNT(*) as log_count
FROM `{project}.{dataset}.system_logs`
WHERE DATE(timestamp) = CURRENT_DATE()
GROUP BY service, level
ORDER BY log_count DESC;
  1. 성능 메트릭 조회
SELECT
timestamp,
service,
metricName,
AVG(value) as avg_value,
MAX(value) as max_value
FROM `{project}.{dataset}.metric_logs`
WHERE
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
AND metricName IN ('response_time', 'cpu_usage', 'memory_usage')
GROUP BY timestamp, service, metricName
ORDER BY timestamp DESC;

7.2 알림 설정

7.2.1 에러 알림 조건

interface ErrorAlertCondition {
// 심각도 기반 알림
severity: {
level: LogLevel;
threshold: number; // 발생 횟수
windowMinutes: number; // 시간 윈도우
};

// 에러 스파이크 감지
errorSpike: {
baselineMultiplier: number; // 기준 대비 증가율
minErrorCount: number; // 최소 에러 수
windowMinutes: number; // 시간 윈도우
};
}

// 기본 알림 조건
const defaultAlertConditions: ErrorAlertCondition = {
severity: {
level: LogLevel.ERROR,
threshold: 10,
windowMinutes: 5
},
errorSpike: {
baselineMultiplier: 2.0,
minErrorCount: 5,
windowMinutes: 15
}
};

7.2.2 알림 채널

interface NotificationChannel {
// Slack 알림
slack: {
webhook: string;
channel: string;
username: string;
icon: string;
};

// 이메일 알림
email: {
from: string;
to: string[];
subject: string;
};
}

// 기본 알림 채널
const defaultNotificationChannels: NotificationChannel = {
slack: {
webhook: process.env.LOGGING_ALERT_SLACK_WEBHOOK_URL,
channel: process.env.LOGGING_ALERT_SLACK_CHANNEL,
username: 'Logging Alert Bot',
icon: ':warning:'
},
email: {
from: process.env.LOGGING_ALERT_EMAIL_FROM,
to: process.env.LOGGING_ALERT_EMAIL_TO?.split(',') || [],
subject: '[ALERT] Logging System Alert'
}
};

7.3 문제 해결 가이드

7.3.1 로그가 보이지 않는 경우

  1. BigQuery 연결 확인

    SELECT 1;
  2. 로그 파이프라인 상태 확인

    async function checkLogPipelineHealth() {
    // 1. 설정 확인
    const config = await validateLoggingConfig();

    // 2. BigQuery 연결 테스트
    const bqHealth = await testBigQueryConnection();

    // 3. 최근 로그 확인
    const recentLogs = await checkRecentLogs();

    return {
    configStatus: config.status,
    connectionStatus: bqHealth.status,
    logStatus: recentLogs.status
    };
    }

7.3.2 로그 지연이 발생하는 경우

  1. 배치 설정 확인
  2. 네트워크 지연 확인
  3. BigQuery 할당량 확인

7.3.3 알림이 동작하지 않는 경우

  1. 알림 조건 검증
  2. 채널 연결 상태 확인
  3. 권한 설정 확인

7.4 성능 최적화

7.4.1 쿼리 최적화

  1. 파티션 활용
SELECT *
FROM `{project}.{dataset}.system_logs`
WHERE DATE(timestamp) = CURRENT_DATE()
AND service = 'user-service';
  1. 클러스터링 활용
SELECT *
FROM `{project}.{dataset}.error_logs`
WHERE service = 'payment-service'
AND errorCode = 'PAY_001';

7.4.2 로그 수집 성능

interface LogCollectionConfig {
// 배치 설정
batch: {
size: number;
waitMs: number;
maxRetries: number;
};

// 버퍼 설정
buffer: {
size: number;
flushInterval: number;
};

// 재시도 설정
retry: {
initialDelayMs: number;
maxDelayMs: number;
backoffMultiplier: number;
};
}

// 최적화된 설정
const optimizedConfig: LogCollectionConfig = {
batch: {
size: 100,
waitMs: 1000,
maxRetries: 3
},
buffer: {
size: 1000,
flushInterval: 5000
},
retry: {
initialDelayMs: 100,
maxDelayMs: 5000,
backoffMultiplier: 2
}
};

8. GCP 리소스 초기화

8.1 리소스 관리 서비스

@Injectable()
export class LoggingResourceService implements OnModuleInit {
constructor(
private readonly configService: ConfigService,
private readonly pubSubClient: PubSub,
private readonly bigQueryClient: BigQuery,
) {}

async onModuleInit() {
await this.ensureTopicsExist();
await this.ensureSubscriptionsExist();
}

private async ensureTopicsExist() {
const topics = [
this.configService.get('LOGGING_PUBSUB_SYSTEM_LOGS_TOPIC'),
this.configService.get('LOGGING_PUBSUB_ERROR_LOGS_TOPIC'),
this.configService.get('LOGGING_PUBSUB_METRIC_LOGS_TOPIC'),
];

for (const topicName of topics) {
const [exists] = await this.pubSubClient
.topic(topicName)
.exists();

if (!exists) {
await this.createTopic(topicName);
}
}
}

private async createTopic(topicName: string) {
const [topic] = await this.pubSubClient
.createTopic(topicName);

// 스키마 설정
const schemaConfig = this.getSchemaConfig(topicName);
if (schemaConfig) {
await topic.setMetadata({
schemaSettings: {
schema: schemaConfig.schema,
encoding: 'JSON'
}
});
}
}

private getSchemaConfig(topicName: string) {
const schemas = {
'system-logs': SystemLogMessage,
'error-logs': ErrorLogMessage,
'metric-logs': MetricLogMessage,
};

const topicType = topicName.split('/').pop();
return schemas[topicType];
}

private async ensureSubscriptionsExist() {
const subscriptions = [
{
name: this.configService.get('LOGGING_PUBSUB_SYSTEM_LOGS_SUBSCRIPTION'),
topic: this.configService.get('LOGGING_PUBSUB_SYSTEM_LOGS_TOPIC'),
table: 'system_logs'
},
{
name: this.configService.get('LOGGING_PUBSUB_ERROR_LOGS_SUBSCRIPTION'),
topic: this.configService.get('LOGGING_PUBSUB_ERROR_LOGS_TOPIC'),
table: 'error_logs'
},
{
name: this.configService.get('LOGGING_PUBSUB_METRIC_LOGS_SUBSCRIPTION'),
topic: this.configService.get('LOGGING_PUBSUB_METRIC_LOGS_TOPIC'),
table: 'metric_logs'
}
];

for (const sub of subscriptions) {
const [exists] = await this.pubSubClient
.subscription(sub.name)
.exists();

if (!exists) {
await this.createSubscription(sub);
}
}
}

private async createSubscription({
name,
topic,
table
}: {
name: string;
topic: string;
table: string;
}) {
const projectId = this.configService.get('LOGGING_BIGQUERY_PROJECT_ID');
const datasetId = this.configService.get('LOGGING_BIGQUERY_DATASET_ID');

await this.pubSubClient
.topic(topic)
.createSubscription(name, {
bigqueryConfig: {
table: `${projectId}.${datasetId}.${table}`,
writeMetadata: true,
dropUnknownFields: false,
},
retryPolicy: {
minimumBackoff: {
seconds: 10,
},
maximumBackoff: {
seconds: 600,
},
},
deadLetterPolicy: {
deadLetterTopic: `${topic}-dead-letter`,
maxDeliveryAttempts: 5,
},
expirationPolicy: {
ttl: {
seconds: 86400, // 24시간
},
},
});
}

private async ensureDeadLetterTopicsExist() {
const topics = [
`${this.configService.get('LOGGING_PUBSUB_SYSTEM_LOGS_TOPIC')}-dead-letter`,
`${this.configService.get('LOGGING_PUBSUB_ERROR_LOGS_TOPIC')}-dead-letter`,
`${this.configService.get('LOGGING_PUBSUB_METRIC_LOGS_TOPIC')}-dead-letter`,
];

for (const topicName of topics) {
const [exists] = await this.pubSubClient
.topic(topicName)
.exists();

if (!exists) {
await this.createTopic(topicName);
}
}
}
}

8.2 모듈 설정

@Module({
imports: [
ConfigModule,
],
providers: [
{
provide: PubSub,
useFactory: (configService: ConfigService) => {
return new PubSub({
projectId: configService.get('LOGGING_BIGQUERY_PROJECT_ID'),
});
},
inject: [ConfigService],
},
{
provide: BigQuery,
useFactory: (configService: ConfigService) => {
return new BigQuery({
projectId: configService.get('LOGGING_BIGQUERY_PROJECT_ID'),
});
},
inject: [ConfigService],
},
LoggingResourceService,
],
})
export class CoreLoggingModule {}

8.3 에러 처리

@Injectable()
export class LoggingResourceService implements OnModuleInit {
// ... existing code ...

private async handleResourceCreationError(
operation: string,
resource: string,
error: any
) {
this.logger.error(
`Failed to create ${resource} during ${operation}`,
{
error: error.message,
stack: error.stack,
resource,
operation,
}
);

// IAM 권한 문제인 경우
if (error.code === 7) { // PERMISSION_DENIED
throw new Error(
`필요한 GCP IAM 권한이 없습니다. 다음 역할이 필요합니다:
- roles/pubsub.admin
- roles/bigquery.admin`
);
}

// 할당량 초과인 경우
if (error.code === 8) { // RESOURCE_EXHAUSTED
throw new Error(
`GCP 리소스 할당량이 초과되었습니다. 할당량 증가를 요청하세요.`
);
}

throw error;
}
}

9. 로그 서비스 구현

9.1 HTTP API 요청 로그 인터셉터

@Injectable()
export class HttpRequestLoggingInterceptor implements NestInterceptor {
constructor(
private readonly apiRequestLogger: ApiRequestLoggerService,
private readonly configService: ConfigService
) {}

intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const request = context.switchToHttp().getRequest();
const startTime = Date.now();
const requestId = request.headers['x-request-id'] || uuidv4();
const correlationId = request.headers['x-correlation-id'] || requestId;
const testRunId = request.headers['x-test-run-id'];
request['test_run_id'] = testRunId;

// 요청 헤더에서 민감한 정보 제거
const sanitizedRequestHeaders = this.sanitizeHeaders(request.headers);

return next.handle().pipe(
tap({
next: (response: any) => {
const responseTime = Date.now() - startTime;

this.apiRequestLogger.logApiRequest({
request_id: requestId,
method: request.method,
path: request.path,
query_params: request.query,
user_id: request.user?.id,
client_ip: this.getClientIp(request),
user_agent: request.headers['user-agent'],
status_code: context.switchToHttp().getResponse().statusCode,
response_time: responseTime,
request_size: this.calculateRequestSize(request),
response_size: this.calculateResponseSize(response),
correlation_id: correlationId,
request_headers: sanitizedRequestHeaders,
response_headers: this.sanitizeHeaders(context.switchToHttp().getResponse().getHeaders()),
metadata: {
route: request.route?.path,
handler: context.getHandler().name,
controller: context.getClass().name,
test_run_id: testRunId,
}
});
},
error: (error: any) => {
const responseTime = Date.now() - startTime;

this.apiRequestLogger.logApiRequest({
request_id: requestId,
method: request.method,
path: request.path,
query_params: request.query,
user_id: request.user?.id,
client_ip: this.getClientIp(request),
user_agent: request.headers['user-agent'],
status_code: error.status || 500,
response_time: responseTime,
request_size: this.calculateRequestSize(request),
response_size: 0,
correlation_id: correlationId,
request_headers: sanitizedRequestHeaders,
error_details: {
name: error.name,
message: error.message,
code: error.code,
stack: this.configService.get('NODE_ENV') === 'development' ? error.stack : undefined
},
metadata: {
route: request.route?.path,
handler: context.getHandler().name,
controller: context.getClass().name,
test_run_id: testRunId,
}
});
}
})
);
}

private sanitizeHeaders(headers: Record<string, any>): Record<string, any> {
const sensitiveHeaders = [
'authorization',
'cookie',
'set-cookie',
'x-api-key',
'x-auth-token'
];

return Object.entries(headers).reduce((acc, [key, value]) => {
if (!sensitiveHeaders.includes(key.toLowerCase())) {
acc[key] = value;
}
return acc;
}, {});
}

private getClientIp(request: Request): string {
return (
request.headers['x-forwarded-for'] ||
request.headers['x-real-ip'] ||
request.connection.remoteAddress
).toString().split(',')[0].trim();
}

private calculateRequestSize(request: Request): number {
let size = 0;

// 헤더 크기 계산
size += JSON.stringify(request.headers).length;

// URL 크기 계산
size += request.url.length;

// 바디 크기 계산
if (request.body) {
size += JSON.stringify(request.body).length;
}

return size;
}

private calculateResponseSize(response: any): number {
return response ? JSON.stringify(response).length : 0;
}
}

// 전역 인터셉터 설정을 위한 모듈 설정 업데이트
@Module({
imports: [
ConfigModule,
PubSubModule,
BigQueryModule
],
providers: [
LogPublisherService,
ApiRequestLoggerService,
AuditLoggerService,
IamAuditLoggerService,
InfrastructureLoggerService,
LoggingResourceService,
{
provide: APP_INTERCEPTOR,
useClass: HttpRequestLoggingInterceptor
}
],
exports: [
ApiRequestLoggerService,
AuditLoggerService,
IamAuditLoggerService,
InfrastructureLoggerService
]
})
export class CoreLoggingModule {}

9.2 HTTP 요청 로깅 설정

interface HttpLoggingOptions {
// 로깅 제외할 경로 패턴
excludePaths?: string[];

// 로깅 제외할 HTTP 메서드
excludeMethods?: string[];

// 요청 바디 로깅 여부
logRequestBody?: boolean;

// 응답 바디 로깅 여부
logResponseBody?: boolean;

// 민감한 데이터 마스킹 패턴
maskPatterns?: {
fields: string[];
pattern: RegExp;
replacement: string;
}[];

// 성능 임계값 설정 (ms)
performanceThreshold?: {
warning: number; // 경고 임계값
error: number; // 에러 임계값
};
}

const defaultHttpLoggingOptions: HttpLoggingOptions = {
excludePaths: [
'/health',
'/metrics',
'/favicon.ico'
],
excludeMethods: [],
logRequestBody: true,
logResponseBody: false,
maskPatterns: [
{
fields: ['password', 'token', 'secret', 'key'],
pattern: /./g,
replacement: '*'
}
],
performanceThreshold: {
warning: 1000, // 1초
error: 3000 // 3초
}
};

// 인터셉터에 설정 적용
@Injectable()
export class HttpRequestLoggingInterceptor implements NestInterceptor {
constructor(
private readonly apiRequestLogger: ApiRequestLoggerService,
private readonly configService: ConfigService,
@Inject('HTTP_LOGGING_OPTIONS')
private readonly options: HttpLoggingOptions = defaultHttpLoggingOptions
) {}

intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
const request = context.switchToHttp().getRequest();

// 로깅 제외 경로 확인
if (this.shouldSkipLogging(request)) {
return next.handle();
}

const startTime = Date.now();
const requestId = request.headers['x-request-id'] || uuidv4();
const correlationId = request.headers['x-correlation-id'] || requestId;
const testRunId = request.headers['x-test-run-id'];
request['test_run_id'] = testRunId;

// 요청 헤더에서 민감한 정보 제거
const sanitizedRequestHeaders = this.sanitizeHeaders(request.headers);

// 요청 바디 마스킹 처리
const sanitizedRequestBody = this.options.logRequestBody
? this.maskSensitiveData(request.body)
: undefined;

return next.handle().pipe(
tap({
next: (response: any) => {
const responseTime = Date.now() - startTime;

// 성능 임계값 확인
const performanceLevel = this.checkPerformanceLevel(responseTime);

this.apiRequestLogger.logApiRequest({
request_id: requestId,
method: request.method,
path: request.path,
query_params: request.query,
user_id: request.user?.id,
client_ip: this.getClientIp(request),
user_agent: request.headers['user-agent'],
status_code: context.switchToHttp().getResponse().statusCode,
response_time: responseTime,
request_size: this.calculateRequestSize(request),
response_size: this.calculateResponseSize(response),
correlation_id: correlationId,
request_headers: sanitizedRequestHeaders,
request_body: sanitizedRequestBody,
response_body: this.options.logResponseBody
? this.maskSensitiveData(response)
: undefined,
response_headers: this.sanitizeHeaders(context.switchToHttp().getResponse().getHeaders()),
metadata: {
route: request.route?.path,
handler: context.getHandler().name,
controller: context.getClass().name,
performance_level: performanceLevel,
test_run_id: testRunId,
}
});
},
error: (error: any) => {
const responseTime = Date.now() - startTime;

this.apiRequestLogger.logApiRequest({
request_id: requestId,
method: request.method,
path: request.path,
query_params: request.query,
user_id: request.user?.id,
client_ip: this.getClientIp(request),
user_agent: request.headers['user-agent'],
status_code: error.status || 500,
response_time: responseTime,
request_size: this.calculateRequestSize(request),
response_size: 0,
correlation_id: correlationId,
request_headers: sanitizedRequestHeaders,
request_body: sanitizedRequestBody,
error_details: {
name: error.name,
message: error.message,
code: error.code,
stack: this.configService.get('NODE_ENV') === 'development' ? error.stack : undefined
},
metadata: {
route: request.route?.path,
handler: context.getHandler().name,
controller: context.getClass().name,
performance_level: this.checkPerformanceLevel(responseTime),
test_run_id: testRunId,
}
});
}
})
);
}

private shouldSkipLogging(request: Request): boolean {
return (
this.options.excludePaths?.some(path =>
request.path.startsWith(path) ||
new RegExp(path).test(request.path)
) ||
this.options.excludeMethods?.includes(request.method)
);
}

private maskSensitiveData(data: any): any {
if (!data || !this.options.maskPatterns) {
return data;
}

const maskedData = JSON.parse(JSON.stringify(data));

this.options.maskPatterns.forEach(({ fields, pattern, replacement }) => {
const maskField = (obj: any) => {
for (const [key, value] of Object.entries(obj)) {
if (fields.includes(key)) {
obj[key] = typeof value === 'string'
? value.replace(pattern, replacement)
: replacement;
} else if (typeof value === 'object' && value !== null) {
maskField(value);
}
}
};

maskField(maskedData);
});

return maskedData;
}

private checkPerformanceLevel(responseTime: number): 'normal' | 'warning' | 'error' {
if (responseTime >= this.options.performanceThreshold.error) {
return 'error';
}
if (responseTime >= this.options.performanceThreshold.warning) {
return 'warning';
}
return 'normal';
}

// ... 기존 유틸리티 메서드들 ...
}

10. 로그 모니터링 및 알림

10.1 로그 기반 알림 설정

interface LogAlertRule {
name: string;
description: string;
logType: 'system' | 'error' | 'audit' | 'iam' | 'infra';
condition: {
field: string;
operator: 'eq' | 'neq' | 'gt' | 'lt' | 'contains';
value: any;
};
threshold?: {
count: number;
timeWindowMinutes: number;
};
severity: 'info' | 'warning' | 'critical';
channels: {
slack?: {
channel: string;
username?: string;
};
email?: string[];
};
}

@Injectable()
export class LogAlertService {
private readonly rules: LogAlertRule[] = [
{
name: 'Critical Error Alert',
description: '심각한 에러 발생 시 알림',
logType: 'error',
condition: {
field: 'level',
operator: 'eq',
value: 'ERROR'
},
threshold: {
count: 5,
timeWindowMinutes: 5
},
severity: 'critical',
channels: {
slack: {
channel: '#alerts-critical'
},
email: ['oncall@weltcorp.com']
}
},
{
name: 'Security Violation Alert',
description: '보안 위반 사항 발생 시 알림',
logType: 'iam',
condition: {
field: 'severity',
operator: 'eq',
value: 'HIGH'
},
severity: 'critical',
channels: {
slack: {
channel: '#security-alerts'
},
email: ['security@weltcorp.com']
}
},
{
name: 'Infrastructure Change Alert',
description: '인프라 변경 사항 알림',
logType: 'infra',
condition: {
field: 'event_type',
operator: 'contains',
value: 'CHANGE'
},
severity: 'info',
channels: {
slack: {
channel: '#infra-changes'
}
}
}
];

async checkAlerts(): Promise<void> {
// 알림 규칙 검사 및 알림 발송 로직
}
}

10.2 로그 대시보드 쿼리

10.2.1 에러 모니터링

-- 최근 에러 발생 추이
SELECT
TIMESTAMP_TRUNC(timestamp, HOUR) as hour,
COUNT(*) as error_count
FROM `{project}.{dataset}.error_logs`
WHERE
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY hour
ORDER BY hour DESC;

-- 서비스별 에러 분포
SELECT
service,
errorCode,
COUNT(*) as error_count
FROM `{project}.{dataset}.error_logs`
WHERE
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY service, errorCode
ORDER BY error_count DESC
LIMIT 10;

10.2.2 API 성능 모니터링

-- API 엔드포인트별 평균 응답 시간
SELECT
path,
method,
COUNT(*) as request_count,
AVG(response_time) as avg_response_time_ms,
MAX(response_time) as max_response_time_ms
FROM `{project}.{dataset}.api_request_logs`
WHERE
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY path, method
ORDER BY avg_response_time_ms DESC
LIMIT 10;

-- 상태 코드별 요청 분포
SELECT
status_code,
COUNT(*) as count
FROM `{project}.{dataset}.api_request_logs`
WHERE
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
GROUP BY status_code
ORDER BY count DESC;

10.2.3 보안 감사 모니터링

-- IAM 권한 변경 이벤트
SELECT
timestamp,
actor.name as actor_name,
target.name as target_name,
action.name as action_name,
severity
FROM `{project}.{dataset}.iam_audit_logs`
WHERE
event_category = 'ROLE_MANAGEMENT'
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY timestamp DESC;

-- 보안 위반 시도
SELECT
timestamp,
event_type,
actor.name as actor_name,
actor.ip as actor_ip,
severity
FROM `{project}.{dataset}.iam_audit_logs`
WHERE
severity IN ('HIGH', 'CRITICAL')
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY timestamp DESC;

10.2.4 인프라 변경 모니터링

-- 컴포넌트별 변경 사항
SELECT
timestamp,
component,
event_type,
resource_type,
action,
status
FROM `{project}.{dataset}.infrastructure_logs`
WHERE
timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY timestamp DESC;

-- 실패한 인프라 변경
SELECT
timestamp,
component,
event_type,
resource_type,
action,
details
FROM `{project}.{dataset}.infrastructure_logs`
WHERE
status = 'FAILURE'
AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY timestamp DESC;

변경 이력

버전날짜작성자변경 내용
0.1.02025-03-25bok@weltcorp.com최초 작성
0.2.02025-03-30bok@weltcorp.com로깅 모듈 위치 libs/ 경로로 명시