일일 Raw 파일 자동 처리 스케줄러 기획서
📅 일일 Raw 파일 자동 처리 스케줄러 기획서
섹션 제목: “📅 일일 Raw 파일 자동 처리 스케줄러 기획서”🎯 목적 및 배경
섹션 제목: “🎯 목적 및 배경”현재 상황 분석
섹션 제목: “현재 상황 분석”- 수동 트리거: 현재 Seed 엔진은
POST /api/v1/seeds/orchestrateAPI 호출로만 실행됨 - 기존 Cron 설정: 개발환경 6시간마다, 프로덕션 4시간마다 실행되지만 메타데이터 동기화만 수행
- 처리 지연: 새로운 raw 파일이 업로드되어도 수동 트리거 전까지 처리되지 않음
- 자동화: 매일 1회 새로운 raw 파일들을 자동으로 감지하고 처리
- 효율성: 이미 처리된 파일은 건너뛰고 새로운 파일만 처리
- 안정성: 실패 시 재시도 및 모니터링 기능 포함
- 확장성: 향후 다양한 스케줄링 요구사항에 대응 가능한 구조
🏗️ 시스템 아키텍처
섹션 제목: “🏗️ 시스템 아키텍처”📊 전체 워크플로우
섹션 제목: “📊 전체 워크플로우”[Daily Cron Trigger] 00:00 UTC │ ▼[Scheduled Handler] 새로운 raw 파일 스캔 │ ├── R2에서 어제 날짜 파티션 스캔 ├── 미처리 파일 필터링 (.success 확인) └── 발견된 파일이 있으면 Orchestrator API 호출 │ ▼[Seed Engine Pipeline] 기존 3단계 워크플로우 실행 │ ├── Step 1: Orchestrator (파일 → SEED_QUEUE) ├── Step 2: File Processor (raw 파일 → DOMAIN_QUEUE) └── Step 3: Domain Collector (메타데이터 수집)🔄 스케줄링 전략
섹션 제목: “🔄 스케줄링 전략”1. 일일 스케줄 (Primary)
섹션 제목: “1. 일일 스케줄 (Primary)”- 실행 시간: 매일 00:00 UTC (한국시간 09:00)
- 대상 날짜: 어제 날짜 (
YYYY-MM-DD) - 이유: Research 엔진이 전날 생성한 데이터를 다음날 처리
2. 보완 스케줄 (Secondary)
섹션 제목: “2. 보완 스케줄 (Secondary)”- 실행 시간: 매일 12:00 UTC (한국시간 21:00)
- 목적: 오전에 놓친 파일이나 지연 업로드된 파일 처리
- 조건: 미처리 파일이 있을 때만 실행
📋 구현 계획
섹션 제목: “📋 구현 계획”🔧 1단계: Scheduled Handler 확장
섹션 제목: “🔧 1단계: Scheduled Handler 확장”현재 상태
섹션 제목: “현재 상태”export async function handleScheduledEvent( event: ScheduledEvent, env: Bindings): Promise<void> { logger.info('scheduled_event_start', { scheduledTime: event.scheduledTime, cron: event.cron });
// TODO: Add metadata sync logic here
logger.info('scheduled_event_complete');}개선 계획
섹션 제목: “개선 계획”// 새로운 구조export async function handleScheduledEvent( event: ScheduledEvent, env: Bindings): Promise<void> { const scheduledTime = new Date(event.scheduledTime); const cron = event.cron;
logger.info('scheduled_event_start', { scheduledTime, cron });
try { // 1. 일일 처리 스케줄 (00:00 UTC) if (cron === '0 0 * * *') { await handleDailyRawFileProcessing(scheduledTime, env); }
// 2. 보완 처리 스케줄 (12:00 UTC) if (cron === '0 12 * * *') { await handleSupplementaryProcessing(scheduledTime, env); }
// 3. 기존 메타데이터 동기화 (6시간/4시간마다) if (cron.includes('*/6') || cron.includes('*/4')) { await handleMetadataSync(env); }
} catch (error) { logger.error('scheduled_event_failed', { error: error instanceof Error ? error.message : 'Unknown error', cron, scheduledTime }); throw error; }
logger.info('scheduled_event_complete', { cron });}🔍 2단계: Raw 파일 스캔 로직
섹션 제목: “🔍 2단계: Raw 파일 스캔 로직”일일 처리 함수
섹션 제목: “일일 처리 함수”async function handleDailyRawFileProcessing( scheduledTime: Date, env: Bindings): Promise<void> { // 1. 어제 날짜 계산 const yesterday = new Date(scheduledTime); yesterday.setUTCDate(yesterday.getUTCDate() - 1); const targetDate = yesterday.toISOString().split('T')[0]; // YYYY-MM-DD
logger.info('daily_processing_start', { targetDate });
// 2. 모든 국가/카테고리 조합 스캔 const partitions = await scanForNewRawFiles(targetDate, env);
if (partitions.length === 0) { logger.info('no_new_files_found', { targetDate }); return; }
// 3. 각 파티션별로 Orchestrator 호출 const results = []; for (const partition of partitions) { try { const result = await triggerSeedOrchestrator(partition, env); results.push({ partition, result, success: true }); } catch (error) { logger.error('partition_processing_failed', { partition, error: error instanceof Error ? error.message : 'Unknown' }); results.push({ partition, error, success: false }); } }
// 4. 결과 요약 const successful = results.filter(r => r.success).length; const failed = results.filter(r => !r.success).length;
logger.info('daily_processing_complete', { targetDate, partitions_found: partitions.length, successful, failed });}파티션 스캔 로직
섹션 제목: “파티션 스캔 로직”interface PartitionInfo { country: string; category: string; date: string; newFiles: number;}
async function scanForNewRawFiles( targetDate: string, env: Bindings): Promise<PartitionInfo[]> { const partitions: PartitionInfo[] = [];
// 1. R2에서 모든 raw/ prefix 스캔 const allObjects = await listAllRawObjects(env.DATASETS_BUCKET);
// 2. 대상 날짜 필터링 const targetObjects = allObjects.filter(obj => obj.key.includes(`/date=${targetDate}/`) && obj.key.match(/raw_\d{4}\.json$/) );
// 3. 파티션별 그룹화 const partitionMap = new Map<string, string[]>();
for (const obj of targetObjects) { const match = obj.key.match( /raw\/country=([a-z]{2})\/category=([a-z]+)\/date=(\d{4}-\d{2}-\d{2})\// );
if (match) { const [, country, category, date] = match; const partitionKey = `${country}:${category}:${date}`;
if (!partitionMap.has(partitionKey)) { partitionMap.set(partitionKey, []); } partitionMap.get(partitionKey)!.push(obj.key); } }
// 4. 각 파티션의 미처리 파일 확인 for (const [partitionKey, files] of partitionMap) { const [country, category, date] = partitionKey.split(':');
// .success 파일 존재 여부 확인 const unprocessedFiles = []; for (const file of files) { const successPath = `${file}.success`; const exists = await env.DATASETS_BUCKET.head(successPath); if (!exists) { unprocessedFiles.push(file); } }
if (unprocessedFiles.length > 0) { partitions.push({ country, category, date, newFiles: unprocessedFiles.length }); } }
return partitions;}🚀 3단계: Orchestrator API 호출
섹션 제목: “🚀 3단계: Orchestrator API 호출”async function triggerSeedOrchestrator( partition: PartitionInfo, env: Bindings): Promise<any> { const request = { country: partition.country, category: partition.category, date: partition.date, force: false // 이미 처리된 파일은 건너뛰기 };
// 내부 함수 직접 호출 (HTTP 오버헤드 없음) const { orchestrateSeeds } = await import('../../routes/seed-orchestrator');
return await orchestrateSeeds(request, env);}⚙️ 4단계: Cron 설정 업데이트
섹션 제목: “⚙️ 4단계: Cron 설정 업데이트”wrangler.jsonc 수정
섹션 제목: “wrangler.jsonc 수정”{ // Development Environment "triggers": { "crons": [ "0 0 * * *", // 매일 00:00 UTC - 일일 raw 파일 처리 "0 12 * * *", // 매일 12:00 UTC - 보완 처리 "0 */6 * * *" // 6시간마다 - 메타데이터 동기화 ] },
// Production Environment "env": { "production": { "triggers": { "crons": [ "0 0 * * *", // 매일 00:00 UTC - 일일 raw 파일 처리 "0 12 * * *", // 매일 12:00 UTC - 보완 처리 "0 */4 * * *" // 4시간마다 - 메타데이터 동기화 ] } } }}📊 모니터링 및 알람
섹션 제목: “📊 모니터링 및 알람”🎯 핵심 메트릭
섹션 제목: “🎯 핵심 메트릭”- 일일 처리 성공률: 성공한 파티션 / 전체 파티션
- 새로운 파일 발견 수: 매일 발견되는 미처리 파일 수
- 처리 지연시간: 파일 업로드부터 처리 완료까지의 시간
- 실패 파티션 수: 처리에 실패한 파티션 수
🚨 알람 조건
섹션 제목: “🚨 알람 조건”- 3일 연속 새로운 파일이 발견되지 않음 (Research 엔진 문제 의심)
- 일일 처리 실패율 > 20%
- 특정 파티션이 3일 연속 처리 실패
- 스케줄된 작업이 30분 이상 지연
📈 대시보드 예시
섹션 제목: “📈 대시보드 예시”// 일일 처리 상태 요약{ "daily_processing_summary": { "date": "2026-01-30", "scheduled_at": "2026-01-31T00:00:00Z", "completed_at": "2026-01-31T00:15:23Z", "duration_minutes": 15.4, "partitions_scanned": 25, "partitions_with_new_files": 8, "partitions_processed_successfully": 7, "partitions_failed": 1, "total_new_files": 42, "total_files_processed": 38, "success_rate": 0.875 }, "failed_partitions": [ { "country": "kr", "category": "news", "date": "2026-01-30", "error": "Network timeout during domain processing", "retry_scheduled": "2026-01-31T12:00:00Z" } ]}🔄 에러 처리 및 복구
섹션 제목: “🔄 에러 처리 및 복구”🚨 에러 분류
섹션 제목: “🚨 에러 분류”- 스캔 에러: R2 List API 실패
- 파티션 처리 에러: 특정 파티션 처리 실패
- 타임아웃 에러: 전체 스케줄 작업 시간 초과
- 리소스 에러: Queue 용량 초과, Worker 메모리 부족
🔄 복구 전략
섹션 제목: “🔄 복구 전략”// 재시도 로직async function handleDailyRawFileProcessing( scheduledTime: Date, env: Bindings): Promise<void> { const maxRetries = 3; let attempt = 0;
while (attempt < maxRetries) { try { await processDailyRawFiles(scheduledTime, env); break; // 성공 시 루프 탈출 } catch (error) { attempt++; logger.error('daily_processing_attempt_failed', { attempt, maxRetries, error: error instanceof Error ? error.message : 'Unknown' });
if (attempt === maxRetries) { // 최종 실패 시 알람 발송 await sendFailureAlert(error, env); throw error; }
// 지수 백오프: 5분, 10분, 20분 const delay = Math.pow(2, attempt - 1) * 5 * 60 * 1000; await sleep(delay); } }}📞 실패 알람
섹션 제목: “📞 실패 알람”async function sendFailureAlert(error: Error, env: Bindings): Promise<void> { const alert = { type: 'daily_processing_failure', timestamp: new Date().toISOString(), error: error.message, stack: error.stack, environment: env.CF_ENV || 'unknown' };
// 1. D1에 실패 로그 저장 await logFailureToD1(alert, env);
// 2. 외부 모니터링 시스템에 알람 전송 (향후 구현) // await sendToMonitoringSystem(alert);}🎯 성능 최적화
섹션 제목: “🎯 성능 최적화”⚡ 병렬 처리
섹션 제목: “⚡ 병렬 처리”- 파티션 레벨: 여러 파티션 동시 처리 (최대 10개)
- 배치 크기: 한 번에 처리할 파티션 수 제한
- 타임아웃: 전체 스케줄 작업 최대 30분
📊 리소스 관리
섹션 제목: “📊 리소스 관리”// 병렬 처리 제어async function processPartitionsBatch( partitions: PartitionInfo[], env: Bindings): Promise<ProcessResult[]> { const BATCH_SIZE = 10; const results: ProcessResult[] = [];
for (let i = 0; i < partitions.length; i += BATCH_SIZE) { const batch = partitions.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.allSettled( batch.map(partition => triggerSeedOrchestrator(partition, env)) );
results.push(...batchResults.map((result, index) => ({ partition: batch[index], success: result.status === 'fulfilled', result: result.status === 'fulfilled' ? result.value : undefined, error: result.status === 'rejected' ? result.reason : undefined }))); }
return results;}🚀 배포 계획
섹션 제목: “🚀 배포 계획”📅 Phase 1: 기본 구현 (1주)
섹션 제목: “📅 Phase 1: 기본 구현 (1주)”- Scheduled Handler 확장
- 일일 스캔 로직 구현
- Cron 설정 업데이트
- 기본 로깅 추가
📅 Phase 2: 모니터링 (1주)
섹션 제목: “📅 Phase 2: 모니터링 (1주)”- 메트릭 수집 로직
- D1 로그 테이블 생성
- 실패 알람 시스템
- 대시보드 API 엔드포인트
📅 Phase 3: 최적화 (1주)
섹션 제목: “📅 Phase 3: 최적화 (1주)”- 병렬 처리 최적화
- 에러 복구 로직 강화
- 성능 모니터링
- 문서화 완료
🧪 테스트 계획
섹션 제목: “🧪 테스트 계획”- 단위 테스트: 각 함수별 테스트
- 통합 테스트: 전체 워크플로우 테스트
- 부하 테스트: 대량 파티션 처리 테스트
- 장애 테스트: 네트워크 실패, 타임아웃 시나리오
🎉 기대 효과
섹션 제목: “🎉 기대 효과”📈 운영 효율성
섹션 제목: “📈 운영 효율성”- 자동화: 수동 개입 없이 매일 자동 처리
- 지연 최소화: 새로운 데이터를 24시간 내 처리
- 안정성: 실패 시 자동 재시도 및 알람
📊 시스템 안정성
섹션 제목: “📊 시스템 안정성”- 모니터링: 실시간 처리 상태 추적
- 복구: 자동 에러 복구 및 수동 개입 최소화
- 확장성: 향후 다양한 스케줄링 요구사항 대응
💰 비용 효율성
섹션 제목: “💰 비용 효율성”- 리소스 최적화: 필요한 시점에만 Workers 실행
- 중복 방지: 이미 처리된 파일 재처리 방지
- 배치 처리: 효율적인 병렬 처리로 처리 시간 단축