# This file is part of Gajim. # # Gajim is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Gajim is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Gajim. If not, see . import logging import typing from dataclasses import dataclass from pathlib import Path import gi import numpy as np from gi.repository import Gio, GObject try: gi.require_version('Gst', '1.0') from gi.repository import Gst except Exception: if typing.TYPE_CHECKING: from gi.repository import Gst log = logging.getLogger('gajim.p.sttvm_helper') @dataclass class Results: text: str def load_audio(path: Path, sample_rate: int = 16000) -> np.ndarray: Gst.init(None) pipeline = Gst.parse_launch( 'filesrc name=src ! decodebin ! audioconvert ! audioresample ! ' f'audio/x-raw,format=F32LE,rate={sample_rate},channels=1 ! ' 'appsink name=sink sync=false' ) pipeline.get_by_name('src').set_property('location', str(path)) sink = pipeline.get_by_name('sink') chunks: list[np.ndarray] = [] pipeline.set_state(Gst.State.PLAYING) while (sample := sink.emit('try-pull-sample', 10 * Gst.SECOND)) is not None: buf = sample.get_buffer() _, info = buf.map(Gst.MapFlags.READ) chunks.append(np.frombuffer(bytes(info.data), dtype=np.float32)) buf.unmap(info) pipeline.set_state(Gst.State.NULL) if not chunks: raise RuntimeError(f'Could not decode audio: {path}') return np.concatenate(chunks) ''' https://discourse.gnome.org/t/gtk-threading-problem-with-glib-idle-add/13597/5 https://github.com/gdm-settings/gdm-settings/blob/f245d3000200fa6be2a35c7f6ac45b131dadb5d6/src/utils.py#L116..L162 ''' class BackgroundTask(GObject.Object): __gtype_name__ = 'BackgroundTask' def __init__(self, function, finish_callback, **kwargs): super().__init__(**kwargs) self.function = function self.finish_callback = finish_callback self._current = None def start(self): if self._current: AlreadyRunningError('Task is already running') finish_callback = lambda self, task, nothing: self.finish_callback() task = Gio.Task.new(self, None, finish_callback, None) task.run_in_thread(self._thread_cb) self._current = task @staticmethod def _thread_cb(task, self, task_data, cancellable): try: retval = self.function() task.return_value(retval) except Exception as e: log.exception('Background task failed') task.return_value(e) def finish(self): task = self._current self._current = None if not Gio.Task.is_valid(task, self): raise InvalidGioTaskError() value = task.propagate_value().value if isinstance(value, Exception): raise value return value