"""
branchdesc_html = """{branch}: {paths}"""
html = """
Stick Bug
🐛 Stick Bug 📑
{header}
{body}
"""
@dataclass(frozen=True, order=True)
class ChangeInfo:
bugid: str
change: str
@utils.trywrap()
def update_html(branches=rsgdnd.branches, outfile="stickbug", period=None):
utils.log("Generating report...")
if period:
mydate = datetime.datetime.strptime(period, "%d%m%Y")
mydate = mydate.date()
end = mydate + datetime.timedelta(how_recent)
else:
mydate = datetime.date.today() - datetime.timedelta(how_recent)
end = mydate + datetime.timedelta(how_recent)
bugs = {}
bugset = set()
# Gather P4 change and bug information
for branch, val in branches.items():
utils.log("------------------------------------")
if period:
output = rsgp4.get_changes(val, start=mydate, end=end)
else:
output = rsgp4.get_changes(val, start=mydate)
user = None
change = None
date = None
for line in output:
user = rsgp4.extract_user(line) or user
if user in userblacklist:
continue
date = rsgp4.extract_date(line) or date
date = str(date)
change = rsgp4.extract_changelist(line) or change
ids = bugstar.extract_bug_ids(line)
if ids:
for id in ids:
if user not in bugs:
bugs[user] = { branch: {}}
if branch not in bugs[user]:
bugs[user][branch] = {}
if date not in bugs[user][branch]:
bugs[user][branch][date] = set()
bugs[user][branch][date].add(ChangeInfo(id, change))
bugset.add(id)
bugs = dict(sorted(bugs.items()))
buglist = bugstar.get_bugs(bugstar.projects.gta5_dlc, bugset)
bugmap = {int(x.find("id").text) : x for x in buglist if x}
lines = []
header = []
header.append("")
header.append(f"Bugs were found in changelists between {str(mydate)} and {str(end)}
paths:
")
header.append("
")
for branch in branches:
inner = branchdesc_html.format_map({"branch":branch, "paths": " ".join(branches[branch])})
header.append(f"
{inner}
")
header.append("
")
header.append("")
if bugs:
for user in bugs:
lines.append(f"
")
lines.append(userline_html.format_map({"username":user, "userlink": swarm_uurl+user, "email":swarm.get_user(user)["Email"]}))
for branch in bugs[user]:
lines.append("
")
for date in bugs[user][branch]:
changeinfo = bugs[user][branch][date]
for info in changeinfo:
bug = bugmap[info.bugid] if info.bugid in bugmap else None
clid = info.change
if bug:
summary = bug.find("summary").text
summary = summary.replace("<", "<")
summary = summary.replace(">", ">")
bugid = bug.find("id").text
buglink = bugstar.bug_formats[0]+str(bugid)
cllink = swarm_clurl+clid
state = bug.find("state").text
emoji, desc = bugstar.get_state_info(state)
bugformat = {
"buglineid": "bl"+clid+bugid,
"bugstatus": desc,
"emoji": emoji,
"date": date,
"clid": clid,
"cllink": cllink,
"bugid": bugid,
"buglink": buglink,
"summary": summary
}
inner = bugline_html.format_map(bugformat)
lines.append(f"
{inner}
")
lines.append("
")
lines.append("
")
lines.append("")
lines.append("
")
derp = html.format_map({"body": "\n".join(lines), "header": "\n".join(header)})
if not os.path.exists("stickbug/"):
os.mkdir("stickbug")
with open("stickbug/"+outfile+".html", "w", encoding="utf-8") as update_fs:
update_fs.write(derp)
if not period:
datefile = mydate.strftime("stickbug/stickbug-%d%m%Y.html")
with open(datefile, "w", encoding="utf-8") as update_fs:
update_fs.write(derp)
queue_timestamps = {}
def update_queued(queue):
while True:
try:
update = queue.get()
if update:
if update[0] not in queue_timestamps or queue_timestamps[update[0]] > time.time()+300:
queue_timestamps[update[0]] = time.time()
update_html(outfile=update[0], period=update[1])
except Exception:
pass
def update_loop():
while True:
update_html()
time.sleep(300)
bugstar.login()
rsgp4.login()
swarm.login()
pending = multiprocessing.Queue()
def handle_service_get(self, in_path, response):
if in_path in ["stickbug.html", "stickbug/stickbug.html", "stickbug/stickbug.css", "stickbug/stickbug.js"]:
return True
in_path = str(in_path)
match = stickbug_period.search(in_path)
if match:
if(os.path.exists("stickbug/stickbug-"+match.group(1)+".html")):
pending.put(["stickbug/stickbug-"+match.group(1), match.group(1)])
return True
#stickbug folder is appended.
update_html(outfile="stickbug-"+match.group(1), period=match.group(1))
return True
return False
mp_update_loop = None
mp_update_queue = None
def handle_service_start():
global mp_update_loop
global mp_update_queue
if mp_update_queue or mp_update_queue:
utils.log("this is very bad, we should not be starting the service multiple times.")
else:
if(bugstar.login() and rsgp4.login() and swarm.login()):
mp_update_loop = multiprocessing.Process(target=update_loop)
mp_update_queue = multiprocessing.Process(target=update_queued,args=(pending,))
mp_update_loop.start()
mp_update_queue.start()
else:
return False
def handle_service_stop():
global mp_update_loop
global mp_update_queue
if not any([mp_update_queue, mp_update_queue]):
utils.log("this is very bad, we should not be stopping a service that isn't setup.")
else:
mp_update_loop.kill()
mp_update_queue.kill()
mp_update_loop = None
mp_update_queue = None
pass