dolphin/Tools/test-updater.py

202 lines
6.9 KiB
Python

#!/usr/bin/env python3
# requirements: pycryptodome
from Crypto.PublicKey import ECC
from Crypto.Signature import eddsa
from hashlib import sha256
from pathlib import Path
import base64
import configparser
import gzip
import http.server
import json
import os
import shutil
import socketserver
import subprocess
import sys
import tempfile
import threading
import time
UPDATE_KEY_TEST = ECC.construct(
curve="Ed25519",
seed=bytes.fromhex(
"543a581db60008bbb978a464e136d686dbc9d594119e928b5276bece3d583d81"
),
)
HTTP_SERVER_ADDR = ("localhost", 8042)
DOLPHIN_UPDATE_SERVER_URL = f"http://{HTTP_SERVER_ADDR[0]}:{HTTP_SERVER_ADDR[1]}"
class Manifest:
def __init__(self, path: Path):
self.path = path
self.entries = {}
for p in self.path.glob("**/*.*"):
if not p.is_file():
continue
digest = sha256(p.read_bytes()).digest()[:0x10].hex()
self.entries[digest] = p.relative_to(self.path).as_posix()
def get_signed(self):
manifest = "".join(
f"{name}\t{digest}\n" for digest, name in self.entries.items()
)
manifest = manifest.encode("utf-8")
sig = eddsa.new(UPDATE_KEY_TEST, "rfc8032").sign(manifest)
manifest += b"\n" + base64.b64encode(sig) + b"\n"
return gzip.compress(manifest)
def get_path(self, digest):
return self.path.joinpath(self.entries.get(digest))
class HTTPRequestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path.startswith("/update/check/v1/updater-test"):
self.send_response(200)
self.end_headers()
self.wfile.write(
bytes(
json.dumps(
{
"status": "outdated",
"content-store": DOLPHIN_UPDATE_SERVER_URL + "/content/",
"changelog": [],
"old": {"manifest": DOLPHIN_UPDATE_SERVER_URL + "/old"},
"new": {
"manifest": DOLPHIN_UPDATE_SERVER_URL + "/new",
"name": "updater-test",
"hash": bytes(range(32)).hex(),
},
}
),
"utf-8",
)
)
elif self.path == "/old":
self.send_response(200)
self.end_headers()
self.wfile.write(self.current.get_signed())
elif self.path == "/new":
self.send_response(200)
self.end_headers()
self.wfile.write(self.next.get_signed())
elif self.path.startswith("/content/"):
self.send_response(200)
self.end_headers()
digest = "".join(self.path[len("/content/") :].split("/"))
path = self.next.get_path(digest)
self.wfile.write(gzip.compress(path.read_bytes()))
elif self.path.startswith("/update-test-done/"):
self.send_response(200)
self.end_headers()
HTTPRequestHandler.dolphin_pid = int(self.path[len("/update-test-done/") :])
self.done.set()
def http_server():
with socketserver.TCPServer(HTTP_SERVER_ADDR, HTTPRequestHandler) as httpd:
httpd.serve_forever()
def create_entries_in_ini(ini_path: Path, entries: dict):
config = configparser.ConfigParser()
if ini_path.exists():
config.read(ini_path)
else:
ini_path.parent.mkdir(parents=True, exist_ok=True)
for section, options in entries.items():
if not config.has_section(section):
config.add_section(section)
for option, value in options.items():
config.set(section, option, value)
with ini_path.open("w") as f:
config.write(f)
if __name__ == "__main__":
dolphin_bin_path = Path(sys.argv[1])
threading.Thread(target=http_server, daemon=True).start()
with tempfile.TemporaryDirectory(suffix=" ¿ 🐬") as tmp_dir:
tmp_dir = Path(tmp_dir)
tmp_dolphin = tmp_dir.joinpath("dolphin")
print(f"install to {tmp_dolphin}")
shutil.copytree(dolphin_bin_path.parent, tmp_dolphin)
tmp_dolphin.joinpath("portable.txt").touch()
create_entries_in_ini(
tmp_dolphin.joinpath("User/Config/Dolphin.ini"),
{
"Analytics": {"Enabled": "False", "PermissionAsked": "True"},
"AutoUpdate": {"UpdateTrack": "updater-test"},
},
)
tmp_dolphin_next = tmp_dir.joinpath("dolphin_next")
print(f"install next to {tmp_dolphin_next}")
# XXX copies from just-created dir so Dolphin.ini is kept
shutil.copytree(tmp_dolphin, tmp_dolphin_next)
tmp_dolphin_next.joinpath("updater-test-file").write_text("test")
tmp_dolphin_next.joinpath("updater-test-filἑ").write_text("test")
with tmp_dolphin_next.joinpath("build_info.txt").open("a") as f:
print("test", file=f)
for ext in ("exe", "dll"):
for path in tmp_dolphin_next.glob("**/*." + ext):
data = bytearray(path.read_bytes())
richpos = data[:0x200].find(b"Rich")
if richpos < 0:
continue
data[richpos : richpos + 4] = b"DOLP"
path.write_bytes(data)
HTTPRequestHandler.current = Manifest(tmp_dolphin)
HTTPRequestHandler.next = Manifest(tmp_dolphin_next)
HTTPRequestHandler.done = threading.Event()
tmp_env = os.environ
tmp_env.update({"DOLPHIN_UPDATE_SERVER_URL": DOLPHIN_UPDATE_SERVER_URL})
tmp_dolphin_bin = tmp_dolphin.joinpath(dolphin_bin_path.name)
result = subprocess.run(tmp_dolphin_bin, env=tmp_env)
assert result.returncode == 0
assert HTTPRequestHandler.done.wait(60 * 2)
# works fine but raises exceptions...
try:
os.kill(HTTPRequestHandler.dolphin_pid, 0)
except:
pass
try:
os.waitpid(HTTPRequestHandler.dolphin_pid, 0)
except:
pass
failed = False
for path in tmp_dolphin_next.glob("**/*.*"):
if not path.is_file():
continue
path_rel = path.relative_to(tmp_dolphin_next)
if path_rel.parts[0] == "User":
continue
new_path = tmp_dolphin.joinpath(path_rel)
if not new_path.exists():
print(f"missing: {new_path}")
failed = True
continue
if (
sha256(new_path.read_bytes()).digest()
!= sha256(path.read_bytes()).digest()
):
print(f"bad digest: {new_path} {path}")
failed = True
continue
assert not failed
print(tmp_dolphin.joinpath("User/Logs/Updater.log").read_text())
# while True: time.sleep(1)