From 25bf2d4ed52329a567a881fa1e7be0045a1dcf17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20H=C3=B6rist?= Date: Wed, 25 Dec 2019 23:52:56 +0100 Subject: [PATCH] [preview] Rewrite plugin --- url_image_preview/config_dialog.py | 26 +- url_image_preview/context_menu.ui | 10 +- url_image_preview/http_functions.py | 115 --- url_image_preview/resize_gif.py | 90 --- url_image_preview/url_image_preview.py | 1017 ++++++++++-------------- url_image_preview/utils.py | 353 ++++++++ 6 files changed, 794 insertions(+), 817 deletions(-) delete mode 100644 url_image_preview/http_functions.py delete mode 100644 url_image_preview/resize_gif.py create mode 100644 url_image_preview/utils.py diff --git a/url_image_preview/config_dialog.py b/url_image_preview/config_dialog.py index 51f00a3..d87bf0f 100644 --- a/url_image_preview/config_dialog.py +++ b/url_image_preview/config_dialog.py @@ -21,7 +21,6 @@ from gi.repository import GObject from gi.repository import Gtk from gajim.gtk.settings import SettingsDialog -from gajim.gtk.settings import GenericSetting from gajim.gtk.settings import SpinSetting from gajim.gtk.const import Setting from gajim.gtk.const import SettingKind @@ -40,16 +39,11 @@ class UrlImagePreviewConfigDialog(SettingsDialog): ('10485760', '10 MiB')] actions = [ - ('open_menuitem', _('Open')), - ('save_as_menuitem', _('Save as')), - ('open_folder_menuitem', _('Open Folder')), - ('copy_link_location_menuitem', _('Copy Link Location')), - ('open_link_in_browser_menuitem', _('Open Link in Browser'))] - - geo_providers = [ - ('no_preview', _('No map preview')), - ('Google', _('Google Maps')), - ('OSM', _('OpenStreetMap'))] + ('open', _('Open')), + ('save_as', _('Save as')), + ('open_folder', _('Open Folder')), + ('copy_link_location', _('Copy Link Location')), + ('open_link_in_browser', _('Open Link in Browser'))] self.plugin = plugin settings = [ @@ -77,12 +71,6 @@ class UrlImagePreviewConfigDialog(SettingsDialog): desc=_('Action when left clicking a preview'), props={'combo_items': actions}), - Setting(SettingKind.COMBO, _('Map Service'), - SettingType.VALUE, self.plugin.config['GEO_PREVIEW_PROVIDER'], - callback=self.on_setting, data='GEO_PREVIEW_PROVIDER', - desc=_('Service used for map previews'), - props={'combo_items': geo_providers}), - Setting(SettingKind.SWITCH, _('HTTPS Verification'), SettingType.VALUE, self.plugin.config['VERIFY'], desc=_('Whether to check for a valid certificate'), @@ -91,8 +79,8 @@ class UrlImagePreviewConfigDialog(SettingsDialog): SettingsDialog.__init__(self, parent, _('UrlImagePreview Configuration'), Gtk.DialogFlags.MODAL, settings, None, - extend=[ - ('PreviewSizeSpinSetting', SizeSpinSetting)]) + extend=[('PreviewSizeSpinSetting', + SizeSpinSetting)]) def on_setting(self, value, data): self.plugin.config[data] = value diff --git a/url_image_preview/context_menu.ui b/url_image_preview/context_menu.ui index 93d2ea6..d472025 100644 --- a/url_image_preview/context_menu.ui +++ b/url_image_preview/context_menu.ui @@ -5,7 +5,7 @@ False - + True False _Open @@ -13,7 +13,7 @@ - + True False _Save as @@ -21,7 +21,7 @@ - + True False Open _Folder @@ -35,7 +35,7 @@ - + True False _Copy Link @@ -43,7 +43,7 @@ - + True False Open Link in _Browser diff --git a/url_image_preview/http_functions.py b/url_image_preview/http_functions.py deleted file mode 100644 index 19037a4..0000000 --- a/url_image_preview/http_functions.py +++ /dev/null @@ -1,115 +0,0 @@ -# -*- coding: utf-8 -*- -## -## This file is part of Gajim. -## -## Gajim is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published -## by the Free Software Foundation; version 3 only. -## -## Gajim is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with Gajim. If not, see . -## - -import urllib.request as urllib2 -import socket -import ssl -import logging -import os - -from gajim.common import app -from gajim.plugins.plugins_i18n import _ - - -if os.name == 'nt': - import certifi - -log = logging.getLogger('gajim.p.preview.http_functions') - -def get_http_head(account, url, verify): - return _get_http_head_direct(url, verify) - -def get_http_file(account, attrs): - return _get_http_direct(attrs) - -def _get_http_head_direct(url, verify): - log.info('Head request direct for URL: %s', url) - try: - req = urllib2.Request(url) - req.get_method = lambda: 'HEAD' - req.add_header('User-Agent', 'Gajim %s' % app.version) - if not verify: - context = ssl.create_default_context() - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - log.warning('CERT Verification disabled') - file_ = urllib2.urlopen(req, timeout=30, context=context) - else: - if os.name == 'nt': - file_ = urllib2.urlopen(req, cafile=certifi.where()) - else: - file_ = urllib2.urlopen(req) - except Exception as ex: - log.debug('Error', exc_info=True) - return ('', 0) - ctype = file_.headers['Content-Type'] - clen = file_.headers['Content-Length'] - try: - clen = int(clen) - except (TypeError, ValueError): - pass - return (ctype, clen) - -def _get_http_direct(attrs): - ''' - Download a file. This function should - be launched in a separated thread. - ''' - log.info('Get request direct for URL: %s', attrs['src']) - mem, alt, max_size = b'', '', 2 * 1024 * 1024 - if 'max_size' in attrs: - max_size = attrs['max_size'] - try: - req = urllib2.Request(attrs['src']) - req.add_header('User-Agent', 'Gajim ' + app.version) - if not attrs['verify']: - context = ssl.create_default_context() - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - log.warning('CERT Verification disabled') - file_ = urllib2.urlopen(req, timeout=30, context=context) - else: - if os.name == 'nt': - file_ = urllib2.urlopen(req, cafile=certifi.where()) - else: - file_ = urllib2.urlopen(req) - except Exception as ex: - log.debug('Error', exc_info=True) - pixbuf = None - alt = attrs.get('alt', 'Broken image') - else: - while True: - try: - temp = file_.read(100) - except socket.timeout as ex: - log.debug('Timeout loading image %s', attrs['src'] + str(ex)) - alt = attrs.get('alt', '') - if alt: - alt += '\n' - alt += _('Timeout loading image') - break - if temp: - mem += temp - else: - break - if len(mem) > max_size: - alt = attrs.get('alt', '') - if alt: - alt += '\n' - alt += _('Image is too big') - break - return (mem, alt) diff --git a/url_image_preview/resize_gif.py b/url_image_preview/resize_gif.py deleted file mode 100644 index 0b38d0a..0000000 --- a/url_image_preview/resize_gif.py +++ /dev/null @@ -1,90 +0,0 @@ -from io import BytesIO -from PIL import Image - - -def resize_gif(mem, path, resize_to): - frames, result = extract_and_resize_frames(mem, resize_to) - - if len(frames) == 1: - frames[0].save(path, optimize=True) - else: - frames[0].save(path, - optimize=True, - save_all=True, - append_images=frames[1:], - duration=result['duration'], - loop=1000) - - -def analyse_image(mem): - ''' - Pre-process pass over the image to determine the mode (full or additive). - Necessary as assessing single frames isn't reliable. Need to know the mode - before processing all frames. - ''' - image = Image.open(BytesIO(mem)) - results = { - 'size': image.size, - 'mode': 'full', - 'duration': image.info.get('duration', 0) - } - - try: - while True: - if image.tile: - tile = image.tile[0] - update_region = tile[1] - update_region_dimensions = update_region[2:] - if update_region_dimensions != image.size: - results['mode'] = 'partial' - break - image.seek(image.tell() + 1) - except EOFError: - pass - return results - - -def extract_and_resize_frames(mem, resize_to): - result = analyse_image(mem) - image = Image.open(BytesIO(mem)) - - i = 0 - palette = image.getpalette() - last_frame = image.convert('RGBA') - - frames = [] - - try: - while True: - ''' - If the GIF uses local colour tables, - each frame will have its own palette. - If not, we need to apply the global palette to the new frame. - ''' - if not image.getpalette(): - image.putpalette(palette) - - new_frame = Image.new('RGBA', image.size) - - ''' - Is this file a "partial"-mode GIF where frames update a region - of a different size to the entire image? - If so, we need to construct the new frame by - pasting it on top of the preceding frames. - ''' - if result['mode'] == 'partial': - new_frame.paste(last_frame) - - new_frame.paste(image, (0, 0), image.convert('RGBA')) - - # This method preservs aspect ratio - new_frame.thumbnail(resize_to, Image.ANTIALIAS) - frames.append(new_frame) - - i += 1 - last_frame = new_frame - image.seek(image.tell() + 1) - except EOFError: - pass - - return frames, result diff --git a/url_image_preview/url_image_preview.py b/url_image_preview/url_image_preview.py index a1e54b8..a8d4e0d 100644 --- a/url_image_preview/url_image_preview.py +++ b/url_image_preview/url_image_preview.py @@ -1,52 +1,48 @@ -# -*- coding: utf-8 -*- -## -## This file is part of Gajim. -## -## Gajim is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published -## by the Free Software Foundation; version 3 only. -## -## Gajim is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License -## along with Gajim. If not, see . -## +# This file is part of Image Preview Gajim Plugin. +# +# Image Preview Gajim Plugin is free software; +# you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; version 3 only. +# +# Image Preview Gajim Plugin is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Image Preview Gajim Plugin. +# If not, see . import os -import hashlib -import binascii import logging -import math import shutil - +from pathlib import Path from functools import partial from urllib.parse import urlparse from urllib.parse import unquote -from io import BytesIO from gi.repository import Gtk from gi.repository import Gdk from gi.repository import GLib -from gi.repository import GdkPixbuf +from gi.repository import Soup from gajim.common import app from gajim.common import configpaths from gajim.common.helpers import open_file from gajim.common.helpers import open_uri -from gajim.gtkgui_helpers import add_css_to_widget +from gajim.common.helpers import write_file_async +from gajim.common.helpers import load_file_async +from gajim.common.helpers import get_tls_error_phrase from gajim.gtk.dialogs import ErrorDialog from gajim.gtk.filechoosers import FileSaveDialog -from gajim.gtk.util import get_cursor +from gajim.gtk.util import load_icon +from gajim.gtk.util import get_monitor_scale_factor from gajim.plugins import GajimPlugin -from gajim.plugins.helpers import log_calls +from gajim.plugins.helpers import get_builder from gajim.plugins.plugins_i18n import _ -from url_image_preview.http_functions import get_http_head -from url_image_preview.http_functions import get_http_file from url_image_preview.config_dialog import UrlImagePreviewConfigDialog @@ -54,658 +50,503 @@ log = logging.getLogger('gajim.p.preview') ERROR_MSG = None try: - from PIL import Image - from url_image_preview.resize_gif import resize_gif + from PIL import Image # pylint: disable=unused-import except ImportError: - log.debug('Pillow not available') + log.error('Pillow not available') ERROR_MSG = _('Please install python-pillow') try: - if os.name == 'nt': - from cryptography.hazmat.backends.openssl import backend - else: - from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives.ciphers import Cipher - from cryptography.hazmat.primitives.ciphers import algorithms - from cryptography.hazmat.primitives.ciphers.modes import GCM - decryption_available = True + import cryptography # pylint: disable=unused-import except Exception: - DEP_MSG = _('To enable previews for encrypted images, ' - 'please install python-cryptography!') - log.exception('Error') - log.info('Decryption/Encryption disabled due to errors') - decryption_available = False + ERROR_MSG = _('Please install python-cryptography') + log.error('python-cryptography not available') -ACCEPTED_MIME_TYPES = ('image/png', 'image/jpeg', 'image/gif', 'image/raw', - 'image/svg+xml', 'image/x-ms-bmp') +# pylint: disable=ungrouped-imports +if ERROR_MSG is None: + from url_image_preview.utils import aes_decrypt + from url_image_preview.utils import get_image_paths + from url_image_preview.utils import split_geo_uri + from url_image_preview.utils import parse_fragment + from url_image_preview.utils import create_thumbnail + from url_image_preview.utils import pixbuf_from_data + from url_image_preview.utils import create_clickable_image + from url_image_preview.utils import filename_from_uri +# pylint: enable=ungrouped-imports + +ACCEPTED_MIME_TYPES = [ + 'image/png', + 'image/jpeg', + 'image/gif', + 'image/raw', + 'image/svg+xml', + 'image/x-ms-bmp', +] class UrlImagePreviewPlugin(GajimPlugin): - @log_calls('UrlImagePreviewPlugin') def init(self): + # pylint: disable=attribute-defined-outside-init if ERROR_MSG: self.activatable = False self.available_text = ERROR_MSG self.config_dialog = None return - if not decryption_available: - self.available_text = DEP_MSG self.config_dialog = partial(UrlImagePreviewConfigDialog, self) + self.gui_extension_points = { - 'chat_control_base': (self.connect_with_chat_control, - self.disconnect_from_chat_control), - 'history_window': - (self.connect_with_history, self.disconnect_from_history), - 'print_real_text': (self.print_real_text, None), } + 'chat_control_base': (self._on_connect_chat_control_base, + self._on_disconnect_chat_control_base), + 'history_window': (self._on_connect_history_window, + self._on_disconnect_history_window), + 'print_real_text': (self._print_real_text, None), } + self.config_default_values = { 'PREVIEW_SIZE': (150, 'Preview size (100-1000)'), 'MAX_FILE_SIZE': (5242880, 'Max file size for image preview'), 'ALLOW_ALL_IMAGES': (False, ''), 'LEFTCLICK_ACTION': ('open_menuitem', 'Open'), 'ANONYMOUS_MUC': (False, ''), - 'GEO_PREVIEW_PROVIDER': ('Google', 'Google Maps'), 'VERIFY': (True, ''),} - self.controls = {} - self.history_window_control = None - @log_calls('UrlImagePreviewPlugin') - def connect_with_chat_control(self, chat_control): - account = chat_control.contact.account.name - jid = chat_control.contact.jid - if account not in self.controls: - self.controls[account] = {} - self.controls[account][jid] = Base(self, chat_control.conv_textview) + self._textviews = {} - @log_calls('UrlImagePreviewPlugin') - def disconnect_from_chat_control(self, chat_control): - account = chat_control.contact.account.name - jid = chat_control.contact.jid - self.controls[account][jid].deinit_handlers() - del self.controls[account][jid] - - @log_calls('UrlImagePreviewPlugin') - def connect_with_history(self, history_window): - if self.history_window_control: - self.history_window_control.deinit_handlers() - self.history_window_control = Base( - self, history_window.history_textview) - - @log_calls('UrlImagePreviewPlugin') - def disconnect_from_history(self, history_window): - if self.history_window_control: - self.history_window_control.deinit_handlers() - self.history_window_control = None - - def print_real_text(self, tv, real_text, text_tags, graphics, - iter_, additional_data): - if tv.used_in_history_window and self.history_window_control: - self.history_window_control.print_real_text( - real_text, text_tags, graphics, iter_, additional_data) - - account = tv.account - for jid in self.controls[account]: - if self.controls[account][jid].textview != tv: - continue - self.controls[account][jid].print_real_text( - real_text, text_tags, graphics, iter_, additional_data) - return - - -class Base: - def __init__(self, plugin, textview): - self.plugin = plugin - self.textview = textview - self.handlers = {} + self._session = Soup.Session() + self._session.add_feature_by_type(Soup.ContentSniffer) + self._session.props.https_aliases = ['aesgcm'] + self._session.props.ssl_strict = False self.directory = os.path.join(configpaths.get('MY_DATA'), 'downloads') self.thumbpath = os.path.join(configpaths.get('MY_CACHE'), 'downloads.thumb') - try: - self._create_path(self.directory) - self._create_path(self.thumbpath) - except Exception: - log.error('Error creating download and/or thumbnail folder!') - raise + if GLib.mkdir_with_parents(self.directory, 0o700) != 0: + log.error('Failed to create: %s', self.directory) - def deinit_handlers(self): - # Remove all register handlers on wigets, created by self.xml - # to prevent circular references among objects - for i in list(self.handlers.keys()): - if self.handlers[i].handler_is_connected(i): - self.handlers[i].disconnect(i) - del self.handlers[i] + if GLib.mkdir_with_parents(self.thumbpath, 0o700) != 0: + log.error('Failed to create: %s', self.directory) - def print_real_text(self, real_text, text_tags, graphics, iter_, - additional_data): + self._migrate_config() - if len(real_text.split(' ')) > 1: + def _migrate_config(self): + action = self.config['LEFTCLICK_ACTION'] + if action.endswith('_menuitem'): + self.config['LEFTCLICK_ACTION'] = action[:-9] + + def _on_connect_chat_control_base(self, chat_control): + self._textviews[chat_control.control_id] = chat_control.conv_textview + + def _on_disconnect_chat_control_base(self, chat_control): + self._textviews.pop(chat_control.control_id, None) + + def _on_connect_history_window(self, history_window): + self._textviews[id(history_window)] = history_window.history_textview + + def _on_disconnect_history_window(self, history_window): + self._textviews.pop(id(history_window), None) + + def _get_control_id(self, textview): + for control_id, textview_ in self._textviews.items(): + if textview == textview_: + return control_id + + def _print_real_text(self, textview, text, _text_tags, _graphics, + iter_, additional_data): + + if len(text.split(' ')) > 1: # urlparse doesn't recognise spaces as URL delimiter - log.debug('Url with text will not be displayed: %s', real_text) + log.debug('Text is not an uri: %s', text[:15]) return - urlparts = urlparse(unquote(real_text)) - if not self._accept_uri(urlparts, real_text, additional_data): + uri = text + urlparts = urlparse(unquote(uri)) + if not self._accept_uri(urlparts, uri, additional_data): return - # Don't print the URL in the message window (in the calling function) - self.textview.plugin_modified = True + textview.plugin_modified = True + control_id = self._get_control_id(textview) - buffer_ = self.textview.tv.get_buffer() + start_mark, end_mark = self._print_text(textview.tv.get_buffer(), + iter_, + uri) + + if uri.startswith('geo:'): + preview = self._process_geo_uri(uri, + start_mark, + end_mark, + control_id) + if preview is None: + return + pixbuf = load_icon('map', + size=preview.size, + scale=get_monitor_scale_factor(), + pixbuf=True) + self._update_textview(pixbuf, preview) + return + + preview = self._process_web_uri(uri, + urlparts, + start_mark, + end_mark, + control_id) + + if not preview.orig_exists(): + self._download_content(preview) + + elif not preview.thumb_exists(): + load_file_async(preview.orig_path, + self._on_orig_load_finished, + preview) + + else: + load_file_async(preview.thumb_path, + self._on_thumb_load_finished, + preview) + + @staticmethod + def _print_text(buffer_, iter_, text): if not iter_: iter_ = buffer_.get_end_iter() - # Show URL, until image is loaded (if ever) - ttt = buffer_.get_tag_table() - repl_start = buffer_.create_mark(None, iter_, True) - buffer_.insert_with_tags( - iter_, - real_text, - *[(ttt.lookup(t) if isinstance(t, str) else t) for t in ['url']]) - repl_end = buffer_.create_mark(None, iter_, True) + start_mark = buffer_.create_mark(None, iter_, True) + buffer_.insert_with_tags_by_name(iter_, text, 'url') + end_mark = buffer_.create_mark(None, iter_, True) + return start_mark, end_mark - # Handle geo:-URIs - if real_text.startswith('geo:'): - if self.plugin.config['GEO_PREVIEW_PROVIDER'] == 'no_preview': - return - size = self.plugin.config['PREVIEW_SIZE'] - geo_provider = self.plugin.config['GEO_PREVIEW_PROVIDER'] - key = '' - iv = '' - encrypted = False - ext = '.png' - color = 'blue' - zoom = 16 - location = real_text[4:] - lat, _, lon = location.partition(',') - if lon == '': - return - - filename = 'location_%(provider)s_%(location)s' % { - 'provider': geo_provider, - 'location': location.replace(',', '_').replace('.', '-') - } - newfilename = filename + ext - thumbfilename = '%(filename)s_thumb_%(size)s%(ext)s' % { - 'filename': filename, - 'size': str(self.plugin.config['PREVIEW_SIZE']), - 'ext': ext - } - - filepath = os.path.join(self.directory, newfilename) - thumbpath = os.path.join(self.thumbpath, thumbfilename) - filepaths = [filepath, thumbpath] - - # Google - if geo_provider == 'Google': - url = 'https://maps.googleapis.com/maps/api/staticmap?' \ - 'center={}&zoom={}&size={}x{}&markers=color:{}' \ - '|label:.|{}'.format(location, zoom, size, size, - color, location) - weburl = 'https://www.google.com/maps/place/{}' \ - .format(location) - real_text = url - else: - # OpenStreetMap / MapQuest - apikey = 'F7x36jLVv2hiANVAXmhwvUB044XvGASh' - - url = 'https://open.mapquestapi.com/staticmap/v4/' \ - 'getmap?key={}¢er={}&zoom={}&size={},{}&type=map' \ - '&imagetype=png&pois={},{}&scalebar=false' \ - .format(apikey, location, zoom, size, size, color, - location) - weburl = 'http://www.openstreetmap.org/' \ - '?mlat={}&mlon={}#map={}/{}/{}&layers=N' \ - .format(lat, lon, zoom, lat, lon) - real_text = url - else: - weburl = real_text - filename = os.path.basename(urlparts.path) - ext = os.path.splitext(filename)[1] - name = os.path.splitext(filename)[0] - if len(name) > 90: - # Many Filesystems have a limit on filename length - # Most have 255, some encrypted ones only 143 - # We add around 50 chars for the hash, - # so the filename should not exceed 90 - name = name[:90] - namehash = hashlib.sha1(real_text.encode('utf-8')).hexdigest() - newfilename = '%(name)s_%(namehash)s%(ext)s' % { - 'name': name, - 'namehash': namehash, - 'ext': ext - } - thumbfilename = '%(name)s_%(namehash)s_thumb_%(size)s%(ext)s' % { - 'name': name, - 'namehash': namehash, - 'size': str(self.plugin.config['PREVIEW_SIZE']), - 'ext': ext - } - - filepath = os.path.join(self.directory, newfilename) - thumbpath = os.path.join(self.thumbpath, thumbfilename) - filepaths = [filepath, thumbpath] - - key = '' - iv = '' - encrypted = False - if urlparts.fragment: - fragment = binascii.unhexlify(urlparts.fragment) - key = fragment[16:] - iv = fragment[:16] - if len(key) == 32 and len(iv) == 16: - encrypted = True - if not encrypted: - key = fragment[12:] - iv = fragment[:12] - if len(key) == 32 and len(iv) == 12: - encrypted = True - - # File exists but thumbnail got deleted - if os.path.exists(filepath) and not os.path.exists(thumbpath): - if urlparts.scheme == 'geo': - real_text = weburl - with open(filepath, 'rb') as file_to_preview: - mem = file_to_preview.read() - app.thread_interface( - self._save_thumbnail, [thumbpath, mem], - self._update_img, [real_text, repl_start, - repl_end, filepath, encrypted]) - - # Display thumbnail if already downloaded - # (but only if file exists) - elif os.path.exists(filepath) and os.path.exists(thumbpath): - if urlparts.scheme == 'geo': - real_text = weburl - app.thread_interface( - self._load_thumbnail, [thumbpath], - self._update_img, [real_text, repl_start, - repl_end, filepath, encrypted]) - - # Or download file, calculate thumbnail, and finally display it - else: - if encrypted and not decryption_available: - log.debug('Please install Crytography to decrypt pictures') - else: - # First get the http head request - # which does not fetch data, just headers - # then check the mime type and filesize - if urlparts.scheme == 'aesgcm': - real_text = 'https://' + real_text[9:] - verify = self.plugin.config['VERIFY'] - app.thread_interface( - get_http_head, [self.textview.account, real_text, verify], - self._check_mime_size, [real_text, weburl, repl_start, - repl_end, filepaths, key, iv, - encrypted]) - - def _accept_uri(self, urlparts, real_text, additional_data): + def _accept_uri(self, urlparts, uri, additional_data): try: oob_url = additional_data['gajim']['oob_url'] except (KeyError, AttributeError): oob_url = None - if not urlparts.netloc: - log.info('No netloc found in URL %s', real_text) - return False - # geo if urlparts.scheme == 'geo': - if self.plugin.config['GEO_PREVIEW_PROVIDER'] == 'no_preview': - log.info('geo: link preview is disabled') - return False return True + if not urlparts.netloc: + log.info('No netloc found in URL: %s', uri) + return False + # aesgcm if urlparts.scheme == 'aesgcm': return True - # https - if urlparts.scheme == 'https': - if real_text == oob_url or self.plugin.config['ALLOW_ALL_IMAGES']: + # http/https + if urlparts.scheme in ('https', 'http'): + if self.config['ALLOW_ALL_IMAGES']: return True - log.info('Incorrect oob data found') - return False - log.info('Unsupported URI scheme found: %s', real_text) + if oob_url is None: + log.info('No oob url for: %s', uri) + return False + + if uri != oob_url: + log.info('uri != oob url: %s != %s', uri, oob_url) + return False + return True + + log.info('Unsupported URI scheme: %s', uri) return False - def _save_thumbnail(self, thumbpath, mem): - size = self.plugin.config['PREVIEW_SIZE'] - + @staticmethod + def _process_geo_uri(uri, start_mark, end_mark, control_id): try: - loader = GdkPixbuf.PixbufLoader() - loader.write(mem) - loader.close() - if loader.get_format().get_name() == 'gif': - pixbuf = loader.get_animation() - else: - pixbuf = loader.get_pixbuf() - except GLib.GError as error: - log.info('Failed to load image using Gdk.Pixbuf') - log.debug(error) - - # Try Pillow - image = Image.open(BytesIO(mem)).convert('RGBA') - array = GLib.Bytes.new(image.tobytes()) - width, height = image.size - pixbuf = GdkPixbuf.Pixbuf.new_from_bytes( - array, GdkPixbuf.Colorspace.RGB, True, - 8, width, height, width * 4) - - try: - self._create_path(os.path.dirname(thumbpath)) - thumbnail = pixbuf - if isinstance(pixbuf, GdkPixbuf.PixbufAnimation): - if size < pixbuf.get_width() or size < pixbuf.get_height(): - resize_gif(mem, thumbpath, (size, size)) - thumbnail = self._load_thumbnail(thumbpath) - else: - self._write_file(thumbpath, mem) - else: - width, height = self._get_thumbnail_size(pixbuf, size) - thumbnail = pixbuf.scale_simple( - width, height, GdkPixbuf.InterpType.BILINEAR) - thumbnail.savev(thumbpath, 'png', [], []) + split_geo_uri(uri) except Exception as error: - GLib.idle_add( - self._raise_error_dialog, - _('Could not save file'), - _('Exception raised while saving thumbnail ' - 'for image file (see error log for more ' - 'information)')) - log.exception(error) - return - return thumbnail - - @staticmethod - def _get_thumbnail_size(pixbuf, size): - # Calculates the new thumbnail size while preserving the aspect ratio - image_width = pixbuf.get_width() - image_height = pixbuf.get_height() - - if image_width > image_height: - if image_width > size: - image_height = math.ceil((size / float(image_width) * image_height)) - image_width = int(size) - else: - if image_height > size: - image_width = math.ceil((size / float(image_height) * image_width)) - image_height = int(size) - - return image_width, image_height - - @staticmethod - def _load_thumbnail(thumbpath): - ext = os.path.splitext(thumbpath)[1] - if ext == '.gif': - return GdkPixbuf.PixbufAnimation.new_from_file(thumbpath) - return GdkPixbuf.Pixbuf.new_from_file(thumbpath) - - @staticmethod - def _write_file(path, data): - log.info('Writing \'%s\' of size %d...', path, len(data)) - try: - with open(path, 'wb') as output_file: - output_file.write(data) - output_file.closed - except Exception as e: - log.error('Failed to write file \'%s\'!', path) - raise - - def _get_at_end(self): - return self.textview.autoscroll - - def _scroll_to_end(self): - self.textview.scroll_to_end() - - def _update_img(self, pixbuf, url, repl_start, repl_end, - filepath, encrypted): - if pixbuf is None: - # If image could not be downloaded, URL is already displayed - log.error('Could not download image for URL: %s', url) + log.error(uri) + log.error(error) return - urlparts = urlparse(unquote(url)) - filename = os.path.basename(urlparts.path) - if os.path.basename(filepath).startswith('location_'): - filename = os.path.basename(filepath) + return Preview(uri, + None, + None, + None, + start_mark, + end_mark, + 96, + control_id) - def add_to_textview(): - try: - at_end = self._get_at_end() + def _process_web_uri(self, uri, urlparts, start_mark, end_mark, control_id): + size = self.config['PREVIEW_SIZE'] + orig_path, thumb_path = get_image_paths(uri, + urlparts, + size, + self.directory, + self.thumbpath) + return Preview(uri, + urlparts, + orig_path, + thumb_path, + start_mark, + end_mark, + size, + control_id) - buffer_ = repl_start.get_buffer() - iter_ = buffer_.get_iter_at_mark(repl_start) - buffer_.insert(iter_, '\n') - anchor = buffer_.create_child_anchor(iter_) - anchor.plaintext = url - - image = self._create_clickable_image(pixbuf, url) - - self.textview.tv.add_child_at_anchor(image, anchor) - buffer_.delete(iter_, - buffer_.get_iter_at_mark(repl_end)) - - image.connect( - 'button-press-event', self._on_button_press_event, - filepath, filename, url, encrypted) - image.get_window().set_cursor(get_cursor('pointer')) - - if at_end: - self._scroll_to_end() - except Exception as ex: - log.exception('Exception while loading %s: %s', url, ex) - return False - # Add to mainloop --> make call threadsafe - GLib.idle_add(add_to_textview) - - def _create_clickable_image(self, pixbuf, url): - if isinstance(pixbuf, GdkPixbuf.PixbufAnimation): - image = Gtk.Image.new_from_animation(pixbuf) - else: - image = Gtk.Image.new_from_pixbuf(pixbuf) - - css = '''#Preview { - box-shadow: 0px 0px 3px 0px alpha(@theme_text_color, 0.2); - margin: 5px 10px 5px 10px; }''' - add_css_to_widget(image, css) - image.set_name('Preview') - - event_box = Gtk.EventBox() - event_box.set_tooltip_text(url) - event_box.add(image) - event_box.show_all() - return event_box - - def _check_mime_size(self, tuple_arg, - url, weburl, repl_start, repl_end, filepaths, - key, iv, encrypted): - file_mime, file_size = tuple_arg - # Check if mime type is acceptable - if not file_mime or not file_size: - log.info('Failed to load HEAD Request for URL: \'%s\' ' - 'mime: %s, size: %s', url, file_mime, file_size) - # URL is already displayed + def _on_orig_load_finished(self, data, error, preview): + if data is None: + log.error('%s: %s', preview.orig_path.name, error) return - if file_mime.lower() not in ACCEPTED_MIME_TYPES: - log.info('Not accepted mime type \'%s\' for URL: \'%s\'', - file_mime.lower(), url) - # URL is already displayed + + if preview.create_thumbnail(data): + write_file_async(preview.thumb_path, + preview.thumbnail, + self._on_thumb_write_finished, + preview) + + def _on_thumb_load_finished(self, data, error, preview): + if data is None: + log.error('%s: %s', preview.thumb_path.name, error) return - # Check if file size is acceptable - max_size = int(self.plugin.config['MAX_FILE_SIZE']) - if file_size > max_size or file_size == 0: + + preview.thumbnail = data + + pixbuf = pixbuf_from_data(preview.thumbnail) + self._update_textview(pixbuf, preview) + + def _download_content(self, preview): + log.info('Start downloading: %s', preview.request_uri) + message = Soup.Message.new('GET', preview.request_uri) + message.connect('starting', self._check_certificate) + message.connect('content-sniffed', self._on_content_sniffed) + self._session.queue_message(message, self._on_finished, preview) + + def _check_certificate(self, message): + _https_used, _tls_certificate, tls_errors = message.get_https_status() + + if not self.config['VERIFY']: + return + + if tls_errors: + phrase = get_tls_error_phrase(tls_errors) + log.warning('TLS verification failed: %s', phrase) + self._session.cancel_message(message, Soup.Status.CANCELLED) + return + + def _on_content_sniffed(self, message, type_, _params): + size = message.props.response_headers.get_content_length() + uri = message.props.uri.to_string(False) + if type_ not in ACCEPTED_MIME_TYPES: + log.info('Not allowed content type: %s, %s', type_, uri) + self._session.cancel_message(message, Soup.Status.CANCELLED) + return + + if size == 0 or size > int(self.config['MAX_FILE_SIZE']): log.info('File size (%s) too big or unknown (zero) for URL: \'%s\'', - file_size, url) - # URL is already displayed + size, uri) + self._session.cancel_message(message, Soup.Status.CANCELLED) return - attributes = {'src': url, - 'verify': self.plugin.config['VERIFY'], - 'max_size': max_size, - 'filepaths': filepaths, - 'key': key, - 'iv': iv} - - app.thread_interface( - self._download_image, [self.textview.account, - attributes, encrypted], - self._update_img, [weburl, repl_start, repl_end, - filepaths[0], encrypted]) - - def _download_image(self, account, attributes, encrypted): - filepath = attributes['filepaths'][0] - thumbpath = attributes['filepaths'][1] - key = attributes['key'] - iv = attributes['iv'] - mem, alt = get_http_file(account, attributes) - - # Decrypt file if necessary - if encrypted: - mem = self._aes_decrypt_fast(key, iv, mem) - - try: - # Write file to harddisk - self._write_file(filepath, mem) - except Exception as e: - GLib.idle_add( - self._raise_error_dialog, - _('Could not save file'), - _('Exception raised while saving image file' - ' (see error log for more information)')) - log.error(str(e)) - - # Create thumbnail, write to disk and return it - return self._save_thumbnail(thumbpath, mem) - - def _create_path(self, folder): - if os.path.exists(folder): + def _on_finished(self, _session, message, preview): + if message.status_code != Soup.Status.OK: + log.warning('Download failed: %s', preview.request_uri) + log.warning(Soup.Status.get_phrase(message.status_code)) return - log.debug('Creating folder \'%s\'' % folder) - os.mkdir(folder, 0o700) - def _aes_decrypt_fast(self, key, iv, payload): - # Use AES128 GCM with the given key and iv to decrypt the payload - if os.name == 'nt': - be = backend - else: - be = default_backend() - data = payload[:-16] - tag = payload[-16:] - decryptor = Cipher( - algorithms.AES(key), - GCM(iv, tag=tag), - backend=be).decryptor() - return decryptor.update(data) + decryptor.finalize() - - def make_rightclick_menu(self, event, data): - xml = Gtk.Builder() - xml.set_translation_domain('gajim_plugins') - xml.add_from_file(self.plugin.local_file_path('context_menu.ui')) - menu = xml.get_object('context_menu') - - open_menuitem = xml.get_object('open_menuitem') - save_as_menuitem = xml.get_object('save_as_menuitem') - open_folder_menuitem = xml.get_object('open_folder_menuitem') - copy_link_location_menuitem = xml.get_object( - 'copy_link_location_menuitem') - open_link_in_browser_menuitem = xml.get_object( - 'open_link_in_browser_menuitem') - - if data['encrypted']: - open_link_in_browser_menuitem.hide() - - id_ = open_menuitem.connect( - 'activate', self._on_open_menuitem_activate, data) - self.handlers[id_] = open_menuitem - id_ = save_as_menuitem.connect( - 'activate', self._on_save_as_menuitem_activate, data) - self.handlers[id_] = save_as_menuitem - id_ = open_folder_menuitem.connect( - 'activate', self._on_open_folder_menuitem_activate, data) - self.handlers[id_] = save_as_menuitem - id_ = copy_link_location_menuitem.connect( - 'activate', self._on_copy_link_location_menuitem_activate, data) - self.handlers[id_] = copy_link_location_menuitem - id_ = open_link_in_browser_menuitem.connect( - 'activate', self._on_open_link_in_browser_menuitem_activate, data) - self.handlers[id_] = open_link_in_browser_menuitem - - return menu - - def _on_open_menuitem_activate(self, menu, data): - filepath = data['filepath'] - original_filename = data['original_filename'] - url = data['url'] - if original_filename.startswith('location_'): - open_uri(url) + data = message.props.response_body_data.get_data() + if data is None: return - open_file(filepath) - def _on_save_as_menuitem_activate(self, menu, data): - filepath = data['filepath'] - original_filename = data['original_filename'] + if preview.is_aes_encrypted: + data = aes_decrypt(preview, data) + write_file_async(preview.orig_path, + data, + self._on_orig_write_finished, + preview) + + if preview.create_thumbnail(data): + write_file_async(preview.thumb_path, + preview.thumbnail, + self._on_thumb_write_finished, + preview) + + @staticmethod + def _on_orig_write_finished(_result, error, preview): + if error is not None: + log.error('%s: %s', preview.orig_path.name, error) + return + + log.info('File stored: %s', preview.orig_path.name) + + def _on_thumb_write_finished(self, _result, error, preview): + if error is not None: + log.error('%s: %s', preview.thumb_path.name, error) + return + + log.info('Thumbnail stored: %s ', preview.thumb_path.name) + pixbuf = pixbuf_from_data(preview.thumbnail) + self._update_textview(pixbuf, preview) + + def _update_textview(self, pixbuf, preview): + textview = self._textviews.get(preview.control_id) + if textview is None: + # Control closed + return + + buffer_ = preview.start_mark.get_buffer() + iter_ = buffer_.get_iter_at_mark(preview.start_mark) + buffer_.insert(iter_, '\n') + anchor = buffer_.create_child_anchor(iter_) + anchor.plaintext = preview.uri + + image = create_clickable_image(pixbuf, preview) + + textview.tv.add_child_at_anchor(image, anchor) + buffer_.delete(iter_, + buffer_.get_iter_at_mark(preview.end_mark)) + + image.connect('button-press-event', + self._on_button_press_event, + preview) + + if textview.autoscroll: + textview.scroll_to_end() + + def _get_context_menu(self, preview): + path = self.local_file_path('context_menu.ui') + ui = get_builder(path) + if preview.is_aes_encrypted: + ui.open_link_in_browser.hide() + + if preview.is_geo_uri: + ui.open_link_in_browser.hide() + ui.save_as.hide() + ui.open_folder.hide() + + ui.open.connect( + 'activate', self._on_open, preview) + ui.save_as.connect( + 'activate', self._on_save_as, preview) + ui.open_folder.connect( + 'activate', self._on_open_folder, preview) + ui.open_link_in_browser.connect( + 'activate', self._on_open_link_in_browser, preview) + ui.copy_link_location.connect( + 'activate', self._on_copy_link_location, preview) + + def destroy(menu, _pspec): + visible = menu.get_property('visible') + if not visible: + GLib.idle_add(menu.destroy) + + ui.context_menu.connect('notify::visible', destroy) + return ui.context_menu + + @staticmethod + def _on_open(_menu, preview): + if preview.is_geo_uri: + open_uri(preview.uri) + return + open_file(preview.orig_path) + + @staticmethod + def _on_save_as(_menu, preview): def on_ok(target_path): - dirname = os.path.dirname(target_path) + dirname = Path(target_path).parent if not os.access(dirname, os.W_OK): ErrorDialog( _('Directory \'%s\' is not writable') % dirname, _('You do not have the proper permissions to ' 'create files in this directory.'), - transient_for=app.app.get_active_window()) + transient_for=app.app.get_active_window()) return - shutil.copy(filepath, target_path) + shutil.copy(str(preview.orig_path), target_path) FileSaveDialog(on_ok, path=app.config.get('last_save_dir'), - file_name=original_filename, + file_name=preview.filename, transient_for=app.app.get_active_window()) - def _on_open_folder_menuitem_activate(self, menu, data): - open_file(self.directory) - - def _on_copy_link_location_menuitem_activate(self, menu, data): - url = data['url'] - clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) - clipboard.set_text(url, -1) - clipboard.store() - - def _on_open_link_in_browser_menuitem_activate(self, menu, data): - url = data['url'] - if data['encrypted']: - self._on_open_menuitem_activate(self, data) - else: - open_uri(url) - - def _on_button_press_event(self, eb, event, filepath, original_filename, - url, encrypted): - data = {'filepath': filepath, - 'original_filename': original_filename, - 'url': url, - 'encrypted': encrypted} - # Left click - if event.type == Gdk.EventType.BUTTON_PRESS and event.button == 1: - method = getattr(self, '_on_' - + self.plugin.config['LEFTCLICK_ACTION'] - + '_activate') - method(event, data) - # Right klick - elif event.type == Gdk.EventType.BUTTON_PRESS and event.button == 3: - menu = self.make_rightclick_menu(event, data) - # menu.attach_to_widget(self.tv, None) - # menu.popup(None, None, None, event.button, event.time) - menu.popup_at_pointer(event) + @staticmethod + def _on_open_folder(_menu, preview): + open_file(preview.orig_path.parent) @staticmethod - def _raise_error_dialog(pritext, sectext): - # Used by methods that run in a different thread - ErrorDialog(pritext, - sectext, - transient_for=app.app.get_active_window()) + def _on_copy_link_location(_menu, preview): + clipboard = Gtk.Clipboard.get_default(Gdk.Display.get_default()) + clipboard.set_text(preview.uri, -1) - def disconnect_from_chat_control(self): - pass + @staticmethod + def _on_open_link_in_browser(_menu, preview): + if preview.is_aes_encrypted: + if preview.is_geo_uri: + open_uri(preview.uri) + return + open_file(preview.orig_path) + else: + open_uri(preview.uri) + + def _on_button_press_event(self, _image, event, preview): + if event.type == Gdk.EventType.BUTTON_PRESS and event.button == 1: + # Left click + action = self.config['LEFTCLICK_ACTION'] + method = getattr(self, '_on_%s' % action) + method(event, preview) + + elif event.type == Gdk.EventType.BUTTON_PRESS and event.button == 3: + # Right klick + menu = self._get_context_menu(preview) + menu.popup_at_pointer(event) + + +class Preview: + def __init__(self, uri, urlparts, orig_path, thumb_path, + start_mark, end_mark, size, control_id): + self._uri = uri + self._urlparts = urlparts + self._filename = filename_from_uri(self._uri) + self.size = size + self.control_id = control_id + self.orig_path = orig_path + self.thumb_path = thumb_path + self.start_mark = start_mark + self.end_mark = end_mark + self.thumbnail = None + + self.key, self.iv = None, None + if self.is_aes_encrypted: + self.key, self.iv = parse_fragment(urlparts.fragment) + + @property + def is_geo_uri(self): + return self._uri.startswith('geo:') + + @property + def is_web_uri(self): + return not self.is_geo_uri + + @property + def uri(self): + return self._uri + + @property + def filename(self): + return self._filename + + @property + def request_uri(self): + if self.is_aes_encrypted: + # Remove fragments so we dont transmit it to the server + urlparts = self._urlparts._replace(scheme='https', fragment='') + return urlparts.geturl() + return self._urlparts.geturl() + + @property + def is_aes_encrypted(self): + if self._urlparts is None: + return False + return self._urlparts.scheme == 'aesgcm' + + def thumb_exists(self): + return self.thumb_path.exists() + + def orig_exists(self): + return self.orig_path.exists() + + def create_thumbnail(self, data): + self.thumbnail = create_thumbnail(data, self.size) + if self.thumbnail is None: + log.warning('creating thumbnail failed for: %s', self.orig_path) + return False + return True diff --git a/url_image_preview/utils.py b/url_image_preview/utils.py new file mode 100644 index 0000000..fe92c36 --- /dev/null +++ b/url_image_preview/utils.py @@ -0,0 +1,353 @@ +# This file is part of Image Preview Gajim Plugin. +# +# Image Preview Gajim Plugin is free software; +# you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published +# by the Free Software Foundation; version 3 only. +# +# Image Preview Gajim Plugin is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Image Preview Gajim Plugin. +# If not, see . + +import math +import logging +import binascii +import hashlib +from io import BytesIO +from collections import namedtuple +from pathlib import Path +from urllib.parse import urlparse +from urllib.parse import unquote + +from gi.repository import GdkPixbuf +from gi.repository import GLib +from gi.repository import Gtk + +from PIL import Image + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import Cipher +from cryptography.hazmat.primitives.ciphers import algorithms +from cryptography.hazmat.primitives.ciphers.modes import GCM + +from gajim.gtk.util import get_cursor + +log = logging.getLogger('gajim.p.preview.utils') + +Coords = namedtuple('Coords', 'location lat lon') + + +def resize_gif(image, output_file, resize_to): + frames, result = extract_and_resize_frames(image, resize_to) + + frames[0].save(output_file, + format='GIF', + optimize=True, + save_all=True, + append_images=frames[1:], + duration=result['duration'], + loop=1000) + + +def analyse_image(image): + ''' + Pre-process pass over the image to determine the mode (full or additive). + Necessary as assessing single frames isn't reliable. Need to know the mode + before processing all frames. + ''' + + result = { + 'size': image.size, + 'mode': 'full', + 'duration': image.info.get('duration', 0) + } + + try: + while True: + if image.tile: + tile = image.tile[0] + update_region = tile[1] + update_region_dimensions = update_region[2:] + if update_region_dimensions != image.size: + result['mode'] = 'partial' + break + image.seek(image.tell() + 1) + except EOFError: + image.seek(0) + return image, result + + +def extract_and_resize_frames(image, resize_to): + image, result = analyse_image(image) + + i = 0 + palette = image.getpalette() + last_frame = image.convert('RGBA') + + frames = [] + + try: + while True: + ''' + If the GIF uses local colour tables, + each frame will have its own palette. + If not, we need to apply the global palette to the new frame. + ''' + if not image.getpalette(): + image.putpalette(palette) + + new_frame = Image.new('RGBA', image.size) + + ''' + Is this file a "partial"-mode GIF where frames update a region + of a different size to the entire image? + If so, we need to construct the new frame by + pasting it on top of the preceding frames. + ''' + if result['mode'] == 'partial': + new_frame.paste(last_frame) + + new_frame.paste(image, (0, 0), image.convert('RGBA')) + + # This method preservs aspect ratio + new_frame.thumbnail(resize_to, Image.ANTIALIAS) + frames.append(new_frame) + + i += 1 + last_frame = new_frame + image.seek(image.tell() + 1) + except EOFError: + pass + + return frames, result + + +def create_thumbnail(data, size): + thumbnail = create_thumbnail_with_pil(data, size) + if thumbnail is not None: + return thumbnail + return create_thumbnail_with_pixbuf(data, size) + + +def create_thumbnail_with_pixbuf(data, size): + loader = GdkPixbuf.PixbufLoader() + try: + loader.write(data) + except GLib.Error as error: + log.warning('making pixbuf failed: %s', error) + return None + + loader.close() + pixbuf = loader.get_pixbuf() + + if size > pixbuf.get_width() and size > pixbuf.get_height(): + return data + + width, height = get_thumbnail_size(pixbuf, size) + thumbnail = pixbuf.scale_simple(width, + height, + GdkPixbuf.InterpType.BILINEAR) + has_error, bytes_ = thumbnail.save_to_bufferv('png', [], []) + if has_error: + log.warning('saving pixbuf to buffer failed') + return None + return bytes_ + + +def create_thumbnail_with_pil(data, size): + input_file = BytesIO(data) + output_file = BytesIO() + try: + image = Image.open(input_file) + except OSError as error: + log.warning('making pil thumbnail failed: %s', error) + log.warning('fallback to pixbuf') + input_file.close() + output_file.close() + return + + image_width, image_height = image.size + if size > image_width and size > image_height: + image.close() + input_file.close() + output_file.close() + return data + + if image.format == 'GIF' and image.n_frames > 1: + resize_gif(image, output_file, (size, size)) + else: + image.thumbnail((size, size)) + image.save(output_file, + format=image.format, + optimize=True) + + bytes_ = output_file.getvalue() + + image.close() + input_file.close() + output_file.close() + + return bytes_ + + +def get_thumbnail_size(pixbuf, size): + # Calculates the new thumbnail size while preserving the aspect ratio + image_width = pixbuf.get_width() + image_height = pixbuf.get_height() + + if image_width > image_height: + if image_width > size: + image_height = math.ceil((size / float(image_width) * image_height)) + image_width = int(size) + else: + if image_height > size: + image_width = math.ceil((size / float(image_height) * image_width)) + image_height = int(size) + + return image_width, image_height + + +def pixbuf_from_data(data): + loader = GdkPixbuf.PixbufLoader() + try: + loader.write(data) + except GLib.Error: + # Fallback to Pillow + input_file = BytesIO(data) + image = Image.open(BytesIO(data)).convert('RGBA') + array = GLib.Bytes.new(image.tobytes()) + width, height = image.size + pixbuf = GdkPixbuf.Pixbuf.new_from_bytes(array, + GdkPixbuf.Colorspace.RGB, + True, + 8, + width, + height, + width * 4) + image.close() + input_file.close() + return pixbuf + + loader.close() + return loader.get_pixbuf() + + +def create_clickable_image(pixbuf, preview): + if isinstance(pixbuf, GdkPixbuf.PixbufAnimation): + image = Gtk.Image.new_from_animation(pixbuf) + else: + image = Gtk.Image.new_from_pixbuf(pixbuf) + + css = '''#Preview { + box-shadow: 0px 0px 3px 0px alpha(@theme_text_color, 0.2); + margin: 5px 10px 5px 10px; }''' + + provider = Gtk.CssProvider() + provider.load_from_data(bytes(css.encode())) + context = image.get_style_context() + context.add_provider(provider, + Gtk.STYLE_PROVIDER_PRIORITY_USER) + + image.set_name('Preview') + + def _on_realize(box): + box.get_window().set_cursor(get_cursor('pointer')) + + event_box = Gtk.EventBox() + event_box.connect('realize', _on_realize) + event_box.set_tooltip_text(preview.uri) + event_box.add(image) + event_box.show_all() + return event_box + + +def parse_fragment(fragment): + if not fragment: + raise ValueError('Invalid fragment') + + fragment = binascii.unhexlify(fragment) + key = fragment[16:] + iv = fragment[:16] + if len(key) != 32 or len(iv) != 16: + raise ValueError('Invalid fragment') + return key, iv + + +def get_image_paths(uri, urlparts, size, orig_dir, thumb_dir): + path = Path(urlparts.path) + web_stem = path.stem + extension = path.suffix + + if len(web_stem) > 90: + # Many Filesystems have a limit on filename length + # Most have 255, some encrypted ones only 143 + # We add around 50 chars for the hash, + # so the filename should not exceed 90 + web_stem = web_stem[:90] + + name_hash = hashlib.sha1(str(uri).encode()).hexdigest() + + orig_filename = '%s_%s%s' % (web_stem, name_hash, extension) + + thumb_filename = '%s_%s_thumb_%s%s' % (web_stem, + name_hash, + size, + extension) + + orig_path = Path(orig_dir) / orig_filename + thumb_path = Path(thumb_dir) / thumb_filename + return orig_path, thumb_path + + +def split_geo_uri(uri): + # Example: + # geo:37.786971,-122.399677,122.3;CRS=epsg:32718;U=20;mapcolors=abc + # Assumption is all coordinates are CRS=WGS-84 + + # Remove "geo:" + coords = uri[4:] + + # Remove arguments + if ';' in coords: + coords, _ = coords.split(';', maxsplit=1) + + # Split coords + coords = coords.split(',') + if len(coords) not in (2, 3): + raise ValueError('Invalid geo uri: invalid coord count') + + # Remoove coord-c (altitude) + if len(coords) == 3: + coords.pop(2) + + lat, lon = coords + if float(lat) < -90 or float(lat) > 90: + raise ValueError('Invalid geo_uri: invalid latitude %s' % lat) + + if float(lon) < -180 or float(lon) > 180: + raise ValueError('Invalid geo_uri: invalid longitude %s' % lon) + + location = ','.join(coords) + return Coords(location=location, lat=lat, lon=lon) + + +def filename_from_uri(uri): + urlparts = urlparse(unquote(uri)) + path = Path(urlparts.path) + return path.name + + +def aes_decrypt(preview, payload): + # Use AES128 GCM with the given key and iv to decrypt the payload + data = payload[:-16] + tag = payload[-16:] + decryptor = Cipher( + algorithms.AES(preview.key), + GCM(preview.iv, tag=tag), + backend=default_backend()).decryptor() + return decryptor.update(data) + decryptor.finalize()