iputils-refactor #2

Merged
crate merged 3 commits from iputils-refactor into main 2025-06-05 22:27:09 +00:00
9 changed files with 346 additions and 268 deletions
Showing only changes of commit 6b9c2b638a - Show all commits

View File

@ -2,10 +2,15 @@ import config
import asyncio import asyncio
import discord import discord
from garfpy import( from garfpy import (
logger, IPUtils, logger,
aod_message, generate_qr, IPUtils,
Kroger, GarfAI, GarfbotRespond) aod_message,
generate_qr,
Kroger,
GarfAI,
GarfbotRespond,
)
gapikey = config.GIF_TOKEN gapikey = config.GIF_TOKEN
@ -25,13 +30,14 @@ iputils = IPUtils()
kroger = Kroger() kroger = Kroger()
@garfbot.event @garfbot.event
async def on_ready(): async def on_ready():
try: try:
garf_respond.load_responses() garf_respond.load_responses()
asyncio.create_task(garfield.process_image_requests()) asyncio.create_task(garfield.process_image_requests())
logger.info(f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}.") logger.info(
f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}."
)
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
@ -40,7 +46,7 @@ async def on_ready():
async def on_message(message): async def on_message(message):
if message.author == garfbot.user: if message.author == garfbot.user:
return return
content = message.content.strip() content = message.content.strip()
lower = content.lower() lower = content.lower()
user_name = message.author.name user_name = message.author.name
@ -52,13 +58,13 @@ async def on_message(message):
await iputils.scan(message, user_name, guild_name, lower) await iputils.scan(message, user_name, guild_name, lower)
# Wikipedia # Wikipedia
if lower.startswith('garfwiki '): if lower.startswith("garfwiki "):
query = message.content[9:] query = message.content[9:]
summary = await garfield.wikisum(query) summary = await garfield.wikisum(query)
await message.channel.send(summary) await message.channel.send(summary)
# QR codes # QR codes
if lower.startswith('garfqr '): if lower.startswith("garfqr "):
text = message.content[7:] text = message.content[7:]
if len(text) > 1000: if len(text) > 1000:
await message.channel.send("❌ Text too long! Maximum 1000 characters.") await message.channel.send("❌ Text too long! Maximum 1000 characters.")
@ -80,36 +86,44 @@ async def on_message(message):
except Exception as e: except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`") await message.channel.send(f"`GarfBot Error: {str(e)}`")
# Chats & pics
elif lower.startswith("hey garfield") or isinstance(
message.channel, discord.DMChannel
):
prompt = content[12:] if lower.startswith("hey garfield") else message.content
answer = await garfield.generate_chat(prompt)
logger.info(
f"Chat Request - User: {user_name}, Server: {guild_name}, Prompt: {prompt}"
)
await message.channel.send(answer)
elif lower.startswith("garfpic "):
prompt = content[8:]
logger.info(
f"Image Request - User: {user_name}, Server: {guild_name}, Prompt: {prompt}"
)
await message.channel.send(
f"`Please wait... image generation queued: {prompt}`"
)
await garfield.garfpic(message, prompt)
# Army of Dawn Server only!! # Army of Dawn Server only!!
if message.guild and message.guild.id == 719605634772893757: elif message.guild and message.guild.id == 719605634772893757:
await aod_message(garfbot, message) await aod_message(garfbot, message)
# Auto-responses # Auto-responses
if message.guild: elif message.guild:
responses = garf_respond.get_responses(guild_id) responses = garf_respond.get_responses(guild_id)
if lower.startswith('garfbot response '): if lower.startswith("garfbot response "):
await garf_respond.garfbot_response(message, content) await garf_respond.garfbot_response(message, content)
return return
for trigger, response in responses.items(): for trigger, response in responses.items():
if trigger.lower() in lower: if trigger.lower() in lower:
await message.channel.send(response) await message.channel.send(response)
break break
# Chats & pics
elif lower.startswith("hey garfield") or isinstance(message.channel, discord.DMChannel):
prompt = content[12:] if lower.startswith("hey garfield") else message.content
answer = await garfield.generate_chat(prompt)
logger.info(f"Chat Request - User: {user_name}, Server: {guild_name}, Prompt: {prompt}")
await message.channel.send(answer)
elif lower.startswith('garfpic '):
prompt = content[8:]
logger.info(f"Image Request - User: {user_name}, Server: {guild_name}, Prompt: {prompt}")
await message.channel.send(f"`Please wait... image generation queued: {prompt}`")
await garfield.garfpic(message, prompt)
# Run Garfbot # Run Garfbot
async def garfbot_connect(): async def garfbot_connect():
@ -117,9 +131,10 @@ async def garfbot_connect():
try: try:
await garfbot.start(garfkey) await garfbot.start(garfkey)
except Exception as e: except Exception as e:
e = str(e) e = str(e)
logger.error(f"Garfbot couldn't connect! {e}") logger.error(f"Garfbot couldn't connect! {e}")
await asyncio.sleep(60) await asyncio.sleep(60)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(garfbot_connect()) asyncio.run(garfbot_connect())

View File

@ -2,10 +2,9 @@
from .log import logger from .log import logger
from .kroger import Kroger from .kroger import Kroger
# from .iputils import scan
from .kroger import Kroger from .kroger import Kroger
from .garfai import GarfAI from .garfai import GarfAI
from .respond import GarfbotRespond from .respond import GarfbotRespond
from .aod import aod_message from .aod import aod_message
from .qr import generate_qr from .qr import generate_qr
from .iputils import IPUtils from .iputils import IPUtils

View File

@ -11,6 +11,7 @@ from collections import defaultdict
meows_file = "meow_counts.json" meows_file = "meow_counts.json"
stats_file = "user_stats.json" stats_file = "user_stats.json"
def json_load(file_path, default): def json_load(file_path, default):
if os.path.isfile(file_path): if os.path.isfile(file_path):
with open(file_path, "r") as f: with open(file_path, "r") as f:
@ -18,77 +19,105 @@ def json_load(file_path, default):
else: else:
return default return default
meow_counts = defaultdict(int, json_load(meows_file, {})) meow_counts = defaultdict(int, json_load(meows_file, {}))
user_stats = json_load(stats_file, {}) user_stats = json_load(stats_file, {})
async def aod_message(garfbot, message): async def aod_message(garfbot, message):
if "meow" in message.content.lower(): if "meow" in message.content.lower():
logger.info(f"Meow detected! {message.author.name} said: {message.content}") logger.info(f"Meow detected! {message.author.name} said: {message.content}")
meow_counts[str(message.author.id)] += 1 meow_counts[str(message.author.id)] += 1
with open(meows_file, "w") as f: with open(meows_file, "w") as f:
json.dump(dict(meow_counts), f) json.dump(dict(meow_counts), f)
if message.content.lower() == "meowcount": if message.content.lower() == "meowcount":
response = f"My records show that <@{message.author.id}> has meowed {meow_counts[str(message.author.id)]} time(s). Have a nice day." response = f"My records show that <@{message.author.id}> has meowed {meow_counts[str(message.author.id)]} time(s). Have a nice day."
await message.channel.send(response) await message.channel.send(response)
if message.content.lower() == "top meowers": if message.content.lower() == "top meowers":
top_meowers = sorted(meow_counts.items(), key=itemgetter(1), reverse=True)[:10] top_meowers = sorted(meow_counts.items(), key=itemgetter(1), reverse=True)[
embed = discord.Embed(title="Top Meowers :cat:", color=0x000000) :10
for i, (user_id, meow_count) in enumerate(top_meowers): ]
user = await garfbot.fetch_user(int(user_id)) embed = discord.Embed(title="Top Meowers :cat:", color=0x000000)
embed.add_field(name=f"{i+1}. {user.name}", value=f"{meow_count} meows", inline=False) for i, (user_id, meow_count) in enumerate(top_meowers):
await message.channel.send(embed=embed) user = await garfbot.fetch_user(int(user_id))
embed.add_field(
name=f"{i + 1}. {user.name}",
value=f"{meow_count} meows",
inline=False,
)
await message.channel.send(embed=embed)
if message.content.lower() == "checking in": if message.content.lower() == "checking in":
user_id = str(message.author.id) user_id = str(message.author.id)
if user_id in user_stats and user_stats[user_id]["check_in_time"] is not None: if user_id in user_stats and user_stats[user_id]["check_in_time"] is not None:
await message.channel.send(f"{message.author.mention} You are already checked in. Please check out first.") await message.channel.send(
return f"{message.author.mention} You are already checked in. Please check out first."
)
return
check_in_time = datetime.now().timestamp() check_in_time = datetime.now().timestamp()
if user_id not in user_stats: if user_id not in user_stats:
user_stats[user_id] = {"check_ins": 0, "total_time": 0, "check_in_time": None} user_stats[user_id] = {
user_stats[user_id]["check_in_time"] = check_in_time "check_ins": 0,
await message.channel.send(f"{message.author.mention} You have been checked in. Please mute your microphone.") "total_time": 0,
"check_in_time": None,
}
user_stats[user_id]["check_in_time"] = check_in_time
await message.channel.send(
f"{message.author.mention} You have been checked in. Please mute your microphone."
)
elif message.content.lower() == "checking out": elif message.content.lower() == "checking out":
user_id = str(message.author.id) user_id = str(message.author.id)
if user_id not in user_stats or user_stats[user_id]["check_in_time"] is None: if user_id not in user_stats or user_stats[user_id]["check_in_time"] is None:
await message.channel.send(f"{message.author.mention} You have not checked in yet. Please check in first.") await message.channel.send(
return f"{message.author.mention} You have not checked in yet. Please check in first."
)
return
check_out_time = datetime.now().timestamp() check_out_time = datetime.now().timestamp()
check_in_time = user_stats[user_id]["check_in_time"] check_in_time = user_stats[user_id]["check_in_time"]
time_delta = check_out_time - check_in_time time_delta = check_out_time - check_in_time
user_stats[user_id]["check_ins"] += 1 user_stats[user_id]["check_ins"] += 1
user_stats[user_id]["total_time"] += time_delta user_stats[user_id]["total_time"] += time_delta
user_stats[user_id]["check_in_time"] = None user_stats[user_id]["check_in_time"] = None
with open("user_stats.json", "w") as f: with open("user_stats.json", "w") as f:
json.dump(user_stats, f) json.dump(user_stats, f)
await message.channel.send(f"{message.author.mention} You have been checked out. Your session was {time_delta:.2f} seconds.") await message.channel.send(
f"{message.author.mention} You have been checked out. Your session was {time_delta:.2f} seconds."
)
elif message.content.lower() == "stats": elif message.content.lower() == "stats":
stats_embed = discord.Embed(title="User stats :trophy:", color=0x000000) stats_embed = discord.Embed(title="User stats :trophy:", color=0x000000)
sorted_user_stats = sorted(user_stats.items(), key=lambda x: x[1]["total_time"], reverse=True) sorted_user_stats = sorted(
table_rows = [["Name", "Check-ins", "Total Time"]] user_stats.items(), key=lambda x: x[1]["total_time"], reverse=True
for user_id, stats in sorted_user_stats: )
if stats["check_in_time"] is None: table_rows = [["Name", "Check-ins", "Total Time"]]
total_time_seconds = stats["total_time"] for user_id, stats in sorted_user_stats:
hours, total_time_seconds = divmod(total_time_seconds, 3600) if stats["check_in_time"] is None:
minutes, total_time_seconds = divmod(total_time_seconds, 60) total_time_seconds = stats["total_time"]
seconds, fractions = divmod(total_time_seconds, 1) hours, total_time_seconds = divmod(total_time_seconds, 3600)
fractions_str = f"{fractions:.3f}"[2:] minutes, total_time_seconds = divmod(total_time_seconds, 60)
username = garfbot.get_user(int(user_id)).name seconds, fractions = divmod(total_time_seconds, 1)
table_rows.append([username, str(stats["check_ins"]), f"{int(hours)}h {int(minutes)}m {int(seconds)}s {fractions_str}ms"]) fractions_str = f"{fractions:.3f}"[2:]
else: username = garfbot.get_user(int(user_id)).name
username = garfbot.get_user(int(user_id)).name table_rows.append(
table_rows.append([username, "Currently checked in", "-"]) [
table_columns = list(zip(*table_rows[1:])) username,
table_fields = table_rows[0] str(stats["check_ins"]),
for field, values in zip(table_fields, table_columns): f"{int(hours)}h {int(minutes)}m {int(seconds)}s {fractions_str}ms",
stats_embed.add_field(name=field, value="\n".join(values), inline=True) ]
await message.channel.send(embed=stats_embed) )
else:
username = garfbot.get_user(int(user_id)).name
table_rows.append([username, "Currently checked in", "-"])
table_columns = list(zip(*table_rows[1:]))
table_fields = table_rows[0]
for field, values in zip(table_fields, table_columns):
stats_embed.add_field(name=field, value="\n".join(values), inline=True)
await message.channel.send(embed=stats_embed)

View File

@ -17,16 +17,13 @@ class GarfAI:
self.image_request_queue = asyncio.Queue() self.image_request_queue = asyncio.Queue()
async def garfpic(self, message, prompt): async def garfpic(self, message, prompt):
await self.image_request_queue.put({'message': message, 'prompt': prompt}) await self.image_request_queue.put({"message": message, "prompt": prompt})
async def generate_image(self, prompt): async def generate_image(self, prompt):
try: try:
client = AsyncOpenAI(api_key = self.openaikey) client = AsyncOpenAI(api_key=self.openaikey)
response = await client.images.generate( response = await client.images.generate(
model=self.imgmodel, model=self.imgmodel, prompt=prompt, n=1, size="1024x1024"
prompt=prompt,
n=1,
size="1024x1024"
) )
image_url = response.data[0].url image_url = response.data[0].url
return image_url return image_url
@ -37,14 +34,14 @@ class GarfAI:
return f"`GarfBot Error: ({e.status_code}) - Monday`" return f"`GarfBot Error: ({e.status_code}) - Monday`"
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
return f"`GarfBot Error: Lasagna`" return "`GarfBot Error: Lasagna`"
async def process_image_requests(self): async def process_image_requests(self):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
while True: while True:
request = await self.image_request_queue.get() request = await self.image_request_queue.get()
message = request['message'] message = request["message"]
prompt = request['prompt'] prompt = request["prompt"]
image_url = await self.generate_image(prompt) image_url = await self.generate_image(prompt)
if "GarfBot Error" not in image_url: if "GarfBot Error" not in image_url:
logger.info("Downloading & sending image...") logger.info("Downloading & sending image...")
@ -53,7 +50,7 @@ class GarfAI:
image_data = await resp.read() image_data = await resp.read()
ram_image = io.BytesIO(image_data) ram_image = io.BytesIO(image_data)
ram_image.seek(0) ram_image.seek(0)
timestamp = message.created_at.strftime('%Y%m%d%H%M%S') timestamp = message.created_at.strftime("%Y%m%d%H%M%S")
filename = f"{timestamp}_generated_image.png" filename = f"{timestamp}_generated_image.png"
sendfile = discord.File(fp=ram_image, filename=filename) sendfile = discord.File(fp=ram_image, filename=filename)
try: try:
@ -69,14 +66,17 @@ class GarfAI:
async def generate_chat(self, question): async def generate_chat(self, question):
try: try:
client = AsyncOpenAI(api_key = self.openaikey) client = AsyncOpenAI(api_key=self.openaikey)
response = await client.chat.completions.create( response = await client.chat.completions.create(
model=self.txtmodel, model=self.txtmodel,
messages=[ messages=[
{"role": "system", "content": "Pretend you are sarcastic Garfield."}, {
{"role": "user", "content": f"{question}"} "role": "system",
"content": "Pretend you are sarcastic Garfield.",
},
{"role": "user", "content": f"{question}"},
], ],
max_tokens=400 max_tokens=400,
) )
answer = response.choices[0].message.content answer = response.choices[0].message.content
return answer.replace("an AI language model", "a cartoon animal") return answer.replace("an AI language model", "a cartoon animal")
@ -84,15 +84,17 @@ class GarfAI:
return f"`GarfBot Error: {e}`" return f"`GarfBot Error: {e}`"
except openai.APIError as e: except openai.APIError as e:
logger.info(e, flush=True) logger.info(e, flush=True)
return f"`GarfBot Error: Monday`" return "`GarfBot Error: Monday`"
except Exception as e: except Exception as e:
logger.info(e, flush=True) logger.info(e, flush=True)
return f"`GarfBot Error: Lasagna`" return "`GarfBot Error: Lasagna`"
async def wikisum(self, query): async def wikisum(self, query):
try: try:
summary = wikipedia.summary(query) summary = wikipedia.summary(query)
garfsum = await self.generate_chat(f"Please summarize in your own words: {summary}") garfsum = await self.generate_chat(
f"Please summarize in your own words: {summary}"
)
return garfsum return garfsum
except Exception as e: except Exception as e:
return e return e

View File

@ -4,7 +4,7 @@ import subprocess
from garfpy import logger from garfpy import logger
class IPUtils: class IPUtils:
def is_private(self, target): def is_private(self, target):
try: try:
ip_obj = ipaddress.ip_address(target) ip_obj = ipaddress.ip_address(target)
@ -31,10 +31,14 @@ class IPUtils:
if query.startswith("garfping "): if query.startswith("garfping "):
try: try:
logger.info(f"Ping Request - User: {user}, Server: {guild}, Target: {target}") logger.info(
f"Ping Request - User: {user}, Server: {guild}, Target: {target}"
)
await message.channel.send(f"`Pinging {target}...`") await message.channel.send(f"`Pinging {target}...`")
result = subprocess.run(['ping', '-c', '4', target], capture_output=True, text=True) result = subprocess.run(
embed = discord.Embed(title=f"Ping result:", color=0x4d4d4d) ["ping", "-c", "4", target], capture_output=True, text=True
)
embed = discord.Embed(title=f"Ping result:", color=0x4D4D4D)
embed.add_field(name=target, value=f"```{result.stdout}```") embed.add_field(name=target, value=f"```{result.stdout}```")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
except Exception as e: except Exception as e:
@ -42,10 +46,14 @@ class IPUtils:
if query.startswith("garfdns "): if query.startswith("garfdns "):
try: try:
logger.info(f"NSLookup Request - User: {user}, Server: {guild}, Target: {target}") logger.info(
f"NSLookup Request - User: {user}, Server: {guild}, Target: {target}"
)
await message.channel.send(f"`Requesting {target}...`") await message.channel.send(f"`Requesting {target}...`")
result = subprocess.run(['nslookup', target], capture_output=True, text=True) result = subprocess.run(
embed = discord.Embed(title=f"NSLookup result:", color=0x4d4d4d) ["nslookup", target], capture_output=True, text=True
)
embed = discord.Embed(title=f"NSLookup result:", color=0x4D4D4D)
embed.add_field(name=target, value=f"```{result.stdout}```") embed.add_field(name=target, value=f"```{result.stdout}```")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
except Exception as e: except Exception as e:
@ -53,12 +61,16 @@ class IPUtils:
if query.startswith("garfhack "): if query.startswith("garfhack "):
try: try:
logger.info(f"Nmap Request - User: {user}, Server: {guild}, Target: {target}") logger.info(
await message.channel.send(f"`Scanning {target}...`") f"Nmap Request - User: {user}, Server: {guild}, Target: {target}"
result = subprocess.run(['nmap', '-Pn', '-O', '-v', target], capture_output=True, text=True) )
embed = discord.Embed(title=f"Nmap scan result:", color=0x4d4d4d) await message.channel.send(f"`Scanning {target}...`")
result = subprocess.run(
["nmap", "-Pn", "-O", "-v", target], capture_output=True, text=True
)
embed = discord.Embed(title=f"Nmap scan result:", color=0x4D4D4D)
embed.add_field(name=target, value=f"```{result.stdout}```") embed.add_field(name=target, value=f"```{result.stdout}```")
embed.set_footer(text="https://nmap.org/") embed.set_footer(text="https://nmap.org/")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
except Exception as e: except Exception as e:
await message.channel.send(f"`GarfBot Error: {str(e)}`") await message.channel.send(f"`GarfBot Error: {str(e)}`")

View File

@ -8,44 +8,51 @@ class Kroger:
def __init__(self): def __init__(self):
self.client_id = config.CLIENT_ID self.client_id = config.CLIENT_ID
self.client_secret = config.CLIENT_SECRET self.client_secret = config.CLIENT_SECRET
self.auth = b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode() self.auth = b64encode(
f"{self.client_id}:{self.client_secret}".encode()
).decode()
def kroger_token(self): def kroger_token(self):
headers = { headers = {
'Content-Type': 'application/x-www-form-urlencoded', "Content-Type": "application/x-www-form-urlencoded",
'Authorization': f'Basic {self.auth}' "Authorization": f"Basic {self.auth}",
} }
response = requests.post('https://api.kroger.com/v1/connect/oauth2/token', headers=headers, data={ response = requests.post(
'grant_type': 'client_credentials', "https://api.kroger.com/v1/connect/oauth2/token",
'scope': 'product.compact' headers=headers,
}) data={"grant_type": "client_credentials", "scope": "product.compact"},
)
response.raise_for_status() response.raise_for_status()
return response.json()['access_token'] return response.json()["access_token"]
def find_store(self, zipcode, kroken): def find_store(self, zipcode, kroken):
headers = { headers = {
'Authorization': f'Bearer {kroken}', "Authorization": f"Bearer {kroken}",
} }
params = { params = {
'filter.zipCode.near': zipcode, "filter.zipCode.near": zipcode,
'filter.limit': 1, "filter.limit": 1,
} }
response = requests.get('https://api.kroger.com/v1/locations', headers=headers, params=params) response = requests.get(
"https://api.kroger.com/v1/locations", headers=headers, params=params
)
return response.json() return response.json()
def search_product(self, product, loc_id, kroken): def search_product(self, product, loc_id, kroken):
logger.info(f"Searching for {product}...") logger.info(f"Searching for {product}...")
headers = { headers = {
'Authorization': f'Bearer {kroken}', "Authorization": f"Bearer {kroken}",
} }
params = { params = {
'filter.term': product, "filter.term": product,
'filter.locationId': loc_id, "filter.locationId": loc_id,
'filter.limit': 10 "filter.limit": 10,
} }
response = requests.get('https://api.kroger.com/v1/products', headers=headers, params=params) response = requests.get(
"https://api.kroger.com/v1/products", headers=headers, params=params
)
return response.json() return response.json()
def garfshop(self, query): def garfshop(self, query):
@ -55,15 +62,17 @@ class Kroger:
product = query[-2] product = query[-2]
zipcode = query[-1] zipcode = query[-1]
loc_data = self.find_store(zipcode, kroken) loc_data = self.find_store(zipcode, kroken)
loc_id = loc_data['data'][0]['locationId'] loc_id = loc_data["data"][0]["locationId"]
store_name = loc_data['data'][0]['name'] store_name = loc_data["data"][0]["name"]
product_query = self.search_product(product, loc_id, kroken) product_query = self.search_product(product, loc_id, kroken)
products = product_query['data'] products = product_query["data"]
sorted_products = sorted(products, key=lambda item: item['items'][0]['price']['regular']) sorted_products = sorted(
products, key=lambda item: item["items"][0]["price"]["regular"]
)
response = f"Prices for `{product}` at `{store_name}` near `{zipcode}`:\n" response = f"Prices for `{product}` at `{store_name}` near `{zipcode}`:\n"
for item in sorted_products: for item in sorted_products:
product_name = item['description'] product_name = item["description"]
price = item['items'][0]['price']['regular'] price = item["items"][0]["price"]["regular"]
response += f"- `${price}`: {product_name} \n" response += f"- `${price}`: {product_name} \n"
return response return response
except Exception as e: except Exception as e:

View File

@ -1,19 +1,18 @@
import logging import logging
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
logger = logging.getLogger('garflog') logger = logging.getLogger("garflog")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
formatter=logging.Formatter( formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s', "%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
datefmt='%Y-%m-%d %H:%M:%S' )
)
file_handler = TimedRotatingFileHandler( file_handler = TimedRotatingFileHandler(
'garfbot.log', "garfbot.log",
when='midnight', when="midnight",
interval=1, interval=1,
backupCount=7, backupCount=7,
delay=True # Counter-intuitively, this will flush output immediately delay=True, # Counter-intuitively, this will flush output immediately
) )
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter) console_handler.setFormatter(formatter)

View File

@ -4,7 +4,7 @@ from io import BytesIO
def calculate_qr_settings(text): def calculate_qr_settings(text):
text_length = len(text) text_length = len(text)
if text_length <= 25: if text_length <= 25:
version = 1 version = 1
box_size = 12 box_size = 12
@ -38,26 +38,27 @@ def calculate_qr_settings(text):
else: else:
version = None version = None
box_size = 3 box_size = 3
return version, box_size return version, box_size
async def generate_qr(text): async def generate_qr(text):
version, box_size = calculate_qr_settings(text) version, box_size = calculate_qr_settings(text)
qr = qrcode.QRCode( qr = qrcode.QRCode(
version=version, version=version,
error_correction=qrcode.constants.ERROR_CORRECT_L, error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=box_size, box_size=box_size,
border=4, border=4,
) )
qr.add_data(text) qr.add_data(text)
qr.make(fit=True) qr.make(fit=True)
qr_image = qr.make_image(fill_color="black", back_color="white") qr_image = qr.make_image(fill_color="black", back_color="white")
img_buffer = BytesIO() img_buffer = BytesIO()
qr_image.save(img_buffer, format='PNG') qr_image.save(img_buffer, format="PNG")
img_buffer.seek(0) img_buffer.seek(0)
return img_buffer return img_buffer

View File

@ -8,16 +8,22 @@ import re
class GarfbotRespond: class GarfbotRespond:
def __init__(self): def __init__(self):
self.guild_responses = {} self.guild_responses = {}
self.responses_file = 'responses.json' self.responses_file = "responses.json"
def load_responses(self): def load_responses(self):
if os.path.exists(self.responses_file): if os.path.exists(self.responses_file):
try: try:
with open(self.responses_file, 'r', encoding='utf-8') as f: with open(self.responses_file, "r", encoding="utf-8") as f:
self.guild_responses = json.load(f) self.guild_responses = json.load(f)
self.guild_responses = {int(k): v for k, v in self.guild_responses.items()} self.guild_responses = {
total_responses = sum(len(responses) for responses in self.guild_responses.values()) int(k): v for k, v in self.guild_responses.items()
logger.info(f"Loaded responses for {len(self.guild_responses)} server(s), ({total_responses} total responses)") }
total_responses = sum(
len(responses) for responses in self.guild_responses.values()
)
logger.info(
f"Loaded responses for {len(self.guild_responses)} server(s), ({total_responses} total responses)"
)
except Exception as e: except Exception as e:
logger.info(f"Error loading responses: {e}") logger.info(f"Error loading responses: {e}")
self.guild_responses = {} self.guild_responses = {}
@ -27,10 +33,14 @@ class GarfbotRespond:
def save_responses(self): def save_responses(self):
try: try:
save_data = {str(k): v for k, v in self.guild_responses.items()} save_data = {str(k): v for k, v in self.guild_responses.items()}
with open(self.responses_file, 'w', encoding='utf-8') as f: with open(self.responses_file, "w", encoding="utf-8") as f:
json.dump(save_data, f, indent=2, ensure_ascii=False) json.dump(save_data, f, indent=2, ensure_ascii=False)
total_responses = sum(len(responses) for responses in self.guild_responses.values()) total_responses = sum(
logger.info(f"Saved responses for {len(self.guild_responses)} servers ({total_responses} total responses)") len(responses) for responses in self.guild_responses.values()
)
logger.info(
f"Saved responses for {len(self.guild_responses)} servers ({total_responses} total responses)"
)
except Exception as e: except Exception as e:
logger.info(f"Error saving responses: {e}") logger.info(f"Error saving responses: {e}")
@ -43,121 +53,113 @@ class GarfbotRespond:
guild_id = message.guild.id guild_id = message.guild.id
logger.info(message.content) logger.info(message.content)
match = re.search(r'garfbot response add "(.+)" "(.+)"', content, re.IGNORECASE) match = re.search(r'garfbot response add "(.+)" "(.+)"', content, re.IGNORECASE)
if match: if match:
trigger = match.group(1) trigger = match.group(1)
response_text = match.group(2) response_text = match.group(2)
await self.add_response(message, guild_id, trigger, response_text) await self.add_response(message, guild_id, trigger, response_text)
return return
match = re.search(r'garfbot response add (\S+) (.+)', content, re.IGNORECASE) match = re.search(r"garfbot response add (\S+) (.+)", content, re.IGNORECASE)
if match: if match:
trigger = match.group(1) trigger = match.group(1)
response_text = match.group(2) response_text = match.group(2)
await self.add_response(message, guild_id, trigger, response_text) await self.add_response(message, guild_id, trigger, response_text)
return return
match = re.search(r'garfbot\s+response\s+remove\s+(.+)', content, re.IGNORECASE) match = re.search(r"garfbot\s+response\s+remove\s+(.+)", content, re.IGNORECASE)
if match: if match:
trigger = match.group(1).strip() trigger = match.group(1).strip()
await self.remove_response(message, guild_id, trigger) await self.remove_response(message, guild_id, trigger)
return return
if content.lower() == "garfbot response list": if content.lower() == "garfbot response list":
await self.list_responses(message, guild_id) await self.list_responses(message, guild_id)
return return
await message.channel.send( await message.channel.send(
"**Garfbot Auto-Response Commands:**\n" "**Garfbot Auto-Response Commands:**\n"
"`garfbot response add \"trigger\" \"response\"`\n" '`garfbot response add "trigger" "response"`\n'
"`garfbot response remove \"trigger\"`\n" '`garfbot response remove "trigger"`\n'
"`garfbot response list`\n\n" "`garfbot response list`\n\n"
"**Examples:**\n" "**Examples:**\n"
"`garfbot response add \"hi\" \"Hello there!\"`\n" '`garfbot response add "hi" "Hello there!"`\n'
"`garfbot response add \"that's what\" \"That's what she said!\"`\n" '`garfbot response add "that\'s what" "That\'s what she said!"`\n'
"`garfbot response remove \"hi\"`" '`garfbot response remove "hi"`'
) )
async def add_response(self, message, guild_id, trigger, response_text): async def add_response(self, message, guild_id, trigger, response_text):
if not response_text or not trigger: if not response_text or not trigger:
await message.channel.send("❌ Trigger and response must not be null.") await message.channel.send("❌ Trigger and response must not be null.")
return return
responses = self.get_responses(guild_id) responses = self.get_responses(guild_id)
responses[trigger] = response_text responses[trigger] = response_text
self.guild_responses[guild_id] = responses self.guild_responses[guild_id] = responses
self.save_responses() self.save_responses()
embed = discord.Embed( embed = discord.Embed(title="✅ Auto-response Added.", color=0x00FF00)
title="✅ Auto-response Added.",
color=0x00ff00
)
embed.add_field(name="Trigger", value=f"`{trigger}`", inline=True) embed.add_field(name="Trigger", value=f"`{trigger}`", inline=True)
embed.add_field(name="Response", value=f"`{response_text}`", inline=True) embed.add_field(name="Response", value=f"`{response_text}`", inline=True)
embed.set_footer(text=f"Server: {message.guild.name}") embed.set_footer(text=f"Server: {message.guild.name}")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
async def remove_response(self, message, guild_id, trigger): async def remove_response(self, message, guild_id, trigger):
responses = self.get_responses(guild_id) responses = self.get_responses(guild_id)
if trigger in responses: if trigger in responses:
removed_response = responses[trigger] removed_response = responses[trigger]
del responses[trigger] del responses[trigger]
self.guild_responses[guild_id] = responses self.guild_responses[guild_id] = responses
self.save_responses() self.save_responses()
embed = discord.Embed( embed = discord.Embed(title="✅ Auto-response Removed.", color=0xFF6B6B)
title="✅ Auto-response Removed.",
color=0xff6b6b
)
embed.add_field(name="Trigger", value=f"`{trigger}`", inline=True) embed.add_field(name="Trigger", value=f"`{trigger}`", inline=True)
embed.add_field(name="Response", value=f"`{removed_response}`", inline=True) embed.add_field(name="Response", value=f"`{removed_response}`", inline=True)
embed.set_footer(text=f"Server: {message.guild.name}") embed.set_footer(text=f"Server: {message.guild.name}")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
return return
for key in responses.keys(): for key in responses.keys():
if key.lower() == trigger.lower(): if key.lower() == trigger.lower():
removed_response = responses[key] removed_response = responses[key]
del responses[key] del responses[key]
self.guild_responses[guild_id] = responses self.guild_responses[guild_id] = responses
self.save_responses() self.save_responses()
embed = discord.Embed( embed = discord.Embed(title="✅ Auto-response Removed.", color=0xFF6B6B)
title="✅ Auto-response Removed.",
color=0xff6b6b
)
embed.add_field(name="Trigger", value=f"`{key}`", inline=True) embed.add_field(name="Trigger", value=f"`{key}`", inline=True)
embed.add_field(name="Response", value=f"`{removed_response}`", inline=True) embed.add_field(
name="Response", value=f"`{removed_response}`", inline=True
)
embed.set_footer(text=f"Server: {message.guild.name}") embed.set_footer(text=f"Server: {message.guild.name}")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)
return return
await message.channel.send(f"❌ No auto-response found for trigger: `{trigger}`") await message.channel.send(
f"❌ No auto-response found for trigger: `{trigger}`"
)
async def list_responses(self, message, guild_id): async def list_responses(self, message, guild_id):
responses = self.get_responses(guild_id) responses = self.get_responses(guild_id)
if not responses: if not responses:
await message.channel.send("No auto-responses configured for this server.") await message.channel.send("No auto-responses configured for this server.")
return return
embed = discord.Embed( embed = discord.Embed(
title=f"Auto-Responses for {message.guild.name}", title=f"Auto-Responses for {message.guild.name}", color=0x3498DB
color=0x3498db
) )
for i, (trigger, response) in enumerate(responses.items(), 1): for i, (trigger, response) in enumerate(responses.items(), 1):
display_response = response[:50] + "..." if len(response) > 50 else response display_response = response[:50] + "..." if len(response) > 50 else response
embed.add_field( embed.add_field(
name=f"{i}. `{trigger}`", name=f"{i}. `{trigger}`", value=f"{display_response}", inline=False
value=f"{display_response}",
inline=False
) )
embed.set_footer(text=f"Total responses: {len(responses)}") embed.set_footer(text=f"Total responses: {len(responses)}")
await message.channel.send(embed=embed) await message.channel.send(embed=embed)