cq: Format with black and isort

This commit is contained in:
Philipp Hörist
2025-01-25 19:15:37 +01:00
parent e6e71d82bf
commit 841b1fb25e
44 changed files with 1641 additions and 1660 deletions

View File

@@ -20,8 +20,8 @@
# You should have received a copy of the GNU General Public License
# along with PGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import os
from functools import lru_cache
import gnupg
@@ -30,56 +30,51 @@ from gajim.common.util.classes import Singleton
from pgp.exceptions import SignError
logger = logging.getLogger('gajim.p.pgplegacy')
logger = logging.getLogger("gajim.p.pgplegacy")
if logger.getEffectiveLevel() == logging.DEBUG:
logger = logging.getLogger('gnupg')
logger = logging.getLogger("gnupg")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
class PGP(gnupg.GPG, metaclass=Singleton):
def __init__(self, binary, encoding=None):
super().__init__(gpgbinary=binary,
use_agent=True)
super().__init__(gpgbinary=binary, use_agent=True)
if encoding is not None:
self.encoding = encoding
self.decode_errors = 'replace'
self.decode_errors = "replace"
def encrypt(self, payload, recipients, always_trust=False):
if not always_trust:
# check that we'll be able to encrypt
result = self.get_key(recipients[0])
for key in result:
if key['trust'] not in ('f', 'u'):
return '', 'NOT_TRUSTED ' + key['keyid'][-8:]
if key["trust"] not in ("f", "u"):
return "", "NOT_TRUSTED " + key["keyid"][-8:]
result = super().encrypt(
payload.encode('utf8'),
recipients,
always_trust=always_trust)
payload.encode("utf8"), recipients, always_trust=always_trust
)
if result.ok:
error = ''
error = ""
else:
error = result.status
return self._strip_header_footer(str(result)), error
def decrypt(self, payload):
data = self._add_header_footer(payload, 'MESSAGE')
result = super().decrypt(data.encode('utf8'))
data = self._add_header_footer(payload, "MESSAGE")
result = super().decrypt(data.encode("utf8"))
return result.data.decode('utf8')
return result.data.decode("utf8")
@lru_cache(maxsize=8)
def sign(self, payload, key_id):
if payload is None:
payload = ''
result = super().sign(payload.encode('utf8'),
keyid=key_id,
detach=True)
payload = ""
result = super().sign(payload.encode("utf8"), keyid=key_id, detach=True)
if result.fingerprint:
return self._strip_header_footer(str(result))
@@ -91,19 +86,20 @@ class PGP(gnupg.GPG, metaclass=Singleton):
# Text name for hash algorithms from RFC 4880 - section 9.4
if payload is None:
payload = ''
payload = ""
hash_algorithms = ['SHA512', 'SHA384', 'SHA256',
'SHA224', 'SHA1', 'RIPEMD160']
hash_algorithms = ["SHA512", "SHA384", "SHA256", "SHA224", "SHA1", "RIPEMD160"]
for algo in hash_algorithms:
data = os.linesep.join(
['-----BEGIN PGP SIGNED MESSAGE-----',
'Hash: ' + algo,
'',
payload,
self._add_header_footer(signed, 'SIGNATURE')]
)
result = super().verify(data.encode('utf8'))
[
"-----BEGIN PGP SIGNED MESSAGE-----",
"Hash: " + algo,
"",
payload,
self._add_header_footer(signed, "SIGNATURE"),
]
)
result = super().verify(data.encode("utf8"))
if result.valid:
return result.fingerprint
@@ -116,7 +112,7 @@ class PGP(gnupg.GPG, metaclass=Singleton):
for key in result:
# Take first not empty uid
keys[key['fingerprint']] = next(uid for uid in key['uids'] if uid)
keys[key["fingerprint"]] = next(uid for uid in key["uids"] if uid)
return keys
@staticmethod
@@ -125,19 +121,19 @@ class PGP(gnupg.GPG, metaclass=Singleton):
Remove header and footer from data
"""
if not data:
return ''
return ""
lines = data.splitlines()
while lines[0] != '':
while lines[0] != "":
lines.remove(lines[0])
while lines[0] == '':
while lines[0] == "":
lines.remove(lines[0])
i = 0
for line in lines:
if line:
if line[0] == '-':
if line[0] == "-":
break
i = i+1
line = '\n'.join(lines[0:i])
i = i + 1
line = "\n".join(lines[0:i])
return line
@staticmethod

View File

@@ -34,31 +34,30 @@ class KeyStore:
self._account = account
own_bare_jid = own_jid.bare
path = Path(configpaths.get('PLUGINS_DATA')) / 'pgplegacy' / own_bare_jid
path = Path(configpaths.get("PLUGINS_DATA")) / "pgplegacy" / own_bare_jid
if not path.exists():
path.mkdir(parents=True)
self._store_path = path / 'store'
self._store_path = path / "store"
if self._store_path.exists():
# having store v2 or higher
with self._store_path.open('r') as file:
with self._store_path.open("r") as file:
try:
self._store = json.load(file)
except Exception:
log.exception('Could not load config')
log.exception("Could not load config")
self._store = self._empty_store()
ver = self._store.get('_version', 2)
ver = self._store.get("_version", 2)
if ver > CURRENT_STORE_VERSION:
raise Exception('Unknown store version! '
'Please upgrade pgp plugin.')
raise Exception("Unknown store version! " "Please upgrade pgp plugin.")
elif ver == 2:
self._migrate_v2_store()
self._save_store()
elif ver != CURRENT_STORE_VERSION:
# garbled version
self._store = self._empty_store()
log.warning('Bad pgp key store version. Initializing new.')
log.warning("Bad pgp key store version. Initializing new.")
else:
# having store v1 or fresh install
self._store = self._empty_store()
@@ -69,15 +68,16 @@ class KeyStore:
@staticmethod
def _empty_store():
return {
'_version': CURRENT_STORE_VERSION,
'own_key_data': None,
'contact_key_data': {},
"_version": CURRENT_STORE_VERSION,
"own_key_data": None,
"contact_key_data": {},
}
def _migrate_v1_store(self):
keys = {}
attached_keys = app.settings.get_account_setting(
self._account, 'attached_gpg_keys')
self._account, "attached_gpg_keys"
)
if not attached_keys:
return
attached_keys = attached_keys.split()
@@ -86,23 +86,25 @@ class KeyStore:
keys[attached_keys[2 * i]] = attached_keys[2 * i + 1]
for jid, key_id in keys.items():
self._set_contact_key_data_nosync(jid, (key_id, ''))
self._set_contact_key_data_nosync(jid, (key_id, ""))
own_key_id = app.settings.get_account_setting(self._account, 'keyid')
own_key_user = app.settings.get_account_setting(
self._account, 'keyname')
own_key_id = app.settings.get_account_setting(self._account, "keyid")
own_key_user = app.settings.get_account_setting(self._account, "keyname")
if own_key_id:
self._set_own_key_data_nosync((own_key_id, own_key_user))
attached_keys = app.settings.set_account_setting(
self._account, 'attached_gpg_keys', '')
self._log.info('Migration from store v1 was successful')
self._account, "attached_gpg_keys", ""
)
self._log.info("Migration from store v1 was successful")
def _migrate_v2_store(self):
own_key_data = self.get_own_key_data()
if own_key_data is not None:
own_key_id, own_key_user = (own_key_data['key_id'],
own_key_data['key_user'])
own_key_id, own_key_user = (
own_key_data["key_id"],
own_key_data["key_user"],
)
try:
own_key_fp = self._resolve_short_id(own_key_id, has_secret=True)
self._set_own_key_data_nosync((own_key_fp, own_key_user))
@@ -111,38 +113,41 @@ class KeyStore:
prune_list = []
for dict_key, key_data in self._store['contact_key_data'].items():
for dict_key, key_data in self._store["contact_key_data"].items():
try:
key_data['key_id'] = self._resolve_short_id(key_data['key_id'])
key_data["key_id"] = self._resolve_short_id(key_data["key_id"])
except KeyResolveError:
prune_list.append(dict_key)
for dict_key in prune_list:
del self._store['contact_key_data'][dict_key]
del self._store["contact_key_data"][dict_key]
self._store['_version'] = CURRENT_STORE_VERSION
self._log.info('Migration from store v2 was successful')
self._store["_version"] = CURRENT_STORE_VERSION
self._log.info("Migration from store v2 was successful")
def _save_store(self):
with self._store_path.open('w') as file:
with self._store_path.open("w") as file:
json.dump(self._store, file)
def _get_dict_key(self, jid):
return '%s-%s' % (self._account, jid)
return "%s-%s" % (self._account, jid)
def _resolve_short_id(self, short_id, has_secret=False):
candidates = self._list_keys_func(
secret=has_secret, keys=(short_id,)).fingerprints
secret=has_secret, keys=(short_id,)
).fingerprints
if len(candidates) == 1:
return candidates[0]
elif len(candidates) > 1:
self._log.critical('Key collision during migration. '
'Key ID is %s. Removing binding...',
repr(short_id))
self._log.critical(
"Key collision during migration. " "Key ID is %s. Removing binding...",
repr(short_id),
)
else:
self._log.warning('Key %s was not found during migration. '
'Removing binding...',
repr(short_id))
self._log.warning(
"Key %s was not found during migration. " "Removing binding...",
repr(short_id),
)
raise KeyResolveError
def set_own_key_data(self, key_data):
@@ -151,18 +156,18 @@ class KeyStore:
def _set_own_key_data_nosync(self, key_data):
if key_data is None:
self._store['own_key_data'] = None
self._store["own_key_data"] = None
else:
self._store['own_key_data'] = {
'key_id': key_data[0],
'key_user': key_data[1]
self._store["own_key_data"] = {
"key_id": key_data[0],
"key_user": key_data[1],
}
def get_own_key_data(self):
return self._store['own_key_data']
return self._store["own_key_data"]
def get_contact_key_data(self, jid):
key_ids = self._store['contact_key_data']
key_ids = self._store["contact_key_data"]
dict_key = self._get_dict_key(jid)
return key_ids.get(dict_key)
@@ -171,12 +176,9 @@ class KeyStore:
self._save_store()
def _set_contact_key_data_nosync(self, jid, key_data):
key_ids = self._store['contact_key_data']
key_ids = self._store["contact_key_data"]
dict_key = self._get_dict_key(jid)
if key_data is None:
key_ids[dict_key] = None
else:
key_ids[dict_key] = {
'key_id': key_data[0],
'key_user': key_data[1]
}
key_ids[dict_key] = {"key_id": key_data[0], "key_user": key_data[1]}