# Copyright (C) 2011-2017 Yann Leboulanger # Copyright (C) 2022 Philipp Hörist # # This file is part of Gajim. # # Gajim is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published # by the Free Software Foundation; version 3 only. # # Gajim is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Gajim. If not, see . # from __future__ import annotations from typing import Any from typing import Union import logging from functools import partial from gajim.common import app from gajim.common import ged from gajim.common.const import PROPAGATE_EVENT from gajim.common.const import STOP_EVENT from gajim.common.events import Notification from gajim.common.events import GcMessageReceived from gajim.common.events import MessageReceived from gajim.common.events import Notification from gajim.common.helpers import exec_command from gajim.common.helpers import play_sound_file from gajim.plugins import GajimPlugin from gajim.plugins.plugins_i18n import _ from triggers.util import log_result from triggers.util import RuleResult from triggers.gtk.config import ConfigDialog log = logging.getLogger('gajim.p.triggers') MessageEventsT = Union[GcMessageReceived, MessageReceived] RuleT = dict[str, Any] class Triggers(GajimPlugin): def init(self): self.description = _( 'Configure Gajim’s behaviour with triggers for each contact') self.config_dialog = partial(ConfigDialog, self) self.config_default_values = {} self.events_handlers = { 'notification': (ged.PREGUI, self._on_notification), 'message-received': (ged.PREGUI2, self._on_message_received), 'gc-message-received': (ged.PREGUI2, self._on_message_received), # 'presence-received': (ged.PREGUI, self._on_presence_received), } def _on_notification(self, event: Notification) -> bool: log.info('Process %s, %s', event.name, event.type) result = self._check_all(event, self._check_rule_apply_notification, self._apply_rule) log.info('Result: %s', result) return self._excecute_notification_rules(result, event) def _on_message_received(self, event: MessageEventsT) -> bool: log.info('Process %s', event.name) result = self._check_all(event, self._check_rule_apply_msg_received, self._apply_rule) log.info('Result: %s', result) return self._excecute_message_rules(result) def _on_presence_received(self, event): # TODO return if event.old_show < 2 and event.new_show > 1: check_func = self._check_rule_apply_connected elif event.old_show > 1 and event.new_show < 2: check_func = self._check_rule_apply_disconnected else: check_func = self._check_rule_apply_status_changed self._check_all(event, check_func, self._apply_rule) def _check_all(self, event, check_func, apply_func) -> RuleResult: result = RuleResult() rules_num = [int(item) for item in self.config.keys()] rules_num.sort() to_remove = [] for num in rules_num: rule = self.config[str(num)] if check_func(event, rule): apply_func(result, rule) if 'one_shot' in rule and rule['one_shot']: to_remove.append(num) decal = 0 num = 0 while str(num) in self.config: if num + decal in to_remove: num2 = num while str(num2 + 1) in self.config: self.config[str(num2)] = self.config[str(num2 + 1)].copy() num2 += 1 del self.config[str(num2)] decal += 1 else: num += 1 return result @log_result def _check_rule_apply_msg_received(self, event: MessageEventsT, rule: RuleT) -> bool: return self._check_rule_all('message_received', event, rule) @log_result def _check_rule_apply_connected(self, event, rule: RuleT) -> bool: return self._check_rule_all('contact_connected', event, rule) @log_result def _check_rule_apply_disconnected(self, event, rule: RuleT) -> bool: return self._check_rule_all('contact_disconnected', event, rule) @log_result def _check_rule_apply_status_changed(self, event, rule: RuleT) -> bool: return self._check_rule_all('contact_status_change', event, rule) @log_result def _check_rule_apply_notification(self, event: Notification, rule: RuleT) -> bool: # Check notification type notif_type = '' if event.type == 'incoming-message': notif_type = 'message_received' if event.type == 'pres': # TODO: if (event.base_event.old_show < 2 and event.base_event.new_show > 1): notif_type = 'contact_connected' elif (event.base_event.old_show > 1 and event.base_event.new_show < 2): notif_type = 'contact_disconnected' else: notif_type = 'contact_status_change' return self._check_rule_all(notif_type, event, rule) def _check_rule_all(self, notif_type: str, event: Any, rule: RuleT) -> bool: # Check notification type if rule['event'] != notif_type: return False # notification type is ok. Now check recipient if not self._check_rule_recipients(event, rule): return False # recipient is ok. Now check our status if not self._check_rule_status(event, rule): return False # our_status is ok. Now check opened chat window if not self._check_rule_tab_opened(event, rule): return False # tab_opened is ok. Now check opened chat window if not self._check_rule_has_focus(event, rule): return False # All is ok return True @log_result def _check_rule_recipients(self, event, rule: RuleT) -> bool: rule_recipients = [t.strip() for t in rule['recipients'].split(',')] if rule['recipient_type'] == 'groupchat': if event.jid in rule_recipients: return True return False if (rule['recipient_type'] == 'contact' and event.jid not in rule_recipients): return False client = app.get_client(event.account) contact = client.get_module('Contacts').get_contact(event.jid) if contact.is_groupchat or not contact.is_in_roster: return False group_found = False for group in contact.groups: if group in rule_recipients: group_found = True break if rule['recipient_type'] == 'group' and not group_found: return False return True @log_result def _check_rule_status(self, event, rule: RuleT) -> bool: rule_statuses = rule['status'].split() our_status = app.connections[event.account].status if rule['status'] != 'all' and our_status not in rule_statuses: return False return True @log_result def _check_rule_tab_opened(self, event, rule: RuleT) -> bool: if rule['tab_opened'] == 'both': return True tab_opened = False if app.window.get_control(event.account, event.jid): tab_opened = True if tab_opened and rule['tab_opened'] == 'no': return False elif not tab_opened and rule['tab_opened'] == 'yes': return False return True @log_result def _check_rule_has_focus(self, event, rule: RuleT) -> bool: if rule['has_focus'] == 'both': return True if rule['tab_opened'] == 'no': # Does not apply in this case return True ctrl = app.window.get_control(event.account, event.jid) if not ctrl: # Does not apply in this case return True has_focus = ctrl.has_focus() if has_focus and rule['has_focus'] == 'no': return False elif not has_focus and rule['has_focus'] == 'yes': return False return True def _apply_rule(self, result: RuleResult, rule: RuleT) -> None: if rule['sound'] == 'no': result.sound = False result.sound_file = None elif rule['sound'] == 'yes': result.sound = False result.sound_file = rule['sound_file'] if rule['run_command']: result.command = rule['command'] if rule['popup'] == 'no': result.show_notification = False elif rule['popup'] == 'yes': result.show_notification = True def _excecute_notification_rules(self, result: RuleResult, event: Notification) -> bool: if result.sound is False: event.sound = None if result.sound_file is not None: play_sound_file(result.sound_file) if result.show_notification is False: return STOP_EVENT return PROPAGATE_EVENT def _excecute_message_rules(self, result: RuleResult) -> bool: if result.command is not None: try: exec_command(result.command, use_shell=True) except Exception: pass return PROPAGATE_EVENT