""" Copyright (c) , All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ __author__ = "Michael Walker" __copyright__ = "BSD (2-clause)" __credits__ = ["Michael Walker", "Renee Lyons"] # Third-Party Packages from requests import Session, Response # Internal Packages from dataclasses import dataclass, field from time import sleep, time from datetime import datetime from typing import Generator from re import search from logging import ( Logger, getLogger, DEBUG, INFO, FileHandler, Formatter, StreamHandler, ) RUNTIME = datetime.now() STR_RUNTIME = RUNTIME.strftime("%m-%d-%Y_%H-%M-%S") logger: Logger = getLogger("Sync_Jira_Bitbucket") logger.setLevel(DEBUG) file_handler = FileHandler(f"Sync_Jira_Bitbucket({STR_RUNTIME}).log") file_handler_format = Formatter("%(asctime)s | %(levelname)s | %(message)s") file_handler.setFormatter(file_handler_format) file_handler.setLevel(DEBUG) stream_handler = StreamHandler() stream_handler_format = Formatter("(%(levelname)s) - %(message)s") stream_handler.setFormatter(stream_handler_format) stream_handler.setLevel(INFO) logger.addHandler(file_handler) logger.addHandler(stream_handler) try: # local config file import config JIRA_SESSION = Session() JIRA_SESSION.auth = (config.jira_email, config.jira_token) JIRA_BASE_URL = config.jira_url BITBUCKET_SESSION = Session() BITBUCKET_SESSION.auth = (config.bitbucket_username, config.bitbucket_apppassword) BITBUCKET_WORKSPACE = config.bitbucket_workspace BITBUCKET_BASE_URL = "https://api.bitbucket.org" REQUIRED_REGEX = config.required_group_regex REQUIRED_EXACT = config.required_group_exact FULL_SYNC = not REQUIRED_REGEX and not REQUIRED_EXACT except [ImportError, NameError, AttributeError]: print( f"FATAL: Atleast 1 config params was missing or the config.py file couldn't be read." " Please check the config and try again." ) exit() ACCEPT_JSON = {"Accept": "application/json"} CONTENT_JSON = {"Content-type": "application/json"} @dataclass class BitbucketMember: email_address: str # (email) display_name: str # (display_name) aid: str # (account_id) uuid: str # (uuid) active: bool # (is_active) def __post_init__(self): self.uuid = self.uuid.replace("{", "").replace("}", "") def equivalent(self, other: object) -> bool: if isinstance(other, (BitbucketMember, JiraMember)): return self.aid == other.aid else: return False @dataclass class BitbucketGroup: name: str slug: str members: list[BitbucketMember] = field(default_factory=list) def equivalent(self, other: object) -> bool: """ Similar to __eq__ but used to determine if an object is likely the equivalent with a low chance for mis-match If the objects are the same, then the comparison can be reliable, but if a different obj, then it's just likely, not perfect """ # if comparing two bitbucket groups, we use the most accurate, slug if isinstance(other, BitbucketGroup): return self.slug == other.slug # if comparing against a Jira group, we compare the lower() names elif isinstance(other, JiraGroup): return self.name.lower() == other.name.lower() else: return False def get_members(self): for member in Bitbucket_Actions.get_group_members(self.slug): logger.debug( f"Tracking member '{member.display_name}' with email" f" ('{member.email_address}') in group {self.name} for Bitbucket" ) self.members.append(member) @dataclass class JiraMember: aid: str # aka Atlassian ID (accountId) email_address: str # (emailAddress) display_name: str # (displayName) active: bool # (active) def equivalent(self, other: object) -> bool: if isinstance(other, (JiraMember, BitbucketMember)): return self.aid == other.aid else: return False @dataclass class JiraGroup: name: str group_id: str members: list[JiraMember] = field(default_factory=list) def equivalent(self, other: object) -> bool: """ Similar to __eq__ but used to determine if an object is likely the equivalent with a low chance for mis-match If the objects are the same, then the comparison can be reliable, but if a different obj, then it's just likely, not perfect """ # If comparing 2 Jira groups, use the most accurate group_id if isinstance(other, JiraGroup): return self.group_id == other.group_id # If comparing against a BB group, use both group's lower() name elif isinstance(other, BitbucketGroup): return self.name.lower() == other.name.lower() else: return False def get_members(self): for member in Jira_Actions.get_group_members(self.group_id): logger.debug( f"Tracking member '{member.display_name}' with email" f" ('{member.email_address}') in group {self.name} for Jira" ) self.members.append(member) def get_bb_equivalent( self, bb_groups: list[BitbucketGroup] ) -> BitbucketGroup | None: try: bb_group = next( bb_group for bb_group in bb_groups if self.equivalent(bb_group) ) return bb_group except StopIteration: # If the group is in Jira but not in Bitbucket, skip return None @dataclass class PendingInvite: email: str group: str def __post_init__(self): # strip the workspace and "/" character from the string to leave only group's slug self.group = self.group.replace(f"{BITBUCKET_WORKSPACE}/", "") def __eq__(self, other: object) -> bool: return self.__hash__() == other.__hash__() def __hash__(self) -> int: return hash(f"{self.email}{self.group}") class API: @staticmethod def RATE_LIMITED(response: Response) -> bool: if response.status_code == 429: logger.info( "Waiting for 1 minute since the REST API rate limit was exceeded..." ) sleep(secs=60) # in seconds return True return False class Jira_Actions: @staticmethod def get_groups() -> Generator[JiraGroup, None, None]: # https://developer.atlassian.com/cloud/jira/platform/rest/v3/api-group-groups/#api-rest-api-3-group-bulk-get url = f"{JIRA_BASE_URL}/rest/api/3/group/bulk" start_at = 0 max_results = 50 while True: params = {"startAt": start_at, "maxResults": max_results} response = JIRA_SESSION.get(url, headers=ACCEPT_JSON, params=params) if API.RATE_LIMITED(response): # re-run current pass after sleeping continue if response.status_code not in [200]: logger.error( f"Received a {response.status_code} response from Jira when" f" attempting to query all groups.\n\tError: {response.text}" ) exit() r_json = response.json() for value in r_json.get("values"): yield JiraGroup(name=value.get("name"), group_id=value.get("groupId")) # if "isLast" field is False, increment start counter and continue while loop if r_json.get("isLast") is False: start_at += max_results else: # otherwise, end while True look return @staticmethod def get_group_members(group_id: str) -> Generator[JiraMember, None, None]: # https://developer.atlassian.com/cloud/jira/platform/rest/v3/api-group-groups/#api-rest-api-3-group-member-get # TODO add includeinactive = true url = f"{JIRA_BASE_URL}/rest/api/3/group/member" start_at = 0 max_results = 50 while True: params = { "startAt": start_at, "maxResults": max_results, "groupId": group_id, "includeInactiveUsers": True, } response = JIRA_SESSION.get(url, headers=ACCEPT_JSON, params=params) if API.RATE_LIMITED(response): # re-run current pass after sleeping continue if response.status_code not in [200]: logger.error( f"Received a {response.status_code} response from Jira when attempting" f' to query all users from group "{group_id}".\n\tError: {response.text}' ) exit() r_json = response.json() for value in r_json.get("values"): yield JiraMember( aid=value.get("accountId"), email_address=value.get("emailAddress", ""), display_name=value.get("displayName", ""), active=value.get("active"), ) # if "isLast" field is False, increment start counter and continue while loop if r_json.get("isLast") is False: start_at += max_results else: # otherwise, end while True look return class Bitbucket_Actions: @staticmethod def get_groups() -> Generator[BitbucketGroup, None, None]: url = f"{BITBUCKET_BASE_URL}/internal/workspaces/{BITBUCKET_WORKSPACE}/groups" page = 1 while True: params = {"page": page} response = BITBUCKET_SESSION.get(url, headers=ACCEPT_JSON, params=params) if API.RATE_LIMITED(response): # re-run current pass after sleeping continue if response.status_code not in [200]: logger.error( f"Received a {response.status_code} response from Bitbucket when " f"attempting to query all groups.\n\tError: {response.text}" ) exit() r_json = response.json() for value in r_json.get("values"): yield BitbucketGroup(name=value.get("name"), slug=value.get("slug")) if r_json.get("next"): page += 1 else: return @staticmethod def create_group(group_name: str) -> BitbucketGroup: url = f"{BITBUCKET_BASE_URL}/1.0/groups/{BITBUCKET_WORKSPACE}" payload = f"name={group_name}" response = BITBUCKET_SESSION.post(url, data=payload) if response.ok: logger.info(f'Successfully created group "{group_name}" in Bitbucket.') bb_group = BitbucketGroup(name=group_name, slug=group_name, members=list()) return bb_group logger.critical( f'Failed to create the group "{group_name}" within Bitbucket,' " Please address the issue and try again." f"\n\tHttp: {response.status_code}" f"\n\tText: {response.text}" ) exit() @staticmethod def get_group_members(group_slug) -> Generator[BitbucketMember, None, None]: url = f"{BITBUCKET_BASE_URL}/internal/workspaces/{BITBUCKET_WORKSPACE}/groups/{group_slug}/members" page = 1 while True: params = {"page": page} response = BITBUCKET_SESSION.get(url, headers=ACCEPT_JSON, params=params) if API.RATE_LIMITED(response): # re-run current pass after sleeping continue if response.status_code not in [200]: logger.error( f"Received a {response.status_code} response from Bitbucket when attempting to" f' query all users from group "{group_slug}".\n\tError: {response.text}' ) exit() r_json = response.json() for value in r_json.get("values"): yield BitbucketMember( email_address=value.get("email", ""), display_name=value.get("display_name", ""), aid=value.get("account_id"), uuid=value.get("uuid"), active=value.get("is_active"), ) if r_json.get("next"): page += 1 else: return @staticmethod def get_pending_invites() -> Generator[PendingInvite, None, None]: while True: url = f"{BITBUCKET_BASE_URL}/1.0/users/{BITBUCKET_WORKSPACE}/invitations" respone = BITBUCKET_SESSION.get(url, headers=ACCEPT_JSON) if API.RATE_LIMITED(respone): continue r_json = respone.json() for value in r_json: for group in value.get("groups"): yield PendingInvite(value.get("email"), group) return @staticmethod def add_user_to_group(jirauser: JiraMember, jiragroup: JiraGroup) -> bool: # https://support.atlassian.com/bitbucket-cloud/docs/groups-endpoint/ payload = "{}" # group slugs always need to be lower case regardless of the display name pulled in audit log url = f"{BITBUCKET_BASE_URL}/1.0/groups/{BITBUCKET_WORKSPACE}/{jiragroup.name.lower()}/members/{jirauser.email_address}/" response = BITBUCKET_SESSION.put(url, data=payload) if response.status_code == 200: logger.info( f'Successfully added user "{jirauser.email_address}" to group "{jiragroup.name}".' ) return True elif response.status_code == 404: return Bitbucket_Actions.invite_user(jiragroup, jirauser) # logger.warning( # f'Failed to add user "{email_address}" to group "{group_name}" with status code "404".' # " Group likely does not exist or user is not a workspace member." # ) logger.warning( f'Failed to add user "{jirauser.display_name} ({jirauser.email_address})" to group "{jiragroup.name}" with status code' f' "{response.status_code}" and text "{response.text}".' ) return False @staticmethod def invite_user(jiragroup: JiraGroup, jirauser: JiraMember) -> bool: # https://support.atlassian.com/bitbucket-cloud/docs/invitations-resource/ if jirauser.email_address == "": logger.warning( f'The user "{jirauser.display_name}" (Aid: {jirauser.aid}) is lacking an email address and is unable to be invited.' ) return False url = f"{BITBUCKET_BASE_URL}/1.0/users/{BITBUCKET_WORKSPACE}/invitations" payload = { "email": jirauser.email_address, "group_slug": jiragroup.name.lower(), } r = BITBUCKET_SESSION.put(url, json=payload) if r.status_code == 200: logger.info( f'Successfully invited "{jirauser.email_address}" to "{jiragroup.name}".' ) return True elif r.status_code == 404: logger.warning( f'Failed to invite user "{jirauser.email_address}" to group "{jiragroup.name}" as the group does not exist' ) return False logger.warning( f'Failed to invite user "{jirauser.display_name}" (Email: "{jirauser.email_address}" Aid: {jirauser.aid}) to "{jiragroup.name}" with status "{r.status_code}" and text "{r.text}"' ) return False @staticmethod def remove_user_from_group( bb_member: BitbucketMember, bb_group: BitbucketGroup ) -> None: url = f"{BITBUCKET_BASE_URL}/1.0/groups/{BITBUCKET_WORKSPACE}/{bb_group.slug}/members/%7B{bb_member.uuid}%7D" r = BITBUCKET_SESSION.delete(url) if r.status_code in [200, 204]: logger.info( f'Successfully removed user "{bb_member.email_address}" from group "{bb_group.name}".' ) return elif r.status_code == 404: logger.debug( f'Could not locate user "{bb_member.email_address}" in group "{bb_group.name}".' ) return logger.warning( f'Failed to remove user "{bb_member.email_address}" from group "{bb_group.name}" with status code' f' "{r.status_code}" and text:\n\t"{r.text}".' ) return @staticmethod def revoke_pending_invitation(invite: PendingInvite) -> bool: url = f"{BITBUCKET_BASE_URL}/1.0/users/{BITBUCKET_WORKSPACE}/invitations" json = {"email": invite.email, "group_slug": invite.group.lower()} r = BITBUCKET_SESSION.delete(url, headers=CONTENT_JSON, json=json) if r.status_code == 204: # if an invite is pending, this will remove it # but it will also return 204 if there was no invite but BB accepted the request anyway logger.info( f'Successfully revoked/removed invite for "{invite.email}" from group "{invite.group}"' ) return True logger.warning( f'Failed to remove a pending invite for the user "{invite.email}"' f' on group "{invite.group}" with status code "{r.status_code}" and text "{r.text}"' ) return False def save_groups_for_manual_verification( bb_groups: list[BitbucketGroup], jira_groups: list[JiraGroup], pending_invites: list[PendingInvite], ) -> None: bb_filename = f"Bitbucket_groups({STR_RUNTIME}).csv" bb_headers = "GroupName,Slug,EmailAddress,UserDisplayName,AtlassianID\n" with open(bb_filename, "w+") as bb_file: bb_file.write(bb_headers) for group in bb_groups: for member in group.members: bb_file.write( f"{group.name}," f"{group.slug}," f"{member.email_address}," f"{member.display_name}," f"{member.uuid}\n" ) jira_filename = f"Jira_groups({STR_RUNTIME}).csv" jira_headers = "GroupName,GroupID,EmailAddress,UserDisplayName,AtlassianID\n" with open(jira_filename, "w+") as jira_file: jira_file.write(jira_headers) for group in jira_groups: for member in group.members: jira_file.write( f"{group.name}," f"{group.group_id}," f"{member.email_address}," f"{member.display_name}," f"{member.aid}\n" ) invite_filename = f"Bitbucket_pending_invites({STR_RUNTIME}).csv" invite_headers = "EmailAddress,GroupName\n" with open(invite_filename, "w+") as invite_file: invite_file.write(invite_headers) for invite in pending_invites: invite_file.write(f"{invite.email},{invite.group}\n") def revalidate_pending_invites( pending_invites: list[PendingInvite], ) -> list[PendingInvite]: for invite in Bitbucket_Actions.get_pending_invites(): if invite not in pending_invites: logger.info( f'New invite for "{invite.email}" registered for "{invite.group}"' ) pending_invites.append(invite) return pending_invites def scan_groups() -> tuple[list[JiraGroup], list[BitbucketGroup], list[PendingInvite]]: jira_groups: list[JiraGroup] = [] bb_groups: list[BitbucketGroup] = [] bb_pending_invites: list[PendingInvite] = [] for bb_group in Bitbucket_Actions.get_groups(): if ( FULL_SYNC or bb_group.name in REQUIRED_EXACT or search(REQUIRED_REGEX, bb_group.name) ): logger.info(f'Tracking group "{bb_group.name}" in Bitbucket') bb_group.get_members() bb_groups.append(bb_group) for invite in Bitbucket_Actions.get_pending_invites(): if ( FULL_SYNC or invite.group in REQUIRED_EXACT or search(REQUIRED_REGEX, invite.group) ): logger.info( f'Tracking pending invite for user "{invite.email}" on group "{invite.group}"' ) bb_pending_invites.append(invite) for j_group in Jira_Actions.get_groups(): if ( FULL_SYNC or j_group.name in REQUIRED_EXACT or search(REQUIRED_REGEX, j_group.name) ): logger.info(f'Tracking group "{j_group.name}" in Jira') j_group.get_members() jira_groups.append(j_group) return jira_groups, bb_groups, bb_pending_invites def sync_memberships( bb_group: BitbucketGroup, j_group: JiraGroup, pending_invites: list[PendingInvite] ) -> None: for jira_member in j_group.members: if not any(jira_member.equivalent(bb_member) for bb_member in bb_group.members): if not any( invite for invite in pending_invites if invite.email == jira_member.email_address ): Bitbucket_Actions.add_user_to_group(jira_member, j_group) for bb_member in bb_group.members: if not any(bb_member.equivalent(j_member) for j_member in j_group.members): Bitbucket_Actions.remove_user_from_group(bb_member, bb_group) # If there's a pending invite but the email for that invite is not in the respective jira group # revoke the invite for invite in pending_invites: if not any( j_member for j_member in j_group.members if j_member.email_address == invite.email ): Bitbucket_Actions.revoke_pending_invitation(invite) def main(): """ for each group in jira that matches the search criteria create the group if it's missing from Bitbucket diff the members and add/remove users until it matches Jira """ # Scan just the groups called out via regex/exact match if provided, # else grab a local copy of all of them jira_groups, bb_groups, pending_invites = scan_groups() logger.info("-----Tracking complete, moving to group comparisons-----\n") # Perform the diff and sync accordingly for jira_group in jira_groups: if not (bb_group := jira_group.get_bb_equivalent(bb_groups)): bb_group = Bitbucket_Actions.create_group(jira_group.name) related_invites = [ invite for invite in pending_invites if invite.group.lower() == jira_group.name.lower() ] sync_memberships(bb_group, jira_group, related_invites) pending_invites = revalidate_pending_invites(pending_invites) save_groups_for_manual_verification(bb_groups, jira_groups, pending_invites) if __name__ == "__main__": start = time() logger.info(f"Starting Sync") try: main() except KeyboardInterrupt: logger.info("Stopped execution early due to Keyboard Interrupt, Closing...") end = time() total_time = end - start logger.info( f"Sync Complete (total runtime = {total_time:.2f} seconds), check log for details, Closing..." )