Files
gtav-src/tools_ng/bin/ConfluencePythonTools/CommonLibrary/ConfluenceController.py
T
2025-09-29 00:52:08 +02:00

822 lines
31 KiB
Python
Executable File

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
from requests.packages.urllib3.exceptions import ( InsecureRequestWarning, InsecurePlatformWarning )
import requests
import ssl
import os
import warnings
import sys
import json
class MyAdapter(HTTPAdapter):
    """Transport adapter whose connection pools always negotiate TLS 1.2."""

    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
        """Build the urllib3 PoolManager used by this adapter.

        Args:
            connections: number of connection pools to cache.
            maxsize: maximum number of connections kept per pool.
            block: whether a pool blocks when no connection is free.
            **pool_kwargs: extra PoolManager options; accepted so the override
                stays call-compatible with HTTPAdapter.init_poolmanager.
        """
        # The TLS version is pinned by this adapter regardless of what was passed in.
        pool_kwargs['ssl_version'] = ssl.PROTOCOL_TLSv1_2
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block,
                                       **pool_kwargs)
# Root URL of the wiki. It must keep its trailing slash: other URLs in this
# module are built by appending relative paths to it.
HUB_BASE = 'https://hub.gametools.dev/'
# REST endpoint for content; every request in this module starts from it.
HUB_SITE = HUB_BASE + "rest/api/content/"
# Value sent in the Authorization header of every request.
# NOTE(review): a service credential is committed in source. It can now be
# supplied through the CONFLUENCE_CREDENTIAL environment variable; the literal
# is kept only as a backward-compatible fallback and should be rotated/removed.
CREDENTIAL = os.environ.get("CONFLUENCE_CREDENTIAL", "Basic c3ZjcnNnY29uZmx1ZW5jZXdlYjpqdW5rLU5RZWhUUHE=")
class ConfluencePage():
    """Plain record for one Confluence page; every field starts as None."""

    def __init__(self):
        # pageId  - content id of the page
        # body    - page body text
        # version - version number of the page
        # title   - page title
        # status  - status string reported by the wiki
        self.pageId = self.body = self.version = self.title = self.status = None
class ConfluenceAttachmentData():
    """Plain record for one attachment; every field starts as None."""

    def __init__(self):
        # attId     - content id of the attachment
        # link      - download link reported by the wiki
        # version   - version number of the attachment
        # labels    - list of label names
        # name      - attachment title / file name
        # container - id of the content the attachment belongs to
        self.attId = self.link = self.version = None
        self.labels = self.name = self.container = None
class Representation(object):
    """String constants naming the body formats a page can be submitted in."""

    STORAGE, MARKDOWN, WIKI = "storage", "markdown", "wiki"
class ConfluenceUploadController():
    """Client for the wiki's REST content API (pages, labels, attachments)."""

    def __init__(self):
        """Make sure an HTTP session exists before any request is issued."""
        self.adapterSetup()
########################################################################
# Function to check whether a value can be converted to a long integer
########################################################################
def IsLong(self, inputString):
    """Return True when inputString can be converted to an integer.

    Callers use this to decide whether a search term is a numeric content id
    or a title.

    Args:
        inputString: the value to probe (normally a string).

    Returns:
        True if the integer conversion succeeds, False otherwise. Values that
        cannot be converted at all, such as None, also yield False instead of
        raising.
    """
    try:
        toInteger = long  # Python 2 arbitrary-precision integer type
    except NameError:
        toInteger = int  # Python 3 has no separate long type
    try:
        toInteger(inputString)
    except (ValueError, TypeError):
        # ValueError: non-numeric text; TypeError: None or another unsupported type.
        return False
    return True
########################################################################
# Function to take the content of a file and
# upload them to a page as an attachment
########################################################################
def postAttachmentFile(self, name, content, pageID, label="", attachType="application/octet-stream", deleteFirst=True ):
    """Upload content as the attachment called name on page pageID.

    The page is first asked whether an attachment with that file name exists:
      * not found: the file is uploaded as a new attachment;
      * found and deleteFirst is true: the old attachment is removed and the
        file is uploaded again;
      * found and deleteFirst is false: the stored data is downloaded and the
        attachment data is replaced only when it differs from content.

    Args:
        name: file name of the attachment.
        content: data to upload; must not be empty.
        pageID: id (string) of the page that owns the attachment.
        label: optional label added to the attachment after a successful upload.
        attachType: MIME type sent with the file.
        deleteFirst: see above.

    Returns:
        ConfluenceAttachmentData for an attempted upload (its fields stay None
        when the wiki rejected the upload), -1 when the lookup is answered
        with 401 Unauthorized, -2 when content is empty, 0 when the existing
        attachment already matches content.
    """
    if len(content) == 0:
        # This message used to read the undefined name `attachmentData`,
        # which raised NameError instead of reporting the problem.
        print("Error during attachment upload. Attachment {0} has no content".format(name))
        return -2

    HUB_UPLOAD = HUB_SITE + pageID + "/child/attachment"
    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)

        r = self.adapterInstance.request(method='GET', url=HUB_UPLOAD + "?expand=version"
            , params={"filename":name}
            , headers=({"Authorization":CREDENTIAL})
            , verify=False
            )
        if r.status_code == 401 and r.reason == 'Unauthorized':
            return -1

        newAttachment = ConfluenceAttachmentData()
        existing = r.json()['results']

        if len(existing) == 0:
            # doesn't exist, so add it!
            print("Changes needed for {}. Adding new file...".format(name))
            return self._uploadNewAttachment(HUB_UPLOAD, name, content, attachType, label, newAttachment)

        results = existing[0]
        attId = results['id']

        if deleteFirst == False:
            # Update in place, but only when the stored bytes differ.
            r = self.adapterInstance.request(method='GET', url=HUB_BASE + results['_links']['download'][1:]
                , headers=({"Authorization":CREDENTIAL})
                , verify=False
                )
            if content == r.content:
                print("No changes in file, no upload took place")
                return 0

            print("Changes needed for {}. Updating...".format(name))
            r = self.adapterInstance.request(method='POST', url=HUB_UPLOAD + "/{attId}/data".format(attId=attId)
                , files={'file': (name, content, attachType ), 'minorEdit':'true'}
                , headers=({"X-Atlassian-Token":"no-check","Authorization":CREDENTIAL})
                , verify=False
                )
            if not r.ok:
                print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
                return newAttachment

            rJson = r.json()
            if label != "":
                self.addLabelToContent(attId, label)
            newAttachment.attId = rJson["id"]
            newAttachment.link = rJson["_links"]["download"]
            newAttachment.version = rJson["version"]["number"]
            labelsList = rJson["metadata"]["labels"]["results"]
            newAttachment.container = rJson["container"]["id"]
            newAttachment.labels = [entry["name"] for entry in labelsList]
            newAttachment.name = rJson["title"]
            print("Complete.")
            return newAttachment

        # Replace by delete + fresh upload.
        print("Removing existing attachment")
        self.removeAttachmentFileViaId(attId)
        print("Readding file...")
        return self._uploadNewAttachment(HUB_UPLOAD, name, content, attachType, label, newAttachment)

def _uploadNewAttachment(self, uploadUrl, name, content, attachType, label, newAttachment):
    """POST content as a new attachment and fill newAttachment from the reply.

    Private helper of postAttachmentFile. Returns newAttachment, left with its
    fields untouched when the wiki rejects the upload.
    """
    r = self.adapterInstance.request(method='POST', url=uploadUrl
        , params={"filename":name}
        , files={'file': (name, content, attachType ), 'minorEdit':'true'}
        , headers=({"X-Atlassian-Token":"no-check","Authorization":CREDENTIAL})
        , verify=False
        )
    if not r.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
        return newAttachment

    jsonToParse = r.json()['results'][0]
    if label != "":
        self.addLabelToContent(jsonToParse["id"], label)
    newAttachment.attId = jsonToParse["id"]
    newAttachment.link = jsonToParse["_links"]["download"]
    newAttachment.version = jsonToParse["version"]["number"]
    newAttachment.container = jsonToParse["container"]["id"]
    newAttachment.labels = []
    newAttachment.name = jsonToParse["title"]
    print("Complete.")
    return newAttachment
########################################################################
# Function to update a known attachment
########################################################################
def updateAttachmentFile(self, attachmentData, content, attachType="application/octet-stream"):
    """Upload new content for an attachment that is already known.

    The attachment's own name and container id are used as the file name and
    owning page, and no label is added.

    Returns:
        Whatever postAttachmentFile returns for that upload.
    """
    fileName = attachmentData.name
    owningPageId = attachmentData.container
    noLabel = ''
    return self.postAttachmentFile(fileName, content, owningPageId, noLabel, attachType)
########################################################################
# Function to add a label to a piece of content
########################################################################
def addLabelToContent(self, contentId, label):
    """Add one label to a piece of content.

    A one-element label list is POSTed as JSON to the content's /label
    resource.

    Returns:
        1 when the wiki accepts the request, -2 otherwise (the wiki's error
        message is printed).
    """
    labelUrl = HUB_SITE + contentId + "/label"
    jsonHeaders = {
        "X-Atlassian-Token": "nocheck",
        "Authorization": CREDENTIAL,
        'Content-Type': 'application/json',
    }
    payload = json.dumps([{'name': label}])
    response = self.adapterInstance.request(
        method='POST', url=labelUrl, data=payload, headers=jsonHeaders, verify=False)
    if not response.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(
            response.status_code, response.reason, response.json()['message']))
        return -2
    print("Complete.")
    return 1
########################################################################
# Function to set the media type of an attachment
########################################################################
def setAttachmentMediaType(self, ATTACHMENTURL, attId, version, mediaType):
    """Set the media type recorded for an attachment.

    Args:
        ATTACHMENTURL: full URL of the attachment resource to PUT to.
        attId: id of the attachment.
        version: version number sent with the update.
        mediaType: MIME type to store in the attachment's metadata.

    Returns:
        1 when the wiki accepts the request, -2 otherwise (the wiki's error
        message is printed).
    """
    update = {}
    update["id"] = attId
    update["version"] = {"number": version}
    update["metadata"] = {'mediaType': mediaType}
    jsonHeaders = {
        "X-Atlassian-Token": "nocheck",
        "Authorization": CREDENTIAL,
        'Content-Type': 'application/json',
    }
    response = self.adapterInstance.request(
        method='PUT', url=ATTACHMENTURL, data=json.dumps(update),
        headers=jsonHeaders, verify=False)
    if not response.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(
            response.status_code, response.reason, response.json()['message']))
        return -2
    print("Complete.")
    return 1
#-d '{"id":"att917507", "version":{"number":1}, "metadata": {"mediaType": "image/svg+xml" }}' http://localhost:1990/confluence/rest/api/content/98310/child/attachment/att917507
########################################################################
# Function to remove a label from a piece of content
########################################################################
def removeLabelFromContent(self, contentId, label):
    """Remove one label from a piece of content.

    The label name is passed in the "name" query parameter of a DELETE on the
    content's /label resource.

    Returns:
        1 when the wiki accepts the request, -2 otherwise (the wiki's error
        message is printed).
    """
    labelUrl = HUB_SITE + contentId + "/label?name=" + label
    jsonHeaders = {
        "X-Atlassian-Token": "nocheck",
        "Authorization": CREDENTIAL,
        'Content-Type': 'application/json',
    }
    response = self.adapterInstance.request(
        method='DELETE', url=labelUrl, headers=jsonHeaders, verify=False)
    if not response.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(
            response.status_code, response.reason, response.json()['message']))
        return -2
    print("Complete.")
    return 1
########################################################################
# Function to remove an attachment
########################################################################
def removeAttachmentFile(self, attachmentData):
    """Remove the attachment described by attachmentData.

    Returns:
        The result of removeAttachmentFileViaId for the attachment's attId.
    """
    attachmentId = attachmentData.attId
    return self.removeAttachmentFileViaId(attachmentId)
########################################################################
# Function to remove an attachment using its ID
########################################################################
def removeAttachmentFileViaId(self, attachmentId):
    """Delete an attachment, then delete it again with status=trashed.

    Both DELETE requests are always sent, even when the first one fails.

    NOTE(review): this file defines removeAttachmentFileViaId a second time
    further down. If both definitions sit in the same class, the later one
    replaces this one and the status=trashed request is never sent - confirm
    which behavior is intended.

    Args:
        attachmentId: content id (string) of the attachment.

    Returns:
        1 when both requests succeed, -2 otherwise (the wiki's error message
        is printed for each failed request).
    """
    HUB_UPLOAD = HUB_SITE + attachmentId
    authHeaders = {"X-Atlassian-Token":"no-check","Authorization":CREDENTIAL}
    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)
        r = self.adapterInstance.request(method='DELETE', url=HUB_UPLOAD
            , headers=authHeaders
            , verify=False
            )
        t = self.adapterInstance.request(method='DELETE', url=HUB_UPLOAD + "?status=trashed"
            , headers=authHeaders
            , verify=False
            )
    if r.ok and t.ok:
        print("Complete.")
        return 1
    for failed in (r, t):
        if not failed.ok:
            print("Wiki error: Status Code: {}: {}: {}".format(failed.status_code, failed.reason, failed.json()['message']))
    # The former trailing `return 0` was unreachable and has been dropped.
    return -2
########################################################################
# Function to check whether a given page can be found on confluence
# from a supplied text string
########################################################################
def doesPageExist(self, pageName, nameSpace="", isExact = False):
    """Look a page up by title (or numeric id) and return its id.

    Args:
        pageName: title to search for; a value that IsLong accepts is treated
            as a content id and always matched exactly.
        nameSpace: optional space key to restrict the search to.
        isExact: match the title exactly ("=") instead of fuzzily ("~").

    Returns:
        The id of the first search hit, or None when nothing matches.
    """
    if self.IsLong(pageName):
        searchField = 'search?cql=id'
        isExact = True
    else:
        searchField = 'search?cql=title'
    comparison = '~' if isExact == False else '='

    SEARCH_URL = HUB_SITE + searchField
    SEARCH_URL = SEARCH_URL + (comparison + '"{0}" and type=page').format(pageName)
    if nameSpace != "":
        SEARCH_URL = SEARCH_URL + " and space={0}".format(nameSpace)

    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        for ignored in (InsecureRequestWarning, InsecurePlatformWarning):
            warnings.simplefilter("ignore", category = ignored)
        response = self.adapterInstance.request(
            method='GET', url=SEARCH_URL,
            headers={"Authorization":CREDENTIAL}, verify=False)

    hits = response.json()['results']
    if len(hits) == 0:
        return None
    return hits[0]['id']
########################################################################
# Function to check whether a given page can be found on confluence
# from a supplied text string and return the id and contents
########################################################################
def getPageViaSearch(self, pageName, nameSpace="", isExact = False):
    """Find a page by title (or numeric id) and return its contents.

    Args:
        pageName: title to search for; a value that IsLong accepts is treated
            as a content id and always matched exactly.
        nameSpace: optional space key to restrict the search to.
        isExact: match the title exactly ("=") instead of fuzzily ("~").

    Returns:
        A ConfluencePage filled from the first search hit (id, body, version,
        status and, when the hit carries one, title). An empty ConfluencePage
        is returned when nothing matches or the hit cannot be read.
    """
    SEARCH_URL = HUB_SITE + "search?"
    if self.IsLong(pageName):
        SEARCH_URL = SEARCH_URL + 'cql=id'
        isExact = True  # id lookups are always exact
    else:
        SEARCH_URL = SEARCH_URL + 'cql=title'
    if isExact == False:
        SEARCH_URL = SEARCH_URL + '~"{0}" and type=page'.format(pageName)
    else:
        SEARCH_URL = SEARCH_URL + '="{0}" and type=page'.format(pageName)
    if nameSpace != "":
        SEARCH_URL = SEARCH_URL + " and space={0}".format(nameSpace)
    SEARCH_URL = SEARCH_URL + "&expand=body.storage,version"

    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)
        r = self.adapterInstance.request(method='GET', url=SEARCH_URL
            , headers=({"Authorization":CREDENTIAL})
            , verify=False
            )
    rJson = r.json()
    if len(rJson['results']) == 0:
        return ConfluencePage()

    foundPage = ConfluencePage()
    try:
        topHit = rJson['results'][0]
        foundPage.pageId = topHit['id']
        foundPage.body = topHit['body']['storage']['value']
        foundPage.version = int(topHit['version']['number'])
        foundPage.status = topHit['status']
        # Populate the title like getPageViaId does; tolerate hits without one.
        foundPage.title = topHit.get('title')
        return foundPage
    except (KeyError, IndexError, TypeError, ValueError, AttributeError):
        # Only malformed-response errors are swallowed; a bare except here
        # used to hide KeyboardInterrupt and SystemExit as well.
        print("Confluence Error: Error during retrieval of page search {0}".format(pageName))
        return ConfluencePage()
########################################################################
# Function to check whether a given page can be found on confluence
# from a supplied text string under a parent page and return the
# id and contents
########################################################################
def getChildPageViaSearch(self, pageName, parentId, nameSpace="", isExact = False):
    """Find a page under a given parent by title (or numeric id).

    Args:
        pageName: title to search for; a value that IsLong accepts is treated
            as a content id and always matched exactly.
        parentId: id of the parent page; "" disables the parent restriction.
        nameSpace: optional space key to restrict the search to.
        isExact: match the title exactly ("=") instead of fuzzily ("~").

    Returns:
        A ConfluencePage filled from the first search hit (id, body, version,
        status and, when the hit carries one, title). An empty ConfluencePage
        is returned when nothing matches or the hit cannot be read.
    """
    SEARCH_URL = HUB_SITE + "search?"
    if self.IsLong(pageName):
        SEARCH_URL = SEARCH_URL + 'cql=id'
        isExact = True  # id lookups are always exact
    else:
        SEARCH_URL = SEARCH_URL + 'cql=title'
    if isExact == False:
        SEARCH_URL = SEARCH_URL + '~"{0}" and type=page'.format(pageName)
    else:
        SEARCH_URL = SEARCH_URL + '="{0}" and type=page'.format(pageName)
    if nameSpace != "":
        SEARCH_URL = SEARCH_URL + " and space={0}".format(nameSpace)
    if parentId != "":
        SEARCH_URL = SEARCH_URL + " and parent={0}".format(parentId)
    SEARCH_URL = SEARCH_URL + "&expand=body.storage,version"

    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)
        r = self.adapterInstance.request(method='GET', url=SEARCH_URL
            , headers=({"Authorization":CREDENTIAL})
            , verify=False
            )
    rJson = r.json()
    if len(rJson['results']) == 0:
        return ConfluencePage()

    foundPage = ConfluencePage()
    try:
        topHit = rJson['results'][0]
        foundPage.pageId = topHit['id']
        foundPage.body = topHit['body']['storage']['value']
        foundPage.version = int(topHit['version']['number'])
        foundPage.status = topHit['status']
        # Populate the title like getPageViaId does; tolerate hits without one.
        foundPage.title = topHit.get('title')
        return foundPage
    except (KeyError, IndexError, TypeError, ValueError, AttributeError):
        # Only malformed-response errors are swallowed; a bare except here
        # used to hide KeyboardInterrupt and SystemExit as well.
        print("Confluence Error: Error during retrieval of page search {0}".format(pageName))
        return ConfluencePage()
########################################################################
# Function to retrieve the contents of a known page id
########################################################################
def getPageViaId(self, id):
    """Fetch a page by its content id.

    Args:
        id: content id of the page, as a string.

    Returns:
        A ConfluencePage with pageId, body, version, title and status filled
        in, or an empty ConfluencePage when the reply cannot be read.
    """
    PAGEURL = HUB_SITE + id + "?expand=body.storage,version"
    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)
        r = self.adapterInstance.request(method='GET', url=PAGEURL
            , headers=({"Authorization":CREDENTIAL})
            , verify=False
            )
    rJson = r.json()
    foundPage = ConfluencePage()
    try:
        foundPage.pageId = rJson['id']
        foundPage.body = rJson['body']['storage']['value']
        foundPage.version = int(rJson['version']['number'])
        foundPage.title = rJson['title']
        foundPage.status = rJson['status']
        return foundPage
    except (KeyError, IndexError, TypeError, ValueError):
        # Only malformed-response errors are swallowed; a bare except here
        # used to hide KeyboardInterrupt and SystemExit as well.
        print("Confluence Error: Error during retrieval of page id {0}".format(id))
        return ConfluencePage()
########################################################################
# Function to retrieve a list of the child page names
# from a specified page
########################################################################
def getChildrenListOfPage(self, id, findString=None):
    """Return the titles of the direct child pages of a page.

    Children are fetched in batches until an empty batch comes back. Retrieval
    stops early, keeping what was collected so far, when a reply cannot be
    read.

    Args:
        id: content id of the parent page, as a string.
        findString: when given, only titles containing it (case-insensitive)
            are returned.

    Returns:
        List of child page titles.
    """
    BATCHSIZE = 25
    STARTVAL = 0
    ChildrenList = []
    while True:
        PAGEURL = HUB_SITE + id + "/child/page?limit=25&maxsize={0}&start={1}".format(BATCHSIZE, STARTVAL)
        with warnings.catch_warnings():
            # Requests are made with verify=False; silence the resulting warnings.
            warnings.simplefilter("ignore", category = InsecureRequestWarning)
            warnings.simplefilter("ignore", category = InsecurePlatformWarning)
            r = self.adapterInstance.request(method='GET', url=PAGEURL
                , headers=({"Authorization":CREDENTIAL})
                , verify=False
                )
        rJson = r.json()
        STARTVAL = STARTVAL + BATCHSIZE
        try:
            childrenResults = rJson['results']
            if len(childrenResults) == 0:
                break
            for cResult in childrenResults:
                title = cResult['title']
                if findString is not None and findString.lower() not in title.lower():
                    continue
                ChildrenList.append(title)
        except (KeyError, IndexError, TypeError, AttributeError):
            # Only malformed-response errors are swallowed; a bare except here
            # used to hide KeyboardInterrupt and SystemExit as well.
            print("Confluence Error: Error during retrieval of page id {0}".format(id))
            break
    return ChildrenList
########################################################################
# Function to retrieve a list of the child page data
# from a specified page
########################################################################
def getChildrenPagesOfPage(self, id, findString=None):
    """Return the direct child pages of a page, including their bodies.

    Children are fetched in batches until an empty batch comes back. Retrieval
    stops early, keeping what was collected so far, when a reply cannot be
    read.

    Args:
        id: content id of the parent page, as a string.
        findString: when given, only pages whose title contains it
            (case-insensitive) are returned.

    Returns:
        List of ConfluencePage objects with title, pageId, body and status
        set (version is not filled in).
    """
    BATCHSIZE = 25
    STARTVAL = 0
    ChildrenList = []
    while True:
        PAGEURL = HUB_SITE + id + "/child/page?expand=body.storage&limit=25&maxsize={0}&start={1}".format(BATCHSIZE, STARTVAL)
        with warnings.catch_warnings():
            # Requests are made with verify=False; silence the resulting warnings.
            warnings.simplefilter("ignore", category = InsecureRequestWarning)
            warnings.simplefilter("ignore", category = InsecurePlatformWarning)
            r = self.adapterInstance.request(method='GET', url=PAGEURL
                , headers=({"Authorization":CREDENTIAL})
                , verify=False
                )
        rJson = r.json()
        STARTVAL = STARTVAL + BATCHSIZE
        try:
            childrenResults = rJson['results']
            if len(childrenResults) == 0:
                break
            for cResult in childrenResults:
                if findString is not None and findString.lower() not in cResult['title'].lower():
                    continue
                ChildPage = ConfluencePage()
                ChildPage.title = cResult['title']
                ChildPage.pageId = cResult['id']
                ChildPage.body = cResult['body']['storage']['value']
                ChildPage.status = cResult['status']
                ChildrenList.append(ChildPage)
        except (KeyError, IndexError, TypeError, AttributeError):
            # Only malformed-response errors are swallowed; a bare except here
            # used to hide KeyboardInterrupt and SystemExit as well.
            print("Confluence Error: Error during retrieval of page id {0}".format(id))
            break
    return ChildrenList
########################################################################
# Function to recursively retrieve a list of the child page data
# from a specified page and its children
########################################################################
def getRecursiveChildrenPagesOfPage(self, id, findString=None):
    """Collect every descendant page of a page, level by level.

    The direct children are fetched first; then the children of each page of
    the previous level are fetched, until a level comes back empty. The title
    filter is applied only at the end, so non-matching pages are still
    descended into.

    Args:
        id: content id of the root page.
        findString: when given, only pages whose title contains it
            (case-insensitive) are returned.

    Returns:
        List of pages as produced by getChildrenPagesOfPage.
    """
    collected = self.getChildrenPagesOfPage(id)
    currentLevel = collected
    while True:
        print("Processing child list and recursively retrieving. Current Count {0}".format(len(collected)))
        nextLevel = []
        for page in currentLevel:
            nextLevel = nextLevel + self.getChildrenPagesOfPage(page.pageId)
        collected = collected + nextLevel
        currentLevel = nextLevel
        if len(nextLevel) == 0:
            break

    if findString is not None:
        collected = [page for page in collected
                     if findString.lower() in page.title.lower()]
    print("All children processed. Final page count {0}".format(len(collected)))
    return collected
########################################################################
# Function to create a page with the given name in a given namespace
# Optional arg to give a parent page to create the page the child of
########################################################################
def CreatePage(self, pageName, nameSpace, contentOfPage, parentPage=None, representation = Representation.STORAGE):
    """Create a page in a space, optionally as the child of another page.

    Args:
        pageName: title of the new page.
        nameSpace: key of the space to create the page in.
        contentOfPage: body of the page.
        parentPage: id of the parent page, or None for no parent.
        representation: one of the Representation constants. MARKDOWN content
            is wrapped by ConvertMarkdownToWiki and submitted as WIKI.

    Returns:
        The id of the created page, or None when the wiki rejects the request
        or its reply carries no id.
    """
    if representation == Representation.MARKDOWN:
        contentOfPage = self.ConvertMarkdownToWiki(contentOfPage)
        representation = Representation.WIKI
    requestBody = {
        "type":"page"
        ,"title":pageName
        ,"space":{"key":nameSpace}
        ,"body":{
            "storage":{
                "value":contentOfPage,
                "representation":representation
            }
        }
    }
    if parentPage is not None:
        requestBody["ancestors"]=[{"id":parentPage}]
    r = (self.adapterInstance.request(method='POST', url=HUB_SITE
        , data=json.dumps(requestBody)
        , headers=({"X-Atlassian-Token":"nocheck","Authorization":CREDENTIAL, 'Content-Type':'application/json'})
        , verify=False
        ) )
    if not r.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
        return None
    rJson = r.json()
    print("Complete.")
    try:
        return rJson["id"]
    except (KeyError, TypeError):
        # Only a reply without an id is tolerated; a bare except here used to
        # hide KeyboardInterrupt and SystemExit as well.
        print("Wiki error: Can not find newly created page id")
        return None
########################################################################
# Function to update a pages content
########################################################################
def UpdatePage(self, pageId, version, contentOfPage, pageName, minorEdit = False, representation = Representation.STORAGE):
    """Replace the content of an existing page.

    Args:
        pageId: content id of the page, as a string.
        version: version number to submit with the update.
        contentOfPage: new body of the page.
        pageName: title to store with the update.
        minorEdit: submitted as the string 'true' or 'false' in the version
            block of the request.
        representation: one of the Representation constants. MARKDOWN content
            is wrapped by ConvertMarkdownToWiki and submitted as WIKI.

    Returns:
        The id reported by the wiki for the updated page, or None when the
        wiki rejects the request or its reply carries no id.
    """
    if representation == Representation.MARKDOWN:
        contentOfPage = self.ConvertMarkdownToWiki(contentOfPage)
        representation = Representation.WIKI
    UPDATEPAGEURL = HUB_SITE + pageId
    requestBody = {
        "body" : {
            "storage":{
                "value" : contentOfPage,
                "representation" : representation
            }
        },
        "version" :
        {
            "number" : version,
            "minorEdit" : 'true' if minorEdit == True else 'false'
        },
        "title" : pageName,
        "type" : "page"
    }
    r = (self.adapterInstance.request(method='PUT', url=UPDATEPAGEURL
        , data=json.dumps(requestBody)
        , headers=({"X-Atlassian-Token":"nocheck","Authorization":CREDENTIAL, 'Content-Type':'application/json'})
        , verify=False
        ) )
    if not r.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
        return None
    rJson = r.json()
    print("Complete.")
    try:
        return rJson["id"]
    except (KeyError, TypeError):
        # Only a reply without an id is tolerated; a bare except here used to
        # hide KeyboardInterrupt and SystemExit as well. The message used to
        # say "newly created", copied from CreatePage.
        print("Wiki error: Can not find updated page id")
        return None
########################################################################
# Function to delete a page
########################################################################
def DeletePage(self, pageId):
    """Delete a page by its content id.

    The outcome is only printed; the method returns None whether or not the
    wiki accepted the request.
    """
    target = HUB_SITE + pageId
    authHeaders = {"X-Atlassian-Token":"nocheck","Authorization":CREDENTIAL}
    response = self.adapterInstance.request(
        method='DELETE', url=target, headers=authHeaders, verify=False)
    if response.ok:
        outcome = "Complete."
    else:
        outcome = "Wiki error: Status Code: {}: {}: {}".format(
            response.status_code, response.reason, response.json()['message'])
    print(outcome)
    return None
########################################################################
# Function to get list of labels on a page
########################################################################
def GetLabelsForPage(self, pageId):
    """Return the names of the labels attached to a page.

    Args:
        pageId: content id of the page, as a string.

    Returns:
        List of label names. The list is empty when the wiki rejects the
        request, and holds only the labels read so far when the reply is
        malformed.
    """
    LABELPAGEURL = HUB_SITE + pageId + "/label"
    r = (self.adapterInstance.request(method='GET', url=LABELPAGEURL
        , headers=({"X-Atlassian-Token":"nocheck","Authorization":CREDENTIAL, 'Content-Type':'application/json'})
        , verify=False
        ) )
    labels = []
    if not r.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
        return labels
    rJson = r.json()
    print("Complete.")
    try:
        for result in rJson["results"]:
            labels.append(result["name"])
    except (KeyError, TypeError):
        # Only malformed-response errors are swallowed; a bare except here
        # used to hide KeyboardInterrupt and SystemExit as well. The message
        # used to talk about a "newly created page id", copied from CreatePage.
        print("Wiki error: Can not read labels of page")
    return labels
########################################################################
# Function to set the labels for a page
########################################################################
def SetLabelsForPage(self, pageId, labels):
    """Add a list of global labels to a page.

    Args:
        pageId: content id of the page, as a string.
        labels: iterable of label names.

    Returns:
        The labels argument, unchanged, whether or not the request succeeded.
    """
    target = HUB_SITE + pageId + "/label"
    payload = [{"prefix":"global","name":labelName} for labelName in labels]
    jsonHeaders = {
        "X-Atlassian-Token": "nocheck",
        "Authorization": CREDENTIAL,
        'Content-Type': 'application/json',
    }
    response = self.adapterInstance.request(
        method='POST', url=target, data=json.dumps(payload),
        headers=jsonHeaders, verify=False)
    if not response.ok:
        print("Wiki error: Status Code: {}: {}: {}".format(
            response.status_code, response.reason, response.json()['message']))
    else:
        # The reply is parsed but not used; a non-JSON reply still raises here.
        response.json()
        print("Complete.")
    return labels
########################################################################
# Function to return the attachment list of attachments on page
########################################################################
def GetAttachmentsList(self, pageId):
    """Return every attachment of a page.

    Attachments are fetched page by page until an empty result page comes
    back. A rejected request ends the retrieval and whatever was collected so
    far is returned.

    Args:
        pageId: content id of the page, as a string.

    Returns:
        List of ConfluenceAttachmentData objects.
    """
    attachmentList = []
    BATCHSIZE = 200
    STARTVAL = 0
    while True:
        ATTACHMENTPAGEURL = HUB_SITE + pageId + "/child/attachment?expand=version&limit=2000&maxsize={0}&start={1}".format(BATCHSIZE, STARTVAL)
        r = (self.adapterInstance.request(method='GET', url=ATTACHMENTPAGEURL
            , headers=({"X-Atlassian-Token":"nocheck","Authorization":CREDENTIAL, 'Content-Type':'application/json'})
            , verify=False
            ) )
        if not r.ok:
            print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
            # Without this break a failing request was retried forever.
            break
        results = r.json()["results"]
        if len(results) == 0:
            break
        # Advance by what was actually returned: the request asks for up to
        # 2000 entries, so stepping by a fixed 200 could fetch entries twice.
        STARTVAL = STARTVAL + len(results)
        for result in results:
            newAttachment = ConfluenceAttachmentData()
            newAttachment.attId = result["id"]
            newAttachment.link = result["_links"]["download"]
            newAttachment.version = result["version"]["number"]
            labelsList = result["metadata"]["labels"]["results"]
            # The container id is the last path segment of the expandable link.
            newAttachment.container = result["_expandable"]["container"].split("/")[-1]
            newAttachment.labels = [entry["name"] for entry in labelsList]
            newAttachment.name = result["title"]
            attachmentList.append(newAttachment)
    return attachmentList
########################################################################
# Function to retrieve an attachment from a search
########################################################################
def getAttachmentViaSearch(self, attachmentName, nameSpace="", isExact = False):
    """Find an attachment by title (or numeric id).

    Args:
        attachmentName: title to search for; a value that IsLong accepts is
            treated as a content id and always matched exactly.
        nameSpace: optional space key to restrict the search to.
        isExact: match the title exactly ("=") instead of fuzzily ("~").

    Returns:
        A ConfluenceAttachmentData filled from the first search hit (labels is
        always an empty list), or an empty ConfluenceAttachmentData when
        nothing matches or the hit cannot be read.
    """
    SEARCH_URL = HUB_SITE + "search?"
    if self.IsLong(attachmentName):
        SEARCH_URL = SEARCH_URL + 'cql=id'
        isExact = True  # id lookups are always exact
    else:
        SEARCH_URL = SEARCH_URL + 'cql=title'
    if isExact == False:
        SEARCH_URL = SEARCH_URL + '~"{0}" and type=attachment'.format(attachmentName)
    else:
        SEARCH_URL = SEARCH_URL + '="{0}" and type=attachment'.format(attachmentName)
    if nameSpace != "":
        SEARCH_URL = SEARCH_URL + " and space={0}".format(nameSpace)
    SEARCH_URL = SEARCH_URL + "&expand=version"

    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)
        r = self.adapterInstance.request(method='GET', url=SEARCH_URL
            , headers=({"Authorization":CREDENTIAL})
            , verify=False
            )
    rJson = r.json()
    if len(rJson['results']) == 0:
        return ConfluenceAttachmentData()

    foundAttachment = ConfluenceAttachmentData()
    try:
        topResult = rJson['results'][0]
        foundAttachment.attId = topResult["id"]
        foundAttachment.link = topResult["_links"]["download"]
        foundAttachment.version = topResult["version"]["number"]
        # The container id is the last path segment of the expandable link.
        foundAttachment.container = topResult["_expandable"]["container"].split("/")[-1]
        foundAttachment.labels = []
        foundAttachment.name = topResult["title"]
        return foundAttachment
    except (KeyError, IndexError, TypeError, AttributeError):
        # Only malformed-response errors are swallowed; a bare except here
        # used to hide KeyboardInterrupt and SystemExit as well.
        print("Confluence Error: Error during retrieval of attachment search {0}".format(attachmentName))
        return ConfluenceAttachmentData()
########################################################################
# Function to remove an attachment using its ID
########################################################################
def removeAttachmentFileViaId(self, attachmentId):
    """Delete an attachment by its content id.

    NOTE(review): an earlier definition with the same name exists in this
    file; that one additionally sends a second DELETE with status=trashed.
    Being defined later, this version is the one that takes effect if both
    sit in the same class - confirm which behavior is intended.

    Args:
        attachmentId: content id (string) of the attachment.

    Returns:
        1 when the wiki accepts the request, -2 otherwise (the wiki's error
        message is printed).
    """
    HUB_UPLOAD = HUB_SITE + attachmentId
    with warnings.catch_warnings():
        # Requests are made with verify=False; silence the resulting warnings.
        warnings.simplefilter("ignore", category = InsecureRequestWarning)
        warnings.simplefilter("ignore", category = InsecurePlatformWarning)
        r = self.adapterInstance.request(method='DELETE', url=HUB_UPLOAD
            , headers=({"X-Atlassian-Token":"no-check","Authorization":CREDENTIAL})
            , verify=False
            )
    if r.ok:
        print("Complete.")
        return 1
    print("Wiki error: Status Code: {}: {}: {}".format(r.status_code, r.reason, r.json()['message']))
    # The former trailing `return 0` was unreachable and has been dropped.
    return -2
########################################################################
# Quick function to take an input markdown string and return it formatted for confluence wiki markup
########################################################################
def ConvertMarkdownToWiki(self, markdown):
    """Wrap a markdown string in a pair of {markdown} tags.

    Returns:
        The input text with "{markdown}" prepended and appended.
    """
    tag = "{markdown}"
    return tag + markdown + tag
########################################################################
# Function to create an adapter for the other functions to use
########################################################################
def adapterSetup(self):
    """Create the HTTP session used by every request, if there is none yet.

    The session gets a MyAdapter mounted for all https:// URLs. Nothing
    happens when adapterInstance is already set.
    """
    if self.adapterInstance is not None:
        return
    session = self.adapterInstance = requests.Session()
    session.mount('https://', MyAdapter())

# Default that lets adapterSetup() read the attribute before a session exists.
adapterInstance = None