Thomas Pollet (xz) <thomas pollet gmail com> |
Saturday, December 16 2006 06:54.32 CST |
I keep myself idle lately coding an API Fuzzer.
The idea is simple:
-init:
-load program
-set hooks
-load fuzz testcases in address space
-fuzzing loop:
on api call entry:
first pass
-save process snapshot ( thread context, memory)
-get arguments
-monitor mem access
on api call return
-restore memory
-restore thread context (=> eip on api entry)
-fuzz api call arguments
e.g.:
main.cpp:
---------
extern "C" __declspec(dllimport) void myfun ( int * a,int b,int c);
void main(void)
{
int a = 6;
myfun(&a,7,10);
}
dll.cpp:
--------
#include <iostream>
using namespace std;
extern "C" __declspec(dllexport) void myfun(int * a,int b, int c)
{
cout << "a: " << *a << " | b: " << b << " | c: " << c << "\n";
}
compile and link:
$ cl -LD dll.cpp && cl main.cpp /LINK dll.lib
$ ./main
a: 6 | b: 7 | c: 10
this is the testcase:
hook myfun when called from main and fuzz the arguments...
$ python fuzztest.py | grep a: # grep to omit debug prints
a: 6 | b: 7 | c: 10
a: 6 | b: 256 | c: 32
a: 6 | b: 4096 | c: 0
a: 6 | b: 64 | c: 16
great... as b and c are integers they are fuzzed in the 3 fuzzing loops.
To get some more interesting results we have to fuzz all the arguments, i.e. pointers. This appears to be more difficult than expected: I have to monitor which memory locations are
accessed, then fuzz those locations. I thought putting memory breakpoints on the stack would help do the trick but pydbg doesn't support stack memory breakpoints. I'm trying to work around this by setting PAGE_NOACCESS on the stack pages and an access violation callback but this isn't finished yet. I do have a start for a wxpython gui:
That's awesome! "In memory" fuzzing is a concept I've been playing with for a bit but I never took it as far as creating a GUI module for PaiMei (which looks fantastic btw).
Are you detecting / handling issues like the target thread locking on a resource? Will you be releasing this as it matures? |
|
I'll release it when I get some usable functionality out of it. I don't have lots of spare time and this is taking more time than expected, I hope to finish v0.01 some day next year :) |
This really is great work xz. I've been thinking about doing this for a while now...
my thoughts were to build something like in v0.01 and then extend it into a distributed application.
I look forward to playing with the release - please keep us updated! |
To reproduce the example, below is part of the source for the fuzzer (only integers are fuzzed; I added some code to trace stack accesses, which is useless at this point).
The hooking.py is in paimei/utils
#!c:\python24\python.exe
#
#PyFuzz
#Copyright (C) 2006 Thomas Pollet <[email protected]>
#
#
import struct
import sys

from pydbg import *
from hooking import *
from pydbg import *
from random import *
from pydbg.defines import *
from pydbg.memory_snapshot_block import *
from pydbg import pydasm as pydasm
# Candidate replacement values for integer arguments: boundary values first,
# then single-bit patterns from the top bit downwards, and finally zero.
fnums = [
    0x100000000,  # 2**32, one past the largest 32-bit value
    0xffffffff,   # every bit of a 32-bit word set
    1 << 31,
    1 << 30,
    1 << 29,
    1 << 28,
    1 << 24,
    1 << 20,
    1 << 16,
    1 << 12,
    1 << 8,
    1 << 4,
    1 << 0,
    0,
]
class fuzz_core(pydbg, pydbg_core):
    '''
    In-memory API fuzzer built on top of pydbg.

    A single exported function is hooked. On its first call the thread
    context and the call arguments are recorded; every time the call returns
    the thread context is put back to the recorded one and the integer
    arguments are overwritten with values picked from fnums, until num_loops
    extra rounds have been run.

    NOTE(review): the published listing had lost its indentation; statement
    nesting below was reconstructed from syntax and from the author's
    description of the algorithm - confirm against the original file.
    '''

    def __init__(self, fuzzer=None, num_loops=0, trace_stack=True):
        '''
        fuzzer      -- accepted for compatibility, not used by this class
        num_loops   -- number of fuzzing rounds run after the first call
        trace_stack -- when true, record stack addresses read by the api
        '''
        pydbg.__init__(self)
        self.hook_container = hook_container()
        self.func_args = []
        self.mem_blocks = {}
        self.num_loops = num_loops
        self.trace_stack = trace_stack
        self.stack_hits = []
        # State that is otherwise only created by hook() / hook_entry();
        # defined here so that every attribute exists from construction on.
        self.loop_num = 0
        self.dll = None
        self.func = None
        self.arg_threshold = 0
        self.hookadd = None
        self.stack_top = None
        self.thread_context = None

    def hook(self, dll, func, num_args):
        '''
        Register which api to fuzz.

        dll      -- name of the module exporting the function
        func     -- name of the exported function
        num_args -- number of stack arguments the function takes
        '''
        self.loop_num = 0
        self.dll = dll
        self.func = func
        self.arg_threshold = num_args
        # The hooks themselves are installed from the breakpoint callback.
        self.set_callback(EXCEPTION_BREAKPOINT, self.set_hooks)

    def set_hooks(self, dbg):
        '''
        Breakpoint callback that installs the hooks. The program needs to be
        running, so this is only acted upon while self.first_breakpoint is set.

        Always returns DBG_CONTINUE; a failure to install the hooks is
        reported on stdout and otherwise ignored (no hooks, no fuzzing).
        '''
        if self.first_breakpoint:
            try:
                self.hookadd = self.func_resolve_debuggee(self.dll, self.func)
                # Register with the argument count given to hook(); func_args
                # is still empty at this point, so its length was always 0.
                self.hook_container.add(self, self.hookadd, self.arg_threshold,
                                        self.hook_entry, self.hook_return)
                self.ignore()
            except Exception:
                # best effort: keep the debuggee running without hooks
                print("could not hook %s!%s: %s" % (self.dll, self.func, sys.exc_info()[1]))
        return DBG_CONTINUE

    def ignore(self):
        '''
        KiFastSystemCall screws up beyond imagination, hooking solves it
        Don't really understand, if you do, please enlighten me...
        '''
        kifadd = self.func_resolve("ntdll.dll", "KiFastSystemCall")
        # Entry and return hooks that do nothing.
        self.hook_container.add(self, kifadd, 1,
                                lambda x, y: DBG_CONTINUE,
                                lambda x, y, z: DBG_CONTINUE)

    def hook_entry(self, dbg, args=0):
        '''
        Entry hook of the fuzzed api: saves the process state on the first
        pass (loop_num == 0). Later passes do nothing here.
        '''
        # NOTE(review): everything below is assumed to belong to the first
        # pass, as in the author's outline (snapshot, get arguments, monitor
        # memory access) - confirm against the original indentation.
        if self.loop_num == 0:
            self.suspend_all_threads()
            self.stack_top = self.context.Esp
            self.get_args()
            self.thread_context = self.get_thread_context(self.h_thread)
            if self.trace_stack:
                # set stack protections to noaccess to 'implement' memory breakpoints
                self.stack_prot()
                self.set_callback(EXCEPTION_ACCESS_VIOLATION, self.stackguard)
            self.resume_all_threads()

    def hook_return(self, dbg, args, retval):
        '''
        Return hook of the fuzzed api: resets the process state to be the
        same as before the api call ...ready for another round of fuzzing.
        After the last round the gathered info is printed and the hook is
        removed.
        '''
        self.stack_prot(PAGE_READWRITE)
        self.set_callback(EXCEPTION_SINGLE_STEP, self.ss)
        if self.loop_num != self.num_loops:
            self.loop_num += 1
            self.set_thread_context(self.thread_context, self.h_thread)
            self.fuzz_args()
        else:
            self.finish()
            self.hook_container.remove(self, self.hookadd)

    def stackguard(self, dbg):
        '''
        The stack pages are set to PAGE_NOACCESS so the access violation
        exception handler stackguard gets called if the api call needs info
        from the stack.

        The faulting instruction is single stepped with the stack protection
        toggled; if its second operand is a memory operand whose address lies
        above the stack pointer saved at api entry, that address is recorded
        in self.stack_hits. Returns DBG_CONTINUE.
        '''
        self.single_step(True)
        self.set_callback(EXCEPTION_SINGLE_STEP, self.afterstack)
        self.stack_prot()
        self.set_callback(EXCEPTION_ACCESS_VIOLATION,
                          self.fuzz_exception_handler_access_violation)
        self.disasm(self.context.Eip)
        operand = self.instruction.op2
        if operand.type == pydasm.OPERAND_TYPE_MEMORY:
            # If the operand is in memory, the address is computed from a
            # segment register and any of the following values:
            # a base register, an index register, a scaling factor, a
            # displacement: BASE + (INDEX * SCALE) + DISPLACEMENT
            ctx = self.context
            # Indexed by pydasm register code; the trailing 0 presumably
            # stands for "no register" - TODO confirm against pydasm.
            regs = [ctx.Eax, ctx.Ecx, ctx.Edx, ctx.Ebx,
                    ctx.Esp, ctx.Ebp, ctx.Esi, ctx.Edi, 0]
            address = (regs[operand.basereg]
                       + regs[operand.indexreg] * operand.scale
                       + operand.displacement)
            # we are only interested in mem accesses outside api stack frame
            if address > self.stack_top:
                self.stack_hits.append(address)
        return DBG_CONTINUE

    def afterstack(self, dbg):
        '''
        Single step handler armed by stackguard: runs the default single step
        handling, toggles the stack protection again and re-arms stackguard
        and itself. Returns DBG_CONTINUE.
        '''
        self._run_default_single_step()
        self.stack_prot()
        self.set_callback(EXCEPTION_ACCESS_VIOLATION, self.stackguard)
        self.set_callback(EXCEPTION_SINGLE_STEP, self.afterstack)
        return DBG_CONTINUE

    def _run_default_single_step(self):
        '''
        Swap the single step callback for a no-op and invoke the inherited
        exception_handler_single_step().
        '''
        self.set_callback(EXCEPTION_SINGLE_STEP, lambda x: DBG_CONTINUE)
        self.exception_handler_single_step()

    def fuzz_exception_handler_access_violation(self, dbg):
        '''
        Access violation handler used while an instruction is being single
        stepped by stackguard: reports the faulting eip and the thread
        context, then toggles the stack protection.

        Returns DBG_EXCEPTION_NOT_HANDLED so the debug loop receives an
        explicit continue status instead of None.
        '''
        print("!!Access Violation !!\neip : 0x%x" % self.context.Eip)
        # dump_context() is expected to return the dump as text rather than
        # print it (per the PyDbg API docs), so its result has to be printed.
        print(self.dump_context())
        self.stack_prot()
        return DBG_EXCEPTION_NOT_HANDLED

    def stack_prot(self, prot=None):
        '''
        stack_prot switches protections on stack pages.

        Walks in page sized steps from the stack pointer saved at api entry
        up to self.stack_range()[0]. With prot given every visited page gets
        exactly that protection, otherwise the PAGE_NOACCESS and
        PAGE_READWRITE bits of the current protection are flipped.
        '''
        page_size = 0x1000
        toggle_mask = PAGE_NOACCESS | PAGE_READWRITE
        # Queried once: the bound does not depend on the loop.
        # NOTE(review): assumes stack_range()[0] is the (exclusive) high end
        # of the stack, hence '<' - confirm against the pydbg documentation.
        upper_bound = self.stack_range()[0]
        address = self.stack_top
        while address < upper_bound:
            mbi = self.virtual_query(address)
            new_protect = prot or (mbi.Protect ^ toggle_mask)
            self.virtual_protect(mbi.BaseAddress, page_size, new_protect)
            address += page_size

    def ss(self, dbg):
        '''
        single step exception handler used when the api is analysed (after
        first loop); default single step exception handler is reset.
        Returns DBG_CONTINUE.
        '''
        self._run_default_single_step()
        return DBG_CONTINUE

    def finish(self):
        '''
        print info gathered
        '''
        for hit in self.stack_hits:
            print("stackhit : %x" % hit)

    def get_args(self):
        '''
        save the api call arguments (and the memory they may point to)
        '''
        for index in range(1, self.arg_threshold + 1):
            arg = self.get_arg(index)
            self.func_args.append(arg)
            self.get_mem(arg)

    def get_mem(self, arg):
        '''
        Save the memory blocks the pointer args are pointing to.

        The whole region reported by virtual_query for arg is read and stored
        in self.mem_blocks[arg]; None is stored when the query or the read
        fails. Returns the stored value.
        '''
        block = None
        try:
            mbi = self.virtual_query(arg)
            block = self.read(mbi.BaseAddress, mbi.RegionSize)
        except Exception:
            # best effort: arg is not an address that can be read
            pass
        self.mem_blocks[arg] = block
        return block

    @staticmethod
    def _pack_dword(value):
        '''
        Return value as the 4 byte little-endian string stored in memory for
        a 32-bit integer on x86; bits above the low 32 are dropped.
        '''
        return struct.pack('<L', value & 0xffffffff)

    def fuzz_args(self):
        '''
        fuzzing routine: arguments are fuzzed

        Pointer arguments are only reported; every other argument slot is
        overwritten with a value picked at random from fnums.
        '''
        # NOTE(review): slot i is taken to be at esp + 4*i of the current
        # context (no return address slot) - confirm calling convention.
        esp = self.context.Esp
        for i in range(self.arg_threshold):
            arg = self.func_args[i]
            arg_add = esp + 4 * i
            if self.is_pointer(arg):
                print('arg %i is a pointer: 0x%08x' % (i + 1, arg))
                print('value : %s' % self.hex_dump(self.read(arg, 4), arg))
            else:
                # argument is a number, fuzz it
                fuzzed = choice(fnums)
                print("number: %s" % self.hex_dump(self.read(arg_add, 4), arg_add))
                print('fuzzed : 0x%08x ' % fuzzed)
                # Little-endian, so the callee sees the value printed above
                # (the bytes used to be written most significant first).
                self.write(arg_add, self._pack_dword(fuzzed), 4)

    def is_pointer(self, arg):
        '''
        determine if a value is a pointer into rw memory
        this function isn't fool proof, but it'll do for now
        '''
        try:
            mbi = self.virtual_query(arg)
        except Exception:
            return False
        if arg > 0x70000000:
            return False
        return bool(mbi.Protect & PAGE_READWRITE)
def main():
    '''
    Run the example from the post: load main.exe, hook myfun (3 arguments)
    exported by dll.dll and fuzz it for 3 rounds with stack tracing enabled.
    '''
    fuzzer = fuzz_core(num_loops=3, trace_stack=True)
    fuzzer.load('main.exe')
    fuzzer.hook('dll.dll', 'myfun', num_args=3)
    fuzzer.run()


# Only start the debugger when executed as a script, not on import.
if __name__ == '__main__':
    main()
|
|