From 572301fe895377061410483f3680799afe422993 Mon Sep 17 00:00:00 2001 From: lovetox Date: Mon, 18 Apr 2022 14:04:43 +0200 Subject: [PATCH] [openpgp] Be more resistant against invalid keys --- openpgp/backend/gpgme.py | 27 +++++++-- openpgp/backend/pygpg.py | 116 +++++++++++++++++++++------------------ openpgp/backend/util.py | 13 +++++ 3 files changed, 98 insertions(+), 58 deletions(-) create mode 100644 openpgp/backend/util.py diff --git a/openpgp/backend/gpgme.py b/openpgp/backend/gpgme.py index c75cc06..54b8619 100644 --- a/openpgp/backend/gpgme.py +++ b/openpgp/backend/gpgme.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . +from typing import Optional import logging @@ -22,6 +23,7 @@ from nbxmpp.protocol import JID import gpg from gpg.results import ImportResult +from openpgp.backend.util import parse_uid from openpgp.modules.util import DecryptionFailed log = logging.getLogger('gajim.p.openpgp.gpgme') @@ -39,10 +41,17 @@ class KeyringItem: except Exception: return False - def _get_uid(self): + def is_valid(self, jid: JID) -> bool: + if not self.is_xmpp_key: + return False + return jid == self.jid + + def _get_uid(self) -> Optional[str]: for uid in self._key.uids: - if uid.uid.startswith('xmpp:'): - return uid.uid + try: + return parse_uid(uid.uid) + except Exception: + pass @property def fingerprint(self): @@ -56,7 +65,7 @@ class KeyringItem: @property def jid(self): if self._uid is not None: - return JID.from_string(self._uid[5:]) + return JID.from_string(self._uid) def __hash__(self): return hash(self.fingerprint) @@ -118,6 +127,7 @@ class GPGME: keyring_item = KeyringItem(key) if not keyring_item.is_xmpp_key: log.warning('Key not suited for xmpp: %s', key.fpr) + self.delete_key(keyring_item.fingerprint) continue keys.append(keyring_item) @@ -191,10 +201,17 @@ class GPGME: fingerprint = result.imports[0].fpr key = self.get_key(fingerprint) + item = KeyringItem(key) + if not item.is_valid(jid): + log.warning('Invalid key found') + log.warning(key) + self.delete_key(item.fingerprint) + return - return KeyringItem(key) + return item def delete_key(self, fingerprint): + log.info('Delete Key: %s', fingerprint) key = self.get_key(fingerprint) with gpg.Context(**self._context_args) as context: context.op_delete(key, True) diff --git a/openpgp/backend/pygpg.py b/openpgp/backend/pygpg.py index e6850c1..009114f 100644 --- a/openpgp/backend/pygpg.py +++ b/openpgp/backend/pygpg.py @@ -14,21 +14,68 @@ # You should have received a copy of the GNU General Public License # along with OpenPGP Gajim Plugin. If not, see . -import os +from typing import Optional + import logging -from collections import namedtuple import gnupg +from nbxmpp.protocol import JID +from openpgp.backend.util import parse_uid from openpgp.modules.util import DecryptionFailed + log = logging.getLogger('gajim.p.openpgp.pygnupg') if log.getEffectiveLevel() == logging.DEBUG: log = logging.getLogger('gnupg') log.addHandler(logging.StreamHandler()) log.setLevel(logging.DEBUG) -KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint') + +class KeyringItem: + def __init__(self, key): + self._key = key + self._uid = self._get_uid() + + @property + def is_xmpp_key(self) -> bool: + try: + return self.jid is not None + except Exception: + return False + + def is_valid(self, jid: JID) -> bool: + if not self.is_xmpp_key: + return False + return jid == self.jid + + @property + def keyid(self) -> str: + return self._key['keyid'] + + def _get_uid(self) -> Optional[str]: + for uid in self._key['uids']: + try: + return parse_uid(uid) + except Exception: + pass + + @property + def fingerprint(self): + return self._key['fingerprint'] + + @property + def uid(self): + if self._uid is not None: + return self._uid + + @property + def jid(self): + if self._uid is not None: + return JID.from_string(self._uid) + + def __hash__(self): + return hash(self.fingerprint) class PythonGnuPG(gnupg.GPG): @@ -93,22 +140,15 @@ class PythonGnuPG(gnupg.GPG): result = super().list_keys(secret=secret) keys = [] for key in result: - item = self._make_keyring_item(key) - if item is None: + item = KeyringItem(key) + if not item.is_xmpp_key: + log.warning('Invalid key found, deleting key') + log.warning(key) + self.delete_key(item.fingerprint) continue - keys.append(self._make_keyring_item(key)) + keys.append(item) return keys - @staticmethod - def _make_keyring_item(key): - userid = key['uids'][0] - if not userid.startswith('xmpp:'): - log.warning('Incorrect userid: %s found for key, ' - 'key will be ignored', userid) - return - jid = userid[5:] - return KeyringItem(jid, key['keyid'], key['fingerprint']) - def import_key(self, data, jid): log.info('Import key from %s', jid) result = super().import_keys(data) @@ -117,45 +157,15 @@ class PythonGnuPG(gnupg.GPG): log.error(result) return - if not self.validate_key(data, str(jid)): - return None key = self.get_key(result.results[0]['fingerprint']) - return self._make_keyring_item(key[0]) + item = KeyringItem(key[0]) + if not item.is_valid(jid): + log.warning('Invalid key found, deleting key') + log.warning(key) + self.delete_key(item.fingerprint) + return - def validate_key(self, public_key, jid): - import tempfile - temppath = os.path.join(tempfile.gettempdir(), 'temp_pubkey') - with open(temppath, 'wb') as tempfile: - tempfile.write(public_key) - - result = self.scan_keys(temppath) - if result: - key_found = False - for uid in result.uids: - if uid.startswith('xmpp:'): - if uid[5:] == jid: - key_found = True - else: - log.warning('Found wrong userid in key: %s != %s', - uid[5:], jid) - log.debug(result) - os.remove(temppath) - return False - - if not key_found: - log.warning('No valid userid found in key') - log.debug(result) - os.remove(temppath) - return False - - log.info('Key validation succesful') - os.remove(temppath) - return True - - log.warning('Invalid key data: %s') - log.debug(result) - os.remove(temppath) - return False + return item def get_own_key_details(self): result = super().list_keys(secret=True) diff --git a/openpgp/backend/util.py b/openpgp/backend/util.py new file mode 100644 index 0000000..e4cc329 --- /dev/null +++ b/openpgp/backend/util.py @@ -0,0 +1,13 @@ + +from __future__ import annotations + + +def parse_uid(uid: str, compat=False) -> str: + if uid.startswith('xmpp:'): + return uid[5:] + + # Compat with uids of form "Name " + if compat and ''): + return uid[:-1].split('