[openpgp] Port to Gtk4
This commit is contained in:
97
openpgp/backend/base.py
Normal file
97
openpgp/backend/base.py
Normal file
@@ -0,0 +1,97 @@
|
||||
# Copyright (C) 2025 Philipp Hörist <philipp AT hoerist.com>
|
||||
#
|
||||
# This file is part of the OpenPGP Gajim Plugin.
|
||||
#
|
||||
# OpenPGP Gajim Plugin is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation; version 3 only.
|
||||
#
|
||||
# OpenPGP Gajim Plugin is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
|
||||
from nbxmpp.protocol import JID
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from openpgp.modules.key_store import KeyData
|
||||
|
||||
|
||||
class BaseKeyringItem:
|
||||
def __init__(self, key: Any) -> None:
|
||||
self._key = key
|
||||
self._uid = self._get_uid()
|
||||
|
||||
@property
|
||||
def is_xmpp_key(self) -> bool:
|
||||
try:
|
||||
return self.jid is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def is_valid(self, jid: JID) -> bool:
|
||||
if not self.is_xmpp_key:
|
||||
return False
|
||||
return jid == self.jid
|
||||
|
||||
def _get_uid(self) -> str | None:
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def fingerprint(self) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
def uid(self):
|
||||
if self._uid is not None:
|
||||
return self._uid
|
||||
|
||||
@property
|
||||
def jid(self) -> JID | None:
|
||||
if self._uid is not None:
|
||||
return JID.from_string(self._uid)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.fingerprint)
|
||||
|
||||
|
||||
class BasePGPBackend:
|
||||
def __init__(self, jid: str, gnupghome: Path) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def generate_key(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def encrypt(
|
||||
self, payload: bytes, keys: list[KeyData]
|
||||
) -> tuple[bytes | None, str | None]:
|
||||
raise NotImplementedError
|
||||
|
||||
def decrypt(self, payload: bytes) -> tuple[str, str]:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_keys(self) -> Sequence[BaseKeyringItem]:
|
||||
raise NotImplementedError
|
||||
|
||||
def import_key(self, data: bytes, jid: JID) -> BaseKeyringItem | None:
|
||||
raise NotImplementedError
|
||||
|
||||
def get_own_key_details(self) -> tuple[str | None, int | None]:
|
||||
raise NotImplementedError
|
||||
|
||||
def export_key(self, fingerprint: str) -> bytes | None:
|
||||
raise NotImplementedError
|
||||
|
||||
def delete_key(self, fingerprint: str) -> None:
|
||||
raise NotImplementedError
|
||||
@@ -14,34 +14,29 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
|
||||
import gpg
|
||||
from gpg.errors import KeyNotFound
|
||||
from gpg.results import ImportResult
|
||||
from nbxmpp.protocol import JID
|
||||
|
||||
from openpgp.backend.base import BaseKeyringItem
|
||||
from openpgp.backend.base import BasePGPBackend
|
||||
from openpgp.backend.gpgme_types import Key
|
||||
from openpgp.backend.util import parse_uid
|
||||
from openpgp.modules.key_store import KeyData
|
||||
from openpgp.modules.util import DecryptionFailed
|
||||
|
||||
log = logging.getLogger("gajim.p.openpgp.gpgme")
|
||||
|
||||
|
||||
class KeyringItem:
|
||||
def __init__(self, key):
|
||||
self._key = key
|
||||
self._uid = self._get_uid()
|
||||
|
||||
@property
|
||||
def is_xmpp_key(self) -> bool:
|
||||
try:
|
||||
return self.jid is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def is_valid(self, jid: JID) -> bool:
|
||||
if not self.is_xmpp_key:
|
||||
return False
|
||||
return jid == self.jid
|
||||
class KeyringItem(BaseKeyringItem):
|
||||
|
||||
def _get_uid(self) -> str | None:
|
||||
for uid in self._key.uids:
|
||||
@@ -51,36 +46,22 @@ class KeyringItem:
|
||||
pass
|
||||
|
||||
@property
|
||||
def fingerprint(self):
|
||||
def fingerprint(self) -> str:
|
||||
return self._key.fpr
|
||||
|
||||
@property
|
||||
def uid(self):
|
||||
if self._uid is not None:
|
||||
return self._uid
|
||||
|
||||
@property
|
||||
def jid(self):
|
||||
if self._uid is not None:
|
||||
return JID.from_string(self._uid)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.fingerprint)
|
||||
|
||||
|
||||
class GPGME:
|
||||
def __init__(self, jid, gnuhome):
|
||||
class GPGMe(BasePGPBackend):
|
||||
def __init__(self, jid: str, gnuhome: Path) -> None:
|
||||
self._jid = jid
|
||||
self._context_args = {
|
||||
"home_dir": str(gnuhome),
|
||||
"offline": True,
|
||||
"armor": False,
|
||||
}
|
||||
self._home_dir = str(gnuhome)
|
||||
|
||||
def generate_key(self):
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
def _get_context(self) -> gpg.Context:
|
||||
return gpg.Context(armor=False, offline=True, home_dir=self._home_dir)
|
||||
|
||||
def generate_key(self) -> None:
|
||||
with self._get_context() as context:
|
||||
result = context.create_key(
|
||||
f"xmpp:{str(self._jid)}",
|
||||
f"xmpp:{self._jid}",
|
||||
algorithm="default",
|
||||
expires=False,
|
||||
passphrase=None,
|
||||
@@ -89,11 +70,11 @@ class GPGME:
|
||||
|
||||
log.info("Generated new key: %s", result.fpr)
|
||||
|
||||
def get_key(self, fingerprint):
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
def _get_key(self, fingerprint: str) -> Key | None:
|
||||
with self._get_context() as context:
|
||||
try:
|
||||
key = context.get_key(fingerprint)
|
||||
except gpg.errors.KeyNotFound as error:
|
||||
return cast(Key, context.get_key(fingerprint))
|
||||
except KeyNotFound as error:
|
||||
log.warning("key not found: %s", error.keystr)
|
||||
return
|
||||
|
||||
@@ -101,11 +82,9 @@ class GPGME:
|
||||
log.warning("get_key() error: %s", error)
|
||||
return
|
||||
|
||||
return key
|
||||
|
||||
def get_own_key_details(self):
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
keys = list(context.keylist(secret=True))
|
||||
def get_own_key_details(self) -> tuple[str | None, int | None]:
|
||||
with self._get_context() as context:
|
||||
keys = cast(list[Key], list(context.keylist(secret=True)))
|
||||
if not keys:
|
||||
return None, None
|
||||
|
||||
@@ -116,10 +95,10 @@ class GPGME:
|
||||
|
||||
return None, None
|
||||
|
||||
def get_keys(self):
|
||||
keys = []
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
for key in context.keylist():
|
||||
def get_keys(self) -> Sequence[KeyringItem]:
|
||||
keys: list[KeyringItem] = []
|
||||
with self._get_context() as context:
|
||||
for key in context.keylist(secret=False):
|
||||
keyring_item = KeyringItem(key)
|
||||
if not keyring_item.is_xmpp_key:
|
||||
log.warning("Key not suited for xmpp: %s", key.fpr)
|
||||
@@ -130,10 +109,9 @@ class GPGME:
|
||||
|
||||
return keys
|
||||
|
||||
def export_key(self, fingerprint):
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
key = context.key_export_minimal(pattern=fingerprint)
|
||||
return key
|
||||
def export_key(self, fingerprint: str) -> bytes | None:
|
||||
with self._get_context() as context:
|
||||
return context.key_export_minimal(pattern=fingerprint)
|
||||
|
||||
# def encrypt_decrypt_files(self):
|
||||
# c = gpg.Context()
|
||||
@@ -149,9 +127,11 @@ class GPGME:
|
||||
# with open('foo2.txt', 'w') as output_file:
|
||||
# c.decrypt(input_file, output_file)
|
||||
|
||||
def encrypt(self, plaintext, keys):
|
||||
recipients = []
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
def encrypt(
|
||||
self, payload: bytes, keys: list[KeyData]
|
||||
) -> tuple[bytes | None, str | None]:
|
||||
recipients: list[Any] = []
|
||||
with self._get_context() as context:
|
||||
for key in keys:
|
||||
key = context.get_key(key.fingerprint)
|
||||
if key is not None:
|
||||
@@ -160,18 +140,18 @@ class GPGME:
|
||||
if not recipients:
|
||||
return None, "No keys found to encrypt to"
|
||||
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
result = context.encrypt(
|
||||
str(plaintext).encode(), recipients, always_trust=True
|
||||
)
|
||||
with self._get_context() as context:
|
||||
result = context.encrypt(payload, recipients, always_trust=True)
|
||||
|
||||
ciphertext, result, _sign_result = result
|
||||
return ciphertext, None
|
||||
ciphertext, result, _sign_result = result
|
||||
return ciphertext, None
|
||||
|
||||
def decrypt(self, ciphertext):
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
raise RuntimeError
|
||||
|
||||
def decrypt(self, payload: bytes) -> tuple[str, str]:
|
||||
with self._get_context() as context:
|
||||
try:
|
||||
result = context.decrypt(ciphertext)
|
||||
result = context.decrypt(payload)
|
||||
except Exception as error:
|
||||
raise DecryptionFailed("Decryption failed: %s" % error)
|
||||
|
||||
@@ -186,9 +166,9 @@ class GPGME:
|
||||
|
||||
return plaintext, fingerprints[0]
|
||||
|
||||
def import_key(self, data, jid):
|
||||
def import_key(self, data: bytes, jid: JID) -> KeyringItem | None:
|
||||
log.info("Import key from %s", jid)
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
with self._get_context() as context:
|
||||
result = context.key_import(data)
|
||||
if not isinstance(result, ImportResult) or result.imported != 1:
|
||||
log.error("Key import failed: %s", jid)
|
||||
@@ -196,7 +176,7 @@ class GPGME:
|
||||
return
|
||||
|
||||
fingerprint = result.imports[0].fpr
|
||||
key = self.get_key(fingerprint)
|
||||
key = self._get_key(fingerprint)
|
||||
item = KeyringItem(key)
|
||||
if not item.is_valid(jid):
|
||||
log.warning("Invalid key found")
|
||||
@@ -206,8 +186,9 @@ class GPGME:
|
||||
|
||||
return item
|
||||
|
||||
def delete_key(self, fingerprint):
|
||||
def delete_key(self, fingerprint: str) -> None:
|
||||
log.info("Delete Key: %s", fingerprint)
|
||||
key = self.get_key(fingerprint)
|
||||
with gpg.Context(**self._context_args) as context:
|
||||
key = self._get_key(fingerprint)
|
||||
assert key is not None
|
||||
with self._get_context() as context:
|
||||
context.op_delete(key, True)
|
||||
|
||||
86
openpgp/backend/gpgme_types.py
Normal file
86
openpgp/backend/gpgme_types.py
Normal file
@@ -0,0 +1,86 @@
|
||||
# Copyright (C) 2025 Philipp Hörist <philipp AT hoerist.com>
|
||||
#
|
||||
# This file is part of the OpenPGP Gajim Plugin.
|
||||
#
|
||||
# OpenPGP Gajim Plugin is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published
|
||||
# by the Free Software Foundation; version 3 only.
|
||||
#
|
||||
# OpenPGP Gajim Plugin is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
class UID:
|
||||
address: Any | None
|
||||
comment: str
|
||||
email: str
|
||||
invalid: int
|
||||
last_update: int
|
||||
name: str
|
||||
origin: int
|
||||
revoked: int
|
||||
signatures: list[Any]
|
||||
thisown: bool
|
||||
tofu: list[Any]
|
||||
uid: str
|
||||
uidhash: str
|
||||
validity: int
|
||||
|
||||
|
||||
class SubKey:
|
||||
can_authenticate: int
|
||||
can_certify: int
|
||||
can_encrypt: int
|
||||
can_sign: int
|
||||
card_number: Any | None
|
||||
curve: Any | None
|
||||
disabled: int
|
||||
expired: int
|
||||
expires: int
|
||||
fpr: str
|
||||
invalid: int
|
||||
is_cardkey: int
|
||||
is_de_vs: int
|
||||
is_qualified: int
|
||||
keygrip: Any | None
|
||||
keyid: str
|
||||
length: int
|
||||
pubkey_algo: int
|
||||
revoked: int
|
||||
secret: int
|
||||
thisown: bool
|
||||
timestamp: int
|
||||
|
||||
|
||||
class Key:
|
||||
can_authenticate: int
|
||||
can_certify: int
|
||||
can_encrypt: int
|
||||
can_sign: int
|
||||
chain_id: Any | None
|
||||
disabled: int
|
||||
expired: int
|
||||
fpr: str
|
||||
invalid: int
|
||||
is_qualified: int
|
||||
issuer_name: str | None
|
||||
issuer_serial: str | None
|
||||
keylist_mode: int
|
||||
last_update: int
|
||||
origin: int
|
||||
owner_trust: int
|
||||
protocol: int
|
||||
revoked: int
|
||||
secret: int
|
||||
subkeys: list[SubKey]
|
||||
thisown: bool
|
||||
uids: list[UID]
|
||||
@@ -15,12 +15,16 @@
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from pathlib import Path
|
||||
|
||||
import gnupg
|
||||
from nbxmpp.protocol import JID
|
||||
|
||||
from openpgp.backend.base import BaseKeyringItem
|
||||
from openpgp.backend.base import BasePGPBackend
|
||||
from openpgp.backend.util import parse_uid
|
||||
from openpgp.modules.key_store import KeyData
|
||||
from openpgp.modules.util import DecryptionFailed
|
||||
|
||||
log = logging.getLogger("gajim.p.openpgp.pygnupg")
|
||||
@@ -30,22 +34,7 @@ if log.getEffectiveLevel() == logging.DEBUG:
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class KeyringItem:
|
||||
def __init__(self, key):
|
||||
self._key = key
|
||||
self._uid = self._get_uid()
|
||||
|
||||
@property
|
||||
def is_xmpp_key(self) -> bool:
|
||||
try:
|
||||
return self.jid is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def is_valid(self, jid: JID) -> bool:
|
||||
if not self.is_xmpp_key:
|
||||
return False
|
||||
return jid == self.jid
|
||||
class KeyringItem(BaseKeyringItem):
|
||||
|
||||
@property
|
||||
def keyid(self) -> str:
|
||||
@@ -59,32 +48,18 @@ class KeyringItem:
|
||||
pass
|
||||
|
||||
@property
|
||||
def fingerprint(self):
|
||||
def fingerprint(self) -> str:
|
||||
return self._key["fingerprint"]
|
||||
|
||||
@property
|
||||
def uid(self):
|
||||
if self._uid is not None:
|
||||
return self._uid
|
||||
|
||||
@property
|
||||
def jid(self):
|
||||
if self._uid is not None:
|
||||
return JID.from_string(self._uid)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.fingerprint)
|
||||
|
||||
|
||||
class PythonGnuPG(gnupg.GPG):
|
||||
class PythonGnuPG(BasePGPBackend):
|
||||
def __init__(self, jid: str, gnupghome: Path) -> None:
|
||||
gnupg.GPG.__init__(self, gpgbinary="gpg", gnupghome=str(gnupghome))
|
||||
|
||||
self._gnupg = gnupg.GPG(gpgbinary="gpg", gnupghome=str(gnupghome))
|
||||
self._jid = jid
|
||||
self._own_fingerprint = None
|
||||
|
||||
@staticmethod
|
||||
def _get_key_params(jid):
|
||||
def _get_key_params(jid: str) -> str:
|
||||
"""
|
||||
Generate --gen-key input
|
||||
"""
|
||||
@@ -102,17 +77,19 @@ class PythonGnuPG(gnupg.GPG):
|
||||
out += "%commit\n"
|
||||
return out
|
||||
|
||||
def generate_key(self):
|
||||
super().gen_key(self._get_key_params(self._jid))
|
||||
def generate_key(self) -> None:
|
||||
self._gnupg.gen_key(self._get_key_params(self._jid))
|
||||
|
||||
def encrypt(self, payload, keys):
|
||||
def encrypt(
|
||||
self, payload: bytes, keys: list[KeyData]
|
||||
) -> tuple[bytes | None, str | None]:
|
||||
recipients = [key.fingerprint for key in keys]
|
||||
log.info("encrypt to:")
|
||||
for fingerprint in recipients:
|
||||
log.info(fingerprint)
|
||||
|
||||
result = super().encrypt(
|
||||
str(payload).encode("utf8"),
|
||||
result = self._gnupg.encrypt(
|
||||
payload,
|
||||
recipients,
|
||||
armor=False,
|
||||
sign=self._own_fingerprint,
|
||||
@@ -126,19 +103,20 @@ class PythonGnuPG(gnupg.GPG):
|
||||
|
||||
return result.data, error
|
||||
|
||||
def decrypt(self, payload):
|
||||
result = super().decrypt(payload, always_trust=True)
|
||||
def decrypt(self, payload: bytes) -> tuple[str, str]:
|
||||
result = self._gnupg.decrypt(payload, always_trust=True)
|
||||
if not result.ok:
|
||||
raise DecryptionFailed(result.status)
|
||||
|
||||
assert result.fingerprint is not None
|
||||
return result.data.decode("utf8"), result.fingerprint
|
||||
|
||||
def get_key(self, fingerprint):
|
||||
return super().list_keys(keys=[fingerprint])
|
||||
def _get_key(self, fingerprint: str) -> gnupg.ListKeys:
|
||||
return self._gnupg.list_keys(keys=[fingerprint])
|
||||
|
||||
def get_keys(self, secret=False):
|
||||
result = super().list_keys(secret=secret)
|
||||
keys = []
|
||||
def get_keys(self) -> Sequence[KeyringItem]:
|
||||
result = self._gnupg.list_keys(secret=False)
|
||||
keys: list[KeyringItem] = []
|
||||
for key in result:
|
||||
item = KeyringItem(key)
|
||||
if not item.is_xmpp_key:
|
||||
@@ -149,15 +127,18 @@ class PythonGnuPG(gnupg.GPG):
|
||||
keys.append(item)
|
||||
return keys
|
||||
|
||||
def import_key(self, data, jid):
|
||||
def import_key(self, data: bytes, jid: JID) -> KeyringItem | None:
|
||||
log.info("Import key from %s", jid)
|
||||
result = super().import_keys(data)
|
||||
result = self._gnupg.import_keys(data)
|
||||
if not result:
|
||||
log.error("Could not import key")
|
||||
log.error(result)
|
||||
return
|
||||
|
||||
key = self.get_key(result.results[0]["fingerprint"])
|
||||
fpr = result.results[0]["fingerprint"]
|
||||
assert fpr is not None
|
||||
|
||||
key = self._get_key(fpr)
|
||||
item = KeyringItem(key[0])
|
||||
if not item.is_valid(jid):
|
||||
log.warning("Invalid key found, deleting key")
|
||||
@@ -167,8 +148,8 @@ class PythonGnuPG(gnupg.GPG):
|
||||
|
||||
return item
|
||||
|
||||
def get_own_key_details(self):
|
||||
result = super().list_keys(secret=True)
|
||||
def get_own_key_details(self) -> tuple[str | None, int | None]:
|
||||
result = self._gnupg.list_keys(secret=True)
|
||||
if not result:
|
||||
return None, None
|
||||
|
||||
@@ -179,10 +160,13 @@ class PythonGnuPG(gnupg.GPG):
|
||||
self._own_fingerprint = result[0]["fingerprint"]
|
||||
return self._own_fingerprint, int(result[0]["date"])
|
||||
|
||||
def export_key(self, fingerprint):
|
||||
key = super().export_keys(fingerprint, secret=False, armor=False, minimal=True)
|
||||
def export_key(self, fingerprint: str) -> bytes | None:
|
||||
key = self._gnupg.export_keys(
|
||||
fingerprint, secret=False, armor=False, minimal=True
|
||||
)
|
||||
assert isinstance(key, bytes | None)
|
||||
return key
|
||||
|
||||
def delete_key(self, fingerprint):
|
||||
def delete_key(self, fingerprint: str) -> None:
|
||||
log.info("Delete Key: %s", fingerprint)
|
||||
super().delete_keys(fingerprint)
|
||||
self._gnupg.delete_keys(fingerprint)
|
||||
|
||||
@@ -14,9 +14,18 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from typing import Any
|
||||
from typing import NamedTuple
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from collections import namedtuple
|
||||
from collections.abc import Iterator
|
||||
from pathlib import Path
|
||||
|
||||
from nbxmpp.protocol import JID
|
||||
|
||||
from openpgp.modules.util import Trust
|
||||
|
||||
log = logging.getLogger("gajim.p.openpgp.sql")
|
||||
|
||||
@@ -32,8 +41,16 @@ TABLE_LAYOUT = """
|
||||
CREATE UNIQUE INDEX jid_fingerprint ON contacts (jid, fingerprint);"""
|
||||
|
||||
|
||||
class ContactRow(NamedTuple):
|
||||
jid: JID
|
||||
fingerprint: str
|
||||
active: bool
|
||||
trust: Trust
|
||||
timestamp: float
|
||||
|
||||
|
||||
class Storage:
|
||||
def __init__(self, folder_path):
|
||||
def __init__(self, folder_path: Path) -> None:
|
||||
self._con = sqlite3.connect(
|
||||
str(folder_path / "contacts.db"), detect_types=sqlite3.PARSE_COLNAMES
|
||||
)
|
||||
@@ -45,21 +62,21 @@ class Storage:
|
||||
self._con.commit()
|
||||
|
||||
@staticmethod
|
||||
def _namedtuple_factory(cursor, row):
|
||||
def _namedtuple_factory(cursor: sqlite3.Cursor, row: Any) -> Any:
|
||||
fields = [col[0] for col in cursor.description]
|
||||
Row = namedtuple("Row", fields)
|
||||
Row = namedtuple("Row", fields) # pyright: ignore
|
||||
named_row = Row(*row)
|
||||
return named_row
|
||||
|
||||
def _user_version(self):
|
||||
def _user_version(self) -> int:
|
||||
return self._con.execute("PRAGMA user_version").fetchone()[0]
|
||||
|
||||
def _create_database(self):
|
||||
def _create_database(self) -> None:
|
||||
if not self._user_version():
|
||||
log.info("Create contacts.db")
|
||||
self._execute_query(TABLE_LAYOUT)
|
||||
|
||||
def _execute_query(self, query):
|
||||
def _execute_query(self, query: str) -> None:
|
||||
transaction = """
|
||||
BEGIN TRANSACTION;
|
||||
%s
|
||||
@@ -70,40 +87,41 @@ class Storage:
|
||||
)
|
||||
self._con.executescript(transaction)
|
||||
|
||||
def _migrate_database(self):
|
||||
def _migrate_database(self) -> None:
|
||||
pass
|
||||
|
||||
def load_contacts(self):
|
||||
def load_contacts(self) -> list[ContactRow]:
|
||||
sql = """SELECT jid as "jid [jid]",
|
||||
fingerprint,
|
||||
active,
|
||||
active,
|
||||
trust,
|
||||
timestamp,
|
||||
comment
|
||||
timestamp
|
||||
FROM contacts"""
|
||||
|
||||
return self._con.execute(sql).fetchall()
|
||||
|
||||
def save_contact(self, db_values):
|
||||
def save_contact(
|
||||
self, db_values: Iterator[tuple[JID, str, bool, Trust, float]]
|
||||
) -> None:
|
||||
sql = """REPLACE INTO
|
||||
contacts(jid, fingerprint, active, trust, timestamp, comment)
|
||||
contacts(jid, fingerprint, active, trust, timestamp)
|
||||
VALUES(?, ?, ?, ?, ?, ?)"""
|
||||
for values in db_values:
|
||||
log.info("Store key: %s", values)
|
||||
self._con.execute(sql, values)
|
||||
self._con.commit()
|
||||
|
||||
def set_trust(self, jid, fingerprint, trust):
|
||||
def set_trust(self, jid: JID, fingerprint: str, trust: Trust) -> None:
|
||||
sql = "UPDATE contacts SET trust = ? WHERE jid = ? AND fingerprint = ?"
|
||||
log.info("Set Trust: %s %s %s", trust, jid, fingerprint)
|
||||
self._con.execute(sql, (trust, jid, fingerprint))
|
||||
self._con.commit()
|
||||
|
||||
def delete_key(self, jid, fingerprint):
|
||||
def delete_key(self, jid: JID, fingerprint: str) -> None:
|
||||
sql = "DELETE from contacts WHERE jid = ? AND fingerprint = ?"
|
||||
log.info("Delete Key: %s %s", jid, fingerprint)
|
||||
self._con.execute(sql, (jid, fingerprint))
|
||||
self._con.commit()
|
||||
|
||||
def cleanup(self):
|
||||
def cleanup(self) -> None:
|
||||
self._con.close()
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
def parse_uid(uid: str, compat=False) -> str:
|
||||
def parse_uid(uid: str, compat: bool = False) -> str:
|
||||
if uid.startswith("xmpp:"):
|
||||
return uid[5:]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user