#!/usr/bin/env python3
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import argparse
|
|
import base64
|
|
import logging
|
|
import pathlib
|
|
import requests
|
|
import subprocess
|
|
|
|
from git import exc
|
|
from git import Repo
|
|
|
|
import otc_metadata.services
|
|
|
|
# Shared catalog of all known services and their repositories,
# loaded from the otc_metadata package.
data = otc_metadata.services.Services()

# Module-level HTTP session for connection reuse.
# NOTE(review): the generate_*_with_llm helpers below call requests.post
# directly and never use this session -- confirm whether it is still needed.
api_session = requests.Session()
|
|
|
|
|
|
def extract_description(result):
    """Pull a cleaned, single-sentence description out of an LLM API reply.

    Understands three payload shapes: OpenAI-style ``choices``, a plain
    ``response`` field, and a bare ``text`` field.  Returns the first
    sentence, capped at 160 characters, or ``None`` when no usable text
    is present.
    """
    if "choices" in result and result["choices"]:
        raw = result["choices"][0].get("message", {}).get("content", "")
    elif "response" in result:
        raw = result["response"].strip()
    elif isinstance(result, dict) and "text" in result:
        raw = result["text"].strip()
    else:
        # Unrecognized payload shape.
        return None

    raw = raw.strip()
    if not raw:
        return None

    # Keep only the first sentence.
    sentence = raw.split(".")[0].strip() + "."
    if len(sentence) <= 1:
        # No sentence before the first period -- fall back to a prefix.
        sentence = raw[:160].strip() + "."

    # Hard cap at 160 characters with an ellipsis.
    return sentence if len(sentence) <= 160 else sentence[:157] + "..."
|
|
|
|
|
|
def extract_keywords(result):
    """Extract a comma-separated keyword list from an LLM API reply.

    Understands three payload shapes: OpenAI-style ``choices``, a plain
    ``response`` field, and a bare ``text`` field.  Returns at most five
    comma-separated keywords, or ``None`` when no usable text is present.
    """
    if "choices" in result and len(result["choices"]) > 0:
        message = result["choices"][0].get("message", {})
        keywords_text = message.get("content", "")
    elif "response" in result:
        keywords_text = result["response"].strip()
    elif isinstance(result, dict) and "text" in result:
        keywords_text = result["text"].strip()
    else:
        # Unrecognized payload shape.
        return None

    keywords_text = keywords_text.strip()

    if not keywords_text:
        return None

    # Split, trim, and drop empty entries (truthiness covers both "" and
    # the redundant length check the original carried).
    keywords = [kw.strip() for kw in keywords_text.split(",")]
    keywords = [kw for kw in keywords if kw]

    # Cap at five keywords.
    keywords = keywords[:5]

    return ", ".join(keywords)
|
|
|
|
|
|
def _headline_fallback_description(text, service_title):
    """Best-effort description when the LLM is unavailable.

    Scans *text* for the first RST headline -- a non-list, non-comment line
    followed by an ``===`` / ``---`` underline -- and returns it truncated
    to 160 characters.  Falls back to a generic "<title> documentation".
    """
    lines = text.split("\n")
    for i, line in enumerate(lines):
        candidate = line.strip()
        if not candidate or candidate.startswith("-") or candidate.startswith("#"):
            continue
        # A headline is confirmed by an underline made only of '=' or '-'.
        if i + 1 < len(lines):
            underline = lines[i + 1].strip()
            if underline and all(c in "=-" for c in underline):
                if len(candidate) > 160:
                    candidate = candidate[:157] + "..."
                return candidate
    return f"{service_title} documentation"


def generate_description_with_llm(text, service_title, llm_api_url, model_name, api_username, api_password):
    """Generate a meta description (<=160 chars) via the LLM chat endpoint.

    Tries the API up to 3 times; on total failure falls back to the
    document's first headline (see :func:`_headline_fallback_description`).

    Args:
        text: Full RST document content.
        service_title: Human-readable service name used in prompt/fallback.
        llm_api_url: Chat-completions endpoint URL.
        model_name: Model identifier sent in the request payload.
        api_username: Optional HTTP Basic Auth user name.
        api_password: Optional HTTP Basic Auth password.

    Returns:
        A non-empty description string (LLM-generated or fallback).
    """
    content_preview = text[:2000].replace("\n", " ")
    # Fixed: the concatenated f-string fragments previously lacked separating
    # spaces, so the prompt sentences ran together (".This is a service...").
    prompt = (
        f"Generate a meta description (40-160 chars) for: {service_title}. "
        f"This is a service from the cloud provider called 'T Cloud Public', "
        f"do not mention other Cloud Providers or services from them. "
        f"Content preview: {content_preview}. "
        f"Output ONLY the description text, nothing else."
    )

    headers = {"Content-Type": "application/json"}
    if api_username and api_password:
        credentials = f"{api_username}:{api_password}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
        headers["Authorization"] = f"Basic {encoded_credentials}"

    for attempt in range(3):
        try:
            response = requests.post(
                llm_api_url,
                json={
                    "messages": [
                        {"role": "user", "content": prompt},
                    ],
                    "model": model_name,
                    # Sampling parameters (several are llama.cpp-specific).
                    "temperature": 0.5,
                    "top_k": 40,
                    "top_p": 0.9,
                    "min_p": 0.05,
                    # Repetition controls.
                    "repeat_last_n": 256,
                    "repeat_penalty": 1.18,
                    "presence_penalty": 0.2,
                    "frequency_penalty": 0.2,
                    # DRY sampling (llama.cpp).
                    "dry_multiplier": 0.8,
                    "dry_base": 1.75,
                    "dry_allowed_length": 2,
                    "dry_penalty_last_n": -1,
                    "chat_template_kwargs": {"enable_thinking": False},
                },
                headers=headers,
                timeout=15,
            )
            response.raise_for_status()
            result = response.json()
            description = extract_description(result)
            if description:
                return description
            logging.warning(f"Attempt {attempt + 1}: Empty or invalid response from LLM API.")
        except requests.exceptions.RequestException as e:
            logging.warning(f"Attempt {attempt + 1}: LLM API request failed: {e}. Retrying...")
        except (KeyError, ValueError, IndexError) as e:
            logging.warning(f"Attempt {attempt + 1}: LLM API response parsing failed: {e}. Retrying...")

    # After all retries failed, use fallback - extract first headline.
    logging.warning("All LLM API retries failed. Using fallback description from first headline.")
    return _headline_fallback_description(text, service_title)
|
|
|
|
|
|
def generate_keywords_with_llm(text, service_title, llm_api_url, model_name, api_username, api_password):
    """Generate up to 5 comma-separated keywords via the LLM chat endpoint.

    Tries the API up to 3 times; on total failure falls back to the
    title-cased service name.

    Args:
        text: Full RST document content.
        service_title: Human-readable service name used in prompt/fallback.
        llm_api_url: Chat-completions endpoint URL.
        model_name: Model identifier sent in the request payload.
        api_username: Optional HTTP Basic Auth user name.
        api_password: Optional HTTP Basic Auth password.

    Returns:
        A non-empty comma-separated keyword string (LLM or fallback).
    """
    content_preview = text[:2000].replace("\n", " ")
    # Fixed: a separating space was missing before "Content preview", so two
    # prompt sentences ran together ("...from them.Content preview: ...").
    prompt = (
        f"Generate up to 5 keywords (comma-separated) for: {service_title}. "
        f"This is a service from the cloud provider called 'T Cloud Public', "
        f"do not mention other Cloud Providers or services from them. "
        f"Content preview: {content_preview}. "
        f"Output ONLY comma-separated keywords, nothing else."
    )

    headers = {"Content-Type": "application/json"}
    if api_username and api_password:
        credentials = f"{api_username}:{api_password}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
        headers["Authorization"] = f"Basic {encoded_credentials}"

    for attempt in range(3):
        try:
            response = requests.post(
                llm_api_url,
                json={
                    "messages": [
                        {"role": "user", "content": prompt},
                    ],
                    "model": model_name,
                    # Sampling parameters (several are llama.cpp-specific).
                    "temperature": 0.7,
                    "top_k": 40,
                    "top_p": 0.9,
                    "min_p": 0.05,
                    # Repetition controls.
                    "repeat_last_n": 256,
                    "repeat_penalty": 1.18,
                    "presence_penalty": 0.2,
                    "frequency_penalty": 0.2,
                    # DRY sampling (llama.cpp).
                    "dry_multiplier": 0.8,
                    "dry_base": 1.75,
                    "dry_allowed_length": 2,
                    "dry_penalty_last_n": -1,
                    "chat_template_kwargs": {"enable_thinking": False},
                },
                headers=headers,
                timeout=15,
            )
            response.raise_for_status()
            result = response.json()
            keywords = extract_keywords(result)
            if keywords:
                return keywords
            logging.warning(f"Attempt {attempt + 1}: Empty or invalid response from LLM API for keywords.")
        except requests.exceptions.RequestException as e:
            logging.warning(f"Attempt {attempt + 1}: LLM API request failed: {e}. Retrying...")
        except (KeyError, ValueError, IndexError) as e:
            logging.warning(f"Attempt {attempt + 1}: LLM API response parsing failed: {e}. Retrying...")

    logging.warning("All LLM API retries failed for keywords. Using fallback.")
    # Idiom: the original wrapped this expression in a pointless f-string.
    return service_title.replace("-", " ").title()
|
|
|
|
|
|
def read_rst_content(file_path):
    """Return the full text of *file_path* decoded as UTF-8."""
    return pathlib.Path(file_path).read_text(encoding="utf-8")
|
|
|
|
|
|
def add_sphinx_metadata(file_path, meta_description, meta_keywords=None):
    """Append a Sphinx ``.. meta::`` block at the end of an RST file.

    Args:
        file_path: Path to the RST file to update.
        meta_description: Text for the ``:description:`` field; falsy to omit.
        meta_keywords: Optional text for the ``:keywords:`` field.

    Returns:
        True when the file was modified; False when it was left untouched
        (a meta block already exists, or there is nothing to add).
    """
    # Fixed: previously an empty ".. meta::" stub was written (and True
    # returned) even when both fields were falsy.
    if not meta_description and not meta_keywords:
        return False

    content = pathlib.Path(file_path).read_text(encoding="utf-8")

    # Check if meta block already exists; never duplicate it.
    if ".. meta::" in content:
        logging.debug(f"Meta block already exists in {file_path}. Skipping.")
        return False

    meta_block = "\n\n.. meta::\n"
    if meta_description:
        meta_block += " :description: {}\n".format(meta_description)
    if meta_keywords:
        meta_block += " :keywords: {}\n".format(meta_keywords)

    # Append meta block at the end of the file, trimming trailing blanks.
    new_content = content.rstrip() + meta_block

    with open(file_path, "w", encoding="utf-8", newline="") as f:
        f.write(new_content)

    return True
|
|
|
|
|
|
def process_service(args, service):
    """Process a single service and add metadata to its RST files.

    Clones/updates the service's git repository, generates meta
    descriptions and keywords for every doc/umn/api-ref RST file via the
    LLM helpers, commits the changes on a new branch and pushes it
    (opening a GitHub PR via the ``gh`` CLI where applicable).

    Args:
        args: Parsed CLI namespace (work_dir, cloud_environment,
            target_environment, branch_name, llm_* settings, token,
            commit_description).
        service: Service record with ``service_title`` and ``repositories``.
    """
    logging.debug(f"Processing service {service['service_title']}")
    workdir = pathlib.Path(args.work_dir)
    workdir.mkdir(exist_ok=True)

    repo_url = None
    repo_dir = None
    git_repo = None
    error_list = []  # accumulated {"error": ..., "repo": ...} records

    # Select the repository matching both the cloud environment and the
    # target environment.
    repo = None
    for r in service["repositories"]:
        if r["cloud_environments"][0] == args.cloud_environment:
            repo_dir = workdir / r["type"] / r["repo"]

            if r["environment"] == args.target_environment:
                repo = r
                break
        else:
            logging.debug(f"Skipping repository {r}")
            continue

    if not repo_dir:
        logging.info(f"No repository found for service {service['service_title']}")
        return

    # NOTE(review): if a repository matched the cloud environment but not the
    # target environment, repo_dir is set while repo stays None, and the
    # repo["type"] accesses below would raise TypeError -- confirm intended.

    if repo_dir.exists():
        logging.debug(f"Repository {repo_dir} already exists")
        try:
            # Refresh the existing checkout to the tip of main.
            git_repo = Repo(repo_dir)
            git_repo.remotes.origin.fetch()
            git_repo.heads.main.checkout()
            git_repo.remotes.origin.pull()
        except exc.InvalidGitRepositoryError:
            # Corrupt checkout: remove it so it gets re-cloned below.
            logging.error("Existing repository checkout is bad")
            import shutil
            shutil.rmtree(repo_dir)
            git_repo = None
        except Exception as e:
            error_list.append({"error": e, "repo": repo["repo"]})

    if not repo_dir.exists() or git_repo is None:
        # Build the clone URL from the repository type.
        if repo["type"] == "gitea":
            repo_url = (
                f"ssh://git@gitea.eco.tsi-dev.otc-service.com:2222/"
                f"{repo['repo']}"
            )
        elif repo["type"] == "github":
            repo_url = f"git@github.com:{repo['repo']}"
        else:
            logging.error(f"Repository type {repo['type']} is not supported")
            error_list.append({"error": f"Repository type {repo['type']} is not supported", "repo": repo["repo"]})
            return

        try:
            logging.debug(f"Cloning repository {repo_url}")
            git_repo = Repo.clone_from(repo_url, repo_dir, branch="main")
        except Exception as e:
            logging.error(f"Error cloning repository {repo_url}: {e}")
            error_list.append({"error": f"Error cloning repository {repo_url}", "repo": repo["repo"]})
            return

    branch_name = f"add-meta-{args.branch_name}"

    try:
        # Fails (and skips the service) if the branch already exists.
        new_branch = git_repo.create_head(branch_name, "main")
    except Exception as e:
        logging.warning(f"Skipping service {service} due to {e}")
        error_list.append({"error": e, "repo": repo["repo"]})
        return

    new_branch.checkout()

    # All RST sources under the three documentation trees.
    rst_files = (list(repo_dir.rglob("doc/**/*.rst"))
                 + list(repo_dir.rglob("umn/**/*.rst"))
                 + list(repo_dir.rglob("api-ref/**/*.rst")))

    processed_count = 0  # files that already had a meta block
    updated_count = 0    # files that received a new meta block

    for rst_file in rst_files:

        logging.debug(f"Analyzing document {rst_file}")

        try:
            content = read_rst_content(rst_file)
            description = generate_description_with_llm(
                content,
                service["service_title"],
                args.llm_api_url,
                args.llm_model,
                args.llm_username,
                args.llm_password
            )
            keywords = generate_keywords_with_llm(
                content,
                service["service_title"],
                args.llm_api_url,
                args.llm_model,
                args.llm_username,
                args.llm_password
            )

            if add_sphinx_metadata(rst_file, description, keywords):
                updated_count += 1
                logging.info(f"Added meta description and keywords to {rst_file}")
            else:
                processed_count += 1

            # Stage the file regardless; unchanged files are no-ops in git.
            git_repo.index.add([str(rst_file)])

        except Exception as e:
            logging.error(f"Error processing {rst_file}: {e}")
            error_list.append({"error": e, "repo": str(rst_file)})

    # Nothing staged differs from HEAD -> no commit/push needed.
    if len(git_repo.index.diff("HEAD")) == 0:
        logging.debug("No changes required for service %s", service["service_type"])
        return

    git_repo.index.commit(args.commit_description)

    try:
        git_repo.git.push("--set-upstream", "origin", branch_name)
        logging.info(f"Pushed changes for service {service['service_title']}")
    except Exception as e:
        error_list.append({"error": e, "repo": repo["repo"]})

    if repo_url and "github" in repo_url:
        # Open a pull request with the GitHub CLI; failure is tolerated.
        subprocess.run(
            args=["gh", "pr", "create", "-f"], cwd=repo_dir, check=False
        )
    elif repo_url and "gitea" in repo_url and args.token:
        # TODO: Gitea PR creation via API token is not implemented yet.
        pass

    if len(error_list) != 0:
        logging.error("The following errors have happened:")
        logging.error(error_list)

    logging.info(f"Processed {processed_count} files, updated {updated_count} files")
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments and process each matching service.

    Processes either the single service named by ``--service-type`` or
    every service known to the metadata catalog.
    """
    parser = argparse.ArgumentParser(
        description="Add Sphinx meta blocks to RST files using LLM-generated descriptions."
    )
    parser.add_argument(
        "--target-environment",
        required=True,
        choices=["internal", "public"],
        help="Environment to be used as a source",
    )
    parser.add_argument("--service-type", help="Service to update")
    parser.add_argument(
        "--work-dir",
        required=True,
        help="Working directory to use for repository checkout.",
    )
    parser.add_argument(
        "--branch-name",
        default="meta-generation",
        help="Branch name to be used for changes.",
    )
    parser.add_argument("--token", metavar="token", help="API token")
    parser.add_argument(
        "--llm-api-url",
        default="http://localhost:8080/v1/chat/completions",
        help="URL of the LLM API server. Default: http://localhost:8080/v1/chat/completions",
    )
    parser.add_argument(
        "--llm-model",
        default="llama2",
        help="LLM model name to use. Default: llama2",
    )
    parser.add_argument(
        "--llm-username",
        help="Username for Basic Authentication with LLM server",
    )
    parser.add_argument(
        "--llm-password",
        help="Password for Basic Authentication with LLM server",
    )
    parser.add_argument(
        "--commit-description",
        default=(
            "Add Sphinx meta blocks to RST files\n\n"
            "Generated by otc-metadata-rework/tools/generate_meta.py"
        ),
        help="Commit description for the commit",
    )
    # Fixed: this option was declared required=True together with a default,
    # which made the documented "Default: eu_de" unreachable. It is now
    # optional and the default applies; callers that pass it still work.
    parser.add_argument(
        "--cloud-environment",
        default="eu_de",
        help="Cloud Environment. Default: eu_de",
    )

    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)

    if args.service_type:
        services = [data.get_service_with_repo_by_service_type(service_type=args.service_type)]
    else:
        services = data.services_with_repos()

    for service in services:
        process_service(args, service)
|
|
|
|
|
|
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()
|