from genericpath import exists import multiprocessing import time import os import base64 from xml.etree import ElementTree as ET from urllib.request import Request, urlopen import urllib.parse as urlparse import re import rsgdnd.utils as utils class projects: gta5_dlc = 282246 redemption2 = 3477070 redemption2_dlc = 3477070 def get_state_info(state): # 📕 🔐 🔒 ⏱️ ⌛ emojis = { "DEV": ("⏲️", "Dev (Fixing)"), "BUILD_RESOLVED": ("🔧", "Dev (Fixed, Awaiting Build)"), "TEST_MORE_INFO": ("❔", "Test (Need More Info)"), "TEST_TO_VERIFY": ("🔍", "Test (Verifying)"), "REVIEW_FIXED": ("✔️", "Review (Confirm Fixed)"), "REVIEW_WAIVED": ("🗑️", "Review (Confirm Waived)"), "REVIEW_DUPLICATE": ("♻️", "Review (Confirm Duplicate)"), "REVIEW_NOT_SEEN": ("🤷", "Review (Confirm No Repro)"), "CLOSED_FIXED": ("✔️", "Closed (Fixed)"), "CLOSED_WAIVED": ("🗑️", "Closed (Waived)"), "CLOSED_DUPLICATE": ("♻️", "Closed (Duplicate)"), "CLOSED_NOT_SEEN": ("🤷", "Closed (No Reproduction)"), "REVIEW_RESOLVED": ("🔧", "Review (Confirm Resolved)"), "PLANNED": ("👍", "Dev (Planned)"), "TEST_NOT_SEEN": ("🤷", "Test (Repro Required)"), "ON_HOLD": ("👎", "On Hold"), "DEV_FAIL": ("❌", "Dev (Reopened)"), #❗ "BLOCKED": ("⛔", "Blocked"), "BLOCKED_PLANNED": ("⛔", "Blocked (Planned)") } return emojis[state] if state in emojis else "😕" authtoken = "RSG_BSTAR_AUTHTOKEN" bstarfile = "bstartoken" def _make_request(url, authheader): try: protocol = "" if(url.startswith("http://") or url.startswith("https://")): protocol, url = url.split(":") protocol += ":" url = protocol+urlparse.quote(url) request = Request(url, headers=authheader) response = urlopen(request).read().decode() response = ET.fromstring(response) except Exception: utils.log("failed request: "+url) response = None return response def get_authheader(): if authtoken in os.environ: return {'Authorization' : 'Bearer ' + os.environ[authtoken]} filename = bstarfile if os.path.exists(filename): mseconds = time.time() - os.path.getmtime(filename).real if mseconds < (3600/2): with open(filename) as token: os.environ[authtoken] = token.read() return {'Authorization' : 'Bearer ' + os.environ[authtoken]} else: os.remove(filename) return None def get_base_url(): dev_url = "https://gateway-dev.bugstar.rockstargames.com/api/v1/Projects/" preprod_url = "https://gateway-preprod.bugstar.rockstargames.com/api/v1/Projects/" prod_url = "https://gateway.bugstar.rockstargames.com/api/v1/Projects/" return prod_url def authenticate(username, password, force_auth=False): if not force_auth: authheader = get_authheader() if authheader: return authheader utils.log("authenticating") authheader = f'{username}@rockstar.t2.corp:{password}' authheader = base64.b64encode(bytes(authheader,'utf-8')) authheader = authheader.decode('utf8') authheader = {'Authorization' : 'Basic ' + authheader} url = "https://gateway.bugstar.rockstargames.com/token/basic" request = Request(url, headers=authheader) try: response = urlopen(request).read().decode() os.environ[authtoken] = response filename = bstarfile if os.path.exists(filename): os.remove(filename) with open(filename, "w") as token: token.write(os.environ[authtoken]) # subprocess.check_call(["attrib","+H",filename]) return True except Exception as e: utils.log(e) return False bug_formats = ["http://bs/@bug/", "url:bugstar:", "bugstar://"] bug_regex = [re.compile(f"{x}([0-9]+)") for x in bug_formats] def extract_bug_ids(line): bugs = set() line = line.strip() if line.upper().startswith("BUG:"): line = line[4:] bline = re.split(";| |,", line) bline = [x.strip() for x in bline] for bug in bline: try: bugs.add(int(bug)) except Exception: pass for regex in bug_regex: # input("matching "+regex.pattern+" in "+line) matches = regex.findall(line) if matches: for match in matches: try: bugs.add(int(match)) except Exception as e: utils.log(repr(e)) pass # except Exception as e: # utils.log("failed to extract bug from: "+line) # return None return bugs def _get_bug_mp(args): return get_bug(*args) def get_bugs(project, bugs, processes=32): pool = multiprocessing.Pool(processes) args = [(project, bug) for bug in bugs if bug] xmls = pool.map(_get_bug_mp, args) return xmls @utils.file_cache("bstarcache/bugs") def get_bug(project, bug_num): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Bugs/{str(bug_num)}" return _make_request(url, authheader) @utils.file_cache("bstarcache/buggroups") def get_bugs_by_group(project, user, group): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Users/{user}/Bugs/Groups/{str(group)}" return _make_request(url, authheader) @utils.file_cache("bstarcache/bugsearches") def get_bugs_by_search(project, user, search): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Users/{user}/Bugs/Searches/{str(search)}" return _make_request(url, authheader) @utils.file_cache("bstarcache/userdata") def get_user_data(project, user): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Missions/UserData/{user}" return _make_request(url, authheader) @utils.file_cache("bstarcache/userdata", refresh=False) def get_bug_summary(project, user): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Missions/Summary/{user}" return _make_request(url, authheader) @utils.file_cache("bstarcache/missions", refresh=False) def get_bug_summary(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Missions/Summary/" return _make_request(url, authheader) @utils.file_cache("bstarcache/platforms", refresh=86400) def get_platforms(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Platforms/" return _make_request(url, authheader) @utils.file_cache("bstarcache/states", refresh=86400) def get_states(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/States/" return _make_request(url, authheader) @utils.file_cache("bstarcache/users", refresh=86400) def get_users(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Users" return _make_request(url, authheader) @utils.file_cache("bstarcache/user", refresh=86400) def get_user(project, user): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Users/{str(user)}" return _make_request(url, authheader) @utils.file_cache("bstarcache/teams", refresh=86400) def get_teams(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Teams" return _make_request(url, authheader) @utils.file_cache("bstarcache/teams", refresh=86400) def get_teams_by_id(project, id): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Teams/{str(id)}" return _make_request(url, authheader) @utils.file_cache("bstarcache/defaults", refresh=86400) def get_defaults(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/Defaults" return _make_request(url, authheader) # @utils.file_cache("bstarcache/current_user") def get_current_user(project): authheader = get_authheader() url = get_base_url()+f"{str(project)}/CurrentUser/" return _make_request(url, authheader) @utils.file_cache("bstarcache/bugs") def get_user_tagged_bugs(project, user, tag): authheader = get_authheader() url = get_base_url()+f"{str(project)}/{user}/TaggedBugs/{tag}" return _make_request(url, authheader) def login(): import rsgdnd.login as login utils.log("Logging into bugstar") details = login.get_details("rsg") while(not authenticate(**details, force_auth=True)): utils.log("Failed login.") if login.write_details("rsg"): details = login.get_details("rsg") else: utils.log("Aborting login") return False utils.log("Succesfully logged into bugstar!") return True