CTFZone2018-Signature server

2018-07-24  本文已影响0人  Robin_Tan

翻译自:https://github.com/p4-team/ctf/tree/master/2018-07-21-ctfzone-quals/crypto_signature

给了server.py的代码:

#!/usr/bin/python
import sys
import hashlib
import logging
import SocketServer
import base64
from flag import secret
from checksum_gen import WinternizChecksum


logger = logging.getLogger()
logger.setLevel(logging.INFO)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(ch)


HASH_LENGTH=32
CHECKSUM_LENGTH=4
MESSAGE_LENGTH=32
CHANGED_MESSAGE_LENGTH=MESSAGE_LENGTH+CHECKSUM_LENGTH
BITS_PER_BYTE=8
show_flag_command="show flag"+(MESSAGE_LENGTH-9)*"\xff"
admin_command="su admin"+(MESSAGE_LENGTH-8)*"\x00"
PORT = 1337

def extend_signature_key(initial_key):
  full_sign_key=str(initial_key)
  for i in range(0,255):
    for j in range(0,CHANGED_MESSAGE_LENGTH):
      full_sign_key+=hashlib.sha256(full_sign_key[j*HASH_LENGTH+i*CHANGED_MESSAGE_LENGTH*HASH_LENGTH:(j+1)*HASH_LENGTH+i*CHANGED_MESSAGE_LENGTH*HASH_LENGTH]).digest()
  return full_sign_key
class Signer:
  
  def __init__(self):
    with open("/dev/urandom","rb") as f:
      self.signkey=f.read(HASH_LENGTH*CHANGED_MESSAGE_LENGTH)
    self.full_sign_key=extend_signature_key(self.signkey)
    self.wc=WinternizChecksum()
    self.user_is_admin=False

  def sign_byte(self,a,ind):
    assert(0<=a<=255)
    signature=self.full_sign_key[(CHANGED_MESSAGE_LENGTH*a+ind)*HASH_LENGTH:(CHANGED_MESSAGE_LENGTH*a+ind+1)*HASH_LENGTH]
    return signature

  def sign(self,data):
    decoded_data=base64.b64decode(data)
    if len(decoded_data)>MESSAGE_LENGTH:
      return "Error: message too large"
    if decoded_data==show_flag_command or decoded_data==admin_command:
      return "Error: nice try, punk"
    decoded_data+=(MESSAGE_LENGTH-len(decoded_data))*"\xff"
    decoded_data+=self.wc.generate(decoded_data)
    signature=""
    for i in range(0, CHANGED_MESSAGE_LENGTH):
      signature+=self.sign_byte(ord(decoded_data[i]),i)
    return base64.b64encode(decoded_data)+','+base64.b64encode(signature)
  
  def execute_command(self,data_sig):
    (data_with_checksum, signature)=map(base64.b64decode,data_sig.split(','))
    data=data_with_checksum[:MESSAGE_LENGTH]
    data_checksummed=data+self.wc.generate(data)
    if data_checksummed!=data_with_checksum:
      return "Error: wrong checksum!"
    signature_for_comparison=""
    for i in range(0, CHANGED_MESSAGE_LENGTH):
      signature_for_comparison+=self.sign_byte(ord(data_with_checksum[i]),i)
    if signature!=signature_for_comparison:
      return "Error: wrong signature!"
    if data==admin_command:
      self.user_is_admin=True
      return "Hello, admin"
    if data==show_flag_command:
      if self.user_is_admin:
        return "The flag is %s"%secret
      else:
        return "Only admin can get the flag\n"
    else:
      return "Unknown command\n"
def process(data,signer):
  [query,params]=data.split(':')
  params=params.rstrip("\n")
  if query=="hello":
    return "Hi"
  elif query=="sign":
    return signer.sign(params)
  elif query=="execute_command":
    return signer.execute_command(params)
  else:
    return "bad query"

class ThreadedTCPRequestHandler(SocketServer.BaseRequestHandler):
  
  def handle(self):
    signer=Signer()
    logger.info("%s client sconnected" % self.client_address[0])
    self.request.sendall("Welcome to the Tiny Signature Server!\nYou can sign any messages except for controlled ones\n")
    while True:
      data = self.request.recv(2048)
      try:
        ret = process(data,signer)
      except Exception:
        ret = 'Error'
      try:
        self.request.sendall(ret + '\n')
      except Exception:
        break

  def finish(self):
    logger.info("%s client disconnected" % self.client_address[0])


class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
  pass

if __name__ == '__main__':
  server = ThreadedTCPServer(('0.0.0.0', PORT), ThreadedTCPRequestHandler)
  server.allow_reuse_address = True
  server.serve_forever()

根据代码,这个服务器有三个功能

有两条比较重要的命令,切换为admin以及请求flag。这两条指令具体是:

show_flag_command = "show flag" + (MESSAGE_LENGTH - 9) * "\xff"
admin_command = "su admin" + (MESSAGE_LENGTH - 8) * "\x00"

我们可以具体看下签名是如何生成的:

def sign(self, data):
    decoded_data = base64.b64decode(data)
    if len(decoded_data) > MESSAGE_LENGTH:
        return "Error: message too large"
    if decoded_data == show_flag_command or decoded_data == admin_command:
        return "Error: nice try, punk"
    decoded_data += (MESSAGE_LENGTH - len(decoded_data)) * "\xff"
    decoded_data += self.wc_generate(decoded_data)
    signature = ""
    for i in range(0, CHANGED_MESSAGE_LENGTH):
        signature += self.sign_byte(ord(decoded_data[i]), i)
    return base64.b64encode(decoded_data) + ',' + base64.b64encode(signature)

需要注意的是对敏感命令的检查是在padding之前发生的,这也就意味着我们可以直接发送show flag字符串,这将会通过检查,服务器会对其进行padding 并 签名。我们可以比较简单的获得签了名的show flag指令。
比较难获得的是切换为admin身份的指令,因为其padding为\x00,因此没有办法像上面一样绕过检查。
因此我们只需要伪造对该条指令的签名。
当我们深入签名生成算法,我们可以观察到两点:

此外服务器会依次对checksum和签名进行检查,并告诉我们具体是哪步出错。

第一步需要伪造checksum,可以看到checksum的长度为4字节,如果要直接爆破 的 话2^32还是有点大。但如果我们尝试让服务器签名一些输入,并观察他的返回值,会发现checksum的后两个字节永远是\x00\x00,因此只需要爆破2个字节。
因此我们可以在我们想要执行的指令后加上穷举的两个字节,两个\x00 以及一些随机字节作为签名,并发送,如果服务器返回 incorrect signature 就说明了checksum 猜对了。
代码如下:

def find_checkum_conflict(s, wanted_msg, signature):
    print("Looking for checksum conflict")
    for a in range(256):
        for b in range(256):
            forged = wanted_msg + chr(a) + chr(b) + "\x00\x00"
            result = execute_command(s, forged, signature)
            if 'wrong signature' in result:
                print('Found checksum conflict for', a, b)
                return a, b

第二部需要伪造正确的签名,在一开始的分析中,我们提到签名是逐字节生成的,这意味着如果我们发送admin_command,把器最后一个字节替换掉,我们将得到前31字节的正确签名。同时由于后两个字节恒为\x00,因此这两个字节的签名也是正确的。
这样,我们只缺中间3个字节的签名。由于checksum只有两个字节,而前面的message有32个字节,显然会有很多冲突,所以我们可以爆破找到一个输入和我们想要的命令有同样的checksum。同时我们可以让这些输入的结尾都为'\x00',这样当我们找到一个checksum冲突的时候,同时也获得了\x00 对应的签名。
代码实现如下:

def get_proper_signature(checksum_we_need, s, original_signature_chunks):
    print("Looking for signature suffix for conflicting checksum")
    i = 0
    while True:
        msg = long_to_bytes(i)
        pad = 32 - len(msg)
        msg = msg + ('a' * (pad - 1)) + "\x00"
        result = sign(s, msg)
        ext_msg, signature = map(base64.b64decode, result.split(","))
        if ext_msg[32:36] == checksum_we_need:
            forged_signature_chunks = chunk(signature, 32)
            return "".join(original_signature_chunks[:-5] + forged_signature_chunks[-5:])
        i += 1

这样我们就获得了正确的签名,即可执行两条指令,获得flag:

def main():
    url = "crypto-02.v7frkwrfyhsjtbpfcppnu.ctfz.one"
    port = 1337
    s = nc(url, port)
    receive_until_match(s, "You can sign any messages except for controlled ones")
    receive_until(s, "\n")
    msg = "show flag"
    show_flag_command = sign(s, msg)
    msg = "su admin" + (32 - 9) * "\x00"
    almost_admin_command = sign(s, msg)
    print(almost_admin_command)
    msg, signature = map(base64.b64decode, almost_admin_command.split(","))
    signature_chunks = chunk(signature, 32)
    wanted_msg = 'su admin\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    a, b = find_checkum_conflict(s, wanted_msg, signature)
    checksum = chr(a) + chr(b) + "\x00\x00"
    forged_msg = wanted_msg + checksum
    signature = get_proper_signature(checksum, s, signature_chunks)
    print(execute_command(s, forged_msg, signature))
    send(s, 'execute_command:' + show_flag_command)
    interactive(s)


main()
上一篇下一篇

猜你喜欢

热点阅读