Files
otc-metadata-rework/tools/create_preprod_branch.py
Sebastian Gode 12d16b7a2d added preprod branch script (#87)
Reviewed-on: infra/otc-metadata-rework#87
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
Co-authored-by: Sebastian Gode <sebastian.gode@telekom.de>
Co-committed-by: Sebastian Gode <sebastian.gode@telekom.de>
2026-04-01 11:46:33 +00:00

238 lines
7.9 KiB
Python
Executable File

#!/usr/bin/python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import pathlib
import shutil
from git import exc
from git import Repo
import otc_metadata.services
# Shared otc_metadata Services() instance, created once when the module is
# loaded; main() queries it for the services (with repositories) to process.
data = otc_metadata.services.Services()
def _record_error(error_list, repo, message, entry=None):
    """Log ``message`` as an error and store it for the final summary.

    ``entry`` overrides the text stored in ``error_list`` when it should
    differ from the logged message.
    """
    logging.error(message)
    error_list.append({
        "error": message if entry is None else entry,
        "repo": repo['repo']
    })


def _report_errors(error_list):
    """Log the summary of all errors collected for one service, if any."""
    if error_list:
        logging.error("The following errors have happened:")
        logging.error(error_list)


def _build_repo_url(repo):
    """Return the SSH clone URL for ``repo``, or None for an unknown type."""
    if repo["type"] == "gitea":
        return (
            f"ssh://git@gitea.eco.tsi-dev.otc-service.com:2222/"
            f"{repo['repo']}"
        )
    if repo["type"] == "github":
        return f"git@github.com:/{repo['repo']}"
    return None


def _open_or_clone(repo, repo_url, repo_dir, error_list):
    """Return a fetched ``Repo`` for ``repo_dir``, cloning it when needed.

    Failures are recorded in ``error_list`` and reported as ``None``.
    """
    if repo_dir.exists():
        logging.debug(f"Repository {repo} already checked out")
        try:
            git_repo = Repo(repo_dir)
            # prune so that branches deleted on the remote are not reported
            # as still existing by the later remote-branch check
            git_repo.remotes.origin.fetch(prune=True)
            return git_repo
        except exc.InvalidGitRepositoryError:
            logging.warning(f"Existing repository checkout is invalid, removing: {repo_dir}")
            try:
                shutil.rmtree(repo_dir)
            except Exception as e:
                _record_error(
                    error_list,
                    repo,
                    f"Error removing invalid repository {repo_dir}: {e}",
                    entry=f"Error removing invalid repository: {e}",
                )
                return None
        except Exception as e:
            _record_error(
                error_list, repo, f"Error fetching repository {repo_dir}: {e}"
            )
            return None
    try:
        # Only the parent is created: creating repo_dir itself would make
        # the next run mistake the empty directory for a broken checkout.
        repo_dir.parent.mkdir(parents=True, exist_ok=True)
        return Repo.clone_from(repo_url, repo_dir, branch="main")
    except Exception as e:
        _record_error(
            error_list, repo, f"Error cloning repository {repo_url}: {e}"
        )
        return None


def _select_target_repository(args, service, workdir, error_list):
    """Return ``(repo, git_repo)`` for the first usable matching repository.

    A repository matches when its first cloud environment equals
    ``args.cloud_environment`` and its environment equals
    ``args.target_environment``. Returns ``(None, None)`` if none could be
    opened or cloned.
    """
    for repo in service["repositories"]:
        # NOTE(review): only the first listed cloud environment is compared;
        # confirm that multi-environment repositories need no handling here.
        if repo["cloud_environments"][0] != args.cloud_environment:
            continue
        logging.debug(f"Processing repository {repo}")
        if repo["environment"] != args.target_environment:
            logging.debug(f"Skipping repository {repo}")
            continue
        repo_url = _build_repo_url(repo)
        if repo_url is None:
            _record_error(
                error_list,
                repo,
                f"Repository type {repo['type']} is not supported",
            )
            continue
        repo_dir = pathlib.Path(workdir, repo["type"], repo["repo"])
        git_repo = _open_or_clone(repo, repo_url, repo_dir, error_list)
        if git_repo is not None:
            return repo, git_repo
    return None, None


def _find_remote_branch(git_repo, branch_name):
    """Return the ``origin`` remote ref for ``branch_name`` or None.

    Remote refs are named ``origin/<branch>``, therefore the comparison uses
    ``remote_head`` (the name without the remote prefix). If the branch is
    not known locally, one more fetch is done before giving up.
    """
    origin = git_repo.remotes.origin
    for attempt in range(2):
        for ref in origin.refs:
            if ref.remote_head == branch_name:
                return ref
        if attempt == 0:
            # Also check via fetch if refs are not up to date
            origin.fetch()
    return None


def _sync_service(args, service, workdir, error_list):
    """Do the branch create/merge/push work; errors go to ``error_list``."""
    target_repo, git_repo = _select_target_repository(
        args, service, workdir, error_list
    )
    if target_repo is None:
        logging.info(
            f"No repository for service {service['service_title']} "
            f"for environment {args.target_environment} in cloud_environment {args.cloud_environment}"
        )
        return

    branch_name = args.branch_name
    # Check if branch exists remotely
    try:
        remote_ref = _find_remote_branch(git_repo, branch_name)
    except Exception as e:
        logging.warning(f"Error checking for remote branch: {e}")
        remote_ref = None

    if remote_ref is not None:
        # Branch exists, checkout and merge main into it
        logging.debug(f"Branch {branch_name} exists, merging main into it")
        try:
            # Create (fresh clone) or reset (reused checkout) the local
            # branch to the remote state and check it out.
            git_repo.git.checkout("-B", branch_name, remote_ref.name)
            # Merge the fetched origin/main: the local main is not moved by
            # fetch and may be stale. --no-ff with -m records a real merge
            # commit; nothing is committed when already up to date.
            git_repo.git.merge(
                "--no-ff", "-m", args.commit_description, "origin/main"
            )
        except Exception as e:
            try:
                # do not leave a half-finished merge in the reusable checkout
                git_repo.git.merge("--abort")
            except exc.GitCommandError as abort_error:
                logging.debug(f"No merge to abort: {abort_error}")
            _record_error(
                error_list,
                target_repo,
                f"Error merging main into branch {branch_name}: {e}",
            )
            return
    else:
        # Branch does not exist, create it from main
        logging.debug(f"Branch {branch_name} does not exist, creating it")
        try:
            # Start from the fetched origin/main; --no-track because the
            # upstream is set to the new remote branch by the push below.
            git_repo.git.checkout(
                "--no-track", "-B", branch_name, "origin/main"
            )
        except Exception as e:
            _record_error(
                error_list,
                target_repo,
                f"Error creating branch {branch_name}: {e}",
            )
            return

    # Push the branch
    push_args = ["--set-upstream", "origin", branch_name]
    if args.force_push:
        push_args.append("--force")
    try:
        git_repo.git.push(*push_args)
        logging.info(f"Successfully pushed branch {branch_name} for {target_repo['repo']}")
    except Exception as e:
        _record_error(
            error_list,
            target_repo,
            f"Error pushing branch {branch_name}: {e}",
        )


def process_repositories(args, service):
    """Create or update branch for a service repository

    The first repository of ``service`` that matches the requested cloud
    environment and target environment is cloned below ``args.work_dir``
    (or fetched if it is already checked out there). If ``args.branch_name``
    exists on ``origin``, ``origin/main`` is merged into it, otherwise the
    branch is created from ``origin/main``. The branch is then pushed.

    :param args: parsed command line arguments; the attributes ``work_dir``,
        ``cloud_environment``, ``target_environment``, ``branch_name``,
        ``commit_description`` and ``force_push`` are read.
    :param service: service dictionary with the keys ``service_title`` and
        ``repositories``; every repository entry provides
        ``cloud_environments``, ``environment``, ``type`` and ``repo``.
    :returns: None. Git and filesystem failures are logged, not raised, and
        a summary of them is logged on every exit path.
    """
    logging.debug(f"Processing service {service['service_title']}")
    workdir = pathlib.Path(args.work_dir)
    # parents=True so that a nested, not yet existing work dir is accepted
    workdir.mkdir(parents=True, exist_ok=True)
    error_list = []
    _sync_service(args, service, workdir, error_list)
    _report_errors(error_list)
def main():
    """Parse the command line and process the selected services.

    With ``--service-type`` only that service is looked up in the module
    level ``data`` object, otherwise every service returned by
    ``data.services_with_repos()`` is processed. Each service is handed to
    ``process_repositories`` together with the parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Create or update preprod branch for services."
    )
    parser.add_argument(
        "--target-environment",
        required=True,
        choices=["internal", "public"],
        help="Environment to be used as a source",
    )
    parser.add_argument("--service-type", help="Service to update")
    parser.add_argument(
        "--work-dir",
        required=True,
        help="Working directory to use for repository checkout.",
    )
    parser.add_argument(
        "--branch-name",
        default="preprod",
        help="Branch name to be used for synchronizing.",
    )
    parser.add_argument(
        "--force-push",
        action="store_true",
        help="Whether to force push the commit"
    )
    parser.add_argument(
        "--commit-description",
        default=(
            "Sync with main\n\n"
            "Performed-by: gitea/infra/otc-metadata/"
            "tools/create_preprod_branch.py"
        ),
        help="Commit description for the commit",
    )
    # Not marked as required: the documented default "eu_de" would otherwise
    # never be used. Passing the option explicitly keeps working.
    parser.add_argument(
        "--cloud-environment",
        default="eu_de",
        help="Cloud Environment. Default: eu_de",
    )
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG)

    if args.service_type:
        service = data.get_service_with_repo_by_service_type(
            service_type=args.service_type
        )
        # NOTE(review): presumably the lookup yields None (or an empty value)
        # for an unknown service type - verify against otc_metadata. Without
        # this guard such a result crashes inside process_repositories.
        if not service:
            logging.error(
                f"No service with repositories found for service type {args.service_type}"
            )
            return
        services = [service]
    else:
        services = data.services_with_repos()

    for service in services:
        process_repositories(args, service)
# Run the command line interface only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()