|
登录后更精彩...O(∩_∩)O...
您需要 登录 才可以下载或查看,没有账号?立即注册
×
抖音爬虫(主页、喜欢列表全部下载)
抖音爬虫(主页、喜欢列表全部下载)说明
以下代码仅供交流,主要爬取抖音单个用户的主页或喜欢中涉及的所有视频; 文件夹内共以下内容:
- 爬虫:douyin_spider.py
- 多线程下载器:douyin_download_N_thread.py
- data.json:记录爬取到的信息,供下载器使用
- 文件夹download_files:储存下载的视频文件
演示
爬虫代码:(data里面还有很多信息,自己可以打印出来看看)
[Python] 纯文本查看 复制代码
import requests
import json
import time
import os
# Work from this script's own directory so data.json is written next to it,
# regardless of where the script is launched from.
os.chdir(os.path.dirname(os.path.realpath(__file__)))
def get_data(sec_uid, max_cursor, mode):
    """Fetch one page of a user's videos and append each item to the global `result`.

    Args:
        sec_uid: the user's sec_uid token taken from the profile URL.
        max_cursor: pagination cursor ('0' for the first page).
        mode: 'post' for the user's own videos, 'like' for the liked list.

    Returns:
        (max_cursor, has_more) from the API response, used by run() for paging.
    """
    headers = {
        'Connection': 'keep-alive',
        'Accept': 'application/json, text/plain, */*',
        'Agw-Js-Conv': 'str',
        'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Mobile Safari/537.36',
        'Sec-Fetch-Site': 'same-origin',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Dest': 'empty',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
    params = {
        'reflow_source': 'reflow_page',
        'sec_uid': sec_uid,
        'count': '100',
        'max_cursor': max_cursor,
    }
    # FIX: timeout added so a stalled connection cannot hang the crawl forever.
    response = requests.get(
        f'https://m.douyin.com/web/api/v2/aweme/{mode}/',
        params=params, headers=headers, timeout=15,
    )
    data = response.json()
    # FIX: 'aweme_list' may be missing or None (private/empty account) — the
    # original raised KeyError here instead of returning an empty page.
    for d in data.get('aweme_list') or []:
        output = {
            'title': d['desc'],
            # last entry of url_list is used, matching the original behavior
            'VideoUrl': d['video']['play_addr']['url_list'][-1],
            'img': d['video']['dynamic_cover']['url_list'][-1],
            'id': d['aweme_id'],
        }
        result.append(output)
        print(output['title'])
    return data['max_cursor'], data['has_more']
def run(sec_uid, mode='post'):
    """Page through all of a user's videos (hard cap: 100 pages).

    Args:
        sec_uid: the user's sec_uid token.
        mode: 'post' (profile videos) or 'like' (liked videos).

    BUG FIX: the original issued a *second* get_data() call inside the loop and
    discarded its return value, so every page after the first was fetched twice —
    duplicating entries in `result` and doubling the request count.
    """
    max_cursor = '0'
    for _ in range(100):  # safety cap so a bad has_more flag cannot loop forever
        max_cursor, has_more = get_data(sec_uid, max_cursor, mode)
        if not has_more:
            break
if __name__ == '__main__':
    # Collected video records; get_data() appends to this global list.
    result = []
    choice = input('''
*******************************************************
请选择需要下载的类别序号(1/2):
1、该账号主页的所有视频;
2、该账号喜欢列表所有视频。
*******************************************************\n
''')
    sec_uid = input('''
*******************************************************
请选择需账号的sec_uid码:
例如:主页链接(PC网页打开)
链接:https://www.douyin.com/user/MS4wLjABAAAAfeHUJALUV_hro9kN7QT5I9pe9DNVDSkiCTiqfK0ziZo?vid=7151405922777107753
sec_uid码:MS4wLjABAAAAfeHUJALUV_hro9kN7QT5I9pe9DNVDSkiCTiqfK0ziZo
*******************************************************\n''').strip()
    sel = {'1': 'post', '2': 'like'}
    # FIX: the original did sel[choice] and crashed with KeyError on any
    # input other than '1'/'2'; report the bad input instead.
    mode = sel.get(choice.strip())
    if mode is None:
        print('无效的选择,请输入 1 或 2。')
    else:
        path = 'data.json'
        try:
            run(sec_uid, mode)
            print('完成搜索数量:', len(result))
        except Exception as e:
            # FIX: was a bare `except:` that silently swallowed everything,
            # including KeyboardInterrupt; report the error but still keep
            # whatever was collected before the failure.
            print('完成搜索数量(lost_part):', len(result))
            print('错误:', e)
        # Save once for both outcomes (the original duplicated this block).
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=4, ensure_ascii=False)
        time.sleep(3)
多线程下载器代码
[Python] 纯文本查看 复制代码
import queue
import threading
import time
import requests
import json
import re
import os
# Work from this script's own directory so data.json and download_files
# are resolved next to the script, not the caller's cwd.
os.chdir(os.path.dirname(os.path.realpath(__file__)))
class myThread(threading.Thread):
    """Worker thread that drains the shared download queue.

    Each worker simply hands its name and queue to the module-level
    process_data() loop, which runs until the global exitFlag is set.
    """

    def __init__(self, threadID, name, q):
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        # Delegate the consume loop to the shared worker function.
        process_data(self.name, self.q)
def process_data(threadName, q):
    """Consume download tasks from `q` until the global exitFlag becomes truthy.

    Each task dict is passed to main(); the shared queueLock guards the
    empty-check + get so two workers cannot race on the same item.
    """
    while not exitFlag:
        queueLock.acquire()
        if workQueue.empty():
            # Nothing queued yet — drop the lock and back off briefly.
            queueLock.release()
            time.sleep(1)
            continue
        task = q.get()
        queueLock.release()
        main(task)
        print("%s processing %s" % (threadName, '*' * 20))
def thread_task(threadList, task_args, n):
    """Run every task in `task_args` through a pool of worker threads.

    Args:
        threadList: names for the worker threads to spawn (one thread each).
        task_args: iterable of task dicts to place on the shared queue.
        n: queue capacity (callers pass len(task_args)).

    Side effects: (re)binds the module globals workQueue, queueLock and
    exitFlag that process_data() reads.
    """
    global workQueue, queueLock, exitFlag
    queueLock = threading.Lock()
    workQueue = queue.Queue(n)
    exitFlag = 0
    threads = []
    threadID = 1
    # Spawn the workers first; they idle-poll until the queue is filled.
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1
    # Fill the queue under the lock so workers see a consistent state.
    queueLock.acquire()
    for task_arg in task_args:
        workQueue.put(task_arg)
    queueLock.release()
    # FIX: the original spun in `while not workQueue.empty(): pass`, pegging
    # one CPU core for the whole download; poll with a short sleep instead.
    while not workQueue.empty():
        time.sleep(0.1)
    # Signal the workers to exit, then wait for them to finish.
    exitFlag = 1
    for t in threads:
        t.join()
    print("退出主线程")
def main(data):
    """Download one video described by a data.json record.

    Args:
        data: dict with 'VideoUrl', 'id' and 'title' keys (produced by the
              spider script).

    Reads the module globals `headers` and `outout_dir`; failures are printed
    rather than raised so one bad video never kills a worker thread.
    """
    url = data['VideoUrl']
    aweme_id = data['id']  # renamed: the original shadowed the builtin `id`
    title = data['title']
    # Untitled videos fall back to their numeric id as the filename.
    name = title if title != "" else aweme_id
    # FIX: the original pattern r'[?*/\|.:><]' never matched a backslash
    # (`\|` inside a character class is just an escaped pipe); `\\` added so
    # Windows-illegal path characters are actually stripped.
    name = re.sub(r'[?*/\\|.:><]', "", name).replace(" ", "")
    if not name:
        # Title consisted entirely of stripped characters — use the id.
        name = aweme_id
    try:
        response = requests.get(url, headers=headers, timeout=30)
    except Exception:
        # FIX: the original fell through here with `response` unbound, then
        # crashed into a second bare `except` that mislabeled the failure as
        # a download error; report the request failure and stop.
        print('网页请求错误:\n', '*'*100+'\n', name+'\n', '*'*100)
        return
    try:
        with open(f'{outout_dir}/{name}.mp4', 'wb') as b:
            b.write(response.content)
        print('已下载:', name)
    except Exception:
        print('下载错误:\n', '*'*100+'\n', name+'\n', '*'*100)
if __name__ == '__main__':
    # 50 named worker threads for the download pool.
    threadList = [f'Thread-{i+1}' for i in range(50)]
    outout_dir = r'./download_files'
    # FIX: the original never created the output directory, so every single
    # download failed with FileNotFoundError until it was made by hand.
    os.makedirs(outout_dir, exist_ok=True)
    # FIX: dropped the third-party `faker` dependency — a fixed desktop UA
    # serves the same purpose for fetching video bytes.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/108.0.0.0 Safari/537.36'
    }
    # data.json is the list of records written by the spider script.
    with open('data.json', 'r', encoding='utf-8') as f:
        task_args = json.load(f)
    thread_task(threadList, task_args, len(task_args))
|
|