Package Libs :: Module libheap
[hide private]
[frames | no frames]

Source Code for Module Libs.libheap

  1  #!/usr/bin/env python 
  2  """ 
  3  Immunity Heap API for Immunity Debugger 
  4   
  5  (c) Immunity, Inc. 2004-2006 
  6   
  7   
  8  U{Immunity Inc.<http://www.immunityinc.com>} Debugger Heap Library for python 
  9   
 10   
 11  """ 
 12   
 13  __VERSION__ = '1.3' 
 14   
 15  import immutils 
 16  import struct 
 17  import string 
 18  from UserList import UserList 
 19  HEAP_MAX_FREELIST = 0x80 
 20   
 21   
 22   
class PHeap:
    """
    Windows 32 Heap Class

    Reads a heap header out of the debugged process through the debugger
    object and collects its free lists, its segments and the chunks that
    live in every segment.
    """

    def __init__(self, imm, heapddr = 0, restore = False):
        """
        Windows 32 Heap Class

        @type  imm: Debugger object
        @param imm: Object used to reach the debugged process (this class calls
                    readMemory, readLong, Log and getKnowledge on it)

        @type  heapddr: DWORD
        @param heapddr: (Optional, Def: 0) Address of the heap. When it is zero
                        nothing is parsed and the object stays empty

        @type  restore: BOOLEAN
        @param restore: (Optional, Def: False) Enumerate the chunks with
                        getRestoredChunks instead of getChunks

        @rtype:  PHEAP object
        """
        self.imm = imm
        self.address = heapddr
        self.chunks = []
        self.restore = restore
        self.Segments = []
        if heapddr:
            self._grabHeap()

    def _grabHeap(self):
        """
        Parse the heap header located at self.address.

        Fills in the header attributes, self.FreeList (one list of
        (address, prev, next) tuples per free list head), self.Segments and,
        through getChunks/getRestoredChunks, self.chunks.

        @raise Exception: When the heap header cannot be read
        """
        try:
            heaps = self.imm.readMemory(self.address, 0x588)
        except WindowsError:
            # The message used to be built from the undefined name 'heapaddr',
            # which turned every read failure into a NameError.
            raise Exception("Failed to get heap at address : 0x%08x" % self.address)

        # The first 8 bytes of the buffer are skipped; the header fields are
        # decoded from offset 8 up to offset 0x50.
        index = 0x8
        (self.Signature, self.Flags, self.ForceFlags, self.VirtualMemoryThreshold,
         self.SegmentReserve, self.SegmentCommit, self.DeCommitFreeBlockThreshold,
         self.DeCommitTotalBlockThreshold, self.TotalFreeSize, self.MaximumAllocationSize,
         self.ProcessHeapListIndex, self.HeaderValidateLength, self.HeaderValidateCopy,
         self.NextAvailableTagIndex, self.MaximumTagIndex, self.TagEntries,
         self.UCRSegments, self.UnusedUnCommittedRanges, self.AlignRound, self.AlignMask) = \
            struct.unpack("LLLLLLLLLLHHLHHLLLLL", heaps[index : index + (0x50 - 8)])
        index += 0x50 - 8

        self.VirtualAllocedBlock = struct.unpack("LL", heaps[index : index + 8])
        index += 8

        # Raw table of 64 segment addresses; a zero entry ends the table.
        self._Segments = struct.unpack("L" * 64, heaps[index : index + 64 * 4])
        index += 64 * 4

        self.FreeListInUseLong = struct.unpack("LLLL", heaps[index : index + 16])
        index += 16

        (self.FreeListInUseTerminate, self.AllocatorBackTraceIndex) = \
            struct.unpack("HH", heaps[index : index + 4])
        index += 4

        self.Reserved1 = struct.unpack("LL", heaps[index : index + 8])
        index += 8

        self.PseudoTagEntries = struct.unpack("L", heaps[index : index + 4])
        index += 4

        # Getting the FreeList: 128 list heads of two DWORDs each.
        self.FreeList = []
        for slot in range(0, 128):
            head_addr = self.address + index + slot * 8
            # Previous and Next Chunk of the head of the double linked list
            (prev_ptr, next_ptr) = struct.unpack(
                "LL", heaps[index + slot * 8 : index + slot * 8 + 8])
            free_entry = [(head_addr, prev_ptr, next_ptr)]

            # Follow the list until it comes back to its head.  A corrupted
            # list may cycle without ever reaching the head again, so every
            # visited node is remembered and the walk stops on a revisit.
            visited = set()
            while next_ptr != head_addr and next_ptr not in visited:
                node = next_ptr
                visited.add(node)
                try:
                    (prev_ptr, next_ptr) = struct.unpack(
                        "LL", self.imm.readMemory(node, 0x8))
                except Exception:
                    # Unreadable node: keep what was collected so far.
                    break
                free_entry.append((node, prev_ptr, next_ptr))

            self.FreeList.append(free_entry)

        index += 256 * 4
        (self.LockVariable, self.CommitRoutine, self.Lookaside, self.LookasideLockCount) = \
            struct.unpack("LLLL", heaps[index : index + 16])

        # Walk every segment listed in the header, then every address the
        # segment reports in its Pages list.
        for seg_addr in self._Segments:
            if seg_addr == 0x0:
                break
            segment = Segment(self.imm, seg_addr)
            self.Segments.append(segment)
            self._collectChunks(segment.BaseAddress)
            for page in segment.Pages:
                self.imm.Log("> 0x%08x" % page)
                self._collectChunks(page)

    def _collectChunks(self, address):
        """
        Enumerate the chunks starting at address, honouring self.restore.
        """
        if self.restore:
            return self.getRestoredChunks(address)
        return self.getChunks(address)

    def printFreeListInUse(self, uselog=None):
        """
        Print the Heap's FreeListInUse bitmask

        @type  uselog: Log Function
        @param uselog: (Optional, Def: None) Log function that display the
                       information. Without it nothing is displayed

        @rtype:  List of strings
        @return: The two formatted lines of the bitmask
        """
        to_binary = immutils.decimal2binary
        bitmask = self.FreeListInUseLong
        tbl = ["FreeListInUse %s %s" % (to_binary(bitmask[0]), to_binary(bitmask[1])),
               " %s %s" % (to_binary(bitmask[2]), to_binary(bitmask[3]))]
        if uselog:
            for line in tbl:
                uselog(line)
        return tbl

    def printFreeList(self, uselog = None):
        """
        Print the Heap's FreeList

        @type  uselog: Log Function
        @param uselog: (Optional, Def: Log Window) Log function that display the
                       information. It is called with an 'address' keyword argument

        @rtype:  DWORD
        @return: Always 0
        """
        log = self.imm.Log
        if uselog:
            log = uselog

        for slot in range(0, 128):
            entry = self.FreeList[slot]
            head = entry[0]
            log("[%03x] 0x%08x -> [ 0x%08x | 0x%08x ] " % (slot, head[0], head[1], head[2]),
                address = head[0])
            for node in entry[1:]:
                # The chunk header is looked up 8 bytes before the list links;
                # the size is reported as 0 when the chunk cannot be read.
                try:
                    sz = self.get_chunk(node[0] - 8).size
                except Exception:
                    sz = 0
                log(" 0x%08x -> [ 0x%08x | 0x%08x ] (%08x)" % (node[0], node[1], node[2], sz),
                    address = node[0])
        return 0x0

    # Get Chunks restored
    def getRestoredChunks(self, address):
        """
        Enumerate Chunks of the current heap using a restore heap

        The heap saved under the knowledge key "saved_heap_<address>" is used
        to replace chunks whose header looks inconsistent. Without a saved
        heap this falls back to getChunks.

        @type  address: DWORD
        @param address: Address where to start getting chunks

        @rtype:  List of win32heapchunks
        @return: Chunks
        """
        imm = self.imm

        oldheap = imm.getKnowledge("saved_heap_%08x" % self.address)  # retrieving the heap
        if not oldheap:
            imm.Log("Coudln't use restore mode: No saved Heap")
            return self.getChunks(address)

        ptr = address
        # Null chunk: a copy of the first chunk whose size is its own previous
        # size, so the backward check below passes on the first iteration.
        # get_chunk used to be called with three arguments here although it
        # only accepts the address, which made restore mode always fail.
        try:
            backchunk = self.get_chunk(ptr)
        except Exception:
            return self.chunks
        backchunk.size = backchunk.psize
        backchunk.usize = backchunk.upsize

        while 1:
            try:
                c = self.get_chunk(ptr)
            except Exception:
                return self.chunks

            next_ptr = ptr + c.usize

            # Upper 16 bits of the next chunk's first DWORD are taken as its
            # previous-size field.
            try:
                sizes = imm.readLong(next_ptr)
                previous = (sizes >> 16) & 0xffff
            except Exception:
                previous = 0  # unable to read

            # When to restore?
            #  o Chunk size is zero
            #  o When Size is different from next chunk previous size
            #  o Next chunk previous size is zero (means, readLong fails)
            #    and the chunk is not a top chunk
            #  o When the size of the backward chunk is different for the
            #    chunk previous size
            if (not c.size) or (c.size != previous and not c.istop()) or \
               (not previous and not c.istop()) or (backchunk.size != c.psize):
                restoredchunk = oldheap.findChunkByAddress(ptr)
                if restoredchunk:
                    c = restoredchunk
                    c.setRestored()
                    next_ptr = ptr + c.usize

            ptr = next_ptr
            self.chunks.append(c)
            backchunk = c

            if c.istop() or c.size == 0:
                break

        return self.chunks

    def findChunkByAddress(self, addr):
        """
        Find a Chunks by its address

        @type  addr: DWORD
        @param addr: Address to search for

        @rtype:  win32heapchunks
        @return: Chunk, or None when no collected chunk has that address
        """
        for chunk in self.chunks:
            if chunk.addr == addr:
                return chunk
        return None

    def getChunks(self, address, size = 0xffffffff):
        """
        Enumerate Chunks of the current heap

        @type  address: DWORD
        @param address: Address where to start getting chunks

        @type  size: DWORD
        @param size: (Optional, Def: All) Amount of chunks

        @rtype:  List of win32heapchunks
        @return: Chunks
        """
        import sys

        imm = self.imm
        ptr = address

        while size:
            try:
                c = self.get_chunk(ptr)
            except Exception:
                # sys.exc_info keeps this readable by every Python version
                # the debugger has shipped with.
                imm.Log("Failed to grab chunks> " + str(sys.exc_info()[1]))
                return self.chunks

            self.chunks.append(c)

            ptr += c.usize
            if c.istop() or c.size == 0:
                break
            size -= 1

        return self.chunks

    def get_chunk(self, addr):
        """
        Build the chunk object for the chunk located at addr.

        @type  addr: DWORD
        @param addr: Address of the chunk

        @rtype:  win32heapchunk
        @return: Chunk
        """
        return win32heapchunk(self.imm, addr, self)
271
class Segment:
    """
    Heap Segment Class

    Decodes the segment header found at a given address and records, for
    every entry of its uncommitted-range list, the address where that
    range ends.
    """

    def __init__(self, imm, addr):
        """
        Heap Segment Class

        @type  imm: Debugger object
        @param imm: Object used to read the debugged process (readMemory is called on it)

        @type  addr: DWORD
        @param addr: Address of the segment
        """
        self.address = addr
        # AVOID THE ENTRY ITSELF: the header fields start 8 bytes in.
        mem = imm.readMemory(addr + 8, 0x34)

        (self.Signature, self.Flags, self.Heap, self.LargestUnCommitedRange, self.BaseAddress,
         self.NumberOfPages, self.FirstEntry, self.LastValidEntry, self.NumberOfUnCommittedPages,
         self.NumberOfUnCommittedRanges, self.UnCommittedRanges, self.AllocatorBackTraceIndex,
         self.Reserved, self.LastEntryInSegment) = struct.unpack("LLLLLLLLLLLHHL", mem)

        # Each list entry is four DWORDs: next entry, range address, range
        # size and a filler.  Pages collects address + size of every range.
        self.Pages = []
        visited = set()
        entry = self.UnCommittedRanges
        # The list normally ends with a null 'next' pointer.  A damaged list
        # can point back to an earlier entry, so stop on a revisit instead of
        # looping forever.
        while entry != 0 and entry not in visited:
            visited.add(entry)
            mem = imm.readMemory(entry, 0x10)
            (C_Next, C_Addr, C_Size, C_Filler) = struct.unpack("LLLL", mem)
            self.Pages.append(C_Addr + C_Size)
            entry = C_Next
296
297 -class VistaPHeap(PHeap):
def __init__(self, imm, heapddr = 0, restore = False):
    """
    Heap class variant that reuses the PHeap constructor unchanged.

    The arguments are handed straight to PHeap.__init__, which stores them
    and calls _grabHeap when heapddr is non-zero.

    @type  imm: Debugger object
    @param imm: Object used to reach the debugged process

    @type  heapddr: DWORD
    @param heapddr: (Optional, Def: 0) Address of the heap

    @type  restore: BOOLEAN
    @param restore: (Optional, Def: False) Restore flag stored on the object
    """
    PHeap.__init__(self, imm, heapddr = heapddr, restore = restore)
300
def _grabHeap(self):
    """
    Parse the heap header located at self.address.

    0x120 bytes are read starting 8 bytes into the heap and decoded into
    the header attributes. The segment list is then walked to collect the
    chunks of every segment, the block index is loaded through getBlocks
    and, when FrontEndHeap is set, an LFHeap object is stored in self.LFH.

    Note that self.Segments receives plain addresses here, not Segment
    objects.

    @raise Exception: When the heap header cannot be read
    """
    try:
        heapmem = self.imm.readMemory(self.address + 8, 0x120)
    except WindowsError:
        # The message used to be built from the undefined name 'heapaddr',
        # which turned every read failure into a NameError.
        raise Exception("Failed to get heap at address : 0x%08x" % self.address)

    header_format = "L" * 11 + "HH" + "L" * 18 + "HHLHH" + "L" * 19 + "HH" + "L" * 19
    (self.SegmentSignature, self.SegmentFlags,
     self.SegmentListEntry_Flink, self.SegmentListEntry_Blink,
     self.Heap, self.BaseAddress, self.NumberOfPages, self.FirstEntry,
     self.LastValidEntry, self.NumberofUncommitedPages, self.NumberofUncommitedRanges,
     self.SegmentAllocatorBackTraceIndex, self.Reserved,
     self.UCRSegmentList_Flink, self.UCRSegmentList_Blink,
     self.Flags, self.ForceFlags, self.CompatibilityFlags, self.EncodeFlagMask,
     self.EncodingKey, self.EncodingKey2, self.PointerKey, self.Interceptor_debug,
     self.VirtualMemoryThreshold, self.Signature, self.SegmentReserve, self.SegmentCommit,
     self.DeCommitThresholdBlock, self.DeCommitThresholdTotal, self.TotalFreeSize,
     self.MaxAllocationSize, self.ProcessHeapsListIndex, self.HeaderValidateLength,
     self.HeaderValidateCopy, self.NextAvailableTagIndex, self.MaximumTagIndex,
     self.TagEntries, self.UCRList_Flink, self.UCRList_Blink,
     self.AlignRound, self.AlignMask,
     self.VirtualAlloc_Flink, self.VirtualAlloc_Blink,
     self.SegmentList_Flink, self.SegmentList_Blink,
     self.AllocatorBackTraceIndex, self.NonDedicatedListLenght,
     self.BlocksIndex, self.UCRIndex, self.PseudoTagEntries,
     self.FreeList_Flink, self.FreeList_Blink,
     self.LockVariable, self.CommitRoutine,
     self.FrontEndHeap, self.FrontHeapLockCount, self.FrontEndHeapType,
     self.TotalMemoryReserved, self.TotalMemoryCommited, self.TotalMemoryLargeUCR,
     self.TotalSizeInVirtualBlocks, self.TotalSegments, self.TotalUCRs,
     self.CommitOps, self.DecommitOps, self.LockAcquires, self.LockCollisions,
     self.CommitRate, self.DeCommitRate, self.CommitFailures,
     self.InBlockCommitFailures, self.CompactHeapCalls, self.CompactedUCRs,
     self.InBlockDecommits, self.InBlockDecommitSize,
     self.TunningParameters) = struct.unpack(header_format, heapmem)

    # XXX: TODO Loop over the Segments
    self.imm.Log("FreeList: 0x%08x | 0x%08x" % (self.FreeList_Flink, self.FreeList_Blink))

    # The heap itself is the first segment.
    head = self.address + 0x10
    addr = self.SegmentList_Blink
    self.Segments.append(self.address)
    self.getChunks(self.address)
    self.imm.Log("segment: 0x%08x 0x%08x" % (self.SegmentList_Flink, self.SegmentList_Blink))

    # Each list entry sits 0x10 bytes into its segment.  Stop when the walk
    # reaches the heap's own entry, or when an entry shows up twice (a
    # damaged list would otherwise never terminate).
    visited = set()
    while head != addr and addr not in visited:
        visited.add(addr)
        self.Segments.append(addr - 0x10)
        self.getChunks(addr - 0x10)
        addr = self.imm.readLong(addr)

    self.getBlocks(self.BlocksIndex)
    if self.FrontEndHeap:
        self.LFH = LFHeap(self.imm, self.FrontEndHeap)
325
326 - def getBlocks(self, startaddr):
327 self.blocks = [] 328 addr = startaddr 329 330 while addr: 331 block = Blocks( self.imm, addr ) 332 self.blocks.append( block ) 333 block.FreeList=[] 334 memory = self.imm.readMemory( block.Buckets, 0x80*8 ) 335 if block.FreeListInUsePtr: 336 block.setFreeListInUse( struct.unpack("LLLL", self.imm.readMemory( block.FreeListInUsePtr, 4*4 )) ) 337 338 # Getting the FreeList 339 for a in range(0, 128): 340 free_entry = [] 341 # Previous and Next Chunk of the head of the double linked list 342 (fwlink, heap_bucket) = struct.unpack("LL", memory[a *8 : a *8 + 8] ) 343 if fwlink: 344 try: 345 (next, prev) = struct.unpack("LL", self.imm.readMemory( fwlink, 8) ) 346 except: 347 next, prev = (0,0) 348 self.imm.Log("Error with 0x%x" % fwlink) 349 free_entry.append( (fwlink,