fragile automated extension attack

This commit is contained in:
Christoph J. Scherr 2023-04-27 20:47:22 +02:00
parent ab47c1d3f9
commit 31b1a7a10c
Signed by: PlexSheep
GPG Key ID: 25B4ACF7D88186CC
1 changed files with 77 additions and 40 deletions

View File

@ -10,6 +10,7 @@ Since this (auth) hash did not have a name before, I gave it the name 'authur1'
""" """
import argparse import argparse
import random import random
import sys
# FIXME make proper pyi Implementation for the rust part # FIXME make proper pyi Implementation for the rust part
# only used for bit rotation # only used for bit rotation
@ -21,6 +22,7 @@ from plexcryptool import binary
SHIFT_LENGTH = 17 SHIFT_LENGTH = 17
DEFINED_INITIAL = bytearray([0x52, 0x4f, 0x46, 0x4c]) DEFINED_INITIAL = bytearray([0x52, 0x4f, 0x46, 0x4c])
PADDING = 0xff PADDING = 0xff
EXT_TEST_STR = "EXTENSION ATTACK WORKS"
# constants for Circular shifting # constants for Circular shifting
# constant value defined in limits.h, it's 8 (bit) on my machine, on yours probably too. # constant value defined in limits.h, it's 8 (bit) on my machine, on yours probably too.
@ -41,6 +43,11 @@ def inner_authur1(input: int) -> int:
return output return output
def inner_authur1_optimized(input: int) -> int:
# plexcryptool.binary uses u32 for shifting
output: int = input ^ (binary.rotl32(input, SHIFT_LENGTH))
return output
def authur1(input: bytearray, def authur1(input: bytearray,
verbose: bool = False, verbose: bool = False,
first_state: bytearray = DEFINED_INITIAL, first_state: bytearray = DEFINED_INITIAL,
@ -61,9 +68,7 @@ def authur1(input: bytearray,
continue continue
# else # else
assert len(internal_buffer) == 4, "internal buffer of authur1 not 4 byte long" assert len(internal_buffer) == 4, "internal buffer of authur1 not 4 byte long"
accuint: int = int.from_bytes(accumulator) accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4))
accuint: int = inner_authur1(accuint ^ int.from_bytes(internal_buffer))
accumulator: bytearray = bytearray(accuint.to_bytes(4))
internal_buffer.clear() internal_buffer.clear()
assert len(internal_buffer) == 0 assert len(internal_buffer) == 0
internal_buffer.append(in_byte) internal_buffer.append(in_byte)
@ -80,9 +85,7 @@ def authur1(input: bytearray,
assert len(internal_buffer) == 4, "internal buffer of authur1 not 4 byte long" assert len(internal_buffer) == 4, "internal buffer of authur1 not 4 byte long"
# same as above, one last time # same as above, one last time
assert len(accumulator) == 4, "accumulator too long: %d bytes" % len(accumulator) assert len(accumulator) == 4, "accumulator too long: %d bytes" % len(accumulator)
accuint: int = int.from_bytes(accumulator) accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4))
accuint: int = inner_authur1(accuint ^ int.from_bytes(internal_buffer))
accumulator: bytearray = bytearray(accuint.to_bytes(4))
assert len(accumulator) == 4, "accumulator too long: %d bytes" % len(accumulator) assert len(accumulator) == 4, "accumulator too long: %d bytes" % len(accumulator)
if verbose: if verbose:
@ -90,9 +93,28 @@ def authur1(input: bytearray,
# now Q the accumulator and return # now Q the accumulator and return
# if input = "" this step breaks things, just remove it. # if input = "" this step breaks things, just remove it.
if len(input) != 0 and not return_last_state: if len(input) != 0 and not return_last_state:
accuint: int = int.from_bytes(accumulator) accumulator: bytearray = bytearray(inner_authur1(int.from_bytes(accumulator)).to_bytes(4))
accuint: int = inner_authur1(accuint) return accumulator
accumulator: bytearray = bytearray(accuint.to_bytes(4))
def authur1_optimized(input: bytearray, first_state: bytearray = DEFINED_INITIAL) -> bytearray:
internal_buffer: bytearray = bytearray()
accumulator: bytearray = first_state
for in_byte in input:
if len(internal_buffer) < 4:
internal_buffer.append(in_byte)
continue
accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4))
internal_buffer.clear()
internal_buffer.append(in_byte)
while len(internal_buffer) < 4:
internal_buffer.append(PADDING)
accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4))
# ignore the case of ""
#if len(input) != 0:
accuint: int = int.from_bytes(accumulator)
accuint: int = inner_authur1(accuint)
accumulator: bytearray = bytearray(accuint.to_bytes(4))
return accumulator return accumulator
def keyed_hash(message: bytearray, key: bytearray, verbose: bool = False, return_last_state: bool = False) -> bytearray: def keyed_hash(message: bytearray, key: bytearray, verbose: bool = False, return_last_state: bool = False) -> bytearray:
@ -102,7 +124,7 @@ def keyed_hash(message: bytearray, key: bytearray, verbose: bool = False, return
mic: bytearray = authur1(input, verbose, return_last_state=return_last_state) mic: bytearray = authur1(input, verbose, return_last_state=return_last_state)
return mic return mic
def extension_attack(valid_pairs: list): def extension_attack(valid_pairs: list[tuple[bytearray, bytearray]]) -> tuple[bytearray,bytearray]:
""" """
Extension attack against keyed hash of authur1 Extension attack against keyed hash of authur1
@ -148,47 +170,58 @@ def extension_attack(valid_pairs: list):
print("%s has length %s" % (msg, len(msg))) print("%s has length %s" % (msg, len(msg)))
if len(msg) % 4 == 0: if len(msg) % 4 == 0:
# we have a message of the right length! # we have a message of the right length!
target_pair = (msg, mic) target_pair = (msg, mic, msg)
else: else:
# for now, just use the first one? # for now, just use the first one?
original_msg = msg
while len(msg) % 4 != 0: while len(msg) % 4 != 0:
msg.append(PADDING) msg.append(PADDING)
target_pair = (msg, mic) target_pair = (msg, mic, original_msg)
if target_pair is None: if target_pair is None:
print("The given originals were not sufficient to perform an extension attack.\n"+ print("The given originals were not sufficient to perform an extension attack.\n"+
"We need a message, which has a length that is a multiple of 16 (Bytes).") "We need a message, which has a length that is a multiple of 16 (Bytes).")
return raise Exception("No fitting original value given")
# now find the last internal state. # now find the last internal state.
# inner_authur1 cannot be reversed, so we need to brute force it # inner_authur1 cannot be reversed, so we need to brute force it
# the key space is only 2**32, so it should be possible # the key space is only 2**32, so it should be possible
# we need to check everyone of these 2**32 against the valid hashes to find a working one # we need to check everyone of these 2**32 against the valid hashes to find a working one
found_the_state = False
KEY_SPACE = 2**32 KEY_SPACE = 2**32
extension_msg = bytearray("ef".encode()) # only the last block
last_block = target_pair[0][-4:]
extension_msg = last_block
# given mic for "abcdef" 0f6b8802 # given mic for "abcdef" 0f6b8802
looking_for = bytearray(0x0f6b8802.to_bytes(4)) looking_for = target_pair[1]
last_internal_state = None last_internal_state = 0
print("=" * 120) print("=" * 120)
print("extension_msg for bruteforce: %s" % extension_msg)
print("looking for result: %s" % looking_for.hex())
print("Bruteforcing the internal state, this might take a while") print("Bruteforcing the internal state, this might take a while")
print("=" * 120) print("=" * 120)
for i in range(0, KEY_SPACE): for i in range(0, KEY_SPACE):
mic = authur1(extension_msg, False, bytearray(i.to_bytes(4))) mic = authur1_optimized(extension_msg, bytearray(i.to_bytes(4)))
if found_the_state: if i % 0x1000 == 0:
break msg = "state %08x | hash %s | search %s" % (i, mic.hex(), looking_for.hex())
if not mic is None and mic == looking_for: sys.stdout.write('%s\r' % msg)
print("=" * 120) sys.stdout.flush()
if mic == looking_for:
print("\n" + "=" * 120)
print("FOUND THE THING") print("FOUND THE THING")
found_the_state = True
last_internal_state = i last_internal_state = i
print("IT IS %s" % hex(last_internal_state)) break
print("IT IS %08x" % last_internal_state)
print("=" * 120)
forged_input: bytearray = bytearray(b'HELLO')
print("Trying to forge a mic for an extended version with input:\n%08x\n(%s)" % (
int.from_bytes(forged_input),
forged_input.decode(errors="replace")
))
print("=" * 120)
extension_text = "EXTENSION ATTACK" hacked_mic = authur1_optimized(forged_input, first_state=bytearray(last_internal_state.to_bytes(4)))
hacked_mic = authur1(bytearray(extension_text.encode()), True, bytearray(last_internal_state.to_bytes(4)))
print("Hacked MIC: %s" % hacked_mic.hex()) print("Hacked MIC: %s" % hacked_mic.hex())
return (forged_input, hacked_mic)
def test(): def test():
init: int = int.from_bytes(DEFINED_INITIAL) init: int = int.from_bytes(DEFINED_INITIAL)
@ -225,20 +258,24 @@ def test_extension_attack():
""" """
Test the attack against a known last state Test the attack against a known last state
""" """
key = bytearray(0x133773310000.to_bytes(16)) # this key produces a mic that is fast to bruteforce with my code
message = bytearray("1234".encode()) # (in combination with "AAAAaa" as input ofc)
ext_message = bytearray("EXT".encode()) key = bytearray(0x289488ae6d71c82da1502c0130ec04e0.to_bytes(16))
message = bytearray(b'AAAAaa')
# we need to bruteforce this, skip for the test # we need to bruteforce this, skip for the test
mic = keyed_hash(message, key) mic = keyed_hash(message, key, True)
last_internal_state = keyed_hash(message, key, return_last_state=True) print("testing against mic: %s" % mic.hex())
forged_mic = authur1(ext_message, False, last_internal_state) #last_internal_state = keyed_hash(message, key, return_last_state=True)
message.extend(ext_message) forged_data = extension_attack([(message, mic)])
validated_mic = keyed_hash(message, key) print("extension attack returned the following: %s,%s" % (forged_data[0].decode(errors="replace"), forged_data[1].hex()))
assert validated_mic == forged_mic, "forged mic\n%sis not valid\n%s" % ( forged_mic.hex(), validated_mic.hex() ) print("=" * 120)
print("Manual extension attack with known last internal state works (%s == %s)" % (forged_mic.hex(), validated_mic.hex())) message = message[:4]
message.extend(forged_data[0])
print("generating real mic for %x (%s)" % (int.from_bytes(message), message.decode(errors="replace")))
validated_mic = keyed_hash(message, key, True)
assert validated_mic == forged_data[1], "forged mic %s is not valid. (%s)" % ( forged_data[1].hex(), validated_mic.hex() )
print("Manual extension attack with known last internal state works (%s == %s)" % (forged_data[1].hex(), validated_mic.hex()))
def main(): def main():
parser = argparse.ArgumentParser(prog="authur1", description='Implementation and attack for the custom authur1 hash. Don\'t actually use this hash!') parser = argparse.ArgumentParser(prog="authur1", description='Implementation and attack for the custom authur1 hash. Don\'t actually use this hash!')
parser.add_argument('-i', '--hash', type=str, metavar="MSG", parser.add_argument('-i', '--hash', type=str, metavar="MSG",