Flask代码审计SQL注入命令/代码执行反序列化文件操作XXESSRFXSS其他审计实战后记reference
Flask代码审计
SQL注入
1、正确的使用直白一点就是:使用”逗号”,而不是”百分号”
stmt = "SELECT * FROM table WHERE id=?"
connection.execute(stmt, (value,))
#或者
cursor.execute("SELECT * FROM users WHERE name = ?", (username,))
2、检查所有SQL语句是否使用+
、%s
或f-string
直接拼接用户输入
"SELECT * FROM table WHERE id=" + value
"SELECT * FROM table WHERE id=%s" % value
"SELECT * FROM table WHERE id={0}".format(value)
3、SQLAlchemy的text()
是否进行参数化
from sqlalchemy import text
# 错误用法
stmt = text(f"SELECT * FROM users WHERE name = '{username}'")
# 正确用法
stmt = text("SELECT * FROM users WHERE name = :username").bindparams(username=username)
#或
query = "SELECT * FROM articles WHERE title LIKE :keyword"
result = db.session.execute(query, {"keyword": f"%{keyword}%"})
4、ORM安全使用:优先使用ORM方法
# SQLAlchemy ORM
User.query.filter_by(username=username).first()
命令/代码执行
1、危险函数popen、system、commands、subprocess、exec、eval
import subprocess
@app.route('/ping')
def ping():
ip = request.args.get('ip')
result = subprocess.run(["ping", "-c", "1", ip], capture_output=True, text=True)
return f"<pre>{result.stdout}</pre>"
#subprocess.run() 绑定参数,不会执行恶意命令。
2、SSTI
render_template_string
3、第三方库风险
import yaml
# 漏洞示例:使用默认Loader
data = yaml.load(user_input, Loader=yaml.Loader) # 可触发任意代码执行
4、反序列化漏洞:pickle、marshal、PyYAML
import pickle
# 漏洞示例:反序列化用户可控数据
data = request.get_data()
obj = pickle.loads(data) # 攻击者可构造恶意序列化对象(如反弹Shell)
反序列化
反序列化漏洞的核心是程序将不可信的序列化数据还原为对象时,未验证数据合法性,导致攻击者通过构造恶意序列化数据执行任意代码。常见场景:
-
pickle
模块的不安全使用:pickle.loads()
直接反序列化用户输入。 -
PyYAML
的不安全加载:yaml.load()
默认支持执行构造函数(如!!python/object
)。 -
自定义反序列化逻辑:开发者自行实现的
__reduce__
方法被利用
1、不安全模块:pickle
、marshal
、PyYAML
等。
2、防御措施:优先使用安全格式如JSON(json.loads()
)代替pickle
。
3、第三方库的反序列化操作危险库:PyYAML
(默认Loader不安全)、dill
、shelve
等
4、安全使用pickle
:仅反序列化可信数据确保序列化数据来源可信。签名验证对序列化数据签名,防止篡改。
import hmac, pickle key = b'secret_key' data = request.get_data() # 验证HMAC签名 if not hmac.compare_digest(hmac.new(key, data).digest(), request.headers.get('Signature')): abort(403) obj = pickle.loads(data)
5、安全使用PyYAML
,强制使用安全Loader:如SafeLoader
或FullLoader
import yaml data = yaml.load(user_input, Loader=yaml.SafeLoader) # 禁用构造函数
文件操作
1、关键函数
file()、file.save()、open()、codecs.open()
2、安全文件上传(白名单限制文件类型、重命名上传文件、magic验证文件内容)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'} def allowed_file(filename): return '.' in filename and \ filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS if file and allowed_file(file.filename): file.save("safe_path") import uuid secure_name = str(uuid.uuid4()) + ".png" # 生成随机文件名或通过secure_filename() file.save(f"/uploads/{secure_name}") import magic mime = magic.from_buffer(file.read(1024), mime=True) if mime not in ['image/png', 'image/jpeg', 'image/gif']: abort(400, "Invalid file type")
3、防止路径遍历
from werkzeug.utils import secure_filename import os filename = secure_filename(request.form['filename']) # 过滤特殊字符 base_dir = os.path.abspath("/var/data") target_path = os.path.join(base_dir, filename) # 确保目标路径在base_dir下 if not os.path.commonprefix([base_dir, target_path]) == base_dir: abort(403, "Invalid path")
4、安全文件下载:映射文件名到安全ID,不直接暴露文件路径
# 数据库存储文件ID与真实路径的映射 @app.route('/download/<file_id>') def download(file_id): file_path = db.get_file_path(file_id) # 从数据库查询安全路径 return send_file(file_path)
XXE
1、直接解析用户XML并用危险函数/库解析:lxml
、xml.etree.ElementTree
、defusedxml
未安全配置
xml_data='''<!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///c:/cc.txt">]> <data>&xxe;</data>''' from lxml import etree # 漏洞示例:直接解析用户输入 root = etree.fromstring(xml_data) # 允许解析外部实体 print(root.text)
2、禁用外部实体解析、输入过滤、使用JSON替代XML
from defusedxml.ElementTree import parse tree = parse(xml_file) # 默认禁用外部实体 from lxml import etree parser = etree.XMLParser(resolve_entities=False) # 禁用实体解析 root = etree.fromstring(xml_data, parser=parser) if re.search(r"<!ENTITY|SYSTEM|PUBLIC", xml_data, re.IGNORECASE): abort(400, "Invalid XML")
SSRF
1、直接请求、间接URL拼接用户提供的URL(urllib、requests等库)
# 漏洞示例:直接请求用户输入URL url = request.form['url'] response = requests.get(url) # 输入"file:///etc/passwd"可能读取文件(取决于库支持) # 漏洞示例:拼接用户输入到内部API user_id = request.args.get('id') internal_url = f"http://internal-api:8080/user/{user_id}" requests.get(internal_url) # 输入"id@evil.com"可能绕过校验 # requests.get()、urllib.request.urlopen()
2、名单校验域名/IP、限制协议类型、防止DNS重绑定攻击
ALLOWED_DOMAINS = {'example.com', 'cdn.example.net'} from urllib.parse import urlparse def is_allowed_url(url): parsed = urlparse(url) if parsed.hostname in ALLOWED_DOMAINS: return True return False if not is_allowed_url(url): abort(400, "Invalid URL") parsed = urlparse(url) if parsed.scheme not in ['http', 'https']: abort(400, "Unsupported protocol") import socket from urllib.parse import urlparse parsed = urlparse(url) hostname = parsed.hostname # 解析DNS并验证IP是否合法 resolved_ip = socket.gethostbyname(hostname) if resolved_ip in ['127.0.0.1', '169.254.169.254']: abort(403, "Forbidden IP")
XSS
1、直接渲染未转义的用户输入(|safe
过滤器或Markup
类或render_template_string
或直接return输入)
from flask import Markup user_input = request.args.get('q') return render_template('search.html', result=Markup(user_input)) <script> var userData = "{{ search_query|safe }}"; // 输入"; alert(1);// </script>
2、payload
<svg><script>alert&#40;1&#41;</script></svg> <!-- HTML实体编码绕过 --> <script>alert(1)</script> <img src=x onerror=alert(1)> <img/src='https://www.freebuf.com/articles/web/1'/onerror=alert(0)>
3、使用escape()
过滤用户输入,使用render_template渲染,设置 CSP(内容安全策略)
from markupsafe import escape @app.route('/comment', methods=['POST']) def comment(): username = escape(request.args.get('username')) return render_template('profile.html', username=username)
4、文件上传
攻击者上传恶意xss.svg
<svg xmlns="http://www.w3.org/2000/svg" version="1.1">
<circle cx="100" cy="50" r="40" stroke="black" stroke- fill="red" />
<script>alert(1)</script>
</svg>
解决方案:禁止上传 SVG、HTML 文件或者对于所有用户上传的文件,使用
Content-Disposition: attachment
,防止直接在浏览器解析
return send_from_directory(UPLOAD_FOLDER, filename, as_attachment=True)
其他
CSRF
from flask_wtf.csrf import CSRFProtect csrf = CSRFProtect(app)
权限校验
from flask_login import current_user user = User.query.get(user_id) if user.id != current_user.id: abort(403)
逻辑漏洞
# 漏洞示例:未加锁的余额扣减导致并发 user.balance -= amount db.session.commit() # 并发请求可能导致余额为负
组件漏洞
pip install safety safety scan safety system-scan safety scan --apply-fixes
cors
# Flask-CORS 示例 from flask_cors import CORS CORS(app, origins=["https://www.attacker.com"], supports_credentials=True)
Origin: http://www.attacker.com #返回如下 Access-Control-Allow-Origin: https://www.attacker.com Access-Control-Allow-Credentials: true #或者如下 Access-Control-Allow-Origin: null Access-Control-Allow-Credentials: true
配置 | 风险等级 | 说明 |
---|---|---|
Access-Control-Allow-Origin: * +Credentials: true |
安全(但错误) | 浏览器强制阻断请求 |
Access-Control-Allow-Origin: 任意值 +Credentials: true |
高危 | 数据可被攻击者窃取 |
Access-Control-Allow-Origin: null +Credentials: true |
高危 | 特殊场景下可绕过同源策略 |
修复核心:不要未经校验将客户端提供的Origin
直接返回*,且携带凭证时禁止使用通配符*
和null
。
key环境变量
import os app.secret_key = os.getenv("SECRET_KEY", "fallback_secret")
最小权限原则
普通用户 只授予
SELECT, INSERT, UPDATE
权限,管理员用户 才能DROP
或ALTER
数据库
CREATE USER 'appuser'@'localhost' IDENTIFIED BY 'strongpassword'; GRANT SELECT, INSERT, UPDATE ON mydb.* TO 'appuser'@'localhost'; CREATE USER 'admin'@'localhost' IDENTIFIED BY 'adminpassword'; GRANT ALL PRIVILEGES ON mydb.* TO 'admin'@'localhost';
缓存安全
from flask import Flask, request, jsonify from flask_caching import Cache import redis app = Flask(__name__) # 配置缓存(假设Redis在内网,已开启密码和TLS) app.config['CACHE_TYPE'] = 'RedisCache' app.config['CACHE_REDIS_HOST'] = '10.0.0.5' # 内网IP app.config['CACHE_REDIS_PORT'] = 6379 app.config['CACHE_REDIS_PASSWORD'] = 'strong_redis_password' app.config['CACHE_KEY_PREFIX'] = 'myapp:' # 使用命名空间隔离数据 app.config['CACHE_DEFAULT_TIMEOUT'] = 300 cache = Cache(app) # 示例接口:存取用户数据(假设数据敏感,先加密后存储) import hashlib import base64 def encrypt_data(data, key='secret_key'): # 这里仅示例使用简单的哈希加盐方式(实际请使用更安全的加密方法) return base64.b64encode(hashlib.sha256((data + key).encode()).digest()).decode() def decrypt_data(data, key='secret_key'): # 加密后无法直接解密,此处仅为示例 return data @app.route('/store', methods=['POST']) def store(): username = request.form.get('username') secret_info = request.form.get('secret_info') if not username or not secret_info: return jsonify({'error': 'Missing parameters'}), 400 # 加密敏感数据 encrypted = encrypt_data(secret_info) cache_key = f"user:{username}:info" cache.set(cache_key, encrypted) return jsonify({'message': 'Stored securely'}), 200 @app.route('/retrieve', methods=['GET']) def retrieve(): username = request.args.get('username') if not username: return jsonify({'error': 'Missing username'}), 400 cache_key = f"user:{username}:info" encrypted = cache.get(cache_key) if not encrypted: return jsonify({'error': 'No data found'}), 404 # 此处调用解密(如果能解密的话) info = decrypt_data(encrypted) return jsonify({'username': username, 'secret_info': info}), 200 if __name__ == '__main__': app.run(debug=False)
审计实战
目标为某flask的cms,查找|safe
关键词发现在article.html中存在该关键词
{% if render_recommendations is defined %} {{ render_recommendations()|safe }} {% endif %}
在__init__.py
中声明render_recommendations上下文,查看render_recommendations()函数
def init_app(self, app): """初始化插件""" super().init_app(app) # 注册模板函数 def render_recommendations(): """渲染推荐模板""" template_path = os.path.join(os.path.dirname(__file__), 'templates', 'recommendations.html') if os.path.exists(template_path): with open(template_path, 'r', encoding='utf-8') as f: template = f.read() return Markup(render_template_string(template)) return '' # 直接添加到 Jinja2 环境 app.jinja_env.globals['render_recommendations'] = render_recommendations
这里Markup和|safe将recommendations.html内容添加到article.html,如下
<div class="bg-white dark:bg-gray-800 rounded-lg shadow-sm p-8 mt-12 mb-8"> <h3 class="text-xl font-bold mb-8 text-gray-900 dark:text-white flex items-center"> <svg fill="none" stroke="currentColor" viewBox="0 0 24 24"> <path stroke-linecap="round" stroke-linejoin="round" stroke- d="M13 10V3L4 14h7v7l9-11h-7z"></path> </svg> 相关推荐 </h3> <div class="mb-2"> <!-- 推荐内容将通过 JS 动态加载 --> </div> </div> <script src="https://www.freebuf.com/articles/web/{{ url_for('article_recommender_static', filename='js/recommendations.js') }}"></script>
js文件如下:
功能模块 | 作用 |
---|---|
EMPTY_STATE_HTML |
当无推荐文章时显示空状态 |
ERROR_STATE_HTML |
当加载失败时显示错误状态 |
renderArticleCard(article) |
生成文章卡片 HTML |
renderTags(tags) |
渲染标签(最多 2 个) |
loadRecommendations(articleId) |
请求并渲染推荐文章 |
DOMContentLoaded 事件 |
页面加载后自动获取推荐文章 |
function renderArticleCard(article) {
return `
<a href="https://www.freebuf.com/article/${article.id}"
class="group block bg-gray-50 dark:bg-gray-700 rounded-lg p-6 transition-all duration-200 hover:shadow-md hover:bg-gray-100 dark:hover:bg-gray-600">
<div class="space-y-4">
<h4 class="font-bold text-gray-900 dark:text-gray-100 group-hover:text-blue-600 dark:group-hover:text-blue-400 transition-colors duration-200 line-clamp-2">
${article.title}
</h4>
<p class="text-sm text-gray-600 dark:text-gray-300 line-clamp-2">
${article.summary}
</p>
<div class="flex items-center justify-between">
<span class="text-sm text-gray-500 dark:text-gray-400">
${article.category}
</span>
<div class="flex flex-wrap gap-2">
${renderTags(article.tags)}
</div>
</div>
</div>
</a>
`;
}
title、category、tags三处均存在存储型xss,提交恶意payload<img/src='https://www.freebuf.com/articles/web/1'/onerror=alert(0)>
后,在其他文章的相关推荐功能处渲染js代码完成弹窗
后记
Referer /Origin
在 HTTP 请求中,Referer
和Origin
是两个与请求来源相关的头部字段,但它们的用途、格式和安全特性有显著区别。以下是它们的核心差异:
1、定义与格式
字段 | 定义 | 格式示例 | 包含路径信息 |
---|---|---|---|
Referer | 表示当前请求的 来源页面完整 URL | https://example.com/page.html |
✅ 包含路径(如/page.html ) |
Origin | 表示当前请求的 协议 + 域名 + 端口(不包含路径) | https://example.com:8080 |
❌ 不包含路径 |
2、主要用途
字段 | 应用场景 | 典型用例 |
---|---|---|
Referer | - 统计流量来源 - 防止图片盗链(Hotlinking) - 分析用户行为 | 用户从pageA.html 点击链接跳转到pageB.html ,Referer 值为pageA.html |
Origin | - CORS(跨域资源共享)安全机制 - 限制跨域请求来源 | 浏览器发起跨域 AJAX 请求时,自动添加Origin 头供服务器验证 |
3、发送条件
字段 | 触发发送的请求类型 | 浏览器行为 |
---|---|---|
Referer | - 页面跳转(如点击) - 资源加载(如 , ``) - 表单提交(GET/POST) |
默认发送,但可能被浏览器策略(如Referrer-Policy )或用户设置阻止 |
Origin | - 跨域 AJAX(fetch /XMLHttpRequest ) -POST 请求(部分浏览器) - WebSocket 连接 |
仅在跨域请求或特定方法(如POST )时发送 |
事务执行失败
如果事务执行失败,数据库连接可能会卡住,影响其他查询。
try: db.session.add(new_user) db.session.commit() except Exception as e: db.session.rollback() # 事务回滚,防止数据库状态异常 print(f"Error: {e}")
reference
https://mp.weixin.qq.com/s/y1ta34MzowUnOvFnShk2MQ https://www.freebuf.com/news/168362.html https://blog.hackall.cn/cvesubmit/614.html https://www.freebuf.com/articles/web/404899.html https://blog.neargle.com/2016/07/25/log-of-simple-code-review-about-python-base-webapp/ https://www.cnblogs.com/xiaozi/p/7268506.html
4A评测 - 免责申明
本站提供的一切软件、教程和内容信息仅限用于学习和研究目的。
不得将上述内容用于商业或者非法用途,否则一切后果请用户自负。
本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑或手机中彻底删除上述内容。
如果您喜欢该程序,请支持正版,购买注册,得到更好的正版服务。如有侵权请邮件与我们联系处理。敬请谅解!
程序来源网络,不确保不包含木马病毒等危险内容,请在确保安全的情况下或使用虚拟机使用。
侵权违规投诉邮箱:4ablog168#gmail.com(#换成@)