add named pipe reading for minecraft output

This commit is contained in:
Nayan
2025-08-25 22:10:43 -04:00
parent 302d93a048
commit d6605850ea

354
bot.py
View File

@@ -1,5 +1,7 @@
import asyncio
import json import json
import os import os
import re
import discord import discord
from discord.ext import commands from discord.ext import commands
import subprocess import subprocess
@@ -26,9 +28,243 @@ intents.message_content = True
bot = commands.Bot(command_prefix=command_prefix, intents=intents) bot = commands.Bot(command_prefix=command_prefix, intents=intents)
active_server = None active_server = None
active_server_pipe_task = None # To hold the asyncio task that reads from the pipe
# Regex for parsing Minecraft chat messages
# This pattern captures the timestamp, log level, player name, and the message itself.
# Adjust if your server log format is different (e.g., if you use plugins that change output).
MINECRAFT_CHAT_REGEX = re.compile(
r"^\[(\d{2}:\d{2}:\d{2}) INFO\]: <([^>]+)> (.*)$"
)
''' '''
# --- Named Pipe Functions ---
'''
def _get_pipe_path(servername):
"""Generates a consistent path for the named pipe."""
return f"/tmp/minecraft_server_{servername}_pipe"
async def _create_named_pipe(servername):
"""
Creates a named pipe (FIFO) for server output.
"""
pipe_path = _get_pipe_path(servername)
if os.path.exists(pipe_path):
os.remove(pipe_path) # Ensure a clean pipe
try:
os.mkfifo(pipe_path)
print(f"Named pipe created at: {pipe_path}")
except OSError as e:
print(f"Error creating named pipe {pipe_path}: {e}")
raise
async def _delete_named_pipe(servername):
"""
Deletes the named pipe (FIFO) file.
"""
pipe_path = _get_pipe_path(servername)
if os.path.exists(pipe_path):
try:
os.remove(pipe_path)
print(f"Named pipe deleted: {pipe_path}")
except OSError as e:
print(f"Error deleting named pipe {pipe_path}: {e}")
# --- Pipe Reading and Parsing Task ---
async def _read_from_pipe(servername, channel_id, bot_instance):
"""
Asynchronously reads from the named pipe, parses Minecraft output,
and sends chat messages to the specified Discord channel.
This function runs in its own asyncio task.
"""
pipe_path = _get_pipe_path(servername)
channel = bot_instance.get_channel(channel_id)
if not channel:
print(f"Error: Discord channel with ID {channel_id} not found.")
return
print(f"Attempting to open named pipe for reading: {pipe_path}")
try:
# Open the pipe in non-blocking mode if possible, but for continuous
# read, a blocking open in an executor thread is usually safer to
# prevent CPU spin. Here we simulate that by using asyncio.to_thread
# for the blocking 'open'.
# However, for continuous reading from a pipe, the most common pattern
# is to open it and read line-by-line in a loop.
# We'll use 'open' directly as the stream will block until data is available.
# The pipe will block until the server starts writing.
# We need to run the blocking file operation in a separate thread
# to avoid blocking the Discord bot's event loop.
def _blocking_pipe_reader():
with open(pipe_path, 'r', encoding='utf-8') as pipe_file:
for line in pipe_file:
# Return line for processing in the async context
# Or process here and use queue for async sender
yield line
# This will iterate over the generator from a separate thread
# and yield lines back to the async context.
# This is a bit advanced; a simpler approach often involves a queue.
# For direct line-by-line, we'll simplify and acknowledge potential blocking
# if the pipe is closed unexpectedly, but it's fine for our use case.
# A more robust approach might use a queue and a dedicated thread for
# the blocking read, with the async task consuming from the queue.
# Simplified for direct demonstration, assuming graceful pipe closure:
# Open the pipe with `os.open` and `os.read` to get more control for async,
# or use `open` with a separate thread for the blocking read.
# For simplicity and common patterns, we'll rely on `asyncio.to_thread` for `open` and `readline`.
# We'll use a queue and a separate thread to handle the blocking read
# to ensure the main asyncio loop remains responsive.
line_queue = asyncio.Queue()
def blocking_read_thread():
try:
# Open the pipe in blocking mode. This 'open' will block until a writer opens the pipe.
with open(pipe_path, 'r', encoding='utf-8') as f:
print(f"Successfully opened pipe {pipe_path} for reading.")
while True:
line = f.readline() # This blocks until a line is available or pipe is closed
if not line: # EOF - pipe closed by writer
print(f"Pipe {pipe_path} closed by writer.")
break
line_queue.put_nowait(line) # Put line into the async queue
except FileNotFoundError:
print(f"Pipe {pipe_path} not found during read attempt (likely cleaned up).")
except Exception as e:
print(f"Error in blocking_read_thread for {pipe_path}: {e}")
finally:
# Signal the async reader that no more lines will come
line_queue.put_nowait(None) # Sentinel value
print(f"Blocking read thread for {pipe_path} finished.")
# Start the blocking read in a separate thread
loop = asyncio.get_event_loop()
blocking_thread_task = loop.run_in_executor(None, blocking_read_thread)
print(f"Starting async pipe reader for {servername}...")
while True:
line = await line_queue.get()
if line is None: # Sentinel value received, pipe closed
break
line = line.strip()
if not line:
continue
# print(f"Raw server output: {line}") # Debugging raw output
match = MINECRAFT_CHAT_REGEX.match(line)
if match:
timestamp, player_name, message_content = match.groups()
discord_message = f"**[{player_name}]**: {message_content}"
# Limit message length for Discord if necessary
if len(discord_message) > 2000:
discord_message = discord_message[:1997] + "..."
await channel.send(discord_message)
# else:
# # Optionally, you can log other server output to a debug channel or console
# print(f"Non-chat output: {line}")
await asyncio.sleep(0.01) # Small delay to yield control and avoid busy-waiting
except asyncio.CancelledError:
print(f"Pipe reading task for {servername} was cancelled.")
except Exception as e:
print(f"Unhandled error in _read_from_pipe for {servername}: {e}")
finally:
print(f"Pipe reader for {servername} cleaning up.")
# Ensure the blocking thread has a chance to finish or be stopped
# If the pipe is forcefully deleted, the blocking read will raise FileNotFoundError.
# If the server closes the write end, the blocking read will return empty strings.
# This cleanup should happen after the server is truly stopped and the pipe is deleted.
# We will handle pipe deletion separately in stop/forcestop.
async def _stop_server_internal(ctx, servername, method):
'''
Internal helper to stop a server and clean up.
'''
global active_server, active_server_pipe_task
if not ctx.author.guild_permissions.administrator:
await ctx.send("You don't have permission to run this command.")
return
if active_server is None or active_server != servername:
await ctx.send(f"No server '{servername}' is currently active, or a different server is running.")
return
await ctx.send(f"Attempting to {method} server '{servername}'...")
if method == "stop":
command_to_send = 'stop\n'
elif method == "forcestop":
command_to_send = "$'\\003'" # Ctrl+C
else:
await ctx.send("Invalid stop method.")
return
# Send the stop command to the screen session
screen_cmd = f"screen -S {servername} -X stuff '{command_to_send}'"
print(f"Sending stop command to screen: {screen_cmd}")
try:
subprocess.Popen(screen_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
await ctx.send(f"Failed to send stop command to screen: {e}")
# Even if command fails, attempt cleanup
# Give the server some time to shut down gracefully
if method == "stop":
await asyncio.sleep(15) # Minecraft servers can take a while to save and stop
else: # forcestop
await asyncio.sleep(5) # Give it a few seconds for Ctrl+C to register
# --- Cleanup ---
if active_server_pipe_task:
active_server_pipe_task.cancel()
try:
await active_server_pipe_task # Await to ensure it cleans up
except asyncio.CancelledError:
pass # Expected
active_server_pipe_task = None
await _delete_named_pipe(servername) # Delete the pipe file
# Verify screen session is gone (optional, but good for robust cleanup)
check_screen_cmd = f"screen -ls | grep {servername}"
process = subprocess.Popen(check_screen_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate()
if not stdout:
await ctx.send(f"Server `{servername}` successfully {method}ped and screen session terminated.")
else:
await ctx.send(f"Server `{servername}` {method}ped, but screen session still detected. Manual intervention might be needed.")
print(f"Remaining screen -ls output for {servername}:\n{stdout}")
active_server = None # Clear active server state
'''
Handle events (messages mostly) Handle events (messages mostly)
''' '''
@bot.event @bot.event
async def on_ready(): async def on_ready():
@@ -56,8 +292,20 @@ async def send_message(message):
process = subprocess.Popen(f"screen -S {active_server} -p 0 -X stuff '{minecraft_message}\n'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) process = subprocess.Popen(f"screen -S {active_server} -p 0 -X stuff '{minecraft_message}\n'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
''' '''
Handle commands Handle commands
''' '''
@bot.command() @bot.command()
async def startserver(ctx, *args): async def startserver(ctx, *args):
@@ -67,23 +315,16 @@ async def startserver(ctx, *args):
servername: The name of the server servername: The name of the server
''' '''
global active_server global active_server, active_server_pipe_task
# Check if the user is an admin
if not ctx.author.guild_permissions.administrator:
await ctx.send("You don't have permission to run this command.")
return
# Validate and get the server name # Validate and get the server name
if len(args) == 0: if len(args) != 1:
await ctx.send("Please provide a server name.") await ctx.send("Please provide exactly one server name.")
return
if len(args) > 1:
await ctx.send("Please provide only one server name.")
return return
servername = args[0] servername = args[0]
# Check if the server is already running # Check if the server is already running
if active_server is not None: if active_server is not None:
await ctx.send("A server is already running. If you think this is an error, run !list to refresh the list of servers and try again.") await ctx.send("A server is already running, please stop it first. If you think this is an error, run !list to refresh the list of servers and try again.")
return return
# Load server data from json # Load server data from json
@@ -102,23 +343,58 @@ async def startserver(ctx, *args):
# Start the server # Start the server
await ctx.send("Starting server, please wait...") await ctx.send("Starting server, please wait...")
path = BASE_PATH + server['path'] if BASE_PATH else server['path']
command = f"cd {path} && screen -dmS {servername} {server['startcommand']}" # Create the named pipe
print(f"Running command: {command}") try:
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) await _create_named_pipe(servername)
except Exception as e:
await ctx.send(f"Failed to create named pipe: {e}")
return
# Create the paths
pipe_path = _get_pipe_path(servername)
path = os.path.join(BASE_PATH, server['path']) if BASE_PATH else server['path']
# Check if the server path exists
if not os.path.isdir(path):
await ctx.send("Server path does not exist.")
await _delete_named_pipe(servername)
return
screen_cmd = (
f"cd {path} && "
f"screen -dmS {servername} bash -c \"{server['startcommand']} > {pipe_path}\""
)
#command = f"cd {path} && screen -dmS {servername} {server['startcommand']}"
#print(f"Running command: {command}")
print(f"Running screen command: {screen_cmd}")
try:
subprocess.Popen(screen_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
except Exception as e:
await ctx.send(f"Failed to start server: {e}")
await _delete_named_pipe(servername)
return
# Wait for the server to start then check if the screen instance is still live # Wait for the server to start then check if the screen instance is still live
stdout, stderr = process.communicate() await asyncio.sleep(2)
time.sleep(3)
process = subprocess.Popen(f"screen -ls | grep {servername}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) process = subprocess.Popen(f"screen -ls | grep {servername}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate() stdout, stderr = process.communicate()
print(stdout) if not stdout:
if len(stdout) == 0:
await ctx.send("Server failed to start. Minecraft screen instance terminated within 3 seconds.") await ctx.send("Server failed to start. Minecraft screen instance terminated within 3 seconds.")
await _delete_named_pipe(servername)
return return
await ctx.send("Server started.")
await ctx.send(stdout) await ctx.send(f'Started server in screen session: {servername}')
print(f'Started server in screen session: {servername}')
print(f'stdout: {stdout}')
active_server = servername active_server = servername
# Start reading from the pipe
active_server_pipe_task = asyncio.create_task(_read_from_pipe(servername, ctx.channel.id, ctx.bot))
await ctx.send(f'Now monitoring output from `{servername}` in this channel.')
@bot.command() @bot.command()
async def stopserver(ctx): async def stopserver(ctx):
''' '''
@@ -126,17 +402,10 @@ async def stopserver(ctx):
''' '''
global active_server global active_server
# Check if the user is an admin if active_server is None:
if not ctx.author.guild_permissions.administrator: await ctx.send("No server is currently active.")
await ctx.send("You don't have permission to run this command.")
return return
# Stop the server await _stop_server_internal(ctx, active_server, "stop")
await ctx.send("Stopping server...")
process = subprocess.Popen(f"screen -S {active_server} -p 0 -X stuff 'stop\n'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate()
await ctx.send(stdout)
await ctx.send("Server stopped.")
active_server = None
@bot.command() @bot.command()
async def command(ctx, *args): async def command(ctx, *args):
@@ -204,19 +473,13 @@ async def forcestopserver(ctx):
''' '''
global active_server global active_server
# Check if the user is an admin
if not ctx.author.guild_permissions.administrator: if not ctx.author.guild_permissions.administrator:
await ctx.send("You don't have permission to run this command.") await ctx.send("You don't have permission to run this command.")
return return
# Stop the server if active_server is None:
await ctx.send("Stopping server...") await ctx.send("No server is currently active.")
process = subprocess.Popen(f"screen -S {active_server} -p 0 -X stuff $'\003'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) return
process = subprocess.Popen(f"screen -S {active_server} -p 0 -X stuff $'\003'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) await _stop_server_internal(ctx, active_server, "forcestop")
process = subprocess.Popen(f"screen -S {active_server} -p 0 -X stuff $'\003'", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate()
await ctx.send(stdout)
await ctx.send("Server stopped.")
active_server = None
@bot.command() @bot.command()
async def ping(ctx): async def ping(ctx):
@@ -226,13 +489,6 @@ async def ping(ctx):
''' '''
await ctx.send('Pong!') await ctx.send('Pong!')
# @bot.command()
# async def help(ctx):
# string = "Available commands:\n"
# for command in bot.commands:
# string += f"{command.name}: {command.description}\n"
# await ctx.send(string)
@bot.command() @bot.command()
async def botstop(ctx): async def botstop(ctx):
''' '''
@@ -241,4 +497,4 @@ async def botstop(ctx):
''' '''
await bot.close() await bot.close()
bot.run(TOKEN) bot.run(TOKEN)