Files
2025-09-29 00:52:08 +02:00

403 lines
15 KiB
Python
Executable File

import concurrent.futures as futures
import threading
import json
import re
import shlex
import rsgdnd.compare as compare
import rsgdnd.utils as utils
import argparse
import time
import os
import rsgdnd.p4 as p4
# =======================================
# Page template for the HTML comparison report.
# run_cmdline() fills it with str.format_map() using the keys "title",
# "fromheaders", "toheaders" and "diff"; the two header lists are written
# into the page as JavaScript variables and the diff markup into the
# "diff_container" div. Because the template goes through str.format_map(),
# any literal brace added to it must be doubled.
html = """
<!DOCTYPE html>
<html>
<head>
<link rel="icon" href="data:image/svg+xml,<svg xmlns=%22http://www.w3.org/2000/svg%22 viewBox=%220 0 100 100%22><text y=%22.9em%22 font-size=%2290%22>🖥️</text></svg>">
<meta content="text/html;charset=utf-8" http-equiv="Content-Type" />
<meta content="utf-8" http-equiv="encoding" />
<meta http-equiv="Cache-Control" content="no-cache, no-store, must-revalidate" />
<meta http-equiv="Pragma" content="no-cache" />
<meta http-equiv="Expires" content="0" />
<title>{title}</title>
<script>
var fromheaders = {fromheaders};
var toheaders = {toheaders};
</script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.5.1/jquery.min.js"></script>
<script src="compare.js"></script>
<link rel="stylesheet" href="compare.css">
</head>
<body>
<div class="compare">
<div class="prev" onclick="previous_page()"><span class="arrows">&#10094;</span></div>
<div class="next" onclick="next_page()"><span class="arrows">&#10095;</span></div>
<div class="diff_container" id="diff_container">
{diff}
</div>
</div>
</body>
</html>
"""
# Patterns for markup found in the generated diff (the "difflib_chg_to" ids
# suggest difflib-style HTML output -- confirm against rsgdnd.compare).
# run_cmdline() rewrites every match into a clickable element.
link_re = re.compile(r'<a href="#difflib_chg_to[0-9]+__[0-9]+">[nf]</a>')
top_re = re.compile(r'<a href="#difflib_chg_to[0-9]+__top">t</a>')
diffheader_re = re.compile(r'<td class="diff_header"')

# Text of the most recent failure, reported back to web clients.
# exception_lock is held around reads and writes of last_exception.
exception_lock = threading.Lock()
last_exception = ""
def get_rpf_lines(lines, root):
    """Collect the archive header lines of a dump tree.

    Lines starting with "X" are treated as top-level paths: ``root`` is
    removed from them and they become the current path. Any other line
    containing ".rpf" is treated as a nested entry whose depth is given by
    the width of the text before its first "/" (four characters per level);
    it is appended to the current path at that depth.

    Args:
        lines: iterable of text lines (trailing whitespace is ignored).
        root: prefix removed from the top-level paths.

    Returns:
        A list of ``[line_number, path]`` pairs, with 1-based line numbers.
    """
    ret = []
    current_path = ""
    for linenum, raw in enumerate(lines, 1):
        x = raw.rstrip()
        # startswith() instead of x[0]: a blank line is an empty string after
        # rstrip() and must be skipped rather than raise IndexError.
        if x.startswith("X"):
            x = x.replace(root, "")
            ret.append([linenum, x])
            current_path = x + ":/"
        elif ".rpf" in x:
            # The depth is the number of 4-character indent steps before the
            # first "/" on the line.
            depth = len(x.split("/", 1)[0]) // 4
            # Keep the first `depth` archive levels of the current path and
            # replace everything below them with this entry.
            parts = (current_path + ":/").split(".rpf:/", depth)
            parts[-1] = x.lstrip()
            current_path = ".rpf:/".join(parts)
            ret.append([linenum, current_path])
    return ret
def get_file_lines(lines, root):
    """Collect the per-file header lines of a file comparison.

    A line is a header when it contains ``root`` and either "add change" or
    "edit change". The header text is rebuilt as ``root`` followed by the
    text found between the first and second occurrence of ``root``.

    Returns:
        A list of ``[line_number, header_text]`` pairs, with 1-based line
        numbers.
    """
    markers = ("add change", "edit change")
    headers = []
    for number, raw in enumerate(lines, start=1):
        text = raw.rstrip()
        if root not in text:
            continue
        if not any(marker in text for marker in markers):
            continue
        headers.append([number, root + text.split(root)[1]])
    return headers
def get_args(**kwargs):
    """Build the argument parser for this tool.

    The parser comes from compare.get_diff_args(**kwargs) and is extended
    with the output-file, open-comparison and dump-tree options.

    When ``exit_on_error=False`` is passed, the parser's ``exit`` method is
    replaced so that argparse raises ValueError instead of terminating the
    process; the web handlers rely on this to report bad arguments.

    Returns:
        The configured parser (not yet parsed).
    """
    def no_exit(status=0, message=None):
        # Same signature as ArgumentParser.exit(status=0, message=None):
        # argparse calls exit() with no arguments after printing help or the
        # version, which must also end up as a ValueError.
        if message:
            raise ValueError(message)
        raise ValueError("Argparser exited with no message, status: "+str(status))

    parser = compare.get_diff_args(**kwargs)
    if not kwargs.get("exit_on_error", True):
        parser.exit = no_exit
    parser.add_argument("-o", '--file_out', metavar='comparison', type=str, default="comparison",
                        help='output file name, it will be output to /compare/"comparison".html/txt')
    parser.add_argument("-oc", '--open_comparison', action="store_true",
                        help='open the comparison html file after it\'s made')
    parser.add_argument("-dt", '--dump_tree', action="store_true",
                        help='is this a dump tree')
    return parser
@utils.trywrap()
def main():
    """Command-line entry point.

    Parses the command line, runs the comparison and exits with status 0 on
    success or 1 on failure. With --open_comparison the generated report is
    launched from the "compare" directory before exiting.
    """
    args = get_args().parse_args()
    if run_cmdline(args) != 0:
        exit(1)
    if args.open_comparison:
        extension = ".html" if args.diffstyle == "html" else ".txt"
        report_name = os.path.basename(args.file_out) + extension
        # Hands the bare file name to the shell so the OS opens it.
        os.system("cd compare && "+report_name)
    exit(0)
def _resolve_depot_root(file_spec):
    """Return the directory of a file spec, mapped through p4.where() when possible.

    Any "#..." or "@..." suffix is removed before the directory is taken.
    If p4.where() fails, the failure is logged and the unmapped directory is
    returned.
    """
    root = file_spec.rsplit("#", 1)[0]
    root = root.rsplit("@", 1)[0]
    root = os.path.dirname(root)
    try:
        root = p4.where(root)[0]
    except Exception as e:
        utils.log("exception: "+str(e))
    return root


def run_cmdline(args):
    """Run one comparison and write the report under "compare/".

    Args:
        args: object whose attributes are the parsed options (an argparse
            namespace on the command line, a utils.Bunch from the web
            handler). Only the options known to compare.get_diff_args() are
            forwarded to compare.diff().

    Returns:
        0 when the report was written, -1 when compare.diff() raised (the
        message is stored in the module-level ``last_exception``).
    """
    global last_exception
    utils.log(os.path.basename(__file__))
    utils.log("========================")
    utils.log("supplied args:")
    supplied = vars(args)
    for k, v in supplied.items():
        if v:
            utils.log(f" {k}: {v}")
    utils.log("preparing comparison.")
    start = time.time()
    # Build the diff parser once; it was previously rebuilt for every
    # argument inside the loop.
    diff_dests = {action.dest for action in compare.get_diff_args()._actions}
    diff_args = {k: v for k, v in supplied.items() if k in diff_dests}
    try:
        diff, lines_a, lines_b = compare.diff(**diff_args)
    except Exception as error:
        with exception_lock:
            last_exception = str(error)
        utils.log("exception: "+last_exception)
        return -1
    utils.log("diff complete")
    if args.diffstyle == "html":
        file_out = os.path.basename(args.file_out)
        title = file_out
        file_out = "compare/"+file_out+".html"
        if args.dump_tree:
            # Header extraction is best effort: on any failure the report is
            # still produced, just without header navigation.
            try:
                root_a = os.path.commonpath([l for l in lines_a if l.startswith("X")]).replace("\\", "/")
                root_b = os.path.commonpath([l for l in lines_b if l.startswith("X")]).replace("\\", "/")
                headers_a = get_rpf_lines(lines_a, root_a)
                headers_b = get_rpf_lines(lines_b, root_b)
            except Exception as e:
                utils.log("exception: "+str(e))
                headers_a = []
                headers_b = []
        else:
            root_a = _resolve_depot_root(args.file_a)
            root_b = _resolve_depot_root(args.file_b)
            headers_a = get_file_lines(lines_a, root_a)
            headers_b = get_file_lines(lines_b, root_b)
        out_html = html.format_map({"fromheaders":headers_a, "toheaders":headers_b, "diff":diff, "title":title})
        if not args.dump_tree:
            # Blank the header text inside the page body only; the copies
            # stored in the <head> script variables must stay intact.
            body_split = out_html.split("<body>")
            for header in headers_a+headers_b:
                body_split[1] = body_split[1].replace(header[1], "")
            out_html = "<body>".join(body_split)
        # Add a gap row after every tbody, and a leading one before the first.
        out_html = out_html.replace("</tbody>", "</tbody><tr class=\"SectionGap\"><td class=\"diff_next\"></td><td colspan=\"2\"></td><td class=\"diff_next\"></td><td colspan=\"2\"></td></tr>")
        out_html = out_html.replace("<tbody>", "<tbody><tr class=\"SectionGap\"><td class=\"diff_next\"></td><td colspan=\"2\"></td><td class=\"diff_next\"></td><td colspan=\"2\"></td></tr></tbody>", 1)
        # Replace the generated navigation anchors with clickable links.
        out_html = link_re.sub("<a onclick='link_clicked(this)' style='cursor: pointer;'>🔗</a>", out_html)
        out_html = top_re.sub("<a onclick='link_clicked(this)' style='cursor: pointer;'>🔗</a>", out_html)
        out_html = diffheader_re.sub("<td class=\"diff_header\" onclick='link_clicked(this)' style='cursor: pointer;'", out_html)
        diff = out_html
    else:
        file_out = os.path.basename(args.file_out)
        file_out = "compare/"+file_out+".txt"
    utils.log("writing to "+file_out)
    with open(file_out, "w", encoding="utf-8") as of:
        of.writelines(diff)
    end = time.time()
    utils.log(f"done! generated {file_out} in "+str(end - start))
    return 0
def get_sanatised_user_cmds(address, args):
    """Return the saved commands for a client as parsed option dicts.

    Every saved command line is split, parsed with ``args`` (an argument
    parser) and stored back as a dict of options, with the "-<user>" suffix
    removed from its "file_out" value. A command that fails to parse is
    logged and left as its original string.
    """
    saved = get_saved_cmds(address)
    suffix = "-"+utils.ip_to_user(address)
    for name in saved:
        try:
            parsed = vars(args.parse_args(shlex.split(saved[name])))
            parsed["file_out"] = parsed["file_out"].replace(suffix, "")
            saved[name] = parsed
        except Exception as e:
            utils.log("Exception: "+str(e))
    return saved
def handle_service_get(self, in_path, response):
    """Handle a GET request addressed to the compare service.

    Returns True for the service's static files, for generated ".html" and
    ".txt" reports under "compare/", and for "compare/api/get.cmdline.info",
    which additionally fills ``response`` with the tool title, description,
    default option values and the client's saved commands. Returns False for
    every other path.
    """
    utils.log(f"{in_path} processed by compare")
    static_paths = ("compare", "compare/server.css", "compare/compare.css", "compare/compare.js")
    if in_path in static_paths:
        return True
    if in_path.startswith("compare/") and in_path.endswith(".html"):
        return True
    if in_path.startswith("compare/") and in_path.endswith(".txt"):
        return True
    if in_path != "compare/api/get.cmdline.info":
        return False

    parser = get_args(exit_on_error=False)
    response["title"] = os.path.basename(__file__)
    response["description"] = parser.description
    response["args"] = {}
    for action in parser._actions:
        if action.default == '==SUPPRESS==':
            # Actions with a suppressed default are not reported.
            continue
        if action.nargs:
            initial = []
        elif isinstance(action, argparse._StoreTrueAction):
            initial = False
        elif isinstance(action, argparse._StoreFalseAction):
            initial = False  # flagged by the original author as probably wrong
        else:
            initial = action.type(action.default)
        response["args"][action.dest] = initial
    # Not offered to clients: the POST handler always sets "perforce" itself.
    del response["args"]["perforce"]
    response["cmds"] = get_sanatised_user_cmds(self.client_address[0], parser)
    response["ok"] = True
    return True
def args_to_shell(cmd, args):
    """Render a dict of option values as a command-line string.

    For each action of the parser ``args`` whose ``dest`` is a key of
    ``cmd``, the action's first option string is emitted, followed by the
    value(s): non-empty strings and lists are emitted with their values,
    True booleans as a bare flag, and everything else is skipped.

    Values are quoted with shlex.quote() so that a value containing
    whitespace or quote characters survives the shlex.split() the callers
    apply to the result; simple values are emitted unchanged.

    Returns:
        The options joined by single spaces ("" when nothing applies).
    """
    out_shell = []
    for action in args._actions:
        if action.dest not in cmd:
            continue
        value = cmd[action.dest]
        if not value:
            continue
        if isinstance(value, str):
            out_shell.append(action.option_strings[0] + " " + shlex.quote(value))
        elif isinstance(value, list):
            quoted = [shlex.quote(item) for item in value]
            out_shell.append(action.option_strings[0] + " " + " ".join(quoted))
        elif isinstance(value, bool):
            out_shell.append(action.option_strings[0])
    return " ".join(out_shell)
def get_user_cmds(client_address):
    """Load the commands saved by the user behind ``client_address``.

    Reads "compare/<user>-cmds.json", where <user> comes from
    utils.ip_to_user(). Returns an empty dict when the file does not exist.
    """
    path = "compare/"+utils.ip_to_user(client_address)+"-cmds.json"
    if not os.path.exists(path):
        return {}
    with open(path) as handle:
        return json.loads(handle.read())
def save_user_cmds(client_address, cmds):
    """Write ``cmds`` as indented JSON to the user's "compare/<user>-cmds.json" file."""
    target = "compare/"+utils.ip_to_user(client_address)+"-cmds.json"
    with open(target, "w") as handle:
        serialized = json.dumps(cmds, indent=4)
        handle.write(serialized)
def get_saved_cmds(client_address):
    """Return the shared commands merged with the client's own commands.

    The shared commands are read from "compare/cmds.json". The user's
    commands are added after them; on a name clash the shared entry wins.
    """
    with open("compare/cmds.json") as handle:
        merged = json.loads(handle.read())
    for name, command in get_user_cmds(client_address).items():
        merged.setdefault(name, command)
    return merged
def handle_service_post(self, in_path, data, response):
    """Handle a POST request addressed to the compare service.

    Supported paths:
        compare/api/run.cmd         run a saved command by name (data["cmd"])
        compare/api/run.cmdline     run the options given in ``data``
        compare/api/save.cmdline    validate and save ``data`` as a user command
        compare/api/remove.cmdline  delete one of the user's saved commands

    ``response`` is filled with "ok", "status", "log" and, depending on the
    request, "cmds" or "links".

    Returns:
        True when the request was handled here, False otherwise.
    """
    global last_exception

    def write_failure(response, data):
        # Report the rejected arguments together with the last recorded error.
        response["ok"] = True
        response["status"] = "failure"
        response["log"] = "Found an issue with the supplied arguments:\n"+json.dumps(data, indent=4)+"\n"
        with exception_lock:
            response["log"] += "Last Exception: "+last_exception

    client = self.client_address[0]
    args = None
    user_cmd = False

    if in_path == "compare/api/run.cmd":
        # The shared command file is only needed (and only read) here.
        cmds = get_saved_cmds(client)
        if "cmd" in data and data["cmd"] in cmds:
            parser = get_args(exit_on_error=False)
            cmd = shlex.split(cmds[data["cmd"]])
            try:
                args = parser.parse_args(cmd)
            except Exception as error:
                with exception_lock:
                    last_exception = str(error)
                write_failure(response, data)
                return True
            user_cmd = data["cmd"] in get_user_cmds(client)

    if in_path == "compare/api/save.cmdline":
        file_out = data["file_out"]
        parser = get_args(exit_on_error=False)
        shell = args_to_shell(data, parser).replace("\\", "/")
        try:
            # Validate the command line before storing it.
            parser.parse_args(shlex.split(shell))
        except Exception as error:
            with exception_lock:
                last_exception = str(error)
            write_failure(response, data)
            return True
        # Store the command in the user's own set only. Saving the merged
        # shared+user dict would copy every shared command into the user's
        # file and make them look user-owned to run.cmd / remove.cmdline.
        user_cmds = get_user_cmds(client)
        user_cmds[file_out] = shell
        save_user_cmds(client, user_cmds)
        response["ok"] = True
        response["status"] = "success"
        response["cmds"] = get_sanatised_user_cmds(client, parser)
        response["log"] = "wrote: { '"+file_out+"': '"+shell+"' }"
        return True

    if in_path == "compare/api/remove.cmdline":
        user_cmds = get_user_cmds(client)
        file_out = data["file_out"]
        if file_out in user_cmds:
            del user_cmds[file_out]
            save_user_cmds(client, user_cmds)
            response["ok"] = True
            response["status"] = "success"
            response["cmds"] = get_sanatised_user_cmds(client, get_args(exit_on_error=False))
            response["log"] = "removed "+file_out
        else:
            response["ok"] = True
            response["status"] = "failure"
            response["log"] = "could not remove "+file_out
        return True

    if in_path == "compare/api/run.cmdline" or args is not None:
        if args is not None:
            # A saved command was parsed above: run its options.
            data = vars(args)
        else:
            user_cmd = True
        if user_cmd:
            # User-owned runs get a per-user output file name.
            data["file_out"] = data["file_out"]+"-"+utils.ip_to_user(client)
        data["perforce"] = True
        # The executor is deliberately not used as a context manager: leaving
        # a `with` block calls shutdown(wait=True), which would keep a
        # timed-out request blocked until the job finally finishes.
        executor = futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(run_cmdline, utils.Bunch(data))
            ret = future.result(timeout=30)
        except futures.TimeoutError as error:
            utils.log(error)
            response["ok"] = True
            response["status"] = "failure"
            response["log"] = "Timed out trying to fulfill request"
            return True
        finally:
            # Never blocks; a job that is still running continues in the
            # background because a started task cannot be cancelled.
            executor.shutdown(wait=False)
        if ret == 0:
            file_out = os.path.basename(data["file_out"])
            if data["diffstyle"] == "html":
                file_out += ".html"
            else:
                file_out += ".txt"
            response["ok"] = True
            response["status"] = "success"
            response["log"] = "Supplied arguments:\n"+json.dumps(data, indent=4)+"\n"
            response["log"] += "<a href='"+file_out+"' target='_blank'>"+file_out+"</a>"
            if "open_comparison" in data and data["open_comparison"]:
                response["links"] = [file_out]
        else:
            write_failure(response, data)
        return True
    return False
def handle_service_start():
    """Service start hook; the compare service needs no start-up work."""
    return None
def handle_service_stop():
    """Service stop hook; the compare service has nothing to clean up."""
    return None
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()