Compare commits

...

5 Commits

Author SHA1 Message Date
hueso
5520afe7ba frfrfr 2025-03-06 21:01:18 -03:00
hueso
4a136da7f6 frfr 2025-03-06 20:59:30 -03:00
hueso
ff696ef92a no omemo fr 2025-03-06 20:57:44 -03:00
hueso
a43f5a9ae2 no omemo 2025-03-06 20:55:17 -03:00
hueso
193aef1d0b satoshi 2025-03-06 20:19:14 -03:00
3 changed files with 40 additions and 145 deletions

View File

@ -8,6 +8,7 @@ pytest
pytest-cov
pytest-mypy-plugins
result
slixmpp
slixmpp_omemo
slixmpp==1.8.6
slixmpp_omemo==0.9.1
twine
ollama

View File

@ -5,8 +5,6 @@ import logging
from getpass import getpass
from argparse import ArgumentParser
import slixmpp_omemo
from ollama_bot import OllamaBot
log = logging.getLogger(__name__)
@ -33,34 +31,16 @@ if __name__ == "__main__":
)
parser.add_argument("-j", "--jid", dest="jid", help="JID to use")
parser.add_argument("-p", "--password", dest="password", help="password to use")
DATA_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"omemo",
)
parser.add_argument(
"--data-dir", dest="data_dir", help="data directory", default=DATA_DIR
)
args = parser.parse_args()
logging.basicConfig(level=args.loglevel, format="%(levelname)-8s %(message)s")
if args.jid is None:
args.jid = input("JID: ")
if args.password is None:
args.password = getpass("Password: ")
os.makedirs(args.data_dir, exist_ok=True)
xmpp = OllamaBot(args.jid, args.password)
xmpp.register_plugin("xep_0030") # Service Discovery
xmpp.register_plugin("xep_0199") # XMPP Ping
xmpp.register_plugin("xep_0380") # Explicit Message Encryption
try:
xmpp.register_plugin(
"xep_0384",
{
"data_dir": args.data_dir,
},
module=slixmpp_omemo,
)
except slixmpp_omemo.PluginCouldNotLoad:
log.exception("And error occured when loading the omemo plugin.")
sys.exit(1)
xmpp.connect()
xmpp.process()

View File

@ -2,7 +2,7 @@ from enum import Enum
from typing import Dict, Optional
import re
import ollama
from ollama import Client
from slixmpp import ClientXMPP, JID
from slixmpp.exceptions import IqTimeout, IqError
from slixmpp.stanza import Message
@ -10,14 +10,6 @@ from slixmpp.types import JidStr, MessageTypes
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.handler.coroutine_callback import CoroutineFunction
from slixmpp.xmlstream.matcher import MatchXPath
from slixmpp_omemo import (
EncryptionPrepareException,
MissingOwnKey,
NoAvailableSession,
UndecidedException,
UntrustedException,
)
from omemo.exceptions import MissingBundleException
class LEVELS(Enum):
@ -26,8 +18,8 @@ class LEVELS(Enum):
class LLMS(Enum):
LLAMA3 = "llama3"
MISTRAL = "mistral"
LLAMA3 = "based"
MISTRAL = "shreyshah/satoshi-7b-q4_k_m"
class OllamaBot(ClientXMPP):
@ -37,7 +29,7 @@ class OllamaBot(ClientXMPP):
def __init__(self, jid: JidStr, password: str):
ClientXMPP.__init__(self, jid, password)
self.model: LLMS = LLMS.LLAMA3
self.model: LLMS = LLMS.MISTRAL
self.prefix_re: re.Pattern = re.compile(r"^%s" % self.cmd_prefix)
self.cmd_re: re.Pattern = re.compile(
r"^%s(?P<command>\w+)(?:\s+(?P<args>.*))?" % self.cmd_prefix
@ -82,35 +74,35 @@ class OllamaBot(ClientXMPP):
async def cmd_help(self, mto: JID, mtype: Optional[MessageTypes]) -> None:
body = (
"Hello, I am the ollama_slixmpp_omemo_bot!\n\n"
"The following commands are available:\n\n"
f"{self.cmd_prefix}verbose - Send message or reply with log messages.\n\n"
f"{self.cmd_prefix}error -Send message or reply only on error.\n\n"
f"{self.cmd_prefix}llama3 - Enable the llama3 model.\n\n"
f"{self.cmd_prefix}mistral - Enable the mistral model.\n\n"
f"Typing anything else will be sent to {self.model.value}!\n\n"
"Hello, I am Satoshi Nakamoto!\n\n"
# "The following commands are available:\n\n"
# f"{self.cmd_prefix}verbose - Send message or reply with log messages.\n\n"
# f"{self.cmd_prefix}error -Send message or reply only on error.\n\n"
# f"{self.cmd_prefix}llama3 - Enable the llama3 model.\n\n"
# f"{self.cmd_prefix}mistral - Enable the mistral model.\n\n"
# f"Typing anything else will be sent to {self.model.value}!\n\n"
)
return await self.encrypted_reply(mto, mtype, body)
return await self.plain_reply(mto, mtype, body)
async def cmd_set_llama3(self, mto: JID, mtype: Optional[MessageTypes]) -> None:
self.model = LLMS.LLAMA3
body: str = f"""Model set to {LLMS.LLAMA3.value}"""
return await self.encrypted_reply(mto, mtype, body)
return await self.plain_reply(mto, mtype, body)
async def cmd_set_mistral(self, mto: JID, mtype: Optional[MessageTypes]) -> None:
self.model = LLMS.MISTRAL
body: str = f"""Model set to {LLMS.MISTRAL.value}"""
return await self.encrypted_reply(mto, mtype, body)
return await self.plain_reply(mto, mtype, body)
async def cmd_verbose(self, mto: JID, mtype: Optional[MessageTypes]) -> None:
self.debug_level = LEVELS.DEBUG
body: str = """Debug level set to 'verbose'."""
return await self.encrypted_reply(mto, mtype, body)
return await self.plain_reply(mto, mtype, body)
async def cmd_error(self, mto: JID, mtype: Optional[MessageTypes]) -> None:
self.debug_level = LEVELS.ERROR
body: str = """Debug level set to 'error'."""
return await self.encrypted_reply(mto, mtype, body)
return await self.plain_reply(mto, mtype, body)
async def message_handler(
self, msg: Message, allow_untrusted: bool = False
@ -120,114 +112,36 @@ class OllamaBot(ClientXMPP):
mtype: Optional[MessageTypes] = msg["type"]
if mtype not in ("chat", "normal"):
return None
if not self["xep_0384"].is_encrypted(msg):
if self.debug_level == LEVELS.DEBUG:
await self.plain_reply(
mto, mtype, f"Echo unencrypted message: {msg['body']}"
)
if not msg["body"]:
return None
try:
encrypted = msg["omemo_encrypted"]
body: Optional[bytes] = await self["xep_0384"].decrypt_message(
encrypted, mfrom, allow_untrusted
)
if body is not None:
decoded: str = body.decode("utf8")
if self.is_command(decoded):
await self.handle_command(mto, mtype, decoded)
elif self.debug_level == LEVELS.DEBUG:
ollama_server_response: Optional[str] = (
self.message_to_ollama_server(decoded)
)
await self.encrypted_reply(
mto, mtype, f"{ollama_server_response or ''}"
)
except MissingOwnKey:
await self.plain_reply(
mto,
mtype,
"Error: Message not encrypted for me.",
)
except NoAvailableSession:
await self.encrypted_reply(
mto,
mtype,
"Error: Message uses an encrypted session I don't know about.",
)
except (UndecidedException, UntrustedException) as exn:
await self.plain_reply(
mto,
mtype,
(
f"WARNING: Your device '{exn.device}' is not in my trusted devices."
f"Allowing untrusted..."
),
)
await self.message_handler(msg, allow_untrusted=True)
except EncryptionPrepareException:
await self.plain_reply(
mto, mtype, "Error: I was not able to decrypt the message."
)
except Exception as exn:
await self.plain_reply(
mto,
mtype,
"Error: Exception occured while attempting decryption.\n%r" % exn,
)
raise
log = open("log.txt", "a", 1)
if msg['body'] is not None:
log.write(f"{mfrom}: {msg['body']}\n")
if self.is_command(msg['body']):
await self.handle_command(mto, mtype, msg['body'])
elif self.debug_level == LEVELS.DEBUG:
ollama_server_response: Optional[str] = (
self.message_to_ollama_server(msg['body'])
)
await self.plain_reply(
mto, mtype, f"{ollama_server_response or ''}"
)
return None
async def plain_reply(self, mto: JID, mtype: Optional[MessageTypes], body):
log = open("log.txt", "a", 1)
log.write(f"SATOSHI: {body}\n")
msg = self.make_message(mto=mto, mtype=mtype)
msg["body"] = body
return msg.send()
async def encrypted_reply(self, mto: JID, mtype: Optional[MessageTypes], body):
msg = self.make_message(mto=mto, mtype=mtype)
msg["eme"]["namespace"] = self.eme_ns
msg["eme"]["name"] = self["xep_0380"].mechanisms[self.eme_ns]
expect_problems: Optional[dict[JID, list[int]]] = {}
while True:
try:
recipients = [mto]
encrypt = await self["xep_0384"].encrypt_message(
body, recipients, expect_problems
)
msg.append(encrypt)
return msg.send()
except UndecidedException as exn:
await self["xep_0384"].trust(exn.bare_jid, exn.device, exn.ik)
except EncryptionPrepareException as exn:
for error in exn.errors:
if isinstance(error, MissingBundleException):
await self.plain_reply(
mto,
mtype,
f'Could not find keys for device "{error.device}"'
f' of recipient "{error.bare_jid}". Skipping.',
)
jid: JID = JID(error.bare_jid)
device_list = expect_problems.setdefault(jid, [])
device_list.append(error.device)
except (IqError, IqTimeout) as exn:
await self.plain_reply(
mto,
mtype,
"An error occured while fetching information on a recipient.\n%r"
% exn,
)
return None
except Exception as exn:
await self.plain_reply(
mto,
mtype,
"An error occured while attempting to encrypt.\n%r" % exn,
)
raise
def message_to_ollama_server(self, msg: Optional[str]) -> Optional[str]:
if msg is not None:
response = ollama.chat(
client = Client(
host="https://solarpunk.land/ollama/",
headers={"Authorization": "Bearer xxx"},
)
response = client.chat(
model=self.model.value,
messages=[{"role": "user", "content": f"{msg}"}],
)