Alpaca/src/window.py

993 lines
49 KiB
Python

# window.py
#
# Copyright 2024 Jeffser
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""
Handles the main window
"""
import json, threading, os, re, base64, gettext, uuid, shutil, logging, time
from io import BytesIO
from PIL import Image
from pypdf import PdfReader
from datetime import datetime
from pytube import YouTube
import gi
gi.require_version('GtkSource', '5')
gi.require_version('GdkPixbuf', '2.0')
from gi.repository import Adw, Gtk, Gdk, GLib, GtkSource, Gio, GdkPixbuf
from . import connection_handler, generic_actions
from .custom_widgets import message_widget, chat_widget, model_widget, terminal_widget, dialog_widget
from .internal import config_dir, data_dir, cache_dir, source_dir
logger = logging.getLogger(__name__)
@Gtk.Template(resource_path='/com/jeffser/Alpaca/window.ui')
class AlpacaWindow(Adw.ApplicationWindow):
app_dir = os.getenv("FLATPAK_DEST")
__gtype_name__ = 'AlpacaWindow'
localedir = os.path.join(source_dir, 'locale')
gettext.bindtextdomain('com.jeffser.Alpaca', localedir)
gettext.textdomain('com.jeffser.Alpaca')
_ = gettext.gettext
#Variables
attachments = {}
#Override elements
overrides_group = Gtk.Template.Child()
instance_page = Gtk.Template.Child()
#Elements
split_view_overlay = Gtk.Template.Child()
regenerate_button : Gtk.Button = None
selected_chat_row : Gtk.ListBoxRow = None
create_model_base = Gtk.Template.Child()
create_model_name = Gtk.Template.Child()
create_model_system = Gtk.Template.Child()
create_model_modelfile = Gtk.Template.Child()
tweaks_group = Gtk.Template.Child()
preferences_dialog = Gtk.Template.Child()
shortcut_window : Gtk.ShortcutsWindow = Gtk.Template.Child()
file_preview_dialog = Gtk.Template.Child()
file_preview_text_label = Gtk.Template.Child()
file_preview_image = Gtk.Template.Child()
welcome_dialog = Gtk.Template.Child()
welcome_carousel = Gtk.Template.Child()
welcome_previous_button = Gtk.Template.Child()
welcome_next_button = Gtk.Template.Child()
main_overlay = Gtk.Template.Child()
manage_models_overlay = Gtk.Template.Child()
chat_stack = Gtk.Template.Child()
message_text_view = Gtk.Template.Child()
send_button = Gtk.Template.Child()
stop_button = Gtk.Template.Child()
attachment_container = Gtk.Template.Child()
attachment_box = Gtk.Template.Child()
file_filter_tar = Gtk.Template.Child()
file_filter_gguf = Gtk.Template.Child()
file_filter_attachments = Gtk.Template.Child()
attachment_button = Gtk.Template.Child()
chat_right_click_menu = Gtk.Template.Child()
model_tag_list_box = Gtk.Template.Child()
navigation_view_manage_models = Gtk.Template.Child()
file_preview_open_button = Gtk.Template.Child()
file_preview_remove_button = Gtk.Template.Child()
secondary_menu_button = Gtk.Template.Child()
model_searchbar = Gtk.Template.Child()
message_searchbar = Gtk.Template.Child()
message_search_button = Gtk.Template.Child()
searchentry_messages = Gtk.Template.Child()
no_results_page = Gtk.Template.Child()
model_link_button = Gtk.Template.Child()
title_stack = Gtk.Template.Child()
manage_models_dialog = Gtk.Template.Child()
model_scroller = Gtk.Template.Child()
model_detail_page = Gtk.Template.Child()
model_detail_create_button = Gtk.Template.Child()
ollama_information_label = Gtk.Template.Child()
chat_list_container = Gtk.Template.Child()
chat_list_box = None
ollama_instance = None
model_manager = None
add_chat_button = Gtk.Template.Child()
instance_idle_timer = Gtk.Template.Child()
background_switch = Gtk.Template.Child()
powersaver_warning_switch = Gtk.Template.Child()
remote_connection_switch = Gtk.Template.Child()
banner = Gtk.Template.Child()
style_manager = Adw.StyleManager()
terminal_scroller = Gtk.Template.Child()
terminal_dialog = Gtk.Template.Child()
@Gtk.Template.Callback()
def stop_message(self, button=None):
self.chat_list_box.get_current_chat().stop_message()
@Gtk.Template.Callback()
def send_message(self, button=None):
if button and not button.get_visible():
return
if not self.message_text_view.get_buffer().get_text(self.message_text_view.get_buffer().get_start_iter(), self.message_text_view.get_buffer().get_end_iter(), False):
return
current_chat = self.chat_list_box.get_current_chat()
if current_chat.busy == True:
return
self.chat_list_box.send_tab_to_top(self.chat_list_box.get_selected_row())
current_model = self.model_manager.get_selected_model()
if current_model is None:
self.show_toast(_("Please select a model before chatting"), self.main_overlay)
return
message_id = self.generate_uuid()
attached_images = []
attached_files = {}
for name, content in self.attachments.items():
if content["type"] == 'image':
if self.model_manager.verify_if_image_can_be_used():
attached_images.append(os.path.join(data_dir, "chats", current_chat.get_name(), message_id, name))
else:
attached_files[os.path.join(data_dir, "chats", current_chat.get_name(), message_id, name)] = content['type']
if not os.path.exists(os.path.join(data_dir, "chats", current_chat.get_name(), message_id)):
os.makedirs(os.path.join(data_dir, "chats", current_chat.get_name(), message_id))
shutil.copy(content['path'], os.path.join(data_dir, "chats", current_chat.get_name(), message_id, name))
content["button"].get_parent().remove(content["button"])
self.attachments = {}
self.attachment_box.set_visible(False)
raw_message = self.message_text_view.get_buffer().get_text(self.message_text_view.get_buffer().get_start_iter(), self.message_text_view.get_buffer().get_end_iter(), False)
current_chat.add_message(message_id, None)
m_element = current_chat.messages[message_id]
if len(attached_files) > 0:
m_element.add_attachments(attached_files)
if len(attached_images) > 0:
m_element.add_images(attached_images)
m_element.set_text(raw_message)
m_element.add_footer(datetime.now())
m_element.add_action_buttons()
data = {
"model": current_model,
"messages": self.convert_history_to_ollama(current_chat),
"options": {"temperature": self.ollama_instance.tweaks["temperature"], "seed": self.ollama_instance.tweaks["seed"]},
"keep_alive": f"{self.ollama_instance.tweaks['keep_alive']}m",
"stream": True
}
self.message_text_view.get_buffer().set_text("", 0)
bot_id=self.generate_uuid()
current_chat.add_message(bot_id, current_model)
m_element_bot = current_chat.messages[bot_id]
m_element_bot.set_text()
threading.Thread(target=self.run_message, args=(data, m_element_bot, current_chat)).start()
@Gtk.Template.Callback()
def welcome_carousel_page_changed(self, carousel, index):
logger.debug("Showing welcome carousel")
if index == 0:
self.welcome_previous_button.set_sensitive(False)
else:
self.welcome_previous_button.set_sensitive(True)
if index == carousel.get_n_pages()-1:
self.welcome_next_button.set_label(_("Close"))
self.welcome_next_button.set_tooltip_text(_("Close"))
else:
self.welcome_next_button.set_label(_("Next"))
self.welcome_next_button.set_tooltip_text(_("Next"))
@Gtk.Template.Callback()
def welcome_previous_button_activate(self, button):
self.welcome_carousel.scroll_to(self.welcome_carousel.get_nth_page(self.welcome_carousel.get_position()-1), True)
@Gtk.Template.Callback()
def welcome_next_button_activate(self, button):
if button.get_label() == "Next":
self.welcome_carousel.scroll_to(self.welcome_carousel.get_nth_page(self.welcome_carousel.get_position()+1), True)
else:
self.welcome_dialog.force_close()
self.powersaver_warning_switch.set_active(True)
@Gtk.Template.Callback()
def switch_run_on_background(self, switch, user_data):
logger.debug("Switching run on background")
self.set_hide_on_close(switch.get_active())
self.save_server_config()
@Gtk.Template.Callback()
def switch_powersaver_warning(self, switch, user_data):
logger.debug("Switching powersaver warning banner")
if switch.get_active():
self.banner.set_revealed(Gio.PowerProfileMonitor.dup_default().get_power_saver_enabled())
else:
self.banner.set_revealed(False)
self.save_server_config()
@Gtk.Template.Callback()
def closing_app(self, user_data):
with open(os.path.join(data_dir, "chats", "selected_chat.txt"), 'w') as f:
f.write(self.chat_list_box.get_selected_row().chat_window.get_name())
if self.get_hide_on_close():
logger.info("Hiding app...")
else:
logger.info("Closing app...")
self.ollama_instance.stop()
self.get_application().quit()
@Gtk.Template.Callback()
def model_spin_changed(self, spin):
value = spin.get_value()
if spin.get_name() != "temperature":
value = round(value)
else:
value = round(value, 1)
if self.ollama_instance.tweaks[spin.get_name()] != value:
self.ollama_instance.tweaks[spin.get_name()] = value
self.save_server_config()
@Gtk.Template.Callback()
def instance_idle_timer_changed(self, spin):
self.ollama_instance.idle_timer_delay = round(spin.get_value())
self.save_server_config()
@Gtk.Template.Callback()
def create_model_start(self, button):
name = self.create_model_name.get_text().lower().replace(":", "")
modelfile_buffer = self.create_model_modelfile.get_buffer()
modelfile_raw = modelfile_buffer.get_text(modelfile_buffer.get_start_iter(), modelfile_buffer.get_end_iter(), False)
modelfile = ["FROM {}".format(self.create_model_base.get_subtitle()), "SYSTEM {}".format(self.create_model_system.get_text())]
for line in modelfile_raw.split('\n'):
if not line.startswith('SYSTEM') and not line.startswith('FROM'):
modelfile.append(line)
threading.Thread(target=self.model_manager.pull_model, kwargs={"model_name": name, "modelfile": '\n'.join(modelfile)}).start()
self.navigation_view_manage_models.pop()
@Gtk.Template.Callback()
def override_changed(self, entry):
name = entry.get_name()
value = entry.get_text()
if self.ollama_instance:
if value:
self.ollama_instance.overrides[name] = value
elif name in self.ollama_instance.overrides:
del self.ollama_instance.overrides[name]
if not self.ollama_instance.remote:
self.ollama_instance.reset()
self.save_server_config()
@Gtk.Template.Callback()
def link_button_handler(self, button):
os.system(f'xdg-open "{button.get_name()}"'.replace("{selected_chat}", self.chat_list_box.get_current_chat().get_name()))
@Gtk.Template.Callback()
def model_search_toggle(self, button):
self.model_searchbar.set_search_mode(button.get_active())
self.model_manager.pulling_list.set_visible(not button.get_active() and len(list(self.model_manager.pulling_list)) > 0)
self.model_manager.local_list.set_visible(not button.get_active() and len(list(self.model_manager.local_list)) > 0)
@Gtk.Template.Callback()
def message_search_toggle(self, button):
self.message_searchbar.set_search_mode(button.get_active())
@Gtk.Template.Callback()
def model_search_changed(self, entry):
results = 0
if self.model_manager:
for model in list(self.model_manager.available_list):
model.set_visible(re.search(entry.get_text(), '{} {} {} {} {}'.format(model.get_name(), model.model_title, model.model_author, model.model_description, (_('image') if model.image_recognition else '')), re.IGNORECASE))
if model.get_visible():
results += 1
if entry.get_text() and results == 0:
self.no_results_page.set_visible(True)
self.model_scroller.set_visible(False)
else:
self.model_scroller.set_visible(True)
self.no_results_page.set_visible(False)
@Gtk.Template.Callback()
def message_search_changed(self, entry, current_chat=None):
search_term=entry.get_text()
results = 0
if not current_chat:
current_chat = self.chat_list_box.get_current_chat()
if current_chat:
for key, message in current_chat.messages.items():
if message and message.text:
message.set_visible(re.search(search_term, message.text, re.IGNORECASE))
for block in message.content_children:
if isinstance(block, message_widget.text_block):
if search_term:
highlighted_text = re.sub(f"({re.escape(search_term)})", r"<span background='yellow' bgalpha='30%'>\1</span>", block.get_text(),flags=re.IGNORECASE)
block.set_markup(highlighted_text)
else:
block.set_markup(block.get_text())
@Gtk.Template.Callback()
def on_clipboard_paste(self, textview):
logger.debug("Pasting from clipboard")
clipboard = Gdk.Display.get_default().get_clipboard()
clipboard.read_text_async(None, self.cb_text_received)
clipboard.read_texture_async(None, self.cb_image_received)
@Gtk.Template.Callback()
def model_detail_create_button_clicked(self, button):
self.create_model(button.get_name(), False)
def convert_model_name(self, name:str, mode:int) -> str: # mode=0 name:tag -> Name (tag) | mode=1 Name (tag) -> name:tag
try:
if mode == 0:
return "{} ({})".format(name.split(":")[0].replace("-", " ").title(), name.split(":")[1])
if mode == 1:
return "{}:{}".format(name.split(" (")[0].replace(" ", "-").lower(), name.split(" (")[1][:-1])
except Exception as e:
pass
def check_alphanumeric(self, editable, text, length, position, allowed_chars):
new_text = ''.join([char for char in text if char.isalnum() or char in allowed_chars])
if new_text != text:
editable.stop_emission_by_name("insert-text")
def create_model(self, model:str, file:bool):
modelfile_buffer = self.create_model_modelfile.get_buffer()
modelfile_buffer.delete(modelfile_buffer.get_start_iter(), modelfile_buffer.get_end_iter())
self.create_model_system.set_text('')
if not file:
data = next((element for element in list(self.model_manager.model_selector.get_popover().model_list_box) if element.get_name() == self.convert_model_name(model, 1)), None).data
modelfile = []
for line in data['modelfile'].split('\n'):
if line.startswith('SYSTEM'):
self.create_model_system.set_text(line[len('SYSTEM'):].strip())
if not line.startswith('SYSTEM') and not line.startswith('FROM') and not line.startswith('#'):
modelfile.append(line)
self.create_model_name.set_text(self.convert_model_name(model, 1).split(':')[0] + "-custom")
modelfile_buffer.insert(modelfile_buffer.get_start_iter(), '\n'.join(modelfile), len('\n'.join(modelfile).encode('utf-8')))
self.create_model_base.set_subtitle(self.convert_model_name(model, 1))
else:
self.create_model_name.set_text(os.path.splitext(os.path.basename(model))[0])
self.create_model_base.set_subtitle(model)
self.navigation_view_manage_models.push_by_tag('model_create_page')
def show_toast(self, message:str, overlay):
logger.info(message)
toast = Adw.Toast(
title=message,
timeout=2
)
overlay.add_toast(toast)
def show_notification(self, title:str, body:str, icon:Gio.ThemedIcon=None):
if not self.is_active():
logger.info(f"{title}, {body}")
notification = Gio.Notification.new(title)
notification.set_body(body)
if icon:
notification.set_icon(icon)
self.get_application().send_notification(None, notification)
def preview_file(self, file_path, file_type, presend_name):
logger.debug(f"Previewing file: {file_path}")
file_path = file_path.replace("{selected_chat}", self.chat_list_box.get_current_chat().get_name())
if not os.path.isfile(file_path):
self.show_toast(_("Missing file"), self.main_overlay)
return
content = self.get_content_of_file(file_path, file_type)
if presend_name:
self.file_preview_remove_button.set_visible(True)
self.file_preview_remove_button.set_name(presend_name)
else:
self.file_preview_remove_button.set_visible(False)
if content:
if file_type == 'image':
self.file_preview_image.set_visible(True)
self.file_preview_text_label.set_visible(False)
image_data = base64.b64decode(content)
loader = GdkPixbuf.PixbufLoader.new()
loader.write(image_data)
loader.close()
pixbuf = loader.get_pixbuf()
texture = Gdk.Texture.new_for_pixbuf(pixbuf)
self.file_preview_image.set_from_paintable(texture)
self.file_preview_image.set_size_request(240, 240)
self.file_preview_dialog.set_title(os.path.basename(file_path))
self.file_preview_open_button.set_name(file_path)
else:
self.file_preview_image.set_visible(False)
self.file_preview_text_label.set_visible(True)
buffer = self.file_preview_text_label.set_label(content)
if file_type == 'youtube':
self.file_preview_dialog.set_title(content.split('\n')[0])
self.file_preview_open_button.set_name(content.split('\n')[2])
elif file_type == 'website':
self.file_preview_open_button.set_name(content.split('\n')[0])
else:
self.file_preview_dialog.set_title(os.path.basename(file_path))
self.file_preview_open_button.set_name(file_path)
self.file_preview_dialog.present(self)
def convert_history_to_ollama(self, chat):
messages = []
for message_id, message in chat.messages_to_dict().items():
new_message = message.copy()
if 'model' in new_message:
del new_message['model']
if 'date' in new_message:
del new_message['date']
if 'files' in message and len(message['files']) > 0:
del new_message['files']
new_message['content'] = ''
for name, file_type in message['files'].items():
file_path = os.path.join(data_dir, "chats", chat.get_name(), message_id, name)
file_data = self.get_content_of_file(file_path, file_type)
if file_data:
new_message['content'] += f"```[{name}]\n{file_data}\n```"
new_message['content'] += message['content']
if 'images' in message and len(message['images']) > 0:
new_message['images'] = []
for name in message['images']:
file_path = os.path.join(data_dir, "chats", chat.get_name(), message_id, name)
image_data = self.get_content_of_file(file_path, 'image')
if image_data:
new_message['images'].append(image_data)
messages.append(new_message)
return messages
def generate_chat_title(self, message, old_chat_name):
logger.debug("Generating chat title")
prompt = f"""
Generate a title following these rules:
- The title should be based on the prompt at the end
- Keep it in the same language as the prompt
- The title needs to be less than 30 characters
- Use only alphanumeric characters and spaces
- Just write the title, NOTHING ELSE
```PROMPT
{message['content']}
```"""
current_model = self.model_manager.get_selected_model()
data = {"model": current_model, "prompt": prompt, "stream": False}
if 'images' in message:
data["images"] = message['images']
response = self.ollama_instance.request("POST", "api/generate", json.dumps(data))
if response.status_code == 200:
new_chat_name = json.loads(response.text)["response"].strip().removeprefix("Title: ").removeprefix("title: ").strip('\'"').replace('\n', ' ').title().replace('\'S', '\'s')
new_chat_name = new_chat_name[:50] + (new_chat_name[50:] and '...')
self.chat_list_box.rename_chat(old_chat_name, new_chat_name)
def save_server_config(self):
if self.ollama_instance:
with open(os.path.join(config_dir, "server.json"), "w+", encoding="utf-8") as f:
data = {
'remote_url': self.ollama_instance.remote_url,
'remote_bearer_token': self.ollama_instance.bearer_token,
'run_remote': self.ollama_instance.remote,
'local_port': self.ollama_instance.local_port,
'run_on_background': self.background_switch.get_active(),
'powersaver_warning': self.powersaver_warning_switch.get_active(),
'model_tweaks': self.ollama_instance.tweaks,
'ollama_overrides': self.ollama_instance.overrides,
'idle_timer': self.ollama_instance.idle_timer_delay
}
json.dump(data, f, indent=6)
def verify_connection(self):
try:
response = self.ollama_instance.request("GET", "api/tags")
if response.status_code == 200:
self.save_server_config()
#self.update_list_local_models()
return response.status_code == 200
except Exception as e:
logger.error(e)
return False
def on_theme_changed(self, manager, dark, buffer):
logger.debug("Theme changed")
if manager.get_dark():
source_style = GtkSource.StyleSchemeManager.get_default().get_scheme('Adwaita-dark')
else:
source_style = GtkSource.StyleSchemeManager.get_default().get_scheme('Adwaita')
buffer.set_style_scheme(source_style)
def switch_send_stop_button(self, send:bool):
self.stop_button.set_visible(not send)
self.send_button.set_visible(send)
def run_message(self, data:dict, message_element:message_widget.message, chat:chat_widget.chat):
logger.debug("Running message")
self.save_history(chat)
chat.busy = True
self.chat_list_box.get_tab_by_name(chat.get_name()).spinner.set_visible(True)
if len(data['messages']) == 1 and chat.get_name().startswith(_("New Chat")):
threading.Thread(target=self.generate_chat_title, args=(data['messages'][0].copy(), chat.get_name())).start()
if chat.welcome_screen:
chat.welcome_screen.set_visible(False)
if chat.regenerate_button:
chat.container.remove(chat.regenerate_button)
self.switch_send_stop_button(False)
if self.regenerate_button:
GLib.idle_add(self.chat_list_box.get_current_chat().remove, self.regenerate_button)
try:
response = self.ollama_instance.request("POST", "api/chat", json.dumps(data), lambda data, message_element=message_element: message_element.update_message(data))
if response.status_code != 200:
raise Exception('Network Error')
except Exception as e:
logger.error(e)
self.chat_list_box.get_tab_by_name(chat.get_name()).spinner.set_visible(False)
chat.busy = False
GLib.idle_add(message_element.add_action_buttons)
if message_element.spinner:
GLib.idle_add(message_element.container.remove, message_element.spinner)
message_element.spinner = None
GLib.idle_add(chat.show_regenerate_button, message_element)
GLib.idle_add(self.connection_error)
def save_history(self, chat:chat_widget.chat=None):
logger.info("Saving history")
history = None
if chat and os.path.exists(os.path.join(data_dir, "chats", "chats.json")):
history = {'chats': {chat.get_name(): {'messages': chat.messages_to_dict()}}}
try:
with open(os.path.join(data_dir, "chats", "chats.json"), "r", encoding="utf-8") as f:
data = json.load(f)
for chat_tab in self.chat_list_box.tab_list:
if chat_tab.chat_window.get_name() != chat.get_name():
history['chats'][chat_tab.chat_window.get_name()] = data['chats'][chat_tab.chat_window.get_name()]
history['chats'][chat.get_name()] = {'messages': chat.messages_to_dict()}
except Exception as e:
logger.error(e)
history = None
if not history:
history = {'chats': {}}
for chat_tab in self.chat_list_box.tab_list:
history['chats'][chat_tab.chat_window.get_name()] = {'messages': chat_tab.chat_window.messages_to_dict()}
with open(os.path.join(data_dir, "chats", "chats.json"), "w+", encoding="utf-8") as f:
json.dump(history, f, indent=4)
def load_history(self):
logger.debug("Loading history")
if os.path.exists(os.path.join(data_dir, "chats", "chats.json")):
try:
with open(os.path.join(data_dir, "chats", "chats.json"), "r", encoding="utf-8") as f:
data = json.load(f)
selected_chat = None
if len(list(data)) == 0:
data['chats'][_("New Chat")] = {"messages": {}}
if os.path.exists(os.path.join(data_dir, "chats", "selected_chat.txt")):
with open(os.path.join(data_dir, "chats", "selected_chat.txt"), 'r') as scf:
selected_chat = scf.read()
elif 'selected_chat' in data and data['selected_chat'] in data['chats']:
selected_chat = data['selected_chat']
if not selected_chat or selected_chat not in data['chats']:
selected_chat = list(data['chats'])[0]
if len(data['chats'][selected_chat]['messages'].keys()) > 0:
last_model_used = data['chats'][selected_chat]['messages'][list(data["chats"][selected_chat]["messages"])[-1]]["model"]
self.model_manager.change_model(last_model_used)
for chat_name in data['chats']:
self.chat_list_box.append_chat(chat_name)
chat_container = self.chat_list_box.get_chat_by_name(chat_name)
if chat_name == selected_chat:
self.chat_list_box.select_row(self.chat_list_box.tab_list[-1])
chat_container.load_chat_messages(data['chats'][chat_name]['messages'])
except Exception as e:
logger.error(e)
self.chat_list_box.prepend_chat(_("New Chat"))
else:
self.chat_list_box.prepend_chat(_("New Chat"))
def generate_numbered_name(self, chat_name:str, compare_list:list) -> str:
if chat_name in compare_list:
for i in range(len(compare_list)):
if "." in chat_name:
if f"{'.'.join(chat_name.split('.')[:-1])} {i+1}.{chat_name.split('.')[-1]}" not in compare_list:
chat_name = f"{'.'.join(chat_name.split('.')[:-1])} {i+1}.{chat_name.split('.')[-1]}"
break
else:
if f"{chat_name} {i+1}" not in compare_list:
chat_name = f"{chat_name} {i+1}"
break
return chat_name
def generate_uuid(self) -> str:
return f"{datetime.today().strftime('%Y%m%d%H%M%S%f')}{uuid.uuid4().hex}"
def connection_error(self):
logger.error("Connection error")
if self.ollama_instance.remote:
options = {
_("Close Alpaca"): {"callback": lambda *_: self.get_application().quit(), "appearance": "destructive"},
_("Use Local Instance"): {"callback": lambda *_: self.remote_connection_switch.set_active(False)},
_("Connect"): {"callback": lambda url, bearer: generic_actions.connect_remote(url,bearer), "appearance": "suggested"}
}
entries = [
{"text": self.ollama_instance.remote_url, "css": ['error'], "placeholder": _('Server URL')},
{"text": self.ollama_instance.bearer_token, "css": ['error'] if self.ollama_instance.bearer_token else None, "placeholder": _('Bearer Token (Optional)')}
]
dialog_widget.Entry(_('Connection Error'), _('The remote instance has disconnected'), list(options)[0], options, entries)
else:
self.ollama_instance.reset()
self.show_toast(_("There was an error with the local Ollama instance, so it has been reset"), self.main_overlay)
def get_content_of_file(self, file_path, file_type):
if not os.path.exists(file_path): return None
if file_type == 'image':
try:
with Image.open(file_path) as img:
width, height = img.size
max_size = 240
if width > height:
new_width = max_size
new_height = int((max_size / width) * height)
else:
new_height = max_size
new_width = int((max_size / height) * width)
resized_img = img.resize((new_width, new_height), Image.LANCZOS)
with BytesIO() as output:
resized_img.save(output, format="PNG")
image_data = output.getvalue()
return base64.b64encode(image_data).decode("utf-8")
except Exception as e:
logger.error(e)
self.show_toast(_("Cannot open image"), self.main_overlay)
elif file_type == 'plain_text' or file_type == 'youtube' or file_type == 'website':
with open(file_path, 'r', encoding="utf-8") as f:
return f.read()
elif file_type == 'pdf':
reader = PdfReader(file_path)
if len(reader.pages) == 0:
return None
text = ""
for i, page in enumerate(reader.pages):
text += f"\n- Page {i}\n{page.extract_text(extraction_mode='layout', layout_mode_space_vertically=False)}\n"
return text
def remove_attached_file(self, name):
logger.debug("Removing attached file")
button = self.attachments[name]['button']
button.get_parent().remove(button)
del self.attachments[name]
if len(self.attachments) == 0:
self.attachment_box.set_visible(False)
if self.file_preview_dialog.get_visible():
self.file_preview_dialog.close()
def attach_file(self, file_path, file_type):
logger.debug(f"Attaching file: {file_path}")
file_name = self.generate_numbered_name(os.path.basename(file_path), self.attachments.keys())
content = self.get_content_of_file(file_path, file_type)
if content:
button_content = Adw.ButtonContent(
label=file_name,
icon_name={
"image": "image-x-generic-symbolic",
"plain_text": "document-text-symbolic",
"pdf": "document-text-symbolic",
"youtube": "play-symbolic",
"website": "globe-symbolic"
}[file_type]
)
button = Gtk.Button(
vexpand=True,
valign=3,
name=file_name,
css_classes=["flat"],
tooltip_text=file_name,
child=button_content
)
self.attachments[file_name] = {"path": file_path, "type": file_type, "content": content, "button": button}
button.connect("clicked", lambda button : self.preview_file(file_path, file_type, file_name))
self.attachment_container.append(button)
self.attachment_box.set_visible(True)
def chat_actions(self, action, user_data):
chat_row = self.selected_chat_row
chat_name = chat_row.label.get_label()
action_name = action.get_name()
if action_name in ('delete_chat', 'delete_current_chat'):
dialog_widget.simple(
_('Delete Chat?'),
_("Are you sure you want to delete '{}'?").format(chat_name),
lambda chat_name=chat_name, *_: self.chat_list_box.delete_chat(chat_name),
_('Delete'),
'destructive'
)
elif action_name in ('duplicate_chat', 'duplicate_current_chat'):
self.chat_list_box.duplicate_chat(chat_name)
elif action_name in ('rename_chat', 'rename_current_chat'):
dialog_widget.simple_entry(
_('Rename Chat?'),
_("Renaming '{}'").format(chat_name),
lambda new_chat_name, old_chat_name=chat_name, *_: self.chat_list_box.rename_chat(old_chat_name, new_chat_name),
{'placeholder': _('Chat name')},
_('Rename')
)
elif action_name in ('export_chat', 'export_current_chat'):
self.chat_list_box.export_chat(chat_name)
def current_chat_actions(self, action, user_data):
self.selected_chat_row = self.chat_list_box.get_selected_row()
self.chat_actions(action, user_data)
def youtube_detected(self, video_url):
try:
tries=0
while True:
try:
yt = YouTube(video_url)
video_title = yt.title
break
except Exception as e:
tries+=1
if tries == 4:
raise Exception(e)
transcriptions = generic_actions.get_youtube_transcripts(yt.video_id)
if len(transcriptions) == 0:
self.show_toast(_("This video does not have any transcriptions"), self.main_overlay)
return
if not any(filter(lambda x: '(en' in x and 'auto-generated' not in x and len(transcriptions) > 1, transcriptions)):
transcriptions.insert(1, 'English (translate:en)')
dialog_widget.simple_dropdown(
_('Attach YouTube Video?'),
_('{}\n\nPlease select a transcript to include').format(video_title),
lambda caption_name, yt=yt, video_url=video_url: generic_actions.attach_youtube(yt.title, yt.author, yt.watch_url, video_url, yt.video_id, caption_name),
transcriptions
)
except Exception as e:
logger.error(e)
self.show_toast(_("Error attaching video, please try again"), self.main_overlay)
def cb_text_received(self, clipboard, result):
try:
text = clipboard.read_text_finish(result)
#Check if text is a Youtube URL
youtube_regex = re.compile(
r'(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})')
url_regex = re.compile(
r'http[s]?://'
r'(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|'
r'(?:%[0-9a-fA-F][0-9a-fA-F]))+'
r'(?:\\:[0-9]{1,5})?'
r'(?:/[^\\s]*)?'
)
if youtube_regex.match(text):
self.youtube_detected(text)
elif url_regex.match(text):
dialog_widget.simple(
_('Attach Website? (Experimental)'),
_("Are you sure you want to attach\n'{}'?").format(text),
lambda url=text: generic_actions.attach_website(url)
)
except Exception as e:
logger.error(e)
def cb_image_received(self, clipboard, result):
try:
texture = clipboard.read_texture_finish(result)
if texture:
if self.model_manager.verify_if_image_can_be_used():
pixbuf = Gdk.pixbuf_get_from_texture(texture)
if not os.path.exists(os.path.join(cache_dir, 'tmp/images/')):
os.makedirs(os.path.join(cache_dir, 'tmp/images/'))
image_name = self.generate_numbered_name('image.png', os.listdir(os.path.join(cache_dir, os.path.join(cache_dir, 'tmp/images'))))
pixbuf.savev(os.path.join(cache_dir, 'tmp/images/{}'.format(image_name)), "png", [], [])
self.attach_file(os.path.join(cache_dir, 'tmp/images/{}'.format(image_name)), 'image')
else:
self.show_toast(_("Image recognition is only available on specific models"), self.main_overlay)
except Exception as e:
pass
def handle_enter_key(self):
self.send_message()
return True
def on_file_drop(self, drop_target, value, x, y):
files = value.get_files()
for file in files:
extension = os.path.splitext(file.get_path())[1][1:]
if extension in ('png', 'jpeg', 'jpg', 'webp', 'gif'):
self.attach_file(file.get_path(), 'image')
elif extension in ('txt', 'md', 'html', 'css', 'js', 'py', 'java', 'json', 'xml'):
self.attach_file(file.get_path(), 'plain_text')
elif extension == 'pdf':
self.attach_file(file.get_path(), 'pdf')
def power_saver_toggled(self, monitor):
self.banner.set_revealed(monitor.get_power_saver_enabled() and self.powersaver_warning_switch.get_active())
def remote_switched(self, switch, state):
def local_instance_process():
sensitive_elements = [switch, self.tweaks_group, self.instance_page, self.send_button, self.attachment_button]
[element.set_sensitive(False) for element in sensitive_elements]
self.get_application().lookup_action('manage_models').set_enabled(False)
self.title_stack.set_visible_child_name('loading')
self.ollama_instance.remote = False
self.ollama_instance.start()
self.model_manager.update_local_list()
self.save_server_config()
[element.set_sensitive(True) for element in sensitive_elements]
self.get_application().lookup_action('manage_models').set_enabled(True)
self.title_stack.set_visible_child_name('model_selector' if len(self.model_manager.get_model_list()) > 0 else 'no_models')
if state:
options = {
_("Cancel"): {"callback": lambda *_: self.remote_connection_switch.set_active(False)},
_("Connect"): {"callback": lambda url, bearer: generic_actions.connect_remote(url, bearer), "appearance": "suggested"}
}
entries = [
{"text": self.ollama_instance.remote_url, "placeholder": _('Server URL')},
{"text": self.ollama_instance.bearer_token, "placeholder": _('Bearer Token (Optional)')}
]
dialog_widget.Entry(
_('Connect Remote Instance'),
_('Enter instance information to continue'),
list(options)[0],
options,
entries
)
elif self.ollama_instance.remote:
threading.Thread(target=local_instance_process).start()
def prepare_alpaca(self, local_port:int, remote_url:str, remote:bool, tweaks:dict, overrides:dict, bearer_token:str, idle_timer_delay:int, save:bool):
#Model Manager
self.model_manager = model_widget.model_manager_container()
self.model_scroller.set_child(self.model_manager)
#Chat History
self.load_history()
#Instance
self.ollama_instance = connection_handler.instance(local_port, remote_url, remote, tweaks, overrides, bearer_token, idle_timer_delay)
#Model Manager P.2
self.model_manager.update_available_list()
self.model_manager.update_local_list()
#User Preferences
for element in list(list(list(list(self.tweaks_group)[0])[1])[0]):
if element.get_name() in self.ollama_instance.tweaks:
element.set_value(self.ollama_instance.tweaks[element.get_name()])
for element in list(list(list(list(self.overrides_group)[0])[1])[0]):
if element.get_name() in self.ollama_instance.overrides:
element.set_text(self.ollama_instance.overrides[element.get_name()])
self.set_hide_on_close(self.background_switch.get_active())
self.instance_idle_timer.set_value(self.ollama_instance.idle_timer_delay)
self.remote_connection_switch.set_active(self.ollama_instance.remote)
self.remote_connection_switch.get_activatable_widget().connect('state-set', self.remote_switched)
#Save preferences
if save:
self.save_server_config()
self.send_button.set_sensitive(True)
self.attachment_button.set_sensitive(True)
self.remote_connection_switch.set_sensitive(True)
self.tweaks_group.set_sensitive(True)
self.instance_page.set_sensitive(True)
self.get_application().lookup_action('manage_models').set_enabled(True)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.message_searchbar.connect('notify::search-mode-enabled', lambda *_: self.message_search_button.set_active(self.message_searchbar.get_search_mode()))
message_widget.window = self
chat_widget.window = self
model_widget.window = self
dialog_widget.window = self
terminal_widget.window = self
generic_actions.window = self
connection_handler.window = self
drop_target = Gtk.DropTarget.new(Gdk.FileList, Gdk.DragAction.COPY)
drop_target.connect('drop', self.on_file_drop)
self.message_text_view.add_controller(drop_target)
self.chat_list_box = chat_widget.chat_list()
self.chat_list_container.set_child(self.chat_list_box)
GtkSource.init()
if not os.path.exists(os.path.join(data_dir, "chats")):
os.makedirs(os.path.join(data_dir, "chats"))
enter_key_controller = Gtk.EventControllerKey.new()
enter_key_controller.connect("key-pressed", lambda controller, keyval, keycode, state: self.handle_enter_key() if keyval==Gdk.KEY_Return and not (state & Gdk.ModifierType.SHIFT_MASK) else None)
self.message_text_view.add_controller(enter_key_controller)
self.set_help_overlay(self.shortcut_window)
self.get_application().set_accels_for_action("win.show-help-overlay", ['<primary>slash'])
universal_actions = {
'new_chat': [lambda *_: self.chat_list_box.new_chat(), ['<primary>n']],
'clear': [lambda *i: dialog_widget.simple(_('Clear Chat?'), _('Are you sure you want to clear the chat?'), self.chat_list_box.get_current_chat().clear_chat, _('Clear')), ['<primary>e']],
'import_chat': [lambda *_: self.chat_list_box.import_chat(), ['<primary>i']],
'create_model_from_existing': [lambda *i: dialog_widget.simple_dropdown(_('Select Model'), _('This model will be used as the base for the new model'), lambda model: self.create_model(model, False), [self.convert_model_name(model, 0) for model in self.model_manager.get_model_list()])],
'create_model_from_file': [lambda *i, file_filter=self.file_filter_gguf: dialog_widget.simple_file(file_filter, lambda file: self.create_model(file.get_path(), True))],
'create_model_from_name': [lambda *i: dialog_widget.simple_entry(_('Pull Model'), _('Input the name of the model in this format\nname:tag'), lambda model: threading.Thread(target=self.model_manager.pull_model, kwargs={"model_name": model}).start(), {'placeholder': 'llama3.2:latest'})],
'duplicate_chat': [self.chat_actions],
'duplicate_current_chat': [self.current_chat_actions],
'delete_chat': [self.chat_actions],
'delete_current_chat': [self.current_chat_actions],
'rename_chat': [self.chat_actions],
'rename_current_chat': [self.current_chat_actions, ['F2']],
'export_chat': [self.chat_actions],
'export_current_chat': [self.current_chat_actions],
'toggle_sidebar': [lambda *_: self.split_view_overlay.set_show_sidebar(not self.split_view_overlay.get_show_sidebar()), ['F9']],
'manage_models': [lambda *_: self.manage_models_dialog.present(self), ['<primary>m']],
'search_messages': [lambda *_: self.message_searchbar.set_search_mode(not self.message_searchbar.get_search_mode()), ['<primary>f']]
}
for action_name, data in universal_actions.items():
self.get_application().create_action(action_name, data[0], data[1] if len(data) > 1 else None)
self.get_application().lookup_action('manage_models').set_enabled(False)
self.remote_connection_switch.set_sensitive(False)
self.tweaks_group.set_sensitive(False)
self.instance_page.set_sensitive(False)
self.file_preview_remove_button.connect('clicked', lambda button : dialog_widget.simple(_('Remove Attachment?'), _("Are you sure you want to remove attachment?"), lambda button=button: self.remove_attached_file(button.get_name()), _('Remove'), 'destructive'))
self.attachment_button.connect("clicked", lambda button, file_filter=self.file_filter_attachments: dialog_widget.simple_file(file_filter, generic_actions.attach_file))
self.create_model_name.get_delegate().connect("insert-text", lambda *_: self.check_alphanumeric(*_, ['-', '.', '_']))
self.set_focus(self.message_text_view)
if os.path.exists(os.path.join(config_dir, "server.json")):
try:
with open(os.path.join(config_dir, "server.json"), "r", encoding="utf-8") as f:
data = json.load(f)
self.background_switch.set_active(data['run_on_background'])
if 'idle_timer' not in data:
data['idle_timer'] = 0
if 'powersaver_warning' not in data:
data['powersaver_warning'] = True
self.powersaver_warning_switch.set_active(data['powersaver_warning'])
threading.Thread(target=self.prepare_alpaca, args=(data['local_port'], data['remote_url'], data['run_remote'], data['model_tweaks'], data['ollama_overrides'], data['remote_bearer_token'], round(data['idle_timer']), False)).start()
except Exception as e:
logger.error(e)
threading.Thread(target=self.prepare_alpaca, args=(11435, '', False, {'temperature': 0.7, 'seed': 0, 'keep_alive': 5}, {}, '', 0, True)).start()
self.powersaver_warning_switch.set_active(True)
else:
if shutil.which('ollama'):
threading.Thread(target=self.prepare_alpaca, args=(11435, '', False, {'temperature': 0.7, 'seed': 0, 'keep_alive': 5}, {}, '', 0, True)).start()
else:
threading.Thread(target=self.prepare_alpaca, args=(11435, 'http://0.0.0.0:11434', True, {'temperature': 0.7, 'seed': 0, 'keep_alive': 5}, {}, '', 0, True)).start()
self.welcome_dialog.present(self)
if self.powersaver_warning_switch.get_active():
self.banner.set_revealed(Gio.PowerProfileMonitor.dup_default().get_power_saver_enabled())
Gio.PowerProfileMonitor.dup_default().connect("notify::power-saver-enabled", lambda monitor, *_: self.power_saver_toggled(monitor))
self.banner.connect('button-clicked', lambda *_: self.banner.set_revealed(False))