Fixed user manager script after db changes from a while back. Fixed some object router code that was broken.

This commit is contained in:
Michael Woods
2025-12-27 21:14:19 -05:00
parent 13eac22741
commit e77b08fd0b
3 changed files with 230 additions and 151 deletions

View File

@@ -9,6 +9,8 @@ from persistent.mapping import PersistentMapping
from persistent.list import PersistentList
from packetserver.common.util import is_valid_ax25_callsign
from .database import DbDependency
from typing import Union
from ZODB.Connection import Connection
ph = PasswordHasher()
@@ -52,11 +54,15 @@ class HttpUser(Persistent):
# rf enabled checks..
#
def is_rf_enabled(self, db: DbDependency) -> bool:
def is_rf_enabled(self, db: Union[DbDependency,Connection]) -> bool:
"""
Check if RF gateway is enabled (i.e., callsign NOT in global blacklist).
Requires an open ZODB connection.
"""
if type(db) is Connection:
root = db.root()
blacklist = root.get('config', {}).get('blacklist', [])
return self.username not in blacklist
with db.transaction() as conn:
root = conn.root()
blacklist = root.get('config', {}).get('blacklist', [])

View File

@@ -1,5 +1,5 @@
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Header
from fastapi.responses import PlainTextResponse, Response, JSONResponse, StreamingResponse
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Header, Request
from fastapi.responses import PlainTextResponse, Response, JSONResponse, StreamingResponse, RedirectResponse
from typing import List, Optional
from datetime import datetime
from uuid import UUID
@@ -137,23 +137,54 @@ class TextObjectCreate(BaseModel):
name: Optional[str] = None
private: bool = True
@router.post("/objects/text", response_model=ObjectSummary)
@router.post("/objects/text", response_model=None) # Remove response_model to allow mixed returns
async def create_text_object(
payload: TextObjectCreate,
request: Request,
db: DbDependency,
current_user: HttpUser = Depends(get_current_http_user)
):
username = current_user.username
if not payload.text:
# Determine content type and parse accordingly
content_type = request.headers.get("content-type", "").lower()
if "application/x-www-form-urlencoded" in content_type or "multipart/form-data" in content_type:
form = await request.form()
name = form.get("name")
text = form.get("text")
private_str = form.get("private") # "on" if checked, None otherwise
is_form = True
elif "application/json" in content_type:
try:
json_data = await request.json()
name = json_data.get("name")
text = json_data.get("text")
private_str = json_data.get("private")
is_form = False
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON")
else:
raise HTTPException(status_code=415, detail="Unsupported Media Type")
# Validate text
if not text:
raise HTTPException(status_code=400, detail="Text content cannot be empty")
obj_name = (payload.name or "text_object.txt").strip()
# Normalize name (optional, default like original)
obj_name = (name or "text_object.txt").strip()
if len(obj_name) > 300:
raise HTTPException(status_code=400, detail="Object name too long (max 300 chars)")
if not obj_name:
raise HTTPException(status_code=400, detail="Invalid object name")
# Normalize private to bool (handles form "on"/None, JSON bool, or string)
if isinstance(private_str, bool):
private = private_str
elif isinstance(private_str, str):
private = private_str.lower() in ("true", "on", "1", "yes")
else:
private = False # Default to False if invalid/missing
try:
with db.transaction() as conn:
root = conn.root()
@@ -162,12 +193,12 @@ async def create_text_object(
raise HTTPException(status_code=404, detail="User not found")
# Create object with str data → forces binary=False
new_object = Object(name=obj_name, data=payload.text)
new_object.private = payload.private
new_object = Object(name=obj_name, data=text)
new_object.private = private
obj_uuid = new_object.write_new(db, username=username)
logging.info(f"User {username} created text object {obj_uuid} ({obj_name}, {len(payload.text)} chars)")
logging.info(f"User {username} created text object {obj_uuid} ({obj_name}, {len(text)} chars)")
except HTTPException:
raise
@@ -175,22 +206,28 @@ async def create_text_object(
logging.error(f"Text object creation failed for {username}: {e}\n{format_exc()}")
raise HTTPException(status_code=500, detail="Failed to create text object")
# Build summary
content_type, _ = mimetypes.guess_type(new_object.name)
if content_type is None:
content_type = "text/plain" # always text here
# Build summary (for JSON responses)
content_type_guess, _ = mimetypes.guess_type(new_object.name)
if content_type_guess is None:
content_type_guess = "text/plain" # always text here
return ObjectSummary(
summary = ObjectSummary(
uuid=obj_uuid,
name=new_object.name,
binary=new_object.binary, # should be False
size=new_object.size,
content_type=content_type,
content_type=content_type_guess,
private=new_object.private,
created_at=new_object.created_at,
modified_at=new_object.modified_at
)
# Return based on input type
if is_form:
return RedirectResponse(url="/objects", status_code=303) # Back to HTML list
else:
return JSONResponse(content=summary.model_dump(), status_code=201)
class BinaryObjectCreate(BaseModel):
data_base64: str
name: Optional[str] = None

View File

@@ -14,12 +14,16 @@ import sys
import time
from getpass import getpass
import ax25
import os.path
import os
import ZODB.FileStorage
import ZODB.DB
import transaction
from persistent.mapping import PersistentMapping
os.environ['PS_APP_ZEO_FILE'] = "N/A"
# Import our HTTP package internals
from packetserver.http.auth import HttpUser, ph # ph = PasswordHasher
@@ -42,6 +46,13 @@ def open_database(db_arg: str) -> ZODB.DB:
return ZODB.DB(storage)
def open_database_zeo_file(filename: str) -> ZODB.DB:
if os.path.isfile(filename):
return open_database(open(filename,'r').read().strip())
else:
raise FileNotFoundError("Must provide a filename to a zeo address.")
def get_or_create_http_users(root):
if HTTP_USERS_KEY not in root:
root[HTTP_USERS_KEY] = PersistentMapping()
@@ -55,7 +66,8 @@ def confirm(prompt: str) -> bool:
def main():
parser = argparse.ArgumentParser(description="Manage PacketServer HTTP API users")
parser.add_argument("--db", required=True, help="DB path (local /path/to/Data.fs) or ZEO (host:port)")
parser.add_argument("--db", required=False, help="DB path (local /path/to/Data.fs) or ZEO (host:port)")
parser.add_argument("--zeo-file", required=False, help="zeo address file")
subparsers = parser.add_subparsers(dest="command", required=True)
# add
@@ -99,12 +111,16 @@ def main():
args = parser.parse_args()
# Open the database
if args.db:
db = open_database(args.db)
connection = db.open()
root = connection.root()
else:
db = open_database_zeo_file(args.zeo_file)
try:
users_mapping = get_or_create_http_users(root)
with db.transaction() as conn:
root = conn.root()
http_users_list = list(get_or_create_http_users(root).keys())
upper_callsign = lambda c: c.upper()
@@ -117,7 +133,7 @@ def main():
print(f"Error: Trying to add valid callsign + ssid. Remove -<num> and add again.")
sys.exit(1)
if callsign in users_mapping:
if callsign in http_users_list:
print(f"Error: HTTP user {callsign} already exists")
sys.exit(1)
@@ -127,35 +143,41 @@ def main():
sys.exit(1)
# Create the HTTP-specific user
with db.transaction() as conn:
root = conn.root()
http_user = HttpUser(args.callsign, password)
users_mapping = get_or_create_http_users(conn.root())
users_mapping[callsign] = http_user
# Sync: create corresponding regular BBS user using proper write_new for UUID/uniqueness
from packetserver.server.users import User
main_users = root.setdefault('users', PersistentMapping())
if callsign not in main_users:
User.write_new(main_users, args.callsign) # correct: pass mapping + callsign
print(f" → Also created regular BBS user {callsign} (with UUID)")
new_user = User(args.callsign)
new_user.write_new(conn.root())
print(f" → Also created regular BBS user {callsign}")
else:
print(f" → Regular BBS user {callsign} already exists")
transaction.commit()
print(f"Created HTTP user {callsign}")
elif args.command == "delete":
callsign = upper_callsign(args.callsign)
if callsign not in users_mapping:
if callsign not in http_users_list:
print(f"Error: User {callsign} not found")
sys.exit(1)
if not confirm(f"Delete HTTP user {callsign}?"):
sys.exit(0)
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
del users_mapping[callsign]
transaction.commit()
print(f"Deleted HTTP user {callsign}")
elif args.command == "set-password":
callsign = upper_callsign(args.callsign)
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
user = users_mapping.get(callsign)
if not user:
print(f"Error: User {callsign} not found")
@@ -166,40 +188,45 @@ def main():
sys.exit(1)
user.password_hash = ph.hash(newpass)
user._p_changed = True
transaction.commit()
print(f"Password updated for {callsign}")
elif args.command == "enable":
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
callsign = upper_callsign(args.callsign)
user = users_mapping.get(callsign)
if not user:
print(f"Error: User {callsign} not found")
sys.exit(1)
user.enabled = True
user.http_enabled = True
user._p_changed = True
transaction.commit()
print(f"HTTP access enabled for {callsign}")
elif args.command == "disable":
callsign = upper_callsign(args.callsign)
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
user = users_mapping.get(callsign)
if not user:
print(f"Error: User {callsign} not found")
sys.exit(1)
user.enabled = False
user.http_enabled = False
user._p_changed = True
transaction.commit()
print(f"HTTP access disabled for {callsign}")
elif args.command == "rf-enable":
callsign = upper_callsign(args.callsign)
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
user = users_mapping.get(callsign)
if not user:
print(f"Error: User {callsign} not found")
sys.exit(1)
try:
user.set_rf_enabled(connection, True)
transaction.commit()
user.set_rf_enabled(db, True)
print(f"RF gateway enabled for {callsign}")
except ValueError as e:
print(f"Error: {e}")
@@ -207,31 +234,39 @@ def main():
elif args.command == "rf-disable":
callsign = upper_callsign(args.callsign)
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
user = users_mapping.get(callsign)
if not user:
print(f"Error: User {callsign} not found")
sys.exit(1)
user.set_rf_enabled(connection, False)
transaction.commit()
user.set_rf_enabled(db, False)
print(f"RF gateway disabled for {callsign}")
elif args.command == "list":
if not users_mapping:
if not http_users_list:
print("No HTTP users configured")
else:
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
print(f"{'Callsign':<12} {'HTTP Enabled':<13} {'RF Enabled':<11} {'Created':<20} Last Login")
print("-" * 75)
for user in sorted(users_mapping.values(), key=lambda u: u.username):
created = time.strftime("%Y-%m-%d %H:%M", time.localtime(user.created_at))
last = (time.strftime("%Y-%m-%d %H:%M", time.localtime(user.last_login))
if user.last_login else "-")
rf_status = "True" if user.is_rf_enabled(connection) else "False"
rf_status = "True" if user.is_rf_enabled(conn) else "False"
print(f"{user.username:<12} {str(user.http_enabled):<13} {rf_status:<11} {created:<20} {last}")
elif args.command == "dump":
import json
callsign = upper_callsign(args.callsign)
with db.transaction() as conn:
root = conn.root()
users_mapping = get_or_create_http_users(root)
http_user = users_mapping.get(callsign)
if not http_user:
print(f"Error: No HTTP user {callsign} found")
@@ -247,8 +282,8 @@ def main():
"http_user": {
"username": http_user.username,
"http_enabled": http_user.http_enabled,
"rf_enabled": http_user.is_rf_enabled(connection),
"blacklisted": not http_user.is_rf_enabled(connection), # explicit inverse
"rf_enabled": http_user.is_rf_enabled(conn),
"blacklisted": not http_user.is_rf_enabled(conn), # explicit inverse
"created_at": time.strftime("%Y-%m-%d %H:%M:%S UTC", time.gmtime(http_user.created_at)),
"failed_attempts": http_user.failed_attempts,
},
@@ -277,6 +312,8 @@ def main():
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
return ''.join(secrets.choice(alphabet) for _ in range(length))
with db.transaction() as conn:
root = conn.root()
bbs_users = root.get('users', {})
http_users = get_or_create_http_users(root)
@@ -308,7 +345,6 @@ def main():
print("Use 'set-password <call>' to set a known password before enabling login")
finally:
connection.close()
db.close()