关于URL采集的构想与实现

2024-08-16 119 0

闲言碎语

最近公司要求检查公司网站首页是否被挂了暗链,网上查了下对应脚本较少且,于是就写了一个关于获取网站的链接的脚本,随着要求的不断增加,再加上一些天马行空的想象,最后写了一款URL采集器

前言

URL采集是一项重要的工作,它能帮我们快速的采集到符合需求的相关URL,但市面上大部分的URL采集软件的原理都是利用多个搜索引擎的接口,输入关键字,如:采集招聘网址URL,一般是输入求职/招聘等关键字,然后对每个接口进行最大化的采集网址,自定义黑名单URL,最后去重。
这意味着需要尽可能多的接口包括但不限于谷歌、百度等,然后传参对返回的页面提取网址基于黑名单过滤部分网址,最后迭代页数。
看上去没错,输入关键词获取相关的网址。但却隐藏着几个缺点:
1、采集网址都是被搜索引擎收录的,导致许多符合需求的URL无法采集到
2、过滤不细致,只靠去重+黑名单过滤,采集到的站点不能保证是需要的
3、采集URL每个人都可以用,关键词也差不多,导致最后采集的结果也差不多,这对于网安人员来说并不友好,因为这就意味着你好不容易找到一个漏洞站点可能已被许多人利用过

功能

为了解决上诉缺点,我打算写一个URL深度采集脚本,前期构想的功能点:
1、提供两个入口,一个搜索引擎接口或导入采集好的网址
2、传入关键字爬取到符合需求网址再次自动进行友链爬行
3、导入的文本能先筛选掉不符合的站点,而后自定义是否进行友链爬取
4、用户可自定义URL黑白名单、URL网站标题黑白名单,URL网页内容黑白名单
简略流程图如下:
关于URL采集的构想与实现插图
banner

title = '''
       __   _______   __    __  .______       __
      |  | |   ____| |  |  |  | |   _  \     |  |
      |  | |  |__    |  |  |  | |  |_)  |    |  |
.--.  |  | |   __|   |  |  |  | |      /     |  |
|  `--'  | |  |      |  `--'  | |  |\  \----.|  `----.
 \______/  |__|  _____\______/  | _| `._____||_______|
                |______|
                                        Author:JF
                                        Version:V1.0
        '''

URL采集源码

友链采集

方式一:正则过滤

def GetLink(url):
    UA = random.choice(headerss)
    headers = {'User-Agent': UA, 'Connection': 'close'}
    link_urls = []
    try:
        r = requests.get(url, headers=headers, verify=False, timeout=timeout)
        encoing = requests.utils.get_encodings_from_content(r.text)[0]
        content = r.content.decode(encoing)
        urls = [f"{urlparse(url).scheme}://{urlparse(url).netloc}"  for url in re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*,]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', content, re.I)]
        for url in list(set(urls)):
            url = url.replace('\')','')
            link_urls.append(url)
            #判断存活
            # try:
            #     r = requests.get(url, timeout=5, verify=False)
            #     if b'Service Unavailable' not in r.content and b'The requested URL was not found on' not in r.content and b'The server encountered an internal error or miscon' not in r.content:
            #         if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
            #             link_urls.append(url)
            # except Exception as error:
            #     pass
    except:
        pass
    return list(set(link_urls))

方式二:bs4过滤

def GetLink(url):
    UA = random.choice(headerss)
    headers = {'User-Agent': UA, 'Connection': 'close'}
    try:
        r = requests.get(url, headers=headers, verify=False)
        encoding = requests.utils.get_encodings_from_content(r.text)[0]
        content = r.content.decode(encoding)

        # 使用BeautifulSoup解析HTML内容
        soup = BeautifulSoup(content, 'html.parser')

        # 提取页面中可能包含URL的常见标签属性
        bs4_urls = set()
        for tag in ['a', 'img', 'script', 'link']:
            for attr in ['href', 'src']:
                for element in soup.find_all(tag):
                    if attr in element.attrs:
                        href = element.get(attr)
                        if href and (href.startswith('http://') or href.startswith('https://')):
                            parsed = urlparse(href)
                            url = f"{parsed.scheme}://{parsed.netloc}"
                            bs4_urls.add(url)
    except Exception as e:
        pass

    #url存活加入
    link_urls = []
    for bs4_url in bs4_urls:
        try:
            r = requests.head(bs4_url, timeout=5, headers=headers, verify=False)
            if r.status_code == 200 or r.status_code == 301 or r.status_code == 301:
                    link_urls.append(bs4_url)
        except Exception as error:
            pass
    return link_urls

关键字采集

调用百度搜索接口对用户输入的关键字进行搜索并提取出前7页的url

def BDUrl(key):
    cookie = input('请输入cookie:')
    bd_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
        "Referer": "https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&rsv_idx=2&ch=&tn=baiduhome_pg&bar=&wd=123&oq=123&rsv_pq=896f886f000184f4&rsv_t=fdd2CqgBgjaepxfhicpCfrqeWVSXu9DOQY5WyyWqQYmsKOC%2Fl286S248elzxl%2BJhOKe2&rqlang=cn",
        # "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7",
        "Sec-Fetch-Mode": "navigate",
        "Cookie": cookie,
        "Connection": "Keep-Alive",
    }
    bd_url = []
    for page in range(0, 8):
        url = 'http://www.baidu.com/s?wd={}&pn={}0'
        try:
            r = requests.get(url.format(key, page), headers=bd_headers, verify=False)
            encoing = requests.utils.get_encodings_from_content(r.text)[0]
            content = r.content.decode(encoing)
            result = [f"{urlparse(url).scheme}://{urlparse(url).netloc}" for url in re.findall('mu="(.*?)"', content)[1:]]
            #result = [url.split('//')[1].split('/')[0] for url in re.findall('mu="(.*?)"', content)[1:]]
            for res_url in list(set(result)):
                bd_url.append(res_url)
                #判断存活
                # try:
                #     r = requests.get(res_url, timeout=5, verify=False)
                #     if b'Service Unavailable' not in r.content and b'The requested URL was not found on' not in r.content and b'The server encountered an internal error or miscon' not in r.content:
                #         if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                #             bd_url.append(res_url)
                # except Exception as error:
                #     pass
        except:
            pass
    return list(set(bd_url))

ini配置文件

脚本核心:用户通过自定义配置文件的内容筛选出想要的url

[User]
# 程序的用户名
whoami = JF

#state友链爬行,0关闭,1开启
#其它:
#None不检测该关键词
#支持或(or)逻辑,即 |
#优先级:网址黑名单>网址白名单>标题黑名单>标题白名单>网页内容黑名单>网页内容白名单

[Config]
#友链爬行
state = 0

# 网址黑名单
black_url = None

# 网址白名单
white_url = None

# 标题黑名单
black_title = None

# 标题白名单
white_title = 安全狗

# 网页内容黑名单
black_content = None

# 网页内容白名单
white_content = None

# 连接超时5秒
timeout = 5

用户规则树

通过配置文件所写的函数

方式一:参数为url列表

def RuleUrl(urls):
    ruleurls = []
    UA = random.choice(headerss)
    header = {'User-Agent': UA, 'Connection': 'close'}
    # 第一步,URL黑名单限制,在黑名单内的全部排除
    black_url_or = []
    for url in urls:
        if black_url == 'None':
            black_url_or.append(url)
        elif '|' in black_url:
            black_url_key = black_url.split('|')
            if all(key not in url for key in black_url_key):
                black_url_or.append(url)
        else:
            black_url_key = black_url
            if any(key not in url for key in black_url_key):
                black_url_or.append(url)

    # 第二步,URL白名单限制,出现在白名单的才保存
    white_url_or = []
    for url in black_url_or:
        if white_url == 'None':
            white_url_or.append(url)
        elif '|' in white_url:
            white_url_key = white_url.split('|')
            if any(key in url for key in white_url_key):
                white_url_or.append(url)
        else:
            white_url_key = white_url
            if all(key in url for key in white_url_key):
                white_url_or.append(url)

    # 第三步,网站标题黑名单过滤,在黑名单内的全部排除
    black_title_or = []
    for url in white_url_or:
        if black_title == 'None':
            black_title_or.append(url)
        elif '|' in black_title:
            black_title_key = black_title.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if all(key not in title[0] for key in black_title_key):
                        black_title_or.append(url)
            except:
                pass
        else:
            black_title_key = black_title
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if all(key not in title[0] for key in black_title_key):
                        black_title_or.append(url)
            except:
                pass

    # 第四步,网站标题白名单过滤,出现在白名单的才保存
    white_title_or = []
    for url in black_title_or:
        if white_title == 'None':
            white_title_or.append(url)
        elif '|' in white_title:
            white_title_key = white_title.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if any(key in title[0] for key in white_title_key):
                        white_title_or.append(url)
            except:
                pass
        else:
            white_title_key = white_title
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    title = re.findall('<title>(.*?)</title>', content, re.S)
                    if all(key in title[0] for key in white_title_key):
                        white_title_or.append(url)
            except:
                pass

    # 第五步,网页内容黑名单过滤,出现在黑名单的全排除
    black_content_or = []
    for url in white_title_or:
        if black_content == 'None':
            black_content_or.append(url)
        elif '|' in black_content:
            black_content_key = black_content.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if all(key not in content for key in black_content_key):
                        black_content_or.append(url)
            except:
                pass
        else:
            black_content_key = black_content
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if any(key not in content for key in black_content_key):
                        black_content_or.append(url)
            except:
                pass
    # 第六步,网页内容白名单过滤,只保存出现在白名单内的url
    white_content_or = []
    for url in black_content_or:
        if white_content == 'None':
            white_content_or.append(url)
        elif '|' in white_content:
            white_content_key = white_content.split('|')
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if any(key in content for key in white_content_key):
                        white_content_or.append(url)
            except:
                pass
        else:
            white_content_key = white_content
            try:
                r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
                encoing = requests.utils.get_encodings_from_content(r.text)[0]
                content = r.content.decode(encoing)
                if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
                    if all(key in content for key in white_content_key):
                        white_content_or.append(url)
            except:
                pass

    return white_content_or

方式二:接收参数改为单个url

第一版的整体逻辑虽然实现了,但效率太慢了,这一版改了整体逻辑
把接收参数改为单个url,用true/false判断传入url是否满足条件,而后实现并发

def rule_url(url):
    # 第一步,URL黑名单限制,在黑名单内的全部排除
    if black_url != 'None' and (any(key in url for key in black_url.split('|'))):
        return False

    # 第二步,URL白名单限制,出现在白名单的才保存
    if white_url != 'None' and(all(key not in url for key in white_url.split('|'))):
        return False

    try:
        UA = random.choice(headerss)
        header = {'User-Agent': UA, 'Connection': 'close'}
        r = requests.get(url=url, headers=header, verify=False, timeout=timeout)
        if r.status_code == 200 or r.status_code == 301 or r.status_code == 302:
            encoing = requests.utils.get_encodings_from_content(r.text)[0]
            content = r.content.decode(encoing)
            title = re.findall('<title>(.*?)</title>', content, re.S)[0]
            # 第三步,网站标题黑名单过滤,在黑名单内的全部排除
            if black_title != 'None' and (any(key in title for key in black_title.split('|'))):
                return False
            # 第四步,网站标题白名单过滤,出现在白名单的才保存
            if white_title != 'None' and (all(key not in title for key in white_title.split('|'))):
                return False

            # 第五步,网页内容黑名单过滤,出现在黑名单的全排除
            if black_content != 'None' and (any(key in content for key in black_content.split('|'))):
                return False
            # 第六步,网页内容白名单过滤,只保存出现在白名单内的url
            if white_content != 'None' and (all(key not in content for key in white_content.split('|'))):
                return False

            return url
        else:
            return False
    except:
        return False
    return False

入口函数

# 程序入口
def Result():
    print(f'当前用户:{whoami}')
    if state == '0':
        print(f'[-]友链爬行:关闭')
    elif state == '1':
        print(f'[+]友链爬行:开启')
    else:
        print(f'[x]友链爬行:输入0/1!')
    if black_url == 'None':
        print(f'[-]网址黑名单:关闭', end='')
    else:
        print(f'[+]网址黑名单:开启', end='')
    if white_url == 'None':
        print(f'  [-]网址白名单:关闭')
    else:
        print(f'  [+]网址白名单:开启')

    if black_title == 'None':
        print(f'[-]标题黑名单:关闭', end='')
    else:
        print(f'[+]标题黑名单:开启', end='')
    if white_title == 'None':
        print(f'  [-]标题白名单:关闭')
    else:
        print(f'  [+]标题白名单:开启')

    if black_content == 'None':
        print(f'[-]网页黑名单:关闭', end='')
    else:
        print(f'[+]网页黑名单:开启', end='')
    if black_content == 'None':
        print(f'  [-]网页白名单:关闭')
    else:
        print(f'  [+]网页白名单:开启')
    print('='*50)
    print('0:关键字扫描       1:导入文本扫描')
    try:
        num = int(input('请选择启动方式(0/1):'))
        if num == 0:
            rurls = set()
            keywor = input('请输入关键字:')
            t1 = time.time()

            bd = bd_urls(keywor,num)
            rule_bd_urls = set()
            with ThreadPoolExecutor(max_workers=10) as executor:
                results_bd = executor.map(rule_url, bd)
            with lock:
                for url in results_bd:
                    if url:
                        rule_bd_urls.add(url)

            # 每个url再进行友链检查
            l_urls = set()
            rule_link_urls = set()
            for url in rule_bd_urls:
                link_urls = get_links(url)
                l_urls.update(link_urls)
            with ThreadPoolExecutor(max_workers=10) as executor:
                results_link = executor.map(rule_url, l_urls)
            with lock:
                for url in results_link:
                    if url:
                        rule_link_urls.add(url)
            # #可以直接保存集合或列表,减少IO
            with open(f'{keywor}_url.txt', 'a+', encoding='utf-8') as a:
                a.write('\n'.join(rule_link_urls))
            t2 = time.time()
            print(f'扫描结束,耗时:{t2-t1},结果已保存至【{keywor}_url.txt】中')
        elif num == 1:
            print('提示:文本中的网址需带有协议类型,如:http/https')
            urls = set([url.strip() for url in open(input('将需要扫描的url拖到此窗口:'),'r',encoding='utf-8')])
            print(f'文本内共发现{len(urls)}个站点,扫描中。。。')
            result = set()
            if state == '0':
                t1 = time.time()
                with ThreadPoolExecutor(max_workers=10) as executor:
                    results_link = executor.map(rule_url, urls)
                with lock:
                    for url in results_link:
                        if url:
                            result.add(url)
            elif state == '1':
                t1 = time.time()
                l_url = set()
                for url in urls:
                    l = get_links(url)
                    l_url.update(l)

                with ThreadPoolExecutor(max_workers=10) as executor:
                    results_link = executor.map(rule_url, l_url)
                with lock:
                    for url in results_link:
                        if url:
                            result.add(url)
            with open(filename, 'a+', encoding='utf-8') as a:
                a.write('\n'.join(result))
            t2 = time.time()
            t = str(t2-t1).split('.')[0]
            print(f'扫描结束,耗时:{t}s,结果已保存至【{filename}】中')

        else:
            print('输入有误,程序结束!')

    except Exception as e:
        print(f'输入有误,程序结束,错误类型:{e}')

if __name__ == '__main__':
    title = '''
           _ ______   _    _ _____  _
          | |  ____| | |  | |  __ \| |
          | | |__    | |  | | |__) | |
      _   | |  __|   | |  | |  _  /| |
     | |__| | |      | |__| | | \ \| |____ 
      \____/|_|       \____/|_|  \_\______|
    '''
    #print(title)
    Result()

使用测试

脚本测试了1w+站点,目前运行正常

配置文件config.ini

  • None不检测该关键词,只支持或逻辑,即符号|

  • 不检测可用None,字段不可放空,否则脚本无法正常运行

  • state只支持0/1,0关闭导入文本的友链爬行,1开启导入文本的友链爬行

  • 关键字优先级:网址黑 > 网址白 > 标题黑 > 标题白 > 网页内容黑 > 网页内容白

演示:
注:爬取结束后结果以txt格式保存在当前目录下

1、通过搜索引擎进行爬取教育类站点

关于URL采集的构想与实现插图1

2、通过导入的文本先筛选出教育类站点,不进行友链爬取

关于URL采集的构想与实现插图2

3、通过导入的文本先筛选出教育类站点,再进行友链爬取

关于URL采集的构想与实现插图3

结语

通过上述的代码,你可以完成该脚本,如果你不想麻烦也可以下载我打包好的,目前该脚本已打包上传到github:https://github.com/JiangFengSec/JF_URL,感兴趣的师傅可以下载尝试下,对你有帮助的话帮忙点个stars,感谢!


4A评测 - 免责申明

本站提供的一切软件、教程和内容信息仅限用于学习和研究目的。

不得将上述内容用于商业或者非法用途,否则一切后果请用户自负。

本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑或手机中彻底删除上述内容。

如果您喜欢该程序,请支持正版,购买注册,得到更好的正版服务。如有侵权请邮件与我们联系处理。敬请谅解!

程序来源网络,不确保不包含木马病毒等危险内容,请在确保安全的情况下或使用虚拟机使用。

侵权违规投诉邮箱:4ablog168#gmail.com(#换成@)

相关文章

电力企业 | 安全建设框架
HTB-Infiltrator:一文带你走进域渗透
JAVA安全 | Classloader:理解与利用一篇就够了
多角度揭秘威胁行为组织CryptoCore复杂的加密货币欺诈活动
网络空间的“边水往事”?针对华语黑产及用户进行攻击的 APT-K-UN3 活动分析
伪装“黑神话悟空修改器”传播木马的活动分析

发布评论