Files
2025-09-29 00:52:08 +02:00

99 lines
3.1 KiB
Python
Executable File

from urllib.request import Request, urlopen
import urllib.parse as urlparse
import rsgdnd.utils as utils
import rsgdnd.p4 as p4
import time
import base64
import json
import os
def _make_request(url, authheader):
    """Perform a GET request and return the decoded JSON response.

    The URL is sent exactly as given, so URLs containing a port number or
    other additional colons are supported. No percent-encoding is applied
    here (quoting the URL is needed for bugstar but not for swarm); callers
    are responsible for encoding query values.

    Args:
        url: Absolute URL to fetch.
        authheader: Mapping of HTTP headers to send with the request,
            e.g. ``{"Authorization": "Basic ..."}``.

    Returns:
        The parsed JSON document, or ``None`` when the request could not be
        built, the transfer failed, or the body was not valid JSON. Every
        failure is reported through ``utils.log``.
    """
    try:
        request = Request(url, headers=authheader)
        # The context manager closes the connection even if read() fails.
        with urlopen(request) as reply:
            payload = reply.read().decode()
        return json.loads(payload)
    except Exception:
        # Deliberately broad: this helper is best-effort and callers treat
        # None as "request failed", whatever the underlying cause was.
        utils.log("failed request: "+url)
        return None
# Base URL of the Swarm server; the endpoint paths in `api` are appended to it.
swarm_host = "https://swarm.rockstargames.com"
class api:
    """Swarm REST endpoint paths, each appended to the server base URL."""

    # Endpoint queried to validate an Authorization header.
    check_auth = "/api/v9/login/checkauth"
    # Endpoint queried (with a ``?users=`` filter) to look up user records.
    get_users = "/api/v9/users"
# Name of the environment variable holding the base64 token used for the
# "Basic" Authorization header.
authtoken = "RSG_SWARM_AUTHTOKEN"
# Path of the file in which the token is cached between runs; it is a bare
# file name, so it is resolved against the current working directory.
swarmfile = "swarmtoken"
def get_authheader():
    """Return the cached Swarm ``Authorization`` header, if one is available.

    Lookup order:

    1. The environment variable named by ``authtoken``.
    2. The token file named by ``swarmfile``, provided it was modified less
       than 30 minutes ago. A token read from the file is also stored in the
       environment variable so later calls skip the file.

    A token file that is 30 minutes old or older is deleted.

    Returns:
        ``{"Authorization": "Basic <token>"}``, or ``None`` when no usable
        token is cached.
    """
    token = os.environ.get(authtoken)
    if token is not None:
        return {"Authorization": "Basic " + token}

    # Ask for the mtime directly instead of checking existence first, so a
    # file removed by another process cannot crash us between the two calls.
    try:
        age_seconds = time.time() - os.path.getmtime(swarmfile)
    except OSError:
        return None

    if age_seconds >= 3600 / 2:
        # Stale cache: drop it so the next authentication writes a fresh one.
        try:
            os.remove(swarmfile)
        except FileNotFoundError:
            pass
        return None

    try:
        with open(swarmfile) as tokenfile:
            # Strip stray whitespace/newlines; they are not valid inside an
            # HTTP header value.
            token = tokenfile.read().strip()
    except FileNotFoundError:
        return None

    if not token:
        # An empty file (e.g. an interrupted write) is not a usable token.
        return None

    os.environ[authtoken] = token
    return {"Authorization": "Basic " + token}
def authenticate(username, password, force_auth=False):
    """Authenticate against Swarm and cache the resulting token.

    The credentials sent are ``username`` plus the value returned by
    ``p4.get_token()``, base64-encoded as a "Basic" Authorization header and
    validated against the ``check_auth`` endpoint. On success the token is
    stored in the environment variable named by ``authtoken`` and written to
    the file named by ``swarmfile``.

    Args:
        username: User name placed in front of the p4 token.
        password: Not used by this function; it is accepted so callers can
            pass their stored login details as keyword arguments.
        force_auth: When false, a cached header from ``get_authheader()`` is
            reused without contacting the server.

    Returns:
        The cached header dict (truthy) when a cached token is reused,
        ``True`` when the server accepted the credentials and the token was
        cached, ``False`` otherwise.
    """
    if not force_auth:
        cached = get_authheader()
        if cached:
            return cached

    credentials = f"{username}:{p4.get_token()}"
    token = base64.b64encode(credentials.encode("utf-8")).decode("utf8")
    authheader = {"Authorization": "Basic " + token}
    response = _make_request(f"{swarm_host}{api.check_auth}", authheader)

    if response is None:
        # The request itself failed; _make_request has already logged it.
        return False
    try:
        code = response["code"]
    except (KeyError, IndexError, TypeError) as e:
        # The reply was valid JSON but not shaped like a checkauth answer.
        utils.log(e)
        return False
    if code != 200:
        return False

    os.environ[authtoken] = token
    try:
        # Replace any previous cache file. Removing without a prior
        # existence check avoids a race with other processes.
        try:
            os.remove(swarmfile)
        except FileNotFoundError:
            pass
        with open(swarmfile, "w") as tokenfile:
            tokenfile.write(token)
    except OSError as e:
        # NOTE(review): as before, failing to write the cache file reports
        # the login as failed even though the environment token is set.
        utils.log(e)
        return False
    return True
# NOTE(review): utils.file_cache presumably caches results under
# "swarmcache/users"; the meaning of the 0 argument is not visible here.
@utils.file_cache("swarmcache/users", 0)
def get_user(username):
    """Look up a Swarm user record by user name.

    Args:
        username: User name passed as the ``users`` filter of the users
            endpoint. It is percent-encoded, so names containing characters
            such as spaces, ``&`` or ``#`` cannot corrupt the query string.

    Returns:
        The first element of the decoded response, or ``None`` when the
        request failed or the response was empty.
    """
    query = "?users=" + urlparse.quote(username, safe="")
    url = swarm_host + api.get_users + query
    response = _make_request(url, get_authheader())
    return response[0] if response else None
def login():
    """Log into Swarm, retrying with updated details until it works.

    Reads the stored "rsg" login details and forces a fresh authentication.
    After each failed attempt ``write_details("rsg")`` is called; a truthy
    result means the details are re-read and authentication is retried, a
    falsy result aborts the login.

    Returns:
        ``True`` once authentication succeeds, ``False`` if the login was
        aborted.
    """
    # Imported locally, as in the original; presumably to avoid an import
    # cycle or load cost at module import time (not verifiable from here).
    import rsgdnd.login as login_store

    utils.log("Logging into swarm")
    details = login_store.get_details("rsg")
    while True:
        if authenticate(**details, force_auth=True):
            break
        utils.log("Failed login.")
        if not login_store.write_details("rsg"):
            utils.log("Aborting login")
            return False
        details = login_store.get_details("rsg")
    utils.log("Succesfully logged into swarm!")
    return True