""" Process memory editor @version: 0.0.1 @copyright: 2005, Cody Brocious @license: LGPL @author: Cody Brocious @todo: Optimize searching where ever possible """ from ctypes import * from ctypes.wintypes import * psapi = windll.psapi kernel = windll.kernel32 user = windll.user32 PROCESS_ALL_ACCESS = 0x1F0FFF PROCESS_QUERY_INFORMATION = 0x0400 PROCESS_VM_READ = 0x0010 def EnumProcesses(): """ Enumerate processes and return a map of module names versus PIDs. @return: Dictionary in the format of Module : PID @rtype: C{dict} """ pidArray = (c_ulong * 256)() bytesReturned = c_ulong() hModule = c_ulong() count = byref(c_ulong()) modname = c_buffer(30) psapi.EnumProcesses(byref(pidArray), sizeof(pidArray), byref(bytesReturned)) pidArray = [i for i in pidArray][:bytesReturned.value/sizeof(c_ulong())] ret = {} for pid in pidArray: hProcess = kernel.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid) if hProcess: psapi.EnumProcessModules(hProcess, byref(hModule), sizeof(hModule), count) psapi.GetModuleBaseNameA(hProcess, hModule.value, modname, sizeof(modname)) ret[''.join([ i for i in modname if i != '\x00'])] = pid for i in range(30): modname[i] = '\x00' kernel.CloseHandle(hProcess) return ret class Process: """ An active process. @ivar handle: Process handle. @type handle: C{object} @ivar cacheBlocks: Toggles block location caching. @type cacheBlocks: C{bool} @ivar blocks: Block cache @type blocks: C{list} """ def __init__(self, name=None, pid=None, cacheBlocks=True): """ Initializes a Process object. You must pass either a name or PID. @ivar name: The name of the process. @type name: C{str} @ivar pid: The PID of the process. @type pid: C{int} @ivar cacheBlocks: Cache block locations. @type cacheBlocks: C{bool} """ if name != None: proclist = EnumProcesses() if name in proclist: pid = proclist[name] else: pid = None if pid != None: self.handle = kernel.OpenProcess(PROCESS_ALL_ACCESS, False, pid) else: self.handle = None self.cacheBlocks = cacheBlocks self.blocks = [] def findAccessibleBlock(self, addr, size, granularity, useCache=True): """ Finds the largest block of accessible memory, both backward and forward from the address and size given. @ivar addr: Address to start at. @type addr: C{long} @ivar size: Size to start at. @type size: C{int} @ivar granularity: Granularity this block was found at. @type granularity: C{int} @ivar useCache: Take advantage of block caching. @type useCache: C{bool} @return: Tuple containing the address and size of the block. @rtype: (C{long}, C{int}) """ if self.cacheBlocks and useCache: for (s, e, bg) in self.blocks: if s <= addr and addr < e: return (s, e) bs = size if self.read(addr, bs + 1): bs += 0x1000000 while self.read(addr, bs): bs += 0x1000000 bs -= 0x1000000 bs += 0x100000 while self.read(addr, bs): bs += 0x100000 bs -= 0x100000 bs += 0x10000 while self.read(addr, bs): bs += 0x10000 bs -= 0x10000 bs += 0x1000 while self.read(addr, bs): bs += 0x1000 bs -= 0x1000 bs += 0x100 while self.read(addr, bs): bs += 0x100 bs -= 0x100 bs += 1 while self.read(addr, bs): bs += 1 bs -= 1 if self.read(addr - 1, bs): orig = addr addr -= 0x1000000 while self.read(addr, bs): addr -= 0x1000000 addr += 0x1000000 addr -= 0x100000 while self.read(addr, bs): addr -= 0x100000 addr += 0x100000 addr -= 0x10000 while self.read(addr, bs): addr -= 0x10000 addr += 0x10000 addr -= 0x1000 while self.read(addr, bs): addr -= 0x1000 addr += 0x1000 addr -= 0x100 while self.read(addr, bs): addr -= 0x100 addr += 0x100 addr -= 1 while self.read(addr, bs): addr -= 1 addr += 1 bs += orig - addr if self.cacheBlocks: self.blocks.append((addr, addr+bs, granularity)) return addr, bs def findAccessible(self, start=0, end=0x80000000L, size=1, granularity=(0x10000, 0x1000, 0x100, 1), useCache=True): """ Finds accessible blocks of memory in the range you specify. @ivar start: Address to start search in. @type start: C{long} @ivar end: Address to end at. @type end: C{long} @ivar size: Minimum size of the block. @type size: C{int} @ivar granularity: Granularity of the search. The higher you make this, the faster it will go, but you may miss blocks if it is above 1. The default will do two rough searches then one fine. This can be either a single granularity or a tuple of them. @type granularity: C{int}, C{tuple} @ivar useCache: Take advantage of block caching. @type useCache: C{bool} @return: Yields a tuple of addresses and sizes of blocks. @rtype: (C{long}, C{int}) """ if type(granularity) != list and type(granularity) != tuple: granularity = (granularity, ) recheck = start for g in granularity: print 'Searching at 0x%X' % g addr = start while addr < end: # print '%08X -- %i' % (addr, len(self.blocks)) # Uncomment to print addresses as they're searched if self.cacheBlocks and useCache and recheck <= addr: exists = False for (s, e, bg) in self.blocks: if s <= addr and addr < e: addr = e + 1 exists = True yield s, e-s break elif s > addr and bg <= g: addr = s + 1 exists = None break if exists or exists == None: continue for (s, e, bg) in self.blocks: if addr < s and bg <= g: recheck = s if self.read(addr, size): a, bs = self.findAccessibleBlock(addr, size, granularity=g, useCache=True) yield a, bs addr = a + bs else: addr += g def search(self, data, start=0, end=0x80000000L, granularity=(0x10000, 0x1000, 0x100, 1)): """ Searches for a given piece of data in the range you specify. @ivar data: Data to search for. @type data: C{str} @ivar start: Address to start search in. @type start: C{long} @ivar end: Address to end at. @type end: C{long} @ivar size: Minimum size of the block. @type size: C{int} @ivar granularity: Granularity of the search. The higher you make this, the faster it will go, but you may miss blocks if it is above 1. The default will do two rough searches then one fine. This can be either a single granularity or a tuple of them. @type granularity: C{int}, C{tuple} @return: Yields matching addresses. @rtype: C{long} """ for (addr, bs) in self.findAccessible(start=start, end=end, size=len(data), granularity=granularity): d = self.read(addr, bs) if d: s = 0 while True: off = d.find(data, s) if off == -1: break else: yield addr + off s = off + 1 def read(self, addr, len=1): """ Reads the specified number of bytes from the given address. @ivar addr: Address to read from. @type addr: C{long} @ivar len: Number of bytes to read. @type len: C{int} @return: None if no handle is available, False if it couldn't read, or a string containing the bytes read. @rtype: C{bool} or C{none} """ if not self.handle: return None buffer = create_string_buffer(len) bytesRead = c_ulong(0) if kernel.ReadProcessMemory(self.handle, addr, buffer, len, byref(bytesRead)): return buffer.raw else: return False def write(self, addr, val): """ Writes the specified data to the given address. @ivar addr: Address to write to. @type addr: C{long} @ivar val: Bytes to write. @type val: C{str} @return: None if no handle is available or boolean for whether it could or couldn't write. @rtype: C{bool} or C{none} """ if not self.handle: return None bytesWritten = c_ulong(0) if kernel.WriteProcessMemory(self.handle, addr, c_char_p(val), len(val), byref(bytesWritten)): return True else: return False def close(self): """ Closes the process handle. """ if self.handle: kernel.CloseHandle(self.handle) def loadCache(self, filename): """ Load block cache from disk. @ivar filename: Filename to load from. @type filename: C{str} """ self.blocks = [tuple([int(i) for i in block.split(' ')]) for block in file(filename).read().split('\n') if block] def saveCache(self, filename): """ Saves block cache to disk. @ivar filename: Filename to write to. @type filename: C{str} """ file(filename, 'w').write('\n'.join(['%i %i %i' % block for block in self.blocks])) def loadLibrary(self, dll): """ Loads a library into memory and returns its handle. @ivar dll: DLL to load. @type dll: C{str} @return: Library handle @rtype: C{class} """ return kernel.LoadLibraryA(dll) def getFunction(self, lib, name): """ Gets the address of a given function in a library. @ivar lib: Library handle. @type lib: C{class} @ivar name: Function name. @type name: C{str} @return: Function address @rtype: C{int} """ return kernel.GetProcAddress(lib, name) def setHook(self, lib, function, message): """ Sets a windows hook. @ivar lib: Library handle. @type lib: C{class} @ivar function: Function handle. @type function: C{class} @ivar message: Message to hook. @type message: C{int} @return: Hook handle. @rtype: C{int} """ return user.SetWindowsHookExA(message, function, lib, 0)