Skip to content

일일 Raw 파일 자동 처리 스케줄러 기획서

This content is not available in your language yet.

📅 일일 Raw 파일 자동 처리 스케줄러 기획서

섹션 제목: “📅 일일 Raw 파일 자동 처리 스케줄러 기획서”
  • 수동 트리거: 현재 Seed 엔진은 POST /api/v1/seeds/orchestrate API 호출로만 실행됨
  • 기존 Cron 설정: 개발환경 6시간마다, 프로덕션 4시간마다 실행되지만 메타데이터 동기화만 수행
  • 처리 지연: 새로운 raw 파일이 업로드되어도 수동 트리거 전까지 처리되지 않음
  • 자동화: 매일 1회 새로운 raw 파일들을 자동으로 감지하고 처리
  • 효율성: 이미 처리된 파일은 건너뛰고 새로운 파일만 처리
  • 안정성: 실패 시 재시도 및 모니터링 기능 포함
  • 확장성: 향후 다양한 스케줄링 요구사항에 대응 가능한 구조

[Daily Cron Trigger] 00:00 UTC
[Scheduled Handler] 새로운 raw 파일 스캔
├── R2에서 어제 날짜 파티션 스캔
├── 미처리 파일 필터링 (.success 확인)
└── 발견된 파일이 있으면 Orchestrator API 호출
[Seed Engine Pipeline] 기존 3단계 워크플로우 실행
├── Step 1: Orchestrator (파일 → SEED_QUEUE)
├── Step 2: File Processor (raw 파일 → DOMAIN_QUEUE)
└── Step 3: Domain Collector (메타데이터 수집)
  • 실행 시간: 매일 00:00 UTC (한국시간 09:00)
  • 대상 날짜: 어제 날짜 (YYYY-MM-DD)
  • 이유: Research 엔진이 전날 생성한 데이터를 다음날 처리
  • 실행 시간: 매일 12:00 UTC (한국시간 21:00)
  • 목적: 오전에 놓친 파일이나 지연 업로드된 파일 처리
  • 조건: 미처리 파일이 있을 때만 실행

src/apps/api/scheduled-handler.ts
export async function handleScheduledEvent(
event: ScheduledEvent,
env: Bindings
): Promise<void> {
logger.info('scheduled_event_start', {
scheduledTime: event.scheduledTime,
cron: event.cron
});
// TODO: Add metadata sync logic here
logger.info('scheduled_event_complete');
}
// 새로운 구조
export async function handleScheduledEvent(
event: ScheduledEvent,
env: Bindings
): Promise<void> {
const scheduledTime = new Date(event.scheduledTime);
const cron = event.cron;
logger.info('scheduled_event_start', { scheduledTime, cron });
try {
// 1. 일일 처리 스케줄 (00:00 UTC)
if (cron === '0 0 * * *') {
await handleDailyRawFileProcessing(scheduledTime, env);
}
// 2. 보완 처리 스케줄 (12:00 UTC)
if (cron === '0 12 * * *') {
await handleSupplementaryProcessing(scheduledTime, env);
}
// 3. 기존 메타데이터 동기화 (6시간/4시간마다)
if (cron.includes('*/6') || cron.includes('*/4')) {
await handleMetadataSync(env);
}
} catch (error) {
logger.error('scheduled_event_failed', {
error: error instanceof Error ? error.message : 'Unknown error',
cron,
scheduledTime
});
throw error;
}
logger.info('scheduled_event_complete', { cron });
}
async function handleDailyRawFileProcessing(
scheduledTime: Date,
env: Bindings
): Promise<void> {
// 1. 어제 날짜 계산
const yesterday = new Date(scheduledTime);
yesterday.setUTCDate(yesterday.getUTCDate() - 1);
const targetDate = yesterday.toISOString().split('T')[0]; // YYYY-MM-DD
logger.info('daily_processing_start', { targetDate });
// 2. 모든 국가/카테고리 조합 스캔
const partitions = await scanForNewRawFiles(targetDate, env);
if (partitions.length === 0) {
logger.info('no_new_files_found', { targetDate });
return;
}
// 3. 각 파티션별로 Orchestrator 호출
const results = [];
for (const partition of partitions) {
try {
const result = await triggerSeedOrchestrator(partition, env);
results.push({ partition, result, success: true });
} catch (error) {
logger.error('partition_processing_failed', {
partition,
error: error instanceof Error ? error.message : 'Unknown'
});
results.push({ partition, error, success: false });
}
}
// 4. 결과 요약
const successful = results.filter(r => r.success).length;
const failed = results.filter(r => !r.success).length;
logger.info('daily_processing_complete', {
targetDate,
partitions_found: partitions.length,
successful,
failed
});
}
interface PartitionInfo {
country: string;
category: string;
date: string;
newFiles: number;
}
async function scanForNewRawFiles(
targetDate: string,
env: Bindings
): Promise<PartitionInfo[]> {
const partitions: PartitionInfo[] = [];
// 1. R2에서 모든 raw/ prefix 스캔
const allObjects = await listAllRawObjects(env.DATASETS_BUCKET);
// 2. 대상 날짜 필터링
const targetObjects = allObjects.filter(obj =>
obj.key.includes(`/date=${targetDate}/`) &&
obj.key.match(/raw_\d{4}\.json$/)
);
// 3. 파티션별 그룹화
const partitionMap = new Map<string, string[]>();
for (const obj of targetObjects) {
const match = obj.key.match(
/raw\/country=([a-z]{2})\/category=([a-z]+)\/date=(\d{4}-\d{2}-\d{2})\//
);
if (match) {
const [, country, category, date] = match;
const partitionKey = `${country}:${category}:${date}`;
if (!partitionMap.has(partitionKey)) {
partitionMap.set(partitionKey, []);
}
partitionMap.get(partitionKey)!.push(obj.key);
}
}
// 4. 각 파티션의 미처리 파일 확인
for (const [partitionKey, files] of partitionMap) {
const [country, category, date] = partitionKey.split(':');
// .success 파일 존재 여부 확인
const unprocessedFiles = [];
for (const file of files) {
const successPath = `${file}.success`;
const exists = await env.DATASETS_BUCKET.head(successPath);
if (!exists) {
unprocessedFiles.push(file);
}
}
if (unprocessedFiles.length > 0) {
partitions.push({
country,
category,
date,
newFiles: unprocessedFiles.length
});
}
}
return partitions;
}
async function triggerSeedOrchestrator(
partition: PartitionInfo,
env: Bindings
): Promise<any> {
const request = {
country: partition.country,
category: partition.category,
date: partition.date,
force: false // 이미 처리된 파일은 건너뛰기
};
// 내부 함수 직접 호출 (HTTP 오버헤드 없음)
const { orchestrateSeeds } = await import('../../routes/seed-orchestrator');
return await orchestrateSeeds(request, env);
}
{
// Development Environment
"triggers": {
"crons": [
"0 0 * * *", // 매일 00:00 UTC - 일일 raw 파일 처리
"0 12 * * *", // 매일 12:00 UTC - 보완 처리
"0 */6 * * *" // 6시간마다 - 메타데이터 동기화
]
},
// Production Environment
"env": {
"production": {
"triggers": {
"crons": [
"0 0 * * *", // 매일 00:00 UTC - 일일 raw 파일 처리
"0 12 * * *", // 매일 12:00 UTC - 보완 처리
"0 */4 * * *" // 4시간마다 - 메타데이터 동기화
]
}
}
}
}

  • 일일 처리 성공률: 성공한 파티션 / 전체 파티션
  • 새로운 파일 발견 수: 매일 발견되는 미처리 파일 수
  • 처리 지연시간: 파일 업로드부터 처리 완료까지의 시간
  • 실패 파티션 수: 처리에 실패한 파티션 수
  • 3일 연속 새로운 파일이 발견되지 않음 (Research 엔진 문제 의심)
  • 일일 처리 실패율 > 20%
  • 특정 파티션이 3일 연속 처리 실패
  • 스케줄된 작업이 30분 이상 지연
// 일일 처리 상태 요약
{
"daily_processing_summary": {
"date": "2026-01-30",
"scheduled_at": "2026-01-31T00:00:00Z",
"completed_at": "2026-01-31T00:15:23Z",
"duration_minutes": 15.4,
"partitions_scanned": 25,
"partitions_with_new_files": 8,
"partitions_processed_successfully": 7,
"partitions_failed": 1,
"total_new_files": 42,
"total_files_processed": 38,
"success_rate": 0.875
},
"failed_partitions": [
{
"country": "kr",
"category": "news",
"date": "2026-01-30",
"error": "Network timeout during domain processing",
"retry_scheduled": "2026-01-31T12:00:00Z"
}
]
}

  1. 스캔 에러: R2 List API 실패
  2. 파티션 처리 에러: 특정 파티션 처리 실패
  3. 타임아웃 에러: 전체 스케줄 작업 시간 초과
  4. 리소스 에러: Queue 용량 초과, Worker 메모리 부족
// 재시도 로직
async function handleDailyRawFileProcessing(
scheduledTime: Date,
env: Bindings
): Promise<void> {
const maxRetries = 3;
let attempt = 0;
while (attempt < maxRetries) {
try {
await processDailyRawFiles(scheduledTime, env);
break; // 성공 시 루프 탈출
} catch (error) {
attempt++;
logger.error('daily_processing_attempt_failed', {
attempt,
maxRetries,
error: error instanceof Error ? error.message : 'Unknown'
});
if (attempt === maxRetries) {
// 최종 실패 시 알람 발송
await sendFailureAlert(error, env);
throw error;
}
// 지수 백오프: 5분, 10분, 20분
const delay = Math.pow(2, attempt - 1) * 5 * 60 * 1000;
await sleep(delay);
}
}
}
async function sendFailureAlert(error: Error, env: Bindings): Promise<void> {
const alert = {
type: 'daily_processing_failure',
timestamp: new Date().toISOString(),
error: error.message,
stack: error.stack,
environment: env.CF_ENV || 'unknown'
};
// 1. D1에 실패 로그 저장
await logFailureToD1(alert, env);
// 2. 외부 모니터링 시스템에 알람 전송 (향후 구현)
// await sendToMonitoringSystem(alert);
}

  • 파티션 레벨: 여러 파티션 동시 처리 (최대 10개)
  • 배치 크기: 한 번에 처리할 파티션 수 제한
  • 타임아웃: 전체 스케줄 작업 최대 30분
// 병렬 처리 제어
async function processPartitionsBatch(
partitions: PartitionInfo[],
env: Bindings
): Promise<ProcessResult[]> {
const BATCH_SIZE = 10;
const results: ProcessResult[] = [];
for (let i = 0; i < partitions.length; i += BATCH_SIZE) {
const batch = partitions.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.allSettled(
batch.map(partition => triggerSeedOrchestrator(partition, env))
);
results.push(...batchResults.map((result, index) => ({
partition: batch[index],
success: result.status === 'fulfilled',
result: result.status === 'fulfilled' ? result.value : undefined,
error: result.status === 'rejected' ? result.reason : undefined
})));
}
return results;
}

  • Scheduled Handler 확장
  • 일일 스캔 로직 구현
  • Cron 설정 업데이트
  • 기본 로깅 추가
  • 메트릭 수집 로직
  • D1 로그 테이블 생성
  • 실패 알람 시스템
  • 대시보드 API 엔드포인트
  • 병렬 처리 최적화
  • 에러 복구 로직 강화
  • 성능 모니터링
  • 문서화 완료
  1. 단위 테스트: 각 함수별 테스트
  2. 통합 테스트: 전체 워크플로우 테스트
  3. 부하 테스트: 대량 파티션 처리 테스트
  4. 장애 테스트: 네트워크 실패, 타임아웃 시나리오

  • 자동화: 수동 개입 없이 매일 자동 처리
  • 지연 최소화: 새로운 데이터를 24시간 내 처리
  • 안정성: 실패 시 자동 재시도 및 알람
  • 모니터링: 실시간 처리 상태 추적
  • 복구: 자동 에러 복구 및 수동 개입 최소화
  • 확장성: 향후 다양한 스케줄링 요구사항 대응
  • 리소스 최적화: 필요한 시점에만 Workers 실행
  • 중복 방지: 이미 처리된 파일 재처리 방지
  • 배치 처리: 효율적인 병렬 처리로 처리 시간 단축