# Copyright Red Hat
#
# snapm/fsdiff/changes.py - Snapshot Manager fs diff change detection
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
File system entry change detection and classification.
"""
from typing import Dict, List, Optional
from enum import Enum
import logging
import stat
from snapm import SNAPM_SUBSYSTEM_FSDIFF
from .options import DiffOptions
from .treewalk import FsEntry
_log = logging.getLogger(__name__)
_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
[docs]
def _log_debug_fsdiff(msg, *args, **kwargs):
"""A wrapper for fsdiff subsystem debug logs."""
_log.debug(msg, *args, extra={"subsystem": SNAPM_SUBSYSTEM_FSDIFF}, **kwargs)
[docs]
class ChangeType(Enum):
"""
Enum representing different types of file system entry change.
"""
CONTENT = "content"
PERMISSIONS = "permissions"
OWNERSHIP = "ownership"
TIMESTAMPS = "timestamps"
XATTRS = "extended_attributes"
SYMLINK_TARGET = "symlink_target"
[docs]
class FileChange:
"""
Representation of a file system entry change.
"""
[docs]
def __init__(
self,
change_type: ChangeType,
old_value: Optional[str] = None,
new_value: Optional[str] = None,
description: str = "",
):
"""
Initialise a new ``FileChange`` object.
:param change_type: The type of change.
:type change_type: ``ChangeType``
:param old_value: The original value.
:type old_value: ``Optional[str]``
:param new_value: The changed value.
:type new_value: ``Optional[str]``
:param description: A string description of the change.
:type description: ``str``
"""
self.change_type = change_type
self.old_value = old_value
self.new_value = new_value
self.description = description
[docs]
def __str__(self) -> str:
"""
Return a string representation of this ``FileChange`` object.
:returns: A human readable string representation of this instance.
:rtype: str
"""
return (
f" change_type: {self.change_type.value}\n"
f" old_value: {self.old_value}\n"
f" new_value: {self.new_value}\n"
f" description: {self.description}"
)
[docs]
def to_dict(self) -> Dict[str, Optional[str]]:
"""
Convert this ``FileChange`` object into a dictionary representation
suitable for encoding as JSON.
:returns: A dictionary mapping this instance's keys to values.
:rtype: ``Dict[str, Optional[str]]``
"""
return {
"change_type": self.change_type.value,
"old_value": self.old_value,
"new_value": self.new_value,
"description": self.description,
}
[docs]
class ChangeDetector:
"""
Class to detect and classify changes in ``FsEntry`` objects.
"""
# pylint: disable=too-many-branches
[docs]
def detect_added(
self,
new_entry: FsEntry,
options: DiffOptions,
) -> List[FileChange]:
"""
Detect all changes for a newly added file.
:param new_entry: The updated file system entry.
:type new_entry: ``FsEntry``
:param options: Change detection options.
:type options: ``DiffOptions``
:returns: A list of changes to the file system entry.
:rtype: ``List[FileChange]``
"""
changes = []
_log_debug_fsdiff("Detecting changes for added file B:%s", new_entry.path)
# If content_only is set, skip all metadata comparisons
if options.content_only:
# Content changes (for regular files)
if new_entry.is_file:
_log_debug_fsdiff(
"Detected content-only change (added %s)",
f"{new_entry.content_hash[0:16] if new_entry.content_hash else ''}",
)
changes.append(
FileChange(
ChangeType.CONTENT,
"",
new_entry.content_hash,
"content hash changed",
)
)
return changes
# Content changes (for regular files)
if new_entry.is_file:
_log_debug_fsdiff(
"Detected content change (added %s)",
f"{new_entry.content_hash[0:16] if new_entry.content_hash else ''}",
)
changes.append(
FileChange(
ChangeType.CONTENT,
"",
new_entry.content_hash,
"content hash changed",
)
)
# Permission changes
if not options.ignore_permissions:
new_perms = stat.S_IMODE(new_entry.mode)
_log_debug_fsdiff(
"Detected mode change (added 0o%o)",
new_perms,
)
changes.append(
FileChange(
ChangeType.PERMISSIONS,
"0o0",
oct(new_perms),
f"mode changed from 0o0 to {oct(new_perms)}",
)
)
# Ownership changes
if not options.ignore_ownership:
_log_debug_fsdiff(
"Detected ownership change (added %d:%d)",
new_entry.uid,
new_entry.gid,
)
changes.append(
FileChange(
ChangeType.OWNERSHIP,
"none:none",
f"{new_entry.uid}:{new_entry.gid}",
"owner changed",
)
)
# Symlink target changes
if new_entry.is_symlink:
_log_debug_fsdiff(
"Detected symlink target change (added '%s')",
new_entry.symlink_target,
)
changes.append(
FileChange(
ChangeType.SYMLINK_TARGET,
"",
new_entry.symlink_target,
"symlink target changed",
)
)
# Timestamp changes
if not options.ignore_timestamps:
_log_debug_fsdiff(
"Detected mtime change (added '%.6f')",
new_entry.mtime,
)
changes.append(
FileChange(
ChangeType.TIMESTAMPS,
"",
str(new_entry.mtime),
"modification time changed",
)
)
# Extended attribute changes
if new_entry.xattrs:
_log_debug_fsdiff(
"Detected xattr change (added '%s')",
", ".join(f"{k}:{v}" for k, v in new_entry.xattrs.items()),
)
changes.append(
FileChange(
ChangeType.XATTRS,
"",
str(new_entry.xattrs),
"extended attributes changed",
)
)
return changes
# pylint: disable=too-many-branches
[docs]
def detect_removed(
self,
old_entry: FsEntry,
options: DiffOptions,
) -> List[FileChange]:
"""
Detect all changes for a removed file.
:param old_entry: The updated file system entry.
:type old_entry: ``FsEntry``
:param options: Change detection options.
:type options: ``DiffOptions``
:returns: A list of changes to the file system entry.
:rtype: ``List[FileChange]``
"""
changes = []
_log_debug_fsdiff("Detecting changes for removed file B:%s", old_entry.path)
# If content_only is set, skip all metadata comparisons
if options.content_only:
# Content changes (for regular files)
if old_entry.is_file:
_log_debug_fsdiff(
"Detected content-only change (removed %s)",
f"{old_entry.content_hash[0:16] if old_entry.content_hash else ''}",
)
changes.append(
FileChange(
ChangeType.CONTENT,
old_entry.content_hash,
"",
"content hash changed",
)
)
return changes
# Content changes (for regular files)
if old_entry.is_file:
_log_debug_fsdiff(
"Detected content change (removed %s)",
f"{old_entry.content_hash[0:16] if old_entry.content_hash else ''}",
)
changes.append(
FileChange(
ChangeType.CONTENT,
old_entry.content_hash,
"",
"content hash changed",
)
)
# Permission changes
if not options.ignore_permissions:
old_perms = stat.S_IMODE(old_entry.mode)
_log_debug_fsdiff(
"Detected mode change (removed 0o%o)",
old_perms,
)
changes.append(
FileChange(
ChangeType.PERMISSIONS,
oct(old_perms),
"0o0",
f"mode changed from {oct(old_perms)} to 0o0",
)
)
# Ownership changes
if not options.ignore_ownership:
_log_debug_fsdiff(
"Detected ownership change (removed %d:%d)",
old_entry.uid,
old_entry.gid,
)
changes.append(
FileChange(
ChangeType.OWNERSHIP,
f"{old_entry.uid}:{old_entry.gid}",
"none:none",
"owner changed",
)
)
# Symlink target changes
if old_entry.is_symlink:
_log_debug_fsdiff(
"Detected symlink target change (removed '%s')",
old_entry.symlink_target,
)
changes.append(
FileChange(
ChangeType.SYMLINK_TARGET,
old_entry.symlink_target,
"",
"symlink target changed",
)
)
# Timestamp changes
if not options.ignore_timestamps:
_log_debug_fsdiff(
"Detected mtime change (removed '%.6f')",
old_entry.mtime,
)
changes.append(
FileChange(
ChangeType.TIMESTAMPS,
str(old_entry.mtime),
"",
"modification time changed",
)
)
# Extended attribute changes
if old_entry.xattrs:
_log_debug_fsdiff(
"Detected xattr change (removed '%s')",
", ".join(f"{k}:{v}" for k, v in old_entry.xattrs.items()),
)
changes.append(
FileChange(
ChangeType.XATTRS,
str(old_entry.xattrs),
"",
"extended attributes changed",
)
)
return changes
# pylint: disable=too-many-branches
[docs]
def detect_changes(
self,
old_entry: FsEntry,
new_entry: FsEntry,
options: DiffOptions,
) -> List[FileChange]:
"""
Detect all changes between two file system entries.
:param old_entry: The original file system entry.
:type old_entry: ``FsEntry``
:param new_entry: The updated file system entry.
:type new_entry: ``FsEntry``
:param options: Change detection options.
:type options: ``DiffOptions``
:returns: A list of changes to the file system entry.
:rtype: ``List[FileChange]``
"""
changes = []
_log_debug_fsdiff(
"Detecting changes for A:%s // B:%s", old_entry.path, new_entry.path
)
# If content_only is set, skip all metadata comparisons
if options.content_only:
# Content changes (for regular files)
if old_entry.is_file and new_entry.is_file:
if old_entry.content_hash != new_entry.content_hash:
_log_debug_fsdiff(
"Detected content-only change (%s != %s)",
f"{old_entry.content_hash[0:16] if old_entry.content_hash else ''}",
f"{new_entry.content_hash[0:16] if new_entry.content_hash else ''}",
)
changes.append(
FileChange(
ChangeType.CONTENT,
old_entry.content_hash,
new_entry.content_hash,
"content hash changed",
)
)
return changes
# Content changes (for regular files)
if old_entry.is_file and new_entry.is_file:
if old_entry.content_hash != new_entry.content_hash:
_log_debug_fsdiff(
"Detected content change (%s != %s)",
f"{old_entry.content_hash[0:16] if old_entry.content_hash else ''}",
f"{new_entry.content_hash[0:16] if new_entry.content_hash else ''}",
)
changes.append(
FileChange(
ChangeType.CONTENT,
old_entry.content_hash,
new_entry.content_hash,
"content hash changed",
)
)
# Permission changes
if not options.ignore_permissions:
old_perms = stat.S_IMODE(old_entry.mode)
new_perms = stat.S_IMODE(new_entry.mode)
if old_perms != new_perms:
_log_debug_fsdiff(
"Detected mode change (0o%o != 0o%o)",
old_perms,
new_perms,
)
changes.append(
FileChange(
ChangeType.PERMISSIONS,
oct(old_perms),
oct(new_perms),
f"mode changed from {oct(old_perms)} to {oct(new_perms)}",
)
)
# Ownership changes
if not options.ignore_ownership:
if old_entry.uid != new_entry.uid or old_entry.gid != new_entry.gid:
_log_debug_fsdiff(
"Detected ownership change (%d:%d != %d:%d)",
old_entry.uid,
old_entry.gid,
new_entry.uid,
new_entry.gid,
)
changes.append(
FileChange(
ChangeType.OWNERSHIP,
f"{old_entry.uid}:{old_entry.gid}",
f"{new_entry.uid}:{new_entry.gid}",
"owner changed",
)
)
# Symlink target changes
if old_entry.is_symlink and new_entry.is_symlink:
if old_entry.symlink_target != new_entry.symlink_target:
_log_debug_fsdiff(
"Detected symlink target change ('%s' != '%s')",
old_entry.symlink_target,
new_entry.symlink_target,
)
changes.append(
FileChange(
ChangeType.SYMLINK_TARGET,
old_entry.symlink_target,
new_entry.symlink_target,
"symlink target changed",
)
)
# Timestamp changes
if not options.ignore_timestamps:
if old_entry.mtime != new_entry.mtime:
_log_debug_fsdiff(
"Detected mtime change ('%d' != '%d')",
old_entry.mtime,
new_entry.mtime,
)
changes.append(
FileChange(
ChangeType.TIMESTAMPS,
str(old_entry.mtime),
str(new_entry.mtime),
"modification time changed",
)
)
# Extended attribute changes
if old_entry.xattrs != new_entry.xattrs:
_log_debug_fsdiff(
"Detected xattr change ('%s' != '%s')",
", ".join(f"{k}:{v}" for k, v in old_entry.xattrs.items()),
", ".join(f"{k}:{v}" for k, v in new_entry.xattrs.items()),
)
changes.append(
FileChange(
ChangeType.XATTRS,
str(old_entry.xattrs),
str(new_entry.xattrs),
"extended attributes changed",
)
)
return changes