A Record of Scraping Bilibili Uploader Info with Python
Requirements
Collect as complete a list of Bilibili food uploaders as possible, sorted by follower count; the end goal is to identify the usernames of the uploaders with the most followers.
Approach
- Save the URLs of a few pages that list lots of food-category videos
- Request those pages and collect as many video links as possible
- From each video page, get the uploader's space (homepage) link and parse out the uploader's uid
- Query the info API with the uid to get the uploader's username and follower count (a minimal sketch of this last step follows the list)
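Here is a minimal sketch of that last step, using the card API that the full implementation below relies on; the space URL is only a placeholder example:
import requests

space_url = 'https://space.bilibili.com/123456'            # placeholder space URL
uid = space_url.split('#')[0].rstrip('/').split('/')[-1]   # -> '123456'

# The card endpoint returns the uploader's profile card plus follower count
api = f'https://api.bilibili.com/x/web-interface/card?mid={uid}'
data = requests.get(api, headers={'User-Agent': 'Mozilla/5.0'}).json()['data']
print(data['card']['name'], data['follower'])              # username, follower count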
Libraries used and what they do
- csv: read and write CSV files
- json: parse and serialize JSON data
- time: delays between requests
- random: random numbers
- codecs: open files with an explicit encoding (avoids garbled Chinese when writing)
- requests: HTTP request library
- selenium: drives a real browser
- UserAgent (fake_useragent): random User-Agent strings
- threading: multithreading
- BeautifulSoup (bs4): parses HTML and XML
Problems encountered and how I solved them
Can't get the HTML at all
This is likely because no User-Agent was set, so the server can't tell whether the request comes from a real user. Set the User-Agent request header (the browser identifier) in advance to pretend to be a browser and look like a normal user request; it also helps keep your IP from being banned ~~(if it's already banned, never mind)~~. You can hard-code the UA of a specific browser, or use a library to generate one at random; I recommend the fake_useragent library for random UA strings. Install fake_useragent:
pip install fake_useragent
Usage:
from fake_useragent import UserAgent
UA = UserAgent().random
headers = {'User-Agent': UA}
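The randomized header can then be attached to any request, for example (a quick sketch; the page is just one of the food-category listing pages used later in this post):
import requests
from fake_useragent import UserAgent

headers = {'User-Agent': UserAgent().random}
# One of the food-category listing pages used later in this post
resp = requests.get('https://www.bilibili.com/v/food', headers=headers)
html = resp.content  # raw HTML, ready to hand to BeautifulSoup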
The HTML I get back is incomplete
This is usually because the page is rendered dynamically (after the browser parses the initial HTML, JavaScript fills in the content), so the HTML returned by a plain request is missing the data you want. You can use Selenium to drive a real browser instead, and grab the page's HTML only after the page has finished loading. Install Selenium:
pip install selenium
Download the driver that matches your browser and its version:
- Firefox: geckodriver
- Chrome: chromedriver
- IE: IEDriverServer
- Edge: MicrosoftWebDriver
- Opera: operadriver
- PhantomJS: phantomjs
Usage:
from selenium import webdriver

# Create an instance for whichever browser you installed a driver for:
# driver = webdriver.Firefox()    # Firefox
# driver = webdriver.Chrome()     # Chrome
# driver = webdriver.Ie()         # Internet Explorer
# driver = webdriver.Edge()       # Edge
# driver = webdriver.Opera()      # Opera
# driver = webdriver.PhantomJS()  # PhantomJS
browser = webdriver.Chrome()

web = 'https://www.bilibili.com/v/food'  # the page to load
browser.get(web)
# html holds the page's HTML as a string, as rendered by the browser
html = browser.page_source
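Keep in mind that page_source only reflects what has been rendered so far; if the data shows up after a later JS/XHR update, an explicit wait helps. A minimal sketch, assuming we just wait for an `a` tag to appear (the wait condition is only an example):
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

browser = webdriver.Chrome()
browser.get('https://www.bilibili.com/v/food')
# Block for up to 10 seconds until at least one <a> element exists in the DOM
WebDriverWait(browser, 10).until(EC.presence_of_element_located((By.TAG_NAME, 'a')))
html = browser.page_source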
Persisting the data without garbled text
I'm not good enough to use a database yet, so local files it is. To make later processing easy I store the data as CSV, and open the file with codecs so the Chinese doesn't come out garbled. CSV is a very simple format, Excel can open and edit it, and Python has a csv module for reading and writing it. Both csv and codecs ship with the Python standard library, so there is nothing to install.
Usage:
import csv
import codecs

filename = "data.csv"
# 'ab' appends to the file, 'wb' overwrites it; "gbk" keeps Excel happy with Chinese text
csv_file = codecs.open(filename, 'ab', "gbk")
csv_writer = csv.writer(csv_file)
# Write the header row
csv_writer.writerow(("first column", "second column"))
info = (1, 2)
infos = [(1, 2), (3, 4)]
# Write a single row
csv_writer.writerow(info)
# Write multiple rows at once (note: writerows, plural)
csv_writer.writerows(infos)
csv_file.close()
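In Python 3 the same thing can also be done without codecs by passing encoding and newline directly to the built-in open(); a minimal equivalent sketch:
import csv

with open("data.csv", "a", encoding="gbk", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(("first column", "second column"))
    writer.writerows([(1, 2), (3, 4)])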
The returned data is JSON
Handle it with the json module. It is part of the Python standard library, so there is nothing to install.
Usage:
import json

jsonData = '{"name": "Viki"}'
# Parse a JSON string into a Python dict (loads is for strings, load is for file objects)
data = json.loads(jsonData)
print(data['name'])  # Viki
# Serialize a Python dict (or other data) back into a JSON string
jsonStr = json.dumps(data)
print(jsonStr)  # {"name": "Viki"}
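To read and write JSON files rather than strings, json.load and json.dump take a file object instead; a minimal sketch, with up.json as a made-up file name:
import json

# Write a dict out as a JSON file
with open("up.json", "w", encoding="utf-8") as f:
    json.dump({"name": "Viki"}, f, ensure_ascii=False)

# Read it back in
with open("up.json", encoding="utf-8") as f:
    data = json.load(f)
print(data["name"])  # Viki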
Crawling is too slow
Speed things up with multithreading via the threading module, which is also part of the standard library, so there is nothing to install.
Usage:
import threading

lst = [123, 456, 789]

def double(n):
    return n * 2

for n in lst:
    # args must be a tuple, hence the trailing comma
    double_thread = threading.Thread(target=double, args=(n,))
    double_thread.start()
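A thread's return value is simply discarded, so in practice the worker appends its result to a shared list, and join() lets the main thread wait for every worker instead of sleeping for a fixed time (the full implementation below uses a fixed sleep); a minimal sketch:
import threading

results = []

def double(n):
    # list.append is atomic enough for this simple case
    results.append(n * 2)

threads = [threading.Thread(target=double, args=(n,)) for n in [123, 456, 789]]
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until each worker has finished
print(results)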
Implementation
import csv
import json
import time
import random
import codecs
import urllib
import requests
import threading
from bs4 import BeautifulSoup
from selenium import webdriver
from fake_useragent import UserAgent
biliFoodUrls = [
'https://www.bilibili.com/v/food',
'https://www.bilibili.com/v/food/make',
'https://www.bilibili.com/v/food/rural',
'https://www.bilibili.com/v/food/record',
'https://www.bilibili.com/v/food/detective',
'https://www.bilibili.com/v/food/measurement',
'https://www.bilibili.com/v/popular/rank/food'
]
def grabVideoUrl(url, videoUrls, upUrls, upUids, upInfos, browser):
    if not url:
        return
    if url[0] == '/':
        url = 'https:' + url
    if 'bilibili.com/video' in url:
        videoUrls.append(url)
        print(f'Found video url:\t{url}')
    elif 'space.bilibili.com' in url:
        upUrls.append(url)
        print(f'Found uploader url:\t{url}')
def getVideosUrl(videoUrls, upUrls, upUids, upInfos, browser):
    for web in biliFoodUrls:
        print(f"Processing page:\t{web}")
        browser.get(web)
        html = browser.page_source
        webBf = BeautifulSoup(html, 'html.parser')
        al = webBf.find_all("a")
        for a in al:
            url = a.get('href')
            args = (url, videoUrls, upUrls, upUids, upInfos, browser)
            video_thread = threading.Thread(target=grabVideoUrl, args=args)
            video_thread.start()
def grabUpsUrl(videoUrl, videoUrls, upUrls, upUids, upInfos, browser):
    print(f"Processing video url:\t{videoUrl}")
    headers = {
        'User-Agent': UserAgent().random,
        'Upgrade-Insecure-Requests': '1',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, sdch, br',
        'Accept-Language': 'zh-CN,zh;q=0.8',
    }
    html = requests.request('GET', videoUrl, headers=headers).content
    webBf = BeautifulSoup(html, "html.parser")
    upDiv = webBf.find(id='v_upinfo')
    if not upDiv:
        print(f"Blocked while fetching uploader info for\t{videoUrl}")
        return
    upUrl = upDiv.find_all('a')[0].get('href')
    if upUrl[0] == '/':
        upUrl = 'https:' + upUrl
    upUrls.append(upUrl)
    print(f'Found uploader url:\t{upUrl}')
def getUpsUrl(videoUrls, upUrls, upUids, upInfos, browser):
    for videoUrl in videoUrls:
        args = (videoUrl, videoUrls, upUrls, upUids, upInfos, browser)
        upUrl_thread = threading.Thread(target=grabUpsUrl, args=args)
        upUrl_thread.start()

def getUpsId(videoUrls, upUrls, upUids, upInfos, browser):
    for upUrl in upUrls:
        if not upUrl:
            continue
        upUid = upUrl.split("#")[0].split('/')[-1]
        upUids.append(upUid)
        print(f'Found uploader uid:\t{upUid}')
def getUpsInfo(videoUrls, upUrls, upUids, upInfos, browser):
    api = 'https://api.bilibili.com/x/web-interface/card?jsonp=jsonp&mid='
    # Deduplicate the collected uids
    upUids = list(set(upUids))
    for index, upUid in enumerate(upUids):
        print(f"Processing mid: {upUid}\tprogress: {index+1}/{len(upUids)}")
        headers = {
            'User-Agent': UserAgent().random,
            'Upgrade-Insecure-Requests': '1',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate, sdch, br',
            'Accept-Language': 'zh-CN,zh;q=0.8',
        }
        res = requests.request('GET', f"{api}{upUid}", headers=headers)
        data = res.json().get('data')
        n = 0
        # Retry with a fresh random UA when the server blocks the request
        while (not data) and n < 100:
            print(f"Blocked by the server, retry {n+1}...\tprogress: {index+1}/{len(upUids)}")
            headers['User-Agent'] = UserAgent().random
            res = requests.request('GET', f"{api}{upUid}", headers=headers)
            n += 1
            data = res.json().get('data')
        if not data:
            print(f"Gave up on uploader info (mid={upUid}) after repeated blocks")
            continue
        print(f"Fetched uploader info (mid={upUid})")
        upName = data['card']['name']
        follower = data['follower']
        mid = data['card']['mid']
        upInfos.append({'id': upName, 'follower': follower, 'mid': mid})
    # Sort by follower count (ascending, so the biggest uploaders end up at the bottom of the CSV)
    upInfos = sorted(upInfos, key=lambda upInfo: upInfo['follower'])
    print("Writing the CSV file...")
    filename = "bili.csv"
    bili_csv = codecs.open(filename, 'ab', "gbk")
    bili_csv_writer = csv.writer(bili_csv)
    for upInfo in upInfos:
        msg = f"Writing\tuploader: {upInfo['id']}\tfollowers: {upInfo['follower']}\tmid={upInfo['mid']}"
        print(msg)
        info = (upInfo['id'], upInfo['follower'], upInfo['mid'])
        try:
            bili_csv_writer.writerow(info)
        except Exception:
            # Skip rows whose characters cannot be encoded as gbk
            continue
    bili_csv.close()
    print(f"Finished writing {filename}; collected {len(upInfos)} uploaders this round")
def main(browser):
    videoUrls = []
    upUrls = []
    upUids = []
    upInfos = []
    getVideosUrl(videoUrls, upUrls, upUids, upInfos, browser)
    getUpsUrl(videoUrls, upUrls, upUids, upInfos, browser)
    # Crude wait for the grabUpsUrl threads to finish
    time.sleep(20)
    getUpsId(videoUrls, upUrls, upUids, upInfos, browser)
    getUpsInfo(videoUrls, upUrls, upUids, upInfos, browser)

if __name__ == '__main__':
    browser = webdriver.Chrome()
    for i in range(100):
        print(f"Starting crawl round {i+1}...")
        main(browser)
    browser.close()
—— The End ——