爬虫 多进程 多线程 协程
多线程
# Threads vs. processes: a process is the unit of resource ownership and
# always contains at least one thread; a thread is the unit of execution.
# Every program gets one main thread by default.
from threading import Thread  # thread class


class MyThread(Thread):
    """Custom thread: the work lives in run(), which start() executes."""

    def run(self):  # fixed hook — called when the thread actually runs
        for n in range(1000):
            print("子线程", n)


if __name__ == '__main__':
    worker = MyThread()
    # worker.run() would be a plain method call (single-threaded!);
    # start() marks the thread runnable — the CPU decides when it runs.
    worker.start()
    for n in range(1000):
        print("主线程", n)
多进程
from multiprocessing import Process
from threading import Thread


def func(name):
    """Thread worker: print *name* together with a counter 1000 times."""
    for i in range(1000):
        print(name, i)


if __name__ == '__main__':
    # Arguments for the target must be passed as a tuple via ``args``.
    first = Thread(target=func, args=("周杰伦",))
    first.start()
    second = Thread(target=func, args=("王力宏",))
    second.start()
线程池和进程池
# Thread pool: open a batch of threads up front; users just submit tasks
# and the pool takes care of scheduling them onto its threads.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def fn(name):
    """Pool task: print the task name together with a counter 1000 times."""
    for idx in range(1000):
        print(name, idx)


if __name__ == '__main__':
    # Create a pool of 50 threads; leaving the ``with`` block waits for every
    # submitted task to finish before execution continues.
    with ThreadPoolExecutor(50) as pool:
        for i in range(100):
            pool.submit(fn, name=f"线程{i}")
    print("123")
线程池和进程池实例
# 1. How to extract the data of a single page.
# 2. Use a thread pool to scrape many pages concurrently.
import requests
from lxml import etree
import csv
import threading
from concurrent.futures import ThreadPoolExecutor

# newline="" is required by the csv module so rows are not double-spaced
# on Windows.
f = open("data.csv", mode="w", encoding="utf-8", newline="")
csvwriter = csv.writer(f)
# The writer is shared by all pool threads; without a lock, rows written
# from different pages can interleave mid-row.
_write_lock = threading.Lock()


def download_one_page(url):
    """Fetch one market listing page and append its table rows to data.csv."""
    # Grab the page source.
    resp = requests.get(url)
    html = etree.HTML(resp.text)
    table = html.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]
    # position()>1 skips the header row (same as ./tr then [1:]).
    trs = table.xpath("./tr[position()>1]")
    for tr in trs:
        txt = tr.xpath("./td/text()")
        # Light clean-up of the cell data: drop "\" and "/".
        txt = (item.replace("\\", "").replace("/", "") for item in txt)
        # Serialize writes so concurrent pages cannot corrupt each other's rows.
        with _write_lock:
            csvwriter.writerow(txt)
    print(url, "提取完毕!")


if __name__ == '__main__':
    # A plain sequential loop over ~14870 pages is far too slow; hand the
    # downloads to a pool of 50 threads instead.
    with ThreadPoolExecutor(50) as t:
        for i in range(1, 200):
            # Submit each page download to the pool.
            t.submit(download_one_page,
                     f"http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml")
    # The pool context exits only after all tasks finished, so the file can
    # be closed safely here (it previously leaked).
    f.close()
    print("全部下载完毕!")
协程
# Coroutines: when a task hits an IO wait (sleep, input(), a network request),
# the event loop switches to another task.  Micro level: one task at a time,
# switching on IO; macro level: the tasks appear to run together.  All of this
# happens on a single thread — async multitasking.
import asyncio
import time


async def func1():
    print("你好啊, 我叫潘金莲")
    # time.sleep() would block the whole loop; asyncio.sleep() is the async form.
    await asyncio.sleep(3)
    print("你好啊, 我叫潘金莲")


async def func2():
    print("你好啊, 我叫王建国")
    await asyncio.sleep(2)
    print("你好啊, 我叫王建国")


async def func3():
    print("你好啊, 我叫李雪琴")
    await asyncio.sleep(4)
    print("你好啊, 我叫李雪琴")


async def main():
    """Schedule the three coroutines together and wait until all finish."""
    # Preferred form: wrap each coroutine in a Task (required on 3.8+),
    # then hand the task list to asyncio.wait().
    pending = [
        asyncio.create_task(func1()),
        asyncio.create_task(func2()),
        asyncio.create_task(func3()),
    ]
    await asyncio.wait(pending)


if __name__ == '__main__':
    started = time.time()
    # Kick off all tasks at once; total time ≈ the longest sleep, not the sum.
    asyncio.run(main())
    finished = time.time()
    print(finished - started)
协程 aiohttp模块应用
# requests.get() is synchronous code -> use aiohttp for async HTTP.
# pip install aiohttp
import asyncio
import aiohttp

urls = [
    "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]


async def aiodownload(url):
    """Request one image, read its body, and save it under its own file name."""
    # rsplit("/", 1)[1]: split once from the right -> bare file name.
    name = url.rsplit("/", 1)[1]
    async with aiohttp.ClientSession() as session:  # counterpart of requests
        async with session.get(url) as resp:        # resp = requests.get()
            # Reading the body is itself asynchronous, hence the await.
            # (The aiofiles module would make the file write async as well.)
            with open(name, mode="wb") as f:
                f.write(await resp.content.read())
    print(name, "搞定")


async def main():
    # BUGFIX: asyncio.wait() no longer accepts bare coroutine objects
    # (deprecated in 3.8, TypeError since Python 3.11) — wrap each download
    # in a Task before waiting on it.
    tasks = [asyncio.create_task(aiodownload(url)) for url in urls]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())
协程案例 下载一个小说
# http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"}
#   -> every chapter of the book (title, cid)
# http://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":...,"cid":...}
#   -> the content of a single chapter
import requests
import asyncio
import aiohttp
import aiofiles
import json

"""
1. Sync step : call getCatalog to collect every chapter's cid and title.
2. Async step: call getChapterContent to download all chapter bodies.
"""


async def aiodownload(cid, b_id, title):
    """Download one chapter and write its text to a file named *title*."""
    data = {
        "book_id": b_id,
        "cid": f"{b_id}|{cid}",
        "need_bookinfo": 1
    }
    data = json.dumps(data)
    url = f"http://dushu.baidu.com/api/pc/getChapterContent?data={data}"

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            dic = await resp.json()
            async with aiofiles.open(title, mode="w", encoding="utf-8") as f:
                # Write the chapter content out.
                await f.write(dic['data']['novel']['content'])


async def getCatalog(url):
    """Fetch the catalog, then download every chapter concurrently.

    NOTE(review): relies on the module-level ``b_id`` assigned in __main__.
    """
    resp = requests.get(url)
    dic = resp.json()
    tasks = []
    for item in dic['data']['novel']['items']:  # one entry per chapter
        title = item['title']
        cid = item['cid']
        # BUGFIX: asyncio.wait() rejects bare coroutines on Python 3.11+;
        # schedule each chapter download as a Task.
        tasks.append(asyncio.create_task(aiodownload(cid, b_id, title)))
    await asyncio.wait(tasks)


if __name__ == '__main__':
    b_id = "4306063500"
    url = 'http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"' + b_id + '"}'
    asyncio.run(getCatalog(url))
协程下载91视频案例
""" 思路: 1. 拿到主页面的页面源代码, 找到iframe 2. 从iframe的页面源代码中拿到m3u8文件的地址 3. 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频存放路径) 4. 下载视频 5. 下载秘钥, 进行解密操作 6. 合并所有ts文件为一个mp4文件 """ import requests from bs4 import BeautifulSoup import re import asyncio import aiohttp import aiofiles from Crypto.Cipher import AES # pycryptodome import os def get_iframe_src(url): resp = requests.get(url) main_page = BeautifulSoup(resp.text, "html.parser") src = main_page.find("iframe").get("src") return src # return "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp" # 为了测试 def get_first_m3u8_url(url): resp = requests.get(url) # print(resp.text) obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S) m3u8_url = obj.search(resp.text).group("m3u8_url") # print(m3u8_url) return m3u8_url def download_m3u8_file(url, name): resp = requests.get(url) with open(name, mode="wb") as f: f.write(resp.content) async def download_ts(url, name, session): async with session.get(url) as resp: async with aiofiles.open(f"video2/{name}", mode="wb") as f: await f.write(await resp.content.read()) # 把下载到的内容写入到文件中 print(f"{name}下载完毕") async def aio_download(up_url): # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/ tasks = [] async with aiohttp.ClientSession() as session: # 提前准备好session async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding='utf-8') as f: async for line in f: if line.startswith("#"): continue # line就是xxxxx.ts line = line.strip() # 去掉没用的空格和换行 # 拼接真正的ts路径 ts_url = up_url + line task = asyncio.create_task(download_ts(ts_url, line, session)) # 创建任务 tasks.append(task) await asyncio.wait(tasks) # 等待任务结束 def get_key(url): resp = requests.get(url) return resp.text async def dec_ts(name, key): aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC) async with aiofiles.open(f"video2/{name}", mode="rb") as f1,\ aiofiles.open(f"video2/temp_{name}", mode="wb") as f2: bs = await f1.read() # 从源文件读取内容 await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件 print(f"{name}处理完毕") async def aio_dec(key): # 解密 tasks = [] async 
with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f: async for line in f: if line.startswith("#"): continue line = line.strip() # 开始创建异步任务 task = asyncio.create_task(dec_ts(line, key)) tasks.append(task) await asyncio.wait(tasks) def merge_ts(): # mac: cat 1.ts 2.ts 3.ts > xxx.mp4 # windows: copy /b 1.ts+2.ts+3.ts xxx.mp4 lst = [] with open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f: for line in f: if line.startswith("#"): continue line = line.strip() lst.append(f"video2/temp_{line}") s = " ".join(lst) # 1.ts 2.ts 3.ts os.system(f"cat {s} > movie.mp4") print("搞定!") def main(url): # 1. 拿到主页面的页面源代码, 找到iframe对应的url iframe_src = get_iframe_src(url) # 2. 拿到第一层的m3u8文件的下载地址 first_m3u8_url = get_first_m3u8_url(iframe_src) # 拿到iframe的域名 # "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp" iframe_domain = iframe_src.split("/share")[0] # 拼接出真正的m3u8的下载路径 first_m3u8_url = iframe_domain+first_m3u8_url # https://boba.52kuyun.com/20170906/Moh2l9zV/index.m3u8?sign=548ae366a075f0f9e7c76af215aa18e1 # print(first_m3u8_url) # 3.1 下载第一层m3u8文件 download_m3u8_file(first_m3u8_url, "越狱第一季第一集_first_m3u8.txt") # 3.2 下载第二层m3u8文件 with open("越狱第一季第一集_first_m3u8.txt", mode="r", encoding="utf-8") as f: for line in f: if line.startswith("#"): continue else: line = line.strip() # 去掉空白或者换行符 hls/index.m3u8 # 准备拼接第二层m3u8的下载路径 # https://boba.52kuyun.com/20170906/Moh2l9zV/ + hls/index.m3u8 # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/index.m3u8 # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/cFN8o3436000.ts second_m3u8_url = first_m3u8_url.split("index.m3u8")[0] + line download_m3u8_file(second_m3u8_url, "越狱第一季第一集_second_m3u8.txt") print("m3u8文件下载完毕") # 4. 下载视频 second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "") # 异步协程 asyncio.run(aio_download(second_m3u8_url_up)) # 测试的使用可以注释掉 # 5.1 拿到秘钥 key_url = second_m3u8_url_up + "key.key" # 偷懒写法, 正常应该去m3u8文件里去找 key = get_key(key_url) # 5.2 解密 asyncio.run(aio_dec(key)) # 6. 
合并ts文件为mp4文件 merge_ts() if __name__ == '__main__': url = "https://www.91kanju.com/vod-play/541-2-1.html" main(url) # 简单的问题复杂化, 复杂的问题简单化 # 秒杀()
非特殊说明,本文版权归原作者所有,转载请注明出处
评论列表
发表评论