
Python: a laid-back, take-it-as-it-comes comic downloader for a certain site

2021-01-01
nIceLee



Ah, well.
The site already lets you download chapters packaged up, so treat this as nothing more than an exercise.

chapter.py

# coding=utf-8
import requests
import time
import os
import re
from bs4 import BeautifulSoup
from functools import wraps
import asyncio
from concurrent.futures import ThreadPoolExecutor

class retry(object):
    def __init__(self, retry=3, sleep=1):
        self.retry = retry
        self.sleep = sleep

    def __call__(self, func):
        @wraps(func)
        def run(*args, **kwargs):
            # Keep the counter local to each call, so one call's failures
            # do not use up the retry budget of later calls.
            remaining = self.retry
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if remaining == 0:
                        raise
                    remaining -= 1
                    time.sleep(self.sleep)
        return run
     
class Chapter:

    def __init__(self, session, chpId, domain):
        self.session = session
        self.domain = domain
        self.chpId = chpId
        self.commonHeaders = {
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip",
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0",
            "Referer": "https://%s/album/%s/"%(domain, chpId),
        }
        self.session.headers.update(self.commonHeaders)  
    
    @retry(retry=3)  # retry up to 3 times; an error is raised only if all 4 attempts fail
    def getHtml(self, chp_url):
        return self.session.get(chp_url, timeout=30).text
    
    def downloadAll(self, folder):
        if not os.path.exists(folder):
            os.makedirs(folder) 
        
        # Fetch all image URLs for this chapter
        chp_url = "https://%s/photo/%s?read_mode=read-by-page"%(self.domain, self.chpId)
        html = self.getHtml(chp_url)
        soup = BeautifulSoup(html,'lxml')
        img_template = soup.select(".img_template img#album_photo_")[0]
        img_prefix = img_template["data-src"]        
        img_options = soup.select("#pageselect option ")
        img_suffixs = [  op["data-page"] for op in img_options ]
        
        loop = asyncio.get_event_loop()
        threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="download_")
        tasks = []  
        # Download each image URL
        for img_suffix in img_suffixs:
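            # the "?v=<timestamp>" suffix presumably just acts as a cache-buster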
            url = r"%s%s?v=%d"%(img_prefix, img_suffix, int(time.time()))
            path = r'%s/%s' % (folder, img_suffix)
            #future = threadPool.submit(th_download_pic, url, path)
            task = loop.run_in_executor(threadPool, self.th_download_pic, url, path)
            tasks.append(task)
            
        #threadPool.shutdown(wait=True)
        loop.run_until_complete(asyncio.wait(tasks))
        print("")
    
    def th_download_pic(self, url, path):
        #print("下载中... %s"%(path))
        print("\r下载中... %s"%(path), sep='', end='', flush=True)
        if not os.path.exists(path):
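            # Write to a ".tmp" file first and rename only on success, so a
            # partially downloaded file is never mistaken for a finished one.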
            tmp_path = path + ".tmp"
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            try:
                with open(tmp_path, "wb") as file:
                    response = requests.get(url, stream=True, headers=self.commonHeaders, timeout=60)
                    for data in response.iter_content(chunk_size=1024 * 1024):
                        file.write(data)
                    response.close()
                os.rename(tmp_path, path)
            except Exception as e:
                print(e)
                print("%s下载失败"%(path))
            time.sleep(1)  

        
if __name__ == '__main__':
    session = requests.Session()
    chpt = Chapter(session, "105925", domain_of_comic) # actual domain removed on purpose
    chpt.downloadAll("test")

comic.py

# coding=utf-8
import requests
import time
import os
import re
from bs4 import BeautifulSoup
from chapter import Chapter

def getWebsites():
    html = requests.get(domain_of_comic_intro, timeout=30).text  # actual URL removed on purpose
    soup = BeautifulSoup(html,'lxml')
    p_all = soup.select(".has-luminous-vivid-orange-color")
    sites = [ p.text for p in p_all ]
    print(sites)
    return sites
    
class Comic:

    def __init__(self, session, comicId, domain):
        self.session = session
        self.domain = domain
        self.comicId = comicId
        self.commonHeaders = {
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip",
            'Connection': 'keep-alive',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:82.0) Gecko/20100101 Firefox/82.0",
        }
        self.session.headers.update(self.commonHeaders)  
    
    def downloadAll(self, folder, start = 0):
        # Fetch all chapter links
        comic_url = "https://%s/album/%s/"%(self.domain, self.comicId)
        html = self.session.get(comic_url, timeout=30).text
        soup = BeautifulSoup(html,'lxml')
        comicName = soup.select("div.pull-left[itemprop~=name]")[0].text.strip().replace("/",".")
        print(comicName)
        folder = folder.replace("{comicName}", comicName)
        
        chpt_all = soup.select("ul.btn-toolbar")[0]
        t_title_all = chpt_all.select("li")
        t_url_all = chpt_all.select("a")
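        # Clean the chapter titles: strip spaces, "|", the "最新" (latest) badge and
        # trailing dates, then replace characters that are not filesystem-safe.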
        titles = [ re.sub(r" |\||\r|\t|\?|(最新)|(\d{4}-\d{2}-\d{2}$)", "", tag.text.strip()).strip().replace("\n","_").replace("/",".") for tag in t_title_all ]
        chptIds = [ tag["href"][tag["href"].rindex("/")+1:] for tag in t_url_all ]
        
        if not os.path.exists(folder):
            os.makedirs(folder)
        for i in range(len(titles)):
            if i < start:
                continue
            print("正在下载 ", titles[i])
            chpt = Chapter(self.session, chptIds[i], self.domain)
            path = "%s/%s"%(folder, titles[i])
            chpt.downloadAll(path)
    
if __name__ == '__main__':
    session = requests.Session()
    # Get a domain that is directly reachable
    #domain = getWebsites()[-1]
    domain = getWebsites()[1]
    # Initialize with the comic id
    comic = Comic(session, "105924", domain)
    # Specify the output folder and which chapter to start from (index starts at 0)
    # comic.downloadAll("test", 0)
    comic.downloadAll("{comicName}", 5)
