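"""One-shot uploader for a pre-built knowledge-graph JSON export into Neo4j.

Reads GRAPH_DATA_FILE (nodes, relationships, communities, DRIFT search
metadata, and global metadata), clears the target database, and recreates
the graph plus a set of metadata nodes. A FastMCP server instance is also
created so the same operations can be exposed as MCP tools.
"""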
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, Optional

from my_config import MY_CONFIG
from neo4j import GraphDatabase, Driver
from tqdm import tqdm
from fastmcp import FastMCP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

GRAPH_DATA_DIR = MY_CONFIG.GRAPH_DATA_DIR
GRAPH_DATA_FILE = os.path.join(GRAPH_DATA_DIR, "graph-data-final.json")
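
# Shape of the export file as consumed below (skeleton for illustration;
# only the "nodes" key is strictly required by the validation in
# upload_graph_data_impl):
# {
#   "nodes":         [{"id": ..., "labels": [...], "properties": {...}}, ...],
#   "relationships": [{"startNode": ..., "endNode": ..., "type": ...,
#                      "properties": {...}}, ...],
#   "communities":   {"algorithm": ..., "total_communities": ..., ...},
#   "drift_search_metadata": {"version": ..., "configuration": {...},
#                             "community_search_index": {...},
#                             "search_optimization": {...}},
#   "metadata":      {"node_count": ..., "generator": ..., ...}
# }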

class Neo4jConnection:
    """Thin async wrapper around the blocking Neo4j Python driver."""

    def __init__(self):
        self.uri = MY_CONFIG.NEO4J_URI
        self.username = MY_CONFIG.NEO4J_USER
        self.password = MY_CONFIG.NEO4J_PASSWORD
        self.database = getattr(MY_CONFIG, "NEO4J_DATABASE", None)
        if not self.uri:
            raise ValueError("NEO4J_URI config is required")
        if not self.username:
            raise ValueError("NEO4J_USER config is required")
        if not self.password:
            raise ValueError("NEO4J_PASSWORD config is required")
        if not self.database:
            raise ValueError("NEO4J_DATABASE config is required")
        self.driver: Optional[Driver] = None

    async def connect(self):
        if self.driver is None:
            try:
                self.driver = GraphDatabase.driver(
                    self.uri,
                    auth=(self.username, self.password),
                )
                # verify_connectivity() blocks, so run it off the event loop.
                await asyncio.get_running_loop().run_in_executor(
                    None, self.driver.verify_connectivity
                )
                logger.info("Connected to Neo4j")
            except Exception as e:
                # Swallow the error here; callers check `driver is None`.
                logger.error(f"Connection failed: {e}")
                self.driver = None

    async def disconnect(self):
        if self.driver:
            await asyncio.get_running_loop().run_in_executor(
                None, self.driver.close
            )
            self.driver = None

    async def execute_query(self, query: str, parameters: Optional[Dict[str, Any]] = None):
        if not self.driver:
            raise ConnectionError("Not connected to Neo4j database")

        def run_query():
            # Materialize all records before consume(); the result stream is
            # no longer readable once the session closes.
            with self.driver.session(database=self.database) as session:
                result = session.run(query, parameters or {})
                records = [record.data() for record in result]
                summary = result.consume()
                return records, summary

        return await asyncio.get_running_loop().run_in_executor(None, run_query)
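
# Usage sketch (illustrative query):
#   await neo4j_connection.connect()
#   records, summary = await neo4j_connection.execute_query(
#       "MATCH (n) RETURN count(n) AS count"
#   )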

neo4j_connection = Neo4jConnection()

app = FastMCP("Neo4j Graph Data Upload Server")
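
# NOTE: no tools are registered on `app` in this listing. With FastMCP, each
# operation below would typically be exposed via the tool decorator, e.g.
# (assumed wiring):
#
#   @app.tool()
#   async def upload_graph_data() -> Dict[str, Any]:
#       return await upload_graph_data_impl()
#
# The wrapper-over-_impl functions below follow exactly that shape.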

async def execute_cypher(query: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Run an arbitrary Cypher query and return records plus a summary."""
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "error",
                "error": "Unable to connect to Neo4j database",
                "details": "Check connection settings and network connectivity"
            }
        records, summary = await neo4j_connection.execute_query(query, parameters)
        counters = summary.counters
        return {
            "status": "success",
            "query": query,
            "parameters": parameters or {},
            "records": records,
            "record_count": len(records),
            "execution_time_ms": summary.result_available_after,
            "summary": {
                "query_type": summary.query_type,
                # SummaryCounters is not a mapping, so dict() would raise a
                # TypeError; read the documented attributes instead.
                "counters": {
                    "nodes_created": counters.nodes_created,
                    "nodes_deleted": counters.nodes_deleted,
                    "relationships_created": counters.relationships_created,
                    "relationships_deleted": counters.relationships_deleted,
                    "properties_set": counters.properties_set,
                    "labels_added": counters.labels_added,
                }
            }
        }
    except Exception as e:
        return {
            "status": "error",
            "query": query,
            "error": str(e)
        }

async def get_database_schema() -> Dict[str, Any]:
    """Collect labels, relationship types, property keys, constraints, and indexes."""
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "error",
                "error": "Unable to connect to Neo4j database"
            }
        labels_records, _ = await neo4j_connection.execute_query("CALL db.labels()")
        labels = [record["label"] for record in labels_records]
        rel_records, _ = await neo4j_connection.execute_query("CALL db.relationshipTypes()")
        relationships = [record["relationshipType"] for record in rel_records]
        prop_records, _ = await neo4j_connection.execute_query("CALL db.propertyKeys()")
        properties = [record["propertyKey"] for record in prop_records]
        # Older Neo4j servers do not support the SHOW syntax; degrade gracefully.
        try:
            constraint_records, _ = await neo4j_connection.execute_query("SHOW CONSTRAINTS")
            constraints = [dict(record) for record in constraint_records]
        except Exception:
            constraints = []
        try:
            index_records, _ = await neo4j_connection.execute_query("SHOW INDEXES")
            indexes = [dict(record) for record in index_records]
        except Exception:
            indexes = []
        return {
            "status": "success",
            "schema": {
                "node_labels": labels,
                "relationship_types": relationships,
                "property_keys": properties,
                "constraints": constraints,
                "indexes": indexes
            }
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

async def get_node_count(label: Optional[str] = None) -> Dict[str, Any]:
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "error",
                "error": "Unable to connect to Neo4j database"
            }
        if label:
            # Labels cannot be parameterized in Cypher; backticks guard
            # against most, but not all, unusual label names.
            query = f"MATCH (n:`{label}`) RETURN count(n) as count"
        else:
            query = "MATCH (n) RETURN count(n) as count"
        records, _ = await neo4j_connection.execute_query(query)
        count = records[0]["count"] if records else 0
        return {
            "status": "success",
            "label": label,
            "count": count
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

async def get_relationship_count(relationship_type: Optional[str] = None) -> Dict[str, Any]:
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "error",
                "error": "Unable to connect to Neo4j database"
            }
        # Use a directed pattern: the undirected ()-[r]-() matches every
        # relationship twice and doubles the count.
        if relationship_type:
            query = f"MATCH ()-[r:`{relationship_type}`]->() RETURN count(r) as count"
        else:
            query = "MATCH ()-[r]->() RETURN count(r) as count"
        records, _ = await neo4j_connection.execute_query(query)
        count = records[0]["count"] if records else 0
        return {
            "status": "success",
            "relationship_type": relationship_type,
            "count": count
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

async def health_check() -> Dict[str, Any]:
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "unhealthy",
                "reason": "Unable to connect to Neo4j database",
                "configuration": {
                    "uri": neo4j_connection.uri,
                    "database": neo4j_connection.database,
                    "username": neo4j_connection.username
                }
            }
        # A simple query to test connectivity
        records, _ = await neo4j_connection.execute_query("RETURN 1 as test")
        if records and records[0]["test"] == 1:
            return {
                "status": "healthy",
                "database": neo4j_connection.database,
                "uri": neo4j_connection.uri,
                # Both the +s and +ssc (self-signed cert) schemes enable TLS.
                "ssl_enabled": neo4j_connection.uri.startswith(
                    ('neo4j+s://', 'bolt+s://', 'neo4j+ssc://', 'bolt+ssc://')
                ),
                "message": "Neo4j connection is working properly"
            }
        else:
            return {
                "status": "unhealthy",
                "reason": "Query execution failed or returned unexpected results"
            }
    except Exception as e:
        return {
            "status": "unhealthy",
            "reason": str(e)
        }

async def clear_database_impl() -> Dict[str, Any]:
    """Delete every relationship and node. Destructive, and runs unbatched."""
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "error",
                "error": "Unable to connect to Neo4j database"
            }
        node_count_query = "MATCH (n) RETURN count(n) as count"
        rel_count_query = "MATCH ()-[r]->() RETURN count(r) as count"
        node_records, _ = await neo4j_connection.execute_query(node_count_query)
        rel_records, _ = await neo4j_connection.execute_query(rel_count_query)
        nodes_before = node_records[0]["count"] if node_records else 0
        rels_before = rel_records[0]["count"] if rel_records else 0
        # Relationships must be deleted before their nodes (equivalently,
        # DETACH DELETE handles both). Very large graphs would need batching,
        # e.g. with CALL { ... } IN TRANSACTIONS.
        await neo4j_connection.execute_query("MATCH ()-[r]->() DELETE r")
        await neo4j_connection.execute_query("MATCH (n) DELETE n")
        print(f"✅ Cleared: {nodes_before} nodes, {rels_before} relationships")
        return {
            "status": "success",
            "message": "Database cleared successfully",
            "statistics": {
                "nodes_removed": nodes_before,
                "relationships_removed": rels_before
            }
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }


async def clear_database() -> Dict[str, Any]:
    return await clear_database_impl()

async def upload_graph_data_impl() -> Dict[str, Any]:
    """Clear the database, then upload nodes, relationships, communities,
    and all metadata objects from GRAPH_DATA_FILE."""
    try:
        if not neo4j_connection.driver:
            await neo4j_connection.connect()
        if not neo4j_connection.driver:
            return {
                "status": "error",
                "error": "Unable to connect to Neo4j database"
            }
        clear_result = await clear_database_impl()
        if clear_result["status"] != "success":
            return clear_result
        # Check if graph data file exists
        if not os.path.exists(GRAPH_DATA_FILE):
            return {
                "status": "error",
                "error": f"Graph data file not found: {GRAPH_DATA_FILE}"
            }
        with open(GRAPH_DATA_FILE, 'r', encoding='utf-8') as f:
            graph_data = json.load(f)
        if not isinstance(graph_data, dict) or 'nodes' not in graph_data:
            return {
                "status": "error",
                "error": "Invalid graph data format. Expected JSON with 'nodes' array"
            }
        nodes = graph_data.get('nodes', [])
        relationships = graph_data.get('relationships', [])
        communities_data = graph_data.get('communities', {})
        drift_metadata = graph_data.get('drift_search_metadata', {})
        global_metadata = graph_data.get('metadata', {})
        search_optimization = drift_metadata.get('search_optimization', {}) if drift_metadata else {}
        # Item counts used for progress totals and the final success rate.
        # Each metadata object contributes at most one item; per-community
        # embeddings contribute one item per community.
        communities_count = len(drift_metadata.get('community_search_index', {})) if drift_metadata else 0
        drift_count = 1 if drift_metadata else 0
        metadata_count = 1 if global_metadata else 0
        optimization_count = 1 if search_optimization else 0
        communities_metadata_count = 1 if communities_data else 0
        drift_config_count = 1 if (drift_metadata and 'configuration' in drift_metadata) else 0
        community_search_index_count = 1 if (drift_metadata and 'community_search_index' in drift_metadata) else 0
        search_optimization_object_count = 1 if (drift_metadata and 'search_optimization' in drift_metadata) else 0
        embeddings_object_count = 1 if (drift_metadata and 'community_search_index' in drift_metadata) else 0
        embeddings_count = communities_count if (drift_metadata and 'community_search_index' in drift_metadata) else 0
        total_items = (len(nodes) + len(relationships) + communities_count + drift_count +
                       metadata_count + optimization_count + communities_metadata_count +
                       drift_config_count + community_search_index_count +
                       search_optimization_object_count + embeddings_object_count + embeddings_count)
        print(f"Processing: {len(nodes)} nodes, {len(relationships)} relationships, "
              f"{communities_count} communities, "
              f"{total_items - len(nodes) - len(relationships) - communities_count} metadata items")
        upload_stats = {
            "nodes_processed": 0,
            "nodes_created": 0,
            "relationships_processed": 0,
            "relationships_created": 0,
            "communities_processed": 0,
            "communities_created": 0,
            "drift_metadata_created": 0,
            "global_metadata_created": 0,
            "search_optimization_created": 0,
            "communities_metadata_created": 0,
            "drift_config_created": 0,
            "community_search_index_created": 0,
            "search_optimization_object_created": 0,
            "embeddings_object_created": 0,
            "embeddings_created": 0,
            "errors": []
        }
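        # Nodes are MERGEd on their id property, so node upload is idempotent;
        # relationships below are CREATEd and rely on the database having been
        # cleared above to avoid duplicates.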
        with tqdm(total=len(nodes), desc="Nodes", unit="node", ncols=80, leave=False) as pbar:
            for node in nodes:
                try:
                    upload_stats["nodes_processed"] += 1
                    node_id = node['id']
                    labels = node['labels']
                    properties = node.get('properties', {})
                    # Create node with its labels; labels cannot be
                    # parameterized, so backtick-quote each one.
                    labels_str = ':'.join([f"`{label}`" for label in labels])
                    query = f"MERGE (n:{labels_str} {{id: $id}}) SET n += $props RETURN n"
                    await neo4j_connection.execute_query(query, {
                        "id": node_id,
                        "props": properties
                    })
                    upload_stats["nodes_created"] += 1
                    pbar.update(1)
                except Exception as e:
                    upload_stats["errors"].append(f"Node upload error: {str(e)}")
                    pbar.update(1)
        with tqdm(total=len(relationships), desc="Relationships", unit="rel", ncols=80, leave=False) as pbar:
            for rel in relationships:
                upload_stats["relationships_processed"] += 1
                try:
                    start_node = rel['startNode']
                    end_node = rel['endNode']
                    rel_type = rel['type']
                    # Each relationship carries its own optional properties.
                    properties = rel.get('properties', {})
                    query = f"""
                        MATCH (a {{id: $start_node}})
                        MATCH (b {{id: $end_node}})
                        CREATE (a)-[r:`{rel_type}`]->(b)
                        SET r += $props
                        RETURN r
                    """
                    await neo4j_connection.execute_query(query, {
                        "start_node": start_node,
                        "end_node": end_node,
                        "props": properties
                    })
                    upload_stats["relationships_created"] += 1
                    pbar.update(1)
                except Exception as e:
                    error_msg = f"Relationship upload error for rel {rel}: {str(e)}"
                    logger.error(error_msg)
                    upload_stats["errors"].append(error_msg)
                    pbar.update(1)
        if drift_metadata and 'community_search_index' in drift_metadata:
            community_index = drift_metadata['community_search_index']
            with tqdm(total=len(community_index), desc="Communities", unit="comm", ncols=80, leave=False) as pbar:
                for comm_id, comm_data in community_index.items():
                    try:
                        upload_stats["communities_processed"] += 1
                        embeddings = comm_data.get('embeddings', {})
                        summary_embedding = embeddings.get('summary_embedding', [])
                        hyde_embeddings = embeddings.get('hyde_embeddings', [])
                        # Nested structures are stored as JSON strings, since
                        # Neo4j property values cannot hold maps.
                        follow_up_templates_json = json.dumps(comm_data.get('follow_up_templates', {}))
                        hyde_embeddings_json = json.dumps(hyde_embeddings)
                        # Get statistics
                        stats = comm_data.get('statistics', {})
                        # Community properties with documented attributes from JSON
                        community_props = {
                            "id": comm_id,
                            "summary": comm_data.get('summary', ''),
                            "key_entities": comm_data.get('key_entities', []),
                            "member_count": stats.get('member_count', 0),
                            "member_ids": stats.get('member_ids', []),
                            "internal_edges": stats.get('internal_edges', 0),
                            "density": stats.get('density', 0.0),
                            "avg_degree": stats.get('avg_degree', 0.0),
                            "follow_up_templates": follow_up_templates_json,
                            "hyde_embeddings": hyde_embeddings_json
                        }
                        # Add summary embedding as List<Float> if available
                        if summary_embedding and isinstance(summary_embedding, list):
                            community_props["summary_embedding"] = summary_embedding
                            community_props["embedding_dimensions"] = len(summary_embedding)
                        # Create Community node
                        query = """
                            MERGE (c:Community {id: $id})
                            SET c += $props
                            RETURN c
                        """
                        await neo4j_connection.execute_query(query, {
                            "id": comm_id,
                            "props": community_props
                        })
                        upload_stats["communities_created"] += 1
                        pbar.update(1)
                    except Exception as e:
                        error_msg = f"Community upload error for {comm_id}: {str(e)}"
                        logger.error(error_msg)
                        upload_stats["errors"].append(error_msg)
                        pbar.update(1)
        if drift_metadata:
            try:
                query_routing_config_json = json.dumps(drift_metadata.get('query_routing_config', {}))
                performance_monitoring_json = json.dumps(drift_metadata.get('performance_monitoring', {}))
                configuration_json = json.dumps(drift_metadata.get('configuration', {}))
                community_search_index_json = json.dumps(drift_metadata.get('community_search_index', {}))
                search_optimization_json = json.dumps(drift_metadata.get('search_optimization', {}))
                # Build a compact embeddings object (per-community) to store on the DRIFT node
                embeddings_per_community = {}
                for _comm_id, _comm_data in drift_metadata.get('community_search_index', {}).items():
                    emb = _comm_data.get('embeddings')
                    if emb:
                        # Only keep summary and hyde to limit size
                        embeddings_per_community[_comm_id] = {
                            'summary_embedding': emb.get('summary_embedding'),
                            'hyde_embeddings': emb.get('hyde_embeddings')
                        }
                embeddings_json = json.dumps(embeddings_per_community)
                drift_props = {
                    "version": drift_metadata.get('version', '1.0'),
                    "generated_timestamp": drift_metadata.get('generated_timestamp', ''),
                    "query_routing_config": query_routing_config_json,
                    "performance_monitoring": performance_monitoring_json,
                    "configuration": configuration_json,
                    # Nested objects stored as JSON strings for direct inspection
                    "community_search_index": community_search_index_json,
                    "search_optimization": search_optimization_json,
                    "embeddings": embeddings_json,
                    "total_communities": len(drift_metadata.get('community_search_index', {}))
                }
                # Create single DRIFT metadata node
                query = """
                    MERGE (d:DriftMetadata {version: $version})
                    SET d += $props
                    RETURN d
                """
                await neo4j_connection.execute_query(query, {
                    "version": drift_metadata.get('version', '1.0'),
                    "props": drift_props
                })
                upload_stats["drift_metadata_created"] = 1
            except Exception as e:
                error_msg = f"DRIFT metadata upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if global_metadata:
            try:
                # Convert nested objects to JSON strings for Neo4j compatibility
                recovery_stats_json = json.dumps(global_metadata.get('recovery_stats', {}))
                member_extraction_stats_json = json.dumps(global_metadata.get('member_extraction_stats', {}))
                community_detection_json = json.dumps(global_metadata.get('community_detection', {}))
                metadata_props = {
                    "node_count": global_metadata.get('node_count', 0),
                    "relationship_count": global_metadata.get('relationship_count', 0),
                    "generated_at": global_metadata.get('generated_at', ''),
                    "generator": global_metadata.get('generator', ''),
                    "llm_provider": global_metadata.get('llm_provider', ''),
                    "model": global_metadata.get('model', ''),
                    "format_version": global_metadata.get('format_version', ''),
                    "last_updated": global_metadata.get('last_updated', ''),
                    "phase": global_metadata.get('phase', ''),
                    "entity_count": global_metadata.get('entity_count', global_metadata.get('node_count', 0)),
                    "community_count": global_metadata.get('community_count', 0),
                    "total_node_count": global_metadata.get('total_node_count', global_metadata.get('node_count', 0)),
                    "total_relationship_count": global_metadata.get('total_relationship_count', global_metadata.get('relationship_count', 0)),
                    # Complex nested objects as JSON strings
                    "recovery_stats": recovery_stats_json,
                    "member_extraction_stats": member_extraction_stats_json,
                    "community_detection": community_detection_json
                }
                # Create Global Metadata node
                query = """
                    MERGE (m:GraphMetadata {generator: $generator})
                    SET m += $props
                    RETURN m
                """
                await neo4j_connection.execute_query(query, {
                    "generator": global_metadata.get('generator', 'unknown'),
                    "props": metadata_props
                })
                upload_stats["global_metadata_created"] = 1
            except Exception as e:
                error_msg = f"Global metadata upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if search_optimization:
            try:
                optimization_props = {
                    "total_communities": search_optimization.get('total_communities', 0),
                    "avg_community_size": search_optimization.get('avg_community_size', 0.0),
                    "graph_density": search_optimization.get('graph_density', 0.0),
                    "total_nodes": search_optimization.get('total_nodes', 0),
                    "total_edges": search_optimization.get('total_edges', 0),
                    "max_primer_communities": search_optimization.get('max_primer_communities', 0)
                }
                query = """
                    MERGE (s:SearchOptimization {id: 'global'})
                    SET s += $props
                    RETURN s
                """
                await neo4j_connection.execute_query(query, {
                    "props": optimization_props
                })
                upload_stats["search_optimization_created"] = 1
            except Exception as e:
                error_msg = f"Search optimization upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if communities_data:
            try:
                communities_props = {
                    "algorithm": communities_data.get('algorithm', ''),
                    "total_communities": communities_data.get('total_communities', 0),
                    "modularity_score": communities_data.get('modularity_score', 0.0),
                    "summaries": json.dumps(communities_data.get('summaries', {})),
                    "statistics": json.dumps(communities_data.get('statistics', {}))
                }
                query = """
                    MERGE (cm:CommunitiesMetadata {algorithm: $algorithm})
                    SET cm += $props
                    RETURN cm
                """
                await neo4j_connection.execute_query(query, {
                    "algorithm": communities_data.get('algorithm', 'unknown'),
                    "props": communities_props
                })
                upload_stats["communities_metadata_created"] = 1
            except Exception as e:
                error_msg = f"Communities metadata upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if drift_metadata and 'configuration' in drift_metadata:
            try:
                config = drift_metadata['configuration']
                drift_config_props = {
                    "max_iterations": config.get('max_iterations', 0),
                    "confidence_threshold": config.get('confidence_threshold', 0.0),
                    "top_k_communities": config.get('top_k_communities', 0),
                    "hyde_expansion_count": config.get('hyde_expansion_count', 0),
                    "termination_criteria": config.get('termination_criteria', ''),
                    "version": drift_metadata.get('version', '1.0'),
                    "generated_timestamp": drift_metadata.get('generated_timestamp', '')
                }
                query = """
                    MERGE (dc:DriftConfiguration {version: $version})
                    SET dc += $props
                    RETURN dc
                """
                await neo4j_connection.execute_query(query, {
                    "version": drift_metadata.get('version', '1.0'),
                    "props": drift_config_props
                })
                upload_stats["drift_config_created"] = 1
            except Exception as e:
                error_msg = f"DRIFT Configuration upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if drift_metadata and 'community_search_index' in drift_metadata:
            try:
                community_search_index = drift_metadata['community_search_index']
                search_index_props = {
                    "version": drift_metadata.get('version', '1.0'),
                    "total_communities": len(community_search_index),
                    "community_data": json.dumps(community_search_index),
                    "generated_timestamp": drift_metadata.get('generated_timestamp', ''),
                    "index_type": "community_search"
                }
                query = """
                    MERGE (csi:CommunitySearchIndex {version: $version})
                    SET csi += $props
                    RETURN csi
                """
                await neo4j_connection.execute_query(query, {
                    "version": drift_metadata.get('version', '1.0'),
                    "props": search_index_props
                })
                upload_stats["community_search_index_created"] = 1
            except Exception as e:
                error_msg = f"Community Search Index upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if drift_metadata and 'search_optimization' in drift_metadata:
            try:
                search_opt_data = drift_metadata['search_optimization']
                search_opt_props = {
                    "total_communities": search_opt_data.get('total_communities', 0),
                    "avg_community_size": search_opt_data.get('avg_community_size', 0.0),
                    "graph_density": search_opt_data.get('graph_density', 0.0),
                    "total_nodes": search_opt_data.get('total_nodes', 0),
                    "total_edges": search_opt_data.get('total_edges', 0),
                    "max_primer_communities": search_opt_data.get('max_primer_communities', 0),
                    "optimization_version": drift_metadata.get('version', '1.0')
                }
                query = """
                    MERGE (so:SearchOptimizationObject {optimization_version: $version})
                    SET so += $props
                    RETURN so
                """
                await neo4j_connection.execute_query(query, {
                    "version": drift_metadata.get('version', '1.0'),
                    "props": search_opt_props
                })
                upload_stats["search_optimization_object_created"] = 1
            except Exception as e:
                error_msg = f"Search Optimization object upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if drift_metadata and 'community_search_index' in drift_metadata:
            try:
                community_index = drift_metadata['community_search_index']
                total_embeddings = 0
                total_dimensions = 0
                embedding_communities = []
                for comm_id, comm_data in community_index.items():
                    embeddings_data = comm_data.get('embeddings', {})
                    if embeddings_data:
                        total_embeddings += 1
                        if embeddings_data.get('summary_embedding'):
                            total_dimensions = len(embeddings_data.get('summary_embedding', []))
                        embedding_communities.append(comm_id)
                # Create embeddings object properties
                embeddings_obj_props = {
                    "total_embeddings": total_embeddings,
                    "embedding_dimensions": total_dimensions,
                    "embedding_computation": "computed via text-embedding-ada-002",
                    "communities_with_embeddings": embedding_communities,
                    "embedding_type": "community_summaries",
                    "embeddings_version": drift_metadata.get('version', '1.0')
                }
                query = """
                    MERGE (eo:EmbeddingsObject {embeddings_version: $version})
                    SET eo += $props
                    RETURN eo
                """
                await neo4j_connection.execute_query(query, {
                    "version": drift_metadata.get('version', '1.0'),
                    "props": embeddings_obj_props
                })
                upload_stats["embeddings_object_created"] = 1
            except Exception as e:
                error_msg = f"Embeddings object upload error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        if drift_metadata and 'community_search_index' in drift_metadata:
            community_index = drift_metadata['community_search_index']
            for comm_id, comm_data in community_index.items():
                try:
                    embeddings_data = comm_data.get('embeddings', {})
                    if embeddings_data:
                        embeddings_props = {
                            "community_id": comm_id,
                            "summary_embedding": embeddings_data.get('summary_embedding', []),
                            "hyde_embeddings": json.dumps(embeddings_data.get('hyde_embeddings', [])),
                            "embedding_dimensions": len(embeddings_data.get('summary_embedding', [])),
                            "embedding_computation": embeddings_data.get('embedding_computation', 'computed')
                        }
                        query = """
                            MERGE (e:Embeddings {community_id: $community_id})
                            SET e += $props
                            RETURN e
                        """
                        await neo4j_connection.execute_query(query, {
                            "community_id": comm_id,
                            "props": embeddings_props
                        })
                        upload_stats["embeddings_created"] += 1
                except Exception as e:
                    error_msg = f"Embeddings upload error for {comm_id}: {str(e)}"
                    logger.error(error_msg)
                    upload_stats["errors"].append(error_msg)
        if communities_count > 0:
            try:
                # Connect entities to their communities based on community_id property
                community_rel_query = """
                    MATCH (n) WHERE n.community_id IS NOT NULL
                    MATCH (c:Community {id: n.community_id})
                    MERGE (n)-[:BELONGS_TO_COMMUNITY]->(c)
                """
                await neo4j_connection.execute_query(community_rel_query, {})
            except Exception as e:
                error_msg = f"Community relationship creation error: {str(e)}"
                logger.error(error_msg)
                upload_stats["errors"].append(error_msg)
        # Calculate success percentage for all components
        nodes_success_rate = (upload_stats["nodes_created"] / len(nodes) * 100) if nodes else 100
        rels_success_rate = (upload_stats["relationships_created"] / len(relationships) * 100) if relationships else 100
        communities_success_rate = (upload_stats["communities_created"] / communities_count * 100) if communities_count else 100
        drift_success_rate = (upload_stats["drift_metadata_created"] / drift_count * 100) if drift_count else 100
        embedding_dimensions = 0
        if drift_metadata and 'community_search_index' in drift_metadata:
            for comm_data in drift_metadata['community_search_index'].values():
                embeddings = comm_data.get('embeddings', {})
                summary_embedding = embeddings.get('summary_embedding', [])
                if summary_embedding:
                    embedding_dimensions = len(summary_embedding)
                    break
        total_created = (upload_stats["nodes_created"] + upload_stats["relationships_created"] +
                         upload_stats["communities_created"] + upload_stats["drift_metadata_created"] +
                         upload_stats["global_metadata_created"] + upload_stats["search_optimization_created"] +
                         upload_stats["communities_metadata_created"] + upload_stats["drift_config_created"] +
                         upload_stats["community_search_index_created"] + upload_stats["search_optimization_object_created"] +
                         upload_stats["embeddings_object_created"] + upload_stats["embeddings_created"])
        overall_success_rate = (total_created / total_items * 100) if total_items else 100
        result = {
            "status": "success",
            "message": "Graph data upload completed successfully",
            "statistics": upload_stats,
            "success_rates": {
                "nodes": f"{nodes_success_rate:.1f}%",
                "relationships": f"{rels_success_rate:.1f}%",
                "communities": f"{communities_success_rate:.1f}%",
                "drift_metadata": f"{drift_success_rate:.1f}%",
                "global_metadata": f"{100.0 if upload_stats['global_metadata_created'] > 0 else 0:.1f}%",
                "search_optimization": f"{100.0 if upload_stats['search_optimization_created'] > 0 else 0:.1f}%",
                "communities_metadata": f"{100.0 if upload_stats['communities_metadata_created'] > 0 else 0:.1f}%",
                "drift_config": f"{100.0 if upload_stats['drift_config_created'] > 0 else 0:.1f}%",
                "community_search_index": f"{100.0 if upload_stats['community_search_index_created'] > 0 else 0:.1f}%",
                "search_optimization_object": f"{100.0 if upload_stats['search_optimization_object_created'] > 0 else 0:.1f}%",
                "embeddings_object": f"{100.0 if upload_stats['embeddings_object_created'] > 0 else 0:.1f}%",
                "embeddings": f"{(upload_stats['embeddings_created'] / communities_count * 100) if communities_count > 0 else 0:.1f}%",
                "overall": f"{overall_success_rate:.1f}%"
            },
            "source_file": GRAPH_DATA_FILE,
            "architecture_summary": {
                "nodes": f"{upload_stats['nodes_created']}/{len(nodes)}",
                "relationships": f"{upload_stats['relationships_created']}/{len(relationships)}",
                "communities": f"{upload_stats['communities_created']}/{communities_count}",
                "drift_metadata": f"{upload_stats['drift_metadata_created']}/{drift_count}",
                "global_metadata": f"{upload_stats['global_metadata_created']}/1",
                "search_optimization": f"{upload_stats['search_optimization_created']}/1",
                "embeddings_stored": communities_count > 0,
                "vector_dimensions": embedding_dimensions,
                "complete_metadata_coverage": upload_stats['global_metadata_created'] > 0 and upload_stats['search_optimization_created'] > 0
            }
        }
        # Print concise upload summary
        print(f"\n✅ Upload completed: {total_created}/{total_items} items ({overall_success_rate:.1f}%)")
        # Show all node types created
        total_entity_nodes = upload_stats['nodes_created']
        total_metadata_nodes = (upload_stats['drift_metadata_created'] +
                                upload_stats['global_metadata_created'] +
                                upload_stats['search_optimization_created'] +
                                upload_stats['communities_metadata_created'] +
                                upload_stats['drift_config_created'] +
                                upload_stats['community_search_index_created'] +
                                upload_stats['search_optimization_object_created'] +
                                upload_stats['embeddings_object_created'])
        print(f"   Entity Nodes: {total_entity_nodes}, Community Nodes: {upload_stats['communities_created']}, "
              f"Metadata Nodes: {total_metadata_nodes}, Embedding Nodes: {upload_stats['embeddings_created']}")
        print(f"   Relationships: {upload_stats['relationships_created']}")
        if upload_stats['errors']:
            print(f"   ⚠️ {len(upload_stats['errors'])} errors encountered")
        return result
    except Exception as e:
        logger.error(f"Graph data upload failed: {str(e)}")
        return {
            "status": "error",
            "error": str(e)
        }


async def upload_graph_data() -> Dict[str, Any]:
    return await upload_graph_data_impl()

async def check_graph_data_file() -> Dict[str, Any]:
    try:
        if not os.path.exists(GRAPH_DATA_FILE):
            return {
                "status": "not_found",
                "path": GRAPH_DATA_FILE,
                "message": "Graph data file does not exist"
            }
        # Get file stats
        file_stats = os.stat(GRAPH_DATA_FILE)
        file_size = file_stats.st_size
        # Try to parse the JSON to validate format
        try:
            with open(GRAPH_DATA_FILE, 'r', encoding='utf-8') as f:
                graph_data = json.load(f)
            nodes_count = len(graph_data.get('nodes', []))
            relationships_count = len(graph_data.get('relationships', []))
            return {
                "status": "found",
                "path": GRAPH_DATA_FILE,
                "file_size_bytes": file_size,
                "nodes_count": nodes_count,
                "relationships_count": relationships_count,
                "valid_json": True
            }
        except json.JSONDecodeError as e:
            return {
                "status": "invalid",
                "path": GRAPH_DATA_FILE,
                "file_size_bytes": file_size,
                "valid_json": False,
                "json_error": str(e)
            }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__
        }

async def get_connection_info() -> Dict[str, Any]:
    try:
        # Always return configuration info even if not connected
        deployment_type = "Self-hosted"
        if "databases.neo4j.io" in neo4j_connection.uri:
            deployment_type = "Neo4j Aura"
        elif "sandbox" in neo4j_connection.uri:
            deployment_type = "Neo4j Sandbox"
        elif any(cloud in neo4j_connection.uri for cloud in ["aws", "gcp", "azure"]):
            deployment_type = "Enterprise Cloud"
        connection_info = {
            "status": "success",
            "connection": {
                "uri": neo4j_connection.uri,
                "database": neo4j_connection.database,
                "username": neo4j_connection.username,
                "deployment_type": deployment_type,
                # Both the +s and +ssc (self-signed cert) schemes enable TLS.
                "ssl_enabled": neo4j_connection.uri.startswith(
                    ('neo4j+s://', 'bolt+s://', 'neo4j+ssc://', 'bolt+ssc://')
                ),
                "connected": neo4j_connection.driver is not None
            },
            "capabilities": {
                "cypher_queries": True,
                "schema_inspection": True,
                "bulk_operations": True,
                "graph_algorithms": "unknown",
                "multi_database": "unknown"
            }
        }
        if neo4j_connection.driver:
            try:
                # dbms.components() may be restricted or unavailable on some
                # servers; fall back to an empty dict.
                server_info_records, _ = await neo4j_connection.execute_query(
                    "CALL dbms.components() YIELD name, versions, edition"
                )
                connection_info["server_info"] = server_info_records[0] if server_info_records else {}
            except Exception:
                connection_info["server_info"] = {}
        return connection_info
    except Exception as e:
        logger.error(f"Connection info retrieval failed: {str(e)}")
        return {
            "status": "error",
            "error": str(e),
            "error_type": type(e).__name__
        }

if __name__ == "__main__":
    try:
        asyncio.run(neo4j_connection.connect())
        print(f"Looking for graph data at: {GRAPH_DATA_FILE}")
        print(f"File exists: {os.path.exists(GRAPH_DATA_FILE)}")
        result = asyncio.run(upload_graph_data_impl())
        print(f"Upload result: {result.get('status', 'unknown')}")
        if result.get('status') == 'error':
            print(f"❌ Error details: {result.get('error', 'Unknown error')}")
            if 'error_type' in result:
                print(f"Error type: {result['error_type']}")
    except ValueError as e:
        logger.error(f"Configuration Error: {e}")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Upload failed: {e}")
        sys.exit(1)
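
# Running this file directly performs a one-shot upload. Serving the FastMCP
# `app` is assumed to happen elsewhere (for example via the fastmcp CLI);
# nothing here calls app.run().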