
Python: A Note on Scraping and Cleaning Text Data from a Site That Mixes Images into Text as an Anti-Scraping Trick

2021-07-29
nIceLee


I stumbled upon a little trick for hiding text inside images 😳

Preface

This is a problem I ran into (and eventually solved) while learning by doing. Since it cost me a fair amount of scattered effort (quite a lot, honestly), I'm writing it down here.

Before getting into it, let me explain what I was trying to do.

In short, there is a target site whose text data is scattered across many pages, and which sneakily replaces random Chinese characters with images. The rendered effect looks roughly like this (the missing character is actually an inline image):

  • 夏天就要是吃大瓜,冰镇的那种。

The final data we want to end up with is:

  • 夏天就要是吃大西瓜,冰镇的那种。

Not:

  • 夏天就要是吃大<img src="/sources.png"/>瓜,冰镇的那种。

The journey

After a quick look, fetching the raw data itself was no problem at all. Easy.

The part that needed real attention was how to correctly recognize each single-character image and convert it back into the corresponding text.

My first idea was brute force: identify the images one by one by hand and build a lookup table. But after collecting only part of the data I had already found 500+ distinct image URLs, so that was out.

Then I thought about it: this is just a text-recognition problem, and the character images aren't noisy or distorted, so they should need essentially no preprocessing. That led to three rough approaches:

  1. Try an existing OCR library; Tesseract-OCR was the first pick.
    Some quick tests showed that its English accuracy was barely passable, while its Chinese accuracy was, frankly, dreadful (see the sketch after this list).

  2. Try an existing online OCR API; Baidu AI Cloud was the first pick.
    The API looked fairly simple to call, and there is a free monthly quota.
    In practice, the step that cost the most time was actually signing up.
    A quick test of its Chinese accuracy looked decent.

  3. Try deep learning: PyTorch as the framework, model to be decided, with training data generated by rendering a common-character set into images via Pillow.
    This was the fallback plan.
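
For reference, the quick test for approach 1 looked roughly like the following. This is only a sketch: it assumes pytesseract and Tesseract's chi_sim language data are installed, and img.png is an illustrative file name.

# Rough Tesseract sanity check (assumes pytesseract + Tesseract with chi_sim installed)
from PIL import Image
import pytesseract

# img.png stands in for any one of the downloaded character images
text = pytesseract.image_to_string(Image.open("img.png"), lang="chi_sim")
print(repr(text))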

Next I tried approaches 1 and 2 on the downloaded character images, and got a depressing result: a success rate of exactly zero.

My guess was an image-resolution problem, so I upscaled the images algorithmically and tried again. Still nothing.
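
The upscaling attempt was essentially just a resize before OCR; a minimal sketch (the 4x factor and cubic interpolation are my assumptions):

import cv2

img = cv2.imread("img.png", cv2.IMREAD_UNCHANGED)
# Enlarge the tiny character image 4x before handing it to OCR
big = cv2.resize(img, None, fx=4, fy=4, interpolation=cv2.INTER_CUBIC)
cv2.imwrite("img_big.png", big)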

Completely baffled, I tried taking a screenshot of the rendered image instead: approach 1 still failed, but approach 2 worked on the screenshot. Very strange.

Then I noticed, by chance, that the files were PNGs. In a flash of inspiration I stripped out the alpha-channel information to see what the RGB content of these images actually looked like, and finally spotted the problem.

Ha, sneaky indeed: the character shape is carried by the alpha channel rather than by the RGB data. In the end I rebuilt each image from its alpha (transparency) values, and with that, approach 2's recognition rate turned out to be quite acceptable.

  • Original image:
  • With transparency removed (RGB only):
  • After processing:

Here is the simple processing code:

import cv2

if __name__ == '__main__':
    # Read the PNG with its alpha channel intact
    img = cv2.imread(r'img.png', cv2.IMREAD_UNCHANGED)
    b_channel, g_channel, r_channel, alpha_channel = cv2.split(img)
    # The character shape is in the alpha channel; invert it so the stroke is dark on white
    alpha_channel = 255 - alpha_channel
    # Rebuild a plain 3-channel image from the inverted alpha (JPEG cannot store an alpha channel)
    img_gray3 = cv2.merge((alpha_channel, alpha_channel, alpha_channel))
    cv2.imwrite("img.jpg", img_gray3)
    #cv2.imshow("src", img_gray3)
    #cv2.waitKey(0)
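
If you want to check for yourself why the untouched image defeats OCR, a quick per-channel summary (a sketch in the same vein as the snippet above) shows where the glyph actually lives: in these PNGs it is the alpha channel, not the RGB planes, that encodes the character.

import cv2

img = cv2.imread("img.png", cv2.IMREAD_UNCHANGED)
for name, channel in zip(("B", "G", "R", "A"), cv2.split(img)):
    # Compare per-channel value ranges; the glyph is carried by the alpha channel
    print(name, channel.min(), channel.max())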

Overall approach and results

  1. Download the raw data and images
  2. Parse the images and build an image-to-character mapping
  3. Translate the raw data chapter by chapter
  4. Merge the chapters
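
These four steps map one-to-one onto the four scripts below; to run them end to end, a trivial driver is enough (a sketch, using the file names from this post):

import subprocess, sys

# Run the four stages in order; each script reads its settings from config.py
for script in ("1_下载原始数据.py", "2_建立图片文字映射关系.py",
               "3_逐章节转换.py", "4_合并章节.py"):
    subprocess.run([sys.executable, script], check=True)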

config.py

# encoding:utf-8

dict_novel = {}
domain = "target site domain"
novel_id = "id taken from the url"
novel_name = "novel name"

root_dir = "data"


# client_id is the AK from the console, client_secret is the SK
client_id = "AK obtained from the console"
client_secret = "SK obtained from the console"
access_token = 'token requested from the API; it expires after a while'
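
The access_token above has to be refreshed now and then from the AK/SK pair. A minimal sketch of fetching it, assuming the standard Baidu AIP OAuth endpoint (this helper is not part of the original scripts):

# Sketch: exchange the AK/SK for an access_token (assumes Baidu AIP's standard OAuth endpoint)
import requests

def fetch_access_token(client_id, client_secret):
    params = {
        "grant_type": "client_credentials",
        "client_id": client_id,          # the AK
        "client_secret": client_secret,  # the SK
    }
    resp = requests.get("https://aip.baidubce.com/oauth/2.0/token", params=params).json()
    return resp["access_token"]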

1_下载原始数据.py (download the raw data)

# encoding:utf-8
import requests, re, os
from bs4 import BeautifulSoup
'''
{root_dir}\novels\{novel_name}\raw\{chapter_name}.txt
{root_dir}\novels\{novel_name}\done\{chapter_name}.txt
{root_dir}\pics_done\{pic_id}.jpg
{root_dir}\pics_raw\{pic_id}.png
{root_dir}\dict.txt
'''

class Novel:
    def __init__(self, domain, novel_id, root_dir):
        self.domain = domain
        self.novel_id = novel_id
        #self.novel_name = novel_name
        self.root_dir = root_dir
        self.last2page = False
        
    def save_raw(self, index=1):
        request_url = "http://%s/%s_%d/"%(self.domain, self.novel_id, index)
        soup = self.get_soup(request_url)
        # Get the novel title
        self.novel_name = soup.select(".right > h1")[0].get_text()
        print("novel_name: ", self.novel_name)
        # Get the list of chapters
        ul = soup.select(".bd > ul.list")[1]
        chapters = ul.select("li > a")
        for chapter in chapters:
            searchObj = re.search(r'/\d+/\d+/(\d+)\.html', chapter["href"])
            chapter_id = searchObj.group(1)
            #print(chapter_id)
            #print(chapter["href"])
            #print(chapter.get_text())
            chp = Chapter(domain=self.domain, novel_id=self.novel_id, novel_name=self.novel_name, chapter_id=chapter_id, root_dir=self.root_dir)
            chp.save_raw_chapter()
        print("当前页码:", index)
        pages = soup.select(".pagelistbox > .page")[0]
        nextPage = pages.select(".nextPage")[0]
        endPage = pages.select(".endPage")[0]
        if nextPage["href"] != endPage["href"]:
            self.save_raw(index = (index + 1))
        elif not self.last2page:
            self.last2page = True
            self.save_raw(index = (index + 1))
            
    def get_soup(self, request_url):
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip, deflate",
            "Connection": "keep-alive",
            "Referer": "http://%s/%s/"%(self.domain, self.novel_id),
            "Upgrade-Insecure-Requests": "1",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }
        response = requests.get(request_url, headers=headers)
            
        html = response.text
        #print(html)
        soup = BeautifulSoup(html,'lxml')
        return soup
class Chapter:

    def __init__(self, domain, novel_id, novel_name, chapter_id, root_dir):
        self.domain = domain
        self.novel_id = novel_id
        self.novel_name = novel_name
        self.chapter_id = chapter_id
        self.root_dir = root_dir
        
        
    def save_raw_chapter(self):
        request_url = "http://%s/%s/%s.html"%(self.domain, self.novel_id, self.chapter_id)
        soup = self.get_chapter_soup(request_url)
        
        # Get the chapter name
        self.chapter_name = soup.select("h1.page-title")[0].get_text()
        print("chapter_name: ", self.chapter_name)
        print (self.chapter_id)
        # Get the list of pages in this chapter
        clips = soup.select("center.chapterPages a")[1:]
        #for clip in clips:
        #    print(clip["href"])
            
        # Iterate over the pages and save them
        try:
            raw_path = "%s/novels/%s/raw/%s.txt"%(self.root_dir, self.novel_name, self.chapter_name)
            print(raw_path)
            current_dir = os.path.dirname(raw_path)
            if not os.path.exists(current_dir):
                os.makedirs(current_dir)
            with open(raw_path, "w", encoding="utf-8") as file:
                chapter_content = soup.select(".page-content>p")[0].decode_contents()
                file.write(chapter_content)
                self.parse_content(chapter_content)
                for clip in clips:
                    request_url = "http://%s/%s/%s"%(self.domain, self.novel_id, clip["href"])
                    soup = self.get_chapter_soup(request_url)
                    chapter_content = soup.select(".page-content>p")[0].decode_contents()
                    file.write(chapter_content)
                    self.parse_content(chapter_content)
        except Exception as e:
            print(e)
            print("%s下载失败"%(self.chapter_name)) 
        
    def get_chapter_soup(self, request_url):
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
            "Accept-Encoding": "gzip, deflate",
            "Connection": "keep-alive",
            "Referer": "http://%s/%s/"%(self.domain, self.novel_id),
            "Upgrade-Insecure-Requests": "1",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }
        response = requests.get(request_url, headers=headers)
            
        html = response.text
        soup = BeautifulSoup(html,'lxml')
        return soup
        
    def parse_content(self, chapter_content):
        it = re.finditer(r'<img src="(/toimg/data/([0-9a-z]+)\.png)"/>', chapter_content) 
        for match in it: 
            path = "%s/pics_raw/%s.png"%(self.root_dir, match.group(2))
            pic_url = "http://%s/%s"%(self.domain, match.group(1))
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0",
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
                "Referer": "http://%s/%s/"%(self.domain, self.novel_id),
                "Upgrade-Insecure-Requests": "1",
                "Pragma": "no-cache",
                "Cache-Control": "no-cache",
            }
            #print (match.group(1))
            if not os.path.exists(path):
                print (match.group(1))
                #print (match.group(2))
                tmp_path = path + ".tmp"
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
                try:
                    with open(tmp_path, "wb") as file:
                        response = requests.get(pic_url, stream=True, headers=headers, timeout=60)
                        for data in response.iter_content(chunk_size=1024 * 1024):
                            file.write(data)
                        response.close()
                    os.rename(tmp_path, path)
                except Exception as e:
                    print(e)
                    print("%s下载失败"%(path))
if __name__ == '__main__':  
    from config import *
    
    dir_pics_raw = "%s/pics_raw/"%(root_dir)
    if not os.path.exists(dir_pics_raw):
        os.makedirs(dir_pics_raw)
    
    novel = Novel(domain=domain, novel_id=novel_id, root_dir=root_dir)
    novel.save_raw(index = 1)
    #chapter = Chapter(domain=_domain, novel_id=_novel_id, novel_name=_novel_name, chapter_id=_chapter_id, root_dir=_root_dir)
    #chapter.save_raw_chapter()

2_建立图片文字映射关系.py (build the image-to-character mapping)

# encoding:utf-8
import cv2  
import os  
import base64
import requests  
import time  
import numpy as np

from config import *

dict_path = "%s/dict.txt"%(root_dir)
dir_pics_raw = "%s/pics_raw/"%(root_dir)
dir_pics_done = "%s/pics_done/"%(root_dir)

if not os.path.exists(dir_pics_raw):
    os.makedirs(dir_pics_raw)
if not os.path.exists(dir_pics_done):
    os.makedirs(dir_pics_done)




def import_data():
    try:
        with open(dict_path, "r", encoding="utf-8") as f:
            line = f.readline()
            while line:
                args = line.split("#")
                #print(args)
                dict_novel[args[0]] = args[1].strip()
                line = f.readline()
    except Exception as e:
        print(e)
        print("导入失败")
            
def export_data():
    try:
        with open(dict_path, "w", encoding="utf-8") as f:
            for key, value in dict_novel.items():
                line = "%s#%s\n"%(key, value)
                f.write(line)
    except Exception as e:
        print(e)
        print("保存失败")
        
def trans_pic2text(jpg_path):
    with open(jpg_path, 'rb') as f:
        img = base64.b64encode(f.read())
        params = {"image":img}
        #request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general_basic?access_token=" + access_token
        request_url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic?access_token=" + access_token
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        response = requests.post(request_url, data=params, headers=headers).json()
        #print(jpg_path)
        try:
            word = response["words_result"][0]["words"]
            #print(response)
            time.sleep(1)
            return word
        except Exception as e:
            print(jpg_path)
            print(response)
            export_data()

def trans_png2jpg2text(png_name):
    png_path = os.path.join(dir_pics_raw, png_name)
    jpg_path = os.path.join(dir_pics_done, png_name.replace("png", "jpg"))

    # Read the PNG with its alpha channel intact
    img = cv2.imread(png_path, cv2.IMREAD_UNCHANGED)
    img = img.astype(np.uint8)

    b_channel, g_channel, r_channel, alpha_channel = cv2.split(img)
    # The glyph is carried by the alpha channel; invert it so the stroke is dark on white
    alpha_channel = 255 - alpha_channel
    # Rebuild a plain 3-channel image from the inverted alpha (JPEG cannot store 4 channels)
    img_gray3 = cv2.merge((alpha_channel, alpha_channel, alpha_channel))
    cv2.imwrite(jpg_path, img_gray3)
    return trans_pic2text(jpg_path)
    
if __name__ == '__main__':  
    import_data()
    fileList = os.listdir(dir_pics_raw)
    count = 0
    #print(dict_novel)
    #print("0010005960" in dict_novel)
    for f in fileList: 
        if f.endswith(".png"):
            key = f[:-4]
            if key not in dict_novel:
                value = trans_png2jpg2text(png_name = f)
                print(key, value)
                if value is None:
                    # OCR failed for this image; skip it so the dict is not polluted
                    continue
                dict_novel[key] = value
                count = count + 1
                # Flush the mapping to disk every 50 new entries
                if count == 50:
                    count = 0
                    export_data()
                    print("data saved")
    export_data()

3_逐章节转换.py (translate each chapter)

# encoding:utf-8
import os  
import requests  
import re

from config import *

dict_path = "%s/dict.txt"%(root_dir)
dir_pics_raw = "%s/pics_raw/"%(root_dir)
dir_pics_done = "%s/pics_done/"%(root_dir)
dir_novel_raw = "%s/novels/%s/raw"%(root_dir, novel_name)
dir_novel_done = "%s/novels/%s/done"%(root_dir, novel_name)

if not os.path.exists(dir_novel_done):
    os.makedirs(dir_novel_done)

def import_data():
    try:
        with open(dict_path, "r", encoding="utf-8") as f:
            line = f.readline()
            while line:
                args = line.split("#")
                #print(args)
                dict_novel[args[0]] = args[1].strip()
                line = f.readline()
    except Exception as e:
        print(e)
        print("导入失败")
            
    
if __name__ == '__main__':  
    import_data()
    fileList = os.listdir(dir_novel_raw)
    #print(dict_novel)
    
    for f in fileList: 
        chapter_name = f[:-4]
        chapter_path = os.path.join(dir_novel_raw, f)
        chapter_path_done = os.path.join(dir_novel_done, f)
        #print(chapter_name)
        if os.path.exists(chapter_path_done):
            continue
        print(chapter_name)
        with open(chapter_path, "r", encoding="utf-8") as chp:
            chapter_raw_content = chp.read().replace("<br/>", "\n")
            chapter_content = ""
            it = re.finditer(r'<img src="(/toimg/data/([0-9a-z]+)\.png)"/>', chapter_raw_content) 
            p_start = 0
            for match in it: 
                #print(match.group())
                left = chapter_raw_content[p_start : match.start()]
                right = dict_novel[match.group(2)]
                p_start = match.end()
                chapter_content += left + right
            chapter_content += chapter_raw_content[p_start:]
            #print(chapter_content)
            with open(chapter_path_done, "w", encoding="utf-8") as file:
                file.write(chapter_content)
        #break
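
The splice loop in 3_逐章节转换.py could just as well be written with re.sub and a replacement callback; a small equivalent sketch (same regex and mapping dict as above):

import re

def translate_chapter(chapter_raw_content, dict_novel):
    # Swap every <img .../> placeholder for the character looked up in dict_novel
    pattern = r'<img src="(/toimg/data/([0-9a-z]+)\.png)"/>'
    return re.sub(pattern, lambda m: dict_novel[m.group(2)], chapter_raw_content)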

4_合并章节.py (merge the chapters)

# encoding:utf-8
import os  
import requests  
import re

from config import *

dict_path = "%s/dict.txt"%(root_dir)
dir_novel_done = "%s/novels/%s/done"%(root_dir, novel_name)
novel_path = "%s/novels/%s.txt"%(root_dir, novel_name)

if not os.path.exists(dir_novel_done):
    os.makedirs(dir_novel_done)

number_map = {
    "一": 1,
    "二": 2,
    "三": 3,
    "四": 4,
    "五": 5,
    "六": 6,
    "七": 7,
    "八": 8,
    "九": 9,
    "十": 10,
    "十一": 11,
    "十二": 12,
    "十三": 13,
    "十四": 14,
    "十五": 15,

}
def import_data():
    try:
        with open(dict_path, "r", encoding="utf-8") as f:
            line = f.readline()
            while line:
                args = line.split("#")
                #print(args)
                dict_novel[args[0]] = args[1].strip()
                line = f.readline()
    except Exception as e:
        print(e)
        print("导入失败")
            
def get_chapter_num(file_name):
    """
    How the chapter number is extracted may need tweaking for each specific file-name format
    """
    searchObj = re.search(r'\d+', file_name)
    chapter_num = searchObj.group()
    return int(chapter_num)
    # Alternative for file names numbered with Chinese numerals ("第X集...第Y章"):
    """
    searchObj = re.search(r'第([一二三四五六七八九十]+)集.*第([一二三四五六七八九十]+)章', file_name)
    num1 = searchObj.group(1)
    num2 = searchObj.group(2)
    return number_map[num1] * 100 + number_map[num2]
    """
def merge_without_file_name():
    # Sort the chapter files by chapter number
    fileList = os.listdir(dir_novel_done)
    fileList.sort(key=get_chapter_num)
    
    with open(novel_path, "w", encoding="utf-8") as f:
        for file in fileList:
            chapter_path_done = os.path.join(dir_novel_done, file)
            with open(chapter_path_done, "r", encoding="utf-8") as chp:
                # Read the chapter content and write it out directly
                chapter_content = chp.read()
                f.write(chapter_content)
def merge_with_file_name():
    # Sort the chapter files by chapter number
    fileList = os.listdir(dir_novel_done)
    fileList.sort(key=get_chapter_num)
    
    with open(novel_path, "w", encoding="utf-8") as f:
        for file in fileList:
            chapter_path_done = os.path.join(dir_novel_done, file)
            with open(chapter_path_done, "r", encoding="utf-8") as chp:
                chapter_content = chp.read()
                # Write the chapter title first, then the chapter content
                print(file)
                f.write(file[:-4])
                f.write('\r\n')
                f.write(chapter_content)
                f.write('\r\n')
if __name__ == '__main__':  
    #merge_with_file_name()
    merge_without_file_name()
