增强版磁力链接搜索神器

December 09, 2023
测试
测试
测试
测试
37 分钟阅读

比特洪流(BitTorrent)是一种内容分发协议,由布拉姆·科恩自主开发。它采用高效的软件分发系统和点对点技术共享大体积文件(如一部电影或电视节目),并使每个用户像网络重新分配结点那样提供上传服务。一般的下载服务器为每一个发出下载请求的用户提供下载服务,而BitTorrent的工作方式与之不同。分配器或文件的持有者将文件发送给其中一名用户,再由这名用户转发给其它用户,用户之间相互转发自己所拥有的文件部分,直到每个用户的下载都全部完成。这种方法可以使下载服务器同时处理多个大体积文件的下载请求,而无须占用大量带宽。

实现这种协议的方式有很多,磁力链接是其中最常见的一种,有磁力链接就等于有了一切。今天我就来讲一下如何使用 Python 爬虫来获取尽可能多的磁力链接。

概述

虽然我在以前写过一个磁力链接的搜索神器,但那个只爬了一个网站,实际上磁力链接的网站有很多,我们要做的是同时爬很多网站。为了确保效率和可靠性,如何提升效率?其实很简单,因为一个网站对应一个服务器,我们可以直接令每一个网站的爬虫作为一个线程,直接同时爬取多个网站,充分利用 I/O 资源,提升效率。那么如何提升可靠性呢?因为一台机器短时间访问一个网站太多次会被封 IP,我们可以控制访问频率(使用 sleep 做等待),也可以让 Web 服务器认为是多个机器在访问,而不是我一个机器(设置代理)。我在这里直接使用 sleep 做等待了,毕竟免费代理太不靠谱了。

抽象共有的属性

对于每一个网站的爬虫,都需要 headers ,关键字和代理这 3 个字段,还有一个出错处理的装饰器(也就是方法/函数),因此我们直接抽象出一个 Spider 基类,因为每一个 spider 是一个线程,所以这个 Spider 基类继承 Thread,代码如下:

class Spider(Thread):
    def __init__(self, k, headers=None, proxies=None):
        super().__init__()
        self.keyword = k
        self.headers = headers
        self.proxies = proxies

    @staticmethod
    def except_exception(f):
        def wrapper(*args, **kwargs):
            try:
                f(*args, **kwargs)
            except BaseException as e:
                str(e)
        return wrapper

定义具体的 Spider

具体的 Spider 定义起来很简单,继承上面定义的抽象的 Spider,然后重写抽象 Spider 的父类 Thread 中的 run 方法就行了,另外不要忘了给这个 run 方法加上出错处理的功能,因为多线程是一处崩溃就全部崩溃的,就算一个网站的爬虫出现了问题,我也不能让它影响其他网站的数据采集和整个程序的运行。其中一个 Spider 的代码如下:

class CiLiSqlSpider(Spider):
    """
    磁力社区 https://www.cilisql.com/        """
    @Spider.except_exception
    def run(self):
        magnet_link_pattern = compile(r'<a title=".*?" target="_blank" href="(https://www\.cilisql\.com/h/\d+)">')
        name_pattern = compile(r'<h1 class="T1">(.*?)</h1>')
        size_pattern = compile(r'<p>文件大小: (\d+\.?\d*)\xa0(.*?)</p>')
        magnet_pattern = compile(r'<a href="(magnet:\?xt=urn:btih:.*?&amp;dn=.*?)">magnet:\?xt=urn:btih:.*?</a>')
        for page in range(1, 101):
            sleep(1+random())
            text = get(f'https://www.cilisql.com/search/{quote(self.keyword)}/?c=&s=time&p={page}',
                       headers=self.headers, proxies=self.proxies).content.decode()
            magnet_links = magnet_link_pattern.findall(text)
            if not magnet_links:
                break
            for magnet_link in magnet_links:
                sleep(1+random())
                text = get(magnet_link, headers=self.headers, proxies=self.proxies).content.decode()
                name = name_pattern.findall(text)
                size = size_pattern.findall(text)
                magnet = magnet_pattern.findall(text)
                print(f'名称:{name[0]}\n大小:{size[0][0]+size[0][1]}\n磁力链接:{magnet[0]}\n')

其余的具体的 Spider 和这个差不多,都是在 run 方法中做查找并输出,只是具体的细节不一样而已。

整个爬虫的源代码(不完整)

实际上也不能算是完整的源代码,有几个具体的 Spider 我只是写了个框架,但是大家自己完善一下应该不是什么问题。

from requests import get, post
from urllib.parse import quote
from re import compile
from time import sleep
from random import random
from threading import Thread
"""
DiggBT种子搜索神器 http://www.diggbtcn.me樱桃BT https://www.yingtaobt.com/磁力社区 https://www.cilisql.com/BT磁力链 https://www.bturl.tv/磁力吧 http://www.ciliba.pw/BT Kitty http://newbtkitty.pw/磁力猫 https://www.cilimao.cc/磁力福利 http://cilifuli.pw/卧槽搜搜 http://www.wcs444.com/傻逼吧 https://www.findcl.co/Nyaa搜索 https://nyaa.si/蓝光电影下载网 http://film.blu-raydisc.tv/蓝光网 http://www.languang.co/"""


class Spider(Thread):
    def __init__(self, k, headers=None, proxies=None):
        super().__init__()
        self.keyword = k
        self.headers = headers
        self.proxies = proxies

    @staticmethod
    def except_exception(f):
        def wrapper(*args, **kwargs):
            try:
                f(*args, **kwargs)
            except BaseException as e:
                str(e)
        return wrapper


class DigGBtCnSpider(Spider):
    """
    DiggBT种子搜索神器 http://www.diggbtcn.me    """
    def run(self):
        pass


class YinGTaoBtSpider(Spider):
    """
    樱桃BT https://www.yingtaobt.com/    """
    def run(self):
        pass


class CiLiSqlSpider(Spider):
    """
    磁力社区 https://www.cilisql.com/    """
    @Spider.except_exception
    def run(self):
        magnet_link_pattern = compile(r'<a title=".*?" target="_blank" href="(https://www\.cilisql\.com/h/\d+)">')
        name_pattern = compile(r'<h1 class="T1">(.*?)</h1>')
        size_pattern = compile(r'<p>文件大小: (\d+\.?\d*)\xa0(.*?)</p>')
        magnet_pattern = compile(r'<a href="(magnet:\?xt=urn:btih:.*?&amp;dn=.*?)">magnet:\?xt=urn:btih:.*?</a>')
        for page in range(1, 101):
            sleep(1+random())
            text = get(f'https://www.cilisql.com/search/{quote(self.keyword)}/?c=&s=time&p={page}',
                       headers=self.headers, proxies=self.proxies).content.decode()
            magnet_links = magnet_link_pattern.findall(text)
            if not magnet_links:
                break
            for magnet_link in magnet_links:
                sleep(1+random())
                text = get(magnet_link, headers=self.headers, proxies=self.proxies).content.decode()
                name = name_pattern.findall(text)
                size = size_pattern.findall(text)
                magnet = magnet_pattern.findall(text)
                print(f'名称:{name[0]}\n大小:{size[0][0]+size[0][1]}\n磁力链接:{magnet[0]}\n')


class BtUrlSpider(Spider):
    """
    BT磁力链 https://www.bturl.tv/    """
    def run(self):
        pass


class CiLiBaSpider(Spider):
    """
    磁力吧 http://www.ciliba.pw/    """
    def run(self):
        pass


class NewBtKittySpider(Spider):
    """
    BT Kitty http://newbtkitty.pw/    """
    def run(self):
        pass


class CiLiMaoSpider(Spider):
    """
    磁力猫 https://www.cilimao.cc/    """
    @Spider.except_exception
    def run(self):
        url = post('https://www.cilimao.cc/', headers=self.headers, proxies=self.proxies).url
        for page in range(1, 501):
            sleep(1+random())
            text = get(f'{url}search?word={quote(self.keyword)}&page={page}', headers=self.headers,
                       proxies=self.proxies).content.decode()
            bai_du_wang_pan_code = self.bai_du_wang_pan(text)
            magnet_code = self.magnet(url, text)
            if magnet_code == bai_du_wang_pan_code == 0:
                break

    @staticmethod
    def bai_du_wang_pan(text):
        bai_du_wang_pan_pattern = compile(r'<a class="Search__result_title___24kb_" href="(https://pan.baidu.com/.*?)" '
                                          r'target="_blank" data-reactid="\d{3}">(.*?)</a>')
        links_and_names = bai_du_wang_pan_pattern.findall(text)
        if not links_and_names:
            return 0
        seps = compile(r'[&;]')
        for link, name in links_and_names:
            name = name.replace('<em>', '').replace('</em>', '')
            name_chars = seps.split(name)
            for i in range(len(name_chars)):
                if name_chars[i].startswith('#'):
                    name_chars[i] = chr(int(name_chars[i][1:]))
            name = ''.join(name_chars)
            print(f'名称:{name}\n链接:{link}\n')

    def magnet(self, url, text):
        magnet_link_pattern = compile(r'<a href="/(information/.*?)" target="_blank" class="Search__result_title___24kb'
                                      r'_" data-reactid="\d{3}">.*?</a>')
        magnet_links = magnet_link_pattern.findall(text)
        if not magnet_links:
            return 0
        name_pattern = compile(r'<p id="Information__title___3V6H-" data-reactid="\d+">(.*?)</p>')
        size_pattern = compile(r'&#25991;&#20214;&#22823;&#23567;&#65306;<!-- /react-text --><b data-reactid="\d+">'
                               r'(.*?)</b>')
        magnet_pattern = compile(r'<a href="(magnet:\?xt=urn:btih:.*?)" class=".*?" data-reactid="\d+">magnet:\?xt=urn:'
                                 r'btih:.*?</a>')
        seps = compile(r'[&;]')
        for magnet_link in magnet_links:
            sleep(1+random())
            text = get(url+magnet_link, headers=self.headers, proxies=self.proxies).content.decode()
            name = name_pattern.findall(text)[0]
            size = size_pattern.findall(text)[0]
            magnet = magnet_pattern.findall(text)[0]
            name_chars = seps.split(name)
            for i in range(len(name_chars)):
                if name_chars[i].startswith('#'):
                    name_chars[i] = chr(int(name_chars[i][1:]))
            name = ''.join(name_chars)
            print(f'名称:{name}\n大小:{size}\n磁力链接:{magnet}\n')


class CiLiFuLiSpider(Spider):
    """
    磁力福利 http://cilifuli.pw/    """
    def run(self):
        pass


class Wcs444Spider(Spider):
    """
    卧槽搜搜 http://www.wcs444.com/    """
    @Spider.except_exception
    def run(self):
        magnet_link_pattern = compile(r'<a class="title" href="(/i/.*?)">.*?</a>')
        name_pattern = compile(r'<h1>(.*?)</h1>')
        size_pattern = compile(r'<p>文件大小:(.*?)</p>')
        magnet_pattern = compile(r'<a rel="nofollow" href=.*?>(magnet:\?xt=urn:btih:.*?)</a>')
        for page in range(1, 10000):
            sleep(1+random())
            text = get(f'http://www.wcs444.com/s/{self.keyword}-hot-desc-{page}').content.decode()
            magnet_links = magnet_link_pattern.findall(text)
            if not magnet_links:
                break
            for magnet_link in magnet_links:
                sleep(1+random())
                text = get(f'http://www.wcs444.com{magnet_link}', headers=self.headers,
                           proxies=self.proxies).content.decode()
                name = name_pattern.findall(text)[0]
                size = size_pattern.findall(text)[0]
                magnet = magnet_pattern.findall(text)[0]
                print(f'名称:{name}\n大小:{size}\n磁力链接:{magnet}\n')


class FindClSpider(Spider):
    """
    傻逼吧 https://www.findcl.co/    """
    def run(self):
        pass


class NyAaSpider(Spider):
    """
    Nyaa搜索 https://nyaa.si/    """
    @Spider.except_exception
    def run(self):
        # https://nyaa.si/?f=0&c=0_0&q=A
        text = get(f'https://nyaa.si/?f=0&c=0_0&q={quote(self.keyword)}', headers=self.headers,
                   proxies=self.proxies).content.decode()
        total_pattern = compile(r'Displaying results 1-\d+ out of (\d+) results\.')
        total = total_pattern.findall(text)
        if total:
            total_page = int(total[0])//75+1
            name_pattern = compile(r'<a href="/view/\d+" title=".*?">(.*?)</a>')
            size_pattern = compile(r'<td class="text-center">(\d+\.\d .*?B)</td>')
            magnet_pattern = compile(r'<a href="magnet:\?xt=urn:btih:.*?&amp;dn=.*?">')
            names = name_pattern.findall(text)
            sizes = size_pattern.findall(text)
            magnets = magnet_pattern.findall(text)
            for name, size, magnet in zip(names, sizes, magnets):
                print(f'名称:{name}\n大小:{size}\n磁力链接:{magnet}\n')
            for page in range(2, total_page+1):
                sleep(1+random())
                # https://nyaa.si/?f=0&c=0_0&q=A&p=14
                text = get(f'https://nyaa.si/?f=0&c=0_0&q={self.keyword}&p={page}').content.decode()
                names = name_pattern.findall(text)
                sizes = size_pattern.findall(text)
                magnets = magnet_pattern.findall(text)
                for name, size, magnet in zip(names, sizes, magnets):
                    print(f'名称:{name}\n大小:{size}\n磁力链接:{magnet}\n')


class BLuRayDiscSpider(Spider):
    """
    蓝光电影下载网 http://film.blu-raydisc.tv/    """
    def run(self):
        pass


class LanGuAngSpider(Spider):
    """
    蓝光网 http://www.languang.co/    """
    def run(self):
        pass


if __name__ == '__main__':
    keyword = input('关键字:')
    threads = []
    ci_li_sql_headers = {'Upgrade-Insecure-Requests': '1',
                         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chr'
                                       'ome/63.0.3239.132 Safari/537.36'}
    ci_li_sql_spider = CiLiSqlSpider(keyword, headers=ci_li_sql_headers)
    threads.append(ci_li_sql_spider)
    ci_li_mao_headers = {'Upgrade-Insecure-Requests': '1',
                         'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chr'
                                       'ome/63.0.3239.132 Safari/537.36'}
    ci_li_mao_spider = CiLiMaoSpider(keyword, headers=ci_li_mao_headers)
    threads.append(ci_li_mao_spider)
    wcs444_headers = {'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                      'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9',
                      'Connection': 'keep-alive', 'Host': 'www.wcs444.com', 'Upgrade-Insecure-Requests': '1',
                      'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome'
                                    '/63.0.3239.132 Safari/537.36'}
    wcs444_spider = Wcs444Spider(keyword, headers=wcs444_headers)
    threads.append(wcs444_spider)
    ny_aa_headers = {'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                     'accept-encoding': 'gzip, deflate, br', 'accept-language': 'zh-CN,zh;q=0.9',
                     'upgrade-insecure-requests': '1',
                     'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/'
                                   '63.0.3239.132 Safari/537.36'}
    ny_aa_spider = NyAaSpider(keyword, headers=ny_aa_headers)
    threads.append(ny_aa_spider)
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

运行结果如图所示。

继续阅读

更多来自我们博客的帖子

如何安装 BuddyPress
由 测试 December 17, 2023
经过差不多一年的开发,BuddyPress 这个基于 WordPress Mu 的 SNS 插件正式版终于发布了。BuddyPress...
阅读更多
Filter如何工作
由 测试 December 17, 2023
在 web.xml...
阅读更多
如何理解CGAffineTransform
由 测试 December 17, 2023
CGAffineTransform A structure for holding an affine transformation matrix. ...
阅读更多