Tools / Modules /Agent_Skills.py
Nymbo's picture
ADDING PROPER SUPPORT FOR SKILLS, NEW AGENT_SKILLS TOOL ADDED
a684bf6 verified
from __future__ import annotations
"""
Agent Skills Module for Nymbo-Tools MCP Server.
Provides structured skill discovery, activation, validation, and resource access
following the Agent Skills specification (https://agentskills.io).
Skills are directories containing a SKILL.md file with YAML frontmatter (name, description)
and Markdown instructions. This tool enables agents to efficiently discover and use skills
through progressive disclosure: low-token metadata discovery, on-demand full activation,
and targeted resource access.
"""
import json
import os
import re
import unicodedata
from pathlib import Path
from typing import Annotated, Optional
import gradio as gr
from app import _log_call_end, _log_call_start, _truncate_for_log
from ._docstrings import autodoc
from .File_System import ROOT_DIR, _display_path
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Folder name joined onto ROOT_DIR by _get_skills_root() when the
# NYMBO_SKILLS_ROOT environment variable is unset or blank.
SKILLS_SUBDIR = "Skills" # Subdirectory under ROOT_DIR containing skills
# Character-count ceilings enforced by _validate_name, _validate_description
# and _validate_compatibility respectively.
MAX_SKILL_NAME_LENGTH = 64
MAX_DESCRIPTION_LENGTH = 1024
MAX_COMPATIBILITY_LENGTH = 500
# Top-level frontmatter keys accepted by _validate_skill; any other key is
# reported as "Unexpected fields in frontmatter".
ALLOWED_FRONTMATTER_FIELDS = {
"name",
"description",
"license",
"allowed-tools",
"metadata",
"compatibility",
}
# One-paragraph tool description; passed to @autodoc on Agent_Skills and to
# gr.Interface(api_description=...) in build_interface().
TOOL_SUMMARY = (
"Discover, inspect, validate, and access Agent Skills. "
"Actions: discover (list all skills), info (get SKILL.md contents), "
"resources (list/read bundled files), validate (check format), search (find by keyword). "
"Skills provide structured instructions for specialized tasks. "
"Use in combination with the `Shell_Command` and `File_System` tools."
)
# Text returned verbatim by Agent_Skills(action="help"). This is runtime
# output, so edit it only when the user-facing guide should change.
HELP_TEXT = """\
Agent Skills — actions and usage
Skills are directories containing a SKILL.md file with YAML frontmatter (name, description)
and Markdown instructions. They live under /Skills/ in the filesystem root.
Actions:
- discover: List all available skills with their metadata (name, description, location)
- info: Get the full contents of a specific skill's SKILL.md file
- resources: List or read files within a skill's bundled directories (scripts/, references/, assets/)
- validate: Check if a skill conforms to the Agent Skills specification
- search: Find skills by keyword in name or description
- help: Show this guide
Examples:
- Discover all skills: action="discover"
- Get skill info: action="info", skill_name="pdf"
- List skill resources: action="resources", skill_name="mcp-builder"
- Read a resource: action="resources", skill_name="pdf", resource_path="references/forms.md"
- Validate a skill: action="validate", skill_name="pdf"
- Search for skills: action="search", query="MCP"
"""
# ---------------------------------------------------------------------------
# Skills Root Resolution
# ---------------------------------------------------------------------------
def _get_skills_root() -> Path:
"""Get the absolute path to the skills directory."""
skills_root = os.getenv("NYMBO_SKILLS_ROOT")
if skills_root and skills_root.strip():
return Path(skills_root.strip()).resolve()
return Path(ROOT_DIR) / SKILLS_SUBDIR
def _fmt_size(num_bytes: int) -> str:
"""Format byte size as human-readable string."""
units = ["B", "KB", "MB", "GB"]
size = float(num_bytes)
for unit in units:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
# ---------------------------------------------------------------------------
# YAML Frontmatter Parsing (adapted from skills_ref/parser.py)
# ---------------------------------------------------------------------------
class ParseError(Exception):
    """Signals that a SKILL.md file could not be parsed."""
class ValidationError(Exception):
    """Signals that a skill failed validation.

    ``errors`` carries the individual problems; when the caller supplies
    none, it defaults to a single-item list holding ``message``.
    """

    def __init__(self, message: str, errors: list[str] | None = None):
        super().__init__(message)
        if errors is None:
            errors = [message]
        self.errors = errors
def _parse_frontmatter(content: str) -> tuple[dict, str]:
"""
Parse YAML frontmatter from SKILL.md content.
Returns (metadata dict, markdown body).
Raises ParseError if frontmatter is missing or invalid.
"""
if not content.startswith("---"):
raise ParseError("SKILL.md must start with YAML frontmatter (---)")
parts = content.split("---", 2)
if len(parts) < 3:
raise ParseError("SKILL.md frontmatter not properly closed with ---")
frontmatter_str = parts[1]
body = parts[2].strip()
# Simple YAML parsing without external dependency
metadata: dict = {}
in_metadata_block = False
metadata_dict: dict = {}
for line in frontmatter_str.strip().split("\n"):
if not line.strip():
continue
if line.strip() == "metadata:":
in_metadata_block = True
continue
if in_metadata_block:
if line.startswith(" "):
match = re.match(r"^\s+(\w+):\s*(.*)$", line)
if match:
key = match.group(1).strip()
value = match.group(2).strip().strip('"').strip("'")
metadata_dict[key] = value
continue
else:
in_metadata_block = False
if metadata_dict:
metadata["metadata"] = metadata_dict
metadata_dict = {}
match = re.match(r"^(\S+):\s*(.*)$", line)
if match:
key = match.group(1).strip()
value = match.group(2).strip()
if (value.startswith('"') and value.endswith('"')) or \
(value.startswith("'") and value.endswith("'")):
value = value[1:-1]
metadata[key] = value if value else ""
if in_metadata_block and metadata_dict:
metadata["metadata"] = metadata_dict
return metadata, body
def _find_skill_md(skill_dir: Path) -> Optional[Path]:
"""Find the SKILL.md file in a skill directory (prefers uppercase)."""
for name in ("SKILL.md", "skill.md"):
path = skill_dir / name
if path.exists():
return path
return None
# ---------------------------------------------------------------------------
# Skill Validation (adapted from skills_ref/validator.py)
# ---------------------------------------------------------------------------
def _validate_name(name: str, skill_dir: Path) -> list[str]:
"""Validate skill name format and directory match."""
errors = []
if not name or not isinstance(name, str) or not name.strip():
errors.append("Field 'name' must be a non-empty string")
return errors
name = unicodedata.normalize("NFKC", name.strip())
if len(name) > MAX_SKILL_NAME_LENGTH:
errors.append(f"Skill name '{name}' exceeds {MAX_SKILL_NAME_LENGTH} character limit ({len(name)} chars)")
if name != name.lower():
errors.append(f"Skill name '{name}' must be lowercase")
if name.startswith("-") or name.endswith("-"):
errors.append("Skill name cannot start or end with a hyphen")
if "--" in name:
errors.append("Skill name cannot contain consecutive hyphens")
if not all(c.isalnum() or c == "-" for c in name):
errors.append(f"Skill name '{name}' contains invalid characters. Only letters, digits, and hyphens allowed.")
if skill_dir:
dir_name = unicodedata.normalize("NFKC", skill_dir.name)
if dir_name != name:
errors.append(f"Directory name '{skill_dir.name}' must match skill name '{name}'")
return errors
def _validate_description(description: str) -> list[str]:
"""Validate description format."""
errors = []
if not description or not isinstance(description, str) or not description.strip():
errors.append("Field 'description' must be a non-empty string")
return errors
if len(description) > MAX_DESCRIPTION_LENGTH:
errors.append(f"Description exceeds {MAX_DESCRIPTION_LENGTH} character limit ({len(description)} chars)")
return errors
def _validate_compatibility(compatibility: str) -> list[str]:
"""Validate compatibility format."""
errors = []
if not isinstance(compatibility, str):
errors.append("Field 'compatibility' must be a string")
return errors
if len(compatibility) > MAX_COMPATIBILITY_LENGTH:
errors.append(f"Compatibility exceeds {MAX_COMPATIBILITY_LENGTH} character limit ({len(compatibility)} chars)")
return errors
def _validate_skill(skill_dir: Path) -> list[str]:
"""Validate a skill directory. Returns list of error messages (empty = valid)."""
if not skill_dir.exists():
return [f"Path does not exist: {skill_dir}"]
if not skill_dir.is_dir():
return [f"Not a directory: {skill_dir}"]
skill_md = _find_skill_md(skill_dir)
if skill_md is None:
return ["Missing required file: SKILL.md"]
try:
content = skill_md.read_text(encoding="utf-8")
metadata, _ = _parse_frontmatter(content)
except ParseError as e:
return [str(e)]
except Exception as e:
return [f"Failed to read SKILL.md: {e}"]
errors = []
extra_fields = set(metadata.keys()) - ALLOWED_FRONTMATTER_FIELDS
if extra_fields:
errors.append(f"Unexpected fields in frontmatter: {', '.join(sorted(extra_fields))}")
if "name" not in metadata:
errors.append("Missing required field: name")
else:
errors.extend(_validate_name(metadata["name"], skill_dir))
if "description" not in metadata:
errors.append("Missing required field: description")
else:
errors.extend(_validate_description(metadata["description"]))
if "compatibility" in metadata:
errors.extend(_validate_compatibility(metadata["compatibility"]))
return errors
# ---------------------------------------------------------------------------
# Skill Discovery and Info
# ---------------------------------------------------------------------------
def _read_skill_properties(skill_dir: Path) -> dict:
    """Load a skill's frontmatter and body from its SKILL.md.

    Returns a dict with keys ``name``, ``description``, ``license``,
    ``compatibility``, ``allowed_tools``, ``metadata``, ``location`` (the
    manifest path as a string) and ``body``.

    Raises:
        ParseError: If no manifest exists or its frontmatter cannot be parsed.
        ValidationError: If ``name`` or ``description`` is absent.
    """
    manifest = _find_skill_md(skill_dir)
    if manifest is None:
        raise ParseError(f"SKILL.md not found in {skill_dir}")

    frontmatter, body = _parse_frontmatter(manifest.read_text(encoding="utf-8"))
    for required in ("name", "description"):
        if required not in frontmatter:
            raise ValidationError(f"Missing required field: {required}")

    props = {
        "name": frontmatter.get("name", "").strip(),
        "description": frontmatter.get("description", "").strip(),
    }
    # Optional fields default to None, except the nested metadata mapping.
    props["license"] = frontmatter.get("license")
    props["compatibility"] = frontmatter.get("compatibility")
    props["allowed_tools"] = frontmatter.get("allowed-tools")
    props["metadata"] = frontmatter.get("metadata", {})
    props["location"] = str(manifest)
    props["body"] = body
    return props
def _discover_skills() -> list[dict]:
    """Enumerate readable skills directly under the skills root.

    Each result holds ``name``, ``description`` and ``location`` (the
    manifest path passed through ``_display_path``). Entries are visited in
    sorted order; non-directories, directories without a manifest, and
    skills whose manifest cannot be read are skipped.
    """
    root = _get_skills_root()
    if not root.exists():
        return []

    found = []
    for entry in sorted(root.iterdir()):
        manifest = _find_skill_md(entry) if entry.is_dir() else None
        if manifest is None:
            continue
        try:
            props = _read_skill_properties(entry)
            summary = {
                "name": props["name"],
                "description": props["description"],
                "location": _display_path(str(manifest)),
            }
        except Exception:
            # Best-effort listing: one broken skill must not hide the rest.
            continue
        found.append(summary)
    return found
def _get_skill_info(skill_name: str, offset: int = 0, max_chars: int = 0) -> dict:
"""Get full information for a specific skill."""
skills_root = _get_skills_root()
skill_dir = skills_root / skill_name
if not skill_dir.exists():
raise FileNotFoundError(f"Skill not found: {skill_name}")
skill_md = _find_skill_md(skill_dir)
if skill_md is None:
raise FileNotFoundError(f"SKILL.md not found in skill: {skill_name}")
content = skill_md.read_text(encoding="utf-8")
metadata, body = _parse_frontmatter(content)
total_chars = len(body)
start = max(0, min(offset, total_chars))
if max_chars > 0:
end = min(total_chars, start + max_chars)
else:
end = total_chars
body_chunk = body[start:end]
truncated = end < total_chars
next_cursor = end if truncated else None
return {
"name": metadata.get("name", "").strip(),
"description": metadata.get("description", "").strip(),
"license": metadata.get("license"),
"compatibility": metadata.get("compatibility"),
"allowed_tools": metadata.get("allowed-tools"),
"metadata": metadata.get("metadata", {}),
"location": _display_path(str(skill_md)),
"body": body_chunk,
"offset": start,
"total_chars": total_chars,
"truncated": truncated,
"next_cursor": next_cursor,
}
def _list_skill_resources(skill_name: str) -> dict:
"""List resources (scripts, references, assets) within a skill directory."""
skills_root = _get_skills_root()
skill_dir = skills_root / skill_name
if not skill_dir.exists():
raise FileNotFoundError(f"Skill not found: {skill_name}")
resources = {
"skill": skill_name,
"scripts": [],
"references": [],
"assets": [],
"other_files": [],
}
known_dirs = {"scripts", "references", "assets"}
for item in sorted(skill_dir.iterdir()):
if item.name.lower() in ("skill.md",):
continue
if item.is_dir():
category = item.name.lower()
if category in known_dirs:
files = []
for f in sorted(item.rglob("*")):
if f.is_file():
files.append({
"path": str(f.relative_to(skill_dir)),
"size": f.stat().st_size,
})
resources[category] = files
elif item.is_file():
resources["other_files"].append({
"path": item.name,
"size": item.stat().st_size,
})
return resources
def _read_skill_resource(skill_name: str, resource_path: str, offset: int = 0, max_chars: int = 3000) -> dict:
"""Read a specific resource file from a skill."""
skills_root = _get_skills_root()
skill_dir = skills_root / skill_name
if not skill_dir.exists():
raise FileNotFoundError(f"Skill not found: {skill_name}")
resource_file = skill_dir / resource_path
try:
resource_file.resolve().relative_to(skill_dir.resolve())
except ValueError:
raise PermissionError(f"Resource path escapes skill directory: {resource_path}")
if not resource_file.exists():
raise FileNotFoundError(f"Resource not found: {resource_path}")
if resource_file.is_dir():
raise IsADirectoryError(f"Path is a directory: {resource_path}")
content = resource_file.read_text(encoding="utf-8", errors="replace")
total_chars = len(content)
start = max(0, min(offset, total_chars))
if max_chars > 0:
end = min(total_chars, start + max_chars)
else:
end = total_chars
chunk = content[start:end]
truncated = end < total_chars
next_cursor = end if truncated else None
return {
"skill": skill_name,
"resource": resource_path,
"content": chunk,
"size": resource_file.stat().st_size,
"offset": start,
"total_chars": total_chars,
"truncated": truncated,
"next_cursor": next_cursor,
}
def _search_skills(query: str) -> list[dict]:
    """Return discovered skills whose name or description contains ``query``.

    Matching is a case-insensitive substring test. Each hit is the
    discovery record extended with ``match_in``, which is ``"name"`` when
    the name matched and ``"description"`` otherwise.
    """
    needle = query.lower()
    hits = []
    for candidate in _discover_skills():
        in_name = needle in candidate["name"].lower()
        in_description = needle in candidate["description"].lower()
        if not (in_name or in_description):
            continue
        where = "name" if in_name else "description"
        hits.append({**candidate, "match_in": where})
    return hits
# ---------------------------------------------------------------------------
# Human-Readable Output Formatters
# ---------------------------------------------------------------------------
def _format_discover(skills: list[dict]) -> str:
    """Render the discovery listing as plain text.

    The header shows the skills root (passed through ``_display_path``) and
    the skill count; each skill follows as a numbered name plus its
    description, shortened to 100 characters with a trailing ``...``.
    """
    root_label = _display_path(str(_get_skills_root()))
    out = [
        "Available Skills",
        f"Root: {root_label}",
        f"Total: {len(skills)} skills",
        "",
    ]
    if not skills:
        out.append("No skills found.")
        return "\n".join(out).strip()

    for position, entry in enumerate(skills, 1):
        title = entry["name"]
        summary = entry["description"]
        if len(summary) > 100:
            summary = summary[:97] + "..."
        out += [f"{position}. {title}", f" {summary}", ""]
    return "\n".join(out).strip()
def _format_skill_info(info: dict) -> str:
"""Format skill info as human-readable text."""
lines = [
f"Skill: {info['name']}",
f"Location: {info['location']}",
"",
f"Description: {info['description']}",
]
if info.get("license"):
lines.append(f"License: {info['license']}")
if info.get("compatibility"):
lines.append(f"Compatibility: {info['compatibility']}")
if info.get("allowed_tools"):
lines.append(f"Allowed Tools: {info['allowed_tools']}")
if info.get("metadata"):
meta_str = ", ".join(f"{k}={v}" for k, v in info["metadata"].items())
lines.append(f"Metadata: {meta_str}")
lines.append("")
lines.append("--- SKILL.md Body ---")
if info.get("offset", 0) > 0:
lines.append(f"(Showing content from offset {info['offset']})")
lines.append("")
lines.append(info["body"])
if info.get("truncated"):
lines.append("")
lines.append(f"… Truncated. Showing {len(info['body'])} chars (offset {info['offset']}). Total: {info['total_chars']}.")
lines.append(f"Next cursor: {info['next_cursor']}")
return "\n".join(lines)
def _format_resources_list(resources: dict) -> str:
"""Format resource listing as human-readable text."""
skill = resources["skill"]
lines = [
f"Resources for skill: {skill}",
"",
]
total_files = 0
for category in ["scripts", "references", "assets"]:
files = resources.get(category, [])
if files:
lines.append(f"📂 {category}/")
for f in files:
size_str = _fmt_size(f["size"])
lines.append(f" • {f['path']} ({size_str})")
total_files += 1
lines.append("")
other = resources.get("other_files", [])
if other:
lines.append("📂 (root)")
for f in other:
size_str = _fmt_size(f["size"])
lines.append(f" • {f['path']} ({size_str})")
total_files += 1
lines.append("")
if total_files == 0:
lines.append("No resource files found.")
else:
lines.append(f"Total: {total_files} files")
return "\n".join(lines).strip()
def _format_resource_content(data: dict) -> str:
    """Render a resource read result (as built by ``_read_skill_resource``).

    Shows the resource path, skill, formatted size and the window shown,
    followed by the content chunk and, when truncated, the next cursor.
    """
    shown_from = data.get("offset", 0)
    out = [
        f"Resource: {data['resource']}",
        f"Skill: {data['skill']}",
        f"Size: {_fmt_size(data['size'])}",
        f"Showing: {len(data['content'])} of {data['total_chars']} chars (offset {shown_from})",
        "",
        "--- Content ---",
        "",
        data["content"],
    ]
    if data.get("truncated"):
        out += ["", f"… Truncated. Next cursor: {data['next_cursor']}"]
    return "\n".join(out)
def _format_validation(skill_name: str, errors: list[str]) -> str:
"""Format validation results as human-readable text."""
if not errors:
return f"✓ Skill '{skill_name}' is valid."
lines = [
f"✗ Validation failed for skill '{skill_name}'",
f"Errors: {len(errors)}",
"",
]
for i, err in enumerate(errors, 1):
lines.append(f" {i}. {err}")
return "\n".join(lines)
def _format_search(query: str, matches: list[dict]) -> str:
"""Format search results as human-readable text."""
lines = [
f"Search results for: {query}",
f"Matches: {len(matches)}",
"",
]
if not matches:
lines.append("No matching skills found.")
else:
for i, m in enumerate(matches, 1):
name = m["name"]
desc = m["description"]
match_in = m.get("match_in", "")
if len(desc) > 80:
desc = desc[:77] + "..."
lines.append(f"{i}. {name} (matched in {match_in})")
lines.append(f" {desc}")
lines.append("")
return "\n".join(lines).strip()
def _format_error(message: str, hint: str = "") -> str:
"""Format error as human-readable text."""
lines = [f"Error: {message}"]
if hint:
lines.append(f"Hint: {hint}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main Tool Function
# ---------------------------------------------------------------------------
@autodoc(summary=TOOL_SUMMARY)
def Agent_Skills(
    action: Annotated[str, "Operation: 'discover', 'info', 'resources', 'validate', 'search', 'help'."],
    skill_name: Annotated[Optional[str], "Name of skill (required for info/resources/validate)."] = None,
    resource_path: Annotated[Optional[str], "Path to resource file within skill (for resources action)."] = None,
    query: Annotated[Optional[str], "Search query (for search action)."] = None,
    max_chars: Annotated[int, "Max characters to return for skill body or resource content (0 = no limit)."] = 3000,
    offset: Annotated[int, "Start offset for reading content (for info/resources)."] = 0,
) -> str:
    # Tool entry point: dispatches on `action` and always returns
    # human-readable text. Failures are rendered with _format_error instead
    # of raising. The start and end of every call are logged.
    # NOTE(review): deliberately no docstring here. The @autodoc decorator
    # presumably supplies one and may depend on its absence; confirm before
    # adding one.
    _log_call_start("Agent_Skills", action=action, skill_name=skill_name, resource_path=resource_path, query=query, max_chars=max_chars, offset=offset)
    action = (action or "").strip().lower()
    if action not in {"discover", "info", "resources", "validate", "search", "help"}:
        result = _format_error(
            f"Invalid action: {action}",
            "Choose from: discover, info, resources, validate, search, help."
        )
        _log_call_end("Agent_Skills", _truncate_for_log(result))
        return result
    try:
        # Normalise the optional text inputs once, so whitespace-only values
        # count as "not provided" and every branch sees the same cleaned
        # value.
        skill_name = (skill_name or "").strip()
        resource_path = (resource_path or "").strip()
        query = (query or "").strip()
        # Callers may deliver numbers as floats (sliders, JSON clients);
        # string slicing downstream needs ints.
        max_chars = int(max_chars)
        offset = int(offset)

        if action == "help":
            result = HELP_TEXT
        elif action == "discover":
            result = _format_discover(_discover_skills())
        elif action == "info":
            if not skill_name:
                result = _format_error("skill_name is required for 'info' action.")
            else:
                info = _get_skill_info(skill_name, offset=offset, max_chars=max_chars)
                result = _format_skill_info(info)
        elif action == "resources":
            if not skill_name:
                result = _format_error("skill_name is required for 'resources' action.")
            elif resource_path:
                resource_data = _read_skill_resource(skill_name, resource_path, offset=offset, max_chars=max_chars)
                result = _format_resource_content(resource_data)
            else:
                result = _format_resources_list(_list_skill_resources(skill_name))
        elif action == "validate":
            if not skill_name:
                result = _format_error("skill_name is required for 'validate' action.")
            else:
                # The path is built here rather than in a helper, so the
                # caller-supplied name must be kept inside the skills root
                # at this point.
                requested = Path(skill_name)
                if requested.anchor or ".." in requested.parts:
                    raise PermissionError(f"Skill name escapes skills directory: {skill_name}")
                errors = _validate_skill(_get_skills_root() / requested)
                result = _format_validation(skill_name, errors)
        elif action == "search":
            if not query:
                result = _format_error("query is required for 'search' action.")
            else:
                result = _format_search(query, _search_skills(query))
        else:
            result = _format_error(f"Action '{action}' not implemented.")
    except (
        FileNotFoundError,
        PermissionError,
        IsADirectoryError,
        ParseError,
        ValidationError,
    ) as e:
        # Expected failures raised by this module with user-facing messages.
        result = _format_error(str(e))
    except Exception as e:
        result = _format_error(f"Unexpected error: {e}")
    _log_call_end("Agent_Skills", _truncate_for_log(result))
    return result
# ---------------------------------------------------------------------------
# Gradio Interface
# ---------------------------------------------------------------------------
def build_interface() -> gr.Interface:
    """Build the Gradio interface that exposes ``Agent_Skills``.

    Inputs are declared in the same order as the function's parameters:
    action, skill name, resource path, search query, max chars, offset.
    """
    action_input = gr.Radio(
        label="Action",
        choices=["discover", "info", "resources", "validate", "search", "help"],
        value="help",
        info="Operation to perform",
    )
    skill_input = gr.Textbox(label="Skill Name", placeholder="pdf", max_lines=1, info="Name of the skill")
    resource_input = gr.Textbox(
        label="Resource Path",
        placeholder="references/forms.md",
        max_lines=1,
        info="Path to resource within skill",
    )
    query_input = gr.Textbox(label="Search Query", placeholder="MCP", max_lines=1, info="Keyword to search for")
    max_chars_input = gr.Slider(
        minimum=0,
        maximum=100000,
        step=500,
        value=3000,
        label="Max Chars",
        info="Max characters for content (0 = no limit)",
    )
    offset_input = gr.Slider(
        minimum=0,
        maximum=1_000_000,
        step=100,
        value=0,
        label="Offset",
        info="Start offset (Info/Resources)",
    )
    result_output = gr.Textbox(label="Result", lines=20)

    banner = (
        "<div style=\"text-align:center; overflow:hidden;\">"
        "Discover, inspect, and access Agent Skills. "
        "Skills provide structured instructions and resources for specialized tasks."
        "</div>"
    )
    return gr.Interface(
        fn=Agent_Skills,
        inputs=[
            action_input,
            skill_input,
            resource_input,
            query_input,
            max_chars_input,
            offset_input,
        ],
        outputs=result_output,
        title="Agent Skills",
        description=banner,
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Run",
    )
# Names exported by `from <module> import *`: the tool function and the
# Gradio interface factory.
__all__ = ["Agent_Skills", "build_interface"]