# Copyright (C) 2019 Philipp Hörist # Copyright (C) 2003-2014 Yann Leboulanger # Copyright (C) 2005 Alex Mauer # Copyright (C) 2005-2006 Nikos Kouremenos # Copyright (C) 2007 Stephan Erb # Copyright (C) 2008 Jean-Marie Traissard # Jonathan Schleifer # # This file is part of PGP Gajim Plugin. # # PGP Gajim Plugin is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # # PGP Gajim Plugin is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with PGP Gajim Plugin. If not, see . import logging import os from functools import lru_cache import gnupg from gajim.common.util.classes import Singleton from pgp.exceptions import SignError logger = logging.getLogger("gajim.p.pgplegacy") if logger.getEffectiveLevel() == logging.DEBUG: logger = logging.getLogger("gnupg") logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.DEBUG) class PGP(gnupg.GPG, metaclass=Singleton): def __init__(self, binary, encoding=None): super().__init__(gpgbinary=binary, use_agent=True) if encoding is not None: self.encoding = encoding self.decode_errors = "replace" def encrypt(self, payload, recipients, always_trust=False): if not always_trust: # check that we'll be able to encrypt result = self.get_key(recipients[0]) for key in result: if key["trust"] not in ("f", "u"): return "", "NOT_TRUSTED " + key["keyid"][-8:] result = super().encrypt( payload.encode("utf8"), recipients, always_trust=always_trust ) if result.ok: error = "" else: error = result.status return self._strip_header_footer(str(result)), error def decrypt(self, payload): data = self._add_header_footer(payload, "MESSAGE") result = super().decrypt(data.encode("utf8")) return result.data.decode("utf8") @lru_cache(maxsize=8) def sign(self, payload, key_id): if payload is None: payload = "" result = super().sign(payload.encode("utf8"), keyid=key_id, detach=True) if result.fingerprint: return self._strip_header_footer(str(result)) raise SignError(result.status) def verify(self, payload, signed): # Hash algorithm is not transferred in the signed # presence stanza so try all algorithms. # Text name for hash algorithms from RFC 4880 - section 9.4 if payload is None: payload = "" hash_algorithms = ["SHA512", "SHA384", "SHA256", "SHA224", "SHA1", "RIPEMD160"] for algo in hash_algorithms: data = os.linesep.join( [ "-----BEGIN PGP SIGNED MESSAGE-----", "Hash: " + algo, "", payload, self._add_header_footer(signed, "SIGNATURE"), ] ) result = super().verify(data.encode("utf8")) if result.valid: return result.fingerprint def get_key(self, key_id): return super().list_keys(keys=[key_id]) def get_keys(self, secret=False): keys = {} result = super().list_keys(secret=secret) for key in result: # Take first not empty uid keys[key["fingerprint"]] = next(uid for uid in key["uids"] if uid) return keys @staticmethod def _strip_header_footer(data): """ Remove header and footer from data """ if not data: return "" lines = data.splitlines() while lines[0] != "": lines.remove(lines[0]) while lines[0] == "": lines.remove(lines[0]) i = 0 for line in lines: if line: if line[0] == "-": break i = i + 1 line = "\n".join(lines[0:i]) return line @staticmethod def _add_header_footer(data, type_): """ Add header and footer from data """ out = "-----BEGIN PGP %s-----" % type_ + os.linesep out = out + "Version: PGP" + os.linesep out = out + os.linesep out = out + data + os.linesep out = out + "-----END PGP %s-----" % type_ + os.linesep return out