爬虫 多进程 多线程 协程

磊落不羁 by:磊落不羁 分类:爬虫 时间:2年前 阅读:91 评论:0

多线程

# 线程, 进程
# 进程是资源单位, 每一个进程至少要有一个线程
# 线程是执行单位

# 启动每一个程序默认都会有一个主线程


# def func():
#     for i in range(1000):
#         print("func", i)
#
#
# if __name__ == '__main__':
#     func()
#     for i in range(1000):
#         print("main", i)

#  多线程
from threading import Thread  # 线程类


# def func():
#     for i in range(1000):
#         print("func", i)
#
#
# if __name__ == '__main__':
#     t = Thread(target=func)  # 创建线程并给线程安排任务
#     t.start()  # 多线程状态为可以开始工作状态, 具体的执行时间由CPU决定
#
#     for i in range(1000):
#         print("main", i)


class MyThread(Thread):  #
    def run(self):  # 固定的    -> 当线程被执行的时候, 被执行的就是run()
        for i in range(1000):
            print("子线程", i)


if __name__ == '__main__':
    t = MyThread()
    # t.run()  # 方法的调用了. -> 单线程????
    t.start()  # 开启线程

    for i in range(1000):
        print("主线程", i)

多进程

from multiprocessing import Process
from threading import Thread


# def func():
#     for i in range(1000):
#         print("子进程", i)
#
#
# if __name__ == '__main__':
#     p = Process(target=func)
#     p.start()
#     for i in range(1000):
#         print("主进程", i)


def func(name):  # ??
    for i in range(1000):
        print(name, i)


if __name__ == '__main__':
    t1 = Thread(target=func, args=("周杰伦",))  # 传递参数必须是元组
    t1.start()

    t2 = Thread(target=func, args=("王力宏",))
    t2.start()


线程池和进程池

# 线程池: 一次性开辟一些线程. 我们用户直接给线程池子提交任务. 线程任务的调度交给线程池来完成
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def fn(name):
    for i in range(1000):
        print(name, i)


if __name__ == '__main__':
    # 创建线程池
    with ThreadPoolExecutor(50) as t:
        for i in range(100):
            t.submit(fn, name=f"线程{i}")
    # 等待线程池中的任务全部执行完毕. 才继续执行(守护)
    print("123")

线程池和进程池实例

# 1. 如何提取单个页面的数据
# 2. 上线程池,多个页面同时抓取
import requests
from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutor

f = open("data.csv", mode="w", encoding="utf-8")
csvwriter = csv.writer(f)


def download_one_page(url):
    # 拿到页面源代码
    resp = requests.get(url)
    html = etree.HTML(resp.text)
    table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
    # trs = table.xpath("./tr")[1:]
    trs = table.xpath("./tr[position()>1]")
    # 拿到每个tr
    for tr in trs:
        txt = tr.xpath("./td/text()")
        # 对数据做简单的处理: \\  / 去掉
        txt = (item.replace("\\", "").replace("/", "") for item in txt)
        # 把数据存放在文件中
        csvwriter.writerow(txt)
    print(url, "提取完毕!")


if __name__ == '__main__':
    # for i in range(1, 14870):  # 效率及其低下
    #     download_one_page(f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")

    # 创建线程池
    with ThreadPoolExecutor(50) as t:
        for i in range(1, 200):  # 199 * 20 = 3980
            # 把下载任务提交给线程池
            t.submit(download_one_page, f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")

    print("全部下载完毕!")

协程

# import time
#
#
# def func():
#     print("我爱黎明")
#     time.sleep(3)  # 让当前的线程处于阻塞状态. CPU是不为我工作的
#     print("我真的爱黎明")
#
#
# if __name__ == '__main__':
#     func()
#
# """
# # input() 程序也是处于阻塞状态
# # requests.get(bilibili) 在网络请求返回数据之前, 程序也是处于阻塞状态的
# # 一般情况下, 当程序处于 IO操作的时候. 线程都会处于阻塞状态
#
# # 协程: 当程序遇见了IO操作的时候. 可以选择性的切换到其他任务上.
# # 在微观上是一个任务一个任务的进行切换. 切换条件一般就是IO操作
# # 在宏观上,我们能看到的其实是多个任务一起在执行
# # 多任务异步操作
#
# # 上方所讲的一切. 都是在单线程的条件下
# """


# python编写协程的程序
import asyncio
import time


# async def func():
#     print("你好啊, 我叫赛利亚")
#
#
# if __name__ == '__main__':
#     g = func()  # 此时的函数是异步协程函数. 此时函数执行得到的是一个协程对象
#     # print(g)
#     asyncio.run(g)  # 协程程序运行需要asyncio模块的支持



# async def func1():
#     print("你好啊, 我叫潘金莲")
#     # time.sleep(3)  # 当程序出现了同步操作的时候. 异步就中断了
#     await asyncio.sleep(3)  # 异步操作的代码
#     print("你好啊, 我叫潘金莲")
#
#
# async def func2():
#     print("你好啊, 我叫王建国")
#     # time.sleep(2)
#     await asyncio.sleep(2)
#     print("你好啊, 我叫王建国")
#
#
# async def func3():
#     print("你好啊, 我叫李雪琴")
#     await asyncio.sleep(4)
#     print("你好啊, 我叫李雪琴")
#
#
# if __name__ == '__main__':
#     f1 = func1()
#     f2 = func2()
#     f3 = func3()
#     tasks = [
#         f1, f2, f3
#     ]
#     t1 = time.time()
#     # 一次性启动多个任务(协程)
#     asyncio.run(asyncio.wait(tasks))
#     t2 = time.time()
#     print(t2 - t1)


async def func1():
    print("你好啊, 我叫潘金莲")
    await asyncio.sleep(3)
    print("你好啊, 我叫潘金莲")


async def func2():
    print("你好啊, 我叫王建国")
    await asyncio.sleep(2)
    print("你好啊, 我叫王建国")


async def func3():
    print("你好啊, 我叫李雪琴")
    await asyncio.sleep(4)
    print("你好啊, 我叫李雪琴")


async def main():
    # 第一种写法
    # f1 = func1()
    # await f1  # 一般await挂起操作放在协程对象前面
    # 第二种写法(推荐)
    tasks = [
        asyncio.create_task(func1()),  # py3.8以后加上asyncio.create_task()
        asyncio.create_task(func2()),
        asyncio.create_task(func3())
    ]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    t1 = time.time()
    # 一次性启动多个任务(协程)
    asyncio.run(main())
    t2 = time.time()
    print(t2 - t1)


# # 在爬虫领域的应用
# async def download(url):
#     print("准备开始下载")
#     await asyncio.sleep(2)  # 网络请求  requests.get()
#     print("下载完成")
#
#
# async def main():
#     urls = [
#         "http://www.baidu.com",
#         "http://www.bilibili.com",
#         "http://www.163.com"
#     ]
#
#     # 准备异步协程对象列表
#     tasks = []
#     for url in urls:
#         d = asycio.create_task(download(url))
#         tasks.append(d)
#
#     # tasks = [asyncio.create_task(download(url)) for url in urls]  # 这么干也行哦~
#
#     # 一次性把所有任务都执行
#     await asyncio.wait(tasks)
#
# if __name__ == '__main__':
#     asyncio.run(main())

协程 aiohttp模块应用

# requests.get() 同步的代码 -> 异步操作aiohttp
# pip install aiohttp

import asyncio
import aiohttp

urls = [
    "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]


async def aiodownload(url):
    # 发送请求.
    # 得到图片内容
    # 保存到文件
    name = url.rsplit("/", 1)[1]  # 从右边切, 切一次. 得到[1]位置的内容
    async with aiohttp.ClientSession() as session:  # requests
        async with session.get(url) as resp:  # resp = requests.get()
            # 请求回来了. 写入文件
            # 可以自己去学习一个模块, aiofiles
            with open(name, mode="wb") as f:  # 创建文件
                f.write(await resp.content.read())  # 读取内容是异步的. 需要await挂起, resp.text()

    print(name, "搞定")


async def main():
    tasks = []
    for url in urls:
        tasks.append(aiodownload(url))

    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())


协程案例 下载一个小说


# http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"}  => 所有章节的内容(名称, cid)
# 章节内部的内容
# http://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4306063500","cid":"4306063500|11348571","need_bookinfo":1}

import requests
import asyncio
import aiohttp
import aiofiles
import json

"""
1. 同步操作: 访问getCatalog 拿到所有章节的cid和名称
2. 异步操作: 访问getChapterContent 下载所有的文章内容
"""

async def aiodownload(cid, b_id, title):
    data = {
        "book_id":b_id,
        "cid":f"{b_id}|{cid}",
        "need_bookinfo":1
    }
    data = json.dumps(data)
    url = f"http://dushu.baidu.com/api/pc/getChapterContent?data={data}"

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            dic = await resp.json()

            async with aiofiles.open(title, mode="w", encoding="utf-8") as f:
                await f.write(dic['data']['novel']['content'])  # 把小说内容写出


async def getCatalog(url):
    resp = requests.get(url)
    dic = resp.json()
    tasks = []
    for item in dic['data']['novel']['items']:  # item就是对应每一个章节的名称和cid
        title = item['title']
        cid = item['cid']
        # 准备异步任务
        tasks.append(aiodownload(cid, b_id, title))
    await asyncio.wait(tasks)


if __name__ == '__main__':
    b_id = "4306063500"
    url = 'http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"' + b_id + '"}'
    asyncio.run(getCatalog(url))

协程下载91视频案例

"""
思路:
    1. 拿到主页面的页面源代码, 找到iframe
    2. 从iframe的页面源代码中拿到m3u8文件的地址
    3. 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频存放路径)
    4. 下载视频
    5. 下载秘钥, 进行解密操作
    6. 合并所有ts文件为一个mp4文件
"""
import requests
from bs4 import BeautifulSoup
import re
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES  # pycryptodome
import os


def get_iframe_src(url):
    resp = requests.get(url)
    main_page = BeautifulSoup(resp.text, "html.parser")
    src = main_page.find("iframe").get("src")
    return src
    # return "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp"  # 为了测试


def get_first_m3u8_url(url):
    resp = requests.get(url)
    # print(resp.text)
    obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
    m3u8_url = obj.search(resp.text).group("m3u8_url")
    # print(m3u8_url)
    return m3u8_url


def download_m3u8_file(url, name):
    resp = requests.get(url)
    with open(name, mode="wb") as f:
        f.write(resp.content)


async def download_ts(url, name, session):
    async with session.get(url) as resp:
        async with aiofiles.open(f"video2/{name}", mode="wb") as f:
            await f.write(await resp.content.read())  # 把下载到的内容写入到文件中
    print(f"{name}下载完毕")


async def aio_download(up_url):  # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/
    tasks = []
    async with aiohttp.ClientSession() as session:  # 提前准备好session
        async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding='utf-8') as f:
            async for line in f:
                if line.startswith("#"):
                    continue
                # line就是xxxxx.ts
                line = line.strip()  # 去掉没用的空格和换行
                # 拼接真正的ts路径
                ts_url = up_url + line
                task = asyncio.create_task(download_ts(ts_url, line, session))  # 创建任务
                tasks.append(task)

            await asyncio.wait(tasks)  # 等待任务结束


def get_key(url):
    resp = requests.get(url)
    return resp.text


async def dec_ts(name, key):
    aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
    async with aiofiles.open(f"video2/{name}", mode="rb") as f1,\
        aiofiles.open(f"video2/temp_{name}", mode="wb") as f2:

        bs = await f1.read()  # 从源文件读取内容
        await f2.write(aes.decrypt(bs))  # 把解密好的内容写入文件
    print(f"{name}处理完毕")


async def aio_dec(key):
    # 解密
    tasks = []
    async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
        async for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            # 开始创建异步任务
            task = asyncio.create_task(dec_ts(line, key))
            tasks.append(task)
        await asyncio.wait(tasks)


def merge_ts():
    # mac: cat 1.ts 2.ts 3.ts > xxx.mp4
    # windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
    lst = []
    with open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            lst.append(f"video2/temp_{line}")

    s = " ".join(lst)  # 1.ts 2.ts 3.ts
    os.system(f"cat {s} > movie.mp4")
    print("搞定!")


def main(url):
    # 1. 拿到主页面的页面源代码, 找到iframe对应的url
    iframe_src = get_iframe_src(url)
    # 2. 拿到第一层的m3u8文件的下载地址
    first_m3u8_url = get_first_m3u8_url(iframe_src)
    # 拿到iframe的域名
    # "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp"
    iframe_domain = iframe_src.split("/share")[0]
    # 拼接出真正的m3u8的下载路径
    first_m3u8_url = iframe_domain+first_m3u8_url
    # https://boba.52kuyun.com/20170906/Moh2l9zV/index.m3u8?sign=548ae366a075f0f9e7c76af215aa18e1
    # print(first_m3u8_url)
    # 3.1 下载第一层m3u8文件
    download_m3u8_file(first_m3u8_url, "越狱第一季第一集_first_m3u8.txt")
    # 3.2 下载第二层m3u8文件
    with open("越狱第一季第一集_first_m3u8.txt", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            else:
                line = line.strip()  # 去掉空白或者换行符  hls/index.m3u8
                # 准备拼接第二层m3u8的下载路径
                # https://boba.52kuyun.com/20170906/Moh2l9zV/ + hls/index.m3u8
                # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/index.m3u8
                # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/cFN8o3436000.ts
                second_m3u8_url = first_m3u8_url.split("index.m3u8")[0] + line
                download_m3u8_file(second_m3u8_url, "越狱第一季第一集_second_m3u8.txt")
                print("m3u8文件下载完毕")

    # 4. 下载视频
    second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "")
    # 异步协程
    asyncio.run(aio_download(second_m3u8_url_up))  # 测试的使用可以注释掉

    # 5.1 拿到秘钥
    key_url = second_m3u8_url_up + "key.key"  # 偷懒写法, 正常应该去m3u8文件里去找
    key = get_key(key_url)
    # 5.2 解密
    asyncio.run(aio_dec(key))

    # 6. 合并ts文件为mp4文件
    merge_ts()


if __name__ == '__main__':
    url = "https://www.91kanju.com/vod-play/541-2-1.html"
    main(url)
    # 简单的问题复杂化, 复杂的问题简单化
    # 秒杀()


非特殊说明,本文版权归原作者所有,转载请注明出处

本文地址:http://php.liulei.com.cn/?type=acticle&id=28

评论列表

发表评论

  • 昵称(必填)
  • 邮箱
  • 网址

TOP