# Copyright Red Hat
#
# snapm/manager/boot.py - Snapshot Manager boot support
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Boot integration for snapshot manager
"""
import collections
from os import uname
from os.path import exists as path_exists
import logging
from typing import Optional
import subprocess
import boom
import boom.cache
import boom.command
from boom.config import load_boom_config, BoomConfigError
from boom.bootloader import (
OPTIONAL_KEYS,
optional_key_default,
key_to_bls_name,
)
from boom.osprofile import match_os_profile_by_version
from snapm import (
SnapmNotFoundError,
SnapmCalloutError,
SnapmSystemError,
ETC_FSTAB,
)
_log = logging.getLogger(__name__)
_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
#: Path to the system machine-id file
_MACHINE_ID = "/etc/machine-id"
#: Path to the legacy system machine-id file
_DBUS_MACHINE_ID = "/var/lib/dbus/machine-id"
#: Path to the system os-release file
_OS_RELEASE = "/etc/os-release"
#: Snapshot set kernel command line argument
SNAPSET_ARG = "snapm.snapset"
#: Snapshot set revert kernel command line argument
REVERT_ARG = "snapm.revert"
#: /dev path prefix
_DEV_PREFIX = "/dev/"
[docs]
class Fstab:
"""
A class to read and query data from an fstab file.
This class reads an fstab-like file and provides methods to iterate
over its entries and look up specific entries based on their properties.
"""
# Define a named tuple to give structure to each fstab entry.
# This makes the code more readable than using a regular tuple.
FstabEntry = collections.namedtuple(
"FstabEntry", ["what", "where", "fstype", "options", "freq", "passno"]
)
[docs]
def __init__(self, path="/etc/fstab"):
"""
Initializes the Fstab object by reading and parsing the file.
:param path: The path to the fstab file. Defaults to '/etc/fstab'.
:type path: str
:raises SnapmNotFoundError: If the specified fstab file does not exist.
:raises SnapmSystemError: If there is an error reading the fstab file.
"""
self.path = path
self.entries = []
self._read_fstab()
[docs]
def _read_fstab(self):
"""
Private method to read and parse the fstab file.
It populates the self.entries list.
"""
try:
with open(self.path, "r", encoding="utf8") as f:
for line in f:
# 1. Strip leading/trailing whitespace.
line = line.strip()
# 2. Ignore empty lines and comments.
if not line or line.startswith("#"):
continue
# 3. Split the line into parts.
parts = line.split()
# 4. An fstab entry must have 6 fields.
if len(parts) != 6:
# You might want to log a warning here in a real application.
continue
# 5. Convert freq and passno to integers for proper typing.
try:
parts[4] = int(parts[4])
parts[5] = int(parts[5])
except KeyError:
continue
# 6. Create a named tuple and append it to our list.
entry = self.FstabEntry(*parts)
self.entries.append(entry)
except FileNotFoundError as exc:
_log_error("Error: The file '%s' was not found.", self.path)
raise SnapmNotFoundError(f"Fstab file not found: {self.path}") from exc
except IOError as e:
_log_error("Error: Could not read the file '%s': %s", self.path, e)
raise SnapmSystemError(f"Error reading fstab file: {self.path}") from e
[docs]
def __iter__(self):
"""
Allows iteration over the fstab entries.
:yields:
A 6-tuple for each entry in the fstab file:
(what, where, fstype, options, freq, passno)
"""
yield from self.entries
[docs]
def lookup(self, key, value):
"""
Finds and generates all entries matching a specific key-value pair.
:params key: The field to search by. Must be one of 'what', 'where',
'fstype', 'options', 'freq', or 'passno'.
:type key: str
:params value: The value to match for the given key.
:type value: str|int
:yields: A 6-tuple for each matching fstab entry.
:raises KeyError: If the provided key is not a valid fstab field name.
"""
if key not in self.FstabEntry._fields:
raise KeyError(
f"Invalid lookup key: '{key}'. "
f"Valid keys are: {self.FstabEntry._fields}"
)
for entry in self.entries:
# getattr() allows us to get an attribute by its string name.
if getattr(entry, key) == value:
yield entry
[docs]
def __repr__(self):
"""Return a machine readable string representation of this Fstab."""
return f"Fstab(path='{self.path}')"
[docs]
def _get_uts_release():
"""
Return the UTS release (kernel version) of the running system.
:returns: A string representation of the UTS release value.
"""
return uname()[2]
[docs]
def _get_machine_id() -> Optional[str]:
"""
Return the current host's machine-id.
Get the machine-id value for the running system by reading from
``/etc/machine-id`` and return it as a string.
:returns: The ``machine_id`` as a string, or ``None`` if not available.
:rtype: Optional[str]
"""
if path_exists(_MACHINE_ID):
path = _MACHINE_ID
elif path_exists(_DBUS_MACHINE_ID): # pragma: no cover
path = _DBUS_MACHINE_ID
else:
return None
with open(path, "r", encoding="utf8") as file:
try:
machine_id = file.read().strip()
except OSError as err: # pragma: no cover
_log_error("Could not read machine-id from '%s': %s", path, err)
machine_id = None
return machine_id
[docs]
def get_device_path(identifier: str, by_type: str) -> Optional[str]:
"""
Translates a filesystem UUID or label to its corresponding device path
using the blkid command.
:param: identifier: The UUID or label of the filesystem.
:param: by_type: The type of identifier to search for.
:returns: The device path if found, otherwise None.
:rtype: Optional[str]
:raises: ValueError: If 'identifier' is empty or 'by_type' is not 'uuid' or 'label'.
:raises: SnapmNotFoundError: If the 'blkid' command is not found on the system.
:raises: SnapmSystemError: If the 'blkid' command exits with a non-zero status
due to reasons other than the identifier not being found
(e.g., permission issues).
:raises: SnapmCalloutError: For any other unexpected errors during command execution
or parsing.
"""
if not identifier:
raise ValueError("Identifier cannot be an empty string.")
if by_type not in ["uuid", "label"]:
raise ValueError("Invalid 'by_type'. Must be 'uuid' or 'label'.")
try:
command = ["blkid"]
if by_type == "uuid":
command.extend(["--uuid", identifier])
else: # by_type == "label"
command.extend(["--label", identifier])
# Execute the command
result = subprocess.run(command, capture_output=True, text=True, check=True)
# The output of 'blkid --uuid <UUID>' or 'blkid --label <LABEL>' is simply the device path
# if found. If not found, it returns a non-zero exit code, which
# 'check=True' converts into a CalledProcessError.
device_path = result.stdout.strip()
if device_path:
return device_path
# This case should ideally not be reached if check=True is working as expected
# for "not found" scenarios, but it's a safeguard.
return None
except FileNotFoundError as exc:
_log_error(
"Error: 'blkid' command not found. Please ensure it is installed and in your PATH."
)
raise SnapmNotFoundError("blkid command not found.") from exc
except subprocess.CalledProcessError as e:
# blkid returns 1 if the specified UUID/label is not found.
# Other non-zero codes indicate different errors (e.g., permissions).
if e.returncode == 1:
# This is the expected behavior when the identifier is not found.
_log_error(
"Identifier '%s' (%s) not found by blkid. Stderr: %s",
identifier,
by_type,
e.stderr.strip(),
)
return None
# Other errors (e.g., permission denied) should still be raised.
_log_error(
"Error executing blkid command (return code %d): %s", e.returncode, e
)
_log_error("Stdout: %s", e.stdout.strip())
_log_error("Stderr: %s", e.stderr.strip())
raise SnapmSystemError(f"Error executing blkid command: {e}") from e
except Exception as e:
_log_error("An unexpected error occurred while executing blkid: %s", str(e))
raise SnapmCalloutError(
f"An unexpected error occurred while executing blkid: {e}"
) from e
[docs]
def _find_snapset_root(snapset, origin=False):
"""
Find the device that backs the root filesystem for snapshot set ``snapset``.
If the snapset does not include the root volume look the device up via the
fstab.
:param snapset: The ``SnapshotSet`` to check.
:param origin: Always return the origin device, even if a snapshot exists
for the root mount point.
"""
for snapshot in snapset.snapshots:
if snapshot.mount_point == "/":
if origin:
return snapshot.origin
return snapshot.devpath
dev_path = None
fstab_reader = Fstab()
for entry in fstab_reader.lookup("where", "/"):
if entry.what.startswith("UUID="):
dev_path = get_device_path(entry.what.split("=", maxsplit=1)[1], "uuid")
if entry.what.startswith("LABEL="):
dev_path = get_device_path(entry.what.split("=", maxsplit=1)[1], "label")
if entry.what.startswith("/"):
dev_path = entry.what
if dev_path:
return dev_path
raise SnapmNotFoundError(f"Could not find root device for snapset {snapset.name}")
[docs]
def _create_default_os_profile():
"""
Create a default boom OsProfile for the running system.
This uses the boom API to run the equivalent of:
``boom profile create --from-host``.
"""
options = boom.command.os_options_from_cmdline()
_log_info(
"Creating default boot profile from %s with options %s", _OS_RELEASE, options
)
return boom.command.create_profile(
None, None, None, None, profile_file=_OS_RELEASE, options=options
)
[docs]
def _build_snapset_mount_list(snapset):
"""
Build a list of command line mount unit definitions for the snapshot set
``snapset``. Mount points that are not part of the snapset are substituted
from /etc/fstab.
:param snapset: The snapshot set to build a mount list for.
"""
mounts = []
snapset_mounts = snapset.mount_points
with open(ETC_FSTAB, "r", encoding="utf8") as fstab:
for line in fstab.readlines():
if line == "\n" or line.startswith("#"):
continue
parts = line.split()
if len(parts) != 6:
_log_warn("Skipping malformed fstab line: %s", line.strip())
continue
what, where, fstype, options, _, _ = parts
if where == "/" or fstype == "swap":
continue
if where in snapset_mounts:
snapshot = snapset.snapshot_by_mount_point(where)
mounts.append(f"{snapshot.devpath}:{where}:{fstype}:{options}")
else:
mounts.append(f"{what}:{where}:{fstype}:{options}")
return mounts
[docs]
def _build_swap_list():
"""
Build a list of command line swap unit definitions for the running system.
Swap entries are extracted from /etc/fstab and returned as a list of
"WHAT:OPTIONS" strings.
"""
swaps = []
with open(ETC_FSTAB, "r", encoding="utf8") as fstab:
for line in fstab.readlines():
if line == "\n" or line.startswith("#"):
continue
parts = line.split()
if len(parts) != 6:
_log_warn("Skipping malformed fstab line: %s", line.strip())
continue
what, _, fstype, options, _, _ = parts
if fstype != "swap":
continue
swaps.append(f"{what}:{options}")
return swaps
[docs]
def _create_boom_boot_entry(
version, title, tag_arg, root_device, mounts=None, swaps=None
):
"""
Create a boom boot entry according to the passed arguments.
:param version: The UTS release name for the boot entry
:param title: The title for the boot entry.
:param tag_arg: A tag argument to be added to the kernel command line and
used to associate the entry with a snapshot set name or
UUID.
:param root_device: The root device for the entry. Passed to root=...
:param mounts: An optional list of mount specifications to use for the
boot entry. If defined fstab=no will be appended to the
generated kernel command line.
:param swaps: An optional list of swap specifications to use for the
boot entry. Each entry should be in the format "WHAT:OPTIONS".
"""
if not isinstance(version, str) or not version:
raise TypeError("Boot entry version must have a string value")
if not isinstance(title, str) or not title:
raise TypeError("Boot entry title must have a string value")
if not isinstance(tag_arg, str) or not tag_arg:
raise TypeError("Boot entry tag_arg must have a string value")
if not isinstance(root_device, str) or not root_device:
raise TypeError("Boot entry root_device must have a string value")
machine_id = _get_machine_id()
osp = match_os_profile_by_version(version)
if not osp:
try:
osp = _create_default_os_profile()
except ValueError as err: # pragma: no cover
raise SnapmCalloutError(
f"Error calling boom to create default OsProfile: {err}"
) from err
if mounts:
add_opts = f"rw {tag_arg}"
del_opts = "ro"
else:
add_opts = tag_arg
del_opts = None
entry = boom.command.create_entry(
title,
version,
machine_id,
root_device,
profile=osp,
add_opts=add_opts,
del_opts=del_opts,
write=False,
images=boom.command.I_BACKUP,
no_fstab=bool(mounts),
mounts=mounts,
swaps=swaps,
)
# Apply defaults for optional keys enabled in profile
for opt_key in OPTIONAL_KEYS:
bls_key = key_to_bls_name(opt_key)
if bls_key in osp.optional_keys:
setattr(entry, bls_key, optional_key_default(opt_key))
# Write BLS snippet for entry
entry.write_entry()
return entry
[docs]
def create_snapset_boot_entry(snapset, title=None):
"""
Create a boom boot entry to boot into the snapshot set represented by
``snapset``.
:param snapset: The snapshot set for which to create a boot entry.
:param title: An optional title for the boot entry. If ``title`` is
``None`` the boot entry will be titled as
"Snapshot snapset_name snapset_time".
"""
version = _get_uts_release()
title = title or f"Snapshot {snapset.name} {snapset.time} ({version})"
root_device = _find_snapset_root(snapset)
mounts = _build_snapset_mount_list(snapset)
swaps = _build_swap_list()
snapset.boot_entry = _create_boom_boot_entry(
version,
title,
f"{SNAPSET_ARG}={snapset.uuid}",
root_device,
mounts=mounts,
swaps=swaps,
)
_log_debug(
"Created boot entry '%s' for snapshot set with UUID=%s", title, snapset.uuid
)
[docs]
def create_snapset_revert_entry(snapset, title=None):
"""
Create a boom boot entry to revert the snapshot set represented by
``snapset``.
:param snapset: The snapshot set for which to create a revert entry.
:param title: An optional title for the revert entry. If ``title`` is
``None`` the revert entry will be titled as
"Revert snapset_name snapset_time".
"""
version = _get_uts_release()
title = title or f"Revert {snapset.name} {snapset.time} ({version})"
root_device = _find_snapset_root(snapset, origin=True)
snapset.revert_entry = _create_boom_boot_entry(
version,
title,
f"{REVERT_ARG}={snapset.uuid}",
root_device,
)
_log_debug(
"Created revert entry '%s' for snapshot set with UUID=%s", title, snapset.uuid
)
[docs]
def _delete_boot_entry(boot_id):
"""
Delete a boom boot entry by ID.
:param boot_id: The boot identifier to delete.
"""
selection = boom.Selection(boot_id=boot_id)
boom.command.delete_entries(selection=selection)
boom.cache.clean_cache()
[docs]
def delete_snapset_boot_entry(snapset):
"""
Delete the boot entry corresponding to ``snapset``.
:param snapset: The snapshot set for which to remove a boot entry.
"""
if snapset.boot_entry is None:
return
_delete_boot_entry(snapset.boot_entry.boot_id)
[docs]
def delete_snapset_revert_entry(snapset):
"""
Delete the revert entry corresponding to ``snapset``.
:param snapset: The snapshot set for which to remove a revert entry.
"""
if snapset.revert_entry is None:
return
_delete_boot_entry(boot_id=snapset.revert_entry.boot_id)
[docs]
def check_boom_config():
"""
Check for boom configuration and create the default config if not found.
"""
not_found_err = SnapmCalloutError("No usable boom configuration found")
try:
load_boom_config()
except ValueError: # pragma: no cover
_log_warn(
"No boom configuration found: attempting to generate defaults in /boot"
)
try:
boom.command.create_config()
except AttributeError as err:
_log_error("Installed boom version does not support create_config()")
raise not_found_err from err
except (BoomConfigError, FileExistsError) as err:
_log_error("Missing or invalid boom configuration: %s", err)
raise not_found_err from err
except OSError as err:
_log_error("Error creating boom configuration: %s", err)
raise not_found_err from err
[docs]
class BootEntryCache(dict):
"""
Cache mapping snapshot sets to boom ``BootEntry`` instances.
Boot entries in the cache are either snapshot set boot entries or revert
entries depending on the value of ``entry_arg``.
"""
[docs]
def __init__(self, entry_arg):
super().__init__()
self.entry_arg = entry_arg
self.refresh_cache()
[docs]
def _parse_entry(self, boot_entry):
"""
Parse a boom ``BootEntry`` options string and return the value
of the ``snapm.snapset`` argument if present.
:param boot_entry: The boot entry to process.
:returns: The snapset name for the boot entry.
"""
for word in boot_entry.options.split():
if word.startswith(self.entry_arg):
_, value = word.split("=", maxsplit=1)
return value
return None
[docs]
def refresh_cache(self):
"""
Generate mappings from snapshot sets to boot entries.
"""
self.clear()
entries = boom.command.find_entries()
for boot_entry in entries:
snapset = self._parse_entry(boot_entry)
if snapset:
self[snapset] = boot_entry
[docs]
class BootCache:
"""
Set of caches mapping snapshot sets to boot entries and revert entries.
"""
[docs]
def __init__(self):
self.entry_cache = BootEntryCache(SNAPSET_ARG)
_log_debug(
"Initialised boot entry cache with %d entries", len(self.entry_cache)
)
self.revert_cache = BootEntryCache(REVERT_ARG)
_log_debug(
"Initialised revert boot entry cache with %d entries",
len(self.revert_cache),
)
[docs]
def refresh_cache(self):
"""
Refresh the cache of boot entry mappings held by this ``BootCache``
instance.
"""
self.entry_cache.refresh_cache()
_log_debug("Refreshed boot entry cache with %d entries", len(self.entry_cache))
self.revert_cache.refresh_cache()
_log_debug(
"Refreshed revert boot entry cache with %d entries",
len(self.revert_cache),
)