Alpaca/src/dialogs.py
2024-09-02 16:04:17 -06:00

415 lines
16 KiB
Python

# dialogs.py
"""
Handles UI dialogs
"""
import os
import logging, requests, threading
from pytube import YouTube
from html2text import html2text
from gi.repository import Adw, Gtk
logger = logging.getLogger(__name__)
# CLEAR CHAT | WORKS
def clear_chat_response(self, dialog, task):
    """Finish the 'Clear Chat?' dialog and clear the current chat if confirmed.

    `dialog` is the dialog that was presented and `task` is the async result
    handed to the `choose()` callback.
    """
    chosen = dialog.choose_finish(task)
    if chosen != "clear":
        return
    current_chat = self.chat_list_box.get_current_chat()
    current_chat.clear_chat()
def clear_chat(self):
    """Ask the user to confirm clearing the current chat.

    While `self.bot_message` is set a toast is shown and no dialog is opened.
    The chosen response is handled by `clear_chat_response`.
    """
    if self.bot_message is not None:
        self.show_toast(_("Chat cannot be cleared while receiving a message"), self.main_overlay)
        return

    def on_chosen(source, result):
        clear_chat_response(self, source, result)

    confirmation = Adw.AlertDialog(
        heading=_("Clear Chat?"),
        body=_("Are you sure you want to clear the chat?"),
        close_response="cancel",
    )
    for response_id, label in (("cancel", _("Cancel")), ("clear", _("Clear"))):
        confirmation.add_response(response_id, label)
    confirmation.set_response_appearance("clear", Adw.ResponseAppearance.DESTRUCTIVE)
    confirmation.set_default_response("clear")
    confirmation.choose(parent=self, cancellable=None, callback=on_chosen)
# DELETE CHAT | WORKS
def delete_chat_response(self, dialog, task, chat_name):
    """Finish the 'Delete Chat?' dialog and delete `chat_name` if confirmed."""
    confirmed = dialog.choose_finish(task) == "delete"
    if not confirmed:
        return
    self.chat_list_box.delete_chat(chat_name)
def delete_chat(self, chat_name):
    """Ask the user to confirm deleting the chat called `chat_name`.

    The chosen response is handled by `delete_chat_response`.
    """
    def on_chosen(source, result):
        delete_chat_response(self, source, result, chat_name)

    question = _("Are you sure you want to delete '{}'?").format(chat_name)
    confirmation = Adw.AlertDialog(
        heading=_("Delete Chat?"),
        body=question,
        close_response="cancel",
    )
    confirmation.add_response("cancel", _("Cancel"))
    confirmation.add_response("delete", _("Delete"))
    confirmation.set_response_appearance("delete", Adw.ResponseAppearance.DESTRUCTIVE)
    confirmation.set_default_response("delete")
    confirmation.choose(parent=self, cancellable=None, callback=on_chosen)
# RENAME CHAT | WORKS
def rename_chat_response(self, dialog, task, old_chat_name, entry):
    """Rename a chat using the text typed into `entry`.

    Nothing happens when `entry` is falsy, when the typed name is empty, or
    when it equals `old_chat_name`. Otherwise the rename is applied if `task`
    is None or the dialog's chosen response is "rename".
    """
    if not entry:
        return
    requested_name = entry.get_text()
    if requested_name == old_chat_name or not requested_name:
        return
    # The dialog is only finished when a task was actually supplied
    confirmed = task is None or dialog.choose_finish(task) == "rename"
    if confirmed:
        self.chat_list_box.rename_chat(old_chat_name, requested_name)
def rename_chat(self, chat_name):
    """Show a dialog with a text entry for renaming the chat `chat_name`.

    The chosen response is handled by `rename_chat_response`.
    """
    name_entry = Gtk.Entry()

    def on_chosen(source, result):
        rename_chat_response(self, source, result, chat_name, name_entry)

    prompt = Adw.AlertDialog(
        heading=_("Rename Chat?"),
        body=_("Renaming '{}'").format(chat_name),
        extra_child=name_entry,
        close_response="cancel",
    )
    for response_id, label in (("cancel", _("Cancel")), ("rename", _("Rename"))):
        prompt.add_response(response_id, label)
    prompt.set_response_appearance("rename", Adw.ResponseAppearance.SUGGESTED)
    prompt.set_default_response("rename")
    prompt.choose(parent=self, cancellable=None, callback=on_chosen)
# NEW CHAT | WORKS | UNUSED REASON: The 'Add Chat' button now creates a chat without a name AKA "New Chat"
def new_chat_response(self, dialog, task, entry):
    """Create a chat named after the text in `entry`, or "New Chat" if empty.

    The chat is created when `task` is None or the dialog's chosen response
    is "create".
    """
    chat_name = _("New Chat")
    if entry is not None:
        typed_name = entry.get_text()
        if typed_name != "":
            chat_name = typed_name
    if not chat_name:
        return
    if task is None or dialog.choose_finish(task) == "create":
        self.new_chat(chat_name)
def new_chat(self):
    """Show a dialog with a text entry asking for the name of a new chat.

    The chosen response is handled by `new_chat_response`.
    """
    name_entry = Gtk.Entry()

    def on_chosen(source, result):
        new_chat_response(self, source, result, name_entry)

    prompt = Adw.AlertDialog(
        heading=_("Create Chat?"),
        body=_("Enter name for new chat"),
        extra_child=name_entry,
        close_response="cancel",
    )
    prompt.add_response("cancel", _("Cancel"))
    prompt.add_response("create", _("Create"))
    prompt.set_response_appearance("create", Adw.ResponseAppearance.SUGGESTED)
    prompt.set_default_response("create")
    prompt.choose(parent=self, cancellable=None, callback=on_chosen)
# STOP PULL MODEL | WORKS
def stop_pull_model_response(self, dialog, task, pulling_model):
    """Remove the `pulling_model` widget from its parent if "stop" was chosen.

    When `pulling_model` is the parent's only child, the parent is hidden
    before the widget is removed.
    """
    if dialog.choose_finish(task) != "stop":
        return
    container = pulling_model.get_parent()
    remaining = len(list(container))
    if remaining == 1:
        container.set_visible(False)
    container.remove(pulling_model)
def stop_pull_model(self, pulling_model):
    """Ask the user to confirm stopping the download shown by `pulling_model`.

    The dialog is presented over `self.manage_models_dialog`; the chosen
    response is handled by `stop_pull_model_response`.
    """
    def on_chosen(source, result):
        stop_pull_model_response(self, source, result, pulling_model)

    display_name = self.convert_model_name(pulling_model.get_name(), 0)
    confirmation = Adw.AlertDialog(
        heading=_("Stop Download?"),
        body=_("Are you sure you want to stop pulling '{}'?").format(display_name),
        close_response="cancel",
    )
    for response_id, label in (("cancel", _("Cancel")), ("stop", _("Stop"))):
        confirmation.add_response(response_id, label)
    confirmation.set_response_appearance("stop", Adw.ResponseAppearance.DESTRUCTIVE)
    confirmation.set_default_response("stop")
    confirmation.choose(parent=self.manage_models_dialog, cancellable=None, callback=on_chosen)
# DELETE MODEL | WORKS
def delete_model_response(self, dialog, task, model_name):
    """Finish the 'Delete Model?' dialog and remove `model_name` if confirmed."""
    chosen = dialog.choose_finish(task)
    if chosen != "delete":
        return
    self.model_manager.remove_local_model(model_name)
def delete_model(self, model_name):
    """Ask the user to confirm deleting the local model `model_name`.

    The dialog is presented over `self.manage_models_dialog`; the chosen
    response is handled by `delete_model_response`.
    """
    def on_chosen(source, result):
        delete_model_response(self, source, result, model_name)

    display_name = self.convert_model_name(model_name, 0)
    confirmation = Adw.AlertDialog(
        heading=_("Delete Model?"),
        body=_("Are you sure you want to delete '{}'?").format(display_name),
        close_response="cancel",
    )
    confirmation.add_response("cancel", _("Cancel"))
    confirmation.add_response("delete", _("Delete"))
    confirmation.set_response_appearance("delete", Adw.ResponseAppearance.DESTRUCTIVE)
    confirmation.set_default_response("delete")
    confirmation.choose(parent=self.manage_models_dialog, cancellable=None, callback=on_chosen)
# REMOVE ATTACHMENT | WORKS
def remove_attached_file_response(self, dialog, task, name):
    """Close the file preview and remove attachment `name` if "remove" was chosen."""
    chosen = dialog.choose_finish(task)
    if chosen != 'remove':
        return
    self.file_preview_dialog.close()
    self.remove_attached_file(name)
def remove_attached_file(self, name):
    """Ask the user to confirm removing the attachment called `name`.

    The chosen response is handled by `remove_attached_file_response`.
    """
    def on_chosen(source, result):
        remove_attached_file_response(self, source, result, name)

    confirmation = Adw.AlertDialog(
        heading=_("Remove Attachment?"),
        body=_("Are you sure you want to remove attachment?"),
        close_response="cancel",
    )
    for response_id, label in (("cancel", _("Cancel")), ("remove", _("Remove"))):
        confirmation.add_response(response_id, label)
    confirmation.set_response_appearance("remove", Adw.ResponseAppearance.DESTRUCTIVE)
    confirmation.set_default_response("remove")
    confirmation.choose(parent=self, cancellable=None, callback=on_chosen)
# RECONNECT REMOTE | WORKS
def reconnect_remote_response(self, dialog, task, url_entry, bearer_entry):
    """Apply the choice made in the 'Connection Error' dialog.

    - "remote" (or no `task` at all): store the URL and bearer token from the
      entries, switch to the remote instance and refresh the local model list.
    - "local": switch to the local instance, start it and refresh the list.
    - "close": destroy the window.

    `url_entry` and `bearer_entry` are the entries shown in the dialog.
    """
    # Only finish the dialog when a task exists. The previous code called
    # choose_finish() before checking `task`, so the "no task" shortcut could
    # never be reached without handing an empty task to choose_finish().
    response = dialog.choose_finish(task) if task else "remote"
    instance = self.ollama_instance
    if response == "remote":
        instance.remote_url = url_entry.get_text()
        instance.bearer_token = bearer_entry.get_text()
        instance.remote = True
        self.model_manager.update_local_list()
    elif response == "local":
        instance.remote = False
        instance.start()
        self.model_manager.update_local_list()
    elif response == "close":
        self.destroy()
def reconnect_remote(self):
    """Show the 'Connection Error' dialog for a disconnected remote instance.

    The dialog holds two entries pre-filled from `self.ollama_instance` (URL
    and bearer token) and offers closing the app, switching to the local
    instance, or reconnecting. The choice is handled by
    `reconnect_remote_response`.
    """
    instance = self.ollama_instance
    entry_url = Gtk.Entry(
        css_classes=["error"],
        text=instance.remote_url,
        placeholder_text="URL",
    )
    # The token entry only gets the error style when a token is set
    token_classes = ["error"] if instance.bearer_token else None
    entry_bearer_token = Gtk.Entry(
        css_classes=token_classes,
        text=instance.bearer_token,
        placeholder_text="Bearer Token (Optional)",
    )
    # orientation 1 corresponds to Gtk.Orientation.VERTICAL
    container = Gtk.Box(orientation=1, spacing=10)
    for widget in (entry_url, entry_bearer_token):
        container.append(widget)

    def on_chosen(source, result):
        reconnect_remote_response(self, source, result, entry_url, entry_bearer_token)

    error_dialog = Adw.AlertDialog(
        heading=_("Connection Error"),
        body=_("The remote instance has disconnected"),
        extra_child=container,
    )
    error_dialog.add_response("close", _("Close Alpaca"))
    error_dialog.add_response("local", _("Use local instance"))
    error_dialog.add_response("remote", _("Connect"))
    error_dialog.set_response_appearance("remote", Adw.ResponseAppearance.SUGGESTED)
    error_dialog.set_default_response("remote")
    error_dialog.choose(parent=self, cancellable=None, callback=on_chosen)
# CREATE MODEL | WORKS
def create_model_from_existing_response(self, dialog, task, dropdown):
    """Start creating a model based on the one selected in `dropdown`.

    Nothing happens unless the dialog's chosen response is 'accept' and the
    dropdown has a non-empty selection.
    """
    if dialog.choose_finish(task) != 'accept':
        return
    # With an empty model list the dropdown has no selected item; guard it
    # instead of failing with AttributeError on None.
    selected_item = dropdown.get_selected_item()
    if selected_item is None:
        return
    model = selected_item.get_string()
    if model:
        self.create_model(model, False)
def create_model_from_existing(self):
    """Show a dialog for picking the base model of a new model.

    The dropdown lists every model from `self.model_manager.get_model_list()`
    after passing it through `self.convert_model_name(model, 0)`. The choice
    is handled by `create_model_from_existing_response`.
    """
    model_names = Gtk.StringList()
    for model in self.model_manager.get_model_list():
        display_name = self.convert_model_name(model, 0)
        model_names.append(display_name)
    dropdown = Gtk.DropDown(model=model_names)

    def on_chosen(source, result):
        create_model_from_existing_response(self, source, result, dropdown)

    picker = Adw.AlertDialog(
        heading=_("Select Model"),
        body=_("This model will be used as the base for the new model"),
        extra_child=dropdown,
    )
    for response_id, label in (("cancel", _("Cancel")), ("accept", _("Accept"))):
        picker.add_response(response_id, label)
    picker.set_response_appearance("accept", Adw.ResponseAppearance.SUGGESTED)
    picker.set_default_response("accept")
    picker.choose(parent=self, cancellable=None, callback=on_chosen)
def create_model_from_file_response(self, file_dialog, result):
    """Create a model from the file picked in `file_dialog`.

    Errors from finishing the file dialog are logged. Errors raised while
    creating the model are logged and reported to the user with a toast.
    """
    try:
        chosen_file = file_dialog.open_finish(result)
        try:
            model_path = chosen_file.get_path()
            self.create_model(model_path, True)
        except Exception as creation_error:
            logger.error(creation_error)
            self.show_toast(_("An error occurred while creating the model"), self.main_overlay)
    except Exception as dialog_error:
        # Also reached if reporting the creation error itself fails
        logger.error(dialog_error)
def create_model_from_file(self):
    """Open a file chooser (filtered by `self.file_filter_gguf`) to pick a model file."""
    def on_opened(source, result):
        create_model_from_file_response(self, source, result)

    chooser = Gtk.FileDialog(default_filter=self.file_filter_gguf)
    chooser.open(self, None, on_opened)
def create_model_from_name_response(self, dialog, task, entry):
    """Pull the model whose name was typed into `entry` if 'accept' was chosen.

    The name is lower-cased and stripped; the pull runs in a new thread via
    `self.model_manager.pull_model`.
    """
    model = entry.get_text().lower().strip()
    accepted = dialog.choose_finish(task) == 'accept'
    if not (accepted and model):
        return
    pull_thread = threading.Thread(
        target=self.model_manager.pull_model,
        kwargs={"model_name": model},
    )
    pull_thread.start()
def create_model_from_name(self):
    """Show a dialog asking for the name of a model to pull.

    Text inserted into the entry is passed to `self.check_alphanumeric`
    together with the extra characters '-', '.', ':', '_' and '/'. The choice
    is handled by `create_model_from_name_response`.
    """
    name_entry = Gtk.Entry()

    def on_insert_text(*signal_args):
        return self.check_alphanumeric(*signal_args, ['-', '.', ':', '_', '/'])

    name_entry.get_delegate().connect("insert-text", on_insert_text)

    def on_chosen(source, result):
        create_model_from_name_response(self, source, result, name_entry)

    prompt = Adw.AlertDialog(
        heading=_("Pull Model"),
        body=_("Input the name of the model in this format\nname:tag"),
        extra_child=name_entry,
    )
    prompt.add_response("cancel", _("Cancel"))
    prompt.add_response("accept", _("Accept"))
    prompt.set_response_appearance("accept", Adw.ResponseAppearance.SUGGESTED)
    prompt.set_default_response("accept")
    prompt.choose(parent=self, cancellable=None, callback=on_chosen)
# FILE CHOOSER | WORKS
def attach_file_response(self, file_dialog, result):
    """Attach the file picked in `file_dialog` according to its extension.

    The extension is mapped to one of the types "plain_text", "image" or
    "pdf". Files with an unknown extension are ignored. Images are refused
    with a toast when `self.verify_if_image_can_be_used()` is false. Errors
    from finishing the file dialog are logged.
    """
    file_types = {
        "plain_text": ["txt", "md", "html", "css", "js", "py", "java", "json", "xml"],
        "image": ["png", "jpeg", "jpg", "webp", "gif"],
        "pdf": ["pdf"]
    }
    try:
        file = file_dialog.open_finish(result)
    except Exception as e:
        logger.error(e)
        return
    path = file.get_path()
    # Compare case-insensitively so e.g. "PHOTO.PNG" is recognised too
    extension = path.rsplit(".", 1)[-1].lower()
    # A default is required here: without it next() raises StopIteration for
    # unknown extensions and the guard below is never reached.
    file_type = next((key for key, value in file_types.items() if extension in value), None)
    if not file_type:
        return
    if file_type == 'image' and not self.verify_if_image_can_be_used():
        self.show_toast(_("Image recognition is only available on specific models"), self.main_overlay)
        return
    self.attach_file(path, file_type)
def attach_file(self, file_filter):
    """Open a file chooser using `file_filter` as its default filter.

    The picked file is handled by `attach_file_response`.
    """
    def on_opened(source, result):
        attach_file_response(self, source, result)

    chooser = Gtk.FileDialog(default_filter=file_filter)
    chooser.open(self, None, on_opened)
# YouTube caption | WORKS
def youtube_caption_response(self, dialog, task, video_url, caption_drop_down):
    """Attach the selected YouTube transcript as a file if "accept" was chosen.

    The video URL is removed from the message text view, the chosen caption
    track is written to a file under `<self.cache_dir>/tmp/youtube`, and that
    file is attached with the type 'youtube'.

    `caption_drop_down` entries are expected in the form "Name (code)".
    """
    if dialog.choose_finish(task) != "accept":
        return

    # Remove the video URL from the message being written
    buffer = self.message_text_view.get_buffer()
    text = buffer.get_text(buffer.get_start_iter(), buffer.get_end_iter(), False).replace(video_url, "")
    buffer.delete(buffer.get_start_iter(), buffer.get_end_iter())
    # GTK takes the length in bytes; -1 inserts the whole string. Passing the
    # character count would cut off text containing multi-byte characters.
    buffer.insert(buffer.get_start_iter(), text, -1)

    yt = YouTube(video_url)
    selected_caption = caption_drop_down.get_selected_item().get_string()
    caption_code = selected_caption.split('(')[-1][:-1]
    caption_name = selected_caption.split(" (")[0]

    lines = ["{}\n{}\n{}\n\n".format(yt.title, yt.author, yt.watch_url)]
    for event in yt.captions[caption_code].json_captions['events']:
        # NOTE(review): some caption events appear to carry no 'segs'
        # (e.g. layout-only events) - skip them instead of raising KeyError.
        segments = event.get('segs')
        if not segments:
            continue
        lines.append("{}\n".format(segments[0]['utf8'].replace('\n', '\\n')))

    youtube_dir = os.path.join(self.cache_dir, 'tmp/youtube')
    os.makedirs(youtube_dir, exist_ok=True)
    # A path separator in the title would otherwise point into a
    # non-existent sub-directory.
    file_name = f'{yt.title} ({caption_name})'.replace(os.sep, '_')
    file_path = os.path.join(youtube_dir, file_name)
    with open(file_path, 'w+', encoding="utf-8") as f:
        f.write("".join(lines))
    self.attach_file(file_path, 'youtube')
def youtube_caption(self, video_url):
    """Show a dialog for choosing which transcript of a YouTube video to attach.

    A toast is shown instead when the video has no captions. Each dropdown
    entry is formatted as "Name (code)"; search is enabled in the dropdown
    when there are more than 10 captions. The choice is handled by
    `youtube_caption_response`.
    """
    video = YouTube(video_url)
    video_title = video.title
    captions = video.captions
    if len(captions) < 1:
        self.show_toast(_("This video does not have any transcriptions"), self.main_overlay)
        return

    caption_list = Gtk.StringList()
    for caption in captions:
        label = "{} ({})".format(caption.name.title(), caption.code)
        caption_list.append(label)
    caption_drop_down = Gtk.DropDown(
        enable_search=len(captions) > 10,
        model=caption_list,
    )

    def on_chosen(source, result):
        youtube_caption_response(self, source, result, video_url, caption_drop_down)

    picker = Adw.AlertDialog(
        heading=_("Attach YouTube Video?"),
        body=_("{}\n\nPlease select a transcript to include").format(video_title),
        extra_child=caption_drop_down,
        close_response="cancel",
    )
    for response_id, text in (("cancel", _("Cancel")), ("accept", _("Accept"))):
        picker.add_response(response_id, text)
    picker.set_response_appearance("accept", Adw.ResponseAppearance.SUGGESTED)
    picker.set_default_response("accept")
    picker.choose(parent=self, cancellable=None, callback=on_chosen)
# WEBSITE EXTRACTION | EXPERIMENTAL
def attach_website_response(self, dialog, task, url):
    """Download `url`, convert it to Markdown and attach it if "accept" was chosen.

    On success the URL is removed from the message text view and the page
    (URL followed by the converted Markdown) is written to a numbered
    'website.md' file under /tmp/alpaca/websites/, then attached with the type
    'website'. A toast is shown when the request fails or does not answer
    with status 200.
    """
    if dialog.choose_finish(task) != "accept":
        return

    request_timeout = 10  # seconds; without a timeout the request can hang indefinitely
    try:
        response = requests.get(url, timeout=request_timeout)
    except requests.RequestException as e:
        # Connection errors, invalid URLs and timeouts are reported like a bad status
        logger.error(e)
        self.show_toast(_("An error occurred while extracting text from the website"), self.main_overlay)
        return
    if response.status_code != 200:
        self.show_toast(_("An error occurred while extracting text from the website"), self.main_overlay)
        return

    md = html2text(response.text)

    # Remove the URL from the message being written
    buffer = self.message_text_view.get_buffer()
    textview_text = buffer.get_text(buffer.get_start_iter(), buffer.get_end_iter(), False).replace(url, "")
    buffer.delete(buffer.get_start_iter(), buffer.get_end_iter())
    # GTK takes the length in bytes; -1 inserts the whole string. Passing the
    # character count would cut off text containing multi-byte characters.
    buffer.insert(buffer.get_start_iter(), textview_text, -1)

    websites_dir = '/tmp/alpaca/websites/'
    os.makedirs(websites_dir, exist_ok=True)
    md_name = self.generate_numbered_name('website.md', os.listdir(websites_dir))
    file_path = os.path.join(websites_dir, md_name)
    with open(file_path, 'w+', encoding="utf-8") as f:
        f.write('{}\n\n{}'.format(url, md))
    self.attach_file(file_path, 'website')
def attach_website(self, url):
    """Ask the user to confirm attaching the website at `url`.

    The chosen response is handled by `attach_website_response`.
    """
    def on_chosen(source, result):
        attach_website_response(self, source, result, url)

    confirmation = Adw.AlertDialog(
        heading=_("Attach Website? (Experimental)"),
        body=_("Are you sure you want to attach\n'{}'?").format(url),
        close_response="cancel",
    )
    for response_id, label in (("cancel", _("Cancel")), ("accept", _("Accept"))):
        confirmation.add_response(response_id, label)
    confirmation.set_response_appearance("accept", Adw.ResponseAppearance.SUGGESTED)
    confirmation.set_default_response("accept")
    confirmation.choose(parent=self, cancellable=None, callback=on_chosen)