How would you debug this script without creating many posts?
How would you debug this script without creating many posts?
I won't know if this script works until I run it and see the errors but the comments won't start to generate until after all the posts so I can't debug that part until I've already created too much content.
python
import sqlite3 import requests from pythorhead import Lemmy import schedule import time import logging from config import * logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s", handlers=[logging.FileHandler("debug.log"), logging.StreamHandler()], ) def initialize_database(): conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS posts ( github_url TEXT PRIMARY KEY, lemmy_post_id INTEGER, lemmy_post_name TEXT, lemmy_post_body TEXT ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS comments ( github_comment_id INTEGER PRIMARY KEY, lemmy_comment_id INTEGER, comment_user TEXT, comment_body TEXT ) """) conn.commit() return conn def initialize_lemmy_instance(): lemmy = Lemmy(LEMMY_INSTANCE_URL) lemmy.log_in(LEMMY_USERNAME, LEMMY_PASSWORD) logging.info("Initialized Lemmy instance") return lemmy def discover_community(lemmy, community_name): community_id = lemmy.discover_community(community_name) logging.info(f"Discovered community {community_name} with ID {community_id}") return community_id def fetch_github_issues(repo): url = f"{GITHUB_API_BASE}/repos/{repo}/issues" headers = {"Accept": "application/vnd.github+json"} response = requests.get(url, headers=headers) logging.info(f"Fetched issues from {url}") return response.json() def extract_issue_info(issue, repo): issue_url = issue["html_url"] issue_state = "[Closed]" if issue["state"] == "closed" else "" repo_abbr = "[BE]" if "lemmy" in repo else "[UI]" issue_title = f"{issue_state}{repo_abbr} {issue['title']} #{issue['number']}" issue_body = issue["body"] return issue_url, issue_title, issue_body def post_issues_to_lemmy(lemmy, community_id, repo): conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() issues = fetch_github_issues(repo) for issue in issues: issue_url, issue_title, issue_body = extract_issue_info(issue, repo) cursor.execute("SELECT lemmy_post_id FROM posts WHERE github_url=?", (issue_url,)) existing_post = cursor.fetchone() if not existing_post: post = lemmy.post.create(community_id, issue_title, url=issue_url, body=issue_body)["post_view"]["post"] lemmy_post_id = post["id"] lemmy_post_name = post["name"] lemmy_post_body = post["body"] cursor.execute("INSERT INTO posts (github_url, lemmy_post_id, lemmy_post_name, lemmy_post_body) VALUES (?, ?, ?, ?)", (issue_url, lemmy_post_id, lemmy_post_name, lemmy_post_body)) conn.commit() logging.info(f"Posted issue {issue_title} to community {community_id}") def fetch_github_comments(repo, issue_number): url = f"{GITHUB_API_BASE}/repos/{repo}/issues/{issue_number}/comments" headers = {"Accept": "application/vnd.github+json"} response = requests.get(url, headers=headers) logging.info(f"Fetched comments for issue #{issue_number}") return response.json() def post_comments_to_lemmy(lemmy, post_id, repo, issue_number): conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() github_comments = fetch_github_comments(repo, issue_number) for comment in github_comments: github_comment_id = comment["id"] cursor.execute("SELECT lemmy_comment_id FROM comments WHERE github_comment_id=?", (github_comment_id,)) existing_comment = cursor.fetchone() if not existing_comment: comment_user = comment["user"]["login"] comment_body = comment["body"] lemmy_comment_id = lemmy.comment.create(post_id, comment_body)["comment"]["id"] cursor.execute("INSERT INTO comments (github_comment_id, lemmy_comment_id, comment_user, comment_body) VALUES (?, ?, ?, ?)", (github_comment_id, lemmy_comment_id, comment_user, comment_body)) conn.commit() logging.info(f"Posted comment {github_comment_id} to lemmy post {post_id}") # Fetch the GitHub issue number and Lemmy post ID for each issue def fetch_issue_data(repo): conn = sqlite3.connect(DB_FILE) cursor = conn.cursor() cursor.execute("SELECT github_url, lemmy_post_id FROM posts WHERE github_url LIKE ?", (f"https://github.com/{repo}/issues/%",)) issue_data = cursor.fetchall() return issue_data def extract_issue_number(github_url): return int(github_url.split("/")[-1]) def main(): logging.info("Running main function") initialize_database() lemmy = initialize_lemmy_instance() community_id = discover_community(lemmy, LEMMY_COMMUNITY_NAME) for repo in REPOSITORIES: post_issues_to_lemmy(lemmy, community_id, repo) issue_data = fetch_issue_data(repo) for github_url, lemmy_post_id in issue_data: issue_number = extract_issue_number post_comments_to_lemmy(lemmy, lemmy_post_id, repo, issue_number) def run_periodically(): main() schedule.every(2).hours.do(main) while True: schedule.run_pending() time.sleep(60) if __name__ == "__main__": logging.info("Starting script") run_periodically()