First, the `PasswordStore`, which is pretty straightforward. It stores title-password associations; importantly, a single title can have multiple passwords.
The `__or__` method is very important, since that is how password stores are merged later on.
"""Password store module for managing title-password associations."""
from __future__ import annotations
import copy
from collections import defaultdict
class PasswordStore:
    """A store that associates titles with multiple passwords using sets.

    Every title maps to a set of distinct, non-empty passwords. A title is
    only present in the store while it has at least one password: removing
    the last password of a title removes the title as well.
    """

    def __init__(self) -> None:
        # defaultdict lets add_password() create the set for a new title on
        # the fly. Read paths must NOT index it directly (see __getitem__).
        self._store: dict[str, set[str]] = defaultdict(set)

    def __getitem__(self, title: str) -> set[str]:
        """Return a copy of the passwords stored for ``title``.

        Unknown titles yield an empty set. The lookup goes through ``get``
        because indexing the underlying ``defaultdict`` would insert an empty
        entry as a side effect of reading, which would then be visible
        through ``in``, ``len()`` and ``pretty_print()``.
        """
        return set(self._store.get(title, ()))

    def __contains__(self, title: str) -> bool:
        """Return True if at least one password is stored for ``title``."""
        return title in self._store

    def __len__(self) -> int:
        """Return the number of titles (not the number of passwords)."""
        return len(self._store)

    def add_password(self, title: str, password: str) -> None:
        """Add a password to the specified title.

        Raises:
            ValueError: if ``title`` or ``password`` is the empty string.
        """
        if title == "":
            raise ValueError("Empty title")
        if password == "":
            raise ValueError("Empty password")
        self._store[title].add(password)

    def remove_password(self, title: str, password: str) -> bool:
        """Remove a password from the specified title.

        Returns True if removed, False if not found.
        """
        passwords = self._store.get(title)
        if passwords is None or password not in passwords:
            return False
        passwords.remove(password)
        if not passwords:
            # Do not keep titles around that have no passwords left.
            del self._store[title]
        return True

    def __iter__(self):
        """Iterate over ``(title, passwords)`` pairs.

        The yielded sets are the store's internal sets, not copies.
        """
        yield from self._store.items()

    def __or__(self, p: PasswordStore) -> PasswordStore:
        """Return a new store holding the passwords of both ``self`` and ``p``.

        Neither operand is modified; passwords of titles present in both
        stores are united.
        """
        merged = copy.deepcopy(self)
        for title, passwords in p:
            for password in passwords:
                merged.add_password(title, password)
        return merged

    def clear_passwords(self, title: str) -> None:
        """Clear all passwords for the specified title."""
        self._store.pop(title, None)

    def pretty_print(self) -> str:
        """Return a pretty-formatted string representation of the password store.

        The result is a two-column box-drawing table (title, password) with
        titles and passwords sorted. A title with several passwords spans
        several rows, with the title shown on the first row only. Titles
        longer than 83 characters are shortened and end in "...".
        """
        if not self._store:
            return "PasswordStore (empty)"

        MAX_COL_WIDTH: int = 83

        # Calculate column widths. Only the title column is capped.
        max_title_length = min(
            MAX_COL_WIDTH, max(len(title) for title in self._store)
        )
        max_password_length = max(
            (
                len(password)
                for passwords in self._store.values()
                for password in passwords
            ),
            default=0,
        )
        title_width = max(max_title_length, len("Title"))
        password_width = max(max_password_length, len("Password"))

        top_line = f"┏━{'━' * title_width}━┳━{'━' * password_width}━┓"
        header_line = (
            f"┃ {'Title'.ljust(title_width)} ┃ {'Password'.ljust(password_width)} ┃"
        )
        header_separator_line = f"┣━{'━' * title_width}━╇━{'━' * password_width}━┫"
        separator_line = f"┠─{'─' * title_width}─┼─{'─' * password_width}─┨"
        bottom_line = f"┗━{'━' * title_width}━┷━{'━' * password_width}━┛"

        lines = [top_line, header_line, header_separator_line]
        first_entry = True
        for title in sorted(self._store):
            if len(title) > MAX_COL_WIDTH:
                display_title = title[: MAX_COL_WIDTH - 3] + "..."
            else:
                display_title = title

            if not first_entry:
                # No separator needed above the first entry: the header
                # separator is already there.
                lines.append(separator_line)
            first_entry = False

            for i, password in enumerate(sorted(self._store[title])):
                if i == 0:
                    # First password for this title: show the title.
                    line = f"┃ {display_title.ljust(title_width)} │ {password.ljust(password_width)} ┃"
                else:
                    # Additional passwords for the same title: blank title cell.
                    line = f"┃ {' ' * title_width} │ {password.ljust(password_width)} ┃"
                lines.append(line)

        lines.append(bottom_line)
        return "\n".join(lines)
Here are some tests for the password store.
Since the passwords have no real security relevance, we save them in plain text.
Then we need "plugins" that actually gather the passwords. For these we defined an abstract base class (ABC), though a `typing.Protocol` might have been a good fit here, too:
"""Abstract base class for password extraction plugins."""
import abc
import typing
import hoarder.password_store
class PasswordPlugin(abc.ABC):
    """Interface every password extraction plugin has to implement.

    Both the constructor and ``extract_passwords`` are abstract, so the class
    cannot be instantiated until a subclass overrides both of them.
    """

    @abc.abstractmethod
    def __init__(self, config: dict[str, typing.Any]):
        """Set the plugin up from its ``config`` mapping."""
        ...

    @abc.abstractmethod
    def extract_passwords(self) -> hoarder.password_store.PasswordStore:
        """Collect the plugin's passwords and return them as a PasswordStore."""
        ...
Now, finally, the first implementation of the `PasswordPlugin`: the NZB password extractor. This plugin is configured to search certain directories for NZB files.
NZB is an XML-based file format for retrieving posts from Usenet. The password can either be part of the filename, as in `filename{{password}}.nzb`, or be stored in a meta tag in the header section of the file.
The "title" is then the filename without the file extension and without the password (if present).
There is still one complication: sometimes NZB files are compressed as RAR archives.
However, we already have a `RarArchive` class (basically a wrapper for 7z), so we only need to implement a read method for it.
And finally we are ready for the NZB plugin:
"""NZB password extraction plugin."""
import logging
import os
import pathlib
import re
import traceback
import xml.etree.ElementTree as ET
import hoarder
from hoarder.password_plugin import PasswordPlugin
try:
from typing import override # type: ignore [attr-defined]
except ImportError:
from typing_extensions import override
logger = logging.getLogger("hoarder.nzb_password_plugin")
class NzbPasswordPlugin(PasswordPlugin):
    """Plugin to extract passwords from NZB files.

    A password is taken from the ``title{{password}}.nzb`` filename pattern
    or, if the filename carries none, from a ``<meta type="password">``
    element inside the NZB document. NZB files packed in ``.rar`` archives
    found in the configured directories are handled the same way.
    """

    _nzb_paths: list[pathlib.Path]

    @override
    def __init__(self, config: dict[str, list[str]]):
        """Read the directories to scan from ``config["nzb_paths"]``.

        A missing ``"nzb_paths"`` key is treated as "no directories", so
        ``extract_passwords`` then returns an empty store instead of failing
        on an unset attribute.

        Raises:
            FileNotFoundError: if any configured path is not a directory.
        """
        paths = [pathlib.Path(p) for p in config.get("nzb_paths", [])]
        invalid_paths = [p for p in paths if not p.is_dir()]
        if invalid_paths:
            raise FileNotFoundError(
                f"No directory at {invalid_paths[0]}"
                + (
                    f" and {len(invalid_paths) - 1} other invalid paths"
                    if len(invalid_paths) > 1
                    else ""
                )
            )
        self._nzb_paths = paths

    @staticmethod
    def _extract_pw_from_nzb_filename(
        file_path: pathlib.PurePath,
    ) -> tuple[str, str | None]:
        """Split an NZB filename into ``(title, password)``.

        The title is the file stem with every ``{{...}}`` group removed and
        surrounding whitespace stripped; the password is ``None`` when the
        stem contains no ``{{...}}`` group.

        Raises:
            ValueError: if the stem contains more than one ``{{...}}`` group.
        """
        filename = file_path.stem
        # Extract the password from title{{password}}.nzb pattern
        filename_passwords = re.findall(r"\{\{(.+?)\}\}", filename)
        title = re.sub(r"\{\{.+?\}\}", "", filename).strip()
        if len(filename_passwords) >= 2:
            logger.error(f"Error when extracting password from {file_path}")
            raise ValueError("Ambiguous passwords")
        if not filename_passwords:
            return (title, None)
        return (title, filename_passwords[0])

    @staticmethod
    def _extract_pw_from_nzb_file_content(content: bytes | str) -> str | None:
        """Return the first non-blank ``<meta type="password">`` value.

        Returns ``None`` if the content cannot be parsed or holds no such
        value. Parse failures are logged at debug level with the traceback
        and are otherwise ignored (best effort).
        """
        logger.debug("Extracting password from file content")
        # NOTE(review): ElementTree is not hardened against maliciously
        # constructed XML; NZB files presumably come from outside sources.
        try:
            root = ET.fromstring(content)
        except (ET.ParseError, OSError, UnicodeDecodeError):
            logger.debug("Failure extracting password from content", exc_info=True)
            return None

        ns = {"nzb": "http://www.newzbin.com/DTD/2003/nzb"}
        for meta in root.findall('.//nzb:meta[@type="password"]', ns):
            password = (meta.text or "").strip()
            if password:
                return password
        return None

    @staticmethod
    def _process_directory(
        nzb_directory: pathlib.Path,
    ) -> hoarder.password_store.PasswordStore:
        """Walk ``nzb_directory`` recursively and collect all NZB passwords.

        Handles ``.nzb`` files directly and ``.nzb`` entries of ``.rar``
        archives. Files for which no password can be determined are skipped.
        """
        dir_store = hoarder.password_store.PasswordStore()
        content: str | bytes
        for root, _, files in os.walk(nzb_directory):
            # os.walk() already yields `root` prefixed with nzb_directory, so
            # it must not be joined onto nzb_directory again: for a relative
            # directory that would duplicate the prefix.
            root_path = pathlib.Path(root)
            for file in files:
                full_path = root_path / file
                if full_path.suffix == ".nzb":
                    logger.debug(f"Processing NZB {full_path}")
                    title, password = NzbPasswordPlugin._extract_pw_from_nzb_filename(
                        full_path
                    )
                    if not password:
                        logger.debug("No password in filename, opening NZB file...")
                        # Read raw bytes: the XML parser then applies the
                        # encoding declared in the document, and a file that
                        # is not valid in the locale's default encoding no
                        # longer aborts the whole scan.
                        with open(full_path, "rb") as f:
                            content = f.read()
                        password = NzbPasswordPlugin._extract_pw_from_nzb_file_content(
                            content
                        )
                    if password:
                        dir_store.add_password(title, password)
                elif full_path.suffix == ".rar":
                    logger.debug(f"Processing RARed NZB(s) {full_path}")
                    rar_file: hoarder.RarArchive = hoarder.RarArchive.from_path(
                        full_path
                    )
                    for file_entry in rar_file.files:
                        logger.debug(f"Read {file_entry.path}... extracting passwords")
                        if file_entry.path.suffix == ".nzb":
                            (
                                title,
                                password,
                            ) = NzbPasswordPlugin._extract_pw_from_nzb_filename(
                                file_entry.path
                            )
                            if not password:
                                content = rar_file.read_file(file_entry.path)
                                password = (
                                    NzbPasswordPlugin._extract_pw_from_nzb_file_content(
                                        content
                                    )
                                )
                            if password:
                                dir_store.add_password(title, password)
        return dir_store

    @override
    def extract_passwords(self) -> hoarder.password_store.PasswordStore:
        """Scan every configured directory and return the merged passwords."""
        password_store = hoarder.password_store.PasswordStore()
        for p in self._nzb_paths:
            password_store = password_store | NzbPasswordPlugin._process_directory(p)
        return password_store
There are also some tests for it which can be found here.
The strategy is to look at the filename first and try to extract the password from it. If the filename contains no password, the file is opened and the password is looked up in the parsed XML.