Raw/Prod 데이터 이전 워크플로우 기획서
Raw/Prod 데이터 이전 워크플로우 기획서
섹션 제목: “Raw/Prod 데이터 이전 워크플로우 기획서”1. 개요
섹션 제목: “1. 개요”1.1 목적
섹션 제목: “1.1 목적”- 기존 3단계 데이터 구조(
datasets/processing/prod)에서 2단계 구조(raw/prod)로 마이그레이션 - 데이터 무결성을 보장하면서 안전한 이전 수행
- 운영 중단 없이 점진적 마이그레이션 실행
1.2 마이그레이션 범위
섹션 제목: “1.2 마이그레이션 범위”- Source:
datasets/prefix 파일들 → Target:raw/prefix - Source:
processing/prefix 파일들 → Target:prod/prefix - 관련 코드 및 문서 업데이트
2. 현재 상태 분석
섹션 제목: “2. 현재 상태 분석”2.1 기존 구조 (3단계)
섹션 제목: “2.1 기존 구조 (3단계)”newsfork-datasets/├── datasets/country={cc}/category={cat}/date=YYYY-MM-DD/│ ├── raw_0001.json # 원본 데이터│ ├── raw_0002.json│ ├── raw_metadata.json # 메타데이터│ └── raw_NNNN.json.success # 체크포인트├── processing/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/│ ├── domain_metadata.json # 도메인 메타데이터│ ├── robots.txt # 수집된 파일│ ├── sitemap.xml│ └── domain_metadata.json.success # 체크포인트└── prod/ # 최종 산출물 (사용 안함)2.2 목표 구조 (2단계)
섹션 제목: “2.2 목표 구조 (2단계)”newsfork-datasets/├── raw/country={cc}/category={cat}/date=YYYY-MM-DD/│ ├── raw_0001.json # 불변 원본│ ├── raw_0002.json│ ├── raw_metadata.json # 메타데이터│ └── raw_NNNN.json.success # 체크포인트└── prod/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/ ├── domain_metadata.json # 파이프라인 산출물 ├── robots.txt # 수집된 파일 ├── sitemap.xml └── domain_metadata.json.success # 체크포인트3. 마이그레이션 전략
섹션 제목: “3. 마이그레이션 전략”3.1 단계별 접근법
섹션 제목: “3.1 단계별 접근법”Phase 1: 코드 업데이트 (완료)
섹션 제목: “Phase 1: 코드 업데이트 (완료)”- 경로 빌더 함수 업데이트 (
src/lib/r2-seed-engine.ts) - 라우트 핸들러 업데이트 (
src/routes/seed-orchestrator.ts) - 문서 업데이트 (seed-engine-workflow.md 등)
Phase 2: 데이터 검증 및 준비
섹션 제목: “Phase 2: 데이터 검증 및 준비”- 기존 데이터 무결성 검증
- 마이그레이션 스크립트 테스트
- Dry-run 실행 및 결과 검증
Phase 3: 점진적 마이그레이션
섹션 제목: “Phase 3: 점진적 마이그레이션”-
datasets/→raw/마이그레이션 -
processing/→prod/마이그레이션 - 체크포인트 파일 마이그레이션
Phase 4: 검증 및 정리
섹션 제목: “Phase 4: 검증 및 정리”- 마이그레이션 결과 검증
- 레거시 데이터 정리
- 모니터링 및 알람 설정
3.2 마이그레이션 방법
섹션 제목: “3.2 마이그레이션 방법”3.2.1 서버 측 복사 (S3 CopyObject)
섹션 제목: “3.2.1 서버 측 복사 (S3 CopyObject)”// 장점: 네트워크 대역폭 절약, 빠른 속도// 단점: 스토리지 비용 일시적 증가await s3.copyObject({ CopySource: `${bucketName}/datasets/country=sg/category=news/date=2026-01-28/raw_0001.json`, Bucket: bucketName, Key: `raw/country=sg/category=news/date=2026-01-28/raw_0001.json`});3.2.2 배치 삭제 (S3 DeleteObjects)
섹션 제목: “3.2.2 배치 삭제 (S3 DeleteObjects)”// 복사 완료 후 원본 삭제await s3.deleteObjects({ Bucket: bucketName, Delete: { Objects: [ { Key: 'datasets/country=sg/category=news/date=2026-01-28/raw_0001.json' }, // ... 최대 1,000개까지 배치 처리 ] }});4. 기술적 구현
섹션 제목: “4. 기술적 구현”4.1 마이그레이션 스크립트 구조
섹션 제목: “4.1 마이그레이션 스크립트 구조”4.1.1 기존 스크립트 활용
섹션 제목: “4.1.1 기존 스크립트 활용”- 파일:
dev/migrate-r2-to-raw-prefix.ts - 기능: prefix 없는 객체를
raw/로 이동 - 확장:
datasets/→raw/,processing/→prod/지원
4.1.2 스크립트 실행 옵션
섹션 제목: “4.1.2 스크립트 실행 옵션”# Dry-run (실제 변경 없이 계획만 출력)pnpm migrate:r2:raw-prod:dry-run
# 복사만 수행 (원본 유지)pnpm migrate:r2:raw-prod:copy-only
# 복사 + 삭제 (완전 이전)pnpm migrate:r2:raw-prod:migrate
# 특정 파티션만 이전pnpm migrate:r2:raw-prod:migrate --country=sg --category=news --date=2026-01-284.2 데이터 무결성 보장
섹션 제목: “4.2 데이터 무결성 보장”4.2.1 체크섬 검증
섹션 제목: “4.2.1 체크섬 검증”// 복사 전후 SHA-256 해시 비교const sourceHash = await calculateObjectHash(sourceKey);const targetHash = await calculateObjectHash(targetKey);if (sourceHash !== targetHash) { throw new Error(`Hash mismatch: ${sourceKey} → ${targetKey}`);}4.2.2 메타데이터 보존
섹션 제목: “4.2.2 메타데이터 보존”// 원본 메타데이터 복사await s3.copyObject({ CopySource: sourceKey, Bucket: bucketName, Key: targetKey, MetadataDirective: 'COPY' // 메타데이터 보존});4.3 동시성 제어
섹션 제목: “4.3 동시성 제어”4.3.1 적응형 동시성
섹션 제목: “4.3.1 적응형 동시성”// 초기값: 30개 동시 처리let concurrency = 30;
// 에러율 기반 동적 조정if (errorRate > 0.05) { concurrency = Math.max(10, concurrency * 0.8);} else if (errorRate < 0.01) { concurrency = Math.min(50, concurrency * 1.2);}4.3.2 배치 처리
섹션 제목: “4.3.2 배치 처리”// 1,000개씩 배치로 처리const BATCH_SIZE = 1000;for (const batch of chunks(objects, BATCH_SIZE)) { await Promise.all(batch.map(obj => migrateObject(obj)));}5. 데이터 처리 파이프라인
섹션 제목: “5. 데이터 처리 파이프라인”5.1 Raw → Prod 워크플로우
섹션 제목: “5.1 Raw → Prod 워크플로우”5.1.1 입력 데이터 (Raw Stage)
섹션 제목: “5.1.1 입력 데이터 (Raw Stage)”raw/country={cc}/category={cat}/date=YYYY-MM-DD/├── raw_0001.json # 원본 JSON 데이터├── raw_0002.json # 청크 단위로 분할├── raw_metadata.json # 파티션 메타데이터└── raw_NNNN.json.success # 처리 완료 체크포인트raw_0001.json 구조 예시:
[ { "url": "https://mom.gov.sg/employment-practices/employment-act", "title": "Employment Act - Ministry of Manpower", "domain": "mom.gov.sg", "registrable_domain": "mom.gov.sg", "content_type": "text/html", "language": "en", "country": "sg" }, // ... 더 많은 레코드]5.1.2 처리 단계
섹션 제목: “5.1.2 처리 단계”Step 1: Seed Queue Processing
섹션 제목: “Step 1: Seed Queue Processing”// 입력: raw/.../.../raw_0001.json// 출력: DOMAIN_QUEUE 메시지들
const rawData = await r2.get(filePath);const records = JSON.parse(await rawData.text());
// 도메인별 그룹화const domainGroups = groupBy(records, 'registrable_domain');
// 각 도메인을 Queue로 전송for (const [domain, urls] of domainGroups) { await DOMAIN_QUEUE.send({ domain, urls, partition_info: { country, category, date } });}
// 성공 체크포인트 생성await r2.put(`${filePath}.success`, new Uint8Array(0));Step 2: Domain Queue Processing
섹션 제목: “Step 2: Domain Queue Processing”// 입력: DOMAIN_QUEUE 메시지// 출력: prod/.../{domain}/ 파일들
const { domain, urls, partition_info } = message;
// robots.txt 수집const robotsTxt = await fetchRobotsTxt(`https://${domain}/robots.txt`);await r2.put(buildRobotsTxtPath(...), robotsTxt);
// sitemap.xml 수집const sitemapXml = await fetchSitemap(`https://${domain}/sitemap.xml`);await r2.put(buildSitemapXmlPath(...), sitemapXml);
// 도메인 메타데이터 생성const metadata = { domain, urls, robots_txt_size: robotsTxt.length, sitemap_xml_size: sitemapXml.length, processed_at: new Date().toISOString()};await r2.put(buildDomainMetadataPath(...), JSON.stringify(metadata));
// 성공 체크포인트 생성await r2.put(buildDomainMetadataSuccessPath(...), new Uint8Array(0));5.1.3 출력 데이터 (Prod Stage)
섹션 제목: “5.1.3 출력 데이터 (Prod Stage)”prod/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/├── domain_metadata.json # 도메인 메타데이터├── robots.txt # 수집된 robots.txt├── sitemap.xml # 수집된 sitemap.xml└── domain_metadata.json.success # 처리 완료 체크포인트domain_metadata.json 구조 예시:
{ "domain": "mom.gov.sg", "urls": [ { "url": "https://mom.gov.sg/employment-practices/employment-act", "title": "Employment Act - Ministry of Manpower" } ], "robots_txt_size": 1024, "sitemap_xml_size": 2048, "processed_at": "2026-01-30T10:00:00.000Z"}5.2 체크포인트 시스템
섹션 제목: “5.2 체크포인트 시스템”5.2.1 Idempotency 보장
섹션 제목: “5.2.1 Idempotency 보장”// 처리 전 성공 체크포인트 확인const successPath = `${filePath}.success`;const exists = await r2.head(successPath);if (exists) { console.log('Already processed, skipping...'); return;}
// 실제 처리 수행await processFile(filePath);
// 성공 시 체크포인트 생성await r2.put(successPath, new Uint8Array(0));5.2.2 재처리 지원
섹션 제목: “5.2.2 재처리 지원”// force=true 옵션으로 재처리 가능if (force || !await r2.head(successPath)) { await processFile(filePath); await r2.put(successPath, new Uint8Array(0));}6. 위험 관리
섹션 제목: “6. 위험 관리”6.1 데이터 손실 방지
섹션 제목: “6.1 데이터 손실 방지”6.1.1 백업 전략
섹션 제목: “6.1.1 백업 전략”- 마이그레이션 전 전체 버킷 스냅샷 생성
- 중요 파티션별 개별 백업
- 롤백 계획 수립
6.1.2 검증 단계
섹션 제목: “6.1.2 검증 단계”// 마이그레이션 후 검증const sourceCount = await countObjects('datasets/');const targetCount = await countObjects('raw/');if (sourceCount !== targetCount) { throw new Error('Object count mismatch');}6.2 성능 영향 최소화
섹션 제목: “6.2 성능 영향 최소화”6.2.1 점진적 마이그레이션
섹션 제목: “6.2.1 점진적 마이그레이션”- 파티션별 순차 처리
- 피크 시간 회피
- 진행 상황 모니터링
6.2.2 리소스 제한
섹션 제목: “6.2.2 리소스 제한”// 동시성 제한으로 API 한도 준수const semaphore = new Semaphore(30);await semaphore.acquire();try { await migrateObject(object);} finally { semaphore.release();}7. 모니터링 및 알람
섹션 제목: “7. 모니터링 및 알람”7.1 진행 상황 추적
섹션 제목: “7.1 진행 상황 추적”// 마이그레이션 진행률 로깅console.log(`Progress: ${completed}/${total} (${percentage}%)`);console.log(`Rate: ${rate} objects/sec`);console.log(`ETA: ${eta} minutes`);7.2 에러 처리
섹션 제목: “7.2 에러 처리”// 에러 분류 및 재시도try { await migrateObject(object);} catch (error) { if (isRetryableError(error)) { await retry(() => migrateObject(object), { maxAttempts: 3 }); } else { console.error('Fatal error:', error); throw error; }}8. 실행 계획
섹션 제목: “8. 실행 계획”8.1 사전 준비 (1일)
섹션 제목: “8.1 사전 준비 (1일)”- 마이그레이션 스크립트 테스트
- Dry-run 실행 및 결과 검토
- 백업 생성
8.2 마이그레이션 실행 (2-3일)
섹션 제목: “8.2 마이그레이션 실행 (2-3일)”- 소규모 파티션 테스트 마이그레이션
- 점진적 전체 마이그레이션
- 실시간 모니터링
8.3 검증 및 정리 (1일)
섹션 제목: “8.3 검증 및 정리 (1일)”- 데이터 무결성 검증
- 애플리케이션 테스트
- 레거시 데이터 정리
8.4 롤백 계획
섹션 제목: “8.4 롤백 계획”- 문제 발생 시 즉시 중단
- 백업에서 복원
- 코드 롤백
9. 성공 기준
섹션 제목: “9. 성공 기준”9.1 데이터 무결성
섹션 제목: “9.1 데이터 무결성”- 모든 파일이 정확히 복사됨
- 체크섬 검증 통과
- 메타데이터 보존
9.2 시스템 안정성
섹션 제목: “9.2 시스템 안정성”- 애플리케이션 정상 동작
- API 응답 시간 유지
- 에러율 < 0.1%
9.3 운영 효율성
섹션 제목: “9.3 운영 효율성”- 새로운 파이프라인 정상 동작
- 모니터링 대시보드 업데이트
- 문서 업데이트 완료
10. 후속 작업
섹션 제목: “10. 후속 작업”10.1 레거시 정리
섹션 제목: “10.1 레거시 정리”- 30일 후 레거시 데이터 삭제
- 관련 코드 정리
- 문서 아카이브
10.2 최적화
섹션 제목: “10.2 최적화”- 성능 모니터링
- 비용 최적화
- 프로세스 개선
이 기획서는 안전하고 효율적인 데이터 마이그레이션을 위한 포괄적인 계획을 제공합니다. 각 단계별로 신중한 검증과 모니터링을 통해 데이터 무결성을 보장하면서 새로운 raw/prod 구조로의 전환을 완료할 수 있습니다.