Compare commits

..

10 Commits

Author SHA1 Message Date
99b9680ef2 Update garfbot.sh 2024-10-17 17:24:34 +00:00
ff812f0afe Update README.md 2024-10-05 18:23:31 +00:00
6fb9268bc3 Update README.md 2024-10-05 18:22:37 +00:00
0d849f71c2 Merge branch 'refactor' into 'main'
Refactor

See merge request crate/garfbot!3
2024-10-05 16:50:19 +00:00
b9a7dd298c Refactor 2024-10-05 16:50:19 +00:00
47da73aec6 small changes 2024-10-03 03:06:36 +00:00
2a367c321e small changes 2024-10-03 01:46:17 +00:00
7b00e844fd image change 2024-10-03 01:44:19 +00:00
108471edfd delete image locally after send 2024-10-03 01:10:10 +00:00
de27f5bb62 forgot to await garfbot 2024-10-02 02:33:03 +00:00
12 changed files with 468 additions and 448 deletions

8
.gitignore vendored
View File

@ -1,6 +1,8 @@
config.py config.py
/__pycache__/* __pycache__/
/images/* garfpy/__pycache__/
*.venv*
*.json *.json
*.old *.old
*.log* *.log*
meows.py

View File

@ -13,4 +13,4 @@ RUN pip3 install openai
RUN pip3 install aiohttp RUN pip3 install aiohttp
RUN pip3 install requests RUN pip3 install requests
CMD [ "python", "garfbot.py" ] CMD [ "python", "garfmain.py" ]

View File

@ -1,27 +1,19 @@
GarfBot is a discord bot that uses openai generative pre-trained AI models to produce text and images for your personal entertainment and companionship. GarfBot is a discord bot that uses openai generative pre-trained AI models to produce text and images for your personal entertainment and companionship.
JonBot is his owner and MoneyBot is his eccentric friend who plays the classic F2P video game Planetside 2.
To interact: (not case-sensitive) To interact: (not case-sensitive)
`"hey garfield"` `"hey garfield {prompt}"`
responds with text. responds with text.
`"garfpic"` `"garfpic {prompt}"`
responds with image. responds with image.
`"hey jon"`
responds with text.
`"hey money"`
responds with text.
To get started, clone this repo and create a `config.py` file in GarfBot's root directory. Open your favorite text editor or IDE and add your various API tokens as such: To get started, clone this repo and create a `config.py` file in GarfBot's root directory. Open your favorite text editor or IDE and add your various API tokens as such:
```python ```python
GARFBOT_TOKEN = "token" GARFBOT_TOKEN = "token"
JONBOT_TOKEN = "token"
MONEYBOT_TOKEN = "token"
OPENAI_TOKEN = "token" OPENAI_TOKEN = "token"
GIF_TOKEN = "token" GIF_TOKEN = "token"
``` ```

View File

@ -1,433 +0,0 @@
import os
import json
import config
import random
import openai
import logging
import aiohttp
import asyncio
import discord
import requests
import ipaddress
import subprocess
from base64 import b64encode
from openai import AsyncOpenAI
from datetime import datetime
from collections import defaultdict
from operator import itemgetter
from logging.handlers import TimedRotatingFileHandler
# Log setup
logger = logging.getLogger('garflog')
logger.setLevel(logging.INFO)
file_handler = TimedRotatingFileHandler(
'garfbot.log',
when='midnight',
interval=1,
backupCount=7,
delay=True # Counterintuitively, will flush output immediately
)
console_handler = logging.StreamHandler()
formatter=logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
# Bot Setup
openaikey = config.OPENAI_TOKEN
gapikey = config.GIF_TOKEN
garfkey = config.GARFBOT_TOKEN
txtmodel = "gpt-4o-mini"
imgmodel = "dall-e-3"
intents = discord.Intents.default()
intents.members = True
intents.messages = True
intents.message_content = True
garfbot = discord.Client(intents=intents)
# Network Utils Setup
def is_private(target):
try:
ip_obj = ipaddress.ip_address(target)
if ip_obj.is_private:
return True
except ValueError:
if "crate.lan" in target.lower():
return True
if "crate.zip" in target.lower():
return True
if "memtec.org" in target.lower():
return True
if "crateit.net" in target.lower():
return True
if "garfbot.art" in target.lower():
return True
return False
# Kroger Setup
client_id = config.CLIENT_ID
client_secret = config.CLIENT_SECRET
auth = b64encode(f"{client_id}:{client_secret}".encode()).decode()
def kroger_token():
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {auth}'
}
response = requests.post('https://api.kroger.com/v1/connect/oauth2/token', headers=headers, data={
'grant_type': 'client_credentials',
'scope': 'product.compact'
})
response.raise_for_status()
return response.json()['access_token']
def find_store(zipcode, kroken):
headers = {
'Authorization': f'Bearer {kroken}',
}
params = {
'filter.zipCode.near': zipcode,
'filter.limit': 1,
}
response = requests.get('https://api.kroger.com/v1/locations', headers=headers, params=params)
return response.json()
def search_product(product, loc_id, kroken):
headers = {
'Authorization': f'Bearer {kroken}',
}
params = {
'filter.term': product,
'filter.locationId': loc_id,
'filter.limit': 10
}
response = requests.get('https://api.kroger.com/v1/products', headers=headers, params=params)
return response.json()
# Meows Json Handling
meows_file = "meow_counts.json"
stats_file = "user_stats.json"
def json_load(file_path, default):
if os.path.isfile(file_path):
with open(file_path, "r") as f:
return json.load(f)
else:
return default
meow_counts = defaultdict(int, json_load(meows_file, {}))
user_stats = json_load(stats_file, {})
# GarfBot Ready!
@garfbot.event
async def on_ready():
asyncio.create_task(process_image_requests()) # Important!
logger.info(f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}.")
print(f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}.", flush=True)
# GarfChats
async def generate_chat_response(question):
try:
client = AsyncOpenAI(api_key = openaikey)
response = await client.chat.completions.create(
model=txtmodel,
messages=[
{"role": "system", "content": "Pretend you are sarcastic Garfield."},
{"role": "user", "content": f"{question}"}
],
max_tokens=400
)
answer = response.choices[0].message.content
return answer.replace("an AI language model", "a cartoon animal")
except openai.BadRequestError as e:
return f"`GarfBot Error: {e}`"
except openai.APIError as e:
print(e, flush=True)
return f"`GarfBot Error: Monday`"
except Exception as e:
print(e, flush=True)
return f"`GarfBot Error: Lasagna`"
# GarfPics
async def generate_image(prompt):
try:
client = AsyncOpenAI(api_key = openaikey)
response = await client.images.generate(
model=imgmodel,
prompt=prompt,
n=1,
size="1024x1024"
)
image_url = response.data[0].url
return image_url
except openai.BadRequestError as e:
return f"`GarfBot Error: ({e.status_code}) - Your request was rejected as a result of our safety system.`"
except openai.InternalServerError as e:
logger.error(e)
print(e, flush=True)
return f"`GarfBot Error: ({e.status_code}) - Monday`"
except Exception as e:
logger.error(e)
print(e, flush=True)
return f"`GarfBot Error: Lasagna`"
image_request_queue = asyncio.Queue()
async def process_image_requests():
async with aiohttp.ClientSession() as session:
while True:
request = await image_request_queue.get()
message = request['message']
prompt = request['prompt']
image_url = await generate_image(prompt)
if "GarfBot Error" not in image_url:
async with session.get(image_url) as resp:
if resp.status == 200:
image_data = await resp.read()
timestamp = message.created_at.strftime('%Y%m%d%H%M%S')
save_filename = f"images/{timestamp}_generated_image.png"
send_filename = f"{timestamp}_generated_image.png" # There is probably a better way to do this.
with open(save_filename, "wb") as f:
f.write(image_data)
with open(save_filename, "rb") as f:
await message.channel.send(file=discord.File(f, send_filename))
else:
await message.channel.send("`GarfBot Error: Odie`")
else:
await message.channel.send(image_url)
image_request_queue.task_done()
await asyncio.sleep(2)
# Message Listener
@garfbot.event
async def on_message(message):
if message.author == garfbot.user:
return
if message.content.lower().startswith("hey garfield") or isinstance(message.channel, discord.DMChannel):
question = message.content[12:] if message.content.lower().startswith("hey garfield") else message.content
answer = await generate_chat_response(question)
await message.channel.send(answer)
if message.content.lower().startswith('garfpic '):
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
prompt = message.content[8:]
logger.info(f"Image Request - User: {user}, Server: {server}, Prompt: {prompt}")
print(f"Image Request - User: {user}, Server: {server}, Prompt: {prompt}", flush=True)
await message.channel.send(f"`Please wait... image generation queued: {prompt}`")
await image_request_queue.put({'message': message, 'prompt': prompt})
if message.content.lower() == "lasagna":
await send_gif(message, "garfield lasagna")
if message.content.lower() == "monday":
await send_gif(message, "garfield monday")
if message.content.lower().startswith("garfgif "):
search_term = message.content[8:]
await send_gif(message, search_term)
if message.content.lower().startswith("garfping "):
try:
query = message.content.split()
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
target = query[-1]
print(f"Ping Request - User: {user}, Server: {server}, Target: {target}", flush=True)
if is_private(target):
rejection = await generate_chat_response("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
await message.channel.send(rejection)
else:
result = subprocess.run(['ping', '-c', '4', target], capture_output=True, text=True)
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
if message.content.lower().startswith("garfdns "):
try:
query = message.content.split()
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
target = query[-1]
print(f"NSLookup Request - User: {user}, Server: {server}, Target: {target}", flush=True)
if is_private(target):
rejection = await generate_chat_response("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
await message.channel.send(rejection)
else:
result = subprocess.run(['nslookup', target], capture_output=True, text=True)
await message.channel.send(f"`NSLookup result for {target}:`\n```\n{result.stdout}\n```")
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
if message.content.lower().startswith("garfhack "):
try:
query = message.content.split()
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
target = query[-1]
print(f"Nmap Request - User: {user}, Server: {server}, Target: {target}", flush=True)
if is_private(target):
rejection = await generate_chat_response("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
await message.channel.send(rejection)
else:
await message.channel.send(f"`Scanning {target}...`")
result = subprocess.run(['nmap', '-Pn', '-O', '-v', target], capture_output=True, text=True)
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
# Kroger Shopping
if message.content.lower().startswith("garfshop "):
try:
kroken = kroger_token()
kroger_query = message.content.split()
product = " ".join(kroger_query[1:-1])
zipcode = kroger_query[-1]
loc_data = find_store(zipcode, kroken)
loc_id = loc_data['data'][0]['locationId']
store_name = loc_data['data'][0]['name']
product_query = search_product(product, loc_id, kroken)
products = product_query['data']
sorted_products = sorted(products, key=lambda item: item['items'][0]['price']['regular'])
response = f"Prices for `{product}` at `{store_name}` near `{zipcode}`:\n"
for item in sorted_products:
product_name = item['description']
price = item['items'][0]['price']['regular']
response += f"- `${price}`: {product_name} \n"
await message.channel.send(response)
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
# Army of Dawn Server only!!
if message.guild and message.guild.id == 719605634772893757:
if "meow" in message.content.lower():
logger.info(f"Meow detected! {message.author.name} said: {message.content}")
print(f"Meow detected! {message.author.name} said: {message.content}", flush=True)
meow_counts[str(message.author.id)] += 1
with open(meows_file, "w") as f:
json.dump(dict(meow_counts), f)
if message.content.lower() == "meowcount":
response = f"My records show that <@{message.author.id}> has meowed {meow_counts[str(message.author.id)]} time(s). Have a nice day."
await message.channel.send(response)
if message.content.lower() == "top meowers":
top_meowers = sorted(meow_counts.items(), key=itemgetter(1), reverse=True)[:10]
embed = discord.Embed(title="Top Meowers :cat:", color=0x000000)
for i, (user_id, meow_count) in enumerate(top_meowers):
user = await garfbot.fetch_user(int(user_id))
embed.add_field(name=f"{i+1}. {user.name}", value=f"{meow_count} meows", inline=False)
await message.channel.send(embed=embed)
if message.content.lower() == "checking in":
user_id = str(message.author.id)
if user_id in user_stats and user_stats[user_id]["check_in_time"] is not None:
await message.channel.send(f"{message.author.mention} You are already checked in. Please check out first.")
return
check_in_time = datetime.utcnow().timestamp()
if user_id not in user_stats:
user_stats[user_id] = {"check_ins": 0, "total_time": 0, "check_in_time": None}
user_stats[user_id]["check_in_time"] = check_in_time
await message.channel.send(f"{message.author.mention} You have been checked in. Please mute your microphone.")
elif message.content.lower() == "checking out":
user_id = str(message.author.id)
if user_id not in user_stats or user_stats[user_id]["check_in_time"] is None:
await message.channel.send(f"{message.author.mention} You have not checked in yet. Please check in first.")
return
check_out_time = datetime.utcnow().timestamp()
check_in_time = user_stats[user_id]["check_in_time"]
time_delta = check_out_time - check_in_time
user_stats[user_id]["check_ins"] += 1
user_stats[user_id]["total_time"] += time_delta
user_stats[user_id]["check_in_time"] = None
with open("user_stats.json", "w") as f:
json.dump(user_stats, f)
await message.channel.send(f"{message.author.mention} You have been checked out. Your session was {time_delta:.2f} seconds.")
elif message.content.lower() == "stats":
stats_embed = discord.Embed(title="User stats :trophy:", color=0x000000)
sorted_user_stats = sorted(user_stats.items(), key=lambda x: x[1]["total_time"], reverse=True)
table_rows = [["Name", "Check-ins", "Total Time"]]
for user_id, stats in sorted_user_stats:
if stats["check_in_time"] is None:
total_time_seconds = stats["total_time"]
hours, total_time_seconds = divmod(total_time_seconds, 3600)
minutes, total_time_seconds = divmod(total_time_seconds, 60)
seconds, fractions = divmod(total_time_seconds, 1)
fractions_str = f"{fractions:.3f}"[2:]
username = garfbot.get_user(int(user_id)).name
table_rows.append([username, str(stats["check_ins"]), f"{int(hours)}h {int(minutes)}m {int(seconds)}s {fractions_str}ms"])
else:
username = garfbot.get_user(int(user_id)).name
table_rows.append([username, "Currently checked in", "-"])
table_columns = list(zip(*table_rows[1:]))
table_fields = table_rows[0]
for field, values in zip(table_fields, table_columns):
stats_embed.add_field(name=field, value="\n".join(values), inline=True)
await message.channel.send(embed=stats_embed)
# GarfGifs
@garfbot.event
async def send_gif(message, search_term):
lmt = 50
ckey = "garfbot"
r = requests.get(f"https://tenor.googleapis.com/v2/search?q={search_term}&key={gapikey}&client_key={ckey}&limit={lmt}")
if r.status_code == 200:
top_50gifs = json.loads(r.content)
gif_url = random.choice(top_50gifs["results"])["itemurl"]
print(gif_url)
try:
await message.channel.send(gif_url)
except KeyError:
await message.channel.send("Oops, something went wrong.")
else:
await message.channel.send(f"`Oops, something went wrong. Error code: {r.status_code}`")
# discord.py Error Handling
@garfbot.event
async def on_error(event, *args, **kwargs):
logger.error(f'GarfBot Error: {event}')
print(f'GarfBot Error: {event}', flush=True)
# Run GarfBot!
async def garfbot_connect():
while True:
try:
garfbot.start(garfkey)
except Exception as e:
e = str(e)
logger.error(f"Garfbot couldn't connect! {e}")
print(f"Garfbot couldn't connect! {e}", flush=True)
await asyncio.sleep(300)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(garfbot_connect())

View File

@ -1 +1,3 @@
docker run -d --restart always -v $PWD:/usr/src/app --name garfbot garfbot #!/bin/bash
docker run -d --restart always -v $PWD:/usr/src/app --name garfbot garfbot

169
garfmain.py Normal file
View File

@ -0,0 +1,169 @@
import config
# import random
import asyncio
import discord
import subprocess
from garfpy import(
logger, is_private,
kroger_token, find_store, search_product,
picture_time, process_image_requests, generate_chat,
aod_message)
gapikey = config.GIF_TOKEN
garfkey = config.GARFBOT_TOKEN
txtmodel = config.TXT_MODEL
imgmodel = config.IMG_MODEL
intents = discord.Intents.default()
intents.members = True
intents.messages = True
intents.message_content = True
garfbot = discord.Client(intents=intents)
@garfbot.event
async def on_ready():
try:
asyncio.create_task(process_image_requests())
logger.info(f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}.")
except Exception as e:
logger.error(e)
@garfbot.event
async def on_message(message):
if message.author == garfbot.user:
return
if message.content.lower().startswith("hey garfield") or isinstance(message.channel, discord.DMChannel):
question = message.content[12:] if message.content.lower().startswith("hey garfield") else message.content
answer = await generate_chat(question)
await message.channel.send(answer)
if message.content.lower().startswith('garfpic '):
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
prompt = message.content[8:]
logger.info(f"Image Request - User: {user}, Server: {server}, Prompt: {prompt}")
await message.channel.send(f"`Please wait... image generation queued: {prompt}`")
await picture_time(message, prompt)
# if message.content.lower() == "lasagna":
# await send_gif(message, "garfield lasagna")
# if message.content.lower() == "monday":
# await send_gif(message, "garfield monday")
# if message.content.lower().startswith("garfgif "):
# search_term = message.content[8:]
# await send_gif(message, search_term)
if message.content.lower().startswith("garfping "):
try:
query = message.content.split()
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
target = query[-1]
logger.info(f"Ping Request - User: {user}, Server: {server}, Target: {target}")
if is_private(target):
rejection = await generate_chat("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
await message.channel.send(rejection)
else:
result = subprocess.run(['ping', '-c', '4', target], capture_output=True, text=True)
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
if message.content.lower().startswith("garfdns "):
try:
query = message.content.split()
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
target = query[-1]
logger.info(f"NSLookup Request - User: {user}, Server: {server}, Target: {target}")
if is_private(target):
rejection = await generate_chat("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
await message.channel.send(rejection)
else:
result = subprocess.run(['nslookup', target], capture_output=True, text=True)
await message.channel.send(f"`NSLookup result for {target}:`\n```\n{result.stdout}\n```")
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
if message.content.lower().startswith("garfhack "):
try:
query = message.content.split()
user = message.author.name
server = message.guild.name if message.guild else "Direct Message"
target = query[-1]
logger.info(f"Nmap Request - User: {user}, Server: {server}, Target: {target}")
if is_private(target):
rejection = await generate_chat("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
await message.channel.send(rejection)
else:
await message.channel.send(f"`Scanning {target}...`")
result = subprocess.run(['nmap', '-Pn', '-O', '-v', target], capture_output=True, text=True)
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
# Kroger Shopping
if message.content.lower().startswith("garfshop "):
try:
kroken = kroger_token()
kroger_query = message.content.split()
product = " ".join(kroger_query[1:-1])
zipcode = kroger_query[-1]
loc_data = find_store(zipcode, kroken)
loc_id = loc_data['data'][0]['locationId']
store_name = loc_data['data'][0]['name']
product_query = search_product(product, loc_id, kroken)
products = product_query['data']
sorted_products = sorted(products, key=lambda item: item['items'][0]['price']['regular'])
response = f"Prices for `{product}` at `{store_name}` near `{zipcode}`:\n"
for item in sorted_products:
product_name = item['description']
price = item['items'][0]['price']['regular']
response += f"- `${price}`: {product_name} \n"
await message.channel.send(response)
except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`")
# Army of Dawn Server only!!
if message.guild and message.guild.id == 719605634772893757:
await aod_message(garfbot, message)
# # GarfGifs
# @garfbot.event
# async def send_gif(message, search_term):
# lmt = 50
# ckey = "garfbot"
# r = requests.get(f"https://tenor.googleapis.com/v2/search?q={search_term}&key={gapikey}&client_key={ckey}&limit={lmt}")
# if r.status_code == 200:
# top_50gifs = json.loads(r.content)
# gif_url = random.choice(top_50gifs["results"])["itemurl"]
# logger.info(gif_url)
# # logger.info(gif_url)
# try:
# await message.channel.send(gif_url)
# except KeyError:
# await message.channel.send("Oops, something went wrong.")
# else:
# await message.channel.send(f"`Oops, something went wrong. Error code: {r.status_code}`")
async def garfbot_connect():
while True:
try:
await garfbot.start(garfkey)
except Exception as e:
e = str(e)
logger.error(f"Garfbot couldn't connect! {e}")
await asyncio.sleep(60)
if __name__ == "__main__":
asyncio.run(garfbot_connect())

12
garfpy/__init__.py Normal file
View File

@ -0,0 +1,12 @@
# garfpy/__init__.py
from .log import logger
from .kroger import(
kroger_token, find_store, search_product
)
from .garfai import(
picture_time,
process_image_requests,
generate_chat
)
from .iputils import is_private
from .aod import aod_message

94
garfpy/aod.py Normal file
View File

@ -0,0 +1,94 @@
import os
import json
import discord
from garfpy import logger
from datetime import datetime
from operator import itemgetter
from collections import defaultdict
# Meows Json Handling
meows_file = "meow_counts.json"
stats_file = "user_stats.json"
def json_load(file_path, default):
if os.path.isfile(file_path):
with open(file_path, "r") as f:
return json.load(f)
else:
return default
meow_counts = defaultdict(int, json_load(meows_file, {}))
user_stats = json_load(stats_file, {})
async def aod_message(garfbot, message):
if "meow" in message.content.lower():
logger.info(f"Meow detected! {message.author.name} said: {message.content}")
meow_counts[str(message.author.id)] += 1
with open(meows_file, "w") as f:
json.dump(dict(meow_counts), f)
if message.content.lower() == "meowcount":
response = f"My records show that <@{message.author.id}> has meowed {meow_counts[str(message.author.id)]} time(s). Have a nice day."
await message.channel.send(response)
if message.content.lower() == "top meowers":
top_meowers = sorted(meow_counts.items(), key=itemgetter(1), reverse=True)[:10]
embed = discord.Embed(title="Top Meowers :cat:", color=0x000000)
for i, (user_id, meow_count) in enumerate(top_meowers):
user = await garfbot.fetch_user(int(user_id))
embed.add_field(name=f"{i+1}. {user.name}", value=f"{meow_count} meows", inline=False)
await message.channel.send(embed=embed)
if message.content.lower() == "checking in":
user_id = str(message.author.id)
if user_id in user_stats and user_stats[user_id]["check_in_time"] is not None:
await message.channel.send(f"{message.author.mention} You are already checked in. Please check out first.")
return
check_in_time = datetime.now().timestamp()
if user_id not in user_stats:
user_stats[user_id] = {"check_ins": 0, "total_time": 0, "check_in_time": None}
user_stats[user_id]["check_in_time"] = check_in_time
await message.channel.send(f"{message.author.mention} You have been checked in. Please mute your microphone.")
elif message.content.lower() == "checking out":
user_id = str(message.author.id)
if user_id not in user_stats or user_stats[user_id]["check_in_time"] is None:
await message.channel.send(f"{message.author.mention} You have not checked in yet. Please check in first.")
return
check_out_time = datetime.now().timestamp()
check_in_time = user_stats[user_id]["check_in_time"]
time_delta = check_out_time - check_in_time
user_stats[user_id]["check_ins"] += 1
user_stats[user_id]["total_time"] += time_delta
user_stats[user_id]["check_in_time"] = None
with open("user_stats.json", "w") as f:
json.dump(user_stats, f)
await message.channel.send(f"{message.author.mention} You have been checked out. Your session was {time_delta:.2f} seconds.")
elif message.content.lower() == "stats":
stats_embed = discord.Embed(title="User stats :trophy:", color=0x000000)
sorted_user_stats = sorted(user_stats.items(), key=lambda x: x[1]["total_time"], reverse=True)
table_rows = [["Name", "Check-ins", "Total Time"]]
for user_id, stats in sorted_user_stats:
if stats["check_in_time"] is None:
total_time_seconds = stats["total_time"]
hours, total_time_seconds = divmod(total_time_seconds, 3600)
minutes, total_time_seconds = divmod(total_time_seconds, 60)
seconds, fractions = divmod(total_time_seconds, 1)
fractions_str = f"{fractions:.3f}"[2:]
username = garfbot.get_user(int(user_id)).name
table_rows.append([username, str(stats["check_ins"]), f"{int(hours)}h {int(minutes)}m {int(seconds)}s {fractions_str}ms"])
else:
username = garfbot.get_user(int(user_id)).name
table_rows.append([username, "Currently checked in", "-"])
table_columns = list(zip(*table_rows[1:]))
table_fields = table_rows[0]
for field, values in zip(table_fields, table_columns):
stats_embed.add_field(name=field, value="\n".join(values), inline=True)
await message.channel.send(embed=stats_embed)

90
garfpy/garfai.py Normal file
View File

@ -0,0 +1,90 @@
import io
import openai
import config
import aiohttp
import asyncio
import discord
from openai import AsyncOpenAI
from garfpy import logger
openaikey = config.OPENAI_TOKEN
txtmodel = config.TXT_MODEL
imgmodel = config.IMG_MODEL
# GarfPics
image_request_queue = asyncio.Queue()
async def picture_time(message, prompt):
await image_request_queue.put({'message': message, 'prompt': prompt})
async def generate_image(prompt):
try:
client = AsyncOpenAI(api_key = openaikey)
response = await client.images.generate(
model=imgmodel,
prompt=prompt,
n=1,
size="1024x1024"
)
image_url = response.data[0].url
return image_url
except openai.BadRequestError as e:
return f"`GarfBot Error: ({e.status_code}) - Your request was rejected as a result of our safety system.`"
except openai.InternalServerError as e:
logger.error(e)
return f"`GarfBot Error: ({e.status_code}) - Monday`"
except Exception as e:
logger.error(e)
return f"`GarfBot Error: Lasagna`"
async def process_image_requests():
async with aiohttp.ClientSession() as session:
while True:
request = await image_request_queue.get()
message = request['message']
prompt = request['prompt']
image_url = await generate_image(prompt)
if "GarfBot Error" not in image_url:
logger.info("Downloading & sending image...")
async with session.get(image_url) as resp:
if resp.status == 200:
image_data = await resp.read()
ram_image = io.BytesIO(image_data)
ram_image.seek(0)
timestamp = message.created_at.strftime('%Y%m%d%H%M%S')
filename = f"{timestamp}_generated_image.png"
sendfile = discord.File(fp=ram_image, filename=filename)
try:
await message.channel.send(file=sendfile)
except Exception as e:
logger.error(e)
else:
await message.channel.send("`GarfBot Error: Odie`")
else:
await message.channel.send(image_url)
image_request_queue.task_done()
await asyncio.sleep(2)
# GarfChats
async def generate_chat(question):
try:
client = AsyncOpenAI(api_key = openaikey)
response = await client.chat.completions.create(
model=txtmodel,
messages=[
{"role": "system", "content": "Pretend you are sarcastic Garfield."},
{"role": "user", "content": f"{question}"}
],
max_tokens=400
)
answer = response.choices[0].message.content
return answer.replace("an AI language model", "a cartoon animal")
except openai.BadRequestError as e:
return f"`GarfBot Error: {e}`"
except openai.APIError as e:
logger.info(e, flush=True)
return f"`GarfBot Error: Monday`"
except Exception as e:
logger.info(e, flush=True)
return f"`GarfBot Error: Lasagna`"

19
garfpy/iputils.py Normal file
View File

@ -0,0 +1,19 @@
import ipaddress
def is_private(target):
try:
ip_obj = ipaddress.ip_address(target)
if ip_obj.is_private:
return True
except ValueError:
if "crate.lan" in target.lower():
return True
if "crate.zip" in target.lower():
return True
if "memtec.org" in target.lower():
return True
if "crateit.net" in target.lower():
return True
if "garfbot.art" in target.lower():
return True
return False

48
garfpy/kroger.py Normal file
View File

@ -0,0 +1,48 @@
import config
import requests
from base64 import b64encode
from garfpy import logger
client_id = config.CLIENT_ID
client_secret = config.CLIENT_SECRET
auth = b64encode(f"{client_id}:{client_secret}".encode()).decode()
def kroger_token():
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {auth}'
}
response = requests.post('https://api.kroger.com/v1/connect/oauth2/token', headers=headers, data={
'grant_type': 'client_credentials',
'scope': 'product.compact'
})
response.raise_for_status()
return response.json()['access_token']
def find_store(zipcode, kroken):
headers = {
'Authorization': f'Bearer {kroken}',
}
params = {
'filter.zipCode.near': zipcode,
'filter.limit': 1,
}
response = requests.get('https://api.kroger.com/v1/locations', headers=headers, params=params)
return response.json()
def search_product(product, loc_id, kroken):
logger.info(f"Searching for {product}...")
headers = {
'Authorization': f'Bearer {kroken}',
}
params = {
'filter.term': product,
'filter.locationId': loc_id,
'filter.limit': 10
}
response = requests.get('https://api.kroger.com/v1/products', headers=headers, params=params)
return response.json()

25
garfpy/log.py Normal file
View File

@ -0,0 +1,25 @@
import logging
from logging.handlers import TimedRotatingFileHandler
logger = logging.getLogger('garflog')
logger.setLevel(logging.INFO)
formatter=logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler = TimedRotatingFileHandler(
'garfbot.log',
when='midnight',
interval=1,
backupCount=7,
delay=True # Counter-intuitively, this will flush output immediately
)
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
if not logger.hasHandlers():
logger.addHandler(file_handler)
logger.addHandler(console_handler)