patchodon/patchodon.py
2025-10-12 22:01:29 +02:00

388 lines
11 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import hashlib
import html2text
import os
import re
import requests
import sys
import time
from pathlib import Path
# NOTES: html2text: html2text
# the replies are listed by context, should be link-listed to avoid issues,
# should specify next hash to provide some kind of a filter
# visibility public+unlisted, all unlisted, all private, all direct
# Program version, reported in --help output and in the dpaste user-agent.
VERSION = "0.1.0"
DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this?
STATUS_LENGTH_LIMIT = 400 # TODO obtain from instance
# Drop hyperlink anchors when converting status HTML back to plain text,
# so the patchodon markers can be matched by the regexes below.
html2text.config.IGNORE_ANCHORS = True
def trace(x):
    """Emit a diagnostic message on stderr, prefixed with the program name."""
    print(f"{sys.argv[0]}: {x}", file=sys.stderr)
def api_token(args):
    """Return the Mastodon API token from the source selected in *args*.

    Checks --debug-api-token first, then the PATCHODON_API_TOKEN
    environment variable (when --env-api-token was given).

    Raises RuntimeError when no token source was specified.
    """
    if args.debug_api_token:
        return args.debug_api_token
    if args.env_api_token:
        # A KeyError here means the flag was given but the variable is unset.
        return os.environ["PATCHODON_API_TOKEN"]
    # The original raised a bare string, which is itself a TypeError in
    # Python 3; raise a proper exception instead.
    raise RuntimeError("API token not specified")
def auth_headers(args):
    """Build the Authorization header dict for Mastodon API requests.

    Raises RuntimeError when --instance-url was not specified.
    """
    if not args.instance_url:
        # was a bare string raise (TypeError in Python 3)
        raise RuntimeError("mastodon instance not specified")
    token = api_token(args)
    return {"Authorization": f"Bearer {token}"}
def do_post_status(args, body, parent=None, optional=None):
    """Post one status to Mastodon, optionally as a reply to *parent*.

    *body* is mandatory text and must fit the status length limit;
    *optional* is extra text appended after a newline and truncated to
    fill the remaining room. Returns (status id, status URL).

    Raises RuntimeError when the body alone is too long or when the API
    call does not succeed.
    """
    if len(body) > STATUS_LENGTH_LIMIT:
        # was a bare string raise (TypeError in Python 3)
        raise RuntimeError("required status body too long")
    st = body + (
        "\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)]
        if optional
        else ""
    )
    data = {"status": st, "visibility": "direct"}  # TODO parametrize direct
    if parent:
        data["in_reply_to_id"] = parent
    r = requests.post(
        args.instance_url + "/api/v1/statuses",
        data=data,
        headers=auth_headers(args),
    )
    if r.status_code != 200:
        # the original message lacked the f-prefix, so the status code
        # was never interpolated into the error text
        raise RuntimeError(f"mastodon status posting failed ({r.status_code})")
    rj = r.json()
    return (rj["id"], rj["url"])
def do_pastebin_file(file):
    """Upload a patch file to dpaste and return the raw-text (.txt) URL.

    DPASTE API USE RULES:
    - user-agent must be set properly
    - 1 second between requests

    Raises RuntimeError when dpaste does not accept the paste.
    """
    trace(f"sending `{file}' to dpaste...")
    r = requests.post(
        DPASTE_URL + "/api/v2/",
        data={
            "content": Path(file).read_text(),
            "syntax": "diff",
            "title": os.path.basename(file),
            "expiry_days": 1,  # TODO remove after testing
        },
        headers={"User-agent": f"patchodon v{VERSION}"},
    )
    # rate-limit politely even when the request failed
    time.sleep(1.1)
    if r.status_code != 201:
        # `raise f"..."` raised a string, itself a TypeError in Python 3
        raise RuntimeError(f"dpaste POST failed for `{file}'")
    return r.headers["location"] + ".txt"
def split_off_diff(s):
    """Return the part of *s* that precedes the first git diff marker."""
    head, _marker, _rest = s.partition("\ndiff --git ")
    return head
def mapl(f, xs):
    """Apply *f* to every element of *xs*, eagerly, returning a list."""
    return [f(item) for item in xs]
def mayline(s):
    """Return *s* terminated by a newline, or "" when *s* is falsy."""
    return s + "\n" if s else ""
def do_post(args):
    """Post a patch series: one header status plus a reply per patch.

    Patch filenames come from args.patchfile, or from stdin (one name
    per line) when none were given. Each patch is uploaded to dpaste and
    announced in a reply chained to the previous status. Prints the URL
    of the header status on success.
    """
    files = args.patchfile
    if not files:
        trace("reading patchfile series from stdin")
        # str.rstrip takes its argument positionally; the original
        # rstrip(chars="\n") raised TypeError at runtime
        files = mapl(lambda x: x.rstrip("\n"), sys.stdin.readlines())
    n_patches = len(files)
    # per-file sha1 hashes, plus one hash over all of them for the header
    hashes = mapl(
        lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files
    )
    short_hashes = mapl(lambda x: x[0:8], hashes)
    full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
    paste_raw_urls = mapl(do_pastebin_file, files)
    trace("posting the header...")
    parent_post_id, url = do_post_status(
        args,
        f"{mayline(args.recipient)}{mayline(args.subject)}"
        f"[patchodon: {full_hash} / {' '.join(short_hashes)}]",
    )
    # chain each patch status as a reply to the previous one
    for series, (fn, pst, hsh) in enumerate(zip(files, paste_raw_urls, hashes)):
        trace(f"posting patch {series+1}/{n_patches}...")
        parent_post_id, _ = do_post_status(
            args,
            f"{mayline(args.recipient)}"
            f"[patchodon {series+1}/{n_patches} {hsh}]\n"
            f"{pst}\n",
            parent=parent_post_id,
            optional=split_off_diff(Path(fn).read_text()),
        )
    print(url)
def find_head_post(args):
    """Resolve args.patch_url to a status via the Mastodon search API.

    Returns (status id, account id, HTML content) of the header status.

    Raises RuntimeError when the search fails or does not resolve to
    exactly one status with the requested URL.
    """
    r = requests.get(
        args.instance_url + "/api/v2/search",
        headers=auth_headers(args),
        params={"resolve": "true", "limit": "10", "q": args.patch_url},
    )
    if r.status_code != 200:
        # these were bare string raises (TypeError in Python 3)
        raise RuntimeError("status URL search failed!")
    sts = [st for st in r.json()["statuses"] if st["url"] == args.patch_url]
    if len(sts) < 1:
        raise RuntimeError("status URL not found")
    if len(sts) > 1:
        raise RuntimeError("ambiguous status URL?")
    st = sts[0]
    return (st["id"], st["account"]["id"], st["content"])
def get_descendant_statuses(args, parent):
    """Fetch all replies (descendants) of the status with id *parent*.

    Returns a possibly-empty list of status dicts.

    Raises RuntimeError when the context endpoint fails.
    """
    r = requests.get(
        args.instance_url + f"/api/v1/statuses/{parent}/context",
        headers=auth_headers(args),
    )
    if r.status_code != 200:
        # `raise f"..."` raised a string, itself a TypeError in Python 3
        raise RuntimeError(f"retrieval of context failed for {parent}")
    rj = r.json()
    return rj.get("descendants", [])
# Matches the header status marker, e.g.
# "[patchodon: <40-hex series hash> / <8-hex> <8-hex> ...]"
# group 1 = full series hash, group 2 = the space-led short-hash list.
re_head = re.compile(
    r"^\[patchodon: ([0-9a-f]{40}) /(( +[0-9a-f]{8})+)\]$", re.MULTILINE
)
# Matches one patch status marker followed by its dpaste raw URL, e.g.
# "[patchodon <n>/<total> <40-hex patch hash>]\nhttps://dpaste.com/XXXX.txt"
# groups: 1 = index, 2 = total, 3 = patch hash, 4 = raw-text URL.
re_patch = re.compile(
    r"^\[patchodon ([0-9]+)/([0-9]+) ([0-9a-f]{40})\]"
    r" *\n(https://dpaste.com/[a-zA-Z0-9]+\.txt)$",
    re.MULTILINE,
)
def parse_matching_status(st, parent, account, n, total_n, short_hash):
    """Validate that status dict *st* is patch *n* of *total_n* in a series.

    Checks reply linkage to *parent*, the posting *account*, the series
    ordering, and the patch hash, then downloads the patch text from the
    embedded dpaste URL and verifies its sha1. Returns (status id, patch
    text) on success, or None when *st* does not match.
    """
    if st["in_reply_to_id"] != parent:
        trace(f"wrong reply in status {st['id']}")
        return None
    if st["account"]["id"] != account:
        trace(f"bad account in status {st['id']}")
        return None
    plain_content = html2text.html2text(st["content"])
    found = re_patch.search(plain_content)
    if not found:
        return None
    index, total, long_hash, url = found.groups()
    if index != str(n) or total != str(total_n):
        trace(f"patch mis-ordered in status {st['id']}")
        return None
    if long_hash[0:8] != short_hash:
        trace(f"patch hash mismatch in status {st['id']}")
        return None
    response = requests.get(url)
    if response.status_code != 200:
        trace(f"could not get patch from status {st['id']} via {url}")
        return None
    if long_hash != hashlib.sha1(response.text.encode()).hexdigest():
        trace(f"patch hash differs from file in status {st['id']}")
        return None
    return (st["id"], response.text)
def do_get(args):
    """Fetch a patch series starting from the header status at args.patch_url.

    Walks the reply chain one patch at a time, validating per-patch
    hashes and the overall series hash, then writes the patches either
    to stdout (default) or to numbered files under args.out_prefix.

    Raises RuntimeError on any retrieval or validation failure.
    """
    st_id, st_acct_id, st_content_html = find_head_post(args)
    st_content = html2text.html2text(st_content_html)
    # parse out the hash and subhashes
    match = re_head.search(st_content)
    if not match:
        # these were bare string raises (TypeError in Python 3)
        raise RuntimeError("no patchodon header found")
    full_hash = match.groups()[0]
    short_hashes = [h for h in match.groups()[1].split(" ") if h]
    patches = [None for _ in short_hashes]
    n_patches = len(patches)
    assert n_patches > 0  # guaranteed by re_head requiring >=1 short hash
    parent = st_id
    for i, short_hash in enumerate(short_hashes):
        trace(f"getting patch {i+1} ({short_hash})...")
        # get context; all replies must come from the same author as the
        # original status ID, and subhashes must match
        sts = get_descendant_statuses(args, parent)
        ok_sts = [
            parsed
            for parsed in (
                parse_matching_status(
                    x, parent, st_acct_id, i + 1, n_patches, short_hash
                )
                for x in sts
            )
            if parsed is not None
        ]
        if len(ok_sts) == 0:
            raise RuntimeError(
                f"no suitable patches found for {i+1} ({short_hash})"
            )
        if len(ok_sts) > 1:
            raise RuntimeError(
                f"ambiguous statuses for patch {i+1} ({short_hash})"
            )
        # continue the walk from the matched status
        parent, patches[i] = ok_sts[0]
    # verify the full hash
    hashes = [hashlib.sha1(p.encode()).hexdigest() for p in patches]
    computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
    if computed_full_hash != full_hash:
        raise RuntimeError("hash checksums do not match!")
    # print out stuff
    if args.out_prefix:
        for i, patch in enumerate(patches):
            path = args.out_prefix + f"{i+1:04d}.patch"
            if not args.overwrite and os.path.exists(path):
                raise RuntimeError(f"refusing to overwrite {path}")
            Path(path).write_text(patch)
    else:
        for patch in patches:
            sys.stdout.write(patch)
            sys.stdout.write("\n")  # be nice
def main():
    """Parse command-line arguments and dispatch to the selected command."""
    ap = argparse.ArgumentParser(
        prog=sys.argv[0],
        epilog="patchodon.py version " + VERSION + " is a free software.",
        description="Publicly send and receive git patch series via Mastodon.",
    )
    if "API token sources":  # always-true string: visual grouping only
        group = ap.add_mutually_exclusive_group()
        group.add_argument(
            "--debug-api-token",
            help=(
                "specify the API token on command line (not very secure,"
                " good for debugging only)"
            ),
        )
        group.add_argument(
            "-e",
            "--env-api-token",
            action="store_true",
            help="get the API token from environment PATCHODON_API_TOKEN",
        )
    ap.add_argument(
        "-i",
        "--instance-url",
        help=(
            "mastodon instance URL to use, such as `https://mastodon.example/'"
        ),
    )
    cmds = ap.add_subparsers(required=True, dest="command")
    if "POST command":
        post = cmds.add_parser("post")
        post.add_argument(
            "-r",
            "--recipient",
            default=None,
            help=(
                "user tag to prepend to all posted statuses (required esp. for"
                " direct sending of statuses)"
            ),
        )
        post.add_argument(
            "-s",
            "--subject",
            default=None,
            help=(
                "opening text of the initial post, ideally used to specify the"
                " target project and patch topic"
            ),
        )
        post.add_argument(
            "patchfile",
            nargs="*",
            help=(
                "filenames of the patch series; taken from stdin if none are"
                " specified (useful for piping the output of git-format-patch"
                " into patchodon)"
            ),
        )
    if "GET command":
        get = cmds.add_parser("get")
        get.add_argument(
            "patch_url",
            help=(
                "root URL of the status where the patch was posted (the status"
                " should contain the patch hash)"
            ),
        )
        get.add_argument(
            "-C",
            "--out-prefix",
            help=(
                "instead of writing to stdout (for piping to git-am), write"
                " the numbered patchfiles to files with a given prefix"
                " (specifying `./patchodon-' will produce files like"
                " `./patchodon-0001.patch')"
            ),
        )
        get.add_argument(
            "--overwrite",
            action="store_true",
            help="overwrite existing patch files instead of failing",
        )
    ap.add_argument(
        "-c",
        "--config",
        # expanduser avoids a KeyError when HOME is unset (the original
        # indexed os.environ["HOME"] directly); result is identical when
        # HOME is set
        default=os.path.expanduser("~/.patchodon.ini"),
        help=(
            "specify a custom config INI file that may specify a section"
            " [patchodon] with keys instance_url and api_token; defaults to"
            " `$HOME/.patchodon.ini', specify `/dev/null' to avoid config"
            " loading"
        ),
    )
    args = ap.parse_args()
    # TODO patch args from config (if found)
    if args.command == "post":
        do_post(args)
    elif args.command == "get":
        do_get(args)
    else:
        # unreachable with required=True subparsers, but fail loudly anyway;
        # the original raised a bare string (a TypeError in Python 3)
        raise RuntimeError("fatal: args borked")
# Entry-point guard: run the CLI only when executed as a script.
if __name__ == "__main__":
    main()