make the python level bearable

This commit is contained in:
Mirek Kratochvil 2025-10-13 21:07:55 +02:00
parent eee4e73062
commit 265673f935
2 changed files with 164 additions and 99 deletions

View file

@ -1,19 +1,18 @@
"""Functions and main() for patchodon command."""
__version__ = "0.1.0"
import argparse
import hashlib
import html2text
import os
import re
import requests
import sys
import time
from pathlib import Path
# NOTES: html2text: html2text
# the replies are listed by context, should be link-listed to avoid issues,
# should specify next hash to provide some kind of a filter
# visibility public+unlisted, all unlisted, all private, all direct
from pathlib import Path
import argparse
import hashlib
import html2text
import requests
DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this?
@ -23,20 +22,29 @@ html2text.config.IGNORE_ANCHORS = True
def trace(x):
"""
Helper function for printing out progress
"""
sys.stderr.write(sys.argv[0] + ": " + x + "\n")
def api_token(args):
"""
Get the applicable API token out of args
"""
if args.debug_api_token:
return args.debug_api_token
if args.env_api_token:
return os.environ["PATCHODON_API_TOKEN"]
raise "API token not specified"
raise ValueError("API token not specified")
def auth_headers(args):
"""
Get a headers structure for `requests` with the Authorization set properly
"""
if not args.instance_url:
raise "mastodon instance not specified"
raise ValueError("mastodon instance not specified")
token = api_token(args)
@ -44,8 +52,12 @@ def auth_headers(args):
def do_post_status(args, body, parent=None, optional=None):
"""
POST a new status with body, optionally in reply-to `parent` post ID, and
with attached `optional` contents to body.
"""
if len(body) > STATUS_LENGTH_LIMIT:
raise "required status body too long"
raise ValueError("required status body too long")
st = body + (
"\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)]
@ -53,6 +65,7 @@ def do_post_status(args, body, parent=None, optional=None):
else ""
)
data = {"status": st, "visibility": "direct"} # TODO parametrize direct
# visibility options: public head+unlisted, all unlisted, all private, all direct
if parent:
data["in_reply_to_id"] = parent
@ -60,16 +73,20 @@ def do_post_status(args, body, parent=None, optional=None):
args.instance_url + "/api/v1/statuses",
data=data,
headers=auth_headers(args),
timeout=args.timeout,
)
if r.status_code != 200:
raise "mastodon status posting failed ({r.status_code})"
raise RuntimeError(f"mastodon status posting failed ({r.status_code})")
rj = r.json()
return (rj["id"], rj["url"])
def do_pastebin_file(file):
"""
Send the `file` to dpaste, returning URL for the raw file.
"""
# DPASTE API USE RULES:
# - user-agent must be set properly
# - 1 second between requests
@ -83,29 +100,43 @@ def do_pastebin_file(file):
"expiry_days": 1, # TODO remove after testing
},
headers={"User-agent": f"patchodon v{__version__}"},
timeout=300, # TODO passthrough args
)
time.sleep(1.1)
if r.status_code != 201:
raise f"dpaste POST failed for `{file}'"
raise RuntimeError("dpaste POST failed for `{file}'")
return r.headers["location"] + ".txt"
def split_off_diff(s):
"""
try to split off the diff part out of a git .patch
"""
return s.split("\ndiff --git ")[0]
def mapl(f, xs):
"""
helper that listifies the generator out of map
"""
return list(map(f, xs))
def mayline(s):
"""
if the argument string is non-empty, make it a line, otherwise return empty
string
"""
if s:
return s + "\n"
else:
return ""
return ""
def do_post(args):
"""
implementation of the `patchodon post` subcommand
"""
files = args.patchfile
if not files:
trace("reading patchfile series from stdin")
@ -139,35 +170,44 @@ def do_post(args):
def find_head_post(args):
"""
Find a post ID in the configured mastodon instave via the search API
("internalizing" it in the process), returning some extra metadata
"""
r = requests.get(
args.instance_url + "/api/v2/search",
headers=auth_headers(args),
params={"resolve": "true", "limit": "10", "q": args.patch_url},
timeout=args.timeout,
)
if r.status_code != 200:
raise "status URL search failed!"
raise RuntimeError("status URL search failed!")
sts = list(
filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
)
if len(sts) < 1:
raise "status URL not found"
raise RuntimeError("status URL not found")
if len(sts) > 1:
raise "ambiguous status URL?"
raise RuntimeError("ambiguous status URL")
st = sts[0]
return (st["id"], st["account"]["id"], st["content"])
def get_descendant_statuses(args, parent):
"""
retrieve replies to a given parent status
"""
r = requests.get(
args.instance_url + f"/api/v1/statuses/{parent}/context",
headers=auth_headers(args),
timeout=args.timeout,
)
if r.status_code != 200:
raise f"retrieval of context failed for {parent}"
raise RuntimeError(f"retrieval of context failed for {parent}")
rj = r.json()
return rj["descendants"] if "descendants" in rj else []
@ -183,7 +223,12 @@ re_patch = re.compile(
)
def parse_matching_status(st, parent, account, n, total_n, short_hash):
def parse_matching_status(args, st, parent, account, n, total_n, short_hash):
"""
If the status in `st` satisfies the expected conditions, parse out its id
and text; if not, return None.
"""
if st["in_reply_to_id"] != parent:
trace(f"wrong reply in status {st['id']}")
return None
@ -203,7 +248,7 @@ def parse_matching_status(st, parent, account, n, total_n, short_hash):
trace(f"patch hash mismatch in status {st['id']}")
return None
url = gs[3]
r = requests.get(url)
r = requests.get(url, timeout=args.timeout)
if r.status_code != 200:
trace(f"could not get patch from status {st['id']} via {url}")
return None
@ -214,12 +259,15 @@ def parse_matching_status(st, parent, account, n, total_n, short_hash):
def do_get(args):
"""
implementation of `patchodon get` subcommand
"""
st_id, st_acct_id, st_content_html = find_head_post(args)
st_content = html2text.html2text(st_content_html)
# parse out the hash and subhashes
match = re_head.search(st_content)
if not match:
raise "no patchodon header found"
raise RuntimeError("no patchodon header found")
full_hash = match.groups()[0]
short_hashes = list(
filter(lambda x: len(x) > 0, match.groups()[1].split(" "))
@ -230,23 +278,32 @@ def do_get(args):
parent = st_id
for i, short_hash in enumerate(short_hashes):
trace(f"getting patch {i+1} ({short_hash})...")
# get context, all replies from the same author as the original status ID, subhashes must match
sts = get_descendant_statuses(args, parent)
ok_sts = list(
filter(
lambda x: x != None,
lambda x: x is not None,
map(
lambda x: parse_matching_status(
x, parent, st_acct_id, i + 1, n_patches, short_hash
args,
x,
parent,
st_acct_id,
i + 1,
n_patches,
short_hash,
),
sts,
),
)
)
if len(ok_sts) == 0:
raise f"no suitable patches found for {i+1} ({short_hash})"
raise RuntimeError(
f"no suitable patches found for {i+1} ({short_hash})"
)
if len(ok_sts) > 1:
raise f"ambiguous statuses for patch {i+1} ({short_hash})"
raise RuntimeError(
f"ambiguous statuses for patch {i+1} ({short_hash})"
)
ok_st_id, ok_st_patch = ok_sts[0]
parent = ok_st_id
patches[i] = ok_st_patch
@ -255,14 +312,14 @@ def do_get(args):
hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches))
computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
if computed_full_hash != full_hash:
raise "hash checksums do not match!"
raise RuntimeError("hash checksums do not match!")
# print out stuff
if args.out_prefix:
for i, patch in enumerate(patches):
path = args.out_prefix + f"{i+1:04d}.patch"
if not args.overwrite and os.path.exists(path):
raise f"refusing to overwrite {path}"
raise RuntimeError(f"refusing to overwrite {path}")
Path(path).write_text(patch)
else:
for patch in patches:
@ -271,27 +328,29 @@ def do_get(args):
def main():
"""
parse commandline arguments and run either `do_post` or `do_get`
"""
ap = argparse.ArgumentParser(
prog=sys.argv[0],
epilog="patchodon.py version " + __version__ + " is a free software.",
description="Publicly send and receive git patch series via Mastodon.",
)
if "API token sources":
group = ap.add_mutually_exclusive_group()
group.add_argument(
"--debug-api-token",
help=(
"specify the API token on command line (not very secure,"
" good for debugging only)"
),
)
group.add_argument(
"-e",
"--env-api-token",
action="store_true",
help="get the API token from environment PATCHODON_API_TOKEN",
)
group = ap.add_mutually_exclusive_group()
group.add_argument(
"--debug-api-token",
help=(
"specify the API token on command line (not very secure,"
" good for debugging only)"
),
)
group.add_argument(
"-e",
"--env-api-token",
action="store_true",
help="get the API token from environment PATCHODON_API_TOKEN",
)
ap.add_argument(
"-i",
@ -303,60 +362,58 @@ def main():
cmds = ap.add_subparsers(required=True, dest="command")
if "POST command":
post = cmds.add_parser("post")
post.add_argument(
"-r",
"--recipient",
default=None,
help=(
"user tag to prepend to all posted statuses (required esp. for"
" direct sending of statuses)"
),
)
post.add_argument(
"-s",
"--subject",
default=None,
help=(
"opening text of the initial post, ideally used to specify the"
" target project and patch topic"
),
)
post.add_argument(
"patchfile",
nargs="*",
help=(
"filenames of the patch series; taken from stdin if none are"
" specified (useful for piping the output of git-format-patch"
" into patchodon)"
),
)
post = cmds.add_parser("post")
post.add_argument(
"-r",
"--recipient",
default=None,
help=(
"user tag to prepend to all posted statuses (required esp. for"
" direct sending of statuses)"
),
)
post.add_argument(
"-s",
"--subject",
default=None,
help=(
"opening text of the initial post, ideally used to specify the"
" target project and patch topic"
),
)
post.add_argument(
"patchfile",
nargs="*",
help=(
"filenames of the patch series; taken from stdin if none are"
" specified (useful for piping the output of git-format-patch"
" into patchodon)"
),
)
if "GET command":
get = cmds.add_parser("get")
get.add_argument(
"patch_url",
help=(
"root URL of the status where the patch was posted (the status"
" should contain the patch hash)"
),
)
get.add_argument(
"-C",
"--out-prefix",
help=(
"instead of writing to stdout (for piping to git-am), write"
" the numbered patchfiles to files with a given prefix"
" (specifying `./patchodon-' will produce files like"
" `./patchodon-0001.patch')"
),
)
get.add_argument(
"--overwrite",
action="store_true",
help="overwrite existing patch files instead of failing",
)
get = cmds.add_parser("get")
get.add_argument(
"patch_url",
help=(
"root URL of the status where the patch was posted (the status"
" should contain the patch hash)"
),
)
get.add_argument(
"-C",
"--out-prefix",
help=(
"instead of writing to stdout (for piping to git-am), write"
" the numbered patchfiles to files with a given prefix"
" (specifying `./patchodon-' will produce files like"
" `./patchodon-0001.patch')"
),
)
get.add_argument(
"--overwrite",
action="store_true",
help="overwrite existing patch files instead of failing",
)
ap.add_argument(
"-c",
@ -369,6 +426,12 @@ def main():
" loading"
),
)
ap.add_argument(
"--timeout",
type=int,
default=300,
help="timeout for HTTP API requests in seconds (default: 300)",
)
args = ap.parse_args()
# TODO patch args from config (if found)
@ -378,4 +441,4 @@ def main():
elif args.command == "get":
do_get(args)
else:
raise ("fatal: args borked")
raise ValueError("fatal: args borked")

View file

@ -1,3 +1,5 @@
"""runpy entrypoint for patchodon"""
if __name__ == "__main__":
from patchodon import main