Compare commits

..

No commits in common. "bf5bb7e811c08f44dd8df5029f7ebf9b70d226f6" and "be5fcce8c0df9c5a4feec1bfdbf5a1d4f469e388" have entirely different histories.

5 changed files with 128 additions and 273 deletions

View file

@ -109,4 +109,6 @@ You might have noticed that this is far from complete. The following features
are on the TODO list: are on the TODO list:
- support for different pastebins (possibly providing more privacy) - support for different pastebins (possibly providing more privacy)
- status visibility configuration
- dpaste retention configuration
- likely a better default hash function than SHA1 - likely a better default hash function than SHA1

View file

@ -10,8 +10,7 @@ license = "GPL-3.0-or-later"
license-files = ["LICENSE"] license-files = ["LICENSE"]
keywords = ["git", "mastodon", "patch"] keywords = ["git", "mastodon", "patch"]
classifiers = [ classifiers = [
"Development Status :: 4 - Beta", "Development Status :: 4 - BetaIntended Audience :: Science/Research",
"Intended Audience :: Science/Research",
"Operating System :: POSIX", "Operating System :: POSIX",
"Operating System :: Unix", "Operating System :: Unix",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
@ -32,6 +31,3 @@ Homepage = "https://gitlab.com/exaexa/patchodon"
[project.scripts] [project.scripts]
patchodon = "patchodon:main" patchodon = "patchodon:main"
[tool.setuptools.dynamic]
version = {attr = "patchodon.__version__"}

2
requirements.txt Normal file
View file

@ -0,0 +1,2 @@
requests >= 2.25
html2text >= 2025

View file

@ -1,19 +1,19 @@
"""Functions and main() for patchodon command."""
__version__ = "0.1.0" __version__ = "0.1.0"
import argparse
import hashlib
import html2text
import os import os
import re import re
import requests
import sys import sys
import time import time
from pathlib import Path from pathlib import Path
import argparse
import configparser
import hashlib
import html2text # NOTES: html2text: html2text
import requests # the replies are listed by context, should be link-listed to avoid issues,
# should specify next hash to provide some kind of a filter
# visibility public+unlisted, all unlisted, all private, all direct
DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this? DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this?
@ -23,69 +23,36 @@ html2text.config.IGNORE_ANCHORS = True
def trace(x): def trace(x):
"""
Helper function for printing out progress
"""
sys.stderr.write(sys.argv[0] + ": " + x + "\n") sys.stderr.write(sys.argv[0] + ": " + x + "\n")
def api_token(args): def api_token(args):
"""
Get the applicable API token out of args
"""
if args.debug_api_token: if args.debug_api_token:
return args.debug_api_token return args.debug_api_token
if args.env_api_token: if args.env_api_token:
return os.environ["PATCHODON_API_TOKEN"] return os.environ["PATCHODON_API_TOKEN"]
if args.config_api_token: raise "API token not specified"
return args.config_api_token
raise ValueError("API token not specified")
def auth_headers(args): def auth_headers(args):
"""
Get a headers structure for `requests` with the Authorization set properly
"""
if not args.instance_url: if not args.instance_url:
raise ValueError("mastodon instance not specified") raise "mastodon instance not specified"
token = api_token(args) token = api_token(args)
return {"Authorization": f"Bearer {token}"} return {"Authorization": f"Bearer {token}"}
def post_visibility(args, is_head): def do_post_status(args, body, parent=None, optional=None):
"""
choose the status visibility based on args and head-ness
"""
if args.direct:
return "direct"
if args.private:
return "private"
if args.unlisted:
return "unlisted"
if args.public:
return "public" if is_head else "unlisted"
if args.all_public:
return "public"
return "public" if is_head else "unlisted"
def do_post_status(args, body, is_head, parent=None, optional=None):
"""
POST a new status with body, optionally in reply-to `parent` post ID, and
with attached `optional` contents to body.
"""
if len(body) > STATUS_LENGTH_LIMIT: if len(body) > STATUS_LENGTH_LIMIT:
raise ValueError("required status body too long") raise "required status body too long"
st = body + ( st = body + (
"\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)] "\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)]
if optional if optional
else "" else ""
) )
data = {"status": st, "visibility": post_visibility(args, is_head)} data = {"status": st, "visibility": "direct"} # TODO parametrize direct
# visibility options: public head+unlisted, all unlisted, all private, all direct
if parent: if parent:
data["in_reply_to_id"] = parent data["in_reply_to_id"] = parent
@ -93,91 +60,68 @@ def do_post_status(args, body, is_head, parent=None, optional=None):
args.instance_url + "/api/v1/statuses", args.instance_url + "/api/v1/statuses",
data=data, data=data,
headers=auth_headers(args), headers=auth_headers(args),
timeout=args.timeout,
) )
if r.status_code != 200: if r.status_code != 200:
raise RuntimeError(f"mastodon status posting failed ({r.status_code})") raise "mastodon status posting failed ({r.status_code})"
rj = r.json() rj = r.json()
return (rj["id"], rj["url"]) return (rj["id"], rj["url"])
def do_pastebin_file(args): def do_pastebin_file(file):
""" # DPASTE API USE RULES:
Send the `file` to dpaste, returning URL for the raw file. # - user-agent must be set properly
""" # - 1 second between requests
trace(f"sending `{file}' to dpaste...")
def f(file): r = requests.post(
# DPASTE API USE RULES: DPASTE_URL + "/api/v2/",
# - user-agent must be set properly data={
# - 1 second between requests "content": Path(file).read_text(),
trace(f"sending `{file}' to dpaste...") "syntax": "diff",
r = requests.post( "title": os.path.basename(file),
DPASTE_URL + "/api/v2/", "expiry_days": 1, # TODO remove after testing
data={ },
"content": Path(file).read_text(), headers={"User-agent": f"patchodon v{__version__}"},
"syntax": "diff", )
"title": os.path.basename(file), time.sleep(1.1)
"expiry_days": args.paste_expire_days, if r.status_code != 201:
}, raise f"dpaste POST failed for `{file}'"
headers={"User-agent": f"patchodon v{__version__}"}, return r.headers["location"] + ".txt"
timeout=args.timeout,
)
time.sleep(1.1)
if r.status_code != 201:
raise RuntimeError("dpaste POST failed for `{file}'")
return r.headers["location"] + ".txt"
return f
def split_off_diff(s): def split_off_diff(s):
"""
try to split off the diff part out of a git .patch
"""
return s.split("\ndiff --git ")[0] return s.split("\ndiff --git ")[0]
def mapl(f, xs): def mapl(f, xs):
"""
helper that listifies the generator out of map
"""
return list(map(f, xs)) return list(map(f, xs))
def mayline(s): def mayline(s):
"""
if the argument string is non-empty, make it a line, otherwise return empty
string
"""
if s: if s:
return s + "\n" return s + "\n"
else:
return "" return ""
def do_post(args): def do_post(args):
"""
implementation of the `patchodon post` subcommand
"""
files = args.patchfile files = args.patchfile
if not files: if not files:
trace("reading patchfile series from stdin") trace("reading patchfile series from stdin")
files = mapl(lambda x: x.rstrip("\n"), sys.stdin.readlines()) files = mapl(lambda x: x.rstrip(chars="\n"), sys.stdin.readlines())
n_patches = len(files) n_patches = len(files)
hashes = mapl( hashes = mapl(
lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files
) )
short_hashes = mapl(lambda x: x[0:8], hashes) short_hashes = mapl(lambda x: x[0:8], hashes)
full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest() full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
paste_raw_urls = mapl(do_pastebin_file(args), files) paste_raw_urls = mapl(do_pastebin_file, files)
trace("posting the header...") trace("posting the header...")
parent_post_id, url = do_post_status( parent_post_id, url = do_post_status(
args, args,
f"{mayline(args.recipient)}{mayline(args.subject)}" f"{mayline(args.recipient)}{mayline(args.subject)}"
f"[patchodon: {full_hash} / {' '.join(short_hashes)}]", f"[patchodon: {full_hash} / {' '.join(short_hashes)}]",
True,
) )
for fn, pst, hsh, series in zip( for fn, pst, hsh, series in zip(
files, paste_raw_urls, hashes, range(n_patches) files, paste_raw_urls, hashes, range(n_patches)
@ -188,7 +132,6 @@ def do_post(args):
f"{mayline(args.recipient)}" f"{mayline(args.recipient)}"
f"[patchodon {series+1}/{n_patches} {hsh}]\n" f"[patchodon {series+1}/{n_patches} {hsh}]\n"
f"{pst}\n", f"{pst}\n",
False,
parent=parent_post_id, parent=parent_post_id,
optional=split_off_diff(Path(fn).read_text()), optional=split_off_diff(Path(fn).read_text()),
) )
@ -196,44 +139,35 @@ def do_post(args):
def find_head_post(args): def find_head_post(args):
"""
Find a post ID in the configured mastodon instave via the search API
("internalizing" it in the process), returning some extra metadata
"""
r = requests.get( r = requests.get(
args.instance_url + "/api/v2/search", args.instance_url + "/api/v2/search",
headers=auth_headers(args), headers=auth_headers(args),
params={"resolve": "true", "limit": "10", "q": args.patch_url}, params={"resolve": "true", "limit": "10", "q": args.patch_url},
timeout=args.timeout,
) )
if r.status_code != 200: if r.status_code != 200:
raise RuntimeError("status URL search failed!") raise "status URL search failed!"
sts = list( sts = list(
filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"]) filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
) )
if len(sts) < 1: if len(sts) < 1:
raise RuntimeError("status URL not found") raise "status URL not found"
if len(sts) > 1: if len(sts) > 1:
raise RuntimeError("ambiguous status URL") raise "ambiguous status URL?"
st = sts[0] st = sts[0]
return (st["id"], st["account"]["id"], st["content"]) return (st["id"], st["account"]["id"], st["content"])
def get_descendant_statuses(args, parent): def get_descendant_statuses(args, parent):
"""
retrieve replies to a given parent status
"""
r = requests.get( r = requests.get(
args.instance_url + f"/api/v1/statuses/{parent}/context", args.instance_url + f"/api/v1/statuses/{parent}/context",
headers=auth_headers(args), headers=auth_headers(args),
timeout=args.timeout,
) )
if r.status_code != 200: if r.status_code != 200:
raise RuntimeError(f"retrieval of context failed for {parent}") raise f"retrieval of context failed for {parent}"
rj = r.json() rj = r.json()
return rj["descendants"] if "descendants" in rj else [] return rj["descendants"] if "descendants" in rj else []
@ -249,16 +183,9 @@ re_patch = re.compile(
) )
def parse_matching_status(args, st, parent, account, n, total_n, short_hash): def parse_matching_status(st, parent, account, n, total_n, short_hash):
"""
If the status in `st` satisfies the expected conditions, parse out its id
and text; if not, return None.
"""
if st["in_reply_to_id"] != parent: if st["in_reply_to_id"] != parent:
# Descendants are also transitive, which includes all subsequent trace(f"wrong reply in status {st['id']}")
# patches. Thus we just don't trace anything here to avoid excessive
# warnings.
return None return None
if st["account"]["id"] != account: if st["account"]["id"] != account:
trace(f"bad account in status {st['id']}") trace(f"bad account in status {st['id']}")
@ -276,12 +203,7 @@ def parse_matching_status(args, st, parent, account, n, total_n, short_hash):
trace(f"patch hash mismatch in status {st['id']}") trace(f"patch hash mismatch in status {st['id']}")
return None return None
url = gs[3] url = gs[3]
r = requests.get( r = requests.get(url)
url,
timeout=args.timeout,
headers={"User-agent": f"patchodon v{__version__}"},
)
time.sleep(1.1) # dpaste ToS!
if r.status_code != 200: if r.status_code != 200:
trace(f"could not get patch from status {st['id']} via {url}") trace(f"could not get patch from status {st['id']} via {url}")
return None return None
@ -292,15 +214,12 @@ def parse_matching_status(args, st, parent, account, n, total_n, short_hash):
def do_get(args): def do_get(args):
"""
implementation of `patchodon get` subcommand
"""
st_id, st_acct_id, st_content_html = find_head_post(args) st_id, st_acct_id, st_content_html = find_head_post(args)
st_content = html2text.html2text(st_content_html) st_content = html2text.html2text(st_content_html)
# parse out the hash and subhashes # parse out the hash and subhashes
match = re_head.search(st_content) match = re_head.search(st_content)
if not match: if not match:
raise RuntimeError("no patchodon header found") raise "no patchodon header found"
full_hash = match.groups()[0] full_hash = match.groups()[0]
short_hashes = list( short_hashes = list(
filter(lambda x: len(x) > 0, match.groups()[1].split(" ")) filter(lambda x: len(x) > 0, match.groups()[1].split(" "))
@ -311,32 +230,23 @@ def do_get(args):
parent = st_id parent = st_id
for i, short_hash in enumerate(short_hashes): for i, short_hash in enumerate(short_hashes):
trace(f"getting patch {i+1} ({short_hash})...") trace(f"getting patch {i+1} ({short_hash})...")
# get context, all replies from the same author as the original status ID, subhashes must match
sts = get_descendant_statuses(args, parent) sts = get_descendant_statuses(args, parent)
ok_sts = list( ok_sts = list(
filter( filter(
lambda x: x is not None, lambda x: x != None,
map( map(
lambda x: parse_matching_status( lambda x: parse_matching_status(
args, x, parent, st_acct_id, i + 1, n_patches, short_hash
x,
parent,
st_acct_id,
i + 1,
n_patches,
short_hash,
), ),
sts, sts,
), ),
) )
) )
if len(ok_sts) == 0: if len(ok_sts) == 0:
raise RuntimeError( raise f"no suitable patches found for {i+1} ({short_hash})"
f"no suitable patches found for {i+1} ({short_hash})"
)
if len(ok_sts) > 1: if len(ok_sts) > 1:
raise RuntimeError( raise f"ambiguous statuses for patch {i+1} ({short_hash})"
f"ambiguous statuses for patch {i+1} ({short_hash})"
)
ok_st_id, ok_st_patch = ok_sts[0] ok_st_id, ok_st_patch = ok_sts[0]
parent = ok_st_id parent = ok_st_id
patches[i] = ok_st_patch patches[i] = ok_st_patch
@ -345,14 +255,14 @@ def do_get(args):
hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches)) hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches))
computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest() computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
if computed_full_hash != full_hash: if computed_full_hash != full_hash:
raise RuntimeError("hash checksums do not match!") raise "hash checksums do not match!"
# print out stuff # print out stuff
if args.out_prefix: if args.out_prefix:
for i, patch in enumerate(patches): for i, patch in enumerate(patches):
path = args.out_prefix + f"{i+1:04d}.patch" path = args.out_prefix + f"{i+1:04d}.patch"
if not args.overwrite and os.path.exists(path): if not args.overwrite and os.path.exists(path):
raise RuntimeError(f"refusing to overwrite {path}") raise f"refusing to overwrite {path}"
Path(path).write_text(patch) Path(path).write_text(patch)
else: else:
for patch in patches: for patch in patches:
@ -361,29 +271,27 @@ def do_get(args):
def main(): def main():
"""
parse commandline arguments and run either `do_post` or `do_get`
"""
ap = argparse.ArgumentParser( ap = argparse.ArgumentParser(
prog=sys.argv[0], prog=sys.argv[0],
epilog="patchodon.py version " + __version__ + " is a free software.", epilog="patchodon.py version " + __version__ + " is a free software.",
description="Publicly send and receive git patch series via Mastodon.", description="Publicly send and receive git patch series via Mastodon.",
) )
group = ap.add_mutually_exclusive_group() if "API token sources":
group.add_argument( group = ap.add_mutually_exclusive_group()
"--debug-api-token", group.add_argument(
help=( "--debug-api-token",
"specify the API token on command line (not very secure," help=(
" good for debugging only)" "specify the API token on command line (not very secure,"
), " good for debugging only)"
) ),
group.add_argument( )
"-e", group.add_argument(
"--env-api-token", "-e",
action="store_true", "--env-api-token",
help="get the API token from environment PATCHODON_API_TOKEN", action="store_true",
) help="get the API token from environment PATCHODON_API_TOKEN",
)
ap.add_argument( ap.add_argument(
"-i", "-i",
@ -395,96 +303,60 @@ def main():
cmds = ap.add_subparsers(required=True, dest="command") cmds = ap.add_subparsers(required=True, dest="command")
post = cmds.add_parser("post") if "POST command":
post.add_argument( post = cmds.add_parser("post")
"-r", post.add_argument(
"--recipient", "-r",
default=None, "--recipient",
help=( default=None,
"user tag to prepend to all posted statuses (required esp. for" help=(
" direct sending of statuses)" "user tag to prepend to all posted statuses (required esp. for"
), " direct sending of statuses)"
) ),
post.add_argument( )
"-s", post.add_argument(
"--subject", "-s",
default=None, "--subject",
help=( default=None,
"opening text of the initial post, ideally used to specify the" help=(
" target project and patch topic" "opening text of the initial post, ideally used to specify the"
), " target project and patch topic"
) ),
post.add_argument( )
"-x", post.add_argument(
"--paste-expire-days", "patchfile",
default=14, nargs="*",
help="how many days should dpaste.com hold the patches (default: 14)", help=(
) "filenames of the patch series; taken from stdin if none are"
post.add_argument( " specified (useful for piping the output of git-format-patch"
"patchfile", " into patchodon)"
nargs="*", ),
help=( )
"filenames of the patch series; taken from stdin if none are"
" specified (useful for piping the output of git-format-patch"
" into patchodon)"
),
)
visibility = post.add_mutually_exclusive_group()
visibility.add_argument(
"--all-public",
action="store_true",
help="post head status and all patches publicly",
)
visibility.add_argument(
"--public",
action="store_true",
help=(
"post head status publicly, patches unlisted (this is the default)"
),
)
visibility.add_argument(
"--unlisted",
action="store_true",
help=(
"post all statuses as unlisted"
),
)
visibility.add_argument(
"--private",
action="store_true",
help=(
"post statuses as private (visible by followers and recipients only)"
),
)
visibility.add_argument(
"--direct",
action="store_true",
help="post statuses as direct (visible only by the tagged recipients)",
)
get = cmds.add_parser("get") if "GET command":
get.add_argument( get = cmds.add_parser("get")
"patch_url", get.add_argument(
help=( "patch_url",
"root URL of the status where the patch was posted (the status" help=(
" should contain the patch hash)" "root URL of the status where the patch was posted (the status"
), " should contain the patch hash)"
) ),
get.add_argument( )
"-C", get.add_argument(
"--out-prefix", "-C",
help=( "--out-prefix",
"instead of writing to stdout (for piping to git-am), write" help=(
" the numbered patchfiles to files with a given prefix" "instead of writing to stdout (for piping to git-am), write"
" (specifying `./patchodon-' will produce files like" " the numbered patchfiles to files with a given prefix"
" `./patchodon-0001.patch')" " (specifying `./patchodon-' will produce files like"
), " `./patchodon-0001.patch')"
) ),
get.add_argument( )
"--overwrite", get.add_argument(
action="store_true", "--overwrite",
help="overwrite existing patch files instead of failing", action="store_true",
) help="overwrite existing patch files instead of failing",
)
ap.add_argument( ap.add_argument(
"-c", "-c",
@ -497,28 +369,13 @@ def main():
" loading" " loading"
), ),
) )
ap.add_argument(
"--timeout",
type=int,
default=300,
help="timeout for HTTP API requests in seconds (default: 300)",
)
args = ap.parse_args() args = ap.parse_args()
if os.path.exists(args.config): # TODO patch args from config (if found)
cp = configparser.ConfigParser()
cp.read(args.config)
if "patchodon" in cp:
if "instance_url" in cp["patchodon"] and args.instance_url is None:
args.instance_url = cp["patchodon"]["instance_url"]
if "api_token" in cp["patchodon"]:
args.config_api_token = cp["patchodon"]["api_token"]
else:
trace(f"ignoring non-existent config: {args.config}")
if args.command == "post": if args.command == "post":
do_post(args) do_post(args)
elif args.command == "get": elif args.command == "get":
do_get(args) do_get(args)
else: else:
raise ValueError("fatal: args borked") raise ("fatal: args borked")

View file

@ -1,6 +1,4 @@
"""runpy entrypoint for patchodon"""
if __name__ == "__main__": if __name__ == "__main__":
from patchodon import main from .patchodon import main
main() main()