Fixed some server object code bugs.. probably object code is borked everywhere that uses get_objects_by_username.

I really needed to adopt a single standard with what type of connection object gets passed to a classmethod.
This commit is contained in:
Michael Woods
2025-12-25 23:12:54 -05:00
parent 07e6519679
commit 2693ad49b8
2 changed files with 60 additions and 38 deletions

View File

@@ -28,32 +28,32 @@ class ObjectSummary(BaseModel):
@router.get("/objects", response_model=List[ObjectSummary])
async def list_my_objects(db: DbDependency, current_user: HttpUser = Depends(get_current_http_user)):
username = current_user.username # uppercase callsign
core_objects = Object.get_objects_by_username(username, db)
# Sort newest first by created_at
core_objects.sort(key=lambda o: o.created_at, reverse=True)
username = current_user.username.upper().strip() # ensure uppercase consistency
logging.debug(f"Listing objects for user {username}")
user_objects = []
for obj in core_objects:
content_type, _ = mimetypes.guess_type(obj.name)
if content_type is None:
content_type = "application/octet-stream" if obj.binary else "text/plain"
with db.transaction() as conn:
for obj in Object.get_objects_by_username(username, conn):
logging.debug(f"Found object {obj.uuid} for {username}")
if obj: # should always exist, but guard anyway
content_type, _ = mimetypes.guess_type(obj.name)
if content_type is None:
content_type = "application/octet-stream" if obj.binary else "text/plain"
user_objects.append(ObjectSummary(
uuid=obj.uuid,
name=obj.name,
binary=obj.binary,
size=obj.size,
content_type=content_type,
private=obj.private,
created_at=obj.created_at,
modified_at=obj.modified_at
))
user_objects.append(ObjectSummary(
uuid=obj.uuid,
name=obj.name,
binary=obj.binary,
size=obj.size,
content_type=content_type,
private=obj.private,
created_at=obj.created_at,
modified_at=obj.modified_at
))
return user_objects
# Sort newest first
user_objects.sort(key=lambda x: x.created_at, reverse=True)
return user_objects
@router.post("/objects", response_model=ObjectSummary)
async def upload_object(
@@ -97,9 +97,7 @@ async def upload_object(
new_object = Object(name=obj_name, data=object_data)
new_object.private = private
obj_uuid = new_object.write_new(db) # assuming write_new takes db and handles root.objects storage
new_object.chown(username, db)
obj_uuid = new_object.write_new(db, username=username)
if force_text:
obj_type = 'string'

View File

@@ -1,4 +1,5 @@
"""Server object storage system."""
import traceback
from copy import deepcopy
import persistent
@@ -9,6 +10,7 @@ import datetime
from typing import Self,Union,Optional
from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response
import ZODB
from ZODB.Connection import Connection
import logging
import uuid
from uuid import UUID
@@ -96,12 +98,12 @@ class Object(persistent.Persistent):
logging.debug(f"chowning object {self} to user {username}")
un = username.strip().upper()
old_owner_uuid = self._owner
with db.transaction() as db:
user = User.get_user_by_username(username, db.root())
old_owner = User.get_user_by_uuid(old_owner_uuid, db.root())
with db.transaction() as conn:
user = User.get_user_by_username(username, conn.root())
old_owner = User.get_user_by_uuid(old_owner_uuid, conn.root())
if user:
logging.debug(f"new owner user exists: {user}")
db.root.objects[self.uuid].owner = user.uuid
conn.root.objects[self.uuid].owner = user.uuid
if old_owner_uuid:
if old_owner:
logging.debug(f"The object has an old owner user: {old_owner}")
@@ -118,35 +120,57 @@ class Object(persistent.Persistent):
return db_root['objects'].get(obj)
@classmethod
def get_objects_by_username(cls, username: str, db: ZODB.DB) -> list[Self]:
def get_objects_by_username(cls, username: str, db: Union[ZODB.DB,Connection]) -> list[Self]:
un = username.strip().upper()
objs = []
with db.transaction() as db:
user = User.get_user_by_username(username, db.root())
if type(db) is Connection:
conn = db
user = User.get_user_by_username(un, conn.root())
if user:
uuids = user.object_uuids
for u in uuids:
try:
obj = cls.get_object_by_uuid(u, db)
obj = cls.get_object_by_uuid(u, conn.root())
if obj:
objs.append(obj)
except:
pass
else:
with db.transaction() as conn:
user = User.get_user_by_username(un, conn.root())
if user:
uuids = user.object_uuids
for u in uuids:
try:
obj = cls.get_object_by_uuid(u, conn.root())
if obj:
objs.append(obj)
except:
pass
return objs
@property
def uuid(self) -> Optional[UUID]:
return self._uuid
def write_new(self, db: ZODB.DB) -> UUID:
def write_new(self, db: ZODB.DB, username: str = None) -> UUID:
if self.uuid:
raise KeyError("Object already has UUID. Manually clear it to write it again.")
self._uuid = uuid.uuid4()
with db.transaction() as db:
while self.uuid in db.root.objects:
with db.transaction() as conn:
while self.uuid in conn.root.objects:
self._uuid = uuid.uuid4()
db.root.objects[self.uuid] = self
conn.root.objects[self.uuid] = self
self.touch()
logging.debug(f"New object assigned uuid {self.uuid}")
if username:
logging.debug(f"Attempting to assign new object to user: {username}")
try:
self.chown(username,db)
logging.debug(f"New object assigned to user: {username}")
except:
logging.warning(f"Unable to chown this object to user {username}: {traceback.format_exc()}")
return self.uuid
def to_dict(self, include_data: bool = True) -> dict: