blob: b81d871313eef6cc84d135708895b28d4da2c071 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2018 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common libdot util code."""
import argparse
import base64
import dataclasses
import hashlib
import importlib.machinery
import io
import json
import logging
import logging.handlers
import os
from pathlib import Path
import shutil
import subprocess
import sys
import time
import types
from typing import Optional, Union
import urllib.error
import urllib.request
# Require recent Python 3 versions.
# NB: We cannot require newer versions than CrOS itself supports.
assert sys.version_info >= (
3,
9,
), f"Python 3.9 or newer is required; found {sys.version}"
BIN_DIR = Path(__file__).resolve().parent
DIR = BIN_DIR.parent
LIBAPPS_DIR = DIR.parent
class ColoredFormatter(logging.Formatter):
"""Colorize warning/error messages automatically."""
_COLOR_MAPPING = {"WARNING": "\033[1;33m", "ERROR": "\033[1;31m"}
_RESET = "\033[m"
def __init__(self, *args, **kwargs):
"""Initialize!"""
self._use_colors = "NOCOLOR" not in os.environ
super().__init__(*args, **kwargs)
def format(self, record):
"""Formats |record| with color."""
msg = super().format(record)
color = self._COLOR_MAPPING.get(record.levelname)
if self._use_colors and color:
msg = f"{color}{msg}{self._RESET}"
return msg
def setup_logging(debug=False, quiet=0):
"""Setup the logging module."""
fmt = "%(asctime)s: %(levelname)-7s: "
if debug:
fmt += "%(filename)s:%(funcName)s: "
fmt += "%(message)s"
# 'Sat, 05 Oct 2013 18:58:50 -0400 (EST)'
datefmt = "%a, %d %b %Y %H:%M:%S %z"
tzname = time.strftime("%Z", time.localtime())
if tzname and " " not in tzname and len(tzname) <= 5:
# If the name is verbose, don't include it. Some systems like to use
# "Eastern Daylight Time" which is much too chatty.
datefmt += f" ({tzname})"
if debug:
level = logging.DEBUG
elif quiet <= 0:
level = logging.INFO
elif quiet <= 1:
level = logging.WARNING
elif quiet <= 2:
level = logging.ERROR
else: # if quiet <= 3:
level = logging.CRITICAL
formatter = ColoredFormatter(fmt, datefmt)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(level)
class ArgumentParser(argparse.ArgumentParser):
"""Custom parser to hold a consistent set of options & runtime env."""
def __init__(self, short_options=True, **kwargs):
"""Initialize!"""
super().__init__(**kwargs)
self.add_common_arguments(short_options=short_options)
def parse_args(self, args=None, namespace=None):
"""Parse all the |args| and save the results to |namespace|."""
# This will call our parse_known_args below, so don't use setup_logging.
namespace = argparse.ArgumentParser.parse_args(
self, args=args, namespace=namespace
)
return namespace
def parse_known_args(self, args=None, namespace=None):
"""Parse all the |args| and save the results to |namespace|."""
namespace, unknown_args = argparse.ArgumentParser.parse_known_args(
self, args=args, namespace=namespace
)
setup_logging(debug=namespace.debug, quiet=namespace.quiet)
return (namespace, unknown_args)
def add_common_arguments(self, short_options=True):
"""Add our custom/consistent set of command line flags."""
def getopts(*args):
return args if short_options else args[1:]
self.add_argument(
*getopts("-d", "--debug"),
action="store_true",
help="Run with debug output.",
)
self.add_argument(
*getopts("-q", "--quiet"),
action="count",
default=0,
help="Use once to hide info messages, twice to hide "
"warnings, and thrice to hide errors.",
)
def touch(path):
"""Touch (and truncate) |path|."""
with open(path, "wb"):
pass
def unlink(path):
"""Remove |path| and ignore errors if it doesn't exist."""
try:
os.unlink(path)
except FileNotFoundError:
pass
def symlink(target, path):
"""Always symlink |path| to a relativized |target|."""
unlink(path)
path = os.path.realpath(path)
target = os.path.relpath(os.path.realpath(target), os.path.dirname(path))
logging.info("Symlinking %s -> %s", path, target)
os.symlink(target, path)
def cmdstr(cmd):
"""Return a string for the |cmd| list w/reasonable quoting."""
if isinstance(cmd, str):
return cmd
quoted = []
for arg in cmd:
if isinstance(arg, Path):
arg = str(arg)
if " " in arg:
arg = f'"{arg}"'
quoted.append(arg)
return " ".join(quoted)
def run(
cmd: list[str],
cmd_prefix: list[str] = None,
log_prefix: list[str] = None,
check: bool = True,
cwd: str = None,
extra_env: dict[str, str] = None,
**kwargs,
):
"""Run |cmd| inside of |cwd| and exit if it fails.
Args:
cmd: The command to run.
cmd_prefix: (Unlogged) prefix for the command to run. Useful for passing
interpreters like `java` or `python` but omitting from default output.
log_prefix: Prefix for logging the command, but not running. Useful for
wrapper scripts that get executed directly and use |cmd_prefix|.
check: Whether to exit if |cmd| execution fails.
cwd: The working directory to run |cmd| inside of.
extra_env: Extra environment settings to set before running.
Returns:
A subprocess.CompletedProcess instance.
"""
# The |env| setting specifies the entire environment, so we need to manually
# merge our |extra_env| settings into it before passing it along.
if extra_env is not None:
env = kwargs.pop("env", os.environ)
env = env.copy()
env.update(extra_env)
kwargs["env"] = env
if not log_prefix:
log_prefix = []
log_cmd = log_prefix + cmd
if not cmd_prefix:
cmd_prefix = []
real_cmd = cmd_prefix + cmd
if cwd is None:
cwd = os.getcwd()
logging.info("Running: %s\n (cwd = %s)", cmdstr(log_cmd), cwd)
if cmd_prefix:
logging.debug("Real full command: %s", cmdstr(real_cmd))
result = subprocess.run(real_cmd, cwd=cwd, check=False, **kwargs)
if check and result.returncode:
logging.error("Running %s failed!", log_cmd[0])
if result.stdout is not None:
logging.error("stdout:\n%s", result.stdout)
if result.stderr is not None:
logging.error("stderr:\n%s", result.stderr)
sys.exit(result.returncode)
return result
def sha256(path: Union[Path, str]) -> str:
"""Return sha256 hex digest of |path|."""
# The file shouldn't be too big to load into memory, so be lazy.
with open(path, "rb") as fp:
data = fp.read()
m = hashlib.sha256()
m.update(data)
return m.hexdigest()
class Hasher:
"""Helper for checking hashes."""
def __init__(self, hashes: dict[str, str]) -> None:
self._hashes = hashes
self._hashers = {}
for algo in hashes:
self._hashers[algo] = hashlib.new(algo)
def update(self, data: bytes) -> None:
"""Update hashes with |data|."""
for hasher in self._hashers.values():
hasher.update(data)
def check(self) -> bool:
"""Verify all the hashes match."""
for algo, hasher in self._hashers.items():
new_hash = hasher.hexdigest()
if self._hashes[algo] != new_hash:
logging.error(
"Hash mismatch: expected %s but found %s",
self._hashes[algo],
new_hash,
)
return False
else:
logging.debug("Hash %s matches: %s", algo, new_hash)
return True
@dataclasses.dataclass()
class Artifact:
"""Single file to download."""
# Size of the file. Stored as a string to avoid JSON int limits.
size: int = 0
# URL to fetch the file.
url: str = ""
# Known hashes.
hashes: dict[str, str] = dataclasses.field(default_factory=dict)
# Whether the file is base64 encoded.
base64: bool = False
# Whether the artifact is used in CrOS for crosh/Terminal.
crosh: bool = False
@classmethod
def from_dict(cls, spec: dict) -> "Artifact":
"""Convert a dict into the object."""
return Artifact(
size=int(spec["size"]),
url=spec["url"],
hashes=spec["hashes"].copy(),
base64=spec.get("base64", False),
crosh=spec.get("crosh", False),
)
def asdict(self) -> dict:
"""Return a minimized dict of this object."""
ret = {
"size": str(self.size),
"url": self.url,
"hashes": self.hashes,
}
if self.base64:
ret["base64"] = True
if self.crosh:
ret["crosh"] = True
return ret
@dataclasses.dataclass()
class ArtifactManifest:
"""Manifest for downloading & verifying files."""
# The format version. Not really used by us.
version: int = 0
# List of files to download.
files: dict[str, Artifact] = dataclasses.field(default_factory=dict)
@classmethod
def from_dict(cls, manifest: dict) -> "ArtifactManifest":
"""Convert a dict into the object."""
return cls(
version=manifest.get("_version"),
files=dict(
(k, Artifact.from_dict(v))
for k, v in manifest.items()
if k not in {"_version"}
),
)
@classmethod
def from_file(cls, path: Path) -> "ArtifactManifest":
"""Read a manifest from a file."""
with path.open("rb") as fp:
try:
return cls.from_dict(json.load(fp))
except json.decoder.JSONDecodeError as e:
logging.error("%s: %s", path, e)
raise
def __getitem__(self, key: str) -> Artifact | None:
return self.files.get(key)
def asdict(self) -> dict:
"""Return a minimized dict of this object."""
return {
"_version": self.version,
**dict((k, v.asdict()) for k, v in self.files.items()),
}
def toJSON(self) -> str:
"""Return a JSON string of this object."""
return (
json.dumps(
self.asdict(),
ensure_ascii=False,
indent=2,
sort_keys=True,
)
+ "\n"
)
def unpack(
archive: Union[Path, str],
cwd: Optional[Path] = None,
files: Optional[list[Union[Path, str]]] = (),
):
"""Unpack |archive| into |cwd|."""
archive = Path(archive)
if cwd is None:
cwd = Path.cwd()
if files:
files = ["--"] + list(files)
else:
files = []
# Try to make symlink usage easier in Windows.
extra_env = {
"MSYS": "winsymlinks:nativestrict", # nocheck
}
logging.info("Unpacking %s", archive.name)
# We use relpath here to help out tar on platforms where it doesn't like
# paths with colons in them (e.g. Windows). We have to construct the full
# before running through relpath as relative archives will implicitly be
# checked against os.getcwd rather than the explicit cwd.
src = os.path.relpath(cwd / archive, cwd)
run(
["tar", "--no-same-owner", "-xf", src] + files,
cwd=cwd,
extra_env=extra_env,
)
def pack(
archive: Union[Path, str],
paths: list[Union[Path, str]],
cwd: Optional[Path] = None,
exclude: Optional[list[Union[Path, str]]] = (),
):
"""Create an |archive| with |paths| in |cwd|.
The output will use XZ compression.
"""
archive = Path(archive)
if cwd is None:
cwd = Path.cwd()
if archive.suffix == ".xz":
archive = archive.with_suffix("")
# Make sure all the paths have reasonable permissions.
def walk(path):
if path.is_symlink():
return
elif path.is_dir():
# All dirs should be 755.
mode = path.stat().st_mode & 0o777
if mode != 0o755:
path.chmod(0o755)
for subpath in path.glob("*"):
walk(subpath)
elif path.is_file():
# All scripts should be 755 while other files should be 644.
mode = path.stat().st_mode & 0o777
if mode in (0o755, 0o644):
return
if mode & 0o111:
path.chmod(0o755)
else:
path.chmod(0o644)
else:
raise ValueError(f"{path}: unknown file type")
logging.info("Forcing reasonable permissions on inputs")
for path in paths:
walk(cwd / path)
logging.info("Creating %s tarball", archive.name)
# We use relpath here to help out tar on platforms where it doesn't like
# paths with colons in them (e.g. Windows). We have to construct the full
# before running through relpath as relative archives will implicitly be
# checked against os.getcwd rather than the explicit cwd.
tar = os.path.relpath(cwd / archive, cwd)
run(
["tar", "--owner=0", "--group=0", "-cf", tar]
+ [f"--exclude={x}" for x in exclude]
+ ["--"]
+ paths,
cwd=cwd,
)
logging.info("Compressing tarball")
run(["xz", "-f", "-T0", "-9", tar], cwd=cwd)
def fetch_data(
uri: str,
output=None,
verbose: bool = False,
b64: bool = False,
size: int = 0,
hashes: Optional[dict[str, str]] = None,
):
"""Fetch |uri| and write the results to |output| (or return BytesIO)."""
# This is the timeout used on each blocking operation, not the entire
# life of the connection. So it's used for initial urlopen and for each
# read attempt (which may be partial reads). 5 minutes should be fine.
TIMEOUT = 5 * 60
if output is None:
output = io.BytesIO()
hasher = Hasher(hashes or {})
try:
with urllib.request.urlopen(uri, timeout=TIMEOUT) as infp:
mb = 0
length = infp.length
if size and length != size:
logging.error(
"Size mismatch: expected %s but found %s", size, length
)
sys.exit(1)
while True:
data = infp.read(1024 * 1024)
if not data:
break
# Show a simple progress bar if the user is interactive.
if verbose:
mb += 1
print(f"~{mb} MiB downloaded", end="")
if length:
percent = mb * 1024 * 1024 * 100 / length
print(f" ({percent:.2f}%)", end="")
print("\r", end="", flush=True)
if b64:
data = base64.b64decode(data)
hasher.update(data)
output.write(data)
except urllib.error.HTTPError as e:
logging.error("%s: %s", uri, e)
sys.exit(1)
if not hasher.check():
sys.exit(1)
return output
def fetch(
uri: str,
output: Path | str,
b64: bool = False,
size: int = 0,
hashes: Optional[dict[str, str]] = None,
) -> None:
"""Download |uri| and save it to |output|."""
output = os.path.abspath(output)
distdir, name = os.path.split(output)
if os.path.exists(output):
logging.info("Using existing download: %s", name)
return
logging.info("Downloading %s to %s", uri, output)
os.makedirs(distdir, exist_ok=True)
# Use kokoro build cache or Gentoo distdir if available.
for envvar in ("KOKORO_GFILE_DIR", "DISTDIR"):
cache_dir = os.getenv(envvar)
if cache_dir:
cache_name = uri.rsplit("/", 1)[-1]
cache_file = os.path.join(cache_dir, cache_name)
if os.path.exists(cache_file):
logging.info(" Cache hit via %s", envvar)
symlink(cache_file, output)
return
# Don't be verbose if running on CI systems.
verbose = os.isatty(sys.stdout.fileno())
# We use urllib rather than wget or curl to avoid external utils & libs.
# This seems to be good enough for our needs.
tmpfile = output + ".tmp"
for _ in range(0, 5):
try:
with open(tmpfile, "wb") as outfp:
fetch_data(
uri,
outfp,
verbose=verbose,
b64=b64,
size=size,
hashes=hashes,
)
break
except ConnectionError as e:
time.sleep(1)
logging.warning("Download failed; retrying: %s", e)
else:
logging.error("Unabled to download; giving up")
unlink(tmpfile)
sys.exit(1)
# Clear the progress bar.
if verbose:
print("\r\x1b[K", end="")
os.rename(tmpfile, output)
def fetch_manifest_spec(spec: Artifact, output: str) -> None:
"""Download an artifact by |spec| and save to |output|."""
fetch(
spec.url,
output,
b64=spec.base64,
size=int(spec.size),
hashes=spec.hashes,
)
def fetch_manifest_lookup(path: Path, item: str) -> dict:
"""Lookup the settings for |item| in |manifest|."""
manifest = ArtifactManifest.from_file(path)
return manifest[item]
def fetch_manifest(path: Path, item: str, output: str) -> None:
"""Download |item| defined in |manifest| and save to |output|."""
spec = fetch_manifest_lookup(path, item)
fetch_manifest_spec(spec, output)
def download_tarball(
tar: str,
base_uri: str,
tar_dir: Path,
latest_hash: str,
size: int = 0,
hashes: Optional[dict[str, str]] = None,
):
"""Download & update our copy of a tarball.
Args:
tar: tarball filename.
base_uri: Will download {base_uri}/{tar} to tar_dir.parent.
tar_dir: Root directory of extracted tarball where .hash file is written.
tar_dir.name will be extracted from tar at tar_dir.parent
latest_hash: Latest hash of tarball. Download and extract is not done
if tar_dir/.hash already exists and matches.
"""
hash_file = tar_dir / ".hash"
old_hash = None
try:
old_hash = hash_file.read_text(encoding="utf-8").strip()
except FileNotFoundError:
pass
# In case of an upgrade, nuke existing dir.
if old_hash != latest_hash:
shutil.rmtree(tar_dir, ignore_errors=True)
# Download & unpack the archive.
uri = "/".join((base_uri, tar))
output = tar_dir.parent / tar
fetch(uri, output, size=size, hashes=hashes)
unpack(output, cwd=tar_dir.parent, files=[tar_dir.name])
unlink(output)
# Mark the hash of this checkout.
hash_file.write_text(latest_hash, encoding="utf-8")
def download_tarball_manifest(path: Path, item: str, output: Path) -> None:
"""Download & update the tarball |item| defined in |path|."""
spec = fetch_manifest_lookup(path, item)
base_uri, tar = spec.url.rsplit("/", 1)
hashes = spec.hashes
download_tarball(
tar,
base_uri,
output,
hashes["sha256"],
size=spec.size,
hashes=hashes,
)
def node_and_npm_setup():
"""Download our copies of node & npm to our tree and updates env ($PATH)."""
# We have to update modules first as it'll nuke the dir node lives under.
node.modules_update()
node.update()
def load_module(name, path):
"""Load a module from the filesystem.
Args:
name: The name of the new module to import.
path: The full path to the file to import.
"""
loader = importlib.machinery.SourceFileLoader(name, path)
module = types.ModuleType(loader.name)
loader.exec_module(module)
return module
class HelperProgram:
"""Wrapper around local programs that get reused by other projects.
This allows people to do inprocess execution rather than having to fork+exec
another Python instance.
This allows us to avoid filesystem symlinks (which aren't portable), and to
avoid naming programs with .py extensions, and to avoid clashes between
projects that use the same program name (e.g. "import lint" would confuse
libdot/bin/lint & nassh/bin/lint), and to avoid merging all libdot helpers
into the single libdot.py module.
"""
_BIN_DIR = BIN_DIR
def __init__(self, name, path=None):
"""Initialize.
Args:
name: The base name of the program to import.
path: The full path to the file. It defaults to libdot/bin/|name|.
"""
self._name = name
if path is None:
path = os.path.join(self._BIN_DIR, name)
self._path = path
self._module_cache = None
@property
def _module(self):
"""Load & cache the program module."""
if self._module_cache is None:
self._module_cache = load_module(self._name, self._path)
return self._module_cache
def __getattr__(self, name):
"""Dynamic forwarder to module members."""
return getattr(self._module, name)
# Wrappers around libdot/bin/ programs for other tools to access directly.
# keep-sorted start
black = HelperProgram("black")
clang_format = HelperProgram("clang-format")
closure_compiler = HelperProgram("closure-compiler")
cpplint = HelperProgram("cpplint")
eslint = HelperProgram("eslint")
format_tool = HelperProgram("format")
headless_chrome = HelperProgram("headless-chrome")
jsonlint = HelperProgram("jsonlint")
lint = HelperProgram("lint")
load_tests = HelperProgram("load_tests")
mdlint = HelperProgram("mdlint")
minify_translations = HelperProgram("minify-translations")
node = HelperProgram("node")
npm = HelperProgram("npm")
pylint = HelperProgram("pylint")
sync_package_json = HelperProgram("sync-package-json")
# keep-sorted end