Key Takeaways
针对 RAG 场景,说明为何垂直站定时抓取需要代理与去重,并给出从调度、代理请求、解析到向量化落库的完整架构与 Python 实现。
RAG 知识库构建中的代理与抓取:从定时采写到向量化落库的技术实现
RAG(Retrieval-Augmented Generation)依赖高质量、可更新的知识库。知识库往往来自垂直站点或文档站的定时抓取,请求量大、目标站多,容易触发反爬与封禁。代理在抓取层分散 IP、配合调度与去重,是保证 RAG 数据管线稳定更新的关键。本文先做结合分析(为什么 RAG 场景需要代理与定时抓取),再给出一套技术实现(调度 + 代理抓取 + 解析 + 向量化落库的架构与 Python 示例)。
一、结合分析:为什么 RAG 需要代理与可控抓取
1.1 RAG 知识库的数据特点
- 来源以网页/文档为主:帮助文档、产品页、博客、论坛等,需要从目标站抓取 HTML 或 API。
- 需要持续更新:知识会变,RAG 效果依赖数据新鲜度,因此需要定时、增量抓取而非一次性全量。
- 多站点、多页面:单站可能有成千上万个 URL,多站叠加后请求量级大,同一出口 IP 集中请求极易触发限流或封禁。
若直接用固定 IP 高频抓取,轻则 429/验证码,重则封 IP,导致抓取中断、知识库更新失败。因此需要在抓取层引入代理(动态轮换或按会话),并配合调度、去重、限速,在“更新频率”与“可稳定性”之间取得平衡。
1.2 代理在 RAG 管线中的价值
| 能力 | 在 RAG 中的作用 |
|------|------------------|
| IP 分散 | 多站点、多 URL 的请求分散到不同出口 IP,降低单 IP 请求密度,减轻目标站风控。 |
| 多地域(可选) | 部分站点按地区返回内容时,可用地域绑定代理获取“本地化”页面,丰富知识库视角。 |
| 会话保持(可选) | 对需要登录或 Cookie 的文档站,可在一定时间内复用同一 IP,避免频繁登录。 |
| 可观测与重试 | 配合状态码、重试、熔断,可统计成功率与延迟,便于做 SLA 与容量规划。 |
结论:在 RAG 知识库构建管线中,代理应作为抓取层的标准组件,与调度器、去重、解析、向量化一起构成完整链路。
1.3 与“无代理”“一次性爬取”的对比
- 无代理、单机爬取:实现简单,但规模稍大即易被封,不适合生产级 RAG 更新。
- 一次性全量爬取:无法反映知识变更,RAG 效果随时间下降;且全量往往请求量更大,封禁风险更高。
- 代理 + 定时增量抓取:按策略(如按站、按 URL 优先级)定时调度,配合去重与变更检测,只抓取新增或变更页面,请求量可控;代理保证出口分散,稳定性好,是推荐方式。
二、技术实现:整体架构与数据流
2.1 管线概览
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ 调度器 │────▶│ 抓取 Worker │────▶│ 代理服务 │
│ (定时/队列) │ │ (代理+重试+限速) │ │ (动态代理 API) │
└─────────────────┘ └────────┬────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ 去重/变更检测 │◀────│ 原始 HTML/文本 │ │ 目标站点 │
└────────┬────────┘ └──────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌──────────────────┐
│ 解析与分块 │────▶│ 向量化 + 落库 │ ← RAG 检索使用
└─────────────────┘ └──────────────────┘- 调度器:按 Cron 或队列驱动,产出待抓取 URL 列表(可从 DB/配置文件/站点地图读取),支持优先级与去重(避免重复入队)。
- 抓取 Worker:从调度器或队列取 URL,经代理发起 HTTP 请求,做重试、限速;输出原始 HTML 或清洗后文本。
- 代理服务:提供动态代理端点(如 BytesFlows 等),按请求或按会话轮换 IP,可选地域绑定。
- 去重/变更检测:按 URL 或内容指纹判断是否为新页或已变更,仅对新增/变更做后续解析与向量化,节省算力与存储。
- 解析与分块:将 HTML 转为纯文本或结构化块(按标题、段落等),便于分块策略(固定长度、按段落等)。
- 向量化 + 落库:调用 Embedding 接口得到向量,写入向量库(如 Milvus、Chroma、pgvector 等),供 RAG 检索使用。
代理仅出现在“抓取 Worker → 代理服务 → 目标站点”这一段,下游解析与向量化与代理无耦合。
2.2 代理接入方式(与上一篇一致)
- 按请求轮换:每个 URL 使用不同 IP,防封效果好,适合绝大多数文档站。
- 按会话保持:同一站点、同一会话窗口内复用 IP,适合需登录的文档源。
- 地域绑定:需要“某地区版本”页面时,在代理 URL 上带
country=xx等参数。
实现上,抓取 Worker 内将 HTTP 客户端的 proxy 指向代理网关即可;网关由代理服务商提供。
三、Python 实现:调度、代理抓取、解析与落库示意
下面用 Python 串联:调度产出 URL → 经代理抓取 → 简单解析与分块 → 调用 Embedding 并落库。实际生产可替换为 Celery/Redis 队列、Scrapy、Playwright 等,此处以最小可运行示例为主。
3.1 依赖与配置
# pip install requests beautifulsoup4 openai # 向量化示例用 OpenAI,可换成其他
import os
import time
import hashlib
import requests
from bs4 import BeautifulSoup
from typing import Iterator
# 代理网关(替换为实际代理服务地址,如 BytesFlows)
PROXY_GATEWAY = os.getenv("PROXY_GATEWAY", "http://user:pass@proxy.example.com:8080")
REQUEST_TIMEOUT = 30
MAX_RETRIES = 3
RETRY_BACKOFF = 2
CHUNK_SIZE = 500 # 字符数,按需调整
CHUNK_OVERLAP = 50 # 块间重叠3.2 经代理抓取单 URL(带重试)
def fetch_with_proxy(url: str) -> str | None:
proxies = {"http": PROXY_GATEWAY, "https": PROXY_GATEWAY}
for attempt in range(MAX_RETRIES):
try:
r = requests.get(
url,
proxies=proxies,
timeout=REQUEST_TIMEOUT,
headers={"User-Agent": "RAG-Crawler/1.0 (Knowledge Base)"},
)
if r.status_code == 429:
time.sleep(RETRY_BACKOFF ** attempt)
continue
r.raise_for_status()
return r.text
except requests.RequestException:
time.sleep(RETRY_BACKOFF ** attempt)
return None3.3 HTML 解析与分块
def extract_text(html: str) -> str:
soup = BeautifulSoup(html, "html.parser")
for tag in soup(["script", "style"]):
tag.decompose()
return soup.get_text(separator="\n", strip=True)
def content_fingerprint(text: str) -> str:
return hashlib.sha256(text.encode()).hexdigest()
def chunk_text(text: str, size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
chunks = []
start = 0
while start < len(text):
end = start + size
chunks.append(text[start:end])
start = end - overlap
return chunks3.4 单 URL 全流程:抓取 → 去重 → 分块 → 向量化落库(示意)
# 此处用占位表示“写入向量库”;实际替换为 Milvus/Chroma/OpenAI 等
def embed_and_store(chunks: list[str], url: str, doc_id: str):
for i, chunk in enumerate(chunks):
# 调用 Embedding API 得到向量(示例省略)
# vector = openai.Embedding.create(input=chunk, model="text-embedding-3-small")["data"][0]["embedding"]
# vector_db.upsert(collection, [{"id": f"{doc_id}_{i}", "url": url, "text": chunk, "vector": vector}])
print(f" stored chunk {i+1}/{len(chunks)} for {url}")
def process_one_url(url: str, seen_fingerprints: set) -> bool:
html = fetch_with_proxy(url)
if not html:
return False
text = extract_text(html)
fp = content_fingerprint(text)
if fp in seen_fingerprints:
return True # 跳过重复内容
seen_fingerprints.add(fp)
chunks = chunk_text(text)
doc_id = hashlib.sha256(url.encode()).hexdigest()[:12]
embed_and_store(chunks, url, doc_id)
return True3.5 调度入口:批量 URL 顺序抓取(可改为队列消费)
def run_scheduled_crawl(urls: list[str]):
seen = set()
for url in urls:
ok = process_one_url(url, seen)
print(f"{'OK' if ok else 'FAIL'}: {url}")
time.sleep(1) # 简单限速,生产可用 token bucket 等
if __name__ == "__main__":
urls = [
"https://example.com/docs/page1",
"https://example.com/docs/page2",
]
run_scheduled_crawl(urls)将 PROXY_GATEWAY 换为实际代理地址(如 BytesFlows),即可得到一条可运行的 RAG 向抓取管线:调度 → 代理抓取 → 去重 → 解析分块 → 向量化落库。生产环境可将 run_scheduled_crawl 改为从 Redis/DB 消费 URL、用 Celery 做定时任务,并将 embed_and_store 接真实向量库与 Embedding 服务。
四、可观测与稳定性建议
- 日志:记录每个 URL 的 status_code、耗时、是否命中去重;按站点统计成功率与延迟。
- 限速:单站 QPS 上限、全局并发上限,避免打满代理配额并减轻目标站压力。
- 熔断:某站点短时间内失败率过高时,暂停该站任务一段时间再重试。
- 增量与优先级:优先抓取 sitemap 中 lastmod 较新的 URL,或按引用热度排序,提高更新效率。
五、小结
- 结合分析:RAG 知识库依赖多站点、可持续更新的抓取;固定 IP 高频抓取易触发风控,代理在抓取层分散请求,是保证稳定更新的关键;配合定时调度与去重,形成可持续的 RAG 数据管线。
- 技术实现:调度器产出 URL → 抓取 Worker 经代理请求 → 去重/变更检测 → 解析分块 → 向量化落库;文中 Python 示例覆盖从抓取到落库的完整链路,可将代理网关替换为 BytesFlows 等,并按需接入队列与真实向量库,即可在 RAG 场景下落地“代理 + 定时抓取 + 向量化”的闭环。
与上一篇《AI 数据管线中的动态代理》搭配:上一篇侧重通用采集 + 动态代理 + 重试熔断;本文侧重 RAG 场景 + 调度 + 去重 + 解析与向量化,两篇共同覆盖“代理与 AI 技术结合”的典型技术实现。