[openpgp] Add GPGME backend

This commit is contained in:
lovetox
2020-11-21 22:56:55 +01:00
parent a2622aa6c6
commit d8bf566db2
4 changed files with 171 additions and 175 deletions

View File

@@ -15,180 +15,169 @@
# along with OpenPGP Gajim Plugin. If not, see <http://www.gnu.org/licenses/>.
import io
from collections import namedtuple
import logging
from nbxmpp.protocol import JID
import gpg
from gpg.results import ImportResult
from gajim.common import app
from openpgp.modules.util import DecryptionFailed
KeyringItem = namedtuple('KeyringItem',
'type keyid userid fingerprint')
log = logging.getLogger('gajim.p.openpgp.pgpme')
log = logging.getLogger('gajim.p.openpgp.gpgme')
class PGPContext():
class KeyringItem:
def __init__(self, key):
self._key = key
self._uid = self._get_uid()
def _get_uid(self):
for uid in self._key.uids:
if uid.uid.startswith('xmpp:'):
return uid.uid
@property
def fingerprint(self):
return self._key.fpr
@property
def uid(self):
if self._uid is not None:
return self._uid
@property
def jid(self):
if self._uid is not None:
return JID.from_string(self._uid[5:])
def __hash__(self):
return hash(self.fingerprint)
class GPGME:
def __init__(self, jid, gnuhome):
self.context = gpg.Context(home_dir=str(gnuhome))
# self.create_new_key()
# self.get_key_by_name()
# self.get_key_by_fingerprint()
self.export_public_key()
self._jid = jid
self._context_args = {
'home_dir': str(gnuhome),
'offline': True,
'armor': False,
}
def create_new_key(self):
parms = """<GnupgKeyParms format="internal">
Key-Type: RSA
Key-Length: 2048
Subkey-Type: RSA
Subkey-Length: 2048
Name-Real: Joe Tester
Name-Comment: with stupid passphrase
Name-Email: test@example.org
Passphrase: Crypt0R0cks
Expire-Date: 2020-12-31
</GnupgKeyParms>
"""
def generate_key(self):
with gpg.Context(**self._context_args) as context:
result = context.create_key(f'xmpp:{str(self._jid)}',
expires=False,
sign=True,
encrypt=True,
certify=False,
authenticate=False,
passphrase=None,
force=False)
with self.context as c:
c.set_engine_info(gpg.constants.PROTOCOL_OpenPGP, None, app.gajimpaths['MY_DATA'])
c.set_progress_cb(gpg.callbacks.progress_stdout)
c.op_genkey(parms, None, None)
print("Generated key with fingerprint {0}.".format(
c.op_genkey_result().fpr))
log.info('Generated new key: %s', result.fpr)
def get_all_keys(self):
c = gpg.Context()
for key in c.keylist():
user = key.uids[0]
print("Keys for %s (%s):" % (user.name, user.email))
def get_key(self, fingerprint):
with gpg.Context(**self._context_args) as context:
try:
key = context.get_key(fingerprint)
except gpg.errors.KeyNotFound as error:
log.warning('key not found: %s', error.keystr)
return
except Exception as error:
log.warning('get_key() error: %s', error)
return
return key
def get_own_key_details(self):
with gpg.Context(**self._context_args) as context:
keys = list(context.keylist(secret=True))
if not keys:
return None, None
key = keys[0]
for subkey in key.subkeys:
features = []
if subkey.can_authenticate:
features.append('auth')
if subkey.can_certify:
features.append('cert')
if subkey.can_encrypt:
features.append('encrypt')
if subkey.can_sign:
features.append('sign')
print(' %s %s' % (subkey.fpr, ','.join(features)))
if subkey.fpr == key.fpr:
return subkey.fpr, subkey.timestamp
def get_key_by_name(self):
c = gpg.Context()
for key in c.keylist('john'):
print(key.subkeys[0].fpr)
return None, None
def get_key_by_fingerprint(self):
c = gpg.Context()
fingerprint = 'key fingerprint to search for'
try:
key = c.get_key(fingerprint)
print('%s (%s)' % (key.uids[0].name, key.uids[0].email))
except gpg.errors.KeyNotFound:
print("No key for fingerprint '%s'." % fingerprint)
def get_secret_key(self):
'''
Key(can_authenticate=1,
can_certify=1,
can_encrypt=1,
can_sign=1,
chain_id=None,
disabled=0,
expired=0,
fpr='7ECE1F88BAFCA37F168E1556A4DBDD1BA55FE3CE',
invalid=0,
is_qualified=0,
issuer_name=None,
issuer_serial=None,
keylist_mode=1,
last_update=0,
origin=0,
owner_trust=5,
protocol=0,
revoked=0,
secret=1,
subkeys=[
SubKey(can_authenticate=1,
can_certify=1,
can_encrypt=1,
can_sign=1,
card_number=None
curve=None,
disabled=0,
expired=0,
expires=0,
fpr='7ECE1F88BAFCA37F168E1556A4DBDD1BA55FE3CE',
invalid=0,
is_cardkey=0,
is_de_vs=1,
is_qualified=0,
keygrip='15BECB77301E4810ABB9CA6A9391158E575DABEC',
keyid='A4DBDD1BA55FE3CE',
length=2048,
pubkey_algo=1,
revoked=0,
secret=1,
timestamp=1525006759)],
uids=[
UID(address=None,
comment='',
email='',
invalid=0,
last_update=0,
name='xmpp:philw@jabber.at',
origin=0,
revoked=0,
signatures=[],
tofu=[],
uid='xmpp:philw@jabber.at',
validity=5)])
'''
for key in self.context.keylist(secret=True):
break
return key.fpr, key.fpr[-16:]
def get_keys(self, secret=False):
def get_keys(self):
keys = []
for key in self.context.keylist():
for uid in key.uids:
if uid.uid.startswith('xmpp:'):
keys.append((key, uid.uid[5:]))
break
with gpg.Context(**self._context_args) as context:
for key in context.keylist():
keys.append(KeyringItem(key))
return keys
def export_public_key(self):
# print(dir(self.context))
result = self.context.key_export_minimal()
print(result)
def export_key(self, fingerprint):
with gpg.Context(**self._context_args) as context:
key = context.key_export_minimal(pattern=fingerprint)
return key
def encrypt_decrypt_files(self):
c = gpg.Context()
recipient = c.get_key("fingerprint of recipient's key")
# def encrypt_decrypt_files(self):
# c = gpg.Context()
# recipient = c.get_key("fingerprint of recipient's key")
# Encrypt
with open('foo.txt', 'r') as input_file:
with open('foo.txt.gpg', 'wb') as output_file:
c.encrypt([recipient], 0, input_file, output_file)
# # Encrypt
# with open('foo.txt', 'r') as input_file:
# with open('foo.txt.gpg', 'wb') as output_file:
# c.encrypt([recipient], 0, input_file, output_file)
# Decrypt
with open('foo.txt.gpg', 'rb') as input_file:
with open('foo2.txt', 'w') as output_file:
c.decrypt(input_file, output_file)
# # Decrypt
# with open('foo.txt.gpg', 'rb') as input_file:
# with open('foo2.txt', 'w') as output_file:
# c.decrypt(input_file, output_file)
def encrypt(self):
c = gpg.Context()
recipient = c.get_key("fingerprint of recipient's key")
def encrypt(self, plaintext, keys):
recipients = []
with gpg.Context(**self._context_args) as context:
for key in keys:
key = context.get_key(key.fingerprint)
if key is not None:
recipients.append(key)
plaintext_string = u'plain text data'
plaintext_bytes = io.BytesIO(plaintext_string.encode('utf8'))
encrypted_bytes = io.BytesIO()
c.encrypt([recipient], 0, plaintext_bytes, encrypted_bytes)
if not recipients:
return None, 'No keys found to encrypt to'
def decrypt(self):
c = gpg.Context()
decrypted_bytes = io.BytesIO()
c.decrypt(encrypted_bytes, decrypted_bytes)
decrypted_string = decrypted_bytes.getvalue().decode('utf8')
with gpg.Context(**self._context_args) as context:
result = context.encrypt(str(plaintext).encode(),
recipients,
always_trust=True)
ciphertext, result, _sign_result = result
return ciphertext, None
def decrypt(self, ciphertext):
with gpg.Context(**self._context_args) as context:
try:
result = context.decrypt(ciphertext)
except Exception as error:
raise DecryptionFailed('Decryption failed: %s' % error)
plaintext, result, verify_result = result
plaintext = plaintext.decode()
fingerprints = [sig.fpr for sig in verify_result.signatures]
if not fingerprints or len(fingerprints) > 1:
log.error(result)
log.error(verify_result)
raise DecryptionFailed('Verification failed')
return plaintext, fingerprints[0]
def import_key(self, data, jid):
log.info('Import key from %s', jid)
with gpg.Context(**self._context_args) as context:
result = context.key_import(data)
if not isinstance(result, ImportResult) or result.imported != 1:
log.error('Key import failed: %s', jid)
log.error(result)
return
fingerprint = result.imports[0].fpr
key = self.get_key(fingerprint)
return KeyringItem(key)

View File

@@ -31,7 +31,7 @@ if log.getEffectiveLevel() == logging.DEBUG:
KeyringItem = namedtuple('KeyringItem', 'jid keyid fingerprint')
class PGPContext(gnupg.GPG):
class PythonGnuPG(gnupg.GPG):
def __init__(self, jid, gnupghome):
gnupg.GPG.__init__(
self, gpgbinary='gpg', gnupghome=str(gnupghome))