抖音下载个人所有视频

磊落不羁 by:磊落不羁 分类:爬虫 时间:2年前 阅读:98 评论:0

今天写了一个下载抖音个人主页所有视频的脚本:用 selenium 自动化技术获取视频链接,再用多进程下载。同时分享网上另外一位高手写的批量下载工具,带进度条。

先发布我的代码,很好用,但有一个缺陷尚未解决:无法按用户名建文件夹归类,原因是暂时无法从网页获取用户名称(待解决)。

from selenium import webdriver
from selenium.webdriver.common.by import By
import time
import requests
import re
import json
from urllib.parse import unquote
import warnings
from urllib import parse
import os
from concurrent.futures import ProcessPoolExecutor
warnings.filterwarnings("ignore")

def getvideo(url):
    """Download one Douyin video page into the local ./video folder.

    url: a https://www.douyin.com/video/<id> page address.

    Fetches the page, extracts the URL-encoded RENDER_DATA JSON embedded
    in the HTML, reads the play address and description from it, and
    saves the MP4 as video/<sanitized-title>.mp4.  Any failure is
    printed and swallowed so one bad URL cannot kill a
    ProcessPoolExecutor worker.
    """
    try:
        # Re-apply in the worker: module-level warning filters are not
        # inherited by spawn-based multiprocessing (e.g. on Windows).
        warnings.filterwarnings("ignore")

        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36',
            'referer': url,
            # NOTE(review): hard-coded session cookie — it will expire and
            # downloads will then start failing; refresh it when that happens.
            'cookie': 'douyin.com; __ac_nonce=06317e37700308002c952; __ac_signature=_02B4Z6wo00f01onlKrQAAIDDKuRlcbJJXraJxS4AAMF6b6; ttwid=1%7CpR53EihdTN2mlWXH16pjjz6o-HWccVIUgOYTAuaXWus%7C1662509944%7C312579cf34d39b11e4f9c7b50ded8602298923556584b9a2c692560d8c52360e; douyin.com; strategyABtestKey=1662509944.784; s_v_web_id=verify_l7qvhvjy_8Ye3fupH_VbNA_4Kx6_ARuH_UcWzPBA2zLmZ; passport_csrf_token=c060c99d6d1106ebe36a14b73808efe9; passport_csrf_token_default=c060c99d6d1106ebe36a14b73808efe9; ttcid=b581befcd57b4302995d49d179618d1532; THEME_STAY_TIME=%22299506%22; IS_HIDE_THEME_CHANGE=%221%22; n_mh=H6gxeDoYMh6xC93fDYqruk8PV3w2EnpOwTaJ3F6h9Io; sso_uid_tt=d99e706574c9b0dca31552b084d0b4df; sso_uid_tt_ss=d99e706574c9b0dca31552b084d0b4df; toutiao_sso_user=10d9268ff9b588001f8ea05fbad87024; toutiao_sso_user_ss=10d9268ff9b588001f8ea05fbad87024; sid_ucp_sso_v1=1.0.0-KGM0MDA1MTQyY2FjMjI2OWI0NmUwZDliZWNjYjk5Njk4YjFkMGRjNjkKHQjIp-CG6gIQ98vfmAYY7zEgDDDMsNXWBTgGQPQHGgJobCIgMTBkOTI2OGZmOWI1ODgwMDFmOGVhMDVmYmFkODcwMjQ; ssid_ucp_sso_v1=1.0.0-KGM0MDA1MTQyY2FjMjI2OWI0NmUwZDliZWNjYjk5Njk4YjFkMGRjNjkKHQjIp-CG6gIQ98vfmAYY7zEgDDDMsNXWBTgGQPQHGgJobCIgMTBkOTI2OGZmOWI1ODgwMDFmOGVhMDVmYmFkODcwMjQ; odin_tt=39edd9c5d7bd41d3a8f1c9cc421c94fa8d1f4dfc358c845c32ab306f67ce2540f7dd848b652e6924f9cd2aa98b83f65a; passport_auth_status=a7282adbf496fb7332f517b356b9916b%2C; passport_auth_status_ss=a7282adbf496fb7332f517b356b9916b%2C; sid_guard=a5fd6644a9c1c0e934335fa5ae32a33d%7C1662510584%7C5183999%7CSun%2C+06-Nov-2022+00%3A29%3A43+GMT; uid_tt=fc3a93858ae728f9b40b81b79e359a7b; uid_tt_ss=fc3a93858ae728f9b40b81b79e359a7b; sid_tt=a5fd6644a9c1c0e934335fa5ae32a33d; sessionid=a5fd6644a9c1c0e934335fa5ae32a33d; sessionid_ss=a5fd6644a9c1c0e934335fa5ae32a33d; sid_ucp_v1=1.0.0-KGI3NzNlZjAwMGM3ZGEzOTQ5ZTE2MGE4NTIzZDMzMzNmZTBlZDlhMDAKFwjIp-CG6gIQ-MvfmAYY7zEgDDgGQPQHGgJobCIgYTVmZDY2NDRhOWMxYzBlOTM0MzM1ZmE1YWUzMmEzM2Q; ssid_ucp_v1=1.0.0-KGI3NzNlZjAwMGM3ZGEzOTQ5ZTE2MGE4NTIzZDMzMzNmZTBlZDlhMDAKFwjIp-CG6gIQ-MvfmAYY7zEgDDgGQPQHGgJobCIgYTVmZDY2NDRhOWMxYzBlOTM0MzM1ZmE1YWUzMmEzM2Q; FOLLOW_LIVE_POINT_INFO=%22MS4wLjABAAAAAu4N8ormI9OckkfhpNG_osIz0rCMHUj-RsEOhccuXRo%2F1662566400000%2F0%2F1662510586124%2F0%22; download_guide=%223%2F20220907%22; msToken=_tWWvRPka3SKDzelOtOrT6fWorU-evZmNcPM8TJK_KnIwDo0oLcMas20L1CL1yiApl4-n_5upPefwPIuhr5QDF0kEegxExV_VOE9cq8Uyxmx3j9NRpR6Jg==; msToken=6sJ1N_4jhMZpK4BhOaU9sQKDBHSPgOABdlIYR88QiPRyZhbGJGwAQcXjpSuQJWEYjM9EOfw5CVMcGpeeLeG7Y3AhhWvgmwH5-UtG-WcdjNwR702X5fGBYw==; home_can_add_dy_2_desktop=%221%22; tt_scid=qbnU1AuyFbk919tiQLcFYmBQwcGRzHyH0P7EoAgzXiXX2eT-9VYFvM1BGm4KtlXreb50',
        }
        # Timeouts keep a stuck connection from hanging a worker forever.
        res = requests.get(url=url, headers=headers, verify=False, timeout=30).text

        # RENDER_DATA is a URL-encoded JSON blob inside a <script> tag.
        getjson = re.findall(r'<script id="RENDER_DATA" type="application/json">(.*?)</script><script>', res)[0]
        dejson = json.loads(unquote(getjson, 'utf-8'))

        # The second top-level key holds the aweme detail payload.
        keys = list(dejson)
        address = "https:" + dejson[keys[1]]['aweme']['detail']['video']['playAddr'][0]['src']
        tit = dejson[keys[1]]['aweme']['detail']['desc']

        # Drop characters Windows forbids in filenames, then cut hashtags.
        title = "".join(re.findall(r'[^\*"/:?\\|<>]', tit, re.S))
        title = title.split("#")[0].strip()
        if not title:
            # A description that is empty (or hashtags only) used to yield
            # the filename "video/.mp4", so every such clip overwrote the
            # previous one.  Fall back to the video id taken from the URL.
            title = url.rstrip('/').split('/')[-1]
        print('【' + title + '】' + '视频解析完成!开始下载...')

        # Save under ./video, skipping files that already exist.
        filepath = 'video'
        if not os.path.exists(filepath):
            os.makedirs(filepath)
        filename = os.path.join(filepath, title + ".mp4")
        if not os.path.exists(filename):
            response = requests.get(url=address, headers=headers, verify=False, timeout=60).content
            with open(filename, 'wb') as f:
                f.write(response)
                print(title + ".mp4 下载完成")
        else:
            print("视频已存在,跳过!")
    except Exception as e:
        # Best-effort batch download: report the error and move on.
        print(e)

# Scrolls the browser window all the way to the bottom of the page.
def scroll_to_bottom():
    """Scroll the global selenium driver `web` to the document bottom.

    Scrolls downward in 100px steps, then re-measures the document
    height; repeats until lazy loading stops growing the page.
    """
    measure_height = "return action=document.body.scrollHeight"
    reached = 0                              # how far we have scrolled so far
    total = web.execute_script(measure_height)

    while reached < total:
        # Step down to the current bottom in small increments so the
        # lazy loader has time to fire.
        pos = reached
        while pos < total:
            web.execute_script('window.scrollTo(0, {})'.format(pos))
            time.sleep(0.2)
            pos += 100
        reached = total
        time.sleep(0.1)
        # New content may have been appended — measure again.
        total = web.execute_script(measure_height)

if __name__=="__main__":
    url = input("输入主页地址:")
    # Example profile URL, kept for reference:
    # url="https://www.douyin.com/user/MS4wLjABAAAATG70AclerZtTLz_CceUTShSIAEhDAyIUj_VI7m1ga-0"
    chrome_option = webdriver.ChromeOptions()
    chrome_option.add_argument('headless')  # headless mode: no visible browser window
    warnings.filterwarnings("ignore")
    # NOTE: `web` is a module-level global read by scroll_to_bottom().
    web = webdriver.Chrome(options=chrome_option)
    # web.maximize_window()
    web.get(url=url)

    print('程序启动打开网页中..需要时间 请耐心等待..')
    scroll_to_bottom()  # keep scrolling until the lazy-loaded video list is fully rendered
    print("启动视频地址统计程序..")

    # Collect every <li><a href> and keep only ".../video/<id>" links.
    lis=web.find_elements(By.CSS_SELECTOR,'li')
    lists=[]

    for li in lis:
        try:
            adr=li.find_element(By.CSS_SELECTOR,'a').get_attribute('href')
            middle=adr.split('/')[-2]  # second-to-last path segment must be "video"
            if middle != "video":
                continue
            lists.append(adr)
        except:
            # <li> without an anchor, or a malformed href — skip it.
            continue
    print("共统计视频地址"+str(len(lists))+"个")
    # Fetching the profile owner's name is unsolved (see article note);
    # the XPath attempt below is kept for future work:
    # tt = web.find_element(By.XPATH,'/html/body/div[1]/div/div[2]/div/div/div[2]/div[1]/div[2]/h1/span/span/span/span/span/span')
    # title=tt.text
    # title=re.sub("的主页 - 抖音","_全部视频",title)
    # print(title)
    print("启动视频下载程序.. ")
    # Fan the downloads out over 10 worker processes; each worker
    # re-fetches its page itself inside getvideo().
    with ProcessPoolExecutor(10) as t:
        for url in lists:
            t.submit(getvideo,url=url)

    print("视频下载完毕,启动浏览器关闭程序")
    web.quit()


发布网上一个高手的代码,也很好用,就是用的包有些多,但是带进度条,直观

import linecache
import os
import re
from faker import Faker
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from tqdm import tqdm
import requests
import sqlite3
import urllib.parse


def creat_table(table_name):
    """Create the per-author history table t_<table_name> if it is absent.

    table_name is interpolated into the SQL, so it must come from a
    trusted source (here: the numeric Douyin uid).
    """
    conn = sqlite3.connect('douyin.db')
    try:
        conn.execute(f'''CREATE TABLE IF NOT EXISTS t_{table_name}
               (ID INTEGER PRIMARY KEY AUTOINCREMENT,VID TEXT NOT NULL);''')
        conn.commit()
    finally:
        # Original leaked the connection when the DDL raised.
        conn.close()


def insert_data(table_name, vid):
    """Record vid in t_<table_name> unless it is already present.

    table_name must be trusted (numeric uid) — it is interpolated into
    the statement; vid is now bound as a parameter instead of being
    pasted unquoted into the SQL (which was injection-prone and broke
    for non-numeric ids).
    """
    conn = sqlite3.connect('douyin.db')
    try:
        c = conn.cursor()
        # WHERE lookup replaces the original full-table scan in Python.
        c.execute(f"SELECT 1 FROM t_{table_name} WHERE VID = ? LIMIT 1", (vid,))
        if c.fetchone() is None:
            c.execute(f"INSERT INTO t_{table_name} (ID,VID) VALUES (null,?)", (vid,))
        conn.commit()
    finally:
        conn.close()


def selet_data(table_name):
    """Return every VID stored in t_<table_name> as a list of values."""
    conn = sqlite3.connect('douyin.db')
    try:
        cursor = conn.execute(f"SELECT VID from t_{table_name}")
        return [row[0] for row in cursor]
    finally:
        # Original returned without ever closing the connection.
        conn.close()


class Douyin:
    """Download every post of one Douyin author through the mobile
    share-link API on iesdouyin.com.

    Typical flow (see main()): Douyin(share_url) -> get_user_info()
    -> get_all_video() -> down_video(item, index) per item.
    """

    def __init__(self, url):
        """url: the author's mobile share link, e.g. https://v.douyin.com/xxxx/."""
        self.share_url = url
        # Mobile UA so the share link redirects to the profile URL that
        # carries sec_uid in its query string.
        self.headers = {
            'User-Agent': "Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420+ (KHTML, like Gecko) Version/3.0 Mobile/1C28 Safari/419.3"
        }
        # All three are populated by get_user_info().
        self.sec_uid = None
        self.uid = None
        self.nick_name = None

    def get_user_info(self):
        """Resolve the share link, populate sec_uid/uid/nick_name, create
        the per-author history table, and return a dict of profile stats."""
        resp = requests.get(self.share_url, headers=self.headers)
        # The share link redirects; sec_uid lives in the final URL's query.
        self.sec_uid = 'sec_uid=' + urllib.parse.parse_qs(urllib.parse.urlparse(resp.url).query)['sec_uid'][0]
        user_info = f'https://www.iesdouyin.com/web/api/v2/user/info/?{self.sec_uid}'
        resp = requests.get(user_info, headers=self.headers)
        user_data = {
            'signature': resp.json()['user_info']['signature'],
            'nickname': resp.json()['user_info']['nickname'],
            'aweme_count': resp.json()['user_info']['aweme_count'],
            'following_count': resp.json()['user_info']['following_count'],
            'total_favorited': resp.json()['user_info']['total_favorited'],
            'avatar': resp.json()['user_info']['avatar_larger']['url_list'][0],
        }
        self.uid = resp.json()['user_info']['uid']
        # Strip characters that are invalid in Windows file/directory names.
        self.nick_name = re.sub(r'[<|>\/:"*?]', '_', resp.json()['user_info']['nickname'])
        creat_table(self.uid)
        return user_data

    def get_all_video(self):
        """Page through the author's posts API and return a list of
        {'desc', 'vid', 'aweme_id'} dicts (vid is None for image posts)."""
        max_cursor = 0
        video_has_more = True
        all_video_list = []
        if self.sec_uid is None:
            self.get_user_info()
        # Cursor-based pagination, 21 posts per request, until has_more is falsy.
        while video_has_more is True:
            json_url = f'https://www.iesdouyin.com/web/api/v2/aweme/post/?{self.sec_uid}&' \
                       f'count=21&max_cursor={max_cursor}'
            resp = requests.get(json_url, headers=self.headers)
            video_has_more = resp.json()['has_more']
            max_cursor = resp.json()['max_cursor']
            video_list = resp.json()['aweme_list']
            for i in video_list:
                try:
                    all_video_list.append({'desc': i['desc'], 'vid': i['video']['vid'], 'aweme_id': i['aweme_id']})
                except KeyError:
                    # Posts without ['video']['vid'] (image galleries) are
                    # marked with vid=None so down_video() takes the image branch.
                    all_video_list.append({'desc': i['desc'], 'vid': None, 'aweme_id': i['aweme_id']})
        return all_video_list

    def down_video(self, down_info, index):
        """Download one post unless its aweme_id is already in the history table.

        down_info: one dict from get_all_video() ('desc', 'vid', 'aweme_id').
        index: stringified position, used as a zero-padded filename prefix.
        """
        alreday_down = False
        retry = 0
        session = requests.session()
        session.mount('https://', HTTPAdapter(max_retries=5))
        # Skip anything already recorded in the per-author sqlite table.
        alreday_down_video = selet_data(self.uid)
        for n in alreday_down_video:
            if down_info['aweme_id'] in n:
                alreday_down = True
                break
        if alreday_down is False:
            if os.path.exists(self.nick_name) is False:
                try:
                    os.makedirs(self.nick_name)
                except FileExistsError:
                    # Another worker thread may create the directory first.
                    pass
            if down_info['desc'] == '':
                down_info['desc'] = down_info['aweme_id']
            # Sanitize the description for use inside a filename.
            down_info['desc'] = re.sub(r'[<|>\/:"*?\n]', '_', down_info['desc'])
            save_name = f'{self.nick_name}/{index.zfill(2)}_{down_info["desc"]}.mp4'
            if down_info["vid"]:
                # Video post: fetch the 1080p stream by vid.
                download_url = f'https://aweme.snssdk.com/aweme/v1/play/?video_id={down_info["vid"]}&ratio=1080p'
                response = session.get(download_url, headers=self.headers, stream=True)
                # NOTE(review): an empty body is treated as a rejected UA —
                # retry up to 3 times with a randomly generated Chrome UA.
                while response.content == b'' and retry < 3:
                    self.headers = {
                        'User-Agent': Faker().chrome()
                    }
                    response = session.get(download_url, headers=self.headers)
                    retry += 1
                if response.content:
                    with open(save_name, 'wb') as file:
                        file.write(response.content)
                    insert_data(self.uid, down_info['aweme_id'])
                else:
                    # All retries came back empty — log the URL for later.
                    with open(f'{self.nick_name}/下载失败视频.txt', 'a+') as file:
                        file.write(download_url + '\n')
            else:
                # Image post: pull every image from the item-info API into
                # a sub-directory named after the description.
                n = 0
                img_json_url = f'https://www.douyin.com/web/api/v2/aweme/iteminfo/?item_ids={down_info["aweme_id"]}'
                if os.path.exists(f'{self.nick_name}/{down_info["desc"]}') is False:
                    try:
                        os.makedirs(f'{self.nick_name}/{down_info["desc"]}')
                    except FileExistsError:
                        pass
                response = session.get(img_json_url, headers=self.headers)
                for i in response.json()["item_list"][0]["images"]:
                    n += 1
                    img_url = i["url_list"][0]
                    img_content = session.get(img_url, headers=self.headers).content
                    img_save_name = f'{self.nick_name}/{down_info["desc"]}/{n}.jpg'
                    with open(img_save_name, 'wb') as file:
                        file.write(img_content)


def main(share_url, down_all=False):
    """Drive a full download run for one author.

    share_url: any text containing the author's share link; the
        scheme://... part is extracted with a regex (raises
        AttributeError when the text contains no URL, as before).
    down_all: when True, skip the interactive prompt and download
        everything (batch mode).
    """
    share_url = re.search(r'[a-zA-z]+://[^\s]*', share_url).group()
    douyin = Douyin(share_url)
    info = douyin.get_user_info()
    print(f'作者:{info["nickname"]}\n视频数:{info["aweme_count"]}\n{"-" * 20}\n拉取作者所有作品中...')
    down_list = douyin.get_all_video()
    down_mode = '1'
    if not down_all:
        down_mode = input(f'{"-" * 20}\n选择下载模式:\n1.全部下载\n2.关键词匹配下载\n')

    def down_task(down_load):
        # Fan the downloads out over 10 threads; tqdm tracks completions.
        with ThreadPoolExecutor(max_workers=10) as t:
            obj_list = [t.submit(douyin.down_video, item, str(i))
                        for i, item in enumerate(down_load)]
            with tqdm(total=len(down_load), ncols=100) as bar:
                for _ in as_completed(obj_list):
                    bar.update(1)

    if down_mode == '1':
        if down_list:
            down_task(down_list)
        else:
            print('无视频可下载')
    elif down_mode == '2':
        # Keep only the posts whose description contains the keyword.
        filter_down_list = []
        keyword = input('请输入关键词:')
        for v in down_list:
            if keyword in v['desc']:
                filter_down_list.append(v)
        if len(filter_down_list) == 0:
            print('无匹配记录')
        else:
            print(f'共找到 {len(filter_down_list)} 条匹配记录, 开始下载')
            down_task(filter_down_list)
    else:
        print('输入错误')


if __name__ == '__main__':
    print('本工具用于抖音个人专辑下载视频使用\n分享链接样式:https://v.douyin.com/6M4cLGW/(手机分享主页链接)\n有问题联系QQ:247483085 \n======================================================================================')
    # linecache is meant for traceback source introspection and caches the
    # file process-wide; read the batch file directly instead.  A missing
    # file still yields an empty list, matching linecache.getlines().
    try:
        with open('作者主页链接.txt', encoding='utf-8') as f:
            share_url_list = f.readlines()
    except OSError:
        share_url_list = []
    if share_url_list:
        # Batch mode: one share link per line in the file.
        print('已检测到有批量下载文件,进入批量下载模式\n')
        if input('请选择批量下载模式:\n1.下载所有作者所有视频\n2.手动选择每个作者下载模式\n') == '1':
            down_all = True
        else:
            down_all = False
        for i in share_url_list:
            main(i, down_all)
            print(f'当前任务完成\n{"*" * 30}\n')
        input('所有任务已完成')
    else:
        # Interactive single-author mode.
        url = input('输入作者主页分享链接:')
        main(url)
        input('任务已完成')



非特殊说明,本文版权归原作者所有,转载请注明出处

本文地址:http://php.liulei.com.cn/?type=acticle&id=45

评论列表

发表评论

  • 昵称(必填)
  • 邮箱
  • 网址

TOP