diff --git a/openpgp/backend/base.py b/openpgp/backend/base.py new file mode 100644 index 0000000..b3ec109 --- /dev/null +++ b/openpgp/backend/base.py @@ -0,0 +1,97 @@ +# Copyright (C) 2025 Philipp Hörist +# +# This file is part of the OpenPGP Gajim Plugin. +# +# OpenPGP Gajim Plugin is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; version 3 only. +# +# OpenPGP Gajim Plugin is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with OpenPGP Gajim Plugin. If not, see . + +from __future__ import annotations + +from typing import Any +from typing import TYPE_CHECKING + +from collections.abc import Sequence +from pathlib import Path + +from nbxmpp.protocol import JID + +if TYPE_CHECKING: + from openpgp.modules.key_store import KeyData + + +class BaseKeyringItem: + def __init__(self, key: Any) -> None: + self._key = key + self._uid = self._get_uid() + + @property + def is_xmpp_key(self) -> bool: + try: + return self.jid is not None + except Exception: + return False + + def is_valid(self, jid: JID) -> bool: + if not self.is_xmpp_key: + return False + return jid == self.jid + + def _get_uid(self) -> str | None: + raise NotImplementedError + + @property + def fingerprint(self) -> str: + raise NotImplementedError + + @property + def uid(self): + if self._uid is not None: + return self._uid + + @property + def jid(self) -> JID | None: + if self._uid is not None: + return JID.from_string(self._uid) + + def __hash__(self): + return hash(self.fingerprint) + + +class BasePGPBackend: + def __init__(self, jid: str, gnupghome: Path) -> None: + raise NotImplementedError + + def generate_key(self) -> None: + raise NotImplementedError + + def encrypt( + self, payload: bytes, keys: list[KeyData] + ) -> tuple[bytes | None, str | None]: + raise NotImplementedError + + def decrypt(self, payload: bytes) -> tuple[str, str]: + raise NotImplementedError + + def get_keys(self) -> Sequence[BaseKeyringItem]: + raise NotImplementedError + + def import_key(self, data: bytes, jid: JID) -> BaseKeyringItem | None: + raise NotImplementedError + + def get_own_key_details(self) -> tuple[str | None, int | None]: + raise NotImplementedError + + def export_key(self, fingerprint: str) -> bytes | None: + raise NotImplementedError + + def delete_key(self, fingerprint: str) -> None: + raise NotImplementedError diff --git a/openpgp/backend/gpgme.py b/openpgp/backend/gpgme.py index 40d451f..d7a85ca 100644 --- a/openpgp/backend/gpgme.py +++ b/openpgp/backend/gpgme.py @@ -14,34 +14,29 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from typing import Any +from typing import cast + import logging +from collections.abc import Sequence +from pathlib import Path import gpg +from gpg.errors import KeyNotFound from gpg.results import ImportResult from nbxmpp.protocol import JID +from openpgp.backend.base import BaseKeyringItem +from openpgp.backend.base import BasePGPBackend +from openpgp.backend.gpgme_types import Key from openpgp.backend.util import parse_uid +from openpgp.modules.key_store import KeyData from openpgp.modules.util import DecryptionFailed log = logging.getLogger("gajim.p.openpgp.gpgme") -class KeyringItem: - def __init__(self, key): - self._key = key - self._uid = self._get_uid() - - @property - def is_xmpp_key(self) -> bool: - try: - return self.jid is not None - except Exception: - return False - - def is_valid(self, jid: JID) -> bool: - if not self.is_xmpp_key: - return False - return jid == self.jid +class KeyringItem(BaseKeyringItem): def _get_uid(self) -> str | None: for uid in self._key.uids: @@ -51,36 +46,22 @@ class KeyringItem: pass @property - def fingerprint(self): + def fingerprint(self) -> str: return self._key.fpr - @property - def uid(self): - if self._uid is not None: - return self._uid - @property - def jid(self): - if self._uid is not None: - return JID.from_string(self._uid) - - def __hash__(self): - return hash(self.fingerprint) - - -class GPGME: - def __init__(self, jid, gnuhome): +class GPGMe(BasePGPBackend): + def __init__(self, jid: str, gnuhome: Path) -> None: self._jid = jid - self._context_args = { - "home_dir": str(gnuhome), - "offline": True, - "armor": False, - } + self._home_dir = str(gnuhome) - def generate_key(self): - with gpg.Context(**self._context_args) as context: + def _get_context(self) -> gpg.Context: + return gpg.Context(armor=False, offline=True, home_dir=self._home_dir) + + def generate_key(self) -> None: + with self._get_context() as context: result = context.create_key( - f"xmpp:{str(self._jid)}", + f"xmpp:{self._jid}", algorithm="default", expires=False, passphrase=None, @@ -89,11 +70,11 @@ class GPGME: log.info("Generated new key: %s", result.fpr) - def get_key(self, fingerprint): - with gpg.Context(**self._context_args) as context: + def _get_key(self, fingerprint: str) -> Key | None: + with self._get_context() as context: try: - key = context.get_key(fingerprint) - except gpg.errors.KeyNotFound as error: + return cast(Key, context.get_key(fingerprint)) + except KeyNotFound as error: log.warning("key not found: %s", error.keystr) return @@ -101,11 +82,9 @@ class GPGME: log.warning("get_key() error: %s", error) return - return key - - def get_own_key_details(self): - with gpg.Context(**self._context_args) as context: - keys = list(context.keylist(secret=True)) + def get_own_key_details(self) -> tuple[str | None, int | None]: + with self._get_context() as context: + keys = cast(list[Key], list(context.keylist(secret=True))) if not keys: return None, None @@ -116,10 +95,10 @@ class GPGME: return None, None - def get_keys(self): - keys = [] - with gpg.Context(**self._context_args) as context: - for key in context.keylist(): + def get_keys(self) -> Sequence[KeyringItem]: + keys: list[KeyringItem] = [] + with self._get_context() as context: + for key in context.keylist(secret=False): keyring_item = KeyringItem(key) if not keyring_item.is_xmpp_key: log.warning("Key not suited for xmpp: %s", key.fpr) @@ -130,10 +109,9 @@ class GPGME: return keys - def export_key(self, fingerprint): - with gpg.Context(**self._context_args) as context: - key = context.key_export_minimal(pattern=fingerprint) - return key + def export_key(self, fingerprint: str) -> bytes | None: + with self._get_context() as context: + return context.key_export_minimal(pattern=fingerprint) # def encrypt_decrypt_files(self): # c = gpg.Context() @@ -149,9 +127,11 @@ class GPGME: # with open('foo2.txt', 'w') as output_file: # c.decrypt(input_file, output_file) - def encrypt(self, plaintext, keys): - recipients = [] - with gpg.Context(**self._context_args) as context: + def encrypt( + self, payload: bytes, keys: list[KeyData] + ) -> tuple[bytes | None, str | None]: + recipients: list[Any] = [] + with self._get_context() as context: for key in keys: key = context.get_key(key.fingerprint) if key is not None: @@ -160,18 +140,18 @@ class GPGME: if not recipients: return None, "No keys found to encrypt to" - with gpg.Context(**self._context_args) as context: - result = context.encrypt( - str(plaintext).encode(), recipients, always_trust=True - ) + with self._get_context() as context: + result = context.encrypt(payload, recipients, always_trust=True) - ciphertext, result, _sign_result = result - return ciphertext, None + ciphertext, result, _sign_result = result + return ciphertext, None - def decrypt(self, ciphertext): - with gpg.Context(**self._context_args) as context: + raise RuntimeError + + def decrypt(self, payload: bytes) -> tuple[str, str]: + with self._get_context() as context: try: - result = context.decrypt(ciphertext) + result = context.decrypt(payload) except Exception as error: raise DecryptionFailed("Decryption failed: %s" % error) @@ -186,9 +166,9 @@ class GPGME: return plaintext, fingerprints[0] - def import_key(self, data, jid): + def import_key(self, data: bytes, jid: JID) -> KeyringItem | None: log.info("Import key from %s", jid) - with gpg.Context(**self._context_args) as context: + with self._get_context() as context: result = context.key_import(data) if not isinstance(result, ImportResult) or result.imported != 1: log.error("Key import failed: %s", jid) @@ -196,7 +176,7 @@ class GPGME: return fingerprint = result.imports[0].fpr - key = self.get_key(fingerprint) + key = self._get_key(fingerprint) item = KeyringItem(key) if not item.is_valid(jid): log.warning("Invalid key found") @@ -206,8 +186,9 @@ class GPGME: return item - def delete_key(self, fingerprint): + def delete_key(self, fingerprint: str) -> None: log.info("Delete Key: %s", fingerprint) - key = self.get_key(fingerprint) - with gpg.Context(**self._context_args) as context: + key = self._get_key(fingerprint) + assert key is not None + with self._get_context() as context: context.op_delete(key, True) diff --git a/openpgp/backend/gpgme_types.py b/openpgp/backend/gpgme_types.py new file mode 100644 index 0000000..3b6d745 --- /dev/null +++ b/openpgp/backend/gpgme_types.py @@ -0,0 +1,86 @@ +# Copyright (C) 2025 Philipp Hörist +# +# This file is part of the OpenPGP Gajim Plugin. +# +# OpenPGP Gajim Plugin is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; version 3 only. +# +# OpenPGP Gajim Plugin is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with OpenPGP Gajim Plugin. If not, see . + +from __future__ import annotations + +from typing import Any + + +class UID: + address: Any | None + comment: str + email: str + invalid: int + last_update: int + name: str + origin: int + revoked: int + signatures: list[Any] + thisown: bool + tofu: list[Any] + uid: str + uidhash: str + validity: int + + +class SubKey: + can_authenticate: int + can_certify: int + can_encrypt: int + can_sign: int + card_number: Any | None + curve: Any | None + disabled: int + expired: int + expires: int + fpr: str + invalid: int + is_cardkey: int + is_de_vs: int + is_qualified: int + keygrip: Any | None + keyid: str + length: int + pubkey_algo: int + revoked: int + secret: int + thisown: bool + timestamp: int + + +class Key: + can_authenticate: int + can_certify: int + can_encrypt: int + can_sign: int + chain_id: Any | None + disabled: int + expired: int + fpr: str + invalid: int + is_qualified: int + issuer_name: str | None + issuer_serial: str | None + keylist_mode: int + last_update: int + origin: int + owner_trust: int + protocol: int + revoked: int + secret: int + subkeys: list[SubKey] + thisown: bool + uids: list[UID] diff --git a/openpgp/backend/pygpg.py b/openpgp/backend/pygpg.py index c091f58..974bda0 100644 --- a/openpgp/backend/pygpg.py +++ b/openpgp/backend/pygpg.py @@ -15,12 +15,16 @@ # along with OpenPGP Gajim Plugin. If not, see . import logging +from collections.abc import Sequence from pathlib import Path import gnupg from nbxmpp.protocol import JID +from openpgp.backend.base import BaseKeyringItem +from openpgp.backend.base import BasePGPBackend from openpgp.backend.util import parse_uid +from openpgp.modules.key_store import KeyData from openpgp.modules.util import DecryptionFailed log = logging.getLogger("gajim.p.openpgp.pygnupg") @@ -30,22 +34,7 @@ if log.getEffectiveLevel() == logging.DEBUG: log.setLevel(logging.DEBUG) -class KeyringItem: - def __init__(self, key): - self._key = key - self._uid = self._get_uid() - - @property - def is_xmpp_key(self) -> bool: - try: - return self.jid is not None - except Exception: - return False - - def is_valid(self, jid: JID) -> bool: - if not self.is_xmpp_key: - return False - return jid == self.jid +class KeyringItem(BaseKeyringItem): @property def keyid(self) -> str: @@ -59,32 +48,18 @@ class KeyringItem: pass @property - def fingerprint(self): + def fingerprint(self) -> str: return self._key["fingerprint"] - @property - def uid(self): - if self._uid is not None: - return self._uid - @property - def jid(self): - if self._uid is not None: - return JID.from_string(self._uid) - - def __hash__(self): - return hash(self.fingerprint) - - -class PythonGnuPG(gnupg.GPG): +class PythonGnuPG(BasePGPBackend): def __init__(self, jid: str, gnupghome: Path) -> None: - gnupg.GPG.__init__(self, gpgbinary="gpg", gnupghome=str(gnupghome)) - + self._gnupg = gnupg.GPG(gpgbinary="gpg", gnupghome=str(gnupghome)) self._jid = jid self._own_fingerprint = None @staticmethod - def _get_key_params(jid): + def _get_key_params(jid: str) -> str: """ Generate --gen-key input """ @@ -102,17 +77,19 @@ class PythonGnuPG(gnupg.GPG): out += "%commit\n" return out - def generate_key(self): - super().gen_key(self._get_key_params(self._jid)) + def generate_key(self) -> None: + self._gnupg.gen_key(self._get_key_params(self._jid)) - def encrypt(self, payload, keys): + def encrypt( + self, payload: bytes, keys: list[KeyData] + ) -> tuple[bytes | None, str | None]: recipients = [key.fingerprint for key in keys] log.info("encrypt to:") for fingerprint in recipients: log.info(fingerprint) - result = super().encrypt( - str(payload).encode("utf8"), + result = self._gnupg.encrypt( + payload, recipients, armor=False, sign=self._own_fingerprint, @@ -126,19 +103,20 @@ class PythonGnuPG(gnupg.GPG): return result.data, error - def decrypt(self, payload): - result = super().decrypt(payload, always_trust=True) + def decrypt(self, payload: bytes) -> tuple[str, str]: + result = self._gnupg.decrypt(payload, always_trust=True) if not result.ok: raise DecryptionFailed(result.status) + assert result.fingerprint is not None return result.data.decode("utf8"), result.fingerprint - def get_key(self, fingerprint): - return super().list_keys(keys=[fingerprint]) + def _get_key(self, fingerprint: str) -> gnupg.ListKeys: + return self._gnupg.list_keys(keys=[fingerprint]) - def get_keys(self, secret=False): - result = super().list_keys(secret=secret) - keys = [] + def get_keys(self) -> Sequence[KeyringItem]: + result = self._gnupg.list_keys(secret=False) + keys: list[KeyringItem] = [] for key in result: item = KeyringItem(key) if not item.is_xmpp_key: @@ -149,15 +127,18 @@ class PythonGnuPG(gnupg.GPG): keys.append(item) return keys - def import_key(self, data, jid): + def import_key(self, data: bytes, jid: JID) -> KeyringItem | None: log.info("Import key from %s", jid) - result = super().import_keys(data) + result = self._gnupg.import_keys(data) if not result: log.error("Could not import key") log.error(result) return - key = self.get_key(result.results[0]["fingerprint"]) + fpr = result.results[0]["fingerprint"] + assert fpr is not None + + key = self._get_key(fpr) item = KeyringItem(key[0]) if not item.is_valid(jid): log.warning("Invalid key found, deleting key") @@ -167,8 +148,8 @@ class PythonGnuPG(gnupg.GPG): return item - def get_own_key_details(self): - result = super().list_keys(secret=True) + def get_own_key_details(self) -> tuple[str | None, int | None]: + result = self._gnupg.list_keys(secret=True) if not result: return None, None @@ -179,10 +160,13 @@ class PythonGnuPG(gnupg.GPG): self._own_fingerprint = result[0]["fingerprint"] return self._own_fingerprint, int(result[0]["date"]) - def export_key(self, fingerprint): - key = super().export_keys(fingerprint, secret=False, armor=False, minimal=True) + def export_key(self, fingerprint: str) -> bytes | None: + key = self._gnupg.export_keys( + fingerprint, secret=False, armor=False, minimal=True + ) + assert isinstance(key, bytes | None) return key - def delete_key(self, fingerprint): + def delete_key(self, fingerprint: str) -> None: log.info("Delete Key: %s", fingerprint) - super().delete_keys(fingerprint) + self._gnupg.delete_keys(fingerprint) diff --git a/openpgp/backend/sql.py b/openpgp/backend/sql.py index 6dd468c..60d0fb6 100644 --- a/openpgp/backend/sql.py +++ b/openpgp/backend/sql.py @@ -14,9 +14,18 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from typing import Any +from typing import NamedTuple + import logging import sqlite3 from collections import namedtuple +from collections.abc import Iterator +from pathlib import Path + +from nbxmpp.protocol import JID + +from openpgp.modules.util import Trust log = logging.getLogger("gajim.p.openpgp.sql") @@ -32,8 +41,16 @@ TABLE_LAYOUT = """ CREATE UNIQUE INDEX jid_fingerprint ON contacts (jid, fingerprint);""" +class ContactRow(NamedTuple): + jid: JID + fingerprint: str + active: bool + trust: Trust + timestamp: float + + class Storage: - def __init__(self, folder_path): + def __init__(self, folder_path: Path) -> None: self._con = sqlite3.connect( str(folder_path / "contacts.db"), detect_types=sqlite3.PARSE_COLNAMES ) @@ -45,21 +62,21 @@ class Storage: self._con.commit() @staticmethod - def _namedtuple_factory(cursor, row): + def _namedtuple_factory(cursor: sqlite3.Cursor, row: Any) -> Any: fields = [col[0] for col in cursor.description] - Row = namedtuple("Row", fields) + Row = namedtuple("Row", fields) # pyright: ignore named_row = Row(*row) return named_row - def _user_version(self): + def _user_version(self) -> int: return self._con.execute("PRAGMA user_version").fetchone()[0] - def _create_database(self): + def _create_database(self) -> None: if not self._user_version(): log.info("Create contacts.db") self._execute_query(TABLE_LAYOUT) - def _execute_query(self, query): + def _execute_query(self, query: str) -> None: transaction = """ BEGIN TRANSACTION; %s @@ -70,40 +87,41 @@ class Storage: ) self._con.executescript(transaction) - def _migrate_database(self): + def _migrate_database(self) -> None: pass - def load_contacts(self): + def load_contacts(self) -> list[ContactRow]: sql = """SELECT jid as "jid [jid]", fingerprint, - active, + active, trust, - timestamp, - comment + timestamp FROM contacts""" return self._con.execute(sql).fetchall() - def save_contact(self, db_values): + def save_contact( + self, db_values: Iterator[tuple[JID, str, bool, Trust, float]] + ) -> None: sql = """REPLACE INTO - contacts(jid, fingerprint, active, trust, timestamp, comment) + contacts(jid, fingerprint, active, trust, timestamp) VALUES(?, ?, ?, ?, ?, ?)""" for values in db_values: log.info("Store key: %s", values) self._con.execute(sql, values) self._con.commit() - def set_trust(self, jid, fingerprint, trust): + def set_trust(self, jid: JID, fingerprint: str, trust: Trust) -> None: sql = "UPDATE contacts SET trust = ? WHERE jid = ? AND fingerprint = ?" log.info("Set Trust: %s %s %s", trust, jid, fingerprint) self._con.execute(sql, (trust, jid, fingerprint)) self._con.commit() - def delete_key(self, jid, fingerprint): + def delete_key(self, jid: JID, fingerprint: str) -> None: sql = "DELETE from contacts WHERE jid = ? AND fingerprint = ?" log.info("Delete Key: %s %s", jid, fingerprint) self._con.execute(sql, (jid, fingerprint)) self._con.commit() - def cleanup(self): + def cleanup(self) -> None: self._con.close() diff --git a/openpgp/backend/util.py b/openpgp/backend/util.py index e61fbb9..b864a5d 100644 --- a/openpgp/backend/util.py +++ b/openpgp/backend/util.py @@ -1,7 +1,7 @@ from __future__ import annotations -def parse_uid(uid: str, compat=False) -> str: +def parse_uid(uid: str, compat: bool = False) -> str: if uid.startswith("xmpp:"): return uid[5:] diff --git a/openpgp/gtk/key.py b/openpgp/gtk/key.py index 92c7cd9..55ef306 100644 --- a/openpgp/gtk/key.py +++ b/openpgp/gtk/key.py @@ -14,6 +14,8 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from __future__ import annotations + import logging import time @@ -22,8 +24,11 @@ from gi.repository import Gtk from gajim.common import app from gajim.gtk.dialogs import ConfirmationDialog from gajim.gtk.dialogs import DialogButton +from gajim.gtk.util.misc import container_remove_all +from gajim.gtk.widgets import GajimAppWindow from gajim.plugins.plugins_i18n import _ +from openpgp.modules.key_store import KeyData from openpgp.modules.util import Trust log = logging.getLogger("gajim.p.openpgp.keydialog") @@ -36,48 +41,60 @@ TRUST_DATA = { } -class KeyDialog(Gtk.Dialog): - def __init__(self, account, jid, transient): - super().__init__(title=_("Public Keys for %s") % jid, destroy_with_parent=True) +class KeyDialog(GajimAppWindow): + def __init__(self, account: str, jid: str, transient: Gtk.Window) -> None: - self.set_transient_for(transient) - self.set_resizable(True) - self.set_default_size(500, 300) + GajimAppWindow.__init__( + self, + name="PGPKeyDialog", + title=_("Public Keys for %s") % jid, + default_width=450, + default_height=400, + transient_for=transient, + modal=True, + ) - self.get_style_context().add_class("openpgp-key-dialog") + self.window.add_css_class("openpgp-key-dialog") self._client = app.get_client(account) self._listbox = Gtk.ListBox() self._listbox.set_selection_mode(Gtk.SelectionMode.NONE) - self._scrolled = Gtk.ScrolledWindow() + self._scrolled = Gtk.ScrolledWindow(hexpand=True) self._scrolled.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) - self._scrolled.add(self._listbox) + self._scrolled.set_child(self._listbox) - box = self.get_content_area() - box.pack_start(self._scrolled, True, True, 0) + self.set_child(self._scrolled) keys = self._client.get_module("OpenPGP").get_keys(jid, only_trusted=False) for key in keys: log.info("Load: %s", key.fingerprint) - self._listbox.add(KeyRow(key)) - self.show_all() + self._listbox.append(KeyRow(key, self)) + + self.show() + + def _cleanup(self) -> None: + del self._client + del self._listbox + del self._scrolled class KeyRow(Gtk.ListBoxRow): - def __init__(self, key): + def __init__(self, key: KeyData, dialog: GajimAppWindow): Gtk.ListBoxRow.__init__(self) self.set_activatable(False) - self._dialog = self.get_toplevel() + self._dialog = dialog self.key = key box = Gtk.Box() box.set_spacing(12) - self._trust_button = TrustButton(self) - box.add(self._trust_button) + self._trust_button = Gtk.MenuButton() + self._trust_button.set_popover(TrustPopver(self)) + self._update_button_state() + box.append(self._trust_button) label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) fingerprint = Gtk.Label(label=self._format_fingerprint(key.fingerprint)) @@ -88,25 +105,37 @@ class KeyRow(Gtk.ListBoxRow): fingerprint.set_halign(Gtk.Align.START) fingerprint.set_valign(Gtk.Align.START) fingerprint.set_hexpand(True) - label_box.add(fingerprint) + label_box.append(fingerprint) date = Gtk.Label(label=self._format_timestamp(key.timestamp)) date.set_halign(Gtk.Align.START) date.get_style_context().add_class("openpgp-mono") if not key.active: date.get_style_context().add_class("openpgp-inactive-color") - label_box.add(date) + label_box.append(date) - box.add(label_box) + box.append(label_box) + self.set_child(box) - self.add(box) - self.show_all() + def _update_button_state(self) -> None: + icon_name, tooltip, css_class = TRUST_DATA[self.key.trust] + self._trust_button.set_icon_name(icon_name) - def delete_fingerprint(self, *args): + for css_cls in self._trust_button.get_css_classes(): + if css_cls.startswith("openpgp"): + self._trust_button.remove_css_class(css_cls) + + if not self.key.active: + css_class = "inactive-color" + tooltip = "%s - %s" % (_("Inactive"), tooltip) + + self._trust_button.add_css_class(f"openpgp-{css_class}") + self._trust_button.set_tooltip_text(tooltip) + + def delete_fingerprint(self): def _remove(): self.get_parent().remove(self) self.key.delete() - self.destroy() ConfirmationDialog( _("Delete Public Key?"), @@ -117,98 +146,76 @@ class KeyRow(Gtk.ListBoxRow): ], ).show() - def set_trust(self, trust): - icon_name, tooltip, css_class = TRUST_DATA[trust] - image = self._trust_button.get_child() - image.set_from_icon_name(icon_name, Gtk.IconSize.MENU) - image.get_style_context().add_class(css_class) + def set_trust(self, trust: Trust) -> None: + self.key.trust = trust + self._update_button_state() @staticmethod - def _format_fingerprint(fingerprint): + def _format_fingerprint(fingerprint: str) -> str: fplen = len(fingerprint) wordsize = fplen // 8 buf = "" for w in range(0, fplen, wordsize): - buf += "{0} ".format(fingerprint[w : w + wordsize]) + buf += f"{fingerprint[w : w + wordsize]} " return buf.rstrip() @staticmethod - def _format_timestamp(timestamp): + def _format_timestamp(timestamp: float) -> str: return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) -class TrustButton(Gtk.MenuButton): - def __init__(self, row): - Gtk.MenuButton.__init__(self) - self._row = row - self._css_class = "" - self.set_popover(TrustPopver(row)) - self.update() - - def update(self): - icon_name, tooltip, css_class = TRUST_DATA[self._row.key.trust] - image = self.get_child() - image.set_from_icon_name(icon_name, Gtk.IconSize.MENU) - # remove old color from icon - image.get_style_context().remove_class(self._css_class) - - if not self._row.key.active: - css_class = "openpgp-inactive-color" - tooltip = "%s - %s" % (_("Inactive"), tooltip) - - image.get_style_context().add_class(css_class) - self._css_class = css_class - self.set_tooltip_text(tooltip) - - class TrustPopver(Gtk.Popover): - def __init__(self, row): + def __init__(self, row: KeyRow): Gtk.Popover.__init__(self) self._row = row self._listbox = Gtk.ListBox() self._listbox.set_selection_mode(Gtk.SelectionMode.NONE) if row.key.trust != Trust.VERIFIED: - self._listbox.add(VerifiedOption()) + self._listbox.append(VerifiedOption()) if row.key.trust != Trust.NOT_TRUSTED: - self._listbox.add(NotTrustedOption()) - self._listbox.add(DeleteOption()) - self.add(self._listbox) - self._listbox.show_all() + self._listbox.append(NotTrustedOption()) + self._listbox.append(DeleteOption()) + self.set_child(self._listbox) self._listbox.connect("row-activated", self._activated) - self.get_style_context().add_class("openpgp-trust-popover") + self.add_css_class("openpgp-trust-popover") - def _activated(self, listbox, row): + def _activated(self, listbox: Gtk.ListBox, row: MenuOption) -> None: self.popdown() if row.type_ is None: self._row.delete_fingerprint() else: - self._row.key.trust = row.type_ - self.get_relative_to().update() + self._row.set_trust(row.type_) self.update() def update(self): - self._listbox.foreach(lambda row: self._listbox.remove(row)) + container_remove_all(self._listbox) if self._row.key.trust != Trust.VERIFIED: - self._listbox.add(VerifiedOption()) + self._listbox.append(VerifiedOption()) if self._row.key.trust != Trust.NOT_TRUSTED: - self._listbox.add(NotTrustedOption()) - self._listbox.add(DeleteOption()) + self._listbox.append(NotTrustedOption()) + self._listbox.append(DeleteOption()) class MenuOption(Gtk.ListBoxRow): + + type_: Trust | None + icon: str + label: str + color: str + def __init__(self): Gtk.ListBoxRow.__init__(self) box = Gtk.Box() box.set_spacing(6) - image = Gtk.Image.new_from_icon_name(self.icon, Gtk.IconSize.MENU) - label = Gtk.Label(label=self.label) - image.get_style_context().add_class(self.color) + image = Gtk.Image.new_from_icon_name(self.icon) + if self.color: + image.add_css_class(self.color) - box.add(image) - box.add(label) - self.add(box) - self.show_all() + label = Gtk.Label(label=self.label) + box.append(image) + box.append(label) + self.set_child(box) class VerifiedOption(MenuOption): diff --git a/openpgp/gtk/style.css b/openpgp/gtk/style.css index 4dfb773..f03c85e 100644 --- a/openpgp/gtk/style.css +++ b/openpgp/gtk/style.css @@ -1,16 +1,14 @@ -.openpgp-inactive-color { color: @unfocused_borders; } - +.openpgp-inactive-color button > box > image { color: @unfocused_borders; } +.openpgp-error-color button > box > image { color: @error_color; } +.openpgp-warning-color button > box > image { color: @warning_color; } +.openpgp-encrypted-color button > box > image { color: rgb(75, 181, 67); } .openpgp-mono { font-size: 12px; font-family: monospace; } - .openpgp-key-dialog > box { margin: 12px; } - .openpgp-key-dialog scrolledwindow row { border-bottom: 1px solid; border-color: @unfocused_borders; padding: 10px 20px 10px 10px; } -.openpgp-key-dialog scrolledwindow row:last-child { border-bottom: 0px} - +.openpgp-key-dialog scrolledwindow row:last-child { border-bottom: 0px; } .openpgp-key-dialog scrolledwindow { border: 1px solid; border-color:@unfocused_borders; } - .openpgp-trust-popover row { padding: 10px 15px 10px 10px; } diff --git a/openpgp/gtk/wizard.py b/openpgp/gtk/wizard.py index dcdad68..24ee425 100644 --- a/openpgp/gtk/wizard.py +++ b/openpgp/gtk/wizard.py @@ -14,6 +14,8 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from typing import cast + import logging import threading from enum import IntEnum @@ -22,8 +24,12 @@ from gi.repository import GLib from gi.repository import Gtk from gajim.common import app +from gajim.common.client import Client +from gajim.gtk.control import ChatControl from gajim.plugins.plugins_i18n import _ +from ..pgpplugin import OpenPGPPlugin + log = logging.getLogger("gajim.p.openpgp.wizard") @@ -35,7 +41,9 @@ class Page(IntEnum): class KeyWizard(Gtk.Assistant): - def __init__(self, plugin, account, chat_control): + def __init__( + self, plugin: OpenPGPPlugin, account: str, chat_control: ChatControl + ) -> None: Gtk.Assistant.__init__(self) self._client = app.get_client(account) @@ -48,7 +56,6 @@ class KeyWizard(Gtk.Assistant): self.set_application(app.app) self.set_transient_for(app.window) self.set_resizable(True) - self.set_position(Gtk.WindowPosition.CENTER) self.set_default_size(600, 400) self.get_style_context().add_class("dialog-margin") @@ -65,9 +72,9 @@ class KeyWizard(Gtk.Assistant): self.connect("close", self._on_cancel) self._remove_sidebar() - self.show_all() + self.show() - def _add_page(self, page): + def _add_page(self, page: Gtk.Box) -> None: self.append_page(page) self.set_page_type(page, page.type_) self.set_page_title(page, page.title) @@ -82,7 +89,7 @@ class KeyWizard(Gtk.Assistant): action = app.window.lookup_action("set-encryption") action.activate(GLib.Variant("s", self._plugin.encryption_name)) - def _on_page_change(self, assistant, page): + def _on_page_change(self, assistant: Gtk.Assistant, page: Page) -> None: if self.get_current_page() == Page.NEWKEY: if self._client.get_module("OpenPGP").secret_key_available: self.set_current_page(Page.SUCCESS) @@ -91,8 +98,8 @@ class KeyWizard(Gtk.Assistant): elif self.get_current_page() == Page.SUCCESS: self._activate_encryption() - def _on_cancel(self, widget): - self.destroy() + def _on_cancel(self, widget: Gtk.Assistant): + self.close() class WelcomePage(Gtk.Box): @@ -106,8 +113,8 @@ class WelcomePage(Gtk.Box): self.set_spacing(18) title_label = Gtk.Label(label=_("Setup OpenPGP")) text_label = Gtk.Label(label=_("Gajim will now try to setup OpenPGP for you")) - self.add(title_label) - self.add(text_label) + self.append(title_label) + self.append(text_label) class RequestPage(Gtk.Box): @@ -120,7 +127,7 @@ class RequestPage(Gtk.Box): super().__init__(orientation=Gtk.Orientation.VERTICAL) self.set_spacing(18) spinner = Gtk.Spinner() - self.pack_start(spinner, True, True, 0) + self.append(spinner) spinner.start() @@ -148,7 +155,7 @@ class NewKeyPage(RequestPage): title = _("Generating new Key") complete = False - def __init__(self, assistant, client): + def __init__(self, assistant: Gtk.Assistant, client: Client) -> None: super().__init__() self._assistant = assistant self._client = client @@ -167,14 +174,14 @@ class NewKeyPage(RequestPage): GLib.idle_add(self.finished, text) - def finished(self, error): + def finished(self, error: str | None) -> None: if error is None: self._client.get_module("OpenPGP").get_own_key_details() self._client.get_module("OpenPGP").set_public_key() self._client.get_module("OpenPGP").request_keylist() self._assistant.set_current_page(Page.SUCCESS) else: - error_page = self._assistant.get_nth_page(Page.ERROR) + error_page = cast(ErrorPage, self._assistant.get_nth_page(Page.ERROR)) error_page.set_text(error) self._assistant.set_current_page(Page.ERROR) @@ -206,17 +213,15 @@ class SuccessfulPage(Gtk.Box): self.set_spacing(12) self.set_homogeneous(True) - icon = Gtk.Image.new_from_icon_name( - "object-select-symbolic", Gtk.IconSize.DIALOG - ) - icon.get_style_context().add_class("success-color") + icon = Gtk.Image.new_from_icon_name("object-select-symbolic") + icon.add_css_class("success-color") icon.set_valign(Gtk.Align.END) label = Gtk.Label(label=_("Setup successful")) - label.get_style_context().add_class("bold16") + label.add_css_class("bold16") label.set_valign(Gtk.Align.START) - self.add(icon) - self.add(label) + self.append(icon) + self.append(label) class ErrorPage(Gtk.Box): @@ -230,17 +235,15 @@ class ErrorPage(Gtk.Box): self.set_spacing(12) self.set_homogeneous(True) - icon = Gtk.Image.new_from_icon_name( - "dialog-error-symbolic", Gtk.IconSize.DIALOG - ) + icon = Gtk.Image.new_from_icon_name("dialog-error-symbolic") icon.get_style_context().add_class("error-color") icon.set_valign(Gtk.Align.END) self._label = Gtk.Label() self._label.get_style_context().add_class("bold16") self._label.set_valign(Gtk.Align.START) - self.add(icon) - self.add(self._label) + self.append(icon) + self.append(self._label) - def set_text(self, text): + def set_text(self, text: str) -> None: self._label.set_text(text) diff --git a/openpgp/modules/key_store.py b/openpgp/modules/key_store.py index 61a4b4a..10b9d33 100644 --- a/openpgp/modules/key_store.py +++ b/openpgp/modules/key_store.py @@ -14,8 +14,17 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . -import logging +from __future__ import annotations +import logging +from collections.abc import Iterator + +from nbxmpp.protocol import JID +from nbxmpp.structs import PGPKeyMetadata + +from openpgp.backend.base import BasePGPBackend +from openpgp.backend.sql import ContactRow +from openpgp.backend.sql import Storage from openpgp.modules.util import Trust log = logging.getLogger("gajim.p.openpgp.store") @@ -26,45 +35,34 @@ class KeyData: Holds all data related to a certain key """ - def __init__(self, contact_data): + def __init__( + self, + contact_data: ContactData, + fingerprint: str, + active: bool, + trust: Trust, + timestamp: float, + ): self._contact_data = contact_data - self.fingerprint = None - self.active = False - self._trust = Trust.UNKNOWN - self.timestamp = None + self.fingerprint = fingerprint + self.active = active + self._trust = trust + self.timestamp = timestamp self.comment = None self.has_pubkey = False @property - def trust(self): + def trust(self) -> Trust: return self._trust @trust.setter - def trust(self, value): + def trust(self, value: Trust) -> None: if value not in (Trust.NOT_TRUSTED, Trust.UNKNOWN, Trust.BLIND, Trust.VERIFIED): raise ValueError("Trust value not allowed: %s" % value) + self._trust = value self._contact_data.set_trust(self.fingerprint, self._trust) - @classmethod - def from_key(cls, contact_data, key, trust): - keydata = cls(contact_data) - keydata.fingerprint = key.fingerprint - keydata.timestamp = key.date - keydata.active = True - keydata._trust = trust - return keydata - - @classmethod - def from_row(cls, contact_data, row): - keydata = cls(contact_data) - keydata.fingerprint = row.fingerprint - keydata.timestamp = row.timestamp - keydata.comment = row.comment - keydata._trust = row.trust - keydata.active = row.active - return keydata - def delete(self): self._contact_data.delete_key(self.fingerprint) @@ -74,9 +72,9 @@ class ContactData: Holds all data related to a contact """ - def __init__(self, jid, storage, pgp): + def __init__(self, jid: JID, storage: Storage, pgp: BasePGPBackend) -> None: self.jid = jid - self._key_store = {} + self._key_store: dict[str, KeyData] = {} self._storage = storage self._pgp = pgp @@ -87,13 +85,13 @@ class ContactData: return "xmpp:%s" % self.jid @property - def default_trust(self): + def default_trust(self) -> Trust: for key in self._key_store.values(): if key.trust in (Trust.NOT_TRUSTED, Trust.BLIND): return Trust.UNKNOWN return Trust.BLIND - def db_values(self): + def db_values(self) -> Iterator[tuple[JID, str, bool, Trust, float]]: for key in self._key_store.values(): yield ( self.jid, @@ -101,28 +99,39 @@ class ContactData: key.active, key.trust, key.timestamp, - key.comment, ) - def add_from_key(self, key): + def add_from_key(self, key: PGPKeyMetadata) -> KeyData: try: keydata = self._key_store[key.fingerprint] except KeyError: - keydata = KeyData.from_key(self, key, self.default_trust) + keydata = KeyData( + self, + key.fingerprint, + True, + self.default_trust, + key.date, + ) self._key_store[key.fingerprint] = keydata log.info("Add from key: %s %s", self.jid, keydata.fingerprint) return keydata - def add_from_db(self, row): + def add_from_db(self, row: ContactRow) -> KeyData: try: keydata = self._key_store[row.fingerprint] except KeyError: - keydata = KeyData.from_row(self, row) + keydata = KeyData( + self, + row.fingerprint, + row.active, + row.trust, + row.timestamp, + ) self._key_store[row.fingerprint] = keydata log.info("Add from row: %s %s", self.jid, row.fingerprint) return keydata - def process_keylist(self, keylist): + def process_keylist(self, keylist: list[PGPKeyMetadata] | None) -> list[str]: log.info("Process keylist: %s %s", self.jid, keylist) if keylist is None: @@ -131,8 +140,8 @@ class ContactData: self._storage.save_contact(self.db_values()) return [] - missing_pub_keys = [] - fingerprints = set([key.fingerprint for key in keylist]) + missing_pub_keys: list[str] = [] + fingerprints = {key.fingerprint for key in keylist} if fingerprints == self._key_store.keys(): log.info("No updates found") for key in self._key_store.values(): @@ -156,7 +165,7 @@ class ContactData: self._storage.save_contact(self.db_values()) return missing_pub_keys - def set_public_key(self, fingerprint): + def set_public_key(self, fingerprint: str) -> None: try: keydata = self._key_store[fingerprint] except KeyError: @@ -167,7 +176,7 @@ class ContactData: keydata.has_pubkey = True log.info("Set public key: %s %s", self.jid, fingerprint) - def get_keys(self, only_trusted=True): + def get_keys(self, only_trusted: bool = True) -> list[KeyData]: keys = list(self._key_store.values()) if not only_trusted: return keys @@ -175,13 +184,13 @@ class ContactData: k for k in keys if k.active and k.trust in (Trust.VERIFIED, Trust.BLIND) ] - def get_key(self, fingerprint): + def get_key(self, fingerprint: str) -> KeyData | None: return self._key_store.get(fingerprint, None) - def set_trust(self, fingerprint, trust): + def set_trust(self, fingerprint: str, trust: Trust) -> None: self._storage.set_trust(self.jid, fingerprint, trust) - def delete_key(self, fingerprint): + def delete_key(self, fingerprint: str) -> None: self._storage.delete_key(self.jid, fingerprint) self._pgp.delete_key(fingerprint) del self._key_store[fingerprint] @@ -192,8 +201,8 @@ class PGPContacts: Holds all contacts available for PGP encryption """ - def __init__(self, pgp, storage): - self._contacts = {} + def __init__(self, pgp: BasePGPBackend, storage: Storage) -> None: + self._contacts: dict[JID, ContactData] = {} self._storage = storage self._pgp = pgp self._load_from_storage() @@ -204,14 +213,12 @@ class PGPContacts: keyring = self._pgp.get_keys() for key in keyring: log.info("Found: %s %s", key.jid, key.fingerprint) + assert key.jid is not None self.set_public_key(key.jid, key.fingerprint) def _load_from_storage(self): log.info("Load contacts from storage") rows = self._storage.load_contacts() - if rows is None: - return - for row in rows: log.info("Found: %s %s", row.jid, row.fingerprint) try: @@ -223,7 +230,9 @@ class PGPContacts: else: contact_data.add_from_db(row) - def process_keylist(self, jid, keylist): + def process_keylist( + self, jid: JID, keylist: list[PGPKeyMetadata] | None + ) -> list[str]: try: contact_data = self._contacts[jid] except KeyError: @@ -235,7 +244,7 @@ class PGPContacts: return missing_pub_keys - def set_public_key(self, jid, fingerprint): + def set_public_key(self, jid: JID, fingerprint: str) -> None: try: contact_data = self._contacts[jid] except KeyError: @@ -243,14 +252,14 @@ class PGPContacts: else: contact_data.set_public_key(fingerprint) - def get_keys(self, jid, only_trusted=True): + def get_keys(self, jid: JID, only_trusted: bool = True) -> list[KeyData]: try: contact_data = self._contacts[jid] return contact_data.get_keys(only_trusted=only_trusted) except KeyError: return [] - def get_trust(self, jid, fingerprint): + def get_trust(self, jid: JID, fingerprint: str) -> Trust: contact_data = self._contacts.get(jid, None) if contact_data is None: return Trust.UNKNOWN diff --git a/openpgp/modules/openpgp.py b/openpgp/modules/openpgp.py index 69c5a5c..2e2b694 100644 --- a/openpgp/modules/openpgp.py +++ b/openpgp/modules/openpgp.py @@ -14,37 +14,47 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from typing import Any +from typing import cast + import logging import sys import time +from collections.abc import Callable from pathlib import Path from nbxmpp import Node from nbxmpp import StanzaMalformed +from nbxmpp.client import Client as nbxmppClient from nbxmpp.errors import MalformedStanzaError from nbxmpp.errors import StanzaError from nbxmpp.exceptions import StanzaDecrypted from nbxmpp.modules.openpgp import create_message_stanza from nbxmpp.modules.openpgp import create_signcrypt_node from nbxmpp.modules.openpgp import parse_signcrypt -from nbxmpp.modules.openpgp import PGPKeyMetadata from nbxmpp.namespaces import Namespace +from nbxmpp.protocol import JID +from nbxmpp.protocol import Message from nbxmpp.structs import EncryptionData from nbxmpp.structs import MessageProperties +from nbxmpp.structs import PGPKeyMetadata +from nbxmpp.structs import PGPPublicKey from nbxmpp.structs import StanzaHandler +from nbxmpp.task import Task from gajim.common import app from gajim.common import configpaths +from gajim.common.client import Client from gajim.common.events import MessageNotSent from gajim.common.modules.base import BaseModule from gajim.common.modules.util import event_node from gajim.common.structs import OutgoingMessage from openpgp.backend.sql import Storage +from openpgp.modules.key_store import KeyData from openpgp.modules.key_store import PGPContacts from openpgp.modules.util import DecryptionFailed from openpgp.modules.util import ENCRYPTION_NAME -from openpgp.modules.util import Key from openpgp.modules.util import NOT_ENCRYPTED_TAGS from openpgp.modules.util import prepare_stanza from openpgp.modules.util import Trust @@ -52,7 +62,7 @@ from openpgp.modules.util import Trust if sys.platform == "win32": from openpgp.backend.pygpg import PythonGnuPG as PGPBackend else: - from openpgp.backend.gpgme import GPGME as PGPBackend + from openpgp.backend.gpgme import GPGMe as PGPBackend log = logging.getLogger("gajim.p.openpgp") @@ -75,7 +85,7 @@ class OpenPGP(BaseModule): "request_secret_key", ] - def __init__(self, client): + def __init__(self, client: Client): BaseModule.__init__(self, client) self.handlers = [ @@ -103,67 +113,88 @@ class OpenPGP(BaseModule): log.info("Own Fingerprint at start: %s", self._fingerprint) @property - def secret_key_available(self): + def secret_key_available(self) -> bool: return self._fingerprint is not None - def get_own_key_details(self): + def get_own_key_details(self) -> tuple[str | None, int | None]: self._fingerprint, self._date = self._pgp.get_own_key_details() return self._fingerprint, self._date - def generate_key(self): + def generate_key(self) -> None: self._pgp.generate_key() - def set_public_key(self): + def set_public_key(self) -> None: log.info("%s => Publish public key", self._account) + + assert self._fingerprint is not None + assert self._date is not None + key = self._pgp.export_key(self._fingerprint) + assert key is not None self._nbxmpp("OpenPGP").set_public_key(key, self._fingerprint, self._date) - def request_public_key(self, jid, fingerprint): + def request_public_key(self, jid: JID, fingerprint: str) -> None: log.info("%s => Request public key %s - %s", self._account, fingerprint, jid) self._nbxmpp("OpenPGP").request_public_key( jid, fingerprint, callback=self._public_key_received, user_data=fingerprint ) - def _public_key_received(self, task): + def _public_key_received(self, task: Task) -> None: fingerprint = task.get_user_data() try: - result = task.finish() + result = cast(PGPPublicKey | None, task.finish()) except (StanzaError, MalformedStanzaError) as error: log.error("%s => Public Key not found: %s", self._account, error) return + if result is None: + log.error("%s => Public Key Node is empty", self._account) + return + imported_key = self._pgp.import_key(result.key, result.jid) if imported_key is not None: self._contacts.set_public_key(result.jid, fingerprint) - def set_keylist(self, keylist=None): + def set_keylist(self, keylist: list[PGPKeyMetadata] | None = None) -> None: if keylist is None: - keylist = [PGPKeyMetadata(None, self._fingerprint, self._date)] + assert self._fingerprint is not None + assert self._date is not None + keylist = [PGPKeyMetadata(self.own_jid, self._fingerprint, self._date)] + log.info("%s => Publish keylist", self._account) self._nbxmpp("OpenPGP").set_keylist(keylist) @event_node(Namespace.OPENPGP_PK) - def _keylist_notification_received(self, _con, _stanza, properties): + def _keylist_notification_received( + self, _client: nbxmppClient, _stanza: Node, properties: MessageProperties + ) -> None: + assert properties.pubsub_event is not None + if properties.pubsub_event.retracted: return - keylist = properties.pubsub_event.data or [] + assert properties.jid is not None + + keylist: list[PGPKeyMetadata] = [] + if properties.pubsub_event.data: + keylist = cast(list[PGPKeyMetadata], properties.pubsub_event.data) self._process_keylist(keylist, properties.jid) - def request_keylist(self, jid=None): + def request_keylist(self, jid: JID | None = None) -> None: if jid is None: jid = self.own_jid + log.info("%s => Fetch keylist %s", self._account, jid) self._nbxmpp("OpenPGP").request_keylist( jid, callback=self._keylist_received, user_data=jid ) - def _keylist_received(self, task): - jid = task.get_user_data() + def _keylist_received(self, task: Task) -> None: + jid = cast(JID, task.get_user_data()) try: - keylist = task.finish() + keylist = cast(list[PGPKeyMetadata] | None, task.finish()) except (StanzaError, MalformedStanzaError) as error: log.error("%s => Keylist query failed: %s", self._account, error) if self.own_jid.bare_match(jid) and self._fingerprint is not None: @@ -173,7 +204,9 @@ class OpenPGP(BaseModule): log.info("Keylist received from %s", jid) self._process_keylist(keylist, jid) - def _process_keylist(self, keylist, from_jid): + def _process_keylist( + self, keylist: list[PGPKeyMetadata] | None, from_jid: JID + ) -> None: if not keylist: log.warning("%s => Empty keylist received from %s", self._account, from_jid) self._contacts.process_keylist(self.own_jid, keylist) @@ -185,14 +218,19 @@ class OpenPGP(BaseModule): log.info("Received own keylist") for key in keylist: log.info(key.fingerprint) + for key in keylist: # Check if own fingerprint is published if key.fingerprint == self._fingerprint: log.info("Own key found in keys list") return + log.info("Own key not published") if self._fingerprint is not None: - keylist.append(Key(self._fingerprint, self._date)) + assert self._date is not None + keylist.append( + PGPKeyMetadata(self.own_jid, self._fingerprint, self._date) + ) self.set_keylist(keylist) return @@ -204,10 +242,14 @@ class OpenPGP(BaseModule): for fingerprint in missing_pub_keys: self.request_public_key(from_jid, fingerprint) - def decrypt_message(self, _con, stanza, properties: MessageProperties): + def decrypt_message( + self, _client: nbxmppClient, stanza: Message, properties: MessageProperties + ) -> None: if not properties.is_openpgp: return + assert properties.openpgp is not None + remote_jid = properties.remote_jid assert remote_jid is not None @@ -249,7 +291,9 @@ class OpenPGP(BaseModule): raise StanzaDecrypted - def encrypt_message(self, message: OutgoingMessage, callback): + def encrypt_message( + self, message: OutgoingMessage, callback: Callable[[OutgoingMessage], None] + ) -> None: remote_jid = message.contact.jid keys = self._contacts.get_keys(remote_jid) @@ -257,12 +301,17 @@ class OpenPGP(BaseModule): log.error("Dropping stanza to %s, because we have no key", remote_jid) return + assert self._fingerprint is not None + keys += self._contacts.get_keys(self.own_jid) - keys += [Key(self._fingerprint, None)] + keys += [ + KeyData(None, self._fingerprint, True, Trust.VERIFIED, 0) # pyright: ignore + ] payload = create_signcrypt_node( message.get_stanza(), [remote_jid], NOT_ENCRYPTED_TAGS ) + payload = str(payload).encode("utf8") encrypted_payload, error = self._pgp.encrypt(payload, keys) if error: @@ -279,6 +328,8 @@ class OpenPGP(BaseModule): ) return + assert encrypted_payload is not None + create_message_stanza( message.get_stanza(), encrypted_payload, bool(message.get_text()) ) @@ -292,7 +343,7 @@ class OpenPGP(BaseModule): callback(message) @staticmethod - def print_msg_to_log(stanza): + def print_msg_to_log(stanza: Node) -> None: """Prints a stanza in a fancy way to the log""" log.debug("-" * 15) stanzastr = "\n" + stanza.__str__(fancy=True) @@ -300,19 +351,21 @@ class OpenPGP(BaseModule): log.debug(stanzastr) log.debug("-" * 15) - def get_keys(self, jid=None, only_trusted=True): + def get_keys( + self, jid: JID | None = None, only_trusted: bool = True + ) -> list[KeyData]: if jid is None: jid = self.own_jid return self._contacts.get_keys(jid, only_trusted=only_trusted) - def clear_fingerprints(self): + def clear_fingerprints(self) -> None: self.set_keylist() - def cleanup(self): + def cleanup(self) -> None: self._storage.cleanup() - self._pgp = None - self._contacts = None + del self._pgp + del self._contacts -def get_instance(*args, **kwargs): +def get_instance(*args: Any, **kwargs: Any) -> tuple[Any, str]: return OpenPGP(*args, **kwargs), "OpenPGP" diff --git a/openpgp/modules/util.py b/openpgp/modules/util.py index 99e01e4..0943766 100644 --- a/openpgp/modules/util.py +++ b/openpgp/modules/util.py @@ -14,9 +14,9 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . -from collections import namedtuple from enum import IntEnum +from nbxmpp import Node from nbxmpp.namespaces import Namespace ENCRYPTION_NAME = "OpenPGP" @@ -27,11 +27,9 @@ NOT_ENCRYPTED_TAGS = [ ("no-copy", Namespace.HINTS), ("no-permanent-store", Namespace.HINTS), ("origin-id", Namespace.SID), - ("thread", None), + ("thread", ""), ] -Key = namedtuple("Key", "fingerprint date") - class Trust(IntEnum): NOT_TRUSTED = 0 @@ -40,19 +38,23 @@ class Trust(IntEnum): VERIFIED = 3 -def prepare_stanza(stanza, payload): +def prepare_stanza(stanza: Node, payload: list[Node | str]) -> None: delete_nodes(stanza, "openpgp", Namespace.OPENPGP) delete_nodes(stanza, "body") - nodes = [(node.getName(), node.getNamespace()) for node in payload] - for name, namespace in nodes: - delete_nodes(stanza, name, namespace) - + nodes: list[Node] = [] for node in payload: + if isinstance(node, str): + continue + name, namespace = node.getName(), node.getNamespace() + delete_nodes(stanza, name, namespace) + nodes.append(node) + + for node in nodes: stanza.addChild(node=node) -def delete_nodes(stanza, name, namespace=None): +def delete_nodes(stanza: Node, name: str, namespace: str | None = None) -> None: attrs = None if namespace is not None: attrs = {"xmlns": Namespace.OPENPGP} diff --git a/openpgp/pgpplugin.py b/openpgp/pgpplugin.py index 3dfef69..478093b 100644 --- a/openpgp/pgpplugin.py +++ b/openpgp/pgpplugin.py @@ -14,10 +14,17 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from __future__ import annotations + +from typing import Any +from typing import TYPE_CHECKING + import logging +from collections.abc import Callable from pathlib import Path from gi.repository import Gdk +from gi.repository import GLib from gi.repository import Gtk from nbxmpp import JID from nbxmpp.namespaces import Namespace @@ -25,7 +32,11 @@ from nbxmpp.namespaces import Namespace from gajim.common import app from gajim.common import configpaths from gajim.common import ged +from gajim.common.client import Client from gajim.common.const import CSSPriority +from gajim.common.events import SignedIn +from gajim.common.structs import OutgoingMessage +from gajim.gtk.control import ChatControl from gajim.gtk.dialogs import SimpleDialog from gajim.plugins import GajimPlugin from gajim.plugins.plugins_i18n import _ @@ -35,18 +46,21 @@ from openpgp.modules.util import ENCRYPTION_NAME try: from openpgp.modules import openpgp except (ImportError, OSError) as e: - ERROR_MSG = str(e) + error_msg = str(e) else: - ERROR_MSG = None + error_msg = None + +if TYPE_CHECKING: + from openpgp.modules.openpgp import OpenPGP log = logging.getLogger("gajim.p.openpgp") class OpenPGPPlugin(GajimPlugin): def init(self): - if ERROR_MSG: + if error_msg: self.activatable = False - self.available_text = ERROR_MSG + self.available_text = error_msg self.config_dialog = None return @@ -76,7 +90,11 @@ class OpenPGPPlugin(GajimPlugin): self._create_paths() self._load_css() - def _load_css(self): + @staticmethod + def get_openpgp_module(account: str) -> OpenPGP: + return app.get_client(account).get_module("OpenPGP") # pyright: ignore + + def _load_css(self) -> None: path = Path(__file__).parent / "gtk" / "style.css" try: with path.open("r") as f: @@ -85,59 +103,63 @@ class OpenPGPPlugin(GajimPlugin): log.error("Error loading css: %s", exc) return + display = Gdk.Display.get_default() + assert display is not None + try: provider = Gtk.CssProvider() - provider.load_from_data(bytes(css.encode("utf-8"))) - Gtk.StyleContext.add_provider_for_screen( - Gdk.Screen.get_default(), provider, CSSPriority.DEFAULT_THEME + provider.load_from_bytes(GLib.Bytes.new(css.encode("utf-8"))) + Gtk.StyleContext.add_provider_for_display( + display, provider, CSSPriority.DEFAULT_THEME ) except Exception: log.exception("Error loading application css") @staticmethod - def _create_paths(): + def _create_paths() -> None: keyring_path = Path(configpaths.get("MY_DATA")) / "openpgp" if not keyring_path.exists(): keyring_path.mkdir() - def signed_in(self, event): - client = app.get_client(event.account) - if client.get_module("OpenPGP").secret_key_available: + def signed_in(self, event: SignedIn) -> None: + openpgp = self.get_openpgp_module(event.account) + if openpgp.secret_key_available: log.info( "%s => Publish keylist and public key after sign in", event.account ) - client.get_module("OpenPGP").request_keylist() - client.get_module("OpenPGP").set_public_key() + openpgp.request_keylist() + openpgp.set_public_key() - def activate(self): + def activate(self) -> None: for account in app.settings.get_active_accounts(): client = app.get_client(account) client.get_module("Caps").update_caps() if app.account_is_connected(account): - if client.get_module("OpenPGP").secret_key_available: + openpgp = self.get_openpgp_module(account) + if openpgp.secret_key_available: log.info( "%s => Publish keylist and public key " "after plugin activation", account, ) - client.get_module("OpenPGP").request_keylist() - client.get_module("OpenPGP").set_public_key() + openpgp.request_keylist() + openpgp.set_public_key() - def deactivate(self): + def deactivate(self) -> None: pass @staticmethod - def _update_caps(_account, features): + def _update_caps(_account: str, features: list[str]) -> None: features.append("%s+notify" % Namespace.OPENPGP_PK) - def activate_encryption(self, chat_control): + def activate_encryption(self, chat_control: ChatControl) -> bool: account = chat_control.account jid = chat_control.contact.jid - client = app.get_client(account) - if client.get_module("OpenPGP").secret_key_available: - keys = client.get_module("OpenPGP").get_keys(jid, only_trusted=False) + openpgp = self.get_openpgp_module(account) + if openpgp.secret_key_available: + keys = openpgp.get_keys(jid, only_trusted=False) if not keys: - client.get_module("OpenPGP").request_keylist(JID.from_string(jid)) + openpgp.request_keylist(JID.from_string(jid)) return True from openpgp.gtk.wizard import KeyWizard @@ -146,12 +168,12 @@ class OpenPGPPlugin(GajimPlugin): return False @staticmethod - def encryption_state(_chat_control, state): + def encryption_state(_chat_control: ChatControl, state: dict[str, Any]) -> None: state["authenticated"] = True state["visible"] = True @staticmethod - def on_encryption_button_clicked(chat_control): + def on_encryption_button_clicked(chat_control: ChatControl) -> None: account = chat_control.account jid = chat_control.contact.jid @@ -159,26 +181,31 @@ class OpenPGPPlugin(GajimPlugin): KeyDialog(account, jid, app.window) - def _before_sendmessage(self, chat_control): + def _before_sendmessage(self, chat_control: ChatControl) -> None: account = chat_control.account jid = chat_control.contact.jid - client = app.get_client(account) + openpgp = self.get_openpgp_module(account) - if not client.get_module("OpenPGP").secret_key_available: + if not openpgp.secret_key_available: from openpgp.gtk.wizard import KeyWizard KeyWizard(self, account, chat_control) return - keys = client.get_module("OpenPGP").get_keys(jid) + keys = openpgp.get_keys(jid) if not keys: SimpleDialog( _("Not Trusted"), _("There was no trusted and active key found") ) chat_control.sendmessage = False - @staticmethod - def _encrypt_message(client, obj, callback): - if not client.get_module("OpenPGP").secret_key_available: + def _encrypt_message( + self, + client: Client, + message: OutgoingMessage, + callback: Callable[[OutgoingMessage], None], + ) -> None: + openpgp = self.get_openpgp_module(client.account) + if not openpgp.secret_key_available: return - client.get_module("OpenPGP").encrypt_message(obj, callback) + openpgp.encrypt_message(message, callback) diff --git a/pyproject.toml b/pyproject.toml index 4920346..ff87d71 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,6 @@ exclude = [ "**/__pycache__", ".git", ".venv", - "openpgp/*", ] [tool.ruff] diff --git a/typings/gnupg/__init__.pyi b/typings/gnupg/__init__.pyi index a3b6ea0..9d8b2e0 100644 --- a/typings/gnupg/__init__.pyi +++ b/typings/gnupg/__init__.pyi @@ -1,44 +1,40 @@ -""" -This type stub file was generated by pyright. -""" - from typing import Any +from typing import TypedDict + +import logging +import re + +from typing_extensions import NotRequired __version__: str = ... __author__: str = ... __date__: str = ... +from typing import Protocol + +string_types: type[str] = ... +text_type: type[str] = ... +path_types: tuple[type[str]] = ... + +log_everything: bool = ... +logger: logging.Logger = ... +fsencoding: str = ... + +UNSAFE: re.Pattern[Any] = ... + +def shell_quote(s: str) -> str: ... +def no_quote(s: str) -> str: ... + +class _ReadableFile(Protocol): + def read(self, size: int) -> bytes: ... + class StatusHandler: - """ - The base class for handling status messages from `gpg`. - """ - on_data_failure = ... - def __init__(self, gpg: GPG) -> None: - """ - Initialize an instance. - - Args: - gpg (GPG): The :class:`GPG` instance in use. - """ - ... - - def handle_status(self, key: str, value: str) -> None: - """ - Handle status messages from the `gpg` child process. These are lines of the format - - [GNUPG:] - - Args: - key (str): Identifies what the status message is. - value (str): Identifies additional data, which differs depending on the key. - """ - ... + on_data_failure: Exception | None = ... + def __init__(self, gpg: GPG) -> None: ... + def handle_status(self, key: str, value: str) -> None: ... class Verify(StatusHandler): - """ - This class handles status messages during signature verificaton. - """ TRUST_EXPIRED: int = ... TRUST_UNDEFINED: int = ... @@ -49,268 +45,154 @@ class Verify(StatusHandler): TRUST_LEVELS: dict[str, int] = ... GPG_SYSTEM_ERROR_CODES: dict[int, str] = ... GPG_ERROR_CODES: dict[int, str] = ... - returncode = ... - valid: bool = ... - fingerprint: str | None = ... + returncode: int | None = ... + valid: bool + fingerprint: str | None + creation_date: str | None + timestamp: str | None + signature_id: str | None + username: str | None + key_id: str | None + key_status: str | None + status: str | None + pubkey_fingerprint: str | None + expire_timestamp: str | None + sig_timestamp: str | None + trust_text: str | None + trust_level: int | None + sig_info: dict[str, dict[str, str]] + problems: list[dict[str, str]] def __init__(self, gpg: GPG) -> None: ... def __nonzero__(self) -> bool: ... - - __bool__ = ... + def __bool__(self) -> bool: ... def handle_status(self, key: str, value: str) -> None: ... -class ImportResult(StatusHandler): - """ - This class handles status messages during key import. - """ +class ImportResultDict(TypedDict): + fingerprint: str | None + problem: NotRequired[str] + ok: NotRequired[str] + text: str - counts = ... - returncode = ... +class ImportResult(StatusHandler): + + counts: list[str] = ... + returncode: int | None = ... + results: list[ImportResultDict] + fingerprints: list[str] def __init__(self, gpg: GPG) -> None: ... def __nonzero__(self) -> bool: ... + def __bool__(self) -> bool: ... - __bool__ = ... ok_reason: dict[str, str] = ... problem_reason: dict[str, str] = ... def handle_status(self, key: str, value: str) -> None: ... - def summary(self) -> str: - """ - Return a summary indicating how many keys were imported and how many were not imported. - """ - ... + def summary(self) -> str: ... -ESCAPE_PATTERN = ... -BASIC_ESCAPES = ... +ESCAPE_PATTERN: re.Pattern[Any] = ... +BASIC_ESCAPES: dict[str, str] = ... class SendResult(StatusHandler): - """ - This class handles status messages during key sending. - """ - returncode = ... + returncode: int | None = ... def handle_status(self, key: str, value: str) -> None: ... class SearchKeys(StatusHandler, list[dict[Any, Any]]): - """ - This class handles status messages during key search. - """ - UID_INDEX = ... - FIELDS = ... - returncode = ... + UID_INDEX: int = ... + FIELDS: list[str] = ... + returncode: int | None = ... def __init__(self, gpg: GPG) -> None: ... - def get_fields(self, args: Any) -> dict[str, Any]: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - - def pub(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - - def uid(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - + def get_fields(self, args: Any) -> dict[str, Any]: ... + def pub(self, args: Any) -> None: ... + def uid(self, args: Any) -> None: ... def handle_status(self, key: str, value: str) -> None: ... class ListKeys(SearchKeys): - """ - This class handles status messages during listing keys and signatures. - Handle pub and uid (relating the latter to the former). - - We don't care about (info from GnuPG DETAILS file): - - crt = X.509 certificate - crs = X.509 certificate and private key available - uat = user attribute (same as user id except for field 10). - sig = signature - rev = revocation signature - pkd = public key data (special field format, see below) - grp = reserved for gpgsm - rvk = revocation key - """ - - UID_INDEX = ... - FIELDS = ... - fingerprints: list[str] = ... + UID_INDEX: int = ... + FIELDS: list[str] = ... def __init__(self, gpg: GPG) -> None: ... - def key(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - sec = ... - def fpr(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - - def grp(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - - def sub(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - - def ssb(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... - - def sig(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... + def key(self, args: Any) -> None: ... + sec: Any = ... + def fpr(self, args: Any) -> None: ... + def grp(self, args: Any) -> None: ... + def sub(self, args: Any) -> None: ... + def ssb(self, args: Any) -> None: ... + def sig(self, args: Any) -> None: ... class ScanKeys(ListKeys): - """ - This class handles status messages during scanning keys. - """ - - def sub(self, args: Any) -> None: - """ - Internal method used to update the instance from a `gpg` status message. - """ - ... + def sub(self, args: Any) -> None: ... class TextHandler: ... -_INVALID_KEY_REASONS = ... - class Crypt(Verify, TextHandler): - """ - This class handles status messages during encryption and decryption. - """ - ok: bool = ... - status: str = ... - data: bytes = ... + data: bytes + ok: bool + status_detail: str + def __init__(self, gpg: GPG) -> None: ... def __nonzero__(self) -> bool: ... - - __bool__ = ... + def __bool__(self) -> bool: ... def handle_status(self, key: str, value: str) -> None: ... class GenKey(StatusHandler): - """ - This class handles status messages during key generation. - """ - returncode = ... + returncode: int | None = ... def __init__(self, gpg: GPG) -> None: ... def __nonzero__(self) -> bool: ... - - __bool__ = ... - def __str__(self) -> str: ... + def __bool__(self) -> bool: ... def handle_status(self, key: str, value: str) -> None: ... class AddSubkey(StatusHandler): - """ - This class handles status messages during subkey addition. - """ - returncode = ... + returncode: int | None = ... def __init__(self, gpg: GPG) -> None: ... def __nonzero__(self) -> bool: ... - - __bool__ = ... - def __str__(self) -> str: ... + def __bool__(self) -> bool: ... def handle_status(self, key: str, value: str) -> None: ... class ExportResult(GenKey): - """ - This class handles status messages during key export. - """ - def handle_status(self, key: str, value: str) -> None: ... class DeleteResult(StatusHandler): - """ - This class handles status messages during key deletion. - """ - returncode = ... + returncode: int | None = ... def __init__(self, gpg: GPG) -> None: ... - def __str__(self) -> str: ... - problem_reason = ... + problem_reason: dict[str, str] = ... def handle_status(self, key: str, value: str) -> None: ... def __nonzero__(self) -> bool: ... + def __bool__(self) -> bool: ... - __bool__ = ... - -class TrustResult(DeleteResult): - """ - This class handles status messages during key trust setting. - """ - - ... +class TrustResult(DeleteResult): ... class Sign(StatusHandler, TextHandler): - """ - This class handles status messages during signing. - """ - returncode = ... - fingerprint: str | None = ... - status: str | None = ... + returncode: int | None = ... def __init__(self, gpg: GPG) -> None: ... def __nonzero__(self) -> bool: ... - - __bool__ = ... + def __bool__(self) -> bool: ... def handle_status(self, key: str, value: str) -> None: ... class AutoLocateKey(StatusHandler): - """ - This class handles status messages during key auto-locating. - fingerprint: str - key_length: int - created_at: date - email: str - email_real_name: str - """ - def __init__(self, gpg: GPG) -> None: ... def handle_status(self, key: str, value: str) -> None: ... - def pub(self, args: Any) -> None: - """ - Internal method to handle the 'pub' status message. - `pub` message contains the fingerprint of the public key, its type and its creation date. - """ - ... - + def pub(self, args: Any) -> None: ... def uid(self, args: Any) -> None: ... def sub(self, args: Any) -> None: ... def fpr(self, args: Any) -> None: ... -VERSION_RE = ... -HEX_DIGITS_RE = ... -PUBLIC_KEY_RE = ... +VERSION_RE: re.Pattern[Any] = ... +HEX_DIGITS_RE: re.Pattern[Any] = ... +PUBLIC_KEY_RE: re.Pattern[Any] = ... class GPG: - """ - This class provides a high-level programmatic interface for `gpg`. - """ - error_map = ... - encoding: str = ... + error_map: None = ... decode_errors: str = ... - buffer_size = ... - result_map = ... + buffer_size: int = ... + result_map: dict[str, Any] = ... def __init__( self, gpgbinary: str = ..., @@ -321,102 +203,15 @@ class GPG: options: list[str] | None = ..., secret_keyring: str | list[str] | None = ..., env: dict[str, str] | None = ..., - ) -> None: - """Initialize a GPG process wrapper. - - Args: - gpgbinary (str): A pathname for the GPG binary to use. - - gnupghome (str): A pathname to where we can find the public and private keyrings. The default is - whatever `gpg` defaults to. - - keyring (str|list): The name of alternative keyring file to use, or a list of such keyring files. If - specified, the default keyring is not used. - - options (list): A list of additional options to pass to the GPG binary. - - secret_keyring (str|list): The name of an alternative secret keyring file to use, or a list of such - keyring files. - - env (dict): A dict of environment variables to be used for the GPG subprocess. - """ - ... - - def make_args(self, args: list[str], passphrase: str) -> list[str]: - """ - Make a list of command line elements for GPG. The value of ``args`` - will be appended. The ``passphrase`` argument needs to be True if - a passphrase will be sent to `gpg`, else False. - - Args: - args (list[str]): A list of arguments. - passphrase (str): The passphrase to use. - """ - ... - - def is_valid_file(self, fileobj: Any) -> bool: - """ - A simplistic check for a file-like object. - - Args: - fileobj (object): The object to test. - Returns: - bool: ``True`` if it's a file-like object, else ``False``. - """ - ... - - def sign(self, message: str | bytes, **kwargs: Any) -> Sign: - """ - Sign a message. This method delegates most of the work to the `sign_file()` method. - - Args: - message (str|bytes): The data to sign. - kwargs (dict): Keyword arguments, which are passed to `sign_file()`: - - * keyid (str): The key id of the signer. - - * passphrase (str): The passphrase for the key. - - * clearsign (bool): Whether to use clear signing. - - * detach (bool): Whether to produce a detached signature. - - * binary (bool): Whether to produce a binary signature. - - * output (str): The path to write a detached signature to. - - * extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - - def set_output_without_confirmation(self, args: list[str], output: str) -> None: - """ - If writing to a file which exists, avoid a confirmation message by - updating the *args* value in place to set the output path and avoid - any cpmfirmation prompt. - - Args: - args (list[str]): A list of arguments. - output (str): The path to the outpur file. - """ - ... - - def is_valid_passphrase(self, passphrase: str) -> bool: - """ - Confirm that the passphrase doesn't contain newline-type characters - it is passed in a pipe to `gpg`, - and so not checking could lead to spoofing attacks by passing arbitrary text after passphrase and newline. - - Args: - passphrase (str): The passphrase to test. - - Returns: - bool: ``True`` if it's a valid passphrase, else ``False``. - """ - ... - + ) -> None: ... + def make_args(self, args: list[str], passphrase: str) -> list[str]: ... + def is_valid_file(self, fileobj: Any) -> bool: ... + def sign(self, message: str | bytes, **kwargs: Any) -> Sign: ... + def set_output_without_confirmation(self, args: list[str], output: str) -> None: ... + def is_valid_passphrase(self, passphrase: str) -> bool: ... def sign_file( self, - fileobj_or_path: Any, + fileobj_or_path: str | _ReadableFile, keyid: str | None = ..., passphrase: str | None = ..., clearsign: bool = ..., @@ -424,133 +219,29 @@ class GPG: binary: bool = ..., output: str | None = ..., extra_args: list[str] | None = ..., - ) -> Sign: - """ - Sign data in a file or file-like object. - - Args: - fileobj_or_path (str|file): The file or file-like object to sign. - - keyid (str): The key id of the signer. - - passphrase (str): The passphrase for the key. - - clearsign (bool): Whether to use clear signing. - - detach (bool): Whether to produce a detached signature. - - binary (bool): Whether to produce a binary signature. - - output (str): The path to write a detached signature to. - - extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - - def verify(self, data: str | bytes, **kwargs: Any) -> Verify: - """ - Verify the signature on the contents of the string *data*. This method delegates most of the work to - `verify_file()`. - - Args: - data (str|bytes): The data to verify. - kwargs (dict): Keyword arguments, which are passed to `verify_file()`: - - * fileobj_or_path (str|file): A path to a signature, or a file-like object containing one. - - * data_filename (str): If the signature is a detached one, the path to the data that was signed. - - * close_file (bool): If a file-like object is passed in, whether to close it. - - * extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - + ) -> Sign: ... + def verify(self, data: str | bytes, **kwargs: Any) -> Verify: ... def verify_file( self, - fileobj_or_path: Any, + fileobj_or_path: str | _ReadableFile, data_filename: str | None = ..., close_file: bool = ..., extra_args: list[str] | None = ..., - ) -> Verify: - """ - Verify a signature. - - Args: - fileobj_or_path (str|file): A path to a signature, or a file-like object containing one. - - data_filename (str): If the signature is a detached one, the path to the data that was signed. - - close_file (bool): If a file-like object is passed in, whether to close it. - - extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - + ) -> Verify: ... def verify_data( self, sig_filename: str, data: str | bytes, extra_args: list[str] | None = ... - ) -> Verify: - """ - Verify the signature in sig_filename against data in memory - - Args: - sig_filename (str): The path to a signature. - - data (str|bytes): The data to be verified. - - extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - + ) -> Verify: ... def import_keys( self, key_data: str | bytes, extra_args: list[str] | None = ..., passphrase: str | None = ..., - ) -> ImportResult: - """ - Import the key_data into our keyring. - - Args: - key_data (str|bytes): The key data to import. - - passphrase (str): The passphrase to use. - - extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - - def import_keys_file(self, key_path: str, **kwargs: Any) -> ImportResult: - """ - Import the key data in key_path into our keyring. - - Args: - key_path (str): A path to the key data to be imported. - """ - ... - - def recv_keys(self, keyserver: str, *keyids: str, **kwargs: Any) -> ImportResult: - """ - Import one or more keys from a keyserver. - - Args: - keyserver (str): The key server hostname. - - keyids (str): A list of key ids to receive. - """ - ... - - def send_keys(self, keyserver: str, *keyids: str, **kwargs: Any) -> SendResult: - """ - Send one or more keys to a keyserver. - - Args: - keyserver (str): The key server hostname. - - keyids (list[str]): A list of key ids to send. - """ - ... - + ) -> ImportResult: ... + def import_keys_file(self, key_path: str, **kwargs: Any) -> ImportResult: ... + def recv_keys( + self, keyserver: str, *keyids: str, **kwargs: Any + ) -> ImportResult: ... + def send_keys(self, keyserver: str, *keyids: str, **kwargs: Any) -> SendResult: ... def delete_keys( self, fingerprints: str | list[str], @@ -558,30 +249,7 @@ class GPG: passphrase: str | None = ..., expect_passphrase: bool = ..., exclamation_mode: bool = ..., - ) -> DeleteResult: - """ - Delete the indicated keys. - - Args: - fingerprints (str|list[str]): The keys to delete. - - secret (bool): Whether to delete secret keys. - - passphrase (str): The passphrase to use. - - expect_passphrase (bool): Whether a passphrase is expected. - - exclamation_mode (bool): If specified, a `'!'` is appended to each fingerprint. This deletes only a subkey - or an entire key, depending on what the fingerprint refers to. - - .. note:: Passphrases - - Since GnuPG 2.1, you can't delete secret keys without providing a passphrase. However, if you're expecting - the passphrase to go to `gpg` via pinentry, you should specify expect_passphrase=False. (It's only checked - for GnuPG >= 2.1). - """ - ... - + ) -> DeleteResult: ... def export_keys( self, keyids: str | list[str], @@ -591,136 +259,20 @@ class GPG: passphrase: str | None = ..., expect_passphrase: bool = ..., output: str | None = ..., - ) -> ExportResult: - """ - Export the indicated keys. A 'keyid' is anything `gpg` accepts. - - Args: - keyids (str|list[str]): A single keyid or a list of them. - - secret (bool): Whether to export secret keys. - - armor (bool): Whether to ASCII-armor the output. - - minimal (bool): Whether to pass `--export-options export-minimal` to `gpg`. - - passphrase (str): The passphrase to use. - - expect_passphrase (bool): Whether a passphrase is expected. - - output (str): If specified, the path to write the exported key(s) to. - - .. note:: Passphrases - - Since GnuPG 2.1, you can't export secret keys without providing a passphrase. However, if you're expecting - the passphrase to go to `gpg` via pinentry, you should specify expect_passphrase=False. (It's only checked - for GnuPG >= 2.1). - """ - ... - + ) -> str | bytes | None: ... def list_keys( self, secret: bool = ..., keys: str | list[str] | None = ..., sigs: bool = ... - ) -> ListKeys: - """ - List the keys currently in the keyring. - - Args: - secret (bool): Whether to list secret keys. - - keys (str|list[str]): A list of key ids to match. - - sigs (bool): Whether to include signature information. - - Returns: - list[dict]: A list of dictionaries with key information. - """ - ... - - def scan_keys(self, filename: str) -> ScanKeys: - """ - List details of an ascii armored or binary key file without first importing it to the local keyring. - - Args: - filename (str): The path to the file containing the key(s). - - .. warning:: Warning: - Care is needed. The function works on modern GnuPG by running: - - $ gpg --dry-run --import-options import-show --import filename - - On older versions, it does the *much* riskier: - - $ gpg --with-fingerprint --with-colons filename - """ - ... - - def scan_keys_mem(self, key_data: str | bytes) -> ScanKeys: - """ - List details of an ascii armored or binary key without first importing it to the local keyring. - - Args: - key_data (str|bytes): The key data to import. - - .. warning:: Warning: - Care is needed. The function works on modern GnuPG by running: - - $ gpg --dry-run --import-options import-show --import filename - - On older versions, it does the *much* riskier: - - $ gpg --with-fingerprint --with-colons filename - """ - ... - + ) -> ListKeys: ... + def scan_keys(self, filename: str) -> ScanKeys: ... + def scan_keys_mem(self, key_data: str | bytes) -> ScanKeys: ... def search_keys( self, query: str, keyserver: str = ..., extra_args: list[str] | None = ... - ) -> SearchKeys: - """ - search a keyserver by query (using the `--search-keys` option). - - Args: - query(str): The query to use. - - keyserver (str): The key server hostname. - - extra_args (list[str]): Additional arguments to pass to `gpg`. - """ - ... - + ) -> SearchKeys: ... def auto_locate_key( self, email: str, mechanisms: list[str] | None = ..., **kwargs: Any - ) -> AutoLocateKey: - """ - Auto locate a public key by `email`. - - Args: - email (str): The email address to search for. - mechanisms (list[str]): A list of mechanisms to use. Valid mechanisms can be found - here https://www.gnupg.org/documentation/manuals/gnupg/GPG-Configuration-Options.html - under "--auto-key-locate". Default: ['wkd', 'ntds', 'ldap', 'cert', 'dane', 'local'] - """ - ... - - def gen_key(self, input: str) -> GenKey: - """ - Generate a key; you might use `gen_key_input()` to create the input. - - Args: - input (str): The input to the key creation operation. - """ - ... - - def gen_key_input(self, **kwargs: Any) -> str: - """ - Generate `--gen-key` input (see `gpg` documentation in DETAILS). - - Args: - kwargs (dict): A list of keyword arguments. - Returns: - str: A string suitable for passing to the `gen_key()` method. - """ - ... - + ) -> AutoLocateKey: ... + def gen_key(self, input: str) -> GenKey: ... + def gen_key_input(self, **kwargs: Any) -> str: ... def add_subkey( self, master_key: str, @@ -728,26 +280,10 @@ class GPG: algorithm: str = ..., usage: str = ..., expire: str = ..., - ) -> AddSubkey: - """ - Add subkeys to a master key, - - Args: - master_key (str): The master key. - - master_passphrase (str): The passphrase for the master key. - - algorithm (str): The key algorithm to use. - - usage (str): The desired uses for the subkey. - - expire (str): The expiration date of the subkey. - """ - ... - + ) -> AddSubkey: ... def encrypt_file( self, - fileobj_or_path: Any, + fileobj_or_path: str | _ReadableFile, recipients: str | list[str], sign: str | None = ..., always_trust: bool = ..., @@ -756,144 +292,23 @@ class GPG: output: str | None = ..., symmetric: bool = ..., extra_args: list[str] | None = ..., - ) -> Crypt: - """ - Encrypt data in a file or file-like object. - - Args: - fileobj_or_path (str|file): A path to a file or a file-like object containing the data to be encrypted. - - recipients (str|list): A key id of a recipient of the encrypted data, or a list of such key ids. - - sign (str): If specified, the key id of a signer to sign the encrypted data. - - always_trust (bool): Whether to always trust keys. - - passphrase (str): The passphrase to use for a signature. - - armor (bool): Whether to ASCII-armor the output. - - output (str): A path to write the encrypted output to. - - symmetric (bool): Whether to use symmetric encryption, - - extra_args (list[str]): A list of additional arguments to pass to `gpg`. - """ - ... - + ) -> Crypt: ... def encrypt( self, data: str | bytes, recipients: str | list[str], **kwargs: Any - ) -> Crypt: - """ - Encrypt the message contained in the string *data* for *recipients*. This method delegates most of the work to - `encrypt_file()`. - - Args: - data (str|bytes): The data to encrypt. - - recipients (str|list[str]): A key id of a recipient of the encrypted data, or a list of such key ids. - - kwargs (dict): Keyword arguments, which are passed to `encrypt_file()`: - * sign (str): If specified, the key id of a signer to sign the encrypted data. - - * always_trust (bool): Whether to always trust keys. - - * passphrase (str): The passphrase to use for a signature. - - * armor (bool): Whether to ASCII-armor the output. - - * output (str): A path to write the encrypted output to. - - * symmetric (bool): Whether to use symmetric encryption, - - * extra_args (list[str]): A list of additional arguments to pass to `gpg`. - """ - ... - - def decrypt(self, message: str | bytes, **kwargs: Any) -> Crypt: - """ - Decrypt the data in *message*. This method delegates most of the work to - `decrypt_file()`. - - Args: - message (str|bytes): The data to decrypt. A default key will be used for decryption. - - kwargs (dict): Keyword arguments, which are passed to `decrypt_file()`: - - * always_trust: Whether to always trust keys. - - * passphrase (str): The passphrase to use. - - * output (str): If specified, the path to write the decrypted data to. - - * extra_args (list[str]): A list of extra arguments to pass to `gpg`. - """ - ... - + ) -> Crypt: ... + def decrypt(self, message: str | bytes, **kwargs: Any) -> Crypt: ... def decrypt_file( self, - fileobj_or_path: Any, + fileobj_or_path: str | _ReadableFile, always_trust: bool = ..., passphrase: str | None = ..., output: str | None = ..., extra_args: list[str] | None = ..., - ) -> Crypt: - """ - Decrypt data in a file or file-like object. - - Args: - fileobj_or_path (str|file): A path to a file or a file-like object containing the data to be decrypted. - - always_trust: Whether to always trust keys. - - passphrase (str): The passphrase to use. - - output (str): If specified, the path to write the decrypted data to. - - extra_args (list[str]): A list of extra arguments to pass to `gpg`. - """ - ... - - def get_recipients(self, message: str | bytes, **kwargs: Any) -> list[str]: - """Get the list of recipients for an encrypted message. This method delegates most of the work to - `get_recipients_file()`. - - Args: - message (str|bytes): The encrypted message. - - kwargs (dict): Keyword arguments, which are passed to `get_recipients_file()`: - - * extra_args (list[str]): A list of extra arguments to pass to `gpg`. - """ - ... - + ) -> Crypt: ... + def get_recipients(self, message: str | bytes, **kwargs: Any) -> list[str]: ... def get_recipients_file( - self, fileobj_or_path: Any, extra_args: list[str] | None = ... - ) -> list[str]: - """ - Get the list of recipients for an encrypted message in a file or file-like object. - - Args: - fileobj_or_path (str|file): A path to a file or file-like object containing the encrypted data. - - extra_args (list[str]): A list of extra arguments to pass to `gpg`. - """ - ... - - def trust_keys(self, fingerprints: str | list[str], trustlevel: str) -> TrustResult: - """ - Set the trust level for one or more keys. - - Args: - fingerprints (str|list[str]): A key id for which to set the trust level, or a list of such key ids. - - trustlevel (str): The trust level. This is one of the following. - - * ``'TRUST_EXPIRED'`` - * ``'TRUST_UNDEFINED'`` - * ``'TRUST_NEVER'`` - * ``'TRUST_MARGINAL'`` - * ``'TRUST_FULLY'`` - * ``'TRUST_ULTIMATE'`` - """ - ... + self, fileobj_or_path: str | _ReadableFile, extra_args: list[str] | None = ... + ) -> list[str]: ... + def trust_keys( + self, fingerprints: str | list[str], trustlevel: str + ) -> TrustResult: ... diff --git a/typings/gpg/__init__.py b/typings/gpg/__init__.py new file mode 100644 index 0000000..9998082 --- /dev/null +++ b/typings/gpg/__init__.py @@ -0,0 +1 @@ +from .core import Context # noqa: F401 diff --git a/typings/gpg/core.pyi b/typings/gpg/core.pyi new file mode 100644 index 0000000..e2459a2 --- /dev/null +++ b/typings/gpg/core.pyi @@ -0,0 +1,67 @@ +from typing import Any + +from collections.abc import Iterator + +from gpg.results import EncryptResult +from gpg.results import SignResult + +class GpgmeWrapper(object): ... + +class Context(GpgmeWrapper): + def __init__( + self, + armor: bool = ..., + textmode: bool = ..., + offline: bool = ..., + signers: list[str] = [], + pinentry_mode: str = ..., + protocol: str = ..., + wrapped: Any | None = ..., + home_dir: str | None = ..., + ) -> None: ... + def __enter__(self) -> Context: ... + def __exit__(self, type: Any, value: Any, tb: Any) -> bool: ... + def encrypt( + self, + plaintext: bytes, + recipients: list[str] = [], + sign: bool = ..., + sink: Any | None = ..., + passphrase: str | None = ..., + always_trust: bool = ..., + add_encrypt_to: bool = ..., + prepare: bool = ..., + expect_sign: bool = ..., + compress: bool = ..., + ) -> tuple[bytes, EncryptResult, SignResult]: ... + def decrypt( + self, + ciphertext: bytes, + sink: Any | None = ..., + passphrase: str | None = ..., + verify: bool = ..., + filter_signatures: bool = ..., + ) -> tuple[bytes, str, str]: ... + def key_import(self, data: bytes) -> str: ... + def key_export_minimal(self, pattern: Any | None = ...) -> bytes | None: ... + def keylist( + self, + pattern: Any | None = ..., + secret: bool = ..., + mode: str = ..., + source: Any | None = None, + ) -> Iterator[Any]: ... + def create_key( + self, + userid: str, + algorithm: str | None = ..., + expires_in: int = ..., + expires: bool = ..., + sign: bool = ..., + encrypt: bool = ..., + certify: bool = ..., + authenticate: bool = ..., + passphrase: str | None = ..., + force: bool = ..., + ) -> str: ... + def get_key(self, fpr: str, secret: bool = ...) -> Any | None: ... diff --git a/typings/gpg/errors.pyi b/typings/gpg/errors.pyi new file mode 100644 index 0000000..a5fcf89 --- /dev/null +++ b/typings/gpg/errors.pyi @@ -0,0 +1,36 @@ +from typing import Any + +class GpgError(Exception): + + error: Any | None + context: str | None + result: Any | None + + @property + def code(self) -> int: ... + @property + def code_str(self) -> str: ... + @property + def source(self) -> int: ... + @property + def source_str(self) -> str: ... + +class GPGMEError(GpgError): + @property + def message(self) -> str: ... + def getstring(self) -> str: ... + def getcode(self) -> int: ... + def getsource(self) -> int: ... + +class KeyNotFound(GPGMEError, KeyError): + keystr: str + +class EncryptionError(GpgError): ... +class InvalidRecipients(EncryptionError): ... +class DecryptionError(GpgError): ... +class UnsupportedAlgorithm(DecryptionError): ... +class SigningError(GpgError): ... +class InvalidSigners(SigningError): ... +class VerificationError(GpgError): ... +class BadSignatures(VerificationError): ... +class MissingSignatures(VerificationError): ... diff --git a/typings/gpg/results.pyi b/typings/gpg/results.pyi new file mode 100644 index 0000000..61f32aa --- /dev/null +++ b/typings/gpg/results.pyi @@ -0,0 +1,41 @@ +from typing import Any + +class Result(object): ... +class InvalidKey(Result): ... + +class EncryptResult(Result): + invalid_recipients: list[Any] + +class Recipient(Result): ... + +class DecryptResult(Result): + recipients: Recipient + +class NewSignature(Result): ... + +class SignResult(Result): + invalid_signers: InvalidKey + signatures: NewSignature + +class Notation(Result): ... + +class Signature(Result): + _type = dict(wrong_key_usage=bool, chain_model=bool, is_de_vs=bool) + notations: Notation + +class VerifyResult(Result): + signatures: Signature + +class ImportStatus(Result): ... + +class ImportResult(Result): + imports: ImportStatus + +class GenkeyResult(Result): + _type = dict(primary=bool, sub=bool) + +class KeylistResult(Result): + _type = dict(truncated=bool) + +class VFSMountResult(Result): ... +class EngineInfo(Result): ...