"""Stick Bug report service.

Collects Perforce changelists that reference Bugstar bug ids, renders them
into an HTML report under ``stickbug/`` and exposes ``handle_service_*`` hooks
that serve and refresh those reports.

NOTE(review): the markup inside the template strings below looks like it was
stripped before this file was captured; the string values are kept exactly as
found.
"""
import os
import re
import time
import datetime
import multiprocessing
import rsgdnd
import rsgdnd.p4 as rsgp4
import rsgdnd.bugstar as bugstar
import rsgdnd.swarm as swarm
import rsgdnd.utils as utils
from dataclasses import dataclass

# Matches dated report names such as "stickbug-31122020.html"; group 1 is the
# digit run that update_html() parses with the "%d%m%Y" format.
stickbug_period = re.compile(r"stickbug-([0-9]+)\.html")
swarm_clurl = "https://swarm.rockstargames.com/changes/"
# swarm_uurl = "https://swarm.rockstargames.com/users/"
swarm_uurl = "https://swarm.rockstargames.com/reviews/?author="
# Size, in days, of the reporting window.
how_recent = 7
# Seconds between periodic rebuilds, and the minimum gap between two queued
# rebuilds of the same report.
update_interval = 300
userblacklist = [
    # "svcrsgautoparait",
    # "svcrsggbauto",
    # "svcrsgautoparab",
    # "buildernorth"
]

css = """ """
bugline_html = """{emoji} {date}: 📑 🐛 - {summary}"""
userline_html = """

{username}

📧"""
branchline_html = """

{branch}

"""
branchdesc_html = """{branch}: {paths}"""
html = """ Stick Bug

🐛 Stick Bug 📑

{header}

{body}
"""

# Request paths that handle_service_get() accepts without generating anything.
_static_paths = (
    "stickbug.html",
    "stickbug/stickbug.html",
    "stickbug/stickbug.css",
    "stickbug/stickbug.js",
)


@dataclass(frozen=True, order=True)
class ChangeInfo:
    """A (bug id, changelist) pair; frozen so it can be stored in a set."""

    bugid: str
    change: str


@utils.trywrap()
def update_html(branches=rsgdnd.branches, outfile="stickbug", period=None):
    """Build the report and write it to ``stickbug/<outfile>.html``.

    Args:
        branches: Mapping of branch name to the value handed to
            ``rsgp4.get_changes``.
        outfile: Report file name without directory or extension; the
            ``stickbug/`` directory and ``.html`` suffix are added here.
        period: Optional start date as a ``%d%m%Y`` string. When given, the
            window is ``[period, period + how_recent days]``. When omitted,
            the window starts ``how_recent`` days before today and a dated
            copy ``stickbug/stickbug-<ddmmyyyy>.html`` is written as well.
    """
    utils.log("Generating report...")
    if period:
        mydate = datetime.datetime.strptime(period, "%d%m%Y").date()
    else:
        mydate = datetime.date.today() - datetime.timedelta(how_recent)
    end = mydate + datetime.timedelta(how_recent)

    bugs = {}  # user -> branch -> date string -> set of ChangeInfo
    bugset = set()
    # Gather P4 change and bug information
    for branch, val in branches.items():
        utils.log("------------------------------------")
        if period:
            output = rsgp4.get_changes(val, start=mydate, end=end)
        else:
            output = rsgp4.get_changes(val, start=mydate)
        # The last user/date/changelist seen carries forward to following
        # lines that do not yield their own value.
        user = None
        change = None
        date = None
        for line in output:
            user = rsgp4.extract_user(line) or user
            if user in userblacklist:
                continue
            date = rsgp4.extract_date(line) or date
            date = str(date)
            change = rsgp4.extract_changelist(line) or change
            ids = bugstar.extract_bug_ids(line)
            if not ids:
                continue
            for bug_id in ids:
                per_date = bugs.setdefault(user, {}).setdefault(branch, {})
                per_date.setdefault(date, set()).add(ChangeInfo(bug_id, change))
                bugset.add(bug_id)

    bugs = dict(sorted(bugs.items()))
    buglist = bugstar.get_bugs(bugstar.projects.gta5_dlc, bugset)
    # NOTE(review): bugmap is built but never read in the code visible here.
    bugmap = {int(x.find("id").text): x for x in buglist if x}

    header = [
        "\n",
        f"Bugs were found in changelists between {mydate} and {end}\n\npaths:\n\n",
        "",
        "\n",
    ]

    # NOTE(review): the nesting of the trailing separator lines could not be
    # recovered from the collapsed source; confirm against the original file.
    lines = []
    for user in bugs:
        lines.append("\n")
        lines.append(userline_html.format_map({
            "username": user,
            "userlink": swarm_uurl + user,
            "email": swarm.get_user(user)["Email"],
        }))
        for branch in bugs[user]:
            lines.append("\n")
            lines.append(branchline_html.format_map({"branch": branch}))
            lines.append("")
            lines.append("\n")
            lines.append("\n")
        lines.append("\n")

    page = html.format_map({"body": "\n".join(lines), "header": "\n".join(header)})

    # exist_ok avoids a check-then-create race between the two worker
    # processes, which can both reach this point.
    os.makedirs("stickbug", exist_ok=True)
    with open("stickbug/" + outfile + ".html", "w", encoding="utf-8") as update_fs:
        update_fs.write(page)
    if not period:
        datefile = mydate.strftime("stickbug/stickbug-%d%m%Y.html")
        with open(datefile, "w", encoding="utf-8") as update_fs:
            update_fs.write(page)


# outfile name -> time.time() of the last rebuild started by update_queued().
queue_timestamps = {}


def update_queued(queue):
    """Serve rebuild requests from *queue* forever.

    Each request is an ``[outfile, period]`` pair. A given outfile is rebuilt
    at most once every ``update_interval`` seconds; requests arriving sooner
    are dropped. Failures are logged and the loop keeps running.
    """
    while True:
        try:
            update = queue.get()
            if not update:
                continue
            outfile, period = update[0], update[1]
            now = time.time()
            last = queue_timestamps.get(outfile)
            if last is None or now - last >= update_interval:
                queue_timestamps[outfile] = now
                update_html(outfile=outfile, period=period)
        except Exception as exc:
            # Best effort: one bad request must not stop the worker.
            utils.log(f"stickbug queued update failed: {exc!r}")


def update_loop():
    """Rebuild the default report every ``update_interval`` seconds, forever."""
    while True:
        update_html()
        time.sleep(update_interval)


bugstar.login()
rsgp4.login()
swarm.login()
pending = multiprocessing.Queue()


def handle_service_get(self, in_path, response):
    """Return True when *in_path* is a stickbug resource this module handles.

    Static report paths are accepted as-is. For a dated report name, an
    existing file is accepted immediately and a background refresh is queued;
    a missing one is generated before returning.
    """
    if in_path in _static_paths:
        return True
    in_path = str(in_path)
    match = stickbug_period.search(in_path)
    if not match:
        return False
    period = match.group(1)
    # update_html() prepends the "stickbug/" folder itself, so both the queued
    # and the direct call pass the bare file name.
    outfile = "stickbug-" + period
    if os.path.exists("stickbug/" + outfile + ".html"):
        pending.put([outfile, period])
        return True
    update_html(outfile=outfile, period=period)
    return True


mp_update_loop = None
mp_update_queue = None


def handle_service_start():
    """Start the two background worker processes.

    Returns False when any of the logins fails; otherwise returns None.
    """
    global mp_update_loop
    global mp_update_queue
    if mp_update_loop or mp_update_queue:
        utils.log("this is very bad, we should not be starting the service multiple times.")
        return None
    if not (bugstar.login() and rsgp4.login() and swarm.login()):
        return False
    mp_update_loop = multiprocessing.Process(target=update_loop)
    mp_update_queue = multiprocessing.Process(target=update_queued, args=(pending,))
    mp_update_loop.start()
    mp_update_queue.start()
    return None


def handle_service_stop():
    """Kill whichever background worker processes are running and forget them."""
    global mp_update_loop
    global mp_update_queue
    if not (mp_update_loop or mp_update_queue):
        utils.log("this is very bad, we should not be stopping a service that isn't setup.")
        return
    for proc in (mp_update_loop, mp_update_queue):
        if proc is not None:
            proc.kill()
    mp_update_loop = None
    mp_update_queue = None