[pgp] Port to Gtk4 and add type annotations

This commit is contained in:
Philipp Hörist
2025-01-28 23:22:05 +01:00
parent 6fe2da67c3
commit 3a5816259c
11 changed files with 1323 additions and 285 deletions

View File

@@ -14,21 +14,30 @@
# You should have received a copy of the GNU General Public License
# along with PGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
from typing import Any
import os
import threading
import time
from collections.abc import Callable
import nbxmpp
from gi.repository import GLib
from nbxmpp.client import Client as nbxmppClient
from nbxmpp.namespaces import Namespace
from nbxmpp.protocol import Message
from nbxmpp.protocol import Presence
from nbxmpp.structs import EncryptionData
from nbxmpp.structs import MessageProperties
from nbxmpp.structs import PresenceProperties
from nbxmpp.structs import StanzaHandler
from gajim.common import app
from gajim.common.client import Client
from gajim.common.const import Trust
from gajim.common.events import MessageNotSent
from gajim.common.modules.base import BaseModule
from gajim.common.modules.httpupload import HTTPFileTransfer
from gajim.common.structs import OutgoingMessage
from gajim.plugins.plugins_i18n import _
@@ -68,7 +77,7 @@ ALLOWED_TAGS = [
class PGPLegacy(BaseModule):
def __init__(self, client):
def __init__(self, client: Client) -> None:
BaseModule.__init__(self, client, plugin=True)
self.handlers = [
@@ -92,26 +101,26 @@ class PGPLegacy(BaseModule):
self._store = KeyStore(
self._account, self.own_jid, self._log, self._pgp.list_keys
)
self._always_trust = []
self._presence_fingerprint_store = {}
self._always_trust: list[str] = []
self._presence_fingerprint_store: dict[str, str] = {}
@property
def pgp_backend(self):
def pgp_backend(self) -> PGP:
return self._pgp
def set_own_key_data(self, *args, **kwargs):
return self._store.set_own_key_data(*args, **kwargs)
def set_own_key_data(self, keydata: tuple[str, str] | None) -> None:
return self._store.set_own_key_data(keydata)
def get_own_key_data(self, *args, **kwargs):
return self._store.get_own_key_data(*args, **kwargs)
def get_own_key_data(self) -> dict[str, str] | None:
return self._store.get_own_key_data()
def set_contact_key_data(self, *args, **kwargs):
return self._store.set_contact_key_data(*args, **kwargs)
def set_contact_key_data(self, jid: str, key_data: tuple[str, str] | None) -> None:
return self._store.set_contact_key_data(jid, key_data)
def get_contact_key_data(self, *args, **kwargs):
return self._store.get_contact_key_data(*args, **kwargs)
def get_contact_key_data(self, jid: str) -> dict[str, str] | None:
return self._store.get_contact_key_data(jid)
def has_valid_key_assigned(self, jid):
def has_valid_key_assigned(self, jid: str) -> bool:
key_data = self.get_contact_key_data(jid)
if key_data is None:
return False
@@ -126,9 +135,13 @@ class PGPLegacy(BaseModule):
raise KeyMismatch(announced_fingerprint)
def _on_presence_received(self, _con, _stanza, properties):
def _on_presence_received(
self, _client: nbxmppClient, _stanza: Presence, properties: PresenceProperties
):
if properties.signed is None:
return
assert properties.jid is not None
jid = properties.jid.bare
fingerprint = self._pgp.verify(properties.status, properties.signed)
@@ -160,13 +173,16 @@ class PGPLegacy(BaseModule):
)
return
def _message_received(self, _con, stanza, properties):
def _message_received(
self, _client: nbxmppClient, stanza: Message, properties: MessageProperties
) -> None:
if not properties.is_pgp_legacy or properties.from_muc:
return
remote_jid = properties.remote_jid
self._log.info("Message received from: %s", remote_jid)
assert properties.pgp_legacy is not None
payload = self._pgp.decrypt(properties.pgp_legacy)
prepare_stanza(stanza, payload)
@@ -174,7 +190,12 @@ class PGPLegacy(BaseModule):
protocol=ENCRYPTION_NAME, key="Unknown", trust=Trust.UNDECIDED
)
def encrypt_message(self, con, message: OutgoingMessage, callback):
def encrypt_message(
self,
client: Client,
message: OutgoingMessage,
callback: Callable[[OutgoingMessage], None],
) -> None:
if not message.get_text():
callback(message)
return
@@ -187,15 +208,24 @@ class PGPLegacy(BaseModule):
return
always_trust = key_id in self._always_trust
self._encrypt(con, message, [key_id, own_key_id], callback, always_trust)
self._encrypt(client, message, [key_id, own_key_id], callback, always_trust)
def _encrypt(
self, con, message: OutgoingMessage, keys, callback, always_trust: bool
):
result = self._pgp.encrypt(message.get_text(), keys, always_trust)
self,
client: Client,
message: OutgoingMessage,
recipients: list[str],
callback: Callable[[OutgoingMessage], None],
always_trust: bool,
) -> None:
text = message.get_text()
assert text is not None
result = self._pgp.encrypt(text, recipients, always_trust)
encrypted_payload, error = result
if error:
self._handle_encrypt_error(con, error, message, keys, callback)
self._handle_encrypt_error(client, error, message, recipients, callback)
return
self._cleanup_stanza(message)
@@ -212,30 +242,41 @@ class PGPLegacy(BaseModule):
callback(message)
def _handle_encrypt_error(
self, con, error: str, message: OutgoingMessage, keys, callback
):
self,
client: Client,
error: str,
message: OutgoingMessage,
recipients: list[str],
callback: Callable[[OutgoingMessage], None],
) -> None:
if error.startswith("NOT_TRUSTED"):
def on_yes(checked):
def on_yes(checked: bool) -> None:
if checked:
self._always_trust.append(keys[0])
self._encrypt(con, message, keys, callback, True)
self._always_trust.append(recipients[0])
self._encrypt(client, message, recipients, callback, True)
def on_no():
self._raise_message_not_sent(con, message, error)
def on_no() -> None:
self._raise_message_not_sent(client, message, error)
app.ged.raise_event(PGPNotTrusted(on_yes=on_yes, on_no=on_no))
else:
self._raise_message_not_sent(con, message, error)
self._raise_message_not_sent(client, message, error)
@staticmethod
def _raise_message_not_sent(con, message: OutgoingMessage, error: str):
def _raise_message_not_sent(
client: Client, message: OutgoingMessage, error: str
) -> None:
text = message.get_text()
assert text is not None
app.ged.raise_event(
MessageNotSent(
client=con,
client=client,
jid=str(message.contact.jid),
message=message.get_text(),
message=text,
error=_("Encryption error: %s") % error,
time=time.time(),
)
@@ -250,7 +291,7 @@ class PGPLegacy(BaseModule):
)
stanza.addChild(node=eme_node)
def sign_presence(self, presence, status):
def sign_presence(self, presence: nbxmpp.Presence, status: str) -> None:
key_data = self.get_own_key_data()
if key_data is None:
self._log.warning("No own key id found, cant sign presence")
@@ -266,7 +307,7 @@ class PGPLegacy(BaseModule):
presence.setTag(Namespace.SIGNED + " x").setData(result)
@staticmethod
def _get_info_message():
def _get_info_message() -> str:
msg = "[This message is *encrypted* (See :XEP:`27`)]"
lang = os.getenv("LANG")
if lang is not None and not lang.startswith("en"):
@@ -274,7 +315,7 @@ class PGPLegacy(BaseModule):
msg = _("[This message is *encrypted* (See :XEP:`27`)]") + " (" + msg + ")"
return msg
def _get_key_ids(self, jid):
def _get_key_ids(self, jid: str) -> tuple[str, str]:
key_data = self.get_contact_key_data(jid)
if key_data is None:
raise NoKeyIdFound("No key id found for %s" % jid)
@@ -301,21 +342,25 @@ class PGPLegacy(BaseModule):
stanza.addChild(node=node)
message.set_stanza(stanza)
def encrypt_file(self, file, callback):
def encrypt_file(
self, transfer: HTTPFileTransfer, callback: Callable[[HTTPFileTransfer], None]
) -> None:
thread = threading.Thread(
target=self._encrypt_file_thread, args=(file, callback)
target=self._encrypt_file_thread, args=(transfer, callback)
)
thread.daemon = True
thread.start()
def _encrypt_file_thread(self, file, callback):
def _encrypt_file_thread(
self, transfer: HTTPFileTransfer, callback: Callable[[HTTPFileTransfer], None]
) -> None:
try:
key_id, own_key_id = self._get_key_ids(file.contact.jid)
key_id, own_key_id = self._get_key_ids(str(transfer.contact.jid))
except NoKeyIdFound as error:
self._log.warning(error)
return
stream = open(file.path, "rb")
stream = open(transfer.path, "rb")
encrypted = self._pgp.encrypt_file(stream, [key_id, own_key_id])
stream.close()
@@ -323,15 +368,15 @@ class PGPLegacy(BaseModule):
GLib.idle_add(self._on_file_encryption_error, encrypted.status)
return
file.size = len(encrypted.data)
file.set_uri_transform_func(lambda uri: "%s.pgp" % uri)
file.set_encrypted_data(encrypted.data)
GLib.idle_add(callback, file)
transfer.size = len(encrypted.data)
transfer.set_uri_transform_func(lambda uri: "%s.pgp" % uri)
transfer.set_encrypted_data(encrypted.data)
GLib.idle_add(callback, transfer)
@staticmethod
def _on_file_encryption_error(error):
def _on_file_encryption_error(error: str) -> None:
app.ged.raise_event(PGPFileEncryptionError(error=error))
def get_instance(*args, **kwargs):
def get_instance(*args: Any, **kwargs: Any) -> tuple[PGPLegacy, str]:
return PGPLegacy(*args, **kwargs), "PGPLegacy"