Python Spider学习笔记(一):爬取B站视频基本信息
后台-插件-广告管理-内容页头部广告(手机) |
一、创作来源
最近搞数据分析需要爬取B站上相关视频的内容,但打开两年前的代码却发现已经跑不通了,或者说根本就是漏洞百出。经过一段时间的缝缝补补,我发现是B站的网页代码更换的原因。(应该是吧,不确定哈!)由于当时写代码的时候也是东抄西抄,最后搞得自己也看不懂是什么意思(鬼知道当时的程序怎么跑起来的)。索性从头来过,自己学自己写。
二、第一部分:利用Selenium获取BV_ID
对于B站视频来说,只要知道了他的BV号就相当于一个人你知道了他的身份证号,想要知晓他的更多的信息也就不是什么难事儿了,因此在本文中,我们要进行的第一步就是获取到我们想要爬取信息的B站视频的身份证——BV_ID。
这是第一步,也是最关键的一步。这个时候就需要用到selenium这个库,关于这个库的详细介绍和科普可以去别的博客下了解,这里我们不过多赘述。
Selenium库的安装:
pip install selenium具体使用方法如下:
引入必要的库
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.chrome.options import Options
定义一个selenium的爬虫
这里只用关键词作为形参,其余的部分都可以以这些关键词为核心进行生成
- def spider_bvid(keyword):
- """
- 利用seleniume获取搜索结果的bvid,供给后续程序使用
- :param keyword: 搜索关键词
- :return: 生成去重的output_filename = f'{keyword}BV号.csv'
- """
首先,定义一个用于写入爬取结果的文件,这里因为数据量大,我选择了csv文件,大家在自己运行的时候,可以选择自己喜欢的文件形式。
接着设置无界面爬虫,窗口大小并禁用gpu加速,以减少浏览器的内存占用,防止出现浏览器崩溃的情况。
- # 保存的文件名
- input_filename = f'{keyword}BV号.csv'
- # 启动爬虫
- options = Options()
- options.add_argument('--headless')
- options.add_argument('--disable-gpu')
- browser = webdriver.Chrome(options=options) # 设置无界面爬虫
- browser.set_window_size(1400, 900) # 设置全屏,注意把窗口设置太小的话可能导致有些button无法点击
- browser.get('https://bilibili.com')
- # 刷新一下,防止搜索button被登录弹框遮住
- browser.refresh()
- print("============成功进入B站首页!!!===========")
利用网页元素定位找到B站首页的搜索框和搜索按钮,输入我们要搜索的关键词,确定点击搜索按钮。
- input = browser.find_element(By.CLASS_NAME, 'nav-search-input')
- button = browser.find_element(By.CLASS_NAME, 'nav-search-btn')
- # 输入关键词并点击搜索
- input.send_keys(keyword)
- button.click()
- print(f'==========成功搜索{keyword}相关内容==========')
成功进入搜索结果页面后,我们本来的技术思路是:1、根据网页元素定位,css、xpath或其他方法定位到页面最下方的页数box和下一页box;2、通过获取最后一页的box中的text值和下一页的text值,利用循环不断模拟点击下一页,从而达到爬取所有结果页面的内容的目的。
但B站网页代码更改后,显示为34页,网页内容检查后显示为42页(至多),因而会导致页面定位的不同,在我个人运行程序的过程中,出现过第一个关键词搜索可以定位到最大页数box,但是利用循环输入的下一个keyword就无法定位到相同的box。
因而我们更换思路:B站搜索结果显示的并非全部内容,而是至多一千多个,也就是说页面数最大是42页。因而当我们手动搜索后发现结果页面页数较多时,可以直接设置最大页数为42。
同样,由于本小白并不会定位这个新的下一页按钮,因而我通过循环输入页码数,拼成搜索的网页URL进而达到与模拟点击下一页相同的效果。但这种操作方法的结果就是会出现重复爬取第情况,因而需要在最后进行去重操作。
另外值得一说的是,如果各位像研究元素定位到话,我之前看到本站里有一位大佬“潘帕斯的雄鹰”,他写了一个只用selenium进行滚动爬取搜索结果下所有视频的一级、二级评论的博客。代码是开源的,那个里面的定位和断点续爬都做得很好,有时间的话可以研究一下。
- # 设置窗口
- all_h = browser.window_handles
- browser.switch_to.window(all_h[1])
- # B站最多显示42页
- total_page = 42
- # 同样由于B站网页代码的更改,通过找到并点击下一页的方式个人暂不能实现
- #(对,不会分析那个破网页!!!)
- for i in range(0, total_page):
- # url 需要根据不同关键词进行调整内容!!!
- # 这里的url需要自己先搜索一下然后复制网址进来
- url = (f"https://search.bilibili.com/all?keyword={keyword}"
- f"&from_source=webtop_search&spm_id_from='你自己的'&search_source='你自己的'&page={i}")
- print(f"===========正在尝试获取第{i + 1}页网页内容===========")
- print(f"===========本次的url为:{url}===========")
- browser.get(url)
- # 这里请求访问网页的时间也比较久(可能因为我是macos),所以是否需要等待因设备而异
- # 取消刷新并长时间休眠爬虫以避免爬取太快导致爬虫抓取到js动态加载源码
- # browser.refresh()
- print('正在等待页面加载:3')
- time.sleep(1)
- print('正在等待页面加载:2')
- time.sleep(1)
- print('正在等待页面加载:1')
- time.sleep(1)
能够顺利获取到所有页面结果之后,我们就可以直接分析页面,因为我们只需要获取到BV号就可以,因此并不需要重复爬取一些后面可以轻易获得的数据。
这里直接使用bs4对页面进行分析,直接定位到card中的herf,获取每个视频的详情页URL,在这个URL中可以拆分出我们需要的BV号。
- # 直接分析网页
- html = browser.page_source
- # print("网页源码" + html) 用于判断是否获取成功
- soup = BeautifulSoup(html, 'lxml')
- infos = soup.find_all(class_='bili-video-card')
- bv_id_list = []
- for info in infos:
- # 只定位视频链接
- href = info.find('a').get('href')
- # 拆分
- split_url_data = href.split('/')
- # 利用循环删除拆分出现的空白
- for element in split_url_data:
- if element == '':
- split_url_data.remove(element)
- # 打印检验内容
- # print(split_url_data)
- # 获取bvid
- bvid = split_url_data[2]
- # 利用if语句直接去重
- if bvid not in bv_id_list:
- bv_id_list.append(bvid)
- for bvid_index in range(0, len(bv_id_list)):
- # 写入 input_filename
- write_to_csv_bvid(input_filename, bv_id_list[bvid_index])
- # 输出提示进度
- print('写入文件成功')
- print("===========成功获取第" + str(i + 1) + "次===========")
- time.sleep(1)
- i += 1
- # 退出爬虫
- browser.quit()
- # 打印信息显示是否成功
- print(f'==========爬取完成。退出爬虫==========')
写入文件后,我们就能得到去重之后的BV号了,下面就可以通过BV号来爬取我们需要的视频的基本信息了。
三、第二部分 Request函数请求访问
Request函数不用多说,涉及到爬虫的程序大多都会用到,这里也不再赘述。另一方面,使用request函数的原因是bilibili的api开放接口可以轻松地获取到我们想要的信息。
这里也可以使用bilibili-api库,但本文不使用的原因是:bilibili-api库在获取视频详细信息时需要进行异步请求,但直接在循环中调用异步请求的函数会导致各种报错,小白肯定是无法解决的,哪怕翻阅资料也很难看懂。(对,就是我没看懂。)例如: aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected、pipe broken等。
因此这里用比较笨的方法,通过BV号拼接成视频数据api接口的URL在进行访问,返回的页面转成json格式,然后直接读取json字典中的值,进行调用。
这里想要了解更多B站API接口的,可以去Github上查看。
https://github.com/SocialSisterYi/bilibili-API-collect此处的关于视频信息的API接口URL为:
- #A EXAMPLE : https://api.bilibili.com/x/web-interface/view?bvid=BV1n24y1D75V
- api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bv_id}'
写在最前面:使用Request函数时,一定要注意时间间隔。如果你的设备接入URL很快,那么可以适当增加间隔,如果接入较慢,可以适当减少间隔。Time.sleep()的数值至少要大于1.5s,不然轻则报错,重则被叔叔封网络IP。(当然,如果真的被封IP了,换个网络环境就行了。比如你在家里用的无线网,那么切换到自己的手机流量热点就可以解决。)
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.bilibili.com', port=443)具体的调用代码为:
首先,我们需要编写我们自己的请求头。User-Agenta是我们自己的网络代理,可以有chrome、firefox等,Cookies就是网站获取到一些记录文件,主要用于识别。这两个值都可以通过网页抓包来获取。以Chrome为例,登录B站后,点开任意一个视频播放后,按下F12(win)或option+command+I(mac)后,进入network部分,尽量找到total?list开头的js文件,里面就可以比较轻松地找到我们需要的这两个值。
接着,传入BV号后,拼接成可用的URL后,可以自己拼好后先用浏览器打开进行检验和分析一下,以确保URL的有效性。之后,利用json库返回json形式的网页源码。返回的值基本是字典,很好操作。
- def get_video_info(bv_id):
- headers = {
- 'User-Agent': "你的",
- 'Cookie': "你的"}
- api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bv_id}'
- # 打印本次要获取的bvid,用于错误时确认
- print(f"正在进行爬取uid为:{bv_id}的UP主的粉丝数量与作品总数")
- print(f"==========本次获取数据的视频BV号为:{bv_id}==========")
- print(f"url为:{api_url}")
- # https://api.bilibili.com/x/web-interface/view?BV1n24y1D75V
- video_info = requests.get(url=api_url, headers=headers)
- video_info_json = json.loads(video_info.text)
得到网页源码后,我们需要的值都存放在"data"标签中,直接根据我们对需要进行调用就可以。我这里是新建了一个字典,进行存储值,大家也可以不用这么麻烦。 对于相关值,英文名对应的中文意义,可以参考这篇知乎专栏的介绍,也可在上面的github文档中进行查看。
- # 创建存放的字典
- info_dict = {}
- # 信息解读
- # https://zhuanlan.zhihu.com/p/618885790
- # 视频bvid,即bv号
- bvid = video_info_json['data']['bvid']
- info_dict['bvid'] = bvid
- # 视频aid,即av号
- aid = video_info_json['data']['aid']
- info_dict['aid'] = aid
- # 视频cid,用于获取弹幕信息
- cid = video_info_json['data']['cid']
- info_dict['cid'] = cid
- # 作者id
- mid = video_info_json['data']['owner']['mid']
- info_dict['mid'] = mid
- # up主昵称
- name = video_info_json['data']['owner']['name']
- info_dict['name'] = name
- # 视频标题
- title = video_info_json['data']['title']
- info_dict['title'] = title
- # 视频标签
- tname = video_info_json['data']['tname']
- info_dict['tname'] = tname
- # 视频发布时间戳
- pubdate = video_info_json['data']['pubdate']
- # 转化时间戳
- pub_datatime = datetime.fromtimestamp(pubdate)
- # 整体格式
- pub_datatime_strf = pub_datatime.strftime('%Y-%m-%d %H:%M:%S')
- # 日期
- date = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", pub_datatime_strf)
- info_dict['pub_date'] = date.group()
- # 时间
- pub_time = re.search(r"(\d{1,2}:\d{1,2}:\d{1,2})", pub_datatime_strf)
- info_dict['pub_time'] = pub_time.group()
- # 视频创建时间戳
- # ctime = info['ctime']
- # 视频简介
- desc = video_info_json['data']['desc']
- info_dict['desc'] = desc
- # 视频播放量
- view = video_info_json['data']['stat']['view']
- info_dict['view'] = view
- # 点赞数
- like = video_info_json['data']['stat']['like']
- info_dict['like'] = like
- # 投币数
- coin = video_info_json['data']['stat']['coin']
- info_dict['coin'] = coin
- # 收藏数
- favorite = video_info_json['data']['stat']['favorite']
- info_dict['favorite'] = favorite
- # 分享数
- share = video_info_json['data']['stat']['share']
- info_dict['share'] = share
- # 评论数
- repiy = video_info_json['data']['stat']['reply']
- info_dict['reply'] = repiy
- # 视频弹幕数量
- danmaku = video_info_json['data']['stat']['danmaku']
- info_dict['danmaku'] = danmaku
- print(f'=========={bv_id} 的视频基本信息已成功获取==========')
- # 发布作品时的动态
- # dynamic = info['dynamic']
- print('正在等待,以防访问过于频繁\n')
- time.sleep(3)
- return info_dict
如此就返回了带有我们数据的字典,后续可以直接调用。
获取UP主信息的整体思路相同,这里就不再赘述,直接贴上代码:
- def get_user_info(uid):
- """
- 通过uid(即mid)获取UP主的粉丝总数和作品总数
- :param uid: mid
- :return:user_info_dict
- """
- # 定义空字典用于存放数据
- # 粉丝数 follower
- # 作品总数 archive
- user_info_dict = {}
- # 首先写入请求头
- # 设置用户代理 User_Agent及Cookies
- headers = {
- 'User-Agent': "",
- 'Cookie': ""}
- # 将传入的的uid组成up主主页的api_url
- # A Example: https://api.bilibili.com/x/web-interface/card?mid=1177893348
- api_url = f'https://api.bilibili.com/x/web-interface/card?mid={uid}'
- # https://api.bilibili.com/x/web-interface/view?BV1n24y1D75V
- # 打印次数,数据量大,便于查看进程
- print(f"正在进行爬取uid为:{uid}的UP主的粉丝数量与作品总数")
- # 打印本次要获取的uid,用于错误时确认
- print(f"==========本次获取数据的up主的uid为:{uid}==========")
- print(f"url为{api_url}")
- # 利用requests进行访问,并返回需要的封装信息
- up_info = requests.get(url=api_url, headers=headers)
- # 不知道会不会被封ip,保险起见
- # time.sleep(2)
- # 将数据转化为json格式
- up_info_json = json.loads(up_info.text)
- # 利用json定位相关数据
- fans_number = up_info_json['data']['card']['fans']
- user_info_dict['follower'] = fans_number
- archive_count = up_info_json['data']['archive_count']
- user_info_dict['archive'] = archive_count
- print(f'=========={bv_id} 的作者基本信息已成功获取==========\n')
- # 等待
- print('正在等待,以防访问过于频繁\n')
- time.sleep(1.5)
- return user_info_dict
四、第三部分 最后的调用
上面的函数都写好之后,我们只需要创建一个主入口,之后直接调用函数就可以。
- if __name__ == '__main__':
- # 针对不同内容修改搜索关键词!!!!
- keywords = ["1", "2"]
- for keyword in keywords:
- # 自动爬取多个主题时须注意上面的最大页数定位问题
- # 爬取后生成去重了的len(keywords)个f'{keyword}BV号.csv'文件
- spider_bvid(keyword)
- for keyword in keywords:
- # 拼接成文件名
- csv_to_merge = f'{keyword}BV号.csv'
- # 合并后生成未去重的文件
- merge_csv(input_filename=csv_to_merge, output_filename='BV号合并.csv')
- # 遍历读取bv_id
- filename = 'BV号合并.csv'
- # 打开文件并去重
- open_csv = pd.read_csv(filename)
- open_csv.drop_duplicates(subset='BV号')
- bv_id_list = np.array(open_csv['BV号'])
- """
- # 第一次调用,若读取csv进行爬取时,意外中断
- # 则更改为读取txt文本,将已爬取第bvid删除,以达到断点续爬的目的
- for bvid in bv_id_list:
- with open("bv_id_list.txt", 'a') as f:
- f.write(bvid+'\n')
- """
- with open("bv_id_list.txt", 'r') as f:
- bv_id_list = f.readlines()
- # 循环写入内容
- for i in range(0, len(bv_id_list)):
- bv_id = bv_id_list[i]
- print(f'正在进行第{i+1}次爬取\n')
- # 获取视频所有的基本信息
- video_info = get_video_info(bv_id)
- bvid = video_info['bvid']
- aid = video_info['aid']
- cid = video_info['cid']
- mid = video_info['mid']
- name = video_info['name']
- title = video_info['title']
- tname = video_info['tname']
- pub_date = video_info['pub_date']
- pub_time = video_info['pub_time']
- desc = video_info['desc']
- view = video_info['view']
- like = video_info['like']
- coin = video_info['coin']
- favorite = video_info['favorite']
- share = video_info['share']
- reply = video_info['reply']
- danmaku = video_info['danmaku']
- # 传播效果计算公式
- Communication_Index = math.log(
- 0.5 * int(view) + 0.3 * (int(like) + int(coin) + int(favorite)) + 0.2 * (int(reply) + int(danmaku)))
- # 获取作者的相关信息
- user_info = get_user_info(uid=mid)
- follower = user_info['follower']
- archive = user_info['archive']
- write_to_csv(filename='视频基本信息.csv', bvid=bvid, aid=aid, cid=cid, mid=mid, name=name, follower=follower,
- archive=archive, title=title, tname=tname, pub_date=pub_date, pub_time=pub_time, desc=desc,
- view=view, like=like, coin=coin, favorite=favorite, share=share, reply=reply, danmaku=danmaku,
- communication_index=Communication_Index)
- print(f'==========第{i+1}个BV号:{bv_id}的相关数据已写入csv文件中==========')
- print('==================================================\n')
五、完整代码
需要注明的一些事情:
1、writr_to_csv()函数是抄的大佬的代码,虽然后面else基本用不到,但是前面的真的很好用,大家可以自己理解一下再改一改。
2、关于传播效果计算公式,这个是引用自己大佬的论文,如果有需要请注明引用,学术不端是很严重的事情。
引用:陈强,张杨一,马晓悦,等. 政务 B 站号信息传播效果影响因素与实证研究[J]. 图书情报工作,2020,64( 22) : 126 - 134.
3、代码中的一些部分,我是以非常笨的方法解决到,有同学优化了之后请贴在评论区交流学习。
- # -*- coding: utf-8 -*-
- """
- @ Project : pythonProject
- @ File : spider bilibi.py
- @ IDE : PyCharm
- @ Auther : Avi-OvO-CreapDiem
- @ Date : 2023/9/2 08:49
- @ Purpose :
- """
- import re
- import os
- import csv
- import time
- import math
- import json
- import requests
- import numpy as np
- import pandas as pd
- from datetime import datetime
- from bs4 import BeautifulSoup
- from selenium import webdriver
- from selenium.webdriver.common.by import By
- from selenium.webdriver.chrome.options import Options
- def merge_csv(input_filename, output_filename):
- """
- 读取csv文件内容,并写入新的文件
- :param input_filename: 传入的文件名称
- :param output_filename: 写入的新文件的名称
- :return: 向新文件中写入input_filename中的内容
- """
- # 读取文件
- csv_data_read = pd.read_csv(input_filename)
- # 获取文件总行数
- number_of_row = (len(csv_data_read))
- # 循环该csv文件中的所有行,并写入信息
- for i in range(0, number_of_row):
- row_info = csv_data_read.values[i]
- # 输出查看内容
- # print(row_info)
- # 具体内容
- row_content = row_info[0]
- # 写入
- write_to_csv_bvid(output_filename, row_content)
- # 退出循环
- # 打印进度
- print(f'成功向{output_filename}中写入了{input_filename}的全部信息')
- def write_to_csv_bvid(input_filename, bvid):
- """
- 写入新的csv文件,若没有则创建,须根据不同程序进行修改
- :param input_filename: 写入的文件名称
- :param bvid: BV号
- :return: 生成写入的input_filename文件
- """
- # OS 判断路径是否存在
- file_exists = os.path.isfile(input_filename)
- # 设置最大尝试次数
- max_retries = 50
- retries = 0
- while retries < max_retries:
- try:
- with open(input_filename, mode='a', encoding='utf-8', newline='') as csvfile:
- fieldnames = ['BV号']
- writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
- if not file_exists:
- writer.writeheader()
- writer.writerow({
- 'BV号': bvid
- })
- # print('写入文件成功')
- break # 如果成功写入,跳出循环
- except PermissionError as e:
- retries += 1
- print(f"将爬取到的数据写入csv时,遇到权限错误Permission denied,文件可能被占用或无写入权限: {e}")
- print(f"等待3s后重试,将会重试50次... (尝试 {retries}/{max_retries})")
- time.sleep(3) # 等待10秒后重试
- else:
- print("将爬取到的数据写入csv时遇到权限错误,且已达到最大重试次数50次,退出程序")
- def spider_bvid(keyword):
- """
- 利用seleniume获取搜索结果的bvid,供给后续程序使用
- :param keyword: 搜索关键词
- :return: 生成去重的output_filename = f'{keyword}BV号.csv'
- """
- # 保存的文件名
- input_filename = f'{keyword}BV号.csv'
- # 启动爬虫
- options = Options()
- options.add_argument('--headless')
- options.add_argument('--disable-gpu')
- browser = webdriver.Chrome(options=options) # 设置无界面爬虫
- browser.set_window_size(1400, 900) # 设置全屏,注意把窗口设置太小的话可能导致有些button无法点击
- browser.get('https://bilibili.com')
- # 刷新一下,防止搜索button被登录弹框遮住
- browser.refresh()
- print("============成功进入B站首页!!!===========")
- input = browser.find_element(By.CLASS_NAME, 'nav-search-input')
- button = browser.find_element(By.CLASS_NAME, 'nav-search-btn')
- # 输入关键词并点击搜索
- input.send_keys(keyword)
- button.click()
- print(f'==========成功搜索{keyword}相关内容==========')
- # 设置窗口
- all_h = browser.window_handles
- browser.switch_to.window(all_h[1])
- """
- # 这里可以通过xpath或者其他方法找到B站搜索结果页最下方的页码数值
- # 但B站网页代码更改后,显示为34页,网页内容检查后显示为42页(至多)
- # 由于我们的搜索结果很多,肯定超出B站最大显示的42页,故而直接设置最大页数为42
- # 找到最后一个页码所在位置,并获取值
- # total_btn = browser.find_element(By.XPATH,"//*[@id="i_cecream"]/div/div[2]/div[2]/div/div/div/div[4]/div/div/button[9]"")
- # //*[@id="i_cecream"]/div/div[2]/div[2]/div/div/div/div[4]/div/div/button[9]
- # total = int(total_btn)
- # print(f'==========成功搜索! 总页数: {total}==========')
- """
- # B站最多显示42页
- total_page = 42
- # 同样由于B站网页代码的更改,通过找到并点击下一页的方式个人暂不能实现(对,不会分析那个破网页!!!)
- # 因此这里利用总页数进行循环访问来实现自动翻页的效果
- for i in range(0, total_page):
- # url 需要根据不同关键词进行调整内容!!!
- url = (f"https://search.bilibili.com/all?keyword={keyword}"
- f"&from_source=webtop_search&spm_id_from=333.1007&search_source=5&page={i}")
- print(f"===========正在尝试获取第{i + 1}页网页内容===========")
- print(f"===========本次的url为:{url}===========")
- browser.get(url)
- # 这里请求访问网页的时间也比较久(可能因为我是macos),所以是否需要等待因设备而异
- # 取消刷新并长时间休眠爬虫以避免爬取太快导致爬虫抓取到js动态加载源码
- # browser.refresh()
- print('正在等待页面加载:3')
- time.sleep(1)
- print('正在等待页面加载:2')
- time.sleep(1)
- print('正在等待页面加载:1')
- time.sleep(1)
- # 直接分析网页
- html = browser.page_source
- # print("网页源码" + html) 用于判断是否获取成功
- soup = BeautifulSoup(html, 'lxml')
- infos = soup.find_all(class_='bili-video-card')
- bv_id_list = []
- for info in infos:
- # 只定位视频链接
- href = info.find('a').get('href')
- # 拆分
- split_url_data = href.split('/')
- # 利用循环删除拆分出现的空白
- for element in split_url_data:
- if element == '':
- split_url_data.remove(element)
- # 打印检验内容
- # print(split_url_data)
- # 获取bvid
- bvid = split_url_data[2]
- # 利用if语句直接去重
- if bvid not in bv_id_list:
- bv_id_list.append(bvid)
- for bvid_index in range(0, len(bv_id_list)):
- # 写入 input_filename
- write_to_csv_bvid(input_filename, bv_id_list[bvid_index])
- # 输出提示进度
- print('写入文件成功')
- print("===========成功获取第" + str(i + 1) + "次===========")
- time.sleep(1)
- i += 1
- # 退出爬虫
- browser.quit()
- # 打印信息显示是否成功
- print(f'==========爬取完成。退出爬虫==========')
- def write_to_csv(filename, bvid, aid, cid, mid, name, follower, archive, title, tname, pub_date, pub_time, desc,
- view, like, coin, favorite, share, reply, danmaku, communication_index):
- """
- 向csv文件中写入B站视频相关的基本信息,未按路径找到文件,则新建文件
- :param filename: 写入数据的文件名
- :param bvid: BV号
- :param aid: AV号
- :param cid: 用于获取弹幕文本的
- :param mid: UP主的ID
- :param name: UP主名称
- :param follower: UP主粉丝数
- :param archive: UP主作品总数
- :param title: 标题
- :param tname: tag名称
- :param pub_date: 发布日期
- :param pub_time: 发布时间
- :param desc: 视频简介
- :param view: 播放量
- :param like: 点赞数
- :param coin: 投币数
- :param favorite: 收藏数
- :param share: 分享数
- :param reply: 评论数
- :param danmaku: 弹幕数
- :param communication_index: 传播效果公式的值
- :return:
- """
- file_exists = os.path.isfile(filename)
- max_retries = 50
- retries = 0
- while retries < max_retries:
- try:
- with open(filename, mode='a', encoding='utf-8', newline='') as csvfile:
- fieldnames = ['BV号', 'AV号', 'CID', 'UP主ID', 'UP主名称', 'UP主粉丝数', '作品总数', '视频标题',
- '视频分类标签',
- '发布日期', '发布时间', '视频简介', '播放量', '点赞数', '投币数', '收藏数', '分享数',
- '评论数',
- '弹幕数', '传播效果指数']
- writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
- if not file_exists:
- writer.writeheader()
- writer.writerow({
- 'BV号': bvid, 'AV号': aid, 'CID': cid, 'UP主ID': mid, 'UP主名称': name, 'UP主粉丝数': follower,
- '作品总数': archive, '视频标题': title, '视频分类标签': tname, '发布日期': pub_date,
- '发布时间': pub_time,
- '视频简介': desc, '播放量': view, '点赞数': like, '投币数': coin, '收藏数': favorite,
- '分享数': share,
- '评论数': reply, '弹幕数': danmaku, '传播效果指数': communication_index
- })
- break # 如果成功写入,跳出循环
- except PermissionError as e:
- retries += 1
- print(f"将爬取到的数据写入csv时,遇到权限错误Permission denied,文件可能被占用或无写入权限: {e}")
- print(f"等待3s后重试,将会重试50次... (尝试 {retries}/{max_retries})")
- else:
- print("将爬取到的数据写入csv时遇到权限错误,且已达到最大重试次数50次,退出程序")
- def get_user_info(uid):
- """
- 通过uid(即mid)获取UP主的粉丝总数和作品总数
- :param uid: mid
- :return:user_info_dict
- """
- # 定义空字典用于存放数据
- # 粉丝数 follower
- # 作品总数 archive
- user_info_dict = {}
- # 首先写入请求头
- # 设置用户代理 User_Agent及Cookies
- headers = {
- 'User-Agent': "",
- 'Cookie': ""}
- # 将传入的的uid组成up主主页的api_url
- # A Example: https://api.bilibili.com/x/web-interface/card?mid=1177893348
- api_url = f'https://api.bilibili.com/x/web-interface/card?mid={uid}'
- # https://api.bilibili.com/x/web-interface/view?BV1n24y1D75V
- # 打印次数,数据量大,便于查看进程
- print(f"正在进行爬取uid为:{uid}的UP主的粉丝数量与作品总数")
- # 打印本次要获取的uid,用于错误时确认
- print(f"==========本次获取数据的up主的uid为:{uid}==========")
- print(f"url为{api_url}")
- # 利用requests进行访问,并返回需要的封装信息
- up_info = requests.get(url=api_url, headers=headers)
- # 不知道会不会被封ip,保险起见
- # time.sleep(2)
- # 将数据转化为json格式
- up_info_json = json.loads(up_info.text)
- # 利用json定位相关数据
- fans_number = up_info_json['data']['card']['fans']
- user_info_dict['follower'] = fans_number
- archive_count = up_info_json['data']['archive_count']
- user_info_dict['archive'] = archive_count
- print(f'=========={bv_id} 的作者基本信息已成功获取==========\n')
- # 等待
- print('正在等待,以防访问过于频繁\n')
- time.sleep(1.5)
- return user_info_dict
- def get_video_info(bv_id):
- headers = {
- 'User-Agent': "",
- 'Cookie': ""}
- api_url = f'https://api.bilibili.com/x/web-interface/view?bvid={bv_id}'
- # 打印本次要获取的bvid,用于错误时确认
- print(f"正在进行爬取uid为:{bv_id}的UP主的粉丝数量与作品总数")
- print(f"==========本次获取数据的视频BV号为:{bv_id}==========")
- print(f"url为:{api_url}")
- # https://api.bilibili.com/x/web-interface/view?BV1n24y1D75V
- video_info = requests.get(url=api_url, headers=headers)
- video_info_json = json.loads(video_info.text)
- # 创建存放的字典
- info_dict = {}
- # 信息解读
- # https://zhuanlan.zhihu.com/p/618885790
- # 视频bvid,即bv号
- bvid = video_info_json['data']['bvid']
- info_dict['bvid'] = bvid
- # 视频aid,即av号
- aid = video_info_json['data']['aid']
- info_dict['aid'] = aid
- # 视频cid,用于获取弹幕信息
- cid = video_info_json['data']['cid']
- info_dict['cid'] = cid
- # 作者id
- mid = video_info_json['data']['owner']['mid']
- info_dict['mid'] = mid
- # up主昵称
- name = video_info_json['data']['owner']['name']
- info_dict['name'] = name
- # 视频标题
- title = video_info_json['data']['title']
- info_dict['title'] = title
- # 视频标签
- tname = video_info_json['data']['tname']
- info_dict['tname'] = tname
- # 视频发布时间戳
- pubdate = video_info_json['data']['pubdate']
- # 转化时间戳
- pub_datatime = datetime.fromtimestamp(pubdate)
- # 整体格式
- pub_datatime_strf = pub_datatime.strftime('%Y-%m-%d %H:%M:%S')
- # 日期
- date = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", pub_datatime_strf)
- info_dict['pub_date'] = date.group()
- # 时间
- pub_time = re.search(r"(\d{1,2}:\d{1,2}:\d{1,2})", pub_datatime_strf)
- info_dict['pub_time'] = pub_time.group()
- # 视频创建时间戳
- # ctime = info['ctime']
- # 视频简介
- desc = video_info_json['data']['desc']
- info_dict['desc'] = desc
- # 视频播放量
- view = video_info_json['data']['stat']['view']
- info_dict['view'] = view
- # 点赞数
- like = video_info_json['data']['stat']['like']
- info_dict['like'] = like
- # 投币数
- coin = video_info_json['data']['stat']['coin']
- info_dict['coin'] = coin
- # 收藏数
- favorite = video_info_json['data']['stat']['favorite']
- info_dict['favorite'] = favorite
- # 分享数
- share = video_info_json['data']['stat']['share']
- info_dict['share'] = share
- # 评论数
- repiy = video_info_json['data']['stat']['reply']
- info_dict['reply'] = repiy
- # 视频弹幕数量
- danmaku = video_info_json['data']['stat']['danmaku']
- info_dict['danmaku'] = danmaku
- print(f'=========={bv_id} 的视频基本信息已成功获取==========')
- # 发布作品时的动态
- # dynamic = info['dynamic']
- print('正在等待,以防访问过于频繁\n')
- time.sleep(1.5)
- return info_dict
- if __name__ == '__main__':
- # 针对不同内容修改搜索关键词!!!!
- keywords = ["1", "2"]
- for keyword in keywords:
- # 自动爬取多个主题时须注意上面的最大页数定位问题
- # 爬取后生成去重了的len(keywords)个f'{keyword}BV号.csv'文件
- spider_bvid(keyword)
- for keyword in keywords:
- # 拼接成文件名
- csv_to_merge = f'{keyword}BV号.csv'
- # 合并后生成未去重的文件
- merge_csv(input_filename=csv_to_merge, output_filename='BV号合并.csv')
- # 遍历读取bv_id
- filename = 'BV号合并.csv'
- # 打开文件并去重
- open_csv = pd.read_csv(filename)
- open_csv.drop_duplicates(subset='BV号')
- bv_id_list = np.array(open_csv['BV号'])
- """
- # 第一次调用,若读取csv进行爬取时,意外中断
- # 则更改为读取txt文本,将已爬取第bvid删除,以达到断点续爬的目的
- for bvid in bv_id_list:
- with open("bv_id_list.txt", 'a') as f:
- f.write(bvid+'\n')
- with open("bv_id_list.txt", 'r') as f:
- bv_id_list = f.readlines()
- """
- # 循环写入内容
- for i in range(0, len(bv_id_list)):
- bv_id = bv_id_list[i]
- print(f'正在进行第{i+1}次爬取\n')
- # 获取视频所有的基本信息
- video_info = get_video_info(bv_id)
- bvid = video_info['bvid']
- aid = video_info['aid']
- cid = video_info['cid']
- mid = video_info['mid']
- name = video_info['name']
- title = video_info['title']
- tname = video_info['tname']
- pub_date = video_info['pub_date']
- pub_time = video_info['pub_time']
- desc = video_info['desc']
- view = video_info['view']
- like = video_info['like']
- coin = video_info['coin']
- favorite = video_info['favorite']
- share = video_info['share']
- reply = video_info['reply']
- danmaku = video_info['danmaku']
- # 传播效果计算公式
- Communication_Index = math.log(
- 0.5 * int(view) + 0.3 * (int(like) + int(coin) + int(favorite)) + 0.2 * (int(reply) + int(danmaku)))
- # 获取作者的相关信息
- user_info = get_user_info(uid=mid)
- follower = user_info['follower']
- archive = user_info['archive']
- write_to_csv(filename='视频基本信息.csv', bvid=bvid, aid=aid, cid=cid, mid=mid, name=name, follower=follower,
- archive=archive, title=title, tname=tname, pub_date=pub_date, pub_time=pub_time, desc=desc,
- view=view, like=like, coin=coin, favorite=favorite, share=share, reply=reply, danmaku=danmaku,
- communication_index=Communication_Index)
- print(f'==========第{i+1}个BV号:{bv_id}的相关数据已写入csv文件中==========')
- print('==================================================\n')
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。
在线投稿:投稿 站长QQ:1888636
后台-插件-广告管理-内容页尾部广告(手机) |