getting patches seems to work

This commit is contained in:
Mirek Kratochvil 2025-10-12 21:59:25 +02:00
parent a7309aa1c1
commit 82247f41ac

View file

@ -1,10 +1,14 @@
#!/usr/bin/env python3
import sys, os
import argparse
import requests
import hashlib
import html2text
import os
import re
import requests
import sys
import time
from pathlib import Path
# NOTES: html2text: html2text
# the replies are listed by context, should be link-listed to avoid issues,
@ -17,6 +21,8 @@ DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this?
STATUS_LENGTH_LIMIT = 400 # TODO obtain from instance
html2text.config.IGNORE_ANCHORS = True
def trace(x):
sys.stderr.write(sys.argv[0] + ": " + x + "\n")
@ -27,12 +33,12 @@ def api_token(args):
return args.debug_api_token
if args.env_api_token:
return os.environ["PATCHODON_API_TOKEN"]
raise ("API token not specified")
raise "API token not specified"
def auth_headers(args):
if not args.instance_url:
raise ("mastodon instance not specified")
raise "mastodon instance not specified"
token = api_token(args)
@ -41,9 +47,7 @@ def auth_headers(args):
def do_post_status(args, body, parent=None, optional=None):
if len(body) > STATUS_LENGTH_LIMIT:
raise ("required status body too long")
headers = auth_headers(args)
raise "required status body too long"
st = body + (
"\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)]
@ -55,11 +59,13 @@ def do_post_status(args, body, parent=None, optional=None):
data["in_reply_to_id"] = parent
r = requests.post(
args.instance_url + "/api/v1/statuses", data=data, headers=headers
args.instance_url + "/api/v1/statuses",
data=data,
headers=auth_headers(args),
)
if r.status_code != 200:
raise ("mastodon status posting failed ({r.status_code})")
raise "mastodon status posting failed ({r.status_code})"
rj = r.json()
return (rj["id"], rj["url"])
@ -73,7 +79,7 @@ def do_pastebin_file(file):
r = requests.post(
DPASTE_URL + "/api/v2/",
data={
"content": open(file, "r").read(),
"content": Path(file).read_text(),
"syntax": "diff",
"title": os.path.basename(file),
"expiry_days": 1, # TODO remove after testing
@ -82,7 +88,7 @@ def do_pastebin_file(file):
)
time.sleep(1.1)
if r.status_code != 201:
raise (f"dpaste POST failed for `{file}'")
raise f"dpaste POST failed for `{file}'"
return r.headers["location"] + ".txt"
@ -108,7 +114,7 @@ def do_post(args):
files = mapl(lambda x: x.rstrip(chars="\n"), sys.stdin.readlines())
n_patches = len(files)
hashes = mapl(
lambda x: hashlib.sha1(open(x, "r").read().encode()).hexdigest(), files
lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files
)
short_hashes = mapl(lambda x: x[0:8], hashes)
full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
@ -117,7 +123,7 @@ def do_post(args):
parent_post_id, url = do_post_status(
args,
f"{mayline(args.recipient)}{mayline(args.subject)}"
f"[patchodon hash {full_hash} / {' '.join(short_hashes)}]",
f"[patchodon: {full_hash} / {' '.join(short_hashes)}]",
)
for fn, pst, hsh, series in zip(
files, paste_raw_urls, hashes, range(n_patches)
@ -129,40 +135,141 @@ def do_post(args):
f"[patchodon {series+1}/{n_patches} {hsh}]\n"
f"{pst}\n",
parent=parent_post_id,
optional=split_off_diff(open(fn, "r").read()),
optional=split_off_diff(Path(fn).read_text()),
)
print(url)
def find_head_post(args):
headers = auth_headers(args)
r = requests.get(
args.instance_id + "/api/v2/search",
args.instance_url + "/api/v2/search",
headers=auth_headers(args),
params={"resolve": "true", "limit": "10", "q": args.patch_url},
)
print(r.__dict__)
if r.status_code != 200:
raise ("status URL search failed!")
raise "status URL search failed!"
sts = filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
sts = list(
filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
)
if len(sts) < 1:
raise ("status URL not found")
raise "status URL not found"
if len(sts) > 1:
raise ("ambiguous status URL?")
raise "ambiguous status URL?"
st = sts[0]
return (st["id"], st["account"]["id"], st["content"])
def get_descendant_statuses(args, parent):
r = requests.get(
args.instance_url + f"/api/v1/statuses/{parent}/context",
headers=auth_headers(args),
)
if r.status_code != 200:
raise f"retrieval of context failed for {parent}"
rj = r.json()
return rj["descendants"] if "descendants" in rj else []
re_head = re.compile(
r"^\[patchodon: ([0-9a-f]{40}) /(( +[0-9a-f]{8})+)\]$", re.MULTILINE
)
re_patch = re.compile(
r"^\[patchodon ([0-9]+)/([0-9]+) ([0-9a-f]{40})\]"
r" *\n(https://dpaste.com/[a-zA-Z0-9]+\.txt)$",
re.MULTILINE,
)
def parse_matching_status(st, parent, account, n, total_n, short_hash):
if st["in_reply_to_id"] != parent:
trace(f"wrong reply in status {st['id']}")
return None
if st["account"]["id"] != account:
trace(f"bad account in status {st['id']}")
return None
st_content = html2text.html2text(st["content"])
match = re_patch.search(st_content)
if not match:
return None
gs = match.groups()
if gs[0] != str(n) or gs[1] != str(total_n):
trace(f"patch mis-ordered in status {st['id']}")
return None
long_hash = gs[2]
if long_hash[0:8] != short_hash:
trace(f"patch hash mismatch in status {st['id']}")
return None
url = gs[3]
r = requests.get(url)
if r.status_code != 200:
trace(f"could not get patch from status {st['id']} via {url}")
return None
if long_hash != hashlib.sha1(r.text.encode()).hexdigest():
trace(f"patch hash differs from file in status {st['id']}")
return None
return (st["id"], r.text)
def do_get(args):
st_id, st_acct_id, st_content = find_head_post(args)
st_id, st_acct_id, st_content_html = find_head_post(args)
st_content = html2text.html2text(st_content_html)
# parse out the hash and subhashes
# get context, all replies from the same author as the original status ID, subhashes must match
# repeat for all subhashes
match = re_head.search(st_content)
if not match:
raise "no patchodon header found"
full_hash = match.groups()[0]
short_hashes = list(
filter(lambda x: len(x) > 0, match.groups()[1].split(" "))
)
patches = [None for _ in short_hashes]
n_patches = len(patches)
assert n_patches > 0
parent = st_id
for i, short_hash in enumerate(short_hashes):
trace(f"getting patch {i+1} ({short_hash})...")
# get context, all replies from the same author as the original status ID, subhashes must match
sts = get_descendant_statuses(args, parent)
ok_sts = list(
filter(
lambda x: x != None,
map(
lambda x: parse_matching_status(
x, parent, st_acct_id, i + 1, n_patches, short_hash
),
sts,
),
)
)
if len(ok_sts) == 0:
raise f"no suitable patches found for {i+1} ({short_hash})"
if len(ok_sts) > 1:
raise f"ambiguous statuses for patch {i+1} ({short_hash})"
ok_st_id, ok_st_patch = ok_sts[0]
parent = ok_st_id
patches[i] = ok_st_patch
# verify the full hash
# pass as one blob to git-am OR throw to a directory
hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches))
computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
if computed_full_hash != full_hash:
raise "hash checksums do not match!"
# print out stuff
if args.out_prefix:
for i, patch in enumerate(patches):
path = args.out_prefix + f"{i+1:04d}.patch"
if not args.overwrite and os.path.exists(path):
raise f"refusing to overwrite {path}"
Path(path).write_text(patch)
else:
for patch in patches:
sys.stdout.write(patch)
sys.stdout.write("\n") # be nice
def main():
@ -172,10 +279,8 @@ def main():
description="Publicly send and receive git patch series via Mastodon.",
)
cmds = ap.add_subparsers(required=True, dest="command")
if "API token sources":
group = cmds.add_mutually_exclusive_group()
group = ap.add_mutually_exclusive_group()
group.add_argument(
"--debug-api-token",
help=(
@ -190,7 +295,7 @@ def main():
help="get the API token from environment PATCHODON_API_TOKEN",
)
cmds.add_argument(
ap.add_argument(
"-i",
"--instance-url",
help=(
@ -198,6 +303,8 @@ def main():
),
)
cmds = ap.add_subparsers(required=True, dest="command")
if "POST command":
post = cmds.add_parser("post")
post.add_argument(
@ -237,32 +344,21 @@ def main():
" should contain the patch hash)"
),
)
if "output possibilities":
group = get.add_mutually_exclusive_group()
group.add_argument(
"-a",
"--run-git-am",
action="store_true",
help=(
"apply the patches immediately with git-am instead of"
" storing them in a directory"
),
)
group.add_argument(
"-C",
"--out-prefix",
default="./patchodon-",
help=(
"write the numbered patchfiles to files with a given prefix"
" (the default `./patchodon-' will produce files like"
" `./patchodon-0001.patch')"
),
)
get.add_argument(
"--overwrite",
action="store_true",
help="overwrite existing patch files instead of failing",
)
get.add_argument(
"-C",
"--out-prefix",
help=(
"instead of writing to stdout (for piping to git-am), write"
" the numbered patchfiles to files with a given prefix"
" (specifying `./patchodon-' will produce files like"
" `./patchodon-0001.patch')"
),
)
get.add_argument(
"--overwrite",
action="store_true",
help="overwrite existing patch files instead of failing",
)
ap.add_argument(
"-c",
@ -277,6 +373,8 @@ def main():
)
args = ap.parse_args()
# TODO patch from config if found
if args.command == "post":
do_post(args)
elif args.command == "get":