Skip to content

Raw/Prod 데이터 이전 워크플로우 기획서

This content is not available in your language yet.

Raw/Prod 데이터 이전 워크플로우 기획서

섹션 제목: “Raw/Prod 데이터 이전 워크플로우 기획서”
  • 기존 3단계 데이터 구조(datasets/processing/prod)에서 2단계 구조(raw/prod)로 마이그레이션
  • 데이터 무결성을 보장하면서 안전한 이전 수행
  • 운영 중단 없이 점진적 마이그레이션 실행
  • Source: datasets/ prefix 파일들 → Target: raw/ prefix
  • Source: processing/ prefix 파일들 → Target: prod/ prefix
  • 관련 코드 및 문서 업데이트
newsfork-datasets/
├── datasets/country={cc}/category={cat}/date=YYYY-MM-DD/
│ ├── raw_0001.json # 원본 데이터
│ ├── raw_0002.json
│ ├── raw_metadata.json # 메타데이터
│ └── raw_NNNN.json.success # 체크포인트
├── processing/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/
│ ├── domain_metadata.json # 도메인 메타데이터
│ ├── robots.txt # 수집된 파일
│ ├── sitemap.xml
│ └── domain_metadata.json.success # 체크포인트
└── prod/ # 최종 산출물 (사용 안함)
newsfork-datasets/
├── raw/country={cc}/category={cat}/date=YYYY-MM-DD/
│ ├── raw_0001.json # 불변 원본
│ ├── raw_0002.json
│ ├── raw_metadata.json # 메타데이터
│ └── raw_NNNN.json.success # 체크포인트
└── prod/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/
├── domain_metadata.json # 파이프라인 산출물
├── robots.txt # 수집된 파일
├── sitemap.xml
└── domain_metadata.json.success # 체크포인트
  • 경로 빌더 함수 업데이트 (src/lib/r2-seed-engine.ts)
  • 라우트 핸들러 업데이트 (src/routes/seed-orchestrator.ts)
  • 문서 업데이트 (seed-engine-workflow.md 등)
  • 기존 데이터 무결성 검증
  • 마이그레이션 스크립트 테스트
  • Dry-run 실행 및 결과 검증
  • datasets/raw/ 마이그레이션
  • processing/prod/ 마이그레이션
  • 체크포인트 파일 마이그레이션
  • 마이그레이션 결과 검증
  • 레거시 데이터 정리
  • 모니터링 및 알람 설정
// 장점: 네트워크 대역폭 절약, 빠른 속도
// 단점: 스토리지 비용 일시적 증가
await s3.copyObject({
CopySource: `${bucketName}/datasets/country=sg/category=news/date=2026-01-28/raw_0001.json`,
Bucket: bucketName,
Key: `raw/country=sg/category=news/date=2026-01-28/raw_0001.json`
});
// 복사 완료 후 원본 삭제
await s3.deleteObjects({
Bucket: bucketName,
Delete: {
Objects: [
{ Key: 'datasets/country=sg/category=news/date=2026-01-28/raw_0001.json' },
// ... 최대 1,000개까지 배치 처리
]
}
});

4.1 마이그레이션 스크립트 구조

섹션 제목: “4.1 마이그레이션 스크립트 구조”
  • 파일: dev/migrate-r2-to-raw-prefix.ts
  • 기능: prefix 없는 객체를 raw/로 이동
  • 확장: datasets/raw/, processing/prod/ 지원
Terminal window
# Dry-run (실제 변경 없이 계획만 출력)
pnpm migrate:r2:raw-prod:dry-run
# 복사만 수행 (원본 유지)
pnpm migrate:r2:raw-prod:copy-only
# 복사 + 삭제 (완전 이전)
pnpm migrate:r2:raw-prod:migrate
# 특정 파티션만 이전
pnpm migrate:r2:raw-prod:migrate --country=sg --category=news --date=2026-01-28
// 복사 전후 SHA-256 해시 비교
const sourceHash = await calculateObjectHash(sourceKey);
const targetHash = await calculateObjectHash(targetKey);
if (sourceHash !== targetHash) {
throw new Error(`Hash mismatch: ${sourceKey}${targetKey}`);
}
// 원본 메타데이터 복사
await s3.copyObject({
CopySource: sourceKey,
Bucket: bucketName,
Key: targetKey,
MetadataDirective: 'COPY' // 메타데이터 보존
});
// 초기값: 30개 동시 처리
let concurrency = 30;
// 에러율 기반 동적 조정
if (errorRate > 0.05) {
concurrency = Math.max(10, concurrency * 0.8);
} else if (errorRate < 0.01) {
concurrency = Math.min(50, concurrency * 1.2);
}
// 1,000개씩 배치로 처리
const BATCH_SIZE = 1000;
for (const batch of chunks(objects, BATCH_SIZE)) {
await Promise.all(batch.map(obj => migrateObject(obj)));
}
raw/country={cc}/category={cat}/date=YYYY-MM-DD/
├── raw_0001.json # 원본 JSON 데이터
├── raw_0002.json # 청크 단위로 분할
├── raw_metadata.json # 파티션 메타데이터
└── raw_NNNN.json.success # 처리 완료 체크포인트

raw_0001.json 구조 예시:

[
{
"url": "https://mom.gov.sg/employment-practices/employment-act",
"title": "Employment Act - Ministry of Manpower",
"domain": "mom.gov.sg",
"registrable_domain": "mom.gov.sg",
"content_type": "text/html",
"language": "en",
"country": "sg"
},
// ... 더 많은 레코드
]
// 입력: raw/.../.../raw_0001.json
// 출력: DOMAIN_QUEUE 메시지들
const rawData = await r2.get(filePath);
const records = JSON.parse(await rawData.text());
// 도메인별 그룹화
const domainGroups = groupBy(records, 'registrable_domain');
// 각 도메인을 Queue로 전송
for (const [domain, urls] of domainGroups) {
await DOMAIN_QUEUE.send({
domain,
urls,
partition_info: { country, category, date }
});
}
// 성공 체크포인트 생성
await r2.put(`${filePath}.success`, new Uint8Array(0));
// 입력: DOMAIN_QUEUE 메시지
// 출력: prod/.../{domain}/ 파일들
const { domain, urls, partition_info } = message;
// robots.txt 수집
const robotsTxt = await fetchRobotsTxt(`https://${domain}/robots.txt`);
await r2.put(buildRobotsTxtPath(...), robotsTxt);
// sitemap.xml 수집
const sitemapXml = await fetchSitemap(`https://${domain}/sitemap.xml`);
await r2.put(buildSitemapXmlPath(...), sitemapXml);
// 도메인 메타데이터 생성
const metadata = {
domain,
urls,
robots_txt_size: robotsTxt.length,
sitemap_xml_size: sitemapXml.length,
processed_at: new Date().toISOString()
};
await r2.put(buildDomainMetadataPath(...), JSON.stringify(metadata));
// 성공 체크포인트 생성
await r2.put(buildDomainMetadataSuccessPath(...), new Uint8Array(0));
prod/country={cc}/category={cat}/date=YYYY-MM-DD/{domain}/
├── domain_metadata.json # 도메인 메타데이터
├── robots.txt # 수집된 robots.txt
├── sitemap.xml # 수집된 sitemap.xml
└── domain_metadata.json.success # 처리 완료 체크포인트

domain_metadata.json 구조 예시:

{
"domain": "mom.gov.sg",
"urls": [
{
"url": "https://mom.gov.sg/employment-practices/employment-act",
"title": "Employment Act - Ministry of Manpower"
}
],
"robots_txt_size": 1024,
"sitemap_xml_size": 2048,
"processed_at": "2026-01-30T10:00:00.000Z"
}
// 처리 전 성공 체크포인트 확인
const successPath = `${filePath}.success`;
const exists = await r2.head(successPath);
if (exists) {
console.log('Already processed, skipping...');
return;
}
// 실제 처리 수행
await processFile(filePath);
// 성공 시 체크포인트 생성
await r2.put(successPath, new Uint8Array(0));
// force=true 옵션으로 재처리 가능
if (force || !await r2.head(successPath)) {
await processFile(filePath);
await r2.put(successPath, new Uint8Array(0));
}
  • 마이그레이션 전 전체 버킷 스냅샷 생성
  • 중요 파티션별 개별 백업
  • 롤백 계획 수립
// 마이그레이션 후 검증
const sourceCount = await countObjects('datasets/');
const targetCount = await countObjects('raw/');
if (sourceCount !== targetCount) {
throw new Error('Object count mismatch');
}
  • 파티션별 순차 처리
  • 피크 시간 회피
  • 진행 상황 모니터링
// 동시성 제한으로 API 한도 준수
const semaphore = new Semaphore(30);
await semaphore.acquire();
try {
await migrateObject(object);
} finally {
semaphore.release();
}
// 마이그레이션 진행률 로깅
console.log(`Progress: ${completed}/${total} (${percentage}%)`);
console.log(`Rate: ${rate} objects/sec`);
console.log(`ETA: ${eta} minutes`);
// 에러 분류 및 재시도
try {
await migrateObject(object);
} catch (error) {
if (isRetryableError(error)) {
await retry(() => migrateObject(object), { maxAttempts: 3 });
} else {
console.error('Fatal error:', error);
throw error;
}
}
  • 마이그레이션 스크립트 테스트
  • Dry-run 실행 및 결과 검토
  • 백업 생성
  • 소규모 파티션 테스트 마이그레이션
  • 점진적 전체 마이그레이션
  • 실시간 모니터링
  • 데이터 무결성 검증
  • 애플리케이션 테스트
  • 레거시 데이터 정리
  • 문제 발생 시 즉시 중단
  • 백업에서 복원
  • 코드 롤백
  • 모든 파일이 정확히 복사됨
  • 체크섬 검증 통과
  • 메타데이터 보존
  • 애플리케이션 정상 동작
  • API 응답 시간 유지
  • 에러율 < 0.1%
  • 새로운 파이프라인 정상 동작
  • 모니터링 대시보드 업데이트
  • 문서 업데이트 완료
  • 30일 후 레거시 데이터 삭제
  • 관련 코드 정리
  • 문서 아카이브
  • 성능 모니터링
  • 비용 최적화
  • 프로세스 개선

이 기획서는 안전하고 효율적인 데이터 마이그레이션을 위한 포괄적인 계획을 제공합니다. 각 단계별로 신중한 검증과 모니터링을 통해 데이터 무결성을 보장하면서 새로운 raw/prod 구조로의 전환을 완료할 수 있습니다.