[openpgp] Be more resistant against invalid keys
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import logging
|
||||
|
||||
@@ -22,6 +23,7 @@ from nbxmpp.protocol import JID
|
||||
import gpg
|
||||
from gpg.results import ImportResult
|
||||
|
||||
from openpgp.backend.util import parse_uid
|
||||
from openpgp.modules.util import DecryptionFailed
|
||||
|
||||
log = logging.getLogger('gajim.p.openpgp.gpgme')
|
||||
@@ -39,10 +41,17 @@ class KeyringItem:
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _get_uid(self):
|
||||
def is_valid(self, jid: JID) -> bool:
|
||||
if not self.is_xmpp_key:
|
||||
return False
|
||||
return jid == self.jid
|
||||
|
||||
def _get_uid(self) -> Optional[str]:
|
||||
for uid in self._key.uids:
|
||||
if uid.uid.startswith('xmpp:'):
|
||||
return uid.uid
|
||||
try:
|
||||
return parse_uid(uid.uid)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@property
|
||||
def fingerprint(self):
|
||||
@@ -56,7 +65,7 @@ class KeyringItem:
|
||||
@property
|
||||
def jid(self):
|
||||
if self._uid is not None:
|
||||
return JID.from_string(self._uid[5:])
|
||||
return JID.from_string(self._uid)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.fingerprint)
|
||||
@@ -118,6 +127,7 @@ class GPGME:
|
||||
keyring_item = KeyringItem(key)
|
||||
if not keyring_item.is_xmpp_key:
|
||||
log.warning('Key not suited for xmpp: %s', key.fpr)
|
||||
self.delete_key(keyring_item.fingerprint)
|
||||
continue
|
||||
|
||||
keys.append(keyring_item)
|
||||
@@ -191,10 +201,17 @@ class GPGME:
|
||||
|
||||
fingerprint = result.imports[0].fpr
|
||||
key = self.get_key(fingerprint)
|
||||
item = KeyringItem(key)
|
||||
if not item.is_valid(jid):
|
||||
log.warning('Invalid key found')
|
||||
log.warning(key)
|
||||
self.delete_key(item.fingerprint)
|
||||
return
|
||||
|
||||
return KeyringItem(key)
|
||||
return item
|
||||
|
||||
def delete_key(self, fingerprint):
|
||||
log.info('Delete Key: %s', fingerprint)
|
||||
key = self.get_key(fingerprint)
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
context.op_delete(key, True)
|
||||
|
||||
@@ -14,21 +14,68 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
|
||||
import gnupg
|
||||
from nbxmpp.protocol import JID
|
||||
|
||||
from openpgp.backend.util import parse_uid
|
||||
from openpgp.modules.util import DecryptionFailed
|
||||
|
||||
|
||||
log = logging.getLogger('gajim.p.openpgp.pygnupg')
|
||||
if log.getEffectiveLevel() == logging.DEBUG:
|
||||
log = logging.getLogger('gnupg')
|
||||
log.addHandler(logging.StreamHandler())
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint')
|
||||
|
||||
class KeyringItem:
|
||||
def __init__(self, key):
|
||||
self._key = key
|
||||
self._uid = self._get_uid()
|
||||
|
||||
@property
|
||||
def is_xmpp_key(self) -> bool:
|
||||
try:
|
||||
return self.jid is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def is_valid(self, jid: JID) -> bool:
|
||||
if not self.is_xmpp_key:
|
||||
return False
|
||||
return jid == self.jid
|
||||
|
||||
@property
|
||||
def keyid(self) -> str:
|
||||
return self._key['keyid']
|
||||
|
||||
def _get_uid(self) -> Optional[str]:
|
||||
for uid in self._key['uids']:
|
||||
try:
|
||||
return parse_uid(uid)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@property
|
||||
def fingerprint(self):
|
||||
return self._key['fingerprint']
|
||||
|
||||
@property
|
||||
def uid(self):
|
||||
if self._uid is not None:
|
||||
return self._uid
|
||||
|
||||
@property
|
||||
def jid(self):
|
||||
if self._uid is not None:
|
||||
return JID.from_string(self._uid)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.fingerprint)
|
||||
|
||||
|
||||
class PythonGnuPG(gnupg.GPG):
|
||||
@@ -93,22 +140,15 @@ class PythonGnuPG(gnupg.GPG):
|
||||
result = super().list_keys(secret=secret)
|
||||
keys = []
|
||||
for key in result:
|
||||
item = self._make_keyring_item(key)
|
||||
if item is None:
|
||||
item = KeyringItem(key)
|
||||
if not item.is_xmpp_key:
|
||||
log.warning('Invalid key found, deleting key')
|
||||
log.warning(key)
|
||||
self.delete_key(item.fingerprint)
|
||||
continue
|
||||
keys.append(self._make_keyring_item(key))
|
||||
keys.append(item)
|
||||
return keys
|
||||
|
||||
@staticmethod
|
||||
def _make_keyring_item(key):
|
||||
userid = key['uids'][0]
|
||||
if not userid.startswith('xmpp:'):
|
||||
log.warning('Incorrect userid: %s found for key, '
|
||||
'key will be ignored', userid)
|
||||
return
|
||||
jid = userid[5:]
|
||||
return KeyringItem(jid, key['keyid'], key['fingerprint'])
|
||||
|
||||
def import_key(self, data, jid):
|
||||
log.info('Import key from %s', jid)
|
||||
result = super().import_keys(data)
|
||||
@@ -117,45 +157,15 @@ class PythonGnuPG(gnupg.GPG):
|
||||
log.error(result)
|
||||
return
|
||||
|
||||
if not self.validate_key(data, str(jid)):
|
||||
return None
|
||||
key = self.get_key(result.results[0]['fingerprint'])
|
||||
return self._make_keyring_item(key[0])
|
||||
item = KeyringItem(key[0])
|
||||
if not item.is_valid(jid):
|
||||
log.warning('Invalid key found, deleting key')
|
||||
log.warning(key)
|
||||
self.delete_key(item.fingerprint)
|
||||
return
|
||||
|
||||
def validate_key(self, public_key, jid):
|
||||
import tempfile
|
||||
temppath = os.path.join(tempfile.gettempdir(), 'temp_pubkey')
|
||||
with open(temppath, 'wb') as tempfile:
|
||||
tempfile.write(public_key)
|
||||
|
||||
result = self.scan_keys(temppath)
|
||||
if result:
|
||||
key_found = False
|
||||
for uid in result.uids:
|
||||
if uid.startswith('xmpp:'):
|
||||
if uid[5:] == jid:
|
||||
key_found = True
|
||||
else:
|
||||
log.warning('Found wrong userid in key: %s != %s',
|
||||
uid[5:], jid)
|
||||
log.debug(result)
|
||||
os.remove(temppath)
|
||||
return False
|
||||
|
||||
if not key_found:
|
||||
log.warning('No valid userid found in key')
|
||||
log.debug(result)
|
||||
os.remove(temppath)
|
||||
return False
|
||||
|
||||
log.info('Key validation succesful')
|
||||
os.remove(temppath)
|
||||
return True
|
||||
|
||||
log.warning('Invalid key data: %s')
|
||||
log.debug(result)
|
||||
os.remove(temppath)
|
||||
return False
|
||||
return item
|
||||
|
||||
def get_own_key_details(self):
|
||||
result = super().list_keys(secret=True)
|
||||
|
||||
13
openpgp/backend/util.py
Normal file
13
openpgp/backend/util.py
Normal file
@@ -0,0 +1,13 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def parse_uid(uid: str, compat=False) -> str:
|
||||
if uid.startswith('xmpp:'):
|
||||
return uid[5:]
|
||||
|
||||
# Compat with uids of form "Name <xmpp:my@jid.com>"
|
||||
if compat and '<xmpp:' in uid and uid.endswith('>'):
|
||||
return uid[:-1].split('<xmpp:', maxsplit=1)[1]
|
||||
|
||||
raise ValueError('Uknown UID format: %s' % uid)
|
||||
Reference in New Issue
Block a user