
记一次使用 Python 爬取 B 站 UP 主信息的经历
·2 分钟·2,005 字·请注意时效性
#需求
尽可能获取 B 站美食博主列表,按照粉丝数排序,最终目的是能找出粉丝数前列 UP 主的 ID 和昵称。
#思路
- 将几个包含很多美食区视频的网页的 URL 存起来
- 通过请求 URL 拿到尽可能多的投稿视频的链接
- 通过视频链接拿到 UP 主的主页链接,并分析出 UP 主的 UID
- 通过 UID 请求信息接口,拿到 UP 主的 ID 以及粉丝数
#用到的库及其作用
csv:读写 CSV 文件json:读写 JSON 文件time:进行延时操作random:随机数功能实现codecs:打开文件(解决中文写入文件后乱码问题)requests:HTTP 请求库selenium:模拟浏览器操作fake_useragent:随机 User-Agent 库threading:多线程库BeautifulSoup:解析 HTML 与 XML 的库
#遇到的问题以及解决方案
#拿不到 HTML 结构
可能是因为没有设置 User-Agent,服务器无法确认请求是否由用户发出。提前设置好请求头 User-Agent(浏览器标识),伪装成浏览器,模拟用户正常请求,防止 IP 被封禁。这里可以自己设置 User-Agent 为指定浏览器标识,也可以用库来随机生成。建议使用 fake_useragent 库来随机生成 User-Agent 标识。
安装 fake_useragent:
pip install fake_useragent
使用示例:
from fake_useragent import UserAgent
UA = UserAgent().random
headers = {'User-Agent': UA}
#拿到的 HTML 结构不完整
可能是因为网页动态加载(拿到网页并用浏览器解析之后还需要 JavaScript 来动态更新网页内容),导致拿到的网页 HTML 结构残缺,拿不到想要的数据。可以使用 selenium 库来模拟浏览器爬虫,这样会等到网页加载完全再获取网页的 HTML。
安装 selenium:
pip install selenium
下载对应版本的浏览器驱动:
- Firefox:geckodriver
- Chrome:chromedriver
- Edge:MicrosoftWebDriver
- Opera:operadriver
使用示例:
from selenium import webdriver
# 创建浏览器实例
browser = webdriver.Chrome() # Chrome 浏览器
# browser = webdriver.Firefox() # Firefox 浏览器
# browser = webdriver.Edge() # Edge 浏览器
# 访问网页
browser.get(url)
# 获取网页 HTML 结构
html = browser.page_source
#数据保存和中文乱码问题
为了方便后续处理数据,这里使用 CSV 文件格式存储数据,同时使用 codecs 模块打开文件来防止中文乱码。CSV 格式文件很简单,Excel 也能打开并处理,Python 标准库里有 csv 模块来读写 CSV 格式文件。
注意:csv 和 codecs 都是 Python 标准库的一部分,无需额外安装。
使用示例:
import csv
import codecs
filename = "data.csv"
# 'ab' 为追加模式,'wb' 为覆盖模式
csv_file = codecs.open(filename, 'ab', "gbk")
# 创建 CSV 写入器
csv_writer = csv.writer(csv_file)
# 写入表头
csv_writer.writerow(("第一项", "第二项"))
# 写入一行数据
info = (1, 2)
csv_writer.writerow(info)
# 写入多行数据
infos = [(1, 2), (3, 4)]
csv_writer.writerows(infos)
# 关闭文件
csv_file.close()
#返回的数据是 JSON 格式
使用 json 模块处理 JSON 数据。
注意:json 是 Python 标准库的一部分,无需额外安装。
使用示例:
import json
jsonData = '{"name": "Viki"}'
# JSON 字符串转为 Python 字典(或其他数据类型)
data = json.loads(jsonData)
print(data['name']) # 输出:Viki
# Python 字典(或其他数据类型)转为 JSON 字符串
jsonStr = json.dumps(data)
print(jsonStr) # 输出:{"name": "Viki"}
#爬取速度太慢
使用 threading 模块实现多线程并发爬取。
注意:threading 是 Python 标准库的一部分,无需额外安装。
使用示例:
import threading
lst = [123, 456, 789]
def double(n):
result = n * n
print(f"{n} 的平方是 {result}")
return result
# 为每个任务创建线程
for n in lst:
double_thread = threading.Thread(target=double, args=(n,))
double_thread.start()
#完整代码实现
以下是完整的爬虫代码:
import csv
import json
import time
import codecs
import requests
import threading
from bs4 import BeautifulSoup
from selenium import webdriver
from fake_useragent import UserAgent
# B 站美食区相关页面 URL 列表
biliFoodUrls = [
'https://www.bilibili.com/v/food',
'https://www.bilibili.com/v/food/make',
'https://www.bilibili.com/v/food/rural',
'https://www.bilibili.com/v/food/record',
'https://www.bilibili.com/v/food/detective',
'https://www.bilibili.com/v/food/measurement',
'https://www.bilibili.com/v/popular/rank/food'
]
def grabVideoUrl(url, videoUrls, upUrls, upUids, upInfos, browser):
"""捕获视频 URL 和 UP 主 URL"""
if not url:
return
if url[0] == '/':
url = 'https:' + url
if 'bilibili.com/video' in url:
videoUrls.append(url)
print(f'捕获到视频 URL:\t{url}')
elif 'space.bilibili.com' in url:
upUrls.append(url)
print(f'捕获到 UP 主 URL:\t{url}')
def getVideosUrl(videoUrls, upUrls, upUids, upInfos, browser):
"""从美食区页面获取视频 URL"""
for web in biliFoodUrls:
print(f"开始处理页面:\t{web}")
browser.get(web)
html = browser.page_source
webBf = BeautifulSoup(html, 'html.parser')
al = webBf.find_all("a")
for a in al:
url = a.get('href')
args = (url, videoUrls, upUrls, upUids, upInfos, browser)
video_thread = threading.Thread(target=grabVideoUrl, args=args)
video_thread.start()
def grabUpsUrl(videoUrl, videoUrls, upUrls, upUids, upInfos, browser):
"""从视频页面获取 UP 主 URL"""
print(f"开始处理视频 URL:\t{videoUrl}")
headers = {
'User-Agent': UserAgent().random,
'Upgrade-Insecure-Requests': '1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, sdch, br',
'Accept-Language': 'zh-CN,zh;q=0.8',
}
html = requests.request('GET', videoUrl, headers=headers).content
webBf = BeautifulSoup(html, "html.parser")
upDiv = webBf.find(id='v_upinfo')
if not upDiv:
print(f"获取视频 UP 主信息遭到拦截\t{videoUrl}")
return
upUrl = upDiv.find_all('a')[0].get('href')
if upUrl[0] == '/':
upUrl = 'https:' + upUrl
upUrls.append(upUrl)
print(f'捕获到 UP 主 URL:\t{upUrl}')
def getUpsUrl(videoUrls, upUrls, upUids, upInfos, browser):
"""批量获取 UP 主 URL"""
for videoUrl in videoUrls:
args = (videoUrl, videoUrls, upUrls, upUids, upInfos, browser)
upUrl_thread = threading.Thread(target=grabUpsUrl, args=args)
upUrl_thread.start()
def getUpsId(videoUrls, upUrls, upUids, upInfos, browser):
"""从 UP 主 URL 中解析出 UID"""
for upUrl in upUrls:
if not upUrl:
continue
upUid = upUrl.split("#")[0].split('/')[-1]
upUids.append(upUid)
print(f'捕获到 UP 主 UID:\t{upUid}')
def getUpsInfo(videoUrls, upUrls, upUids, upInfos, browser):
"""通过 UID 获取 UP 主详细信息"""
api = 'https://api.bilibili.com/x/web-interface/card?jsonp=jsonp&mid='
# 去重
ids = set(upUids)
upUids = list(ids)
for index, upUid in enumerate(upUids):
print(f"开始处理 mid:{upUid}\t进度:{index+1}/{len(upUids)}")
headers = {
'User-Agent': UserAgent().random,
'Upgrade-Insecure-Requests': '1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, sdch, br',
'Accept-Language': 'zh-CN,zh;q=0.8',
}
res = requests.request('GET', f"{api}{upUid}", headers=headers)
data = res.json()['data']
n = 0
# 重试机制
while ((not data) and n < 100):
print(f"请求遭到服务器拦截,重试第 {n+1} 次中...\t总进度:{index+1}/{len(upUids)}")
headers = {
'User-Agent': UserAgent().random,
'Upgrade-Insecure-Requests': '1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Encoding': 'gzip, deflate, sdch, br',
'Accept-Language': 'zh-CN,zh;q=0.8',
}
res = requests.request('GET', f"{api}{upUid}", headers=headers)
n += 1
data = res.json()['data']
if n == 100:
continue
print(f"读取 UP 主信息(mid={upUid})成功")
upName = data['card']['name']
follower = data['follower']
mid = data['card']['mid']
upInfos.append({'id': upName, 'follower': follower, 'mid': mid})
# 按粉丝数排序
upInfos = sorted(upInfos, key=lambda upInfo: upInfo['follower'])
print("写入 CSV 文件中...")
filename = "bili.csv"
bili_csv = codecs.open(filename, 'ab', "gbk")
bili_csv_writer = csv.writer(bili_csv)
for upInfo in upInfos:
msg = f"写入中\tUP 主 ID:{upInfo['id']}\t粉丝数:{upInfo['follower']}\tmid={upInfo['mid']}"
print(msg)
info = (upInfo['id'], upInfo['follower'], upInfo['mid'])
try:
bili_csv_writer.writerow(info)
except:
continue
bili_csv.close()
print(f"写入到 {filename} 完成,本次共爬取到 {len(upInfos)} 个 UP 主信息")
def main(browser):
"""主函数,协调各个爬取步骤"""
videoUrls = []
upUrls = []
upUids = []
upInfos = []
# 获取视频 URL
getVideosUrl(videoUrls, upUrls, upUids, upInfos, browser)
# 获取 UP 主 URL
getUpsUrl(videoUrls, upUrls, upUids, upInfos, browser)
# 等待线程执行完成
time.sleep(20)
# 解析 UP 主 UID
getUpsId(videoUrls, upUrls, upUids, upInfos, browser)
# 获取 UP 主详细信息并保存
getUpsInfo(videoUrls, upUrls, upUids, upInfos, browser)
if __name__ == '__main__':
# 创建浏览器实例
browser = webdriver.Chrome()
# 循环爬取 100 次
for i in range(100):
print(f"开始第 {i+1} 次爬取...")
main(browser)
# 关闭浏览器
browser.close()
#注意事项
- 本代码仅供学习交流使用,请勿用于商业用途
- 爬取时请注意控制频率,避免对 B 站服务器造成过大压力
- B 站的页面结构和 API 可能会发生变化,代码需要相应调整
- 建议添加更完善的异常处理和日志记录
#参考资料
—— 本文完 ——