Files
2025-09-29 00:52:08 +02:00

459 lines
14 KiB
Python
Executable File

# - - - - - - - - - - - - - - - - - - - - - -
# To Do
# 1. [DONE]Unpack on multiple threads AddJobsToQueue should be ok with this
# 2. ignore certain extensions ( is this really worth it?)
# 3. update op as we go along instead of in one go at the end
# - - - - - - - - - - - - - - - - - - - - - -
import sys, os, argparse, threading, queue, time, glob, subprocess, shutil
import cProfile, io, pstats
# - - - - - - - - - - - - - - - - - - - - - -
# program options, can be overwritten in command line
# - - - - - - - - - - - - - - - - - - - - - -
encoding = "ascii"        # byte encoding used when matching keywords against raw file data
chunkSize = 1024*1024     # 1 MiB read step per file-read iteration
# Fixed typo: was 8*1024*2014 ("2014" is a digit transposition of 1024), which
# produced an odd ~15.7 MB value.  Each scan task now covers 8 MiB of a file.
threadChunk = 8*1024*1024
reportWidth = 32          # the amount of text around the finding listed
tempDir = "D:/temp_search/"   # default extraction dir (overridable with -t)
nThreads = 24             # default number of scan threads (overridable with -n)
nUnpackThreads = 3        # number of RPF-unpacking threads
unpacktool = "%RS_TOOLSIRONLIB%/lib/RSG.Pipeline.RpfExtract.exe"
lstKeywords = []   # this will be set by a command line argument later on
lstInputFiles = [] # this will be set by a command line argument later on
taskQueue = queue.Queue()    # (file, rpf, start, length) scan jobs
resultQueue = queue.Queue()  # CReport findings
allFileQueue = queue.Queue() # every file queued for scanning (for the verbose report)
rpfQueue = queue.Queue()     # stores all the rpfs we have to convert
# For storing reports
class CReport:
    """A single keyword hit: which file, which archive (if any), the byte
    offset, and the surrounding bytes captured around the match."""
    def __init__(self, filename, rpf, keyword, chunk, loc):
        self.filename = filename  # path of the scanned file
        self.rpf = rpf            # originating .rpf archive path, or None for loose files
        self.location = loc       # byte offset of the hit within the file
        self.keyword = keyword    # the keyword that matched
        self.string = chunk       # context bytes/text around the hit
    def __str__(self):
        # `is not None` instead of `!= None` (idiom fix; behavior unchanged).
        if self.rpf is not None:
            # Strip everything up to and including ".rpf" so the report shows
            # the archive-relative path next to its parent archive.
            return "\t{1}/{0}:\n\t\toffset:{2}\n\t\t...{3}...\n".format( self.filename[self.filename.rindex(".rpf")+4:], self.rpf, self.location, self.string )
        else:
            return "\t{0}:\n\t\toffset:{1}\n\t\t...{2}...\n".format( self.filename, self.location, self.string )
# Yields the bytes between start and start + length in a file, in steps of step
def bytes_from_file(filename, start=0, length = 1024, step=1024):
    """Yield chunks of up to *step* bytes from *filename*, starting at byte
    offset *start* and covering at most *length* bytes in total.

    Yields nothing if the file does not exist.  The final read is truncated
    so no more than *length* bytes are ever produced (the original read a
    full *step* every time and could over-read past *length* by up to
    step-1 bytes); progress is counted by actual bytes read, not by *step*.
    The redundant f.close() inside the `with` block was removed.
    """
    if not os.path.isfile(filename):
        return
    with open(filename, "rb") as f:
        f.seek(start, 0)
        remaining = length
        while remaining > 0:
            chunk = f.read(min(step, remaining))
            if not chunk:
                break  # EOF before covering the requested length
            yield chunk
            remaining -= len(chunk)
#returns the contents of a file as a generator where each new line denotes a different item
def parse_file(filename):
    """Yield the meaningful lines of a text file, one at a time.

    Skipped lines: blank lines, lines containing a '#' anywhere (comment
    convention of the keyword/input lists), and lines starting with '//'.
    The trailing newline is stripped from yielded lines.  Yields nothing
    if the file does not exist.

    Fix: the original's `"\\n" in line[:2]` test also silently dropped
    lines with exactly one character of content (e.g. "x\\n"); such lines
    are now yielded.  The redundant f.close() inside `with` was removed.
    """
    if not os.path.isfile(filename):
        return
    with open(filename, "r") as f:
        for line in f:
            text = line.rstrip("\n")
            if not text or "#" in line or line.startswith("//"):
                continue
            yield text
# Turn the command-line "files"/"keywords" argument into a list: either a
# python-style list literal (e.g. ["a.exe","b.bin"]) or a path to a text
# file with one entry per line.
def create_list(arg):
    """Return a list of strings parsed from *arg*.

    Completes the original TODO: if *arg* looks like a python list literal
    it is evaluated safely with ast.literal_eval (previously this branch
    always returned []).  A malformed literal falls back to an empty list,
    matching the old behavior.  Otherwise *arg* is treated as a filename
    and parsed line-by-line via parse_file.  (Parameter renamed from `str`,
    which shadowed the builtin.)
    """
    if "[" in arg:
        import ast  # local import: only needed for the literal branch
        try:
            result = ast.literal_eval(arg)
        except (ValueError, SyntaxError):
            return []
        return list(result) if isinstance(result, (list, tuple)) else []
    return [f for f in parse_file(arg)]
#worker class, reads specific bytes from a file and then checks them for our strings
class ThreadReadFile(threading.Thread):
    """Daemon worker: pulls (file, rpf, start, length) jobs off *queue*,
    scans that byte range for any of the global lstKeywords, and pushes a
    CReport per hit onto *rqueue*."""
    WAITING = 0
    WORKING = 1
    killAll = False  # class-wide shutdown flag, set by the main thread

    def __init__(self, queue, rqueue, namae):
        threading.Thread.__init__(self, name=namae)
        self.queue = queue    # task queue of (file, rpf, start, length)
        self.rqueue = rqueue  # result queue of CReport objects
        self.daemon = True    # don't block interpreter exit
        self.status = ThreadReadFile.WAITING

    def run(self):
        # Poll the task queue until killAll is raised; WAITING/WORKING is
        # read by the main thread's progress loop.
        while True:
            if ThreadReadFile.killAll:  # we were told to quit
                return
            try:
                file, rpf, start, length = self.queue.get(block=False)
                self.work(file, rpf, start, length)
            except queue.Empty:
                self.status = ThreadReadFile.WAITING
                time.sleep(1.0)
            except Exception as e:
                print(str(e))

    def work(self, file, rpf, start, length):
        self.status = ThreadReadFile.WORKING
        # Encode/lowercase each keyword once per job instead of once per
        # chunk and per hit (the original also copied the whole chunk with
        # bytes(lchunk) on every find() call).
        lstKeyBytes = [(keyword, bytes(keyword, encoding).lower()) for keyword in lstKeywords]
        for chunk in bytes_from_file(file, start=start, length=length, step=chunkSize):
            lchunk = chunk.lower()
            for keyword, kbytes in lstKeyBytes:
                # A keyword might occur multiple times within the chunk;
                # report them all.
                index = lchunk.find(kbytes)
                while index != -1:
                    # max(0, ...) keeps the context slice from going negative:
                    # a hit within reportWidth bytes of the chunk start used to
                    # produce a wrong (end-relative) or empty excerpt.
                    lo = max(0, index - reportWidth)
                    self.rqueue.put(CReport(file, rpf, keyword, chunk[lo:index + reportWidth], start + index))
                    index = lchunk.find(kbytes, index + len(kbytes))
#worker class, unpacks rpf files
class ThreadUnpackFile(threading.Thread):
    """Daemon worker: pulls (file, unpack_dir, rpf_parent) tuples off rpfQ,
    extracts the .rpf archive with the external unpacktool, queues the
    extracted plain files as scan jobs, and re-queues nested .rpf archives
    back onto its own input queue."""
    WAITING = 0
    WORKING = 1
    killAll = False  # class-wide shutdown flag, set by the main thread
    def __init__(self, rpfQ, namae):
        threading.Thread.__init__(self, name=namae)
        self.iqueue = rpfQ #input comes from here, and some of the results go here too
        #self.oqueue = fileQ #results go here
        self.daemon = True  # don't block interpreter exit
        self.status = ThreadUnpackFile.WAITING
    def run(self):
        # Poll the queue until killAll is raised; WAITING/WORKING is read
        # by the main thread's progress loop.
        while True:
            if ThreadUnpackFile.killAll: #we were told to quit
                return;
            try:
                data = self.iqueue.get(block=False)
                file, unpack_dir, rpf_parent = data
                self.work(file, unpack_dir, rpf_parent)
            except queue.Empty:
                self.status = ThreadUnpackFile.WAITING
                time.sleep(1.0)
            except Exception as e:
                print(str(e))
    def work(self, file, unpack_dir, rpf_parent):
        # Extract one .rpf archive and feed its contents back into the pipeline.
        self.status = ThreadUnpackFile.WORKING
        rpfFile = os.path.abspath(file)  # NOTE(review): computed but never used
        unpackDir = os.path.join(unpack_dir,os.path.basename(file))
        rpf_parent = os.path.join(rpf_parent, os.path.basename(file)).replace("\\","/") #for result reporting, so we can have a clear idea of what rpf it came from
        if not os.path.exists(unpackDir):
            os.makedirs(unpackDir)
        # External extractor; blocks until the archive is fully unpacked.
        subprocess.call([unpacktool, "--output", unpackDir, "--overwrite", file])
        #add to the queue
        lstUnpackedFiles = []
        lstUnpackedRPFs = []
        for dir,_,_ in os.walk(unpackDir):
            #RPFs may be within RPFS
            lstTemp = [f.replace("\\","/") for f in glob.glob(os.path.join(dir,"*.*")) if not os.path.isdir(f)]
            lstUnpackedFiles.extend([f for f in lstTemp if not ".rpf" in f[-4:].lower()])
            # Whatever this directory had left over after removing the plain
            # files must be nested archives; remember where they came from.
            lstUnpackedRPFs.extend([(f, unpackDir, rpf_parent) for f in set(lstTemp) - set(lstUnpackedFiles)])
        #add the normal files to the task queue
        AddJobsToQueue( [(f, rpf_parent) for f in lstUnpackedFiles], longestKeyLength )
        #for f in lstUnpackedFiles:
        #    self.oqueue.put(f)
        # Nested archives go back onto our own input queue for another pass.
        for f in lstUnpackedRPFs:
            self.iqueue.put(f)
        #lstAllFiles.extend(lstUnpackedFiles)
        #add the rpfs to the RPF list we are currently looping through
        #lstAllRPFFiles.extend(lstUnpackedRPFs)
        #idx += 1
        #delete the rpf file if it is in the tempdir
        # Only intermediate archives we extracted ourselves are deleted;
        # the user's original inputs live outside tempDir.
        if tempDir.lower() in file.lower():
            os.remove( file )
def CheckPath(path):
    """Report any keyword that appears in *path* itself (case-insensitive).

    Fix: the original compared a lowercased keyword against the raw path,
    so keywords never matched mixed-case paths; the path is now lowercased
    on both sides of the comparison.
    """
    lpath = path.lower()
    for kw in lstKeywords:
        if kw.lower() in lpath:
            resultQueue.put(CReport(path, None, kw, "[in file path]", 0))
def AddJobsToQueue(lstFiles, keyLength):
    """Split each (file, rpfFile) pair into threadChunk-sized scan jobs on
    the global taskQueue and return the total size in bytes of all files.

    *keyLength* is the longest keyword length: each job's start is backed
    up by that amount so a keyword straddling a chunk boundary is still
    caught by the next job.

    Fix: the original reset the running total with `size = getsize(...)`
    inside the loop and then added the same file's size again afterwards,
    so the returned "total" was actually twice the size of the last file.
    """
    totalSize = 0
    for file, rpfFile in lstFiles:
        absFile = os.path.abspath(file.replace("\\", "/"))
        fileSize = os.path.getsize(absFile)
        totalSize += fileSize
        taskpos = 0
        while taskpos < fileSize:
            # Overlap consecutive jobs by keyLength bytes (see docstring).
            taskQueue.put((absFile, rpfFile, max(0, taskpos - keyLength), threadChunk))
            taskpos += threadChunk
        # if we are checking the file path do it here
        if args.includepath:
            CheckPath(file)
        allFileQueue.put(file)
    return totalSize
# - - - - - - - - - - - - - - - - - - - - - -
# Setup the input args
# - - - - - - - - - - - - - - - - - - - - - -
parser = argparse.ArgumentParser()
# Raw string so the example paths display literally: the original's "\a" and
# "\b" were interpreted as bell/backspace control characters in the help
# text.  Also fixed the "conatining" typos in the user-facing help.
parser.add_argument( "files", help=r'''
input file (a text file containing a list of files to search separated by new lines)
can also be a python style list e.g. ["x:\a.exe","x:\b.bin"]
''')
parser.add_argument( "keywords", help="keyword file (a text file containing a list of keywords to search separated by new lines) ")
parser.add_argument( "output", help="text file to dump output to")
parser.add_argument( "-t", "--tempdir", help="Location to extract RPFs to default : {0}".format(tempDir) )
parser.add_argument( "-w", "--reportwidth", help="size of surrounding area to be reported. Default : {0}".format(reportWidth) )
parser.add_argument( "-v", "--verbose", help="output extra info", action='store_true' )
parser.add_argument( "-n", "--numthreads", help="number of threads" )
parser.add_argument( "-i", "--includepath", help="include the file path in the search", action='store_true' )
args = parser.parse_args()
# The unpack tool lives under RS_TOOLSIRONLIB; bail out early if it isn't set.
if not "RS_TOOLSIRONLIB" in os.environ:
    print("RS_TOOLSIRONLIB not set")
    sys.exit()
unpacktool = os.path.expandvars(unpacktool)
# setup based on args (`is None` instead of `== None`; note both arguments
# are required positionals, so argparse already rejects a missing value)
if args.files is None or args.keywords is None:
    parser.print_help()
    sys.exit()
lstInputFiles = create_list(args.files)
lstKeywords = create_list(args.keywords)
#for kw in lstKeywords:
#    if
longestKeyLength = len(max(lstKeywords, key=len))
# Open the report file up front; on failure we degrade to console output.
op = None
try:
    op = open(args.output, 'w')
except OSError:
    print("output path not valid...output will be dumped to console")
    op = None
    # handle error here
if not args.tempdir == None:
    tempDir = args.tempdir
tempDir = tempDir.replace("\\","/")
# Crude guard against pointing the (later wiped!) temp dir at something
# important.  NOTE(review): the comparison is case-sensitive, so "C:/" or
# "SYSTEM32" would slip through — confirm and tighten if needed.
if "system32" in tempDir or "c:/" == tempDir or "c://" == tempDir:
    print("\n\n\nYou have set a dangerous directory for your temp\n\n\n")
    sys.exit()
if not os.path.exists( tempDir ):
    try:
        os.makedirs( tempDir )
    except:
        print("\ncouldn't create temporary folder....exiting\n")
        sys.exit()
else:
    # An existing temp dir is wiped so stale extractions don't pollute results.
    # NOTE(review): rmtree removes the directory itself and nothing recreates
    # it here — the unpack workers call os.makedirs per archive; confirm that
    # covers every path that writes into tempDir.
    shutil.rmtree( tempDir )
if not args.numthreads == None:
    try:
        nThreads = int(args.numthreads)
    except:
        nThreads = 1  # fall back to a single scan thread on a bad -n value
lstLooseFiles = []
#for input in lstInputFiles:
idx = 0
while idx < len(lstInputFiles):
input = lstInputFiles[idx]
#files
if not os.path.isdir(input):
if input.endswith(".rpf"):
#lstAllRPFFiles.extend((f, tempDir, f) for f in glob.glob(file))
for f in glob.glob(input):
rpfQueue.put((f, tempDir, os.path.dirname(f) ))
else:
lstTmpFiles = glob.glob(input)
for f in lstTmpFiles:
if f.endswith(".rpf"):
rpfQueue.put( ( f, tempDir, os.path.dirname(f) ) )
else:
lstLooseFiles.append(f)
#folders
else:
for dir, subdir, files in os.walk(input):
for tmp in files:
lstInputFiles.append( os.path.join(dir,tmp) )
idx = idx + 1
AddJobsToQueue( [(f,None) for f in lstLooseFiles], longestKeyLength )
#- - - - - - - - - - - - - - - - - -
# Spin up the scanner and unpacker worker pools.
#- - - - - - - - - - - - - - - - - -
# Scanner threads chew through byte-range jobs looking for keywords.
lstReadThreads = [
    ThreadReadFile(taskQueue, resultQueue, "[ReadThread{0}]".format(i))
    for i in range(nThreads)
]
# Unpacker threads extract RPF archives and feed the scanners with new jobs.
lstUnpackThreads = [
    ThreadUnpackFile(rpfQueue, "[UnpackThread{0}]".format(i))
    for i in range(nUnpackThreads)
]
print("{0} jobs in total, between {1} threads".format( taskQueue.qsize(), nThreads ) )
# Read threads first, then unpack threads — same order as before.
for worker in lstReadThreads + lstUnpackThreads:
    worker.start()
#- - - - - - - - - - - - - - - - - -
# unpack RPF files here, and add unpacked files to job queue
#- - - - - - - - - - - - - - - - - -
print("Working")
#p = cProfile.Profile()
#p.enable()
# Progress loop: print the queue depths whenever they change, and shut the
# unpack pool down once no unpack thread is busy AND its queue is empty.
oldnTasks = -1
oldnScan = -1
while True:
    # Counts of currently-busy workers in each pool (status is written by
    # the worker threads themselves).
    nU = len([1 for t in lstUnpackThreads if t.status == ThreadUnpackFile.WORKING])
    nR = len([1 for t in lstReadThreads if t.status == ThreadReadFile.WORKING])
    nTasks = rpfQueue.qsize()
    nScan = taskQueue.qsize()
    if oldnTasks != nTasks or oldnScan != nScan:
        sys.stdout.write("{0} Unpack Jobs left, [{1}/{2}] Threads Active\n".format( nTasks+nU, nU, nUnpackThreads ))
        sys.stdout.write("{0} Scan Jobs left, [{1}/{2}] Threads Active\n".format( nScan+nR, nR, nThreads ))
        sys.stdout.flush()
    if oldnScan != nScan:
        oldnScan = nScan
    if oldnTasks != nTasks:
        oldnTasks = nTasks
    if nU == 0 and nTasks == 0: #break on unpacking finishing
        ThreadUnpackFile.killAll = True
        break
    time.sleep(2)
#possibly redundant
# (the unpack threads are daemons and exit on killAll, but join anyway)
for thread in lstUnpackThreads:
    thread.join()
#- - - - - - - - - - - - - - - -
# ok we are done unpacking lets
# start waiting for the worker threads to finish
#- - - - - - - - - - - - - - - -
# NOTE(review): threading.activeCount() is the deprecated camelCase alias of
# threading.active_count(); > 1 means some worker is still alive besides the
# main thread.
while threading.activeCount() > 1:
    n = len([1 for t in lstReadThreads if t.status == ThreadReadFile.WORKING])
    nTasks = taskQueue.qsize()
    # Only print when the busy count changes (oldnScan carries over from the
    # unpack loop above).
    if oldnScan != n:
        sys.stdout.write("{0} Scan Jobs left, [{1}/{2}] Threads Active\n".format(nTasks+n, n, nThreads))
        sys.stdout.flush()
    if oldnScan != n:
        oldnScan = n
    if n == 0 and nTasks == 0:
        # No busy scanner and an empty queue: tell the pool to shut down.
        ThreadReadFile.killAll = True
        #sys.stdout.write("{0} Jobs left, [{1}/{2}] Threads Active\n".format(nTasks, n, nThreads))
        break;
    time.sleep(2)
sys.stdout.flush()
# - - - - - - - - - - - - - - - -
# sort output data
# - - - - - - - - - - - - - - - -
# Drain the result queue, bucketing every report under the keyword that hit.
nResults = 0
dctGroup = {}
while True:
    try:
        report = resultQueue.get(False)
    except queue.Empty:
        break
    dctGroup.setdefault(report.keyword, []).append(report)
    nResults += 1
# - - - - - - - - - - - - - - - -
# write output data
# - - - - - - - - - - - - - - - -
# Write to the report file when it opened successfully, otherwise fall back
# to printing on the console.
writeFunc = print
if op:
    writeFunc = op.write
writeFunc("results:{0}\n".format(nResults))
# Drain the thread-safe file queue into a plain list for reporting.
lstAllFiles = []
while True:
    try:
        file = allFileQueue.get(block=False)
        lstAllFiles.append(file)
    except queue.Empty:
        break
    except Exception as e:
        break  # NOTE(review): silently stops draining on any other error
if args.verbose:
    writeFunc("keywords:\n")
    for kw in lstKeywords:
        writeFunc("\t{0}\n".format(kw))
    writeFunc("files:\n")
    for file in lstAllFiles:
        writeFunc("\t{0}\n".format(file))
writeFunc("number of parsed files : {0}\n".format( len(lstAllFiles) ))
#writeFunc("size of all parsed files : {0}\n".format( projected_size ))
# One section per keyword: a header with the hit count, then each report.
for key in dctGroup:
    writeFunc( key+" ({0}) \n".format(len(dctGroup[key])) )
    for item in dctGroup[key]:
        writeFunc(str(item))
if op:
    op.close()
#possibly redundant
# (scan threads are daemons and exit on killAll, but join anyway)
for thread in lstReadThreads:
    thread.join()