# Source code for agentbx.utils.redis_utils
"""
Redis utility functions for agentbx.
"""
import argparse
import json
import logging
import sys
from agentbx.core.redis_manager import RedisManager
logger = logging.getLogger(__name__)
def inspect_bundles_cli(argv=None) -> None:
    """Inspect bundles stored in Redis from the command line.

    Parses the command-line options, connects through ``RedisManager`` and
    prints either a listing of bundles or the details of a single bundle,
    as plain text or as JSON.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse falls back to ``sys.argv[1:]``,
            which is what a console entry point needs; passing a list lets
            the function be driven programmatically.

    Raises:
        SystemExit: With status 1 when the Redis health check fails or when
            any exception is raised while talking to Redis. argparse itself
            exits with status 0 for ``--help`` and 2 for invalid arguments.
    """
    parser = argparse.ArgumentParser(description="Inspect bundles in Redis")
    parser.add_argument(
        "--host", default="localhost", help="Redis host (default: localhost)"
    )
    parser.add_argument(
        "--port", type=int, default=6379, help="Redis port (default: 6379)"
    )
    parser.add_argument("--type", help="Filter by bundle type")
    parser.add_argument("--bundle-id", help="Inspect specific bundle by ID")
    parser.add_argument(
        "--metadata-only",
        action="store_true",
        help="Show only metadata, not full content analysis",
    )
    parser.add_argument("--json", action="store_true", help="Output in JSON format")
    args = parser.parse_args(argv)

    try:
        # The manager is created inside the try block so that an exception
        # raised while setting up the connection is reported as a one-line
        # error, exactly like a failure in any of the later calls.
        redis_manager = RedisManager(host=args.host, port=args.port)
        if not redis_manager.is_healthy():
            print("Error: Redis connection is not healthy", file=sys.stderr)
            # SystemExit does not derive from Exception, so it is not
            # swallowed by the handler at the bottom of this function.
            sys.exit(1)

        if args.bundle_id:
            # Inspect one bundle; --metadata-only is only consulted here.
            if args.metadata_only:
                result = redis_manager.get_bundle_metadata(args.bundle_id)
            else:
                result = redis_manager.inspect_bundle(args.bundle_id)
        else:
            # No bundle ID given: list bundles, optionally filtered by type.
            result = redis_manager.list_bundles_with_metadata(args.type)

        if args.json:
            # default=str renders values the json module cannot encode
            # natively by falling back to their str() form.
            print(json.dumps(result, indent=2, default=str))
        elif not isinstance(result, list):
            # Single-bundle result: no dedicated text layout, dump it as-is.
            print("Bundle Information:")
            print(json.dumps(result, indent=2, default=str))
        elif not result:
            print("No bundles found")
        else:
            for bundle_info in result:
                print(f"Bundle ID: {bundle_info['bundle_id']}")
                print(f" Type: {bundle_info['bundle_type']}")
                print(f" Created: {bundle_info['created_at']}")
                print(f" Size: {bundle_info['size_bytes']} bytes")
                print(f" Checksum: {bundle_info['checksum']}")
                print("---")
    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero
        # instead of showing a traceback to the user.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    inspect_bundles_cli()