| """Answer synthesis module for final response generation. - Phase F (Steps 15-16)""" | |
| import logging | |
| import json | |
| from typing import Dict, List, Any, Optional | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from .setup import GraphRAGSetup | |
| from .query_preprocessing import DriftRoutingResult, QueryAnalysis | |
| from .vector_augmentation import AugmentationResult | |


@dataclass
class SourceEvidence:
    """Evidence source with attribution and confidence."""

    source_type: str  # 'community', 'entity', 'relationship', 'vector_doc'
    source_id: str
    content: str
    confidence: float
    phase: str  # 'C', 'D', or 'E'


@dataclass
class SynthesisResult:
    """Phase F synthesis result with comprehensive answer."""

    final_answer: str
    confidence_score: float
    source_evidence: List[SourceEvidence]
    synthesis_strategy: str
    coverage_assessment: Dict[str, float]
    execution_time: float
    metadata: Dict[str, Any]


class AnswerSynthesisEngine:
    """
    Answer synthesis engine implementing Phase F (Steps 15-16).

    Handles the final answer generation process:
    - Context assembly and evidence ranking (Step 15)
    - Final answer generation with confidence scoring (Step 16)
    """

    def __init__(self, setup: GraphRAGSetup):
        self.setup = setup
        self.llm = setup.llm
        self.config = setup.config
        self.logger = logging.getLogger(self.__class__.__name__)

        # Synthesis parameters
        self.min_confidence_threshold = 0.7
        self.max_synthesis_length = 2000

    async def execute_answer_synthesis_phase(self,
                                             analysis: QueryAnalysis,
                                             routing: DriftRoutingResult,
                                             community_results: Dict[str, Any],
                                             follow_up_results: Dict[str, Any],
                                             augmentation_results: AugmentationResult) -> SynthesisResult:
        """
        Execute the answer synthesis phase with comprehensive integration.

        Args:
            analysis: Query analysis results
            routing: Routing decision parameters
            community_results: Community search results
            follow_up_results: Follow-up search results
            augmentation_results: Vector augmentation results

        Returns:
            Synthesis result with the final answer
        """
        start_time = datetime.now()

        try:
            # Context assembly
            self.logger.info("Starting Step 15: Context Assembly and Ranking")
            assembled_context = await self._assemble_and_rank_context(
                analysis, community_results, follow_up_results, augmentation_results
            )

            # Final answer generation
            self.logger.info("Starting Step 16: Final Answer Generation")
            final_answer, confidence = await self._generate_final_answer(
                analysis, routing, assembled_context
            )

            execution_time = (datetime.now() - start_time).total_seconds()

            synthesis_result = SynthesisResult(
                final_answer=final_answer,
                confidence_score=confidence,
                source_evidence=assembled_context['evidence'],
                synthesis_strategy='comprehensive_drift',
                coverage_assessment=assembled_context['coverage'],
                execution_time=execution_time,
                metadata={
                    'sources_integrated': len(assembled_context['evidence']),
                    'phase_coverage': assembled_context['phase_coverage'],
                    'synthesis_method': 'llm_guided',
                    'phase': 'answer_synthesis',
                    'step_range': '15-16'
                }
            )

            self.logger.info(
                f"Phase F completed: confidence {confidence:.3f}, "
                f"{len(assembled_context['evidence'])} sources integrated"
            )
            return synthesis_result

        except Exception as e:
            self.logger.error(f"Answer synthesis phase failed: {e}")
            # Return fallback synthesis on failure
            return self._create_fallback_synthesis(
                community_results, follow_up_results,
                (datetime.now() - start_time).total_seconds(), str(e)
            )

    async def _assemble_and_rank_context(self,
                                         analysis: QueryAnalysis,
                                         community_results: Dict[str, Any],
                                         follow_up_results: Dict[str, Any],
                                         augmentation_results: AugmentationResult) -> Dict[str, Any]:
        """
        Step 15: Assemble and rank all context from Phases C, D, and E.

        Prioritizes information by relevance, confidence, and source diversity.
        """
        evidence_sources = []

        # Extract community evidence
        if 'communities' in community_results:
            for community in community_results['communities']:
                evidence_sources.append(SourceEvidence(
                    source_type='community',
                    source_id=community.community_id,
                    content=community.summary,
                    confidence=community.similarity_score,
                    phase='C'
                ))

        # Extract follow-up evidence
        if 'intermediate_answers' in follow_up_results:
            for answer in follow_up_results['intermediate_answers']:
                evidence_sources.append(SourceEvidence(
                    source_type='entity_search',
                    source_id=f"followup_{len(evidence_sources)}",
                    content=f"Q: {answer.question}\nA: {answer.answer}",
                    confidence=answer.confidence,
                    phase='D'
                ))

        # Extract vector evidence
        if augmentation_results and augmentation_results.vector_results:
            for i, vector_result in enumerate(augmentation_results.vector_results):
                evidence_sources.append(SourceEvidence(
                    source_type='vector_doc',
                    source_id=f"vector_{i}",
                    content=vector_result.content,
                    confidence=vector_result.similarity_score,
                    phase='E'
                ))

        # Rank evidence
        ranked_evidence = sorted(evidence_sources, key=lambda x: x.confidence, reverse=True)

        # Calculate coverage
        coverage = {
            'community_coverage': len([e for e in ranked_evidence if e.phase == 'C'])
                / max(1, len(community_results.get('communities', []))),
            'entity_coverage': len([e for e in ranked_evidence if e.phase == 'D'])
                / max(1, len(follow_up_results.get('intermediate_answers', []))),
            'vector_coverage': len([e for e in ranked_evidence if e.phase == 'E'])
                / max(1, len(augmentation_results.vector_results) if augmentation_results else 1),
            'overall_confidence': sum(e.confidence for e in ranked_evidence) / max(1, len(ranked_evidence))
        }

        phase_coverage = {
            'phase_c': len([e for e in ranked_evidence if e.phase == 'C']),
            'phase_d': len([e for e in ranked_evidence if e.phase == 'D']),
            'phase_e': len([e for e in ranked_evidence if e.phase == 'E'])
        }

        return {
            'evidence': ranked_evidence[:15],  # Top 15 pieces of evidence
            'coverage': coverage,
            'phase_coverage': phase_coverage
        }
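
    # Illustrative shape of the assembled context returned above (hypothetical
    # values, not taken from a real run), assuming one evidence item per phase:
    #
    #   {
    #       'evidence': [SourceEvidence(source_type='community', source_id='c42',
    #                                   content='...', confidence=0.91, phase='C'), ...],
    #       'coverage': {'community_coverage': 1.0, 'entity_coverage': 1.0,
    #                    'vector_coverage': 1.0, 'overall_confidence': 0.78},
    #       'phase_coverage': {'phase_c': 1, 'phase_d': 1, 'phase_e': 1}
    #   }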

    async def _generate_final_answer(self,
                                     analysis: QueryAnalysis,
                                     routing: DriftRoutingResult,
                                     assembled_context: Dict[str, Any]) -> tuple[str, float]:
        """
        Step 16: Generate comprehensive final answer using LLM synthesis.

        Creates a structured, comprehensive response with proper source attribution.
        """
        try:
            # Prepare prompt
            synthesis_prompt = self._create_synthesis_prompt(
                routing.original_query,
                assembled_context['evidence']
            )

            # Generate answer
            response = self.llm.complete(synthesis_prompt)
            final_answer = str(response).strip()

            # Calculate confidence
            synthesis_confidence = self._calculate_synthesis_confidence(
                assembled_context['evidence'], assembled_context['coverage']
            )

            # Format final answer
            formatted_answer = self._format_final_answer(
                final_answer, assembled_context['evidence'], synthesis_confidence
            )

            return formatted_answer, synthesis_confidence

        except Exception as e:
            self.logger.error(f"Final answer generation failed: {e}")
            return self._create_fallback_answer(assembled_context['evidence']), 0.5

    def _create_synthesis_prompt(self, original_query: str, evidence: List[SourceEvidence]) -> str:
        """Create comprehensive synthesis prompt for the LLM."""
        prompt_parts = [
            f"# Query: {original_query}",
            "",
            "You are an expert synthesizing information from multiple sources.",
            "Create a comprehensive, accurate answer using the following evidence:",
            "",
            "## Evidence Sources:",
            ""
        ]

        for i, source in enumerate(evidence[:10], 1):  # Top 10 sources
            prompt_parts.extend([
                f"### Source {i} ({source.phase} - {source.source_type}, confidence: {source.confidence:.3f})",
                source.content[:500] + ("..." if len(source.content) > 500 else ""),
                ""
            ])

        prompt_parts.extend([
            "## Instructions:",
            "1. Synthesize a comprehensive answer addressing the original query",
            "2. Prioritize high-confidence sources (>0.8)",
            "3. Include specific details and examples from the evidence",
            "4. Structure the response clearly with sections if appropriate",
            "5. Do not mention source IDs or technical details",
            "6. Focus on factual accuracy and completeness",
            "",
            "## Comprehensive Answer:"
        ])

        return "\n".join(prompt_parts)

    def _calculate_synthesis_confidence(self, evidence: List[SourceEvidence], coverage: Dict[str, float]) -> float:
        """Calculate overall synthesis confidence based on evidence quality and coverage."""
        if not evidence:
            return 0.0

        # Mean evidence confidence and mean coverage score
        evidence_confidence = sum(e.confidence for e in evidence) / len(evidence)
        coverage_score = sum(coverage.values()) / len(coverage)

        # Phase-diversity bonus for drawing on multiple phases
        phase_diversity = len(set(e.phase for e in evidence)) / 3.0  # 3 phases max

        # Weighted combination, capped at 1.0
        synthesis_confidence = (evidence_confidence * 0.5) + (coverage_score * 0.3) + (phase_diversity * 0.2)

        return min(synthesis_confidence, 1.0)
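
    # Worked example (illustrative numbers only, not from a real run): with a
    # mean evidence confidence of 0.80, a mean coverage score of 0.70, and all
    # three phases represented (diversity = 3 / 3 = 1.0), the combined score is
    # 0.5 * 0.80 + 0.3 * 0.70 + 0.2 * 1.0 = 0.40 + 0.21 + 0.20 = 0.81.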

    def _format_final_answer(self, answer: str, evidence: List[SourceEvidence], confidence: float) -> str:
        """Format the final answer with proper structure and attribution."""
        formatted_parts = [
            "# Comprehensive Answer",
            "",
            answer,
            "",
            "---",
            "",
            f"**Answer Confidence**: {confidence:.1%}",
            f"**Sources Integrated**: {len(evidence)} evidence sources",
            f"**Multi-Phase Coverage**: {len(set(e.phase for e in evidence))} phases (C: Community, D: Entity, E: Vector)",
            ""
        ]

        return "\n".join(formatted_parts)

    def _create_fallback_answer(self, evidence: List[SourceEvidence]) -> str:
        """Create fallback answer when LLM synthesis fails."""
        if not evidence:
            return "Unable to generate answer due to insufficient evidence."

        # Simple concatenation of top evidence
        fallback_parts = [
            "# Answer Summary",
            "",
            "Based on available evidence:",
            ""
        ]

        for i, source in enumerate(evidence[:3], 1):
            fallback_parts.extend([
                f"## Source {i} (Confidence: {source.confidence:.2f})",
                source.content[:300] + ("..." if len(source.content) > 300 else ""),
                ""
            ])

        return "\n".join(fallback_parts)

    def _create_fallback_synthesis(self, community_results: Dict, follow_up_results: Dict,
                                   execution_time: float, error: str) -> SynthesisResult:
        """Create fallback synthesis result when the phase fails."""
        return SynthesisResult(
            final_answer="Answer synthesis failed due to a technical error. Please try again.",
            confidence_score=0.0,
            source_evidence=[],
            synthesis_strategy='fallback',
            coverage_assessment={'overall_confidence': 0.0},
            execution_time=execution_time,
            metadata={'error': error, 'fallback': True}
        )

    def combine_phase_results(self,
                              phase_c_answer: str,
                              follow_up_results: Dict[str, Any],
                              augmentation_results=None) -> str:
        """
        Combine Phase C, D, and E results into an enhanced answer.

        Creates a comprehensive response by integrating results from multiple phases.
        """
        try:
            intermediate_answers = follow_up_results.get('intermediate_answers', [])

            if not intermediate_answers:
                return phase_c_answer

            # Start with Phase C answer
            enhanced_parts = [
                "## Global Context (Phase C)",
                phase_c_answer.strip(),
                "",
                "## Detailed Information (Phase D)"
            ]

            # Add intermediate answers from Phase D
            for i, answer in enumerate(intermediate_answers, 1):
                enhanced_parts.extend([
                    f"**{i}. {answer.question}**",
                    answer.answer,
                    f"*Confidence: {answer.confidence:.2f}*",
                    ""
                ])

            # Add Phase E vector augmentation if available
            if augmentation_results and hasattr(augmentation_results, 'vector_results') and augmentation_results.vector_results:
                enhanced_parts.extend([
                    "## Vector Augmentation (Phase E)",
                    f"**Semantic Enhancement** (Confidence: {augmentation_results.augmentation_confidence:.2f})",
                    ""
                ])

                # Add top vector results
                for i, vector_result in enumerate(augmentation_results.vector_results[:3], 1):
                    enhanced_parts.extend([
                        f"**Vector Result {i}** (Similarity: {vector_result.similarity_score:.3f})",
                        vector_result.content,  # Show full content without truncation
                        ""
                    ])

            # Add supporting evidence if available
            if intermediate_answers:
                enhanced_parts.extend([
                    "## Supporting Evidence",
                    "**Key Entities Found:** " + ", ".join(
                        set(entity for answer in intermediate_answers
                            for entity in answer.supporting_entities[:3])
                    ),
                    ""
                ])

            return "\n".join(enhanced_parts)

        except Exception as e:
            self.logger.error(f"Failed to combine phase results: {e}")
            return phase_c_answer

    def generate_error_response(self, error_message: str) -> Dict[str, Any]:
        """
        Generate standardized error response.

        Creates consistent error format for failed synthesis operations.
        """
        return {
            "answer": f"Sorry, I encountered an error during answer synthesis: {error_message}",
            "metadata": {
                "status": "synthesis_error",
                "error_message": error_message,
                "synthesis_stage": "failed",
                "confidence_score": 0.0,
                "timestamp": datetime.now().isoformat()
            }
        }


# Exports
__all__ = ['AnswerSynthesisEngine', 'SynthesisResult', 'SourceEvidence']
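
# Minimal usage sketch (illustrative only, not part of this module): it assumes
# a configured GraphRAGSetup plus upstream results produced by the earlier
# pipeline phases; `setup`, `analysis`, `routing`, `community_results`,
# `follow_up_results`, and `augmentation_results` are placeholder names.
#
#     import asyncio
#
#     engine = AnswerSynthesisEngine(setup)
#     result = asyncio.run(engine.execute_answer_synthesis_phase(
#         analysis, routing, community_results, follow_up_results, augmentation_results
#     ))
#     print(result.final_answer)
#     print(f"Confidence: {result.confidence_score:.2f}")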