Ero Carrera (ero) <ero carrera gmail com>
Friday, July 6 2007 03:59.00 CDT
Recently Phantal (aka Brian) left some comments on my blog and on OpenRCE about some calculations he did as a follow-up to my post "Scanning data for entropy anomalies".
He develops the algorithm with the aim of improving the execution speed of the entropy "scanner" example I had shown. I ran through his steps and arrived at the same conclusions he did in his latest comment. I just thought it'd be worth showing his work as a separate post rather than just a comment.
His idea is to look at the standard definition of entropy, isolate everything in the expression that doesn't change when the window slides, and just update the entropy, instead of blindly recalculating its value from scratch for each offset of the scan window.
Shannon's entropy, usually represented as H, takes the following form if we work with the 256 possible byte values as the symbols:

$$H = -\sum_{b=0}^{255} p(b) \log_2 p(b)$$

where p(b) is the probability of the occurrence of a given byte.
H, the entropy, reaches its maximum value, 8 bits per byte, when all 256 byte values are equally likely. In that case the probability of each byte occurring is the same, $p(b) = 1/256$, which produces $H = -\sum_{b=0}^{255} \frac{1}{256} \log_2 \frac{1}{256} = \log_2 256 = 8$.
Note that, although this is usually thought of as measuring the "amount of randomness", that is not quite the case. A sequence of bytes starting at 0 and increasing to 255, going through all the values in order, would reach the maximum entropy value of 8, even though it is anything but random.
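As a quick check of that last point, here is a minimal Python 3 sketch that evaluates the definition over a byte string. The function name `shannon_entropy` and the two sample inputs are mine, not something from the original posts.

```python
import math
from collections import Counter

def shannon_entropy(data: bytes) -> float:
    """Shannon entropy in bits per byte, using the byte values as symbols."""
    if not data:
        return 0.0
    total = len(data)
    counts = Counter(data)
    # p * log2(1/p) equals -p * log2(p); byte values that never occur are
    # simply absent from the counter, so they contribute nothing.
    return sum((n / total) * math.log2(total / n) for n in counts.values())

ordered = bytes(range(256))   # 0, 1, 2, ..., 255: fully predictable
constant = b"\x00" * 256      # one value repeated 256 times

print(shannon_entropy(ordered))   # every value appears exactly once
print(shannon_entropy(constant))  # only one value ever appears
```

The ordered sequence scores the maximum because the measure only looks at how often each value occurs, never at the order in which the values appear.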
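The probability of a given byte appearing in our window can also be expressed as $p(b) = n_b / W$, $n_b$ being the number of times the byte $b$ appears within the window and $W$ the width of the window.
The expression for the entropy can be expanded as follows (the last step uses $\sum_b n_b = W$):

$$H = -\sum_{b} \frac{n_b}{W} \log_2 \frac{n_b}{W} = -\frac{1}{W} \sum_{b} n_b \left( \log_2 n_b - \log_2 W \right) = \log_2 W - \frac{1}{W} \sum_{b} n_b \log_2 n_b$$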
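One way to convince yourself of the expansion is to evaluate both forms on the same window. The sketch below is only an illustration: it writes `n` for the count of a byte and `W` for the window width (my notation), and compares the definition against the expanded form log2(W) - (1/W) * sum of n * log2(n). The function names and the sample window are made up for the example.

```python
import math
from collections import Counter

def entropy_direct(window: bytes) -> float:
    """H = -sum p(b) log2 p(b), with p(b) = n / W. Assumes a non-empty window."""
    W = len(window)
    return -sum((n / W) * math.log2(n / W) for n in Counter(window).values())

def entropy_expanded(window: bytes) -> float:
    """H = log2(W) - (1/W) * sum n log2 n. Assumes a non-empty window."""
    W = len(window)
    return math.log2(W) - sum(n * math.log2(n) for n in Counter(window).values()) / W

window = b"entropy scanning with a sliding window"
# The two values should agree up to floating-point rounding.
print(entropy_direct(window), entropy_expanded(window))
```

In the expanded form only the sum over counts depends on the window contents; log2(W) and the 1/W factor stay fixed while the window slides.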
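The entropy after sliding the window, $H'$, will have the same sum expansion except for two terms, those of the byte going out of the window and the byte entering it. We can then just update those: first remove the old terms for the incoming and outgoing bytes, then, after updating their counts, add the new terms for both. Writing $b_{out}$ and $b_{in}$ for those two bytes, $p'$ for the probabilities after the counts are updated, and taking any term with probability 0 as 0:

$$H' = H + p(b_{out}) \log_2 p(b_{out}) + p(b_{in}) \log_2 p(b_{in}) - p'(b_{out}) \log_2 p'(b_{out}) - p'(b_{in}) \log_2 p'(b_{in})$$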
And that's all. Now on to some implementations in Mathematica and Python (the latter wrapped as a Mathematica function with Pythonika). His implementation in C can be found in the comments of the previous post.
EntropyScan = Function[{Data, WindowScanSize},
  SummationTerm[Prob_] := If[Prob > 0, Prob Log[2, Prob], 0];

  (* Get the initial chunk and calculate the entropy *)
  CurrentChunk = Data[[ Range[1, WindowScanSize] ]];

  (* Calculate initial byte count and probabilities *)
  ByteCounts = Table[Count[CurrentChunk, i - 1], {i, 1, 256}];
  ByteProbs = Table[ByteCounts[[i]]/WindowScanSize, {i, 1, 256}];
  FilteredByteProbs = Select[ByteProbs, # > 0 &];
  H = -Total[
    Table[FilteredByteProbs[[i]] Log[2, FilteredByteProbs[[i]]],
      {i, 1, Length[FilteredByteProbs]}]];
  Entropies = {H};

  (* Slide the window and recalculate for incoming and outgoing bytes *)
  For[offset = 1, offset + WindowScanSize <= Length[Data], offset++,
    (* Get incoming and outgoing bytes *)
    ByteOut = Data[[offset]] + 1;
    ByteIn = Data[[offset + WindowScanSize]] + 1;

    (* Get the old probabilities *)
    OldValByteOut = SummationTerm[ByteProbs[[ByteOut]]];
    OldValByteIn = SummationTerm[ByteProbs[[ByteIn]]];

    (* Update counters and values *)
    ByteCounts[[ByteOut]]--;
    ByteCounts[[ByteIn]]++;
    ByteProbs[[ByteOut]] = ByteCounts[[ByteOut]]/WindowScanSize;
    ByteProbs[[ByteIn]] = ByteCounts[[ByteIn]]/WindowScanSize;

    (* Get the new probabilities *)
    ValByteOut = SummationTerm[ByteProbs[[ByteOut]]];
    ValByteIn = SummationTerm[ByteProbs[[ByteIn]]];

    (* Update the entropy *)
    H = H + OldValByteOut + OldValByteIn - ValByteIn - ValByteOut;
    Entropies = Append[Entropies, H];
  ];
  Entropies
];
Py["import math"]
EntropyScanPython = PyFunction["<
def entropy_scan(args):
    data = args[0]
    window_len = int(args[1])        # integer width, for slicing and range()
    window_size = float(window_len)  # float width, so the divisions are not truncated
    summation_term = lambda p: p*math.log(p, 2) if p > 0 else 0

    # Initial byte counts, probabilities and entropy for the first window
    current_chunk = data[:window_len]
    byte_counts = [len([a for a in current_chunk if a == i]) for i in range(256)]
    byte_probs = [byte_counts[i]/window_size for i in range(256)]
    H = -sum([byte_probs[i]*math.log(byte_probs[i], 2)
              for i in range(256) if byte_probs[i] > 0])
    entropies = [H]

    # Slide the window, updating only the outgoing and incoming bytes
    for offset in range(len(data) - window_len):
        byte_out, byte_in = data[offset], data[offset + window_len]
        old_val_byte_out = summation_term(byte_probs[byte_out])
        old_val_byte_in = summation_term(byte_probs[byte_in])
        byte_counts[byte_out] -= 1
        byte_counts[byte_in] += 1
        byte_probs[byte_out] = byte_counts[byte_out]/window_size
        byte_probs[byte_in] = byte_counts[byte_in]/window_size
        val_byte_out = summation_term(byte_probs[byte_out])
        val_byte_in = summation_term(byte_probs[byte_in])
        H = H + old_val_byte_out + old_val_byte_in - val_byte_out - val_byte_in
        entropies.append(H)
    return entropies
>"];
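For readers without Mathematica or Pythonika at hand, the same update can be sketched as a standalone Python 3 function. This is my own restatement under a few assumptions, not Phantal's code: the input is a `bytes` object, the window width is an integer no larger than the data, and the names `entropy_scan` and `term` are chosen for the example.

```python
import math

def entropy_scan(data: bytes, window_size: int) -> list:
    """Entropy of every window_size-byte window of data, updated incrementally."""
    if not 0 < window_size <= len(data):
        raise ValueError("window_size must be between 1 and len(data)")

    def term(count: int) -> float:
        # p * log2(p) for p = count / window_size, with 0 * log 0 taken as 0.
        if count == 0:
            return 0.0
        p = count / window_size
        return p * math.log2(p)

    # Byte counts and entropy of the first window, computed from scratch.
    counts = [0] * 256
    for b in data[:window_size]:
        counts[b] += 1
    h = -sum(term(c) for c in counts)
    entropies = [h]

    for offset in range(len(data) - window_size):
        byte_out, byte_in = data[offset], data[offset + window_size]
        if byte_out != byte_in:  # identical bytes leave the counts unchanged
            # Remove the two stale terms, update the counts, add the fresh terms.
            h += term(counts[byte_out]) + term(counts[byte_in])
            counts[byte_out] -= 1
            counts[byte_in] += 1
            h -= term(counts[byte_out]) + term(counts[byte_in])
        entropies.append(h)
    return entropies

sample = bytes((i * 37 + (i >> 3)) % 256 for i in range(500))
print(len(entropy_scan(sample, 64)))  # one value per window position
```

Because the running value accumulates floating-point rounding over many updates, it may be worth recomputing the entropy from scratch every so often on very long inputs.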