from langgraph.checkpoint.memory import MemorySaver from langchain_openai import ChatOpenAI from dotenv import load_dotenv from datetime import datetime import gradio as gr import uuid import ast import os import re # Local modules from util import get_sources, get_start_end_months from data import download_data, extract_data from main import openai_model from graph import BuildGraph # Set environment variables load_dotenv(dotenv_path=".env", override=True) # Hide BM25S progress bars os.environ["DISABLE_TQDM"] = "true" # Database directory db_dir = "db" # Download and extract data if data directory is not present if not os.path.isdir(db_dir): print("Downloading data ... ", end="") download_data() print("done!") print("Extracting data ... ", end="") extract_data() print("done!") # Global setting for search type search_type = "hybrid" # Global variable for LangChain graph # Use dictionary to store user-specific instances # https://www.gradio.app/guides/state-in-blocks graph_instances = {} def cleanup_graph(request: gr.Request): timestamp = datetime.now().replace(microsecond=0).isoformat() if request.session_hash in graph_instances: del graph_instances[request.session_hash] print(f"{timestamp} - Delete graph for session {request.session_hash}") def extract_think(content): # Added by Cursor 20250726 jmd # Extract content within ... think_match = re.search(r"(.*?)", content, re.DOTALL) think_text = think_match.group(1).strip() if think_match else "" # Extract text after if think_match: post_think = content[think_match.end() :].lstrip() else: # Check if content starts with but missing closing tag if content.strip().startswith(""): # Extract everything after think_start = content.find("") + len("") think_text = content[think_start:].strip() post_think = "" else: # No found, so return entire content as post_think post_think = content return think_text, post_think def append_content(chunk_messages, history, thinking_about): """Append thinking and non-thinking content to chatbot history""" if chunk_messages.content: think_text, post_think = extract_think(chunk_messages.content) # Show thinking content in "metadata" message if think_text: history.append( gr.ChatMessage( role="assistant", content=think_text, metadata={"title": f"🧠 Thinking about the {thinking_about}"}, ) ) if not post_think and not chunk_messages.tool_calls: gr.Warning("Response may be incomplete", title="Thinking-only response") # Display non-thinking content if post_think: history.append(gr.ChatMessage(role="assistant", content=post_think)) return history def run_workflow(input, collection, history, thread_id, session_hash): """The main function to run the chat workflow""" # Get graph instance graph = graph_instances.get(session_hash) if graph is None: # Get the chat model and build the graph chat_model = ChatOpenAI(model=openai_model, temperature=0) graph_builder = BuildGraph( chat_model, db_dir, collection, search_type, ) # Compile the graph with an in-memory checkpointer memory = MemorySaver() graph = graph_builder.compile(checkpointer=memory) # Set global graph graph_instances[session_hash] = graph # ISO 8601 timestamp with local timezone information without microsecond timestamp = datetime.now().replace(microsecond=0).isoformat() print(f"{timestamp} - Set {collection} graph for session {session_hash}") ## Notify when model finishes loading # gr.Success("Model loaded!", duration=4) else: timestamp = datetime.now().replace(microsecond=0).isoformat() print(f"{timestamp} - Get graph for session {session_hash}") # print(f"Using thread_id: {thread_id}") # Display the user input in the chatbot history.append(gr.ChatMessage(role="user", content=input)) # Return the message history and empty lists for emails and citations texboxes yield history, [], [] # Stream graph steps for a single input # https://langchain-ai.lang.chat/langgraph/reference/graphs/#langgraph.graph.state.CompiledStateGraph for step in graph.stream( # Appends the user input to the graph state {"messages": [{"role": "user", "content": input}]}, config={"configurable": {"thread_id": thread_id}}, ): # Get the node name and output chunk node, chunk = next(iter(step.items())) if node == "query": # Get the message (AIMessage class in LangChain) chunk_messages = chunk["messages"] # Append thinking and non-thinking messages (if present) history = append_content(chunk_messages, history, thinking_about="query") # Look for tool calls if chunk_messages.tool_calls: # Loop over tool calls for tool_call in chunk_messages.tool_calls: # Show the tool call with arguments used args = tool_call["args"] content = args["search_query"] if "search_query" in args else "" start_year = args["start_year"] if "start_year" in args else None end_year = args["end_year"] if "end_year" in args else None if start_year or end_year: if start_year == end_year: content = f"{content} ({start_year or ''})" else: content = ( f"{content} ({start_year or ''} - {end_year or ''})" ) if "months" in args: months_text = ", ".join(args["months"]) content = f"{content} {months_text}" history.append( gr.ChatMessage( role="assistant", content=content, metadata={"title": f"🔍 Running tool {tool_call['name']}"}, ) ) yield history, [], [] if node == "retrieve_emails": chunk_messages = chunk["messages"] # Loop over tool calls count = 0 retrieved_emails = [] for message in chunk_messages: count += 1 # Get the retrieved emails as a list email_list = message.content.replace( "### Retrieved Emails:\n\n", "" ).split("--- --- --- --- Next Email --- --- --- ---\n\n") # Get the source file names (e.g. 2024-December.txt) for retrieved emails month_list = [ os.path.basename(text.splitlines()[0]) for text in email_list ] # Format months (e.g. 2024-December) into text month_text = ", ".join(month_list).replace(".txt", "") # Get the number of retrieved emails n_emails = len(email_list) title = f"🗎 Retrieved {n_emails} emails" if email_list[0] == "### No emails were retrieved": title = "❌ Retrieved 0 emails" history.append( gr.ChatMessage( role="assistant", content=month_text, metadata={"title": title}, ) ) # Format the retrieved emails with Tool Call heading retrieved_emails.append( message.content.replace( "### Retrieved Emails:\n\n", f"### ### ### ### Tool Call {count} ### ### ### ###\n\n", ) ) # Combine all the Tool Call results retrieved_emails = "\n\n".join(retrieved_emails) yield history, retrieved_emails, [] if node == "answer": # Append messages (thinking and non-thinking) to history chunk_messages = chunk["messages"] history = append_content(chunk_messages, history, thinking_about="answer") # None is used for no change to the retrieved emails textbox yield history, None, [] if node == "answer_with_citations": # Parse the message for the answer and citations chunk_messages = chunk["messages"][0] try: answer, citations = ast.literal_eval(chunk_messages.content) except: # In case we got an answer without citations answer = chunk_messages.content citations = None history.append(gr.ChatMessage(role="assistant", content=answer)) yield history, None, citations def run_workflow_in_session(request: gr.Request, *args): """Wrapper function to call run_workflow() with session_hash""" input = args[0] # Add session_hash to arguments new_args = args + (request.session_hash,) for value in run_workflow(*new_args): yield value with gr.Blocks( title="R-help-chat", ) as demo: # ----------------- # Define components # ----------------- loading_data = gr.Textbox( "Please wait for the email database to be downloaded and extracted.", max_lines=0, label="Loading Data", visible=False, render=False, ) downloading = gr.Textbox( max_lines=1, label="Downloading Data", visible=False, render=False, ) extracting = gr.Textbox( max_lines=1, label="Extracting Data", visible=False, render=False, ) missing_data = gr.Textbox( value="Email database is missing. Try reloading this page. If the problem persists, please contact the maintainer.", lines=1, label="Error downloading or extracting data", visible=False, render=False, ) chatbot = gr.Chatbot( show_label=False, avatar_images=(None, "images/cloud.png"), buttons=["copy_all"], render=False, ) # Modified from gradio/chat_interface.py input = gr.Textbox( show_label=False, label="Message", placeholder="Type a message...", scale=7, autofocus=True, submit_btn=True, render=False, ) emails_textbox = gr.Textbox( label="Retrieved Emails", info="Tip: Look for 'Tool Call' and 'Next Email' separators. Quoted lines (starting with '>') are removed before indexing.", lines=10, visible=False, render=False, ) citations_textbox = gr.Textbox( label="Citations", lines=2, visible=False, render=False, ) # ------------ # Set up state # ------------ def generate_thread_id(): """Generate a new thread ID""" thread_id = uuid.uuid4() # print(f"Generated thread_id: {thread_id}") return thread_id # Define thread_id variable thread_id = gr.State(generate_thread_id()) # Define states for the output textboxes retrieved_emails = gr.State([]) citations_text = gr.State([]) # ------------------ # Make the interface # ------------------ def get_intro_text(): intro = f""" ## 🇷🤝đŸ’Ŧ R-help-chat **Search and chat with the [R-help](https://stat.ethz.ch/pipermail/r-help/) and [R-devel](https://stat.ethz.ch/pipermail/r-devel/) mailing list archives.** An LLM turns your question into a search query, including year ranges and months. Retrieved emails are shown below the chatbot and are used by the LLM to generate an answer. You can ask follow-up questions with the chat history as context; changing the mailing list maintains history. Press the clear button (🗑) to clear the history and start a new chat. *Privacy notice*: Data sharing with OpenAI is enabled. """ return intro def get_info_text(collection): try: # Get source files for each email and start and end months from database sources = get_sources(db_dir, collection) start, end = get_start_end_months(sources) except: # If database isn't ready, put in empty values sources = [] start = None end = None info_text = f""" **Database:** {len(sources)} emails from {start} to {end}
**Features:** RAG, today's date, hybrid search (semantic + lexical), multiple retrievals, citations output, chat memory
**Tech:** [OpenAI](https://openai.com/), [Chroma](https://www.trychroma.com/), [BM25S](https://github.com/xhluca/bm25s), [Amazon S3](https://aws.amazon.com/s3/), [LangGraph](https://www.langchain.com/langgraph), [Gradio](https://www.gradio.app/)
**Maintainer:** [Jeffrey Dick](https://jedick.github.io) - feedback welcome!
**More info:** [GitHub repository](https://github.com/jedick/R-help-chat), [Walkthrough video](https://youtu.be/mLQqW7zea-k) """ return info_text def get_example_questions(as_dataset=False): """Get example questions""" questions = [ # "What is today's date?", "Show me code examples using plotmath", "When was the native pipe operator introduced?", ] # cf. https://github.com/gradio-app/gradio/pull/8745 for updating examples return gr.Dataset(samples=[[q] for q in questions]) if as_dataset else questions def get_multi_tool_questions(as_dataset=False): """Get multi-tool example questions""" questions = [ "Differences between lapply and for loops", "Summarize emails from the most recent two months", ] return gr.Dataset(samples=[[q] for q in questions]) if as_dataset else questions def get_multi_turn_questions(as_dataset=False): """Get multi-turn example questions""" questions = [ "Lookup emails that reference bugs.r-project.org in 2025", "Did the authors you cited report bugs before 2025?", ] return gr.Dataset(samples=[[q] for q in questions]) if as_dataset else questions def get_month_questions(as_dataset=False): """Get month example questions""" questions = [ "Was there any discussion of ggplot2 in Q4 2025?", "How about Q3?", ] return gr.Dataset(samples=[[q] for q in questions]) if as_dataset else questions with gr.Row(): # Left column: Intro, Compute, Chat with gr.Column(scale=2): with gr.Row(elem_classes=["row-container"]): with gr.Column(scale=4): intro = gr.Markdown(get_intro_text()) with gr.Column(scale=1): collection = gr.Radio( ["R-help", "R-devel"], value="R-help", label="Mailing List", ) with gr.Group() as chat_interface: chatbot.render() input.render() # Render textboxes for data loading progress loading_data.render() downloading.render() extracting.render() missing_data.render() # Right column: Info, Examples with gr.Column(scale=1): with gr.Accordion("â„šī¸ App Info", open=True): app_info = gr.Markdown(get_info_text(collection.value)) with gr.Accordion("💡 Examples", open=True): # Add some helpful examples example_questions = gr.Examples( examples=get_example_questions(), inputs=[input], label="Basic examples (try R-devel for #2)", ) multi_tool_questions = gr.Examples( examples=get_multi_tool_questions(), inputs=[input], label="Multiple retrievals", ) multi_turn_questions = gr.Examples( examples=get_multi_turn_questions(), inputs=[input], label="Follow-up questions", ) month_questions = gr.Examples( examples=get_month_questions(), inputs=[input], label="Three-month periods", ) # Bottom row: retrieved emails and citations with gr.Row(): with gr.Column(scale=2): emails_textbox.render() with gr.Column(scale=1): citations_textbox.render() # ------------- # App functions # ------------- def value(value): """Return updated value for a component""" return gr.update(value=value) def change_visibility(visible): """Return updated visibility state for a component""" return gr.update(visible=visible) def update_textbox(content, textbox): if content is None: # Keep the content of the textbox unchanged return textbox, change_visibility(True) elif content == []: # Blank out the textbox return "", change_visibility(False) else: # Display the content in the textbox return content, change_visibility(True) # -------------- # Event handlers # -------------- # Start a new thread when the user presses the clear (trash) button # https://github.com/gradio-app/gradio/issues/9722 chatbot.clear(generate_thread_id, outputs=[thread_id], api_visibility="private") collection.change( # We need to build a new graph if the collection changes cleanup_graph ).then( # Update the database stats in the app info box get_info_text, [collection], [app_info], api_name=False, ) input.submit( # Submit input to the chatbot run_workflow_in_session, [input, collection, chatbot, thread_id], [chatbot, retrieved_emails, citations_text], api_visibility="private", ) retrieved_emails.change( # Update the emails textbox update_textbox, [retrieved_emails, emails_textbox], [emails_textbox, emails_textbox], api_visibility="private", ) citations_text.change( # Update the citations textbox update_textbox, [citations_text, citations_textbox], [citations_textbox, citations_textbox], api_visibility="private", ) chatbot.clear( # Focus textbox when the chatbot is cleared lambda x: gr.update(value=x), [input], [input], api_visibility="private", ) # Clean up graph instances when page is closed/refreshed demo.unload(cleanup_graph) if __name__ == "__main__": # Set allowed_paths to serve chatbot avatar images current_directory = os.getcwd() allowed_paths = [current_directory + "/images"] # Noto Color Emoji gets a nice-looking Unicode Character “🇷” (U+1F1F7) on Chrome theme = gr.themes.Soft( font=[ "ui-sans-serif", "system-ui", "sans-serif", "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji", ] ) # Custom CSS for bottom alignment css = """ .row-container { display: flex; align-items: flex-end; /* Align components at the bottom */ gap: 10px; /* Add spacing between components */ } """ # HTML for Font Awesome # https://cdnjs.com/libraries/font-awesome head = '' # Launch the Gradio app demo.launch( allowed_paths=allowed_paths, theme=theme, css=css, head=head, footer_links=["gradio", "settings"], )