From 82247f41acf3bd7cc7bdd61efe22294ab6a3ddca Mon Sep 17 00:00:00 2001
From: Mirek Kratochvil
Date: Sun, 12 Oct 2025 21:59:25 +0200
Subject: [PATCH] getting patches seems to work

---
 patchodon.py | 208 +++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 153 insertions(+), 55 deletions(-)

diff --git a/patchodon.py b/patchodon.py
index 734d7e5..0211dcd 100755
--- a/patchodon.py
+++ b/patchodon.py
@@ -1,10 +1,14 @@
 #!/usr/bin/env python3
 
-import sys, os
 import argparse
-import requests
 import hashlib
+import html2text
+import os
+import re
+import requests
+import sys
 import time
+from pathlib import Path
 
 # NOTES: html2text: html2text
 # the replies are listed by context, should be link-listed to avoid issues,
@@ -17,6 +21,8 @@ DPASTE_URL = "https://dpaste.com"  # TODO any good way to parametrize this?
 
 STATUS_LENGTH_LIMIT = 400  # TODO obtain from instance
 
+html2text.config.IGNORE_ANCHORS = True
+
 
 def trace(x):
     sys.stderr.write(sys.argv[0] + ": " + x + "\n")
@@ -27,12 +33,12 @@ def api_token(args):
         return args.debug_api_token
     if args.env_api_token:
         return os.environ["PATCHODON_API_TOKEN"]
-    raise ("API token not specified")
+    raise RuntimeError("API token not specified")
 
 
 def auth_headers(args):
     if not args.instance_url:
-        raise ("mastodon instance not specified")
+        raise RuntimeError("mastodon instance not specified")
 
     token = api_token(args)
 
@@ -41,9 +47,7 @@
 
 def do_post_status(args, body, parent=None, optional=None):
     if len(body) > STATUS_LENGTH_LIMIT:
-        raise ("required status body too long")
-
-    headers = auth_headers(args)
+        raise RuntimeError("required status body too long")
 
     st = body + (
         "\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)]
@@ -55,11 +59,13 @@
         data["in_reply_to_id"] = parent
 
     r = requests.post(
-        args.instance_url + "/api/v1/statuses", data=data, headers=headers
+        args.instance_url + "/api/v1/statuses",
+        data=data,
+        headers=auth_headers(args),
     )
 
     if r.status_code != 200:
-        raise ("mastodon status posting failed ({r.status_code})")
+        raise RuntimeError(f"mastodon status posting failed ({r.status_code})")
 
     rj = r.json()
     return (rj["id"], rj["url"])
@@ -73,7 +79,7 @@ def do_pastebin_file(file):
     r = requests.post(
         DPASTE_URL + "/api/v2/",
         data={
-            "content": open(file, "r").read(),
+            "content": Path(file).read_text(),
             "syntax": "diff",
             "title": os.path.basename(file),
             "expiry_days": 1,  # TODO remove after testing
@@ -82,7 +88,7 @@
     )
     time.sleep(1.1)
     if r.status_code != 201:
-        raise (f"dpaste POST failed for `{file}'")
+        raise RuntimeError(f"dpaste POST failed for `{file}'")
 
     return r.headers["location"] + ".txt"
 
@@ -108,7 +114,7 @@
     files = mapl(lambda x: x.rstrip(chars="\n"), sys.stdin.readlines())
     n_patches = len(files)
     hashes = mapl(
-        lambda x: hashlib.sha1(open(x, "r").read().encode()).hexdigest(), files
+        lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files
    )
     short_hashes = mapl(lambda x: x[0:8], hashes)
     full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
@@ -117,7 +123,7 @@
     parent_post_id, url = do_post_status(
         args,
         f"{mayline(args.recipient)}{mayline(args.subject)}"
-        f"[patchodon hash {full_hash} / {' '.join(short_hashes)}]",
+        f"[patchodon: {full_hash} / {' '.join(short_hashes)}]",
     )
     for fn, pst, hsh, series in zip(
         files, paste_raw_urls, hashes, range(n_patches)
@@ -129,40 +135,141 @@
             f"[patchodon {series+1}/{n_patches} {hsh}]\n"
             f"{pst}\n",
             parent=parent_post_id,
-            optional=split_off_diff(open(fn, "r").read()),
+            optional=split_off_diff(Path(fn).read_text()),
         )
     print(url)
 
 
 def find_head_post(args):
-    headers = auth_headers(args)
     r = requests.get(
-        args.instance_id + "/api/v2/search",
+        args.instance_url + "/api/v2/search",
+        headers=auth_headers(args),
         params={"resolve": "true", "limit": "10", "q": args.patch_url},
     )
-    print(r.__dict__)
     if r.status_code != 200:
-        raise ("status URL search failed!")
+        raise RuntimeError("status URL search failed!")
 
-    sts = filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
+    sts = list(
+        filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
+    )
 
     if len(sts) < 1:
-        raise ("status URL not found")
+        raise RuntimeError("status URL not found")
     if len(sts) > 1:
-        raise ("ambiguous status URL?")
+        raise RuntimeError("ambiguous status URL?")
 
     st = sts[0]
     return (st["id"], st["account"]["id"], st["content"])
 
 
+def get_descendant_statuses(args, parent):
+    r = requests.get(
+        args.instance_url + f"/api/v1/statuses/{parent}/context",
+        headers=auth_headers(args),
+    )
+    if r.status_code != 200:
+        raise RuntimeError(f"retrieval of context failed for {parent}")
+    rj = r.json()
+    return rj["descendants"] if "descendants" in rj else []
+
+
+re_head = re.compile(
+    r"^\[patchodon: ([0-9a-f]{40}) /(( +[0-9a-f]{8})+)\]$", re.MULTILINE
+)
+
+re_patch = re.compile(
+    r"^\[patchodon ([0-9]+)/([0-9]+) ([0-9a-f]{40})\]"
+    r" *\n(https://dpaste.com/[a-zA-Z0-9]+\.txt)$",
+    re.MULTILINE,
+)
+
+
+def parse_matching_status(st, parent, account, n, total_n, short_hash):
+    if st["in_reply_to_id"] != parent:
+        trace(f"wrong reply in status {st['id']}")
+        return None
+    if st["account"]["id"] != account:
+        trace(f"bad account in status {st['id']}")
+        return None
+    st_content = html2text.html2text(st["content"])
+    match = re_patch.search(st_content)
+    if not match:
+        return None
+    gs = match.groups()
+    if gs[0] != str(n) or gs[1] != str(total_n):
+        trace(f"patch mis-ordered in status {st['id']}")
+        return None
+    long_hash = gs[2]
+    if long_hash[0:8] != short_hash:
+        trace(f"patch hash mismatch in status {st['id']}")
+        return None
+    url = gs[3]
+    r = requests.get(url)
+    if r.status_code != 200:
+        trace(f"could not get patch from status {st['id']} via {url}")
+        return None
+    if long_hash != hashlib.sha1(r.text.encode()).hexdigest():
+        trace(f"patch hash differs from file in status {st['id']}")
+        return None
+    return (st["id"], r.text)
+
+
 def do_get(args):
-    st_id, st_acct_id, st_content = find_head_post(args)
+    st_id, st_acct_id, st_content_html = find_head_post(args)
+    st_content = html2text.html2text(st_content_html)
     # parse out the hash and subhashes
-    # get context, all replies from the same author as the original status ID, subhashes must match
-    # repeat for all subhashes
+    match = re_head.search(st_content)
+    if not match:
+        raise RuntimeError("no patchodon header found")
+    full_hash = match.groups()[0]
+    short_hashes = list(
+        filter(lambda x: len(x) > 0, match.groups()[1].split(" "))
+    )
+    patches = [None for _ in short_hashes]
+    n_patches = len(patches)
+    assert n_patches > 0
+    parent = st_id
+    for i, short_hash in enumerate(short_hashes):
+        trace(f"getting patch {i+1} ({short_hash})...")
+        # get context, all replies from the same author as the original status ID, subhashes must match
+        sts = get_descendant_statuses(args, parent)
+        ok_sts = list(
+            filter(
+                lambda x: x is not None,
+                map(
+                    lambda x: parse_matching_status(
+                        x, parent, st_acct_id, i + 1, n_patches, short_hash
+                    ),
+                    sts,
+                ),
+            )
+        )
+        if len(ok_sts) == 0:
+            raise RuntimeError(f"no suitable patches found for {i+1} ({short_hash})")
+        if len(ok_sts) > 1:
+            raise RuntimeError(f"ambiguous statuses for patch {i+1} ({short_hash})")
+        ok_st_id, ok_st_patch = ok_sts[0]
+        parent = ok_st_id
+        patches[i] = ok_st_patch
 
     # verify the full hash
-    # pass as one blob to git-am OR throw to a directory
+    hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches))
+    computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
+    if computed_full_hash != full_hash:
+        raise RuntimeError("hash checksums do not match!")
+
+    # print out stuff
+    if args.out_prefix:
+        for i, patch in enumerate(patches):
+            path = args.out_prefix + f"{i+1:04d}.patch"
+            if not args.overwrite and os.path.exists(path):
+                raise RuntimeError(f"refusing to overwrite {path}")
+            Path(path).write_text(patch)
+    else:
+        for patch in patches:
+            sys.stdout.write(patch)
+            sys.stdout.write("\n")  # be nice
 
 
 def main():
@@ -172,10 +279,8 @@
         description="Publicly send and receive git patch series via Mastodon.",
     )
 
-    cmds = ap.add_subparsers(required=True, dest="command")
-
     if "API token sources":
-        group = cmds.add_mutually_exclusive_group()
+        group = ap.add_mutually_exclusive_group()
         group.add_argument(
             "--debug-api-token",
             help=(
@@ -190,7 +295,7 @@
         help="get the API token from environment PATCHODON_API_TOKEN",
     )
 
-    cmds.add_argument(
+    ap.add_argument(
        "-i",
        "--instance-url",
        help=(
@@ -198,6 +303,8 @@
         ),
     )
 
+    cmds = ap.add_subparsers(required=True, dest="command")
+
     if "POST command":
         post = cmds.add_parser("post")
         post.add_argument(
@@ -237,32 +344,21 @@
                 " should contain the patch hash)"
             ),
         )
-        if "output possibilities":
-            group = get.add_mutually_exclusive_group()
-            group.add_argument(
-                "-a",
-                "--run-git-am",
-                action="store_true",
-                help=(
-                    "apply the patches immediately with git-am instead of"
-                    " storing them in a directory"
-                ),
-            )
-            group.add_argument(
-                "-C",
-                "--out-prefix",
-                default="./patchodon-",
-                help=(
-                    "write the numbered patchfiles to files with a given prefix"
-                    " (the default `./patchodon-' will produce files like"
-                    " `./patchodon-0001.patch')"
-                ),
-            )
-            get.add_argument(
-                "--overwrite",
-                action="store_true",
-                help="overwrite existing patch files instead of failing",
-            )
+        get.add_argument(
+            "-C",
+            "--out-prefix",
+            help=(
+                "instead of writing to stdout (for piping to git-am), write"
+                " the numbered patchfiles to files with a given prefix"
+                " (specifying `./patchodon-' will produce files like"
+                " `./patchodon-0001.patch')"
+            ),
+        )
+        get.add_argument(
+            "--overwrite",
+            action="store_true",
+            help="overwrite existing patch files instead of failing",
+        )
 
     ap.add_argument(
         "-c",
@@ -277,6 +373,8 @@
     )
 
     args = ap.parse_args()
+    # TODO patch from config if found
+
     if args.command == "post":
        do_post(args)
    elif args.command == "get":
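
Not part of the patch, just a postscript: a quick self-check of the two
status-format regexes introduced above. The sample bodies below are made up
(the 40-zero hash and the dpaste slug are placeholders), but their shape
mirrors what do_post() emits and what do_get() parses back out.

    import re

    # the regexes exactly as added in the patch above
    re_head = re.compile(
        r"^\[patchodon: ([0-9a-f]{40}) /(( +[0-9a-f]{8})+)\]$", re.MULTILINE
    )
    re_patch = re.compile(
        r"^\[patchodon ([0-9]+)/([0-9]+) ([0-9a-f]{40})\]"
        r" *\n(https://dpaste.com/[a-zA-Z0-9]+\.txt)$",
        re.MULTILINE,
    )

    # head post: full series hash, then one short hash per patch
    head = "[patchodon: " + "0" * 40 + " / 01234567 89abcdef]"
    m = re_head.search(head)
    assert m and m.group(2).split() == ["01234567", "89abcdef"]

    # per-patch reply: marker line, then the raw dpaste URL on the next line
    reply = "[patchodon 1/2 " + "0" * 40 + "]\nhttps://dpaste.com/XXXX.txt"
    m = re_patch.search(reply)
    assert m and (m.group(1), m.group(2)) == ("1", "2")
    assert m.group(4) == "https://dpaste.com/XXXX.txt"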