|
Package Libs ::
Module libheap
|
|
1
2 """
3 Immunity Heap API for Immunity Debugger
4
5 (c) Immunity, Inc. 2004-2006
6
7
8 U{Immunity Inc.<http://www.immunityinc.com>} Debugger Heap Library for python
9
10
11 """
12
13 __VERSION__ = '1.3'
14
15 import immutils
16 import struct
17 import string
18 from UserList import UserList
19 HEAP_MAX_FREELIST = 0x80
20
21
22
def __init__(self, imm, heapddr = 0, restore = False):
    """
    Windows 32 Heap Class

    Stores the debugger handle, the heap address and the restore flag,
    starts with empty chunk and segment lists, and parses the heap
    straight away when a non-zero address is given.

    @type  imm: Debugger object
    @param imm: Debugger handle kept on the instance as self.imm

    @type  heapddr: DWORD
    @param heapddr: (Optional, Def: 0) Address of the heap; 0 skips parsing

    @type  restore: BOOLEAN
    @param restore: (Optional, Def: False) Kept on the instance as self.restore

    @rtype:  PHEAP object
    """
    self.imm, self.address, self.restore = imm, heapddr, restore
    self.chunks = []
    self.Segments = []

    # Nothing to parse without an address
    if not heapddr:
        return
    self._grabHeap()
37
38
39
def _grabHeap(self):
    """
    Read the heap header at self.address and populate this object

    Unpacks the fixed header fields, follows each of the 128 FreeList
    heads through debuggee memory, reads the lock/lookaside fields and
    builds a Segment for every non-null slot of the segment table,
    collecting the chunks of each segment and of each address in its
    Pages list.

    NOTE(review): the def line of this method is missing from the listing
    this block was taken from. Name and signature are reconstructed from
    the call self._grabHeap() in __init__ -- confirm against the original.

    @raise Exception: when the heap header cannot be read
    """
    try:
        heaps = self.imm.readMemory(self.address, 0x588)
    except WindowsError:
        # The message used to format an undefined name (heapaddr), which
        # replaced the intended error with a NameError.
        raise Exception("Failed to get heap at address : 0x%08x" % self.address)

    # Fixed-size header fields, starting 8 bytes into the block
    index = 0x8
    header_len = 0x50 - 8
    (self.Signature, self.Flags, self.ForceFlags, self.VirtualMemoryThreshold,
     self.SegmentReserve, self.SegmentCommit, self.DeCommitFreeBlockThreshold,
     self.DeCommitTotalBlockThreshold, self.TotalFreeSize,
     self.MaximumAllocationSize, self.ProcessHeapListIndex,
     self.HeaderValidateLength, self.HeaderValidateCopy,
     self.NextAvailableTagIndex, self.MaximumTagIndex, self.TagEntries,
     self.UCRSegments, self.UnusedUnCommittedRanges, self.AlignRound,
     self.AlignMask) = struct.unpack("LLLLLLLLLLHHLHHLLLLL",
                                     heaps[index:index + header_len])
    index += header_len

    self.VirtualAllocedBlock = struct.unpack("LL", heaps[index:index + 8])
    index += 8

    # Raw segment table: 64 dwords, 0 marks the first unused slot
    self._Segments = struct.unpack("L" * 64, heaps[index:index + 64 * 4])
    index += 64 * 4

    self.FreeListInUseLong = struct.unpack("LLLL", heaps[index:index + 16])
    index += 16

    (self.FreeListInUseTerminate, self.AllocatorBackTraceIndex) = \
        struct.unpack("HH", heaps[index:index + 4])
    index += 4

    self.Reserved1 = struct.unpack("LL", heaps[index:index + 8])
    index += 8

    self.PseudoTagEntries = struct.unpack("L", heaps[index:index + 4])
    index += 4

    # 128 list heads of two dwords each. Every FreeList item is a list of
    # (entry address, first dword, second dword) tuples, head first.
    self.FreeList = []
    for a in range(0, 128):
        offset = index + a * 8
        base_entry = self.address + offset
        (prv, nxt) = struct.unpack("LL", heaps[offset:offset + 8])
        free_entry = [(base_entry, prv, nxt)]

        # Follow the second dword until it points back at the head.
        # `visited` stops the walk on a corrupted list that cycles without
        # ever reaching the head, which used to loop forever.
        visited = set()
        while nxt != base_entry and nxt not in visited:
            visited.add(nxt)
            current = nxt
            try:
                (prv, nxt) = struct.unpack("LL", self.imm.readMemory(nxt, 0x8))
            except Exception:
                # Unreadable or short memory: keep what was collected so far
                break
            free_entry.append((current, prv, nxt))

        self.FreeList.append(free_entry)

    index += 256 * 4
    (self.LockVariable, self.CommitRoutine, self.Lookaside,
     self.LookasideLockCount) = struct.unpack("LLLL", heaps[index:index + 16])

    # Same collector for every segment: restore-aware or plain
    if self.restore:
        collect = self.getRestoredChunks
    else:
        collect = self.getChunks

    for a in range(0, 64):
        if self._Segments[a] == 0x0:
            break
        s = Segment(self.imm, self._Segments[a])
        self.Segments.append(s)

        collect(s.BaseAddress)
        for idx in s.Pages:
            self.imm.Log("> 0x%08x" % idx)
            collect(idx)
113
# Fragment: the `def` line that owns these statements is not present in this
# listing. The body formats the four FreeListInUseLong dwords as binary
# strings via immutils.decimal2binary (two per row), optionally hands each
# row to `uselog`, and returns the list of rows.
115 """
116 Print the Heap's FreeListInUse bitmask
117
118 @type uselog: Log Function
119 @param uselog: (Optional, Def: Log Window) Log function that display the information
120 """
121 tbl= ["FreeListInUse %s %s"% (immutils.decimal2binary(self.FreeListInUseLong[0]), immutils.decimal2binary(self.FreeListInUseLong[1])),\
122 " %s %s" % (immutils.decimal2binary(self.FreeListInUseLong[2]), immutils.decimal2binary(self.FreeListInUseLong[3]))]
# NOTE(review): the docstring mentions a Log Window default, but in the
# visible lines nothing is logged when uselog is falsy; the rows are only
# returned.
123 if uselog:
124 for a in tbl:
125 uselog(a)
126 return tbl
127
# Fragment: the `def` line that owns these statements is not present in this
# listing. The body logs all 128 FreeList entries: first the list head, then
# every linked entry together with the size of the chunk built at the entry
# address minus 8. It always returns 0.
129 """
130 Print the Heap's FreeList
131
132 @type uselog: Log Function
133 @param uselog: (Optional, Def: Log Window) Log function that display the information
134 """
# Default to the debugger's Log; a caller-supplied uselog replaces it
135 log = self.imm.Log
136 if uselog:
137 log = uselog
138 for a in range(0, 128):
139 entry= self.FreeList[a]
# entry[0] is the list head tuple: (address, first dword, second dword)
140 e=entry[0]
141
142 log("[%03x] 0x%08x -> [ 0x%08x | 0x%08x ] " % (a, e[0], e[1], e[2]), address = e[0])
143 for e in entry[1:]:
# Best effort: any failure while building the chunk is reported as size 0
144 try:
145 sz = self.get_chunk( e[0] - 8 ).size
146 except:
147 sz = 0
148 log(" 0x%08x -> [ 0x%08x | 0x%08x ] (%08x)" % (e[0], e[1], e[2], sz), address= e[0])
149 return 0x0
150
151
def getRestoredChunks(self, address):
    """
    Enumerate Chunks of the current heap using a restore heap

    Walks the chunks starting at address. Whenever a chunk header looks
    inconsistent, the chunk recorded at the same address in the saved heap
    (knowledge key "saved_heap_<heap address>") is used instead and marked
    as restored. Falls back to getChunks() when no saved heap exists.

    NOTE(review): the def line of this method is missing from the listing
    this block was taken from. Name and signature are reconstructed from
    the calls self.getRestoredChunks(...) and the body's use of `address`
    -- confirm against the original.

    @type  address: DWORD
    @param address: Address where to start getting chunks

    @rtype:  List of win32heapchunks
    @return: Chunks
    """
    imm = self.imm

    oldheap = imm.getKnowledge("saved_heap_%08x" % self.address)
    if not oldheap:
        imm.Log("Coudln't use restore mode: No saved Heap")
        return self.getChunks(address)

    ptr = address

    # get_chunk() takes only the chunk address, as every other caller and
    # its own body show. The former three-argument calls raised TypeError,
    # so restore mode could never produce a chunk.
    backchunk = self.get_chunk(ptr)

    # Seed the "previous chunk" from the first chunk's own psize/upsize so
    # the back-link comparison below holds for the first chunk.
    backchunk.size = backchunk.psize
    backchunk.usize = backchunk.upsize

    while 1:
        try:
            c = self.get_chunk(ptr)
        except Exception:
            # Unreadable chunk: return what was collected so far
            return self.chunks

        next_ptr = ptr + c.usize

        # Upper 16 bits of the dword at the following address; compared
        # against this chunk's size below. 0 when the read fails.
        try:
            sizes = imm.readLong(next_ptr)
            previous = (sizes >> 16) & 0xffff
        except Exception:
            previous = 0

        # The header is treated as damaged when its size is zero, when the
        # following dword disagrees with it (or is zero) on a non-top
        # chunk, or when its psize disagrees with the previous chunk.
        if (not c.size) or (c.size != previous and not c.istop()) \
                or (not previous and not c.istop()) \
                or (backchunk.size != c.psize):
            restoredchunk = oldheap.findChunkByAddress(ptr)
            if restoredchunk:
                c = restoredchunk
                c.setRestored()
                next_ptr = ptr + c.usize

        ptr = next_ptr
        self.chunks.append(c)

        if c.istop() or c.size == 0:
            break

        backchunk = c

    return self.chunks
217
# Fragment: the `def` line that owns these statements is not present in this
# listing. The body is a linear search of self.chunks for the first chunk
# whose .addr equals `addr`; it returns that chunk, or None when none match.
# NOTE(review): the docstring calls the parameter `address` while the code
# compares against `addr` -- confirm which name the signature uses.
219 """
220 Find a Chunks by its address
221
222 @type address: DWORD
223 @param address: Address to search for
224
225 @rtype: win32heapchunks
226 @return: Chunk
227 """
228
229 for a in self.chunks:
230 if a.addr == addr:
231 return a
232 return None
233
def getChunks(self, address, size = 0xffffffff):
    """
    Enumerate Chunks of the current heap

    Appends chunks to self.chunks, starting at address and advancing by
    each chunk's usize, until a top chunk or a zero-sized chunk has been
    appended, `size` chunks have been collected, or a chunk cannot be
    built (the failure is logged and the chunks gathered so far are
    returned).

    @type  address: DWORD
    @param address: Address where to start getting chunks

    @type  size: DWORD
    @param size: (Optional, Def: All) Amount of chunks

    @rtype:  List of win32heapchunks
    @return: Chunks
    """
    # Local import: only needed for the version-neutral handler below
    import sys

    ptr = address

    while size:
        try:
            c = self.get_chunk(ptr)
        except Exception:
            # sys.exc_info() rather than "except E, v" so the clause parses
            # on every Python version; the logged text is unchanged.
            self.imm.Log("Failed to grab chunks> " + str(sys.exc_info()[1]))
            return self.chunks

        self.chunks.append(c)

        ptr += c.usize
        if c.istop() or c.size == 0:
            break
        size -= 1

    return self.chunks
268
# Fragment: the `def` line is not present in this listing. This is the body
# of the helper invoked elsewhere in the listing as self.get_chunk(<address>):
# it builds a win32heapchunk from the debugger handle, the address `addr`
# and this heap object.
270 return win32heapchunk(self.imm, addr, self)
271
# Fragment: the `class` and `def` lines that own these statements are not
# present in this listing (elsewhere it is built as Segment(imm, address)).
# The body records the segment address, reads 0x34 bytes starting 8 bytes
# past it and unpacks them into the header attributes below, then follows
# the UnCommittedRanges list to fill self.Pages.
274 self.address = addr
275 addr += 8
276 mem = imm.readMemory(addr, 0x34)
277
278 (self.Signature, self.Flags, self.Heap, self.LargestUnCommitedRange, self.BaseAddress,\
279 self.NumberOfPages, self.FirstEntry, self.LastValidEntry, self.NumberOfUnCommittedPages,\
280 self.NumberOfUnCommittedRanges, self.UnCommittedRanges, self.AllocatorBackTraceIndex,\
281 self.Reserved, self.LastEntryInSegment) = struct.unpack("LLLLLLLLLLLHHL", mem)
282
283
284
285
# Pages collects C_Addr + C_Size for every 16-byte record in the list
286 self.Pages = []
287 if self.UnCommittedRanges:
# NOTE(review): `i` is never used in the visible lines
288 i = 0
289 addr = self.UnCommittedRanges
# Each record is four dwords (next, address, size, filler); a next pointer
# of 0 ends the walk.
# NOTE(review): there is no guard against a list that cycles.
290 while addr != 0:
291 mem = imm.readMemory( addr, 0x10 )
292 ( C_Next, C_Addr, C_Size, C_Filler) = struct.unpack( "LLLL", mem )
293
294 self.Pages.append( C_Addr + C_Size )
295 addr = C_Next
296
298 - def __init__(self, imm, heapddr = 0, restore = False):
300
# Fragment: the `def` line that owns these statements is not present in this
# listing. The body reads 0x120 bytes starting 8 bytes past self.address,
# unpacks them in a single struct.unpack call into the attributes listed
# below, walks the segment list collecting chunks, parses the block index
# and wraps the front-end heap pointer when it is non-zero.
302 try:
303 heapmem = self.imm.readMemory( self.address + 8 , 0x120 )
# NOTE(review): `heapaddr` is not defined anywhere in the visible lines, so
# a failed read would surface as NameError instead of this message;
# self.address looks like the intended value. `msg` is also unused.
304 except WindowsError, msg:
305 raise Exception, "Failed to get heap at address : 0x%08x" % heapaddr
# NOTE(review): `index` is not used again in the visible lines
306 index = 8
307 (self.SegmentSignature, self.SegmentFlags, self.SegmentListEntry_Flink, self.SegmentListEntry_Blink, self.Heap, self.BaseAddress, self.NumberOfPages, self.FirstEntry, self.LastValidEntry, self.NumberofUncommitedPages, self.NumberofUncommitedRanges, self.SegmentAllocatorBackTraceIndex, self.Reserved, self.UCRSegmentList_Flink, self.UCRSegmentList_Blink, self.Flags, self.ForceFlags, self.CompatibilityFlags, self.EncodeFlagMask, self.EncodingKey, self.EncodingKey2, self.PointerKey, self.Interceptor_debug, self.VirtualMemoryThreshold, self.Signature, self.SegmentReserve, self.SegmentCommit, self.DeCommitThresholdBlock, self.DeCommitThresholdTotal, self.TotalFreeSize, self.MaxAllocationSize, self.ProcessHeapsListIndex, self.HeaderValidateLength, self.HeaderValidateCopy, self.NextAvailableTagIndex, self.MaximumTagIndex, self.TagEntries, self.UCRList_Flink, self.UCRList_Blink, self.AlignRound, self.AlignMask, self.VirtualAlloc_Flink, self.VirtualAlloc_Blink, self.SegmentList_Flink, self.SegmentList_Blink, self.AllocatorBackTraceIndex, self.NonDedicatedListLenght, self.BlocksIndex, self.UCRIndex, self.PseudoTagEntries, self.FreeList_Flink, self.FreeList_Blink, self.LockVariable, self.CommitRoutine, self.FrontEndHeap, self.FrontHeapLockCount, self.FrontEndHeapType, self.TotalMemoryReserved, self.TotalMemoryCommited, self.TotalMemoryLargeUCR, self.TotalSizeInVirtualBlocks, self.TotalSegments, self.TotalUCRs, self.CommitOps, self.DecommitOps, self.LockAcquires, self.LockCollisions, self.CommitRate, self.DeCommitRate, self.CommitFailures, self.InBlockCommitFailures, self.CompactHeapCalls, self.CompactedUCRs, self.InBlockDecommits, self.InBlockDecommitSize, self.TunningParameters) = struct.unpack("L" * 11 + "HH" + "L" *18 + "HHLHH" + "L" * 19 + "HH" + "L" * 19, heapmem)
308
309 self.imm.Log("FreeList: 0x%08x | 0x%08x" % (self.FreeList_Flink, self.FreeList_Blink) )
# Segment walk: the heap's own address is recorded and scanned first. Entries
# are then followed starting from SegmentList_Blink; each entry's segment is
# taken as addr - 0x10 and the next entry is the dword read at addr. The walk
# stops when addr equals self.address + 0x10.
310 head = self.address +0x10
311 addr = self.SegmentList_Blink
312 self.Segments.append( self.address )
313 self.getChunks( self.address )
314 self.imm.Log("segment: 0x%08x 0x%08x" % (self.SegmentList_Flink, self.SegmentList_Blink) )
315 while head != addr:
316 self.Segments.append( addr - 0x10 )
317 self.getChunks( addr - 0x10 )
318 addr = self.imm.readLong( addr )
319
320
321
# Parse the block index starting at BlocksIndex; self.LFH is only set when
# the FrontEndHeap pointer is non-zero.
322 self.getBlocks( self.BlocksIndex )
323 if self.FrontEndHeap:
324 self.LFH = LFHeap( self.imm, self.FrontEndHeap )
325
327 self.blocks = []
328 addr = startaddr
329
330 while addr:
331 block = Blocks( self.imm, addr )
332 self.blocks.append( block )
333 block.FreeList=[]
334 memory = self.imm.readMemory( block.Buckets, 0x80*8 )
335 if block.FreeListInUsePtr:
336 block.setFreeListInUse( struct.unpack("LLLL", self.imm.readMemory( block.FreeListInUsePtr, 4*4 )) )
337
338
339 for a in range(0, 128):
340 free_entry = []
341
342 (fwlink, heap_bucket) = struct.unpack("LL", memory[a *8 : a *8 + 8] )
343 if fwlink:
344 try:
345 (next, prev) = struct.unpack("LL", self.imm.readMemory( fwlink, 8) )
346 except:
347 next, prev = (0,0)
348 self.imm.Log("Error with 0x%x" % fwlink)
349 free_entry.append( (fwlink,