From 31b1a7a10c70bee1124cb58fd7461e600e7dfcf8 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Thu, 27 Apr 2023 20:47:22 +0200 Subject: [PATCH] fragile automated extension attack --- plexcryptool/authur1.py | 117 ++++++++++++++++++++++++++-------------- 1 file changed, 77 insertions(+), 40 deletions(-) diff --git a/plexcryptool/authur1.py b/plexcryptool/authur1.py index 9a1f120..73bd20c 100755 --- a/plexcryptool/authur1.py +++ b/plexcryptool/authur1.py @@ -10,6 +10,7 @@ Since this (auth) hash did not have a name before, I gave it the name 'authur1' """ import argparse import random +import sys # FIXME make proper pyi Implementation for the rust part # only used for bit rotation @@ -21,6 +22,7 @@ from plexcryptool import binary SHIFT_LENGTH = 17 DEFINED_INITIAL = bytearray([0x52, 0x4f, 0x46, 0x4c]) PADDING = 0xff +EXT_TEST_STR = "EXTENSION ATTACK WORKS" # constants for Circular shifting # constant value defined in limits.h, it's 8 (bit) on my machine, on yours probably too. @@ -41,6 +43,11 @@ def inner_authur1(input: int) -> int: return output +def inner_authur1_optimized(input: int) -> int: + # plexcryptool.binary uses u32 for shifting + output: int = input ^ (binary.rotl32(input, SHIFT_LENGTH)) + return output + def authur1(input: bytearray, verbose: bool = False, first_state: bytearray = DEFINED_INITIAL, @@ -61,9 +68,7 @@ def authur1(input: bytearray, continue # else assert len(internal_buffer) == 4, "internal buffer of authur1 not 4 byte long" - accuint: int = int.from_bytes(accumulator) - accuint: int = inner_authur1(accuint ^ int.from_bytes(internal_buffer)) - accumulator: bytearray = bytearray(accuint.to_bytes(4)) + accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4)) internal_buffer.clear() assert len(internal_buffer) == 0 internal_buffer.append(in_byte) @@ -80,9 +85,7 @@ def authur1(input: bytearray, assert len(internal_buffer) == 4, "internal buffer of authur1 not 4 byte long" # same as above, one last time assert len(accumulator) == 4, "accumulator too long: %d bytes" % len(accumulator) - accuint: int = int.from_bytes(accumulator) - accuint: int = inner_authur1(accuint ^ int.from_bytes(internal_buffer)) - accumulator: bytearray = bytearray(accuint.to_bytes(4)) + accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4)) assert len(accumulator) == 4, "accumulator too long: %d bytes" % len(accumulator) if verbose: @@ -90,9 +93,28 @@ def authur1(input: bytearray, # now Q the accumulator and return # if input = "" this step breaks things, just remove it. if len(input) != 0 and not return_last_state: - accuint: int = int.from_bytes(accumulator) - accuint: int = inner_authur1(accuint) - accumulator: bytearray = bytearray(accuint.to_bytes(4)) + accumulator: bytearray = bytearray(inner_authur1(int.from_bytes(accumulator)).to_bytes(4)) + return accumulator + +def authur1_optimized(input: bytearray, first_state: bytearray = DEFINED_INITIAL) -> bytearray: + internal_buffer: bytearray = bytearray() + accumulator: bytearray = first_state + for in_byte in input: + if len(internal_buffer) < 4: + internal_buffer.append(in_byte) + continue + accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4)) + internal_buffer.clear() + internal_buffer.append(in_byte) + while len(internal_buffer) < 4: + internal_buffer.append(PADDING) + accumulator: bytearray = bytearray(inner_authur1_optimized(int.from_bytes(accumulator) ^ int.from_bytes(internal_buffer)).to_bytes(4)) + + # ignore the case of "" + #if len(input) != 0: + accuint: int = int.from_bytes(accumulator) + accuint: int = inner_authur1(accuint) + accumulator: bytearray = bytearray(accuint.to_bytes(4)) return accumulator def keyed_hash(message: bytearray, key: bytearray, verbose: bool = False, return_last_state: bool = False) -> bytearray: @@ -102,7 +124,7 @@ def keyed_hash(message: bytearray, key: bytearray, verbose: bool = False, return mic: bytearray = authur1(input, verbose, return_last_state=return_last_state) return mic -def extension_attack(valid_pairs: list): +def extension_attack(valid_pairs: list[tuple[bytearray, bytearray]]) -> tuple[bytearray,bytearray]: """ Extension attack against keyed hash of authur1 @@ -148,47 +170,58 @@ def extension_attack(valid_pairs: list): print("%s has length %s" % (msg, len(msg))) if len(msg) % 4 == 0: # we have a message of the right length! - target_pair = (msg, mic) + target_pair = (msg, mic, msg) else: # for now, just use the first one? + original_msg = msg while len(msg) % 4 != 0: msg.append(PADDING) - target_pair = (msg, mic) + target_pair = (msg, mic, original_msg) if target_pair is None: print("The given originals were not sufficient to perform an extension attack.\n"+ "We need a message, which has a length that is a multiple of 16 (Bytes).") - return + raise Exception("No fitting original value given") # now find the last internal state. # inner_authur1 cannot be reversed, so we need to brute force it # the key space is only 2**32, so it should be possible # we need to check everyone of these 2**32 against the valid hashes to find a working one - found_the_state = False KEY_SPACE = 2**32 - extension_msg = bytearray("ef".encode()) + # only the last block + last_block = target_pair[0][-4:] + extension_msg = last_block # given mic for "abcdef" 0f6b8802 - looking_for = bytearray(0x0f6b8802.to_bytes(4)) - last_internal_state = None + looking_for = target_pair[1] + last_internal_state = 0 print("=" * 120) + print("extension_msg for bruteforce: %s" % extension_msg) + print("looking for result: %s" % looking_for.hex()) print("Bruteforcing the internal state, this might take a while") print("=" * 120) for i in range(0, KEY_SPACE): - mic = authur1(extension_msg, False, bytearray(i.to_bytes(4))) - if found_the_state: - break - if not mic is None and mic == looking_for: - print("=" * 120) + mic = authur1_optimized(extension_msg, bytearray(i.to_bytes(4))) + if i % 0x1000 == 0: + msg = "state %08x | hash %s | search %s" % (i, mic.hex(), looking_for.hex()) + sys.stdout.write('%s\r' % msg) + sys.stdout.flush() + if mic == looking_for: + print("\n" + "=" * 120) print("FOUND THE THING") - found_the_state = True last_internal_state = i - print("IT IS %s" % hex(last_internal_state)) + break + print("IT IS %08x" % last_internal_state) + print("=" * 120) + forged_input: bytearray = bytearray(b'HELLO') + print("Trying to forge a mic for an extended version with input:\n%08x\n(%s)" % ( + int.from_bytes(forged_input), + forged_input.decode(errors="replace") + )) + print("=" * 120) - extension_text = "EXTENSION ATTACK" - - hacked_mic = authur1(bytearray(extension_text.encode()), True, bytearray(last_internal_state.to_bytes(4))) + hacked_mic = authur1_optimized(forged_input, first_state=bytearray(last_internal_state.to_bytes(4))) print("Hacked MIC: %s" % hacked_mic.hex()) - + return (forged_input, hacked_mic) def test(): init: int = int.from_bytes(DEFINED_INITIAL) @@ -225,20 +258,24 @@ def test_extension_attack(): """ Test the attack against a known last state """ - key = bytearray(0x133773310000.to_bytes(16)) - message = bytearray("1234".encode()) - ext_message = bytearray("EXT".encode()) + # this key produces a mic that is fast to bruteforce with my code + # (in combination with "AAAAaa" as input ofc) + key = bytearray(0x289488ae6d71c82da1502c0130ec04e0.to_bytes(16)) + message = bytearray(b'AAAAaa') # we need to bruteforce this, skip for the test - mic = keyed_hash(message, key) - last_internal_state = keyed_hash(message, key, return_last_state=True) - forged_mic = authur1(ext_message, False, last_internal_state) - message.extend(ext_message) - validated_mic = keyed_hash(message, key) - assert validated_mic == forged_mic, "forged mic\n%sis not valid\n%s" % ( forged_mic.hex(), validated_mic.hex() ) - print("Manual extension attack with known last internal state works (%s == %s)" % (forged_mic.hex(), validated_mic.hex())) + mic = keyed_hash(message, key, True) + print("testing against mic: %s" % mic.hex()) + #last_internal_state = keyed_hash(message, key, return_last_state=True) + forged_data = extension_attack([(message, mic)]) + print("extension attack returned the following: %s,%s" % (forged_data[0].decode(errors="replace"), forged_data[1].hex())) + print("=" * 120) + message = message[:4] + message.extend(forged_data[0]) + print("generating real mic for %x (%s)" % (int.from_bytes(message), message.decode(errors="replace"))) + validated_mic = keyed_hash(message, key, True) + assert validated_mic == forged_data[1], "forged mic %s is not valid. (%s)" % ( forged_data[1].hex(), validated_mic.hex() ) + print("Manual extension attack with known last internal state works (%s == %s)" % (forged_data[1].hex(), validated_mic.hex())) - - def main(): parser = argparse.ArgumentParser(prog="authur1", description='Implementation and attack for the custom authur1 hash. Don\'t actually use this hash!') parser.add_argument('-i', '--hash', type=str, metavar="MSG",