Compare commits

...

10 commits

5 changed files with 274 additions and 129 deletions

View file

@ -109,6 +109,4 @@ You might have noticed that this is far from complete. The following features
are on the TODO list: are on the TODO list:
- support for different pastebins (possibly providing more privacy) - support for different pastebins (possibly providing more privacy)
- status visibility configuration
- dpaste retention configuration
- likely a better default hash function than SHA1 - likely a better default hash function than SHA1

View file

@ -10,7 +10,8 @@ license = "GPL-3.0-or-later"
license-files = ["LICENSE"] license-files = ["LICENSE"]
keywords = ["git", "mastodon", "patch"] keywords = ["git", "mastodon", "patch"]
classifiers = [ classifiers = [
"Development Status :: 4 - BetaIntended Audience :: Science/Research", "Development Status :: 4 - Beta",
"Intended Audience :: Science/Research",
"Operating System :: POSIX", "Operating System :: POSIX",
"Operating System :: Unix", "Operating System :: Unix",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
@ -31,3 +32,6 @@ Homepage = "https://gitlab.com/exaexa/patchodon"
[project.scripts] [project.scripts]
patchodon = "patchodon:main" patchodon = "patchodon:main"
[tool.setuptools.dynamic]
version = {attr = "patchodon.__version__"}

View file

@ -1,2 +0,0 @@
requests >= 2.25
html2text >= 2025

View file

@ -1,19 +1,19 @@
"""Functions and main() for patchodon command."""
__version__ = "0.1.0" __version__ = "0.1.0"
import argparse
import hashlib
import html2text
import os import os
import re import re
import requests
import sys import sys
import time import time
from pathlib import Path
# NOTES: html2text: html2text from pathlib import Path
# the replies are listed by context, should be link-listed to avoid issues, import argparse
# should specify next hash to provide some kind of a filter import configparser
# visibility public+unlisted, all unlisted, all private, all direct import hashlib
import html2text
import requests
DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this? DPASTE_URL = "https://dpaste.com" # TODO any good way to parametrize this?
@ -23,36 +23,69 @@ html2text.config.IGNORE_ANCHORS = True
def trace(x): def trace(x):
"""
Helper function for printing out progress
"""
sys.stderr.write(sys.argv[0] + ": " + x + "\n") sys.stderr.write(sys.argv[0] + ": " + x + "\n")
def api_token(args): def api_token(args):
"""
Get the applicable API token out of args
"""
if args.debug_api_token: if args.debug_api_token:
return args.debug_api_token return args.debug_api_token
if args.env_api_token: if args.env_api_token:
return os.environ["PATCHODON_API_TOKEN"] return os.environ["PATCHODON_API_TOKEN"]
raise "API token not specified" if args.config_api_token:
return args.config_api_token
raise ValueError("API token not specified")
def auth_headers(args): def auth_headers(args):
"""
Get a headers structure for `requests` with the Authorization set properly
"""
if not args.instance_url: if not args.instance_url:
raise "mastodon instance not specified" raise ValueError("mastodon instance not specified")
token = api_token(args) token = api_token(args)
return {"Authorization": f"Bearer {token}"} return {"Authorization": f"Bearer {token}"}
def do_post_status(args, body, parent=None, optional=None): def post_visibility(args, is_head):
"""
choose the status visibility based on args and head-ness
"""
if args.direct:
return "direct"
if args.private:
return "private"
if args.unlisted:
return "unlisted"
if args.public:
return "public" if is_head else "unlisted"
if args.all_public:
return "public"
return "public" if is_head else "unlisted"
def do_post_status(args, body, is_head, parent=None, optional=None):
"""
POST a new status with body, optionally in reply-to `parent` post ID, and
with attached `optional` contents to body.
"""
if len(body) > STATUS_LENGTH_LIMIT: if len(body) > STATUS_LENGTH_LIMIT:
raise "required status body too long" raise ValueError("required status body too long")
st = body + ( st = body + (
"\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)] "\n" + optional[0 : (STATUS_LENGTH_LIMIT - len(body) - 1)]
if optional if optional
else "" else ""
) )
data = {"status": st, "visibility": "direct"} # TODO parametrize direct data = {"status": st, "visibility": post_visibility(args, is_head)}
# visibility options: public head+unlisted, all unlisted, all private, all direct
if parent: if parent:
data["in_reply_to_id"] = parent data["in_reply_to_id"] = parent
@ -60,16 +93,22 @@ def do_post_status(args, body, parent=None, optional=None):
args.instance_url + "/api/v1/statuses", args.instance_url + "/api/v1/statuses",
data=data, data=data,
headers=auth_headers(args), headers=auth_headers(args),
timeout=args.timeout,
) )
if r.status_code != 200: if r.status_code != 200:
raise "mastodon status posting failed ({r.status_code})" raise RuntimeError(f"mastodon status posting failed ({r.status_code})")
rj = r.json() rj = r.json()
return (rj["id"], rj["url"]) return (rj["id"], rj["url"])
def do_pastebin_file(file): def do_pastebin_file(args):
"""
Send the `file` to dpaste, returning URL for the raw file.
"""
def f(file):
# DPASTE API USE RULES: # DPASTE API USE RULES:
# - user-agent must be set properly # - user-agent must be set properly
# - 1 second between requests # - 1 second between requests
@ -80,48 +119,65 @@ def do_pastebin_file(file):
"content": Path(file).read_text(), "content": Path(file).read_text(),
"syntax": "diff", "syntax": "diff",
"title": os.path.basename(file), "title": os.path.basename(file),
"expiry_days": 1, # TODO remove after testing "expiry_days": args.paste_expire_days,
}, },
headers={"User-agent": f"patchodon v{__version__}"}, headers={"User-agent": f"patchodon v{__version__}"},
timeout=args.timeout,
) )
time.sleep(1.1) time.sleep(1.1)
if r.status_code != 201: if r.status_code != 201:
raise f"dpaste POST failed for `{file}'" raise RuntimeError("dpaste POST failed for `{file}'")
return r.headers["location"] + ".txt" return r.headers["location"] + ".txt"
return f
def split_off_diff(s): def split_off_diff(s):
"""
try to split off the diff part out of a git .patch
"""
return s.split("\ndiff --git ")[0] return s.split("\ndiff --git ")[0]
def mapl(f, xs): def mapl(f, xs):
"""
helper that listifies the generator out of map
"""
return list(map(f, xs)) return list(map(f, xs))
def mayline(s): def mayline(s):
"""
if the argument string is non-empty, make it a line, otherwise return empty
string
"""
if s: if s:
return s + "\n" return s + "\n"
else:
return "" return ""
def do_post(args): def do_post(args):
"""
implementation of the `patchodon post` subcommand
"""
files = args.patchfile files = args.patchfile
if not files: if not files:
trace("reading patchfile series from stdin") trace("reading patchfile series from stdin")
files = mapl(lambda x: x.rstrip(chars="\n"), sys.stdin.readlines()) files = mapl(lambda x: x.rstrip("\n"), sys.stdin.readlines())
n_patches = len(files) n_patches = len(files)
hashes = mapl( hashes = mapl(
lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files lambda x: hashlib.sha1(Path(x).read_text().encode()).hexdigest(), files
) )
short_hashes = mapl(lambda x: x[0:8], hashes) short_hashes = mapl(lambda x: x[0:8], hashes)
full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest() full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
paste_raw_urls = mapl(do_pastebin_file, files) paste_raw_urls = mapl(do_pastebin_file(args), files)
trace("posting the header...") trace("posting the header...")
parent_post_id, url = do_post_status( parent_post_id, url = do_post_status(
args, args,
f"{mayline(args.recipient)}{mayline(args.subject)}" f"{mayline(args.recipient)}{mayline(args.subject)}"
f"[patchodon: {full_hash} / {' '.join(short_hashes)}]", f"[patchodon: {full_hash} / {' '.join(short_hashes)}]",
True,
) )
for fn, pst, hsh, series in zip( for fn, pst, hsh, series in zip(
files, paste_raw_urls, hashes, range(n_patches) files, paste_raw_urls, hashes, range(n_patches)
@ -132,6 +188,7 @@ def do_post(args):
f"{mayline(args.recipient)}" f"{mayline(args.recipient)}"
f"[patchodon {series+1}/{n_patches} {hsh}]\n" f"[patchodon {series+1}/{n_patches} {hsh}]\n"
f"{pst}\n", f"{pst}\n",
False,
parent=parent_post_id, parent=parent_post_id,
optional=split_off_diff(Path(fn).read_text()), optional=split_off_diff(Path(fn).read_text()),
) )
@ -139,35 +196,44 @@ def do_post(args):
def find_head_post(args): def find_head_post(args):
"""
Find a post ID in the configured mastodon instave via the search API
("internalizing" it in the process), returning some extra metadata
"""
r = requests.get( r = requests.get(
args.instance_url + "/api/v2/search", args.instance_url + "/api/v2/search",
headers=auth_headers(args), headers=auth_headers(args),
params={"resolve": "true", "limit": "10", "q": args.patch_url}, params={"resolve": "true", "limit": "10", "q": args.patch_url},
timeout=args.timeout,
) )
if r.status_code != 200: if r.status_code != 200:
raise "status URL search failed!" raise RuntimeError("status URL search failed!")
sts = list( sts = list(
filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"]) filter(lambda x: x["url"] == args.patch_url, r.json()["statuses"])
) )
if len(sts) < 1: if len(sts) < 1:
raise "status URL not found" raise RuntimeError("status URL not found")
if len(sts) > 1: if len(sts) > 1:
raise "ambiguous status URL?" raise RuntimeError("ambiguous status URL")
st = sts[0] st = sts[0]
return (st["id"], st["account"]["id"], st["content"]) return (st["id"], st["account"]["id"], st["content"])
def get_descendant_statuses(args, parent): def get_descendant_statuses(args, parent):
"""
retrieve replies to a given parent status
"""
r = requests.get( r = requests.get(
args.instance_url + f"/api/v1/statuses/{parent}/context", args.instance_url + f"/api/v1/statuses/{parent}/context",
headers=auth_headers(args), headers=auth_headers(args),
timeout=args.timeout,
) )
if r.status_code != 200: if r.status_code != 200:
raise f"retrieval of context failed for {parent}" raise RuntimeError(f"retrieval of context failed for {parent}")
rj = r.json() rj = r.json()
return rj["descendants"] if "descendants" in rj else [] return rj["descendants"] if "descendants" in rj else []
@ -183,9 +249,16 @@ re_patch = re.compile(
) )
def parse_matching_status(st, parent, account, n, total_n, short_hash): def parse_matching_status(args, st, parent, account, n, total_n, short_hash):
"""
If the status in `st` satisfies the expected conditions, parse out its id
and text; if not, return None.
"""
if st["in_reply_to_id"] != parent: if st["in_reply_to_id"] != parent:
trace(f"wrong reply in status {st['id']}") # Descendants are also transitive, which includes all subsequent
# patches. Thus we just don't trace anything here to avoid excessive
# warnings.
return None return None
if st["account"]["id"] != account: if st["account"]["id"] != account:
trace(f"bad account in status {st['id']}") trace(f"bad account in status {st['id']}")
@ -203,7 +276,12 @@ def parse_matching_status(st, parent, account, n, total_n, short_hash):
trace(f"patch hash mismatch in status {st['id']}") trace(f"patch hash mismatch in status {st['id']}")
return None return None
url = gs[3] url = gs[3]
r = requests.get(url) r = requests.get(
url,
timeout=args.timeout,
headers={"User-agent": f"patchodon v{__version__}"},
)
time.sleep(1.1) # dpaste ToS!
if r.status_code != 200: if r.status_code != 200:
trace(f"could not get patch from status {st['id']} via {url}") trace(f"could not get patch from status {st['id']} via {url}")
return None return None
@ -214,12 +292,15 @@ def parse_matching_status(st, parent, account, n, total_n, short_hash):
def do_get(args): def do_get(args):
"""
implementation of `patchodon get` subcommand
"""
st_id, st_acct_id, st_content_html = find_head_post(args) st_id, st_acct_id, st_content_html = find_head_post(args)
st_content = html2text.html2text(st_content_html) st_content = html2text.html2text(st_content_html)
# parse out the hash and subhashes # parse out the hash and subhashes
match = re_head.search(st_content) match = re_head.search(st_content)
if not match: if not match:
raise "no patchodon header found" raise RuntimeError("no patchodon header found")
full_hash = match.groups()[0] full_hash = match.groups()[0]
short_hashes = list( short_hashes = list(
filter(lambda x: len(x) > 0, match.groups()[1].split(" ")) filter(lambda x: len(x) > 0, match.groups()[1].split(" "))
@ -230,23 +311,32 @@ def do_get(args):
parent = st_id parent = st_id
for i, short_hash in enumerate(short_hashes): for i, short_hash in enumerate(short_hashes):
trace(f"getting patch {i+1} ({short_hash})...") trace(f"getting patch {i+1} ({short_hash})...")
# get context, all replies from the same author as the original status ID, subhashes must match
sts = get_descendant_statuses(args, parent) sts = get_descendant_statuses(args, parent)
ok_sts = list( ok_sts = list(
filter( filter(
lambda x: x != None, lambda x: x is not None,
map( map(
lambda x: parse_matching_status( lambda x: parse_matching_status(
x, parent, st_acct_id, i + 1, n_patches, short_hash args,
x,
parent,
st_acct_id,
i + 1,
n_patches,
short_hash,
), ),
sts, sts,
), ),
) )
) )
if len(ok_sts) == 0: if len(ok_sts) == 0:
raise f"no suitable patches found for {i+1} ({short_hash})" raise RuntimeError(
f"no suitable patches found for {i+1} ({short_hash})"
)
if len(ok_sts) > 1: if len(ok_sts) > 1:
raise f"ambiguous statuses for patch {i+1} ({short_hash})" raise RuntimeError(
f"ambiguous statuses for patch {i+1} ({short_hash})"
)
ok_st_id, ok_st_patch = ok_sts[0] ok_st_id, ok_st_patch = ok_sts[0]
parent = ok_st_id parent = ok_st_id
patches[i] = ok_st_patch patches[i] = ok_st_patch
@ -255,14 +345,14 @@ def do_get(args):
hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches)) hashes = list(map(lambda x: hashlib.sha1(x.encode()).hexdigest(), patches))
computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest() computed_full_hash = hashlib.sha1(" ".join(hashes).encode()).hexdigest()
if computed_full_hash != full_hash: if computed_full_hash != full_hash:
raise "hash checksums do not match!" raise RuntimeError("hash checksums do not match!")
# print out stuff # print out stuff
if args.out_prefix: if args.out_prefix:
for i, patch in enumerate(patches): for i, patch in enumerate(patches):
path = args.out_prefix + f"{i+1:04d}.patch" path = args.out_prefix + f"{i+1:04d}.patch"
if not args.overwrite and os.path.exists(path): if not args.overwrite and os.path.exists(path):
raise f"refusing to overwrite {path}" raise RuntimeError(f"refusing to overwrite {path}")
Path(path).write_text(patch) Path(path).write_text(patch)
else: else:
for patch in patches: for patch in patches:
@ -271,13 +361,15 @@ def do_get(args):
def main(): def main():
"""
parse commandline arguments and run either `do_post` or `do_get`
"""
ap = argparse.ArgumentParser( ap = argparse.ArgumentParser(
prog=sys.argv[0], prog=sys.argv[0],
epilog="patchodon.py version " + __version__ + " is a free software.", epilog="patchodon.py version " + __version__ + " is a free software.",
description="Publicly send and receive git patch series via Mastodon.", description="Publicly send and receive git patch series via Mastodon.",
) )
if "API token sources":
group = ap.add_mutually_exclusive_group() group = ap.add_mutually_exclusive_group()
group.add_argument( group.add_argument(
"--debug-api-token", "--debug-api-token",
@ -303,7 +395,6 @@ def main():
cmds = ap.add_subparsers(required=True, dest="command") cmds = ap.add_subparsers(required=True, dest="command")
if "POST command":
post = cmds.add_parser("post") post = cmds.add_parser("post")
post.add_argument( post.add_argument(
"-r", "-r",
@ -323,6 +414,12 @@ def main():
" target project and patch topic" " target project and patch topic"
), ),
) )
post.add_argument(
"-x",
"--paste-expire-days",
default=14,
help="how many days should dpaste.com hold the patches (default: 14)",
)
post.add_argument( post.add_argument(
"patchfile", "patchfile",
nargs="*", nargs="*",
@ -332,8 +429,39 @@ def main():
" into patchodon)" " into patchodon)"
), ),
) )
visibility = post.add_mutually_exclusive_group()
visibility.add_argument(
"--all-public",
action="store_true",
help="post head status and all patches publicly",
)
visibility.add_argument(
"--public",
action="store_true",
help=(
"post head status publicly, patches unlisted (this is the default)"
),
)
visibility.add_argument(
"--unlisted",
action="store_true",
help=(
"post all statuses as unlisted"
),
)
visibility.add_argument(
"--private",
action="store_true",
help=(
"post statuses as private (visible by followers and recipients only)"
),
)
visibility.add_argument(
"--direct",
action="store_true",
help="post statuses as direct (visible only by the tagged recipients)",
)
if "GET command":
get = cmds.add_parser("get") get = cmds.add_parser("get")
get.add_argument( get.add_argument(
"patch_url", "patch_url",
@ -369,13 +497,28 @@ def main():
" loading" " loading"
), ),
) )
ap.add_argument(
"--timeout",
type=int,
default=300,
help="timeout for HTTP API requests in seconds (default: 300)",
)
args = ap.parse_args() args = ap.parse_args()
# TODO patch args from config (if found) if os.path.exists(args.config):
cp = configparser.ConfigParser()
cp.read(args.config)
if "patchodon" in cp:
if "instance_url" in cp["patchodon"] and args.instance_url is None:
args.instance_url = cp["patchodon"]["instance_url"]
if "api_token" in cp["patchodon"]:
args.config_api_token = cp["patchodon"]["api_token"]
else:
trace(f"ignoring non-existent config: {args.config}")
if args.command == "post": if args.command == "post":
do_post(args) do_post(args)
elif args.command == "get": elif args.command == "get":
do_get(args) do_get(args)
else: else:
raise ("fatal: args borked") raise ValueError("fatal: args borked")

View file

@ -1,4 +1,6 @@
"""runpy entrypoint for patchodon"""
if __name__ == "__main__": if __name__ == "__main__":
from .patchodon import main from patchodon import main
main() main()