# Copyright (C) 2019 Philipp Hörist # Copyright (C) 2015 Bahtiar `kalkin-` Gadimov # Copyright (C) 2015 Daniel Gultsch # # This file is part of OMEMO Gajim Plugin. # # OMEMO Gajim Plugin is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # # OMEMO Gajim Plugin is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with OMEMO Gajim Plugin. If not, see . import logging import binascii import threading from enum import IntEnum, unique from pathlib import Path from gi.repository import GLib from gi.repository import Gtk from gi.repository import Gdk from gajim import dialogs from gajim.common import app, ged from gajim.plugins import GajimPlugin from gajim.plugins.plugins_i18n import _ from gajim.groupchat_control import GroupchatControl from omemo import file_crypto from omemo.gtk.key import KeyDialog from omemo.gtk.config import OMEMOConfigDialog from omemo.backend.aes import aes_encrypt_file AXOLOTL_MISSING = 'You are missing Python3-Axolotl or use an outdated version' PROTOBUF_MISSING = "OMEMO can't import Google Protobuf, you can find help in " \ "the GitHub Wiki" ERROR_MSG = '' log = logging.getLogger('gajim.p.omemo') if log.getEffectiveLevel() == logging.DEBUG: log_axolotl = logging.getLogger('axolotl') log_axolotl.setLevel(logging.DEBUG) log_axolotl.addHandler(logging.StreamHandler()) log_axolotl.propagate = False try: import google.protobuf except Exception as error: log.error(error) ERROR_MSG = PROTOBUF_MISSING try: import axolotl except Exception as error: log.error(error) ERROR_MSG = AXOLOTL_MISSING if not ERROR_MSG: try: from omemo.modules import omemo except Exception as error: log.error(error) ERROR_MSG = 'Error: %s' % error @unique class UserMessages(IntEnum): QUERY_DEVICES = 0 NO_FINGERPRINTS = 1 UNDECIDED_FINGERPRINTS = 2 class OmemoPlugin(GajimPlugin): def init(self): # pylint: disable=attribute-defined-outside-init if ERROR_MSG: self.activatable = False self.available_text = ERROR_MSG self.config_dialog = None return self.encryption_name = 'OMEMO' self.allow_groupchat = True self.events_handlers = { 'omemo-new-fingerprint': (ged.PRECORE, self._on_new_fingerprints), 'signed-in': (ged.PRECORE, self._on_signed_in), 'muc-config-changed': (ged.GUI2, self._on_muc_config_changed), } self.modules = [omemo] self.config_dialog = OMEMOConfigDialog(self) self.gui_extension_points = { 'hyperlink_handler': (self._file_decryption, None), 'encrypt' + self.encryption_name: (self._encrypt_message, None), 'gc_encrypt' + self.encryption_name: ( self._muc_encrypt_message, None), 'send_message' + self.encryption_name: ( self._before_sendmessage, None), 'encryption_dialog' + self.encryption_name: ( self._on_encryption_button_clicked, None), 'encryption_state' + self.encryption_name: ( self._encryption_state, None), 'update_caps': (self._update_caps, None)} self.disabled_accounts = [] self._windows = {} self.config_default_values = {'DISABLED_ACCOUNTS': ([], ''), } for account in self.config['DISABLED_ACCOUNTS']: self.disabled_accounts.append(account) # add aesgcm:// uri scheme to config schemes = app.config.get('uri_schemes') if 'aesgcm://' not in schemes.split(): schemes += ' aesgcm://' app.config.set('uri_schemes', schemes) self._load_css() def _is_enabled_account(self, account): if account in self.disabled_accounts: return False if account == 'Local': return False return True @staticmethod def get_omemo(account): return app.connections[account].get_module('OMEMO') @staticmethod def _load_css(): path = Path(__file__).parent / 'gtk' / 'style.css' try: with path.open("r") as file: css = file.read() except Exception as exc: log.error('Error loading css: %s', exc) return try: provider = Gtk.CssProvider() provider.load_from_data(bytes(css.encode('utf-8'))) Gtk.StyleContext.add_provider_for_screen(Gdk.Screen.get_default(), provider, 610) except Exception: log.exception('Error loading application css') def activate(self): """ Method called when the Plugin is activated in the PluginManager """ for account in app.connections: if not self._is_enabled_account(account): continue self.get_omemo(account).activate() def deactivate(self): """ Method called when the Plugin is deactivated in the PluginManager """ for account in app.connections: if not self._is_enabled_account(account): continue self.get_omemo(account).deactivate() def _on_signed_in(self, event): account = event.conn.name if not self._is_enabled_account(account): return self.get_omemo(account).on_signed_in() def _on_muc_config_changed(self, event): if not self._is_enabled_account(event.account): return self.get_omemo(event.account).on_muc_config_changed(event) def _update_caps(self, account): if not self._is_enabled_account(account): return self.get_omemo(account).update_caps(account) @staticmethod def activate_encryption(chat_control): if isinstance(chat_control, GroupchatControl): omemo_con = app.connections[chat_control.account].get_module('OMEMO') if not omemo_con.is_omemo_groupchat(chat_control.room_jid): dialogs.ErrorDialog( _('Bad Configuration'), _('To use OMEMO in a Groupchat, the Groupchat should be' ' non-anonymous and members-only.')) return False return True def _muc_encrypt_message(self, conn, obj, callback): account = conn.name if not self._is_enabled_account(account): return self.get_omemo(account).encrypt_message(conn, obj, callback, True) def _encrypt_message(self, conn, obj, callback): account = conn.name if not self._is_enabled_account(account): return self.get_omemo(account).encrypt_message(conn, obj, callback, False) def _file_decryption(self, uri, instance, window): file_crypto.FileDecryption(self).hyperlink_handler( uri, instance, window) def encrypt_file(self, file, _account, callback): thread = threading.Thread(target=self._encrypt_file_thread, args=(file, callback)) thread.daemon = True thread.start() @staticmethod def _encrypt_file_thread(file, callback, *args, **kwargs): result = aes_encrypt_file(file.get_data(full=True)) file.encrypted = True file.size = len(result.payload) file.user_data = binascii.hexlify(result.iv + result.key).decode() file.data = result.payload if file.event.isSet(): return GLib.idle_add(callback, file) @staticmethod def _encryption_state(_chat_control, state): state['visible'] = True state['authenticated'] = True def _on_encryption_button_clicked(self, chat_control): self._show_fingerprint_window(chat_control) def _before_sendmessage(self, chat_control): account = chat_control.account if not self._is_enabled_account(account): return contact = chat_control.contact omemo = self.get_omemo(account) self.new_fingerprints_available(chat_control) if isinstance(chat_control, GroupchatControl): room = chat_control.room_jid missing = True for jid in omemo.backend.get_muc_members(room): if not omemo.are_keys_missing(jid): missing = False if missing: log.info('%s => No Trusted Fingerprints for %s', account, room) self.print_message(chat_control, UserMessages.NO_FINGERPRINTS) else: # check if we have devices for the contact if not omemo.backend.get_devices(contact.jid, without_self=True): omemo.request_devicelist(contact.jid) self.print_message(chat_control, UserMessages.QUERY_DEVICES) chat_control.sendmessage = False return # check if bundles are missing for some devices if omemo.backend.storage.hasUndecidedFingerprints(contact.jid): log.info('%s => Undecided Fingerprints for %s', account, contact.jid) self.print_message(chat_control, UserMessages.UNDECIDED_FINGERPRINTS) chat_control.sendmessage = False else: log.debug('%s => Sending Message to %s', account, contact.jid) def _on_new_fingerprints(self, event): self.new_fingerprints_available(event.chat_control) def new_fingerprints_available(self, chat_control): jid = chat_control.contact.jid account = chat_control.account omemo = self.get_omemo(account) if isinstance(chat_control, GroupchatControl): for jid_ in omemo.backend.get_muc_members(chat_control.room_jid, without_self=False): fingerprints = omemo.backend.storage.getNewFingerprints(jid_) if fingerprints: self._show_fingerprint_window( chat_control, fingerprints) break elif not isinstance(chat_control, GroupchatControl): fingerprints = omemo.backend.storage.getNewFingerprints(jid) if fingerprints: self._show_fingerprint_window( chat_control, fingerprints) def _show_fingerprint_window(self, chat_control, fingerprints=None): contact = chat_control.contact account = chat_control.account omemo = self.get_omemo(account) transient = chat_control.parent_win.window if 'dialog' not in self._windows: is_groupchat = isinstance(chat_control, GroupchatControl) self._windows['dialog'] = \ KeyDialog(self, contact, transient, self._windows, groupchat=is_groupchat) if fingerprints: log.debug('%s => Showing Fingerprint Prompt for %s', account, contact.jid) omemo.backend.storage.setShownFingerprints(fingerprints) else: self._windows['dialog'].present() self._windows['dialog'].update() if fingerprints: omemo.backend.storage.setShownFingerprints(fingerprints) @staticmethod def print_message(chat_control, kind): msg = None if kind == UserMessages.QUERY_DEVICES: msg = _('No devices found. Query in progress...') elif kind == UserMessages.NO_FINGERPRINTS: msg = _('To send an encrypted message, you have to ' 'first trust the fingerprint of your contact!') elif kind == UserMessages.UNDECIDED_FINGERPRINTS: msg = _('You have undecided fingerprints') if msg is None: return chat_control.print_conversation_line(msg, 'status', '', None)