From 3a5816259c5d21ad53b7e94d6d107de035846bc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=B6rist?= Date: Tue, 28 Jan 2025 23:22:05 +0100 Subject: [PATCH] [pgp] Port to Gtk4 and add type annotations --- pgp/backend/python_gnupg.py | 63 +-- pgp/backend/store.py | 56 ++- pgp/gtk/choose_key.ui | 48 +- pgp/gtk/config.py | 114 +++-- pgp/gtk/config.ui | 22 +- pgp/gtk/key.py | 164 ++++--- pgp/modules/pgp_legacy.py | 137 ++++-- pgp/modules/util.py | 7 +- pgp/plugin.py | 97 ++-- pyproject.toml | 1 - typings/gnupg/__init__.pyi | 899 ++++++++++++++++++++++++++++++++++++ 11 files changed, 1323 insertions(+), 285 deletions(-) create mode 100644 typings/gnupg/__init__.pyi diff --git a/pgp/backend/python_gnupg.py b/pgp/backend/python_gnupg.py index ed6b789..64c4a2f 100644 --- a/pgp/backend/python_gnupg.py +++ b/pgp/backend/python_gnupg.py @@ -20,6 +20,8 @@ # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . +from typing import Any + import logging import os from functools import lru_cache @@ -37,15 +39,14 @@ if logger.getEffectiveLevel() == logging.DEBUG: logger.setLevel(logging.DEBUG) -class PGP(gnupg.GPG, metaclass=Singleton): - def __init__(self, binary, encoding=None): - super().__init__(gpgbinary=binary, use_agent=True) +class PGP(metaclass=Singleton): + def __init__(self) -> None: + self._pgp = gnupg.GPG(use_agent=True) + self._pgp.decode_errors = "replace" - if encoding is not None: - self.encoding = encoding - self.decode_errors = "replace" - - def encrypt(self, payload, recipients, always_trust=False): + def encrypt( + self, data: str, recipients: list[str], always_trust: bool = False + ) -> tuple[str, str]: if not always_trust: # check that we'll be able to encrypt result = self.get_key(recipients[0]) @@ -53,8 +54,8 @@ class PGP(gnupg.GPG, metaclass=Singleton): if key["trust"] not in ("f", "u"): return "", "NOT_TRUSTED " + key["keyid"][-8:] - result = super().encrypt( - payload.encode("utf8"), recipients, always_trust=always_trust + result = self._pgp.encrypt( + data.encode("utf8"), recipients, always_trust=always_trust ) if result.ok: @@ -64,23 +65,25 @@ class PGP(gnupg.GPG, metaclass=Singleton): return self._strip_header_footer(str(result)), error - def decrypt(self, payload): - data = self._add_header_footer(payload, "MESSAGE") - result = super().decrypt(data.encode("utf8")) + def encrypt_file(self, file: Any, recipients: list[str]) -> gnupg.Crypt: + return self._pgp.encrypt_file(file, recipients) - return result.data.decode("utf8") + def decrypt(self, payload: str) -> str: + data = self._add_header_footer(payload, "MESSAGE") + result = self._pgp.decrypt(data.encode("utf8")) + return str(result) @lru_cache(maxsize=8) - def sign(self, payload, key_id): + def sign(self, payload: str | None, key_id: str) -> str: if payload is None: payload = "" - result = super().sign(payload.encode("utf8"), keyid=key_id, detach=True) + result = self._pgp.sign(payload.encode("utf8"), keyid=key_id, detach=True) - if result.fingerprint: + if result: return self._strip_header_footer(str(result)) raise SignError(result.status) - def verify(self, payload, signed): + def verify(self, payload: str | None, signed: str) -> str | None: # Hash algorithm is not transferred in the signed # presence stanza so try all algorithms. # Text name for hash algorithms from RFC 4880 - section 9.4 @@ -99,24 +102,30 @@ class PGP(gnupg.GPG, metaclass=Singleton): self._add_header_footer(signed, "SIGNATURE"), ] ) - result = super().verify(data.encode("utf8")) - if result.valid: + result = self._pgp.verify(data.encode("utf8")) + if result: return result.fingerprint - def get_key(self, key_id): - return super().list_keys(keys=[key_id]) + def get_key(self, key_id: str) -> gnupg.ListKeys: + return self._pgp.list_keys(keys=[key_id]) - def get_keys(self, secret=False): - keys = {} - result = super().list_keys(secret=secret) + def get_keys(self, secret: bool = False) -> dict[str, str]: + keys: dict[str, str] = {} + result = self._pgp.list_keys(secret=secret) for key in result: # Take first not empty uid keys[key["fingerprint"]] = next(uid for uid in key["uids"] if uid) return keys + def list_keys( + self, secret: bool = False, keys: list[str] | None = None, sigs: bool = False + ) -> list[str]: + res = self._pgp.list_keys(secret, keys, sigs) + return res.fingerprints + @staticmethod - def _strip_header_footer(data): + def _strip_header_footer(data: str) -> str: """ Remove header and footer from data """ @@ -137,7 +146,7 @@ class PGP(gnupg.GPG, metaclass=Singleton): return line @staticmethod - def _add_header_footer(data, type_): + def _add_header_footer(data: str, type_: str) -> str: """ Add header and footer from data """ diff --git a/pgp/backend/store.py b/pgp/backend/store.py index 7c01d05..812a939 100644 --- a/pgp/backend/store.py +++ b/pgp/backend/store.py @@ -14,9 +14,17 @@ # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . +from __future__ import annotations + +from typing import Any + import json +import logging +from collections.abc import Callable from pathlib import Path +from nbxmpp import JID + from gajim.common import app from gajim.common import configpaths @@ -28,7 +36,13 @@ class KeyResolveError(Exception): class KeyStore: - def __init__(self, account, own_jid, log, list_keys_func): + def __init__( + self, + account: str, + own_jid: JID, + log: logging.LoggerAdapter[Any], + list_keys_func: Callable[..., list[str]], + ) -> None: self._list_keys_func = list_keys_func self._log = log self._account = account @@ -66,15 +80,15 @@ class KeyStore: self._save_store() @staticmethod - def _empty_store(): + def _empty_store() -> dict[str, Any]: return { "_version": CURRENT_STORE_VERSION, "own_key_data": None, "contact_key_data": {}, } - def _migrate_v1_store(self): - keys = {} + def _migrate_v1_store(self) -> None: + keys: dict[str, str] = {} attached_keys = app.settings.get_account_setting( self._account, "attached_gpg_keys" ) @@ -98,7 +112,7 @@ class KeyStore: ) self._log.info("Migration from store v1 was successful") - def _migrate_v2_store(self): + def _migrate_v2_store(self) -> None: own_key_data = self.get_own_key_data() if own_key_data is not None: own_key_id, own_key_user = ( @@ -111,7 +125,7 @@ class KeyStore: except KeyResolveError: self._set_own_key_data_nosync(None) - prune_list = [] + prune_list: list[str] = [] for dict_key, key_data in self._store["contact_key_data"].items(): try: @@ -125,20 +139,18 @@ class KeyStore: self._store["_version"] = CURRENT_STORE_VERSION self._log.info("Migration from store v2 was successful") - def _save_store(self): + def _save_store(self) -> None: with self._store_path.open("w") as file: json.dump(self._store, file) - def _get_dict_key(self, jid): + def _get_dict_key(self, jid: str) -> str: return "%s-%s" % (self._account, jid) - def _resolve_short_id(self, short_id, has_secret=False): - candidates = self._list_keys_func( - secret=has_secret, keys=(short_id,) - ).fingerprints - if len(candidates) == 1: - return candidates[0] - elif len(candidates) > 1: + def _resolve_short_id(self, short_id: str, has_secret: bool = False) -> str: + fingerprints = self._list_keys_func(secret=has_secret, keys=[short_id]) + if len(fingerprints) == 1: + return fingerprints[0] + elif len(fingerprints) > 1: self._log.critical( "Key collision during migration. Key ID is %s. Removing binding...", repr(short_id), @@ -150,11 +162,11 @@ class KeyStore: ) raise KeyResolveError - def set_own_key_data(self, key_data): + def set_own_key_data(self, key_data: tuple[str, str] | None) -> None: self._set_own_key_data_nosync(key_data) self._save_store() - def _set_own_key_data_nosync(self, key_data): + def _set_own_key_data_nosync(self, key_data: tuple[str, str] | None) -> None: if key_data is None: self._store["own_key_data"] = None else: @@ -163,19 +175,21 @@ class KeyStore: "key_user": key_data[1], } - def get_own_key_data(self): + def get_own_key_data(self) -> dict[str, str] | None: return self._store["own_key_data"] - def get_contact_key_data(self, jid): + def get_contact_key_data(self, jid: str) -> dict[str, str] | None: key_ids = self._store["contact_key_data"] dict_key = self._get_dict_key(jid) return key_ids.get(dict_key) - def set_contact_key_data(self, jid, key_data): + def set_contact_key_data(self, jid: str, key_data: tuple[str, str] | None) -> None: self._set_contact_key_data_nosync(jid, key_data) self._save_store() - def _set_contact_key_data_nosync(self, jid, key_data): + def _set_contact_key_data_nosync( + self, jid: str, key_data: tuple[str, str] | None + ) -> None: key_ids = self._store["contact_key_data"] dict_key = self._get_dict_key(jid) if key_data is None: diff --git a/pgp/gtk/choose_key.ui b/pgp/gtk/choose_key.ui index 8e91fce..c1e5268 100644 --- a/pgp/gtk/choose_key.ui +++ b/pgp/gtk/choose_key.ui @@ -1,42 +1,31 @@ - - + - - - 500 - 300 - True - False - 18 vertical 6 - True - True - True - in - + 1 + 1 + 1 + - True - True + 1 liststore 1 - - Key ID + Key ID descending @@ -48,7 +37,7 @@ - Contact Name + Contact Name 1 @@ -59,13 +48,24 @@ + + + + + + horizontal + 6 + + + Cancel + + + + + OK + - - True - True - 0 - diff --git a/pgp/gtk/config.py b/pgp/gtk/config.py index b407c42..19310eb 100644 --- a/pgp/gtk/config.py +++ b/pgp/gtk/config.py @@ -14,89 +14,111 @@ # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . +from __future__ import annotations + +from typing import cast +from typing import TYPE_CHECKING + from pathlib import Path -from gi.repository import Gdk from gi.repository import GLib from gi.repository import Gtk from gajim.common import app +from gajim.gtk.util import SignalManager +from gajim.gtk.widgets import GajimAppWindow from gajim.plugins.helpers import get_builder from gajim.plugins.plugins_i18n import _ -from pgp.gtk.key import ChooseGPGKeyDialog +from ..modules.pgp_legacy import PGPLegacy +from .key import ChooseGPGKeyDialog + +if TYPE_CHECKING: + from ..plugin import PGPPlugin -class PGPConfigDialog(Gtk.ApplicationWindow): - def __init__(self, plugin, parent): - Gtk.ApplicationWindow.__init__(self) - self.set_application(app.app) - self.set_show_menubar(False) - self.set_title(_("PGP Configuration")) - self.set_transient_for(parent) - self.set_resizable(True) - self.set_type_hint(Gdk.WindowTypeHint.DIALOG) - self.set_destroy_with_parent(True) +class ConfigBuilder(Gtk.Builder): + config_box: Gtk.Box + sidebar: Gtk.StackSidebar + stack: Gtk.Stack + + +class PGPConfigDialog(GajimAppWindow): + def __init__(self, plugin: PGPPlugin, transient: Gtk.Window) -> None: + + GajimAppWindow.__init__( + self, + name="PGPConfigDialog", + title=_("PGP Configuration"), + default_width=600, + default_height=500, + transient_for=transient, + modal=True, + ) ui_path = Path(__file__).parent - self._ui = get_builder(ui_path.resolve() / "config.ui") + self._ui = cast( + ConfigBuilder, get_builder(str(ui_path.resolve() / "config.ui")) + ) - self.add(self._ui.config_box) - - self._ui.connect_signals(self) + self.set_child(self._ui.config_box) self._plugin = plugin for account in app.settings.get_active_accounts(): - page = Page(plugin, account) + module = cast( + PGPLegacy, + app.get_client(account).get_module("PGPLegacy"), # pyright: ignore + ) + page = Page(module) self._ui.stack.add_titled(page, account, app.get_account_label(account)) - self.show_all() + self.show() + + def _cleanup(self) -> None: + del self._plugin -class Page(Gtk.Box): - def __init__(self, plugin, account): +class Page(Gtk.Box, SignalManager): + def __init__(self, module: PGPLegacy) -> None: + SignalManager.__init__(self) Gtk.Box.__init__(self, orientation=Gtk.Orientation.VERTICAL) - self._client = app.get_client(account) - self._plugin = plugin + self._module = module + self._label = Gtk.Label() self._button = Gtk.Button(label=_("Assign Key")) - self._button.get_style_context().add_class("suggested-action") + self._button.add_css_class("suggested-action") self._button.set_halign(Gtk.Align.CENTER) self._button.set_margin_top(18) - self._button.connect("clicked", self._on_assign) + self._connect(self._button, "clicked", self._on_assign) self._load_key() - self.add(self._label) - self.add(self._button) - self.show_all() + self.append(self._label) + self.append(self._button) - def _on_assign(self, _button): - backend = self._client.get_module("PGPLegacy").pgp_backend - secret_keys = backend.get_keys(secret=True) - dialog = ChooseGPGKeyDialog(secret_keys, self.get_toplevel()) - dialog.connect("response", self._on_response) + def _on_assign(self, _button: Gtk.Button) -> None: + secret_keys = self._module.pgp_backend.get_keys(secret=True) + ChooseGPGKeyDialog( + secret_keys, cast(Gtk.Window, self.get_root()), self._on_response + ) - def _load_key(self): - key_data = self._client.get_module("PGPLegacy").get_own_key_data() + def _load_key(self) -> None: + key_data = self._module.get_own_key_data() if key_data is None: self._set_key(None) else: self._set_key((key_data["key_id"], key_data["key_user"])) - def _on_response(self, dialog, response): - if response != Gtk.ResponseType.OK: - return - - if dialog.selected_key is None: - self._client.get_module("PGPLegacy").set_own_key_data(None) + def _on_response(self, key: tuple[str, str] | None) -> None: + if key is None: + self._module.set_own_key_data(None) self._set_key(None) else: - self._client.get_module("PGPLegacy").set_own_key_data(dialog.selected_key) - self._set_key(dialog.selected_key) + self._module.set_own_key_data(key) + self._set_key(key) - def _set_key(self, key_data): + def _set_key(self, key_data: tuple[str, str] | None) -> None: if key_data is None: self._label.set_text(_("No key assigned")) else: @@ -104,3 +126,9 @@ class Page(Gtk.Box): self._label.set_markup( "%s %s" % (key_id, GLib.markup_escape_text(key_user)) ) + + def do_unroot(self) -> None: + Gtk.Box.do_unroot(self) + self._disconnect_all() + del self._module + app.check_finalize(self) diff --git a/pgp/gtk/config.ui b/pgp/gtk/config.ui index bdafb29..c81e3fc 100644 --- a/pgp/gtk/config.ui +++ b/pgp/gtk/config.ui @@ -1,41 +1,23 @@ - - + - True - False 12 - True - False stack - - False - True - 0 - 400 350 - True - False - True - 18 + 1 crossfade - - False - True - 1 - diff --git a/pgp/gtk/key.py b/pgp/gtk/key.py index 46d9426..551e835 100644 --- a/pgp/gtk/key.py +++ b/pgp/gtk/key.py @@ -14,27 +14,60 @@ # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . +from __future__ import annotations + +from typing import Any +from typing import cast +from typing import TYPE_CHECKING + +from collections.abc import Callable from pathlib import Path from gi.repository import GLib from gi.repository import Gtk +from nbxmpp import JID from gajim.common import app +from gajim.gtk.widgets import GajimAppWindow from gajim.plugins.helpers import get_builder from gajim.plugins.plugins_i18n import _ +from ..modules.pgp_legacy import PGPLegacy -class KeyDialog(Gtk.Dialog): - def __init__(self, plugin, account, jid, transient): - super().__init__(title=_("Assign key for %s") % jid, destroy_with_parent=True) +if TYPE_CHECKING: + from ..plugin import PGPPlugin - self.set_transient_for(transient) - self.set_resizable(True) - self.set_default_size(450, -1) + +class ChooseKeyBuilder(Gtk.Builder): + liststore: Gtk.ListStore + box: Gtk.Box + keys_treeview: Gtk.TreeView + cancel_button: Gtk.Button + ok_button: Gtk.Button + + +class KeyDialog(GajimAppWindow): + def __init__( + self, plugin: PGPPlugin, account: str, jid: JID, transient: Gtk.Window + ) -> None: + + GajimAppWindow.__init__( + self, + name="PGPKeyDialog", + title=_("Assign key for %s") % jid, + default_width=450, + transient_for=transient, + modal=True, + ) + + self.window.set_resizable(True) self._plugin = plugin - self._jid = jid - self._client = app.get_client(account) + self._jid = str(jid) + self._module = cast( + PGPLegacy, + app.get_client(account).get_module("PGPLegacy"), # pyright: ignore + ) self._label = Gtk.Label() @@ -42,45 +75,43 @@ class KeyDialog(Gtk.Dialog): self._assign_button.get_style_context().add_class("suggested-action") self._assign_button.set_halign(Gtk.Align.CENTER) self._assign_button.set_margin_top(18) - self._assign_button.connect("clicked", self._choose_key) + self._connect(self._assign_button, "clicked", self._choose_key) box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) - box.set_border_width(18) - box.add(self._label) - box.add(self._assign_button) + box.append(self._label) + box.append(self._assign_button) - area = self.get_content_area() - area.pack_start(box, True, True, 0) + self.set_child(box) self._load_key() - self.show_all() + self.show() - def _choose_key(self, *args): - backend = self._client.get_module("PGPLegacy").pgp_backend - dialog = ChooseGPGKeyDialog(backend.get_keys(), self) - dialog.connect("response", self._on_response) + def _cleanup(self) -> None: + del self._plugin + del self._module - def _load_key(self): - key_data = self._client.get_module("PGPLegacy").get_contact_key_data(self._jid) + def _choose_key(self, _button: Gtk.Button) -> None: + ChooseGPGKeyDialog( + self._module.pgp_backend.get_keys(), self.window, self._on_response + ) + + def _load_key(self) -> None: + key_data = self._module.get_contact_key_data(self._jid) if key_data is None: self._set_key(None) else: - self._set_key(key_data.values()) + key_id, key_user = key_data.values() + self._set_key((key_id, key_user)) - def _on_response(self, dialog, response): - if response != Gtk.ResponseType.OK: - return - - if dialog.selected_key is None: - self._client.get_module("PGPLegacy").set_contact_key_data(self._jid, None) + def _on_response(self, key: tuple[str, str] | None) -> None: + if key is None: + self._module.set_contact_key_data(self._jid, None) self._set_key(None) else: - self._client.get_module("PGPLegacy").set_contact_key_data( - self._jid, dialog.selected_key - ) - self._set_key(dialog.selected_key) + self._module.set_contact_key_data(self._jid, key) + self._set_key(key) - def _set_key(self, key_data): + def _set_key(self, key_data: tuple[str, str] | None) -> None: if key_data is None: self._label.set_text(_("No key assigned")) else: @@ -90,49 +121,56 @@ class KeyDialog(Gtk.Dialog): ) -class ChooseGPGKeyDialog(Gtk.Dialog): - def __init__(self, secret_keys, transient_for): - Gtk.Dialog.__init__( - self, title=_("Assign PGP Key"), transient_for=transient_for +class ChooseGPGKeyDialog(GajimAppWindow): + def __init__( + self, + secret_keys: dict[str, str], + transient: Gtk.Window, + callback: Callable[[tuple[str, str] | None], None], + ) -> None: + + GajimAppWindow.__init__( + self, + name="PGPChooseKeyDialog", + title=_("Assign PGP Key"), + default_width=450, + default_height=400, + transient_for=transient, + modal=True, ) secret_keys[_("None")] = _("None") - self.set_position(Gtk.WindowPosition.CENTER_ON_PARENT) - self.set_resizable(True) - self.set_default_size(500, 300) - - self.add_button(_("Cancel"), Gtk.ResponseType.CANCEL) - self.add_button(_("OK"), Gtk.ResponseType.OK) + self.window.set_resizable(True) + self._callback = callback self._selected_key = None ui_path = Path(__file__).parent - self._ui = get_builder(ui_path.resolve() / "choose_key.ui") + self._ui = cast( + ChooseKeyBuilder, get_builder(str(ui_path.resolve() / "choose_key.ui")) + ) - self._ui.keys_treeview = self._ui.keys_treeview + self._connect(self._ui.cancel_button, "clicked", self._on_cancel) + self._connect(self._ui.ok_button, "clicked", self._on_ok) + self._connect(self._ui.keys_treeview, "cursor-changed", self._on_row_changed) - model = self._ui.keys_treeview.get_model() + model = cast(Gtk.ListStore, self._ui.keys_treeview.get_model()) model.set_sort_func(1, self._sort) - model = self._ui.keys_treeview.get_model() for key_id in secret_keys.keys(): model.append((key_id, secret_keys[key_id])) - self.get_content_area().add(self._ui.box) + self.set_child(self._ui.box) + self.show() - self._ui.connect_signals(self) - - self.connect_after("response", self._on_response) - - self.show_all() - - @property - def selected_key(self): - return self._selected_key + def _cleanup(self) -> None: + del self._callback @staticmethod - def _sort(model, iter1, iter2, _data): + def _sort( + model: Gtk.TreeModel, iter1: Gtk.TreeIter, iter2: Gtk.TreeIter, _data: Any + ) -> int: value1 = model[iter1][1] value2 = model[iter2][1] if value1 == _("None"): @@ -143,10 +181,14 @@ class ChooseGPGKeyDialog(Gtk.Dialog): return -1 return 1 - def _on_response(self, _dialog, _response): - self.destroy() + def _on_cancel(self, _button: Gtk.Button) -> None: + self.close() - def _on_row_changed(self, treeview): + def _on_ok(self, _button: Gtk.Button) -> None: + self._callback(self._selected_key) + self.close() + + def _on_row_changed(self, treeview: Gtk.TreeView) -> None: selection = treeview.get_selection() model, iter_ = selection.get_selected() if iter_ is None: diff --git a/pgp/modules/pgp_legacy.py b/pgp/modules/pgp_legacy.py index 8c88113..ff8534d 100644 --- a/pgp/modules/pgp_legacy.py +++ b/pgp/modules/pgp_legacy.py @@ -14,21 +14,30 @@ # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . +from typing import Any + import os import threading import time +from collections.abc import Callable import nbxmpp from gi.repository import GLib +from nbxmpp.client import Client as nbxmppClient from nbxmpp.namespaces import Namespace from nbxmpp.protocol import Message +from nbxmpp.protocol import Presence from nbxmpp.structs import EncryptionData +from nbxmpp.structs import MessageProperties +from nbxmpp.structs import PresenceProperties from nbxmpp.structs import StanzaHandler from gajim.common import app +from gajim.common.client import Client from gajim.common.const import Trust from gajim.common.events import MessageNotSent from gajim.common.modules.base import BaseModule +from gajim.common.modules.httpupload import HTTPFileTransfer from gajim.common.structs import OutgoingMessage from gajim.plugins.plugins_i18n import _ @@ -68,7 +77,7 @@ ALLOWED_TAGS = [ class PGPLegacy(BaseModule): - def __init__(self, client): + def __init__(self, client: Client) -> None: BaseModule.__init__(self, client, plugin=True) self.handlers = [ @@ -92,26 +101,26 @@ class PGPLegacy(BaseModule): self._store = KeyStore( self._account, self.own_jid, self._log, self._pgp.list_keys ) - self._always_trust = [] - self._presence_fingerprint_store = {} + self._always_trust: list[str] = [] + self._presence_fingerprint_store: dict[str, str] = {} @property - def pgp_backend(self): + def pgp_backend(self) -> PGP: return self._pgp - def set_own_key_data(self, *args, **kwargs): - return self._store.set_own_key_data(*args, **kwargs) + def set_own_key_data(self, keydata: tuple[str, str] | None) -> None: + return self._store.set_own_key_data(keydata) - def get_own_key_data(self, *args, **kwargs): - return self._store.get_own_key_data(*args, **kwargs) + def get_own_key_data(self) -> dict[str, str] | None: + return self._store.get_own_key_data() - def set_contact_key_data(self, *args, **kwargs): - return self._store.set_contact_key_data(*args, **kwargs) + def set_contact_key_data(self, jid: str, key_data: tuple[str, str] | None) -> None: + return self._store.set_contact_key_data(jid, key_data) - def get_contact_key_data(self, *args, **kwargs): - return self._store.get_contact_key_data(*args, **kwargs) + def get_contact_key_data(self, jid: str) -> dict[str, str] | None: + return self._store.get_contact_key_data(jid) - def has_valid_key_assigned(self, jid): + def has_valid_key_assigned(self, jid: str) -> bool: key_data = self.get_contact_key_data(jid) if key_data is None: return False @@ -126,9 +135,13 @@ class PGPLegacy(BaseModule): raise KeyMismatch(announced_fingerprint) - def _on_presence_received(self, _con, _stanza, properties): + def _on_presence_received( + self, _client: nbxmppClient, _stanza: Presence, properties: PresenceProperties + ): if properties.signed is None: return + + assert properties.jid is not None jid = properties.jid.bare fingerprint = self._pgp.verify(properties.status, properties.signed) @@ -160,13 +173,16 @@ class PGPLegacy(BaseModule): ) return - def _message_received(self, _con, stanza, properties): + def _message_received( + self, _client: nbxmppClient, stanza: Message, properties: MessageProperties + ) -> None: if not properties.is_pgp_legacy or properties.from_muc: return remote_jid = properties.remote_jid self._log.info("Message received from: %s", remote_jid) + assert properties.pgp_legacy is not None payload = self._pgp.decrypt(properties.pgp_legacy) prepare_stanza(stanza, payload) @@ -174,7 +190,12 @@ class PGPLegacy(BaseModule): protocol=ENCRYPTION_NAME, key="Unknown", trust=Trust.UNDECIDED ) - def encrypt_message(self, con, message: OutgoingMessage, callback): + def encrypt_message( + self, + client: Client, + message: OutgoingMessage, + callback: Callable[[OutgoingMessage], None], + ) -> None: if not message.get_text(): callback(message) return @@ -187,15 +208,24 @@ class PGPLegacy(BaseModule): return always_trust = key_id in self._always_trust - self._encrypt(con, message, [key_id, own_key_id], callback, always_trust) + self._encrypt(client, message, [key_id, own_key_id], callback, always_trust) def _encrypt( - self, con, message: OutgoingMessage, keys, callback, always_trust: bool - ): - result = self._pgp.encrypt(message.get_text(), keys, always_trust) + self, + client: Client, + message: OutgoingMessage, + recipients: list[str], + callback: Callable[[OutgoingMessage], None], + always_trust: bool, + ) -> None: + + text = message.get_text() + assert text is not None + + result = self._pgp.encrypt(text, recipients, always_trust) encrypted_payload, error = result if error: - self._handle_encrypt_error(con, error, message, keys, callback) + self._handle_encrypt_error(client, error, message, recipients, callback) return self._cleanup_stanza(message) @@ -212,30 +242,41 @@ class PGPLegacy(BaseModule): callback(message) def _handle_encrypt_error( - self, con, error: str, message: OutgoingMessage, keys, callback - ): + self, + client: Client, + error: str, + message: OutgoingMessage, + recipients: list[str], + callback: Callable[[OutgoingMessage], None], + ) -> None: if error.startswith("NOT_TRUSTED"): - def on_yes(checked): + def on_yes(checked: bool) -> None: if checked: - self._always_trust.append(keys[0]) - self._encrypt(con, message, keys, callback, True) + self._always_trust.append(recipients[0]) + self._encrypt(client, message, recipients, callback, True) - def on_no(): - self._raise_message_not_sent(con, message, error) + def on_no() -> None: + self._raise_message_not_sent(client, message, error) app.ged.raise_event(PGPNotTrusted(on_yes=on_yes, on_no=on_no)) else: - self._raise_message_not_sent(con, message, error) + self._raise_message_not_sent(client, message, error) @staticmethod - def _raise_message_not_sent(con, message: OutgoingMessage, error: str): + def _raise_message_not_sent( + client: Client, message: OutgoingMessage, error: str + ) -> None: + + text = message.get_text() + assert text is not None + app.ged.raise_event( MessageNotSent( - client=con, + client=client, jid=str(message.contact.jid), - message=message.get_text(), + message=text, error=_("Encryption error: %s") % error, time=time.time(), ) @@ -250,7 +291,7 @@ class PGPLegacy(BaseModule): ) stanza.addChild(node=eme_node) - def sign_presence(self, presence, status): + def sign_presence(self, presence: nbxmpp.Presence, status: str) -> None: key_data = self.get_own_key_data() if key_data is None: self._log.warning("No own key id found, can’t sign presence") @@ -266,7 +307,7 @@ class PGPLegacy(BaseModule): presence.setTag(Namespace.SIGNED + " x").setData(result) @staticmethod - def _get_info_message(): + def _get_info_message() -> str: msg = "[This message is *encrypted* (See :XEP:`27`)]" lang = os.getenv("LANG") if lang is not None and not lang.startswith("en"): @@ -274,7 +315,7 @@ class PGPLegacy(BaseModule): msg = _("[This message is *encrypted* (See :XEP:`27`)]") + " (" + msg + ")" return msg - def _get_key_ids(self, jid): + def _get_key_ids(self, jid: str) -> tuple[str, str]: key_data = self.get_contact_key_data(jid) if key_data is None: raise NoKeyIdFound("No key id found for %s" % jid) @@ -301,21 +342,25 @@ class PGPLegacy(BaseModule): stanza.addChild(node=node) message.set_stanza(stanza) - def encrypt_file(self, file, callback): + def encrypt_file( + self, transfer: HTTPFileTransfer, callback: Callable[[HTTPFileTransfer], None] + ) -> None: thread = threading.Thread( - target=self._encrypt_file_thread, args=(file, callback) + target=self._encrypt_file_thread, args=(transfer, callback) ) thread.daemon = True thread.start() - def _encrypt_file_thread(self, file, callback): + def _encrypt_file_thread( + self, transfer: HTTPFileTransfer, callback: Callable[[HTTPFileTransfer], None] + ) -> None: try: - key_id, own_key_id = self._get_key_ids(file.contact.jid) + key_id, own_key_id = self._get_key_ids(str(transfer.contact.jid)) except NoKeyIdFound as error: self._log.warning(error) return - stream = open(file.path, "rb") + stream = open(transfer.path, "rb") encrypted = self._pgp.encrypt_file(stream, [key_id, own_key_id]) stream.close() @@ -323,15 +368,15 @@ class PGPLegacy(BaseModule): GLib.idle_add(self._on_file_encryption_error, encrypted.status) return - file.size = len(encrypted.data) - file.set_uri_transform_func(lambda uri: "%s.pgp" % uri) - file.set_encrypted_data(encrypted.data) - GLib.idle_add(callback, file) + transfer.size = len(encrypted.data) + transfer.set_uri_transform_func(lambda uri: "%s.pgp" % uri) + transfer.set_encrypted_data(encrypted.data) + GLib.idle_add(callback, transfer) @staticmethod - def _on_file_encryption_error(error): + def _on_file_encryption_error(error: str) -> None: app.ged.raise_event(PGPFileEncryptionError(error=error)) -def get_instance(*args, **kwargs): +def get_instance(*args: Any, **kwargs: Any) -> tuple[PGPLegacy, str]: return PGPLegacy(*args, **kwargs), "PGPLegacy" diff --git a/pgp/modules/util.py b/pgp/modules/util.py index f6fc17a..334b39d 100644 --- a/pgp/modules/util.py +++ b/pgp/modules/util.py @@ -17,23 +17,24 @@ import os import subprocess +from nbxmpp import Message from nbxmpp.namespaces import Namespace -def prepare_stanza(stanza, plaintext): +def prepare_stanza(stanza: Message, plaintext: str) -> None: delete_nodes(stanza, "encrypted", Namespace.ENCRYPTED) delete_nodes(stanza, "body") stanza.setBody(plaintext) -def delete_nodes(stanza, name, namespace=None): +def delete_nodes(stanza: Message, name: str, namespace: str | None = None) -> None: nodes = stanza.getTags(name, namespace=namespace) for node in nodes: stanza.delChild(node) def find_gpg(): - def _search(binary): + def _search(binary: str) -> bool: if os.name == "nt": gpg_cmd = binary + " -h >nul 2>&1" else: diff --git a/pgp/plugin.py b/pgp/plugin.py index 91b460c..ea0d299 100644 --- a/pgp/plugin.py +++ b/pgp/plugin.py @@ -14,15 +14,25 @@ # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . +from __future__ import annotations + +from typing import Any +from typing import TYPE_CHECKING + import logging import os -import sys +from collections.abc import Callable from functools import partial +import nbxmpp from packaging.version import Version as V from gajim.common import app from gajim.common import ged +from gajim.common.client import Client +from gajim.common.modules.httpupload import HTTPFileTransfer +from gajim.common.structs import OutgoingMessage +from gajim.gtk.control import ChatControl from gajim.gtk.dialogs import ConfirmationCheckDialog from gajim.gtk.dialogs import DialogButton from gajim.gtk.dialogs import SimpleDialog @@ -32,17 +42,23 @@ from gajim.plugins.plugins_i18n import _ from pgp.exceptions import KeyMismatch from pgp.gtk.config import PGPConfigDialog from pgp.gtk.key import KeyDialog +from pgp.modules.events import PGPFileEncryptionError +from pgp.modules.events import PGPNotTrusted from pgp.modules.util import find_gpg +if TYPE_CHECKING: + from pgp.modules.pgp_legacy import PGPLegacy + + ENCRYPTION_NAME = "PGP" log = logging.getLogger("gajim.p.pgplegacy") -ERROR = False +error = False try: import gnupg except ImportError: - ERROR = True + error = True else: # We need https://pypi.python.org/pypi/python-gnupg # but https://pypi.python.org/pypi/gnupg shares the same package name. @@ -53,31 +69,27 @@ else: v_gnupg = gnupg.__version__ if V(v_gnupg) < V("0.3.8") or V(v_gnupg) > V("1.0.0"): log.error("We need python-gnupg >= 0.3.8") - ERROR = True + error = True -ERROR_MSG = None +error_msg = None BINARY = find_gpg() log.info("Found GPG executable: %s", BINARY) -if BINARY is None or ERROR: +if BINARY is None or error: if os.name == "nt": - ERROR_MSG = _("Please install GnuPG / Gpg4win") + error_msg = _("Please install GnuPG / Gpg4win") else: - ERROR_MSG = _("Please install python-gnupg and gnupg") -else: - from pgp.backend.python_gnupg import PGP - from pgp.modules import pgp_legacy + error_msg = _("Please install python-gnupg and gnupg") class PGPPlugin(GajimPlugin): def init(self): - # pylint: disable=attribute-defined-outside-init self.description = _("PGP encryption as per XEP-0027") - if ERROR_MSG: + if error_msg: self.activatable = False self.config_dialog = None - self.available_text = ERROR_MSG + self.available_text = error_msg return self.config_dialog = partial(PGPConfigDialog, self) @@ -91,6 +103,8 @@ class PGPPlugin(GajimPlugin): "send-presence": (self._on_send_presence, None), } + from pgp.modules import pgp_legacy + self.modules = [pgp_legacy] self.events_handlers = { @@ -98,40 +112,36 @@ class PGPPlugin(GajimPlugin): "pgp-file-encryption-error": (ged.PRECORE, self._on_file_encryption_error), } - encoding = "utf8" if sys.platform == "linux" else None - self._pgp = PGP(BINARY, encoding=encoding) - @staticmethod - def get_pgp_module(account): - return app.get_client(account).get_module("PGPLegacy") + def get_pgp_module(account: str) -> PGPLegacy: + return app.get_client(account).get_module("PGPLegacy") # pyright: ignore - def activate(self): + def activate(self) -> None: pass - def deactivate(self): + def deactivate(self) -> None: pass - @staticmethod - def activate_encryption(_chat_control): + def activate_encryption(self, chat_control: ChatControl) -> bool: return True @staticmethod - def _encryption_state(_chat_control, state): + def _encryption_state(_chat_control: ChatControl, state: dict[str, Any]) -> None: state["visible"] = True state["authenticated"] = True - def _on_encryption_dialog(self, chat_control): + def _on_encryption_dialog(self, chat_control: ChatControl): account = chat_control.account jid = chat_control.contact.jid transient = app.window KeyDialog(self, account, jid, transient) - def _on_send_presence(self, account, presence): + def _on_send_presence(self, account: str, presence: nbxmpp.Presence) -> None: status = presence.getStatus() self.get_pgp_module(account).sign_presence(presence, status) @staticmethod - def _on_not_trusted(event): + def _on_not_trusted(event: PGPNotTrusted) -> None: ConfirmationCheckDialog( _("Untrusted PGP key"), _( @@ -148,14 +158,14 @@ class PGPPlugin(GajimPlugin): ], ).show() - @staticmethod - def _before_sendmessage(chat_control): + def _before_sendmessage(self, chat_control: ChatControl) -> None: account = chat_control.account - jid = chat_control.contact.jid + jid = str(chat_control.contact.jid) + + pgp = self.get_pgp_module(account) - client = app.get_client(account) try: - valid = client.get_module("PGPLegacy").has_valid_key_assigned(jid) + valid = pgp.has_valid_key_assigned(jid) except KeyMismatch as announced_key_id: SimpleDialog( _("PGP Key mismatch"), @@ -174,20 +184,29 @@ class PGPPlugin(GajimPlugin): _("No OpenPGP key is assigned to this contact."), ) chat_control.sendmessage = False - elif client.get_module("PGPLegacy").get_own_key_data() is None: + elif pgp.get_own_key_data() is None: SimpleDialog( _("No OpenPGP key assigned"), _("No OpenPGP key is assigned to your account."), ) chat_control.sendmessage = False - def _encrypt_message(self, conn, event, callback): - account = conn.name - self.get_pgp_module(account).encrypt_message(conn, event, callback) + def _encrypt_message( + self, + client: Client, + event: OutgoingMessage, + callback: Callable[[OutgoingMessage], None], + ): + self.get_pgp_module(client.name).encrypt_message(client, event, callback) - def encrypt_file(self, file, account, callback): - self.get_pgp_module(account).encrypt_file(file, callback) + def encrypt_file( + self, + transfer: HTTPFileTransfer, + account: str, + callback: Callable[[HTTPFileTransfer], None], + ): + self.get_pgp_module(account).encrypt_file(transfer, callback) @staticmethod - def _on_file_encryption_error(event): + def _on_file_encryption_error(event: PGPFileEncryptionError) -> None: SimpleDialog(_("Error"), event.error) diff --git a/pyproject.toml b/pyproject.toml index 98235f1..4920346 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,6 @@ exclude = [ ".git", ".venv", "openpgp/*", - "pgp/*", ] [tool.ruff] diff --git a/typings/gnupg/__init__.pyi b/typings/gnupg/__init__.pyi new file mode 100644 index 0000000..a3b6ea0 --- /dev/null +++ b/typings/gnupg/__init__.pyi @@ -0,0 +1,899 @@ +""" +This type stub file was generated by pyright. +""" + +from typing import Any + +__version__: str = ... +__author__: str = ... +__date__: str = ... + +class StatusHandler: + """ + The base class for handling status messages from `gpg`. + """ + + on_data_failure = ... + def __init__(self, gpg: GPG) -> None: + """ + Initialize an instance. + + Args: + gpg (GPG): The :class:`GPG` instance in use. + """ + ... + + def handle_status(self, key: str, value: str) -> None: + """ + Handle status messages from the `gpg` child process. These are lines of the format + + [GNUPG:] + + Args: + key (str): Identifies what the status message is. + value (str): Identifies additional data, which differs depending on the key. + """ + ... + +class Verify(StatusHandler): + """ + This class handles status messages during signature verificaton. + """ + + TRUST_EXPIRED: int = ... + TRUST_UNDEFINED: int = ... + TRUST_NEVER: int = ... + TRUST_MARGINAL: int = ... + TRUST_FULLY: int = ... + TRUST_ULTIMATE: int = ... + TRUST_LEVELS: dict[str, int] = ... + GPG_SYSTEM_ERROR_CODES: dict[int, str] = ... + GPG_ERROR_CODES: dict[int, str] = ... + returncode = ... + valid: bool = ... + fingerprint: str | None = ... + def __init__(self, gpg: GPG) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + def handle_status(self, key: str, value: str) -> None: ... + +class ImportResult(StatusHandler): + """ + This class handles status messages during key import. + """ + + counts = ... + returncode = ... + def __init__(self, gpg: GPG) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + ok_reason: dict[str, str] = ... + problem_reason: dict[str, str] = ... + def handle_status(self, key: str, value: str) -> None: ... + def summary(self) -> str: + """ + Return a summary indicating how many keys were imported and how many were not imported. + """ + ... + +ESCAPE_PATTERN = ... +BASIC_ESCAPES = ... + +class SendResult(StatusHandler): + """ + This class handles status messages during key sending. + """ + + returncode = ... + def handle_status(self, key: str, value: str) -> None: ... + +class SearchKeys(StatusHandler, list[dict[Any, Any]]): + """ + This class handles status messages during key search. + """ + + UID_INDEX = ... + FIELDS = ... + returncode = ... + def __init__(self, gpg: GPG) -> None: ... + def get_fields(self, args: Any) -> dict[str, Any]: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def pub(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def uid(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def handle_status(self, key: str, value: str) -> None: ... + +class ListKeys(SearchKeys): + """ + This class handles status messages during listing keys and signatures. + + Handle pub and uid (relating the latter to the former). + + We don't care about (info from GnuPG DETAILS file): + + crt = X.509 certificate + crs = X.509 certificate and private key available + uat = user attribute (same as user id except for field 10). + sig = signature + rev = revocation signature + pkd = public key data (special field format, see below) + grp = reserved for gpgsm + rvk = revocation key + """ + + UID_INDEX = ... + FIELDS = ... + fingerprints: list[str] = ... + def __init__(self, gpg: GPG) -> None: ... + def key(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + sec = ... + def fpr(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def grp(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def sub(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def ssb(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + + def sig(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + +class ScanKeys(ListKeys): + """ + This class handles status messages during scanning keys. + """ + + def sub(self, args: Any) -> None: + """ + Internal method used to update the instance from a `gpg` status message. + """ + ... + +class TextHandler: ... + +_INVALID_KEY_REASONS = ... + +class Crypt(Verify, TextHandler): + """ + This class handles status messages during encryption and decryption. + """ + + ok: bool = ... + status: str = ... + data: bytes = ... + def __init__(self, gpg: GPG) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + def handle_status(self, key: str, value: str) -> None: ... + +class GenKey(StatusHandler): + """ + This class handles status messages during key generation. + """ + + returncode = ... + def __init__(self, gpg: GPG) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + def __str__(self) -> str: ... + def handle_status(self, key: str, value: str) -> None: ... + +class AddSubkey(StatusHandler): + """ + This class handles status messages during subkey addition. + """ + + returncode = ... + def __init__(self, gpg: GPG) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + def __str__(self) -> str: ... + def handle_status(self, key: str, value: str) -> None: ... + +class ExportResult(GenKey): + """ + This class handles status messages during key export. + """ + + def handle_status(self, key: str, value: str) -> None: ... + +class DeleteResult(StatusHandler): + """ + This class handles status messages during key deletion. + """ + + returncode = ... + def __init__(self, gpg: GPG) -> None: ... + def __str__(self) -> str: ... + + problem_reason = ... + def handle_status(self, key: str, value: str) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + +class TrustResult(DeleteResult): + """ + This class handles status messages during key trust setting. + """ + + ... + +class Sign(StatusHandler, TextHandler): + """ + This class handles status messages during signing. + """ + + returncode = ... + fingerprint: str | None = ... + status: str | None = ... + def __init__(self, gpg: GPG) -> None: ... + def __nonzero__(self) -> bool: ... + + __bool__ = ... + def handle_status(self, key: str, value: str) -> None: ... + +class AutoLocateKey(StatusHandler): + """ + This class handles status messages during key auto-locating. + fingerprint: str + key_length: int + created_at: date + email: str + email_real_name: str + """ + + def __init__(self, gpg: GPG) -> None: ... + def handle_status(self, key: str, value: str) -> None: ... + def pub(self, args: Any) -> None: + """ + Internal method to handle the 'pub' status message. + `pub` message contains the fingerprint of the public key, its type and its creation date. + """ + ... + + def uid(self, args: Any) -> None: ... + def sub(self, args: Any) -> None: ... + def fpr(self, args: Any) -> None: ... + +VERSION_RE = ... +HEX_DIGITS_RE = ... +PUBLIC_KEY_RE = ... + +class GPG: + """ + This class provides a high-level programmatic interface for `gpg`. + """ + + error_map = ... + encoding: str = ... + decode_errors: str = ... + buffer_size = ... + result_map = ... + def __init__( + self, + gpgbinary: str = ..., + gnupghome: str | None = ..., + verbose: bool = ..., + use_agent: bool = ..., + keyring: str | list[str] | None = ..., + options: list[str] | None = ..., + secret_keyring: str | list[str] | None = ..., + env: dict[str, str] | None = ..., + ) -> None: + """Initialize a GPG process wrapper. + + Args: + gpgbinary (str): A pathname for the GPG binary to use. + + gnupghome (str): A pathname to where we can find the public and private keyrings. The default is + whatever `gpg` defaults to. + + keyring (str|list): The name of alternative keyring file to use, or a list of such keyring files. If + specified, the default keyring is not used. + + options (list): A list of additional options to pass to the GPG binary. + + secret_keyring (str|list): The name of an alternative secret keyring file to use, or a list of such + keyring files. + + env (dict): A dict of environment variables to be used for the GPG subprocess. + """ + ... + + def make_args(self, args: list[str], passphrase: str) -> list[str]: + """ + Make a list of command line elements for GPG. The value of ``args`` + will be appended. The ``passphrase`` argument needs to be True if + a passphrase will be sent to `gpg`, else False. + + Args: + args (list[str]): A list of arguments. + passphrase (str): The passphrase to use. + """ + ... + + def is_valid_file(self, fileobj: Any) -> bool: + """ + A simplistic check for a file-like object. + + Args: + fileobj (object): The object to test. + Returns: + bool: ``True`` if it's a file-like object, else ``False``. + """ + ... + + def sign(self, message: str | bytes, **kwargs: Any) -> Sign: + """ + Sign a message. This method delegates most of the work to the `sign_file()` method. + + Args: + message (str|bytes): The data to sign. + kwargs (dict): Keyword arguments, which are passed to `sign_file()`: + + * keyid (str): The key id of the signer. + + * passphrase (str): The passphrase for the key. + + * clearsign (bool): Whether to use clear signing. + + * detach (bool): Whether to produce a detached signature. + + * binary (bool): Whether to produce a binary signature. + + * output (str): The path to write a detached signature to. + + * extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def set_output_without_confirmation(self, args: list[str], output: str) -> None: + """ + If writing to a file which exists, avoid a confirmation message by + updating the *args* value in place to set the output path and avoid + any cpmfirmation prompt. + + Args: + args (list[str]): A list of arguments. + output (str): The path to the outpur file. + """ + ... + + def is_valid_passphrase(self, passphrase: str) -> bool: + """ + Confirm that the passphrase doesn't contain newline-type characters - it is passed in a pipe to `gpg`, + and so not checking could lead to spoofing attacks by passing arbitrary text after passphrase and newline. + + Args: + passphrase (str): The passphrase to test. + + Returns: + bool: ``True`` if it's a valid passphrase, else ``False``. + """ + ... + + def sign_file( + self, + fileobj_or_path: Any, + keyid: str | None = ..., + passphrase: str | None = ..., + clearsign: bool = ..., + detach: bool = ..., + binary: bool = ..., + output: str | None = ..., + extra_args: list[str] | None = ..., + ) -> Sign: + """ + Sign data in a file or file-like object. + + Args: + fileobj_or_path (str|file): The file or file-like object to sign. + + keyid (str): The key id of the signer. + + passphrase (str): The passphrase for the key. + + clearsign (bool): Whether to use clear signing. + + detach (bool): Whether to produce a detached signature. + + binary (bool): Whether to produce a binary signature. + + output (str): The path to write a detached signature to. + + extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def verify(self, data: str | bytes, **kwargs: Any) -> Verify: + """ + Verify the signature on the contents of the string *data*. This method delegates most of the work to + `verify_file()`. + + Args: + data (str|bytes): The data to verify. + kwargs (dict): Keyword arguments, which are passed to `verify_file()`: + + * fileobj_or_path (str|file): A path to a signature, or a file-like object containing one. + + * data_filename (str): If the signature is a detached one, the path to the data that was signed. + + * close_file (bool): If a file-like object is passed in, whether to close it. + + * extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def verify_file( + self, + fileobj_or_path: Any, + data_filename: str | None = ..., + close_file: bool = ..., + extra_args: list[str] | None = ..., + ) -> Verify: + """ + Verify a signature. + + Args: + fileobj_or_path (str|file): A path to a signature, or a file-like object containing one. + + data_filename (str): If the signature is a detached one, the path to the data that was signed. + + close_file (bool): If a file-like object is passed in, whether to close it. + + extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def verify_data( + self, sig_filename: str, data: str | bytes, extra_args: list[str] | None = ... + ) -> Verify: + """ + Verify the signature in sig_filename against data in memory + + Args: + sig_filename (str): The path to a signature. + + data (str|bytes): The data to be verified. + + extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def import_keys( + self, + key_data: str | bytes, + extra_args: list[str] | None = ..., + passphrase: str | None = ..., + ) -> ImportResult: + """ + Import the key_data into our keyring. + + Args: + key_data (str|bytes): The key data to import. + + passphrase (str): The passphrase to use. + + extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def import_keys_file(self, key_path: str, **kwargs: Any) -> ImportResult: + """ + Import the key data in key_path into our keyring. + + Args: + key_path (str): A path to the key data to be imported. + """ + ... + + def recv_keys(self, keyserver: str, *keyids: str, **kwargs: Any) -> ImportResult: + """ + Import one or more keys from a keyserver. + + Args: + keyserver (str): The key server hostname. + + keyids (str): A list of key ids to receive. + """ + ... + + def send_keys(self, keyserver: str, *keyids: str, **kwargs: Any) -> SendResult: + """ + Send one or more keys to a keyserver. + + Args: + keyserver (str): The key server hostname. + + keyids (list[str]): A list of key ids to send. + """ + ... + + def delete_keys( + self, + fingerprints: str | list[str], + secret: bool = ..., + passphrase: str | None = ..., + expect_passphrase: bool = ..., + exclamation_mode: bool = ..., + ) -> DeleteResult: + """ + Delete the indicated keys. + + Args: + fingerprints (str|list[str]): The keys to delete. + + secret (bool): Whether to delete secret keys. + + passphrase (str): The passphrase to use. + + expect_passphrase (bool): Whether a passphrase is expected. + + exclamation_mode (bool): If specified, a `'!'` is appended to each fingerprint. This deletes only a subkey + or an entire key, depending on what the fingerprint refers to. + + .. note:: Passphrases + + Since GnuPG 2.1, you can't delete secret keys without providing a passphrase. However, if you're expecting + the passphrase to go to `gpg` via pinentry, you should specify expect_passphrase=False. (It's only checked + for GnuPG >= 2.1). + """ + ... + + def export_keys( + self, + keyids: str | list[str], + secret: bool = ..., + armor: bool = ..., + minimal: bool = ..., + passphrase: str | None = ..., + expect_passphrase: bool = ..., + output: str | None = ..., + ) -> ExportResult: + """ + Export the indicated keys. A 'keyid' is anything `gpg` accepts. + + Args: + keyids (str|list[str]): A single keyid or a list of them. + + secret (bool): Whether to export secret keys. + + armor (bool): Whether to ASCII-armor the output. + + minimal (bool): Whether to pass `--export-options export-minimal` to `gpg`. + + passphrase (str): The passphrase to use. + + expect_passphrase (bool): Whether a passphrase is expected. + + output (str): If specified, the path to write the exported key(s) to. + + .. note:: Passphrases + + Since GnuPG 2.1, you can't export secret keys without providing a passphrase. However, if you're expecting + the passphrase to go to `gpg` via pinentry, you should specify expect_passphrase=False. (It's only checked + for GnuPG >= 2.1). + """ + ... + + def list_keys( + self, secret: bool = ..., keys: str | list[str] | None = ..., sigs: bool = ... + ) -> ListKeys: + """ + List the keys currently in the keyring. + + Args: + secret (bool): Whether to list secret keys. + + keys (str|list[str]): A list of key ids to match. + + sigs (bool): Whether to include signature information. + + Returns: + list[dict]: A list of dictionaries with key information. + """ + ... + + def scan_keys(self, filename: str) -> ScanKeys: + """ + List details of an ascii armored or binary key file without first importing it to the local keyring. + + Args: + filename (str): The path to the file containing the key(s). + + .. warning:: Warning: + Care is needed. The function works on modern GnuPG by running: + + $ gpg --dry-run --import-options import-show --import filename + + On older versions, it does the *much* riskier: + + $ gpg --with-fingerprint --with-colons filename + """ + ... + + def scan_keys_mem(self, key_data: str | bytes) -> ScanKeys: + """ + List details of an ascii armored or binary key without first importing it to the local keyring. + + Args: + key_data (str|bytes): The key data to import. + + .. warning:: Warning: + Care is needed. The function works on modern GnuPG by running: + + $ gpg --dry-run --import-options import-show --import filename + + On older versions, it does the *much* riskier: + + $ gpg --with-fingerprint --with-colons filename + """ + ... + + def search_keys( + self, query: str, keyserver: str = ..., extra_args: list[str] | None = ... + ) -> SearchKeys: + """ + search a keyserver by query (using the `--search-keys` option). + + Args: + query(str): The query to use. + + keyserver (str): The key server hostname. + + extra_args (list[str]): Additional arguments to pass to `gpg`. + """ + ... + + def auto_locate_key( + self, email: str, mechanisms: list[str] | None = ..., **kwargs: Any + ) -> AutoLocateKey: + """ + Auto locate a public key by `email`. + + Args: + email (str): The email address to search for. + mechanisms (list[str]): A list of mechanisms to use. Valid mechanisms can be found + here https://www.gnupg.org/documentation/manuals/gnupg/GPG-Configuration-Options.html + under "--auto-key-locate". Default: ['wkd', 'ntds', 'ldap', 'cert', 'dane', 'local'] + """ + ... + + def gen_key(self, input: str) -> GenKey: + """ + Generate a key; you might use `gen_key_input()` to create the input. + + Args: + input (str): The input to the key creation operation. + """ + ... + + def gen_key_input(self, **kwargs: Any) -> str: + """ + Generate `--gen-key` input (see `gpg` documentation in DETAILS). + + Args: + kwargs (dict): A list of keyword arguments. + Returns: + str: A string suitable for passing to the `gen_key()` method. + """ + ... + + def add_subkey( + self, + master_key: str, + master_passphrase: str | None = ..., + algorithm: str = ..., + usage: str = ..., + expire: str = ..., + ) -> AddSubkey: + """ + Add subkeys to a master key, + + Args: + master_key (str): The master key. + + master_passphrase (str): The passphrase for the master key. + + algorithm (str): The key algorithm to use. + + usage (str): The desired uses for the subkey. + + expire (str): The expiration date of the subkey. + """ + ... + + def encrypt_file( + self, + fileobj_or_path: Any, + recipients: str | list[str], + sign: str | None = ..., + always_trust: bool = ..., + passphrase: str | None = ..., + armor: bool = ..., + output: str | None = ..., + symmetric: bool = ..., + extra_args: list[str] | None = ..., + ) -> Crypt: + """ + Encrypt data in a file or file-like object. + + Args: + fileobj_or_path (str|file): A path to a file or a file-like object containing the data to be encrypted. + + recipients (str|list): A key id of a recipient of the encrypted data, or a list of such key ids. + + sign (str): If specified, the key id of a signer to sign the encrypted data. + + always_trust (bool): Whether to always trust keys. + + passphrase (str): The passphrase to use for a signature. + + armor (bool): Whether to ASCII-armor the output. + + output (str): A path to write the encrypted output to. + + symmetric (bool): Whether to use symmetric encryption, + + extra_args (list[str]): A list of additional arguments to pass to `gpg`. + """ + ... + + def encrypt( + self, data: str | bytes, recipients: str | list[str], **kwargs: Any + ) -> Crypt: + """ + Encrypt the message contained in the string *data* for *recipients*. This method delegates most of the work to + `encrypt_file()`. + + Args: + data (str|bytes): The data to encrypt. + + recipients (str|list[str]): A key id of a recipient of the encrypted data, or a list of such key ids. + + kwargs (dict): Keyword arguments, which are passed to `encrypt_file()`: + * sign (str): If specified, the key id of a signer to sign the encrypted data. + + * always_trust (bool): Whether to always trust keys. + + * passphrase (str): The passphrase to use for a signature. + + * armor (bool): Whether to ASCII-armor the output. + + * output (str): A path to write the encrypted output to. + + * symmetric (bool): Whether to use symmetric encryption, + + * extra_args (list[str]): A list of additional arguments to pass to `gpg`. + """ + ... + + def decrypt(self, message: str | bytes, **kwargs: Any) -> Crypt: + """ + Decrypt the data in *message*. This method delegates most of the work to + `decrypt_file()`. + + Args: + message (str|bytes): The data to decrypt. A default key will be used for decryption. + + kwargs (dict): Keyword arguments, which are passed to `decrypt_file()`: + + * always_trust: Whether to always trust keys. + + * passphrase (str): The passphrase to use. + + * output (str): If specified, the path to write the decrypted data to. + + * extra_args (list[str]): A list of extra arguments to pass to `gpg`. + """ + ... + + def decrypt_file( + self, + fileobj_or_path: Any, + always_trust: bool = ..., + passphrase: str | None = ..., + output: str | None = ..., + extra_args: list[str] | None = ..., + ) -> Crypt: + """ + Decrypt data in a file or file-like object. + + Args: + fileobj_or_path (str|file): A path to a file or a file-like object containing the data to be decrypted. + + always_trust: Whether to always trust keys. + + passphrase (str): The passphrase to use. + + output (str): If specified, the path to write the decrypted data to. + + extra_args (list[str]): A list of extra arguments to pass to `gpg`. + """ + ... + + def get_recipients(self, message: str | bytes, **kwargs: Any) -> list[str]: + """Get the list of recipients for an encrypted message. This method delegates most of the work to + `get_recipients_file()`. + + Args: + message (str|bytes): The encrypted message. + + kwargs (dict): Keyword arguments, which are passed to `get_recipients_file()`: + + * extra_args (list[str]): A list of extra arguments to pass to `gpg`. + """ + ... + + def get_recipients_file( + self, fileobj_or_path: Any, extra_args: list[str] | None = ... + ) -> list[str]: + """ + Get the list of recipients for an encrypted message in a file or file-like object. + + Args: + fileobj_or_path (str|file): A path to a file or file-like object containing the encrypted data. + + extra_args (list[str]): A list of extra arguments to pass to `gpg`. + """ + ... + + def trust_keys(self, fingerprints: str | list[str], trustlevel: str) -> TrustResult: + """ + Set the trust level for one or more keys. + + Args: + fingerprints (str|list[str]): A key id for which to set the trust level, or a list of such key ids. + + trustlevel (str): The trust level. This is one of the following. + + * ``'TRUST_EXPIRED'`` + * ``'TRUST_UNDEFINED'`` + * ``'TRUST_NEVER'`` + * ``'TRUST_MARGINAL'`` + * ``'TRUST_FULLY'`` + * ``'TRUST_ULTIMATE'`` + """ + ...