LangGraph is a library for building stateful, multi-actor applications with LLMs, used to create agent and multi-agent workflows.
Cycles and Branching: Implement loops and conditionals in your apps.
Persistence: Automatically save state after each step in the graph. Pause and resume the graph execution at any point to support error recovery, human-in-the-loop workflows, time travel and more.
Human-in-the-Loop: Interrupt graph execution to approve or edit the next action planned by the agent.
Streaming Support: Stream outputs as they are produced by each node (including token streaming).
Integration with LangChain: LangGraph integrates seamlessly with LangChain and LangSmith (but does not require them).
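For concreteness, here is a minimal hedged sketch of these features: a one-node graph that loops via a conditional edge and persists state with an in-memory checkpointer. The counter state, node name, and thread id are illustrative, not part of the example project that follows.

from typing import TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph

class State(TypedDict):
    count: int

def increment(state: State):
    return {"count": state["count"] + 1}

def should_continue(state: State):
    # Conditional edge: loop back until the counter reaches 3
    return "increment" if state["count"] < 3 else END

builder = StateGraph(State)
builder.add_node("increment", increment)
builder.add_edge(START, "increment")
builder.add_conditional_edges("increment", should_continue)

# The checkpointer persists state per thread_id, enabling pause/resume
app = builder.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "demo"}}
print(app.invoke({"count": 0}, config))  # -> {'count': 3}

MemorySaver here stands in for the SQLite checkpointer the example project uses in graph.py.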
Example
The main file (main.py) defines the API endpoints and streams the answer to the client.
graph.py defines the workflow: its nodes and the edges between them.
panda.py initializes the database connection and caches the PandasAI DataFrames.
chat.py implements the node functions used by the graph.
main.py
from langchain_core.messages import HumanMessage
from pydantic import BaseModel
import json
import threading
import queue
from graph import get_app, init_workflow, save_graph
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from typing import Generator
app = FastAPI()
# Initialize graph once at startup for better performance
_graph = None
_langgraph_app = None
def get_langgraph_app():
"""Get or create singleton LangGraph app."""
global _graph, _langgraph_app
if _langgraph_app is None:
_graph = init_workflow()
_langgraph_app = get_app(_graph)
return _langgraph_app
class ChatRequest(BaseModel):
question: str
# Polling interval in seconds
POLLING_INTERVAL = 2.0
def stream_worker(question: str, thread_id: str, result_queue: queue.Queue):
"""
Worker function that runs LangGraph stream in a separate thread.
Puts chunks into the queue for the main generator to consume.
"""
langgraph_app = get_langgraph_app()
config = {"configurable": {"thread_id": thread_id}}
try:
result = langgraph_app.stream(
{"messages": [HumanMessage(content=question)]},
config=config,
stream_mode="messages"
)
full_answer = ""
for chunk, metadata in result:
node = metadata.get("langgraph_node", "")
checkpoint_ns = metadata.get("checkpoint_ns", None)
if chunk.content and node in ("redefine_answer", "chat_with_other"):
content = chunk.content
                if checkpoint_ns is not None:
full_answer += content
result_queue.put({
"type": "chunk",
"content": content,
"node": node
})
# Signal completion
result_queue.put({
"type": "done",
"full_answer": full_answer
})
except Exception as e:
result_queue.put({
"type": "error",
"message": str(e)
})
def generate_sse_stream(question: str, thread_id: str) -> Generator[str, None, None]:
"""
Generator function that yields SSE-formatted chunks with polling events.
Sends heartbeat/polling events while waiting for actual content.
"""
print(f"\n=== Question (thread: {thread_id}) ===")
print(f"Question: {question}")
# Create queue for thread communication
result_queue: queue.Queue = queue.Queue()
# Start worker thread
worker = threading.Thread(
target=stream_worker,
args=(question, thread_id, result_queue),
daemon=True
)
worker.start()
# Send initial polling event
sse_polling = json.dumps({
"event": "polling",
"data": "Processing your question...",
"status": "started"
})
yield f"event: polling\ndata: {sse_polling}\n\n"
done = False
poll_count = 0
while not done:
try:
# Try to get result with timeout (for polling)
item = result_queue.get(timeout=POLLING_INTERVAL)
if item["type"] == "chunk":
sse_data = json.dumps({
"event": "chunk",
"data": item["content"]
})
yield f"event: chunk\ndata: {sse_data}\n\n"
elif item["type"] == "done":
sse_done = json.dumps({
"event": "done",
"data": item["full_answer"]
})
yield f"event: done\ndata: {sse_done}\n\n"
done = True
elif item["type"] == "error":
sse_error = json.dumps({
"event": "error",
"data": item["message"]
})
yield f"event: error\ndata: {sse_error}\n\n"
done = True
except queue.Empty:
# No data available - send polling/heartbeat event
poll_count += 1
sse_polling = json.dumps({
"event": "polling",
"data": "Still processing...",
"status": "waiting",
"poll_count": poll_count,
"elapsed_seconds": poll_count * POLLING_INTERVAL
})
yield f"event: polling\ndata: {sse_polling}\n\n"
# Ensure worker thread completes
worker.join(timeout=1.0)
@app.post("/chat")
def chat(request: ChatRequest):
"""
Non-streaming endpoint for backward compatibility.
Returns complete answer as JSON.
"""
print(f"question: {request.question}")
langgraph_app = get_langgraph_app()
thread_id = "demo-session"
config = {"configurable": {"thread_id": thread_id}}
print(f"\n=== Question (thread: {thread_id}) ===")
result = langgraph_app.stream(
{"messages": [HumanMessage(content=request.question)]},
config=config,
stream_mode="messages"
)
full_answer = ""
for chunk, metadata in result:
if chunk.content and metadata["langgraph_node"] in ("redefine_answer", "chat_with_other"):
full_answer += chunk.content
print("chunk: ", chunk.content)
return {"answer": full_answer}
@app.post("/chat/stream")
def chat_stream(request: ChatRequest):
"""
SSE streaming endpoint.
Streams answer chunks as Server-Sent Events.
Usage:
POST /chat/stream
Content-Type: application/json
{"question": "Your question here"}
    Response format (SSE):
        event: chunk
        data: {"event": "chunk", "data": "partial answer..."}

        event: done
        data: {"event": "done", "data": "complete answer..."}
"""
thread_id = "demo-session"
return StreamingResponse(
generate_sse_stream(request.question, thread_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
@app.get("/chat/stream")
def chat_stream_get(question: str):
"""
SSE streaming endpoint (GET version for EventSource compatibility).
    Usage with JavaScript EventSource:
        const evtSource = new EventSource('/chat/stream?question=Your+question');
        // The stream uses named SSE events, so use addEventListener
        // rather than onmessage (which only fires for unnamed events).
        evtSource.addEventListener('chunk', (event) => {
            const data = JSON.parse(event.data);
            console.log(data.data);
        });
        evtSource.addEventListener('done', () => evtSource.close());
"""
thread_id = "demo-session"
return StreamingResponse(
generate_sse_stream(question, thread_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
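For quick manual testing, here is a hedged sketch of a Python client for the streaming endpoint above. It assumes the app is served locally on port 8000 and that the third-party requests library is installed; both are assumptions, not part of this project.

import json

import requests  # assumption: installed separately

# Minimal SSE consumer for POST /chat/stream (sketch only)
with requests.post(
    "http://localhost:8000/chat/stream",                 # assumed host/port
    json={"question": "What is the total income this month?"},
    stream=True,
) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        # SSE frames arrive as "event: ..." / "data: {...}" line pairs
        if line and line.startswith("data: "):
            payload = json.loads(line[len("data: "):])
            if payload.get("event") == "chunk":
                print(payload["data"], end="", flush=True)
            elif payload.get("event") in ("done", "error"):
                break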
graph.py
import os
import sqlite3
from typing import Annotated, Literal

from langchain_core.messages import AnyMessage
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import TypedDict

from chat import (
    chat_with_income,
    identify_question_type,
    check_question_type,
    chat_with_other,
    chat_with_parking_lot,
    chat_with_parking_lot_chinese,
    chat_with_car_rental,
    chat_with_car_rental_chinese,
    redefine_answer,
    clear_up_state,
)
class MessageState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]
question_type: Literal["parking_lot", "parking_lot_chinese", "car_rental",
"car_rental_chinese", "income", "other", None]
answer: str
SQLITE_DB_PATH = "db/checkpoints.db"
# Define the memory of the graph to keep the conversation history
def get_sqlite_checkpointer():
"""
Create and return a SQLite checkpointer with a persistent connection.
"""
# Ensure the directory exists before creating the database file
db_dir = os.path.dirname(SQLITE_DB_PATH)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
conn = sqlite3.connect(SQLITE_DB_PATH, check_same_thread=False)
return SqliteSaver(conn)
def get_app(workflow: StateGraph):
"""
Create and return the compiled workflow with SQLite checkpointer.
Args:
workflow: The workflow to compile
Returns:
The compiled workflow
"""
checkpointer = get_sqlite_checkpointer()
return workflow.compile(checkpointer=checkpointer)
# def get_conversation_history(thread_id: str = "default"):
# """
# Get the conversation history for a specific thread.
# Args:
# thread_id: Unique identifier for the conversation thread
# Returns:
# The current state of the conversation
# """
# checkpointer = get_sqlite_checkpointer()
# app = get_app(init_workflow())  # get_app expects a workflow, not a checkpointer
# config = {"configurable": {"thread_id": thread_id}}
# state = app.get_state(config)
# return state.values if state else None
# Define the workflow and the node
def init_workflow():
workflow = StateGraph(MessageState)
workflow.add_node("identify_question_type", identify_question_type)
workflow.add_node("chat_with_other", chat_with_other)
workflow.add_node("chat_with_parking_lot", chat_with_parking_lot)
workflow.add_node("chat_with_parking_lot_chinese", chat_with_parking_lot_chinese)
workflow.add_node("chat_with_car_rental", chat_with_car_rental)
workflow.add_node("chat_with_car_rental_chinese", chat_with_car_rental_chinese)
workflow.add_node("chat_with_income", chat_with_income)
workflow.add_node("redefine_answer", redefine_answer)
workflow.add_node("clear_up_state", clear_up_state)
workflow.add_edge(START, "identify_question_type")
workflow.add_conditional_edges(
"identify_question_type", # Source node
check_question_type, # Routing function (returns next node name)
{
"chat_with_parking_lot": "chat_with_parking_lot",
"chat_with_parking_lot_chinese": "chat_with_parking_lot_chinese",
"chat_with_car_rental": "chat_with_car_rental",
"chat_with_car_rental_chinese": "chat_with_car_rental_chinese",
"chat_with_other": "chat_with_other",
"chat_with_income": "chat_with_income"
}
)
workflow.add_edge("chat_with_parking_lot", "redefine_answer")
workflow.add_edge("chat_with_parking_lot_chinese", "redefine_answer")
workflow.add_edge("chat_with_car_rental", "redefine_answer")
workflow.add_edge("chat_with_car_rental_chinese", "redefine_answer")
workflow.add_edge("chat_with_income", "redefine_answer")
workflow.add_edge("chat_with_other", "clear_up_state")
workflow.add_edge("redefine_answer", "clear_up_state")
workflow.add_edge("clear_up_state", END)
return workflow
def save_graph(app):
"""Save the compiled graph as a PNG image."""
graph_png = app.get_graph().draw_mermaid_png()
output_dir = "charts"
os.makedirs(output_dir, exist_ok=True)
    graph_path = os.path.join(output_dir, "graph.png")
with open(graph_path, "wb") as f:
f.write(graph_png)
print(f"Graph saved to: {graph_path}")
panda.py
import os
import time
from typing import Optional

import pandas as pd
import pandasai as pai
import pyodbc
from pandasai_litellm.litellm import LiteLLM
# === Singleton LLM instance ===
_llm_instance: Optional[LiteLLM] = None
_llm_configured: bool = False
def get_llm():
"""Get or create singleton LLM instance."""
global _llm_instance, _llm_configured
if _llm_instance is None:
_llm_instance = LiteLLM(
model="gpt-4.1",
api_key="sk-wx6xklvYEtIrd-CQuQcqNA",
base_url="https://ai.hld.com/lite-llm/"
)
if not _llm_configured:
pai.config.set({"llm": _llm_instance})
_llm_configured = True
return _llm_instance
# === DataFrame Cache ===
class DataFrameCache:
"""Cache for PandasAI DataFrames with optional TTL."""
def __init__(self, ttl_seconds: int = 3600): # Default 1 hour TTL
self._cache: dict = {}
self._timestamps: dict = {}
self._ttl = ttl_seconds
def get(self, key: str):
"""Get cached DataFrame if exists and not expired."""
if key not in self._cache:
return None
# Check if expired
if time.time() - self._timestamps[key] > self._ttl:
self.invalidate(key)
return None
return self._cache[key]
def set(self, key: str, df):
"""Cache a DataFrame."""
self._cache[key] = df
self._timestamps[key] = time.time()
def invalidate(self, key: str):
"""Remove a specific DataFrame from cache."""
self._cache.pop(key, None)
self._timestamps.pop(key, None)
def clear(self):
"""Clear all cached DataFrames."""
self._cache.clear()
self._timestamps.clear()
# Global cache instance (1 hour TTL by default)
_df_cache = DataFrameCache(ttl_seconds=3600)
# === Database Connection Pool ===
_conn_str = (
"DRIVER={ODBC Driver 17 for SQL Server};"
"SERVER=CLUSQL01.hq.hk.hld;"
"DATABASE=HLDCCSCPRDB_report;"
"UID=HLDCCSCPRDBViewer;"
"PWD=EFbG5BVVNx!hgxox6#5EMkzd4ks*EN;"
"Encrypt=no;"
"TrustServerCertificate=yes;"
)
def get_db_connection():
"""Get database connection."""
return pyodbc.connect(_conn_str)
def init_panda_dataframe(query: str, columns_dict: dict, dataset_path: str, description: str):
"""
Initialize or retrieve cached PandasAI DataFrame.
Uses a multi-level caching strategy:
1. In-memory cache (fastest) - checks if DataFrame is already loaded
2. Disk cache via pai.load() - checks if dataset exists on disk
3. Database query (slowest) - only if dataset doesn't exist
"""
# Ensure LLM is configured (singleton)
get_llm()
# Check in-memory cache first
cached_df = _df_cache.get(dataset_path)
if cached_df is not None:
print(f"[Cache HIT] Returning cached DataFrame for: {dataset_path}")
return cached_df
print(f"[Cache MISS] Loading DataFrame for: {dataset_path}")
# Try to load from disk cache (pai.load)
try:
df = pai.load(dataset_path)
print(f"[Disk Cache HIT] Loaded from disk: {dataset_path}")
_df_cache.set(dataset_path, df)
return df
except Exception:
print(f"[Disk Cache MISS] Creating new dataset: {dataset_path}")
# Dataset not found - need to query database and create
conn = get_db_connection()
try:
raw_df = pd.read_sql(query, conn)
columns_list = [{"name": name, **props} for name, props in columns_dict.items()]
pai_df = pai.DataFrame(raw_df)
df = pai.create(
path=dataset_path,
df=pai_df,
description=description,
columns=columns_list
)
# Cache the newly created DataFrame
_df_cache.set(dataset_path, df)
print(f"[Created & Cached] New dataset: {dataset_path}")
return df
finally:
conn.close()
def refresh_dataset(dataset_path: str, query: str, columns_dict: dict, description: str):
"""
Force refresh a dataset from the database.
Useful when you know data has changed.
"""
# Invalidate cache
_df_cache.invalidate(dataset_path)
# Reload from database
conn = get_db_connection()
try:
raw_df = pd.read_sql(query, conn)
columns_list = [{"name": name, **props} for name, props in columns_dict.items()]
pai_df = pai.DataFrame(raw_df)
# Delete existing and recreate
try:
            import shutil  # local import; os is already imported at module level
full_path = os.path.join("datasets", dataset_path)
if os.path.exists(full_path):
shutil.rmtree(full_path)
except Exception:
pass
df = pai.create(
path=dataset_path,
df=pai_df,
description=description,
columns=columns_list
)
_df_cache.set(dataset_path, df)
print(f"[Refreshed] Dataset: {dataset_path}")
return df
finally:
conn.close()
def get_cache_stats():
"""Get cache statistics for monitoring."""
return {
"cached_datasets": list(_df_cache._cache.keys()),
"cache_size": len(_df_cache._cache),
"ttl_seconds": _df_cache._ttl
}
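A hedged usage sketch for the helpers above; the query, column metadata, and dataset path are placeholders rather than the real definitions used in chat.py:

# Illustrative placeholders only
query = "SELECT 1 AS n"
columns = {"n": {"type": "integer", "description": "A number"}}

df = init_panda_dataframe(query, columns, "demo/numbers", "Demo dataset")        # cache miss: queries DB
df_again = init_panda_dataframe(query, columns, "demo/numbers", "Demo dataset")  # in-memory cache hit
print(get_cache_stats())  # e.g. {'cached_datasets': ['demo/numbers'], 'cache_size': 1, 'ttl_seconds': 3600}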
chat.py
import os
from typing import Literal

import pandas as pd
import pandasai as pai
import pyodbc
from langchain_core.messages import AIMessage, HumanMessage, RemoveMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pandasai_litellm.litellm import LiteLLM
from pydantic import BaseModel, Field

from panda import init_panda_dataframe
from state import MessageState
def chat_with_other(state: MessageState):
print(f"chat_with_other state: {state}")
    model = ChatOpenAI(model="gpt-4.1", api_key=os.getenv("LITELLM_API_KEY"), base_url="https://ai.hld.com/lite-llm/")  # key read from env
response = model.invoke(state["messages"])
return { "messages": [AIMessage(content=response.content)] }
def chat_with_income(state: MessageState):
print(f"chat_with_income state: {state}")
query = """
WITH DailyData AS (
SELECT
DATEADD(MONTH, DATEDIFF(MONTH, 0, BusinessDT), 0) AS month_start,
s.ChnName AS PtyCodeCn,
CASE tt.IDTransactionType
WHEN 1000 THEN '月租'
WHEN 1001 THEN '月租'
WHEN 1005 THEN
CASE tt.IDPaymentType
WHEN 1010 THEN '商戶優惠券收入'
ELSE '時租收入'
END
WHEN 1002 THEN '調整'
WHEN 1003 THEN '入閘器按金'
WHEN 1004 THEN '轉換車位'
WHEN 1006 THEN '商戶優惠券按金'
WHEN 1007 THEN '商戶優惠券收入'
WHEN 1008 THEN '雜項'
ELSE 'Other'
END AS TransactionTypeDescription,
tt.ActualAmt
FROM tbl_Transaction tt
JOIN tbl_Site s ON s.PtyCode = tt.PtyCode
), MonthlyPtyTotals AS (
SELECT
month_start,
PtyCodeCn,
SUM(ActualAmt) AS total_amt_for_pty_in_month,
ROW_NUMBER() OVER (PARTITION BY month_start ORDER BY SUM(ActualAmt) DESC) AS rn_in_month
FROM DailyData
GROUP BY month_start, PtyCodeCn
), Top20PtyPerMonth AS (
SELECT
month_start,
PtyCodeCn,
total_amt_for_pty_in_month
FROM MonthlyPtyTotals
WHERE rn_in_month <= 20
) SELECT
dd.month_start AS month,
dd.PtyCodeCn,
dd.TransactionTypeDescription AS IDTransactionType,
SUM(dd.ActualAmt) AS amt_by_type
FROM DailyData dd
INNER JOIN Top20PtyPerMonth t20
ON dd.month_start = t20.month_start AND dd.PtyCodeCn = t20.PtyCodeCn
GROUP BY
dd.month_start,
dd.PtyCodeCn,
dd.TransactionTypeDescription,
        t20.total_amt_for_pty_in_month
    ORDER BY amt_by_type DESC;
"""
dataset_path = "hld/income"
columns_dict = {
"month": {"type": "datetime", "description": "The first day of the month for this income record (e.g., 2025-12-01 for December 2025). Use this column to filter and compare months. 'This month' or 'current month' refers to the most recent month in the data. 'Previous month' means the month immediately before the most recent month."},
"PtyCodeCn": {"type": "string", "description": "The property code of the income"},
"IDTransactionType": {"type": "string", "description": """Income category. IMPORTANT CATEGORIZATION RULES:
- MONTHLY RENTALS: Only records where IDTransactionType == '月租'. This is the ONLY value for monthly rent.
- DAILY/HOURLY RENTALS: Only records where IDTransactionType == '時租收入'. This is the ONLY value for daily/hourly rent.
- OTHER INCOME: All other values including '商戶優惠券收入', '調整', '入閘器按金', '轉換車位', '商戶優惠券按金', '雜項', 'Other'.
When calculating monthly rentals vs daily/hourly rentals percentage:
1. Monthly rental income = SUM(amt_by_type) WHERE IDTransactionType == '月租'
2. Daily/hourly rental income = SUM(amt_by_type) WHERE IDTransactionType == '時租收入'
3. Total income = SUM(all amt_by_type)
4. Monthly percentage = (Monthly rental income / Total income) * 100
5. Daily/hourly percentage = (Daily/hourly rental income / Total income) * 100"""},
"amt_by_type": {"type": "float", "description": "The rental income amount. Use SUM(amt_by_type) grouped by IDTransactionType to calculate subtotals for each category."},
}
description = """Rental income data view showing rental income statistics.
Each row represents one property in a specific month with a specific transaction sub-category.
ALL income in this dataset is rental income. The IDTransactionType column is just a sub-category.
To calculate total rental income (or total income - they are the same), sum all amt_by_type values.
Contains: month, property code, transaction type sub-category, rental income amount.
"""
df = init_panda_dataframe(query, columns_dict, dataset_path, description)
response = df.chat(state["messages"][-1].content)
print(response)
return {"answer": str(response)}
def identify_question_type(state: MessageState):
class QuestionType(BaseModel):
question_type: Literal["parking_lot", "parking_lot_chinese", "car_rental", "car_rental_chinese", "income", "other"] = Field(
description="The type of question",
default="other"
)
    model = ChatOpenAI(model="gpt-4.1", api_key=os.getenv("LITELLM_API_KEY"), base_url="https://ai.hld.com/lite-llm/")
model_with_schema = model.with_structured_output(QuestionType)
system_message = SystemMessage(content="""
You are a helpful assistant that classifies questions about parking lots, car rentals, and income.
You will be given a question and you need to determine the type of question.
The question types are:
## income (HIGHEST PRIORITY for income-related questions)
Questions about rental income, revenue, earnings, and money-related statistics. This includes:
General Income Questions:
- What is the total monthly rental income for the past three months?
- How does the daily income compare to the hourly income for this week?
- What percentage of total income comes from monthly rentals versus daily/hourly rentals?
Income Comparison Questions:
- How does this monthly rental income compare to the previous month?
- Which location has the highest daily income compared to its monthly rental income?
- What is the average income for both monthly and daily/hourly rentals?
Income Data Inquiries:
- How have the income patterns changed over the last six months for both rental types?
- Which days of the week typically generate the most hourly income?
Income Insight Queries:
- What factors contribute to higher monthly rental income compared to daily/hourly income?
- Is there a correlation between carpark location and income patterns?
- How does seasonal demand affect monthly versus daily rental income?
    Keywords (English): income, revenue, earnings, money, profit, total income, monthly income, daily income, hourly income, rental income, income comparison, income pattern, income statistics
Keywords (Chinese): 收入, 收入統計, 收入數據, 月租收入, 時租收入, 商戶優惠券收入, 調整, 入閘器按金, 轉換車位, 商戶優惠券按金, 雜項, 營收, 盈利
## car_rental
Questions related to rental statistics, rental ratios, and rental units (NOT income/money). This includes:
- 月租車位出租率 (monthly parking spot rental rate/ratio)
- 出租率 (rental rate/ratio)
- rent ratio, total rent ratio
- rented units, rent unit, 出租單位
- available long-term rental units
- property-level rental statistics
- rental comparison between months
- Questions containing keywords: 出租, 租出, rent ratio, rented, rental rate
## car_rental_chinese
Questions related to rental statistics, rental ratios, and rental units in Chinese (NOT income/money). This includes:
    - 月租車位出租率 (monthly parking spot rental rate/ratio)
    - 出租率 (rental rate/ratio)
    - Questions containing keywords: 出租, 租出, 月租車位出租率, 出租率, rent ratio, rented, rental rate
    - Questions containing keywords: 出租單位, rent unit
    - Questions containing keywords: 可供月租單位數, available long-term rental units
    - Questions containing keywords: 物業編號, 物業中文名稱, 物業編號+中文名稱
    - Questions containing keywords: 單位總數, 可用單位數, 可供月租單位數, 已出租單位數
## parking_lot
Questions about parking lot information and basic statistics in English. This includes:
- Number of parking lots
- Parking lot districts (HK, KLN, NT)
- Parking lot types (C, M)
- Monthly vs hourly parking availability (Monthly + Hourly, Hourly Only, Monthly Only)
- Parking lot property details (property code, name)
- Questions containing keywords: parking lot, district, CPType
## parking_lot_chinese
Questions about parking lot information and basic statistics in Chinese. This includes:
- 停車場數量, 車場數量, 有幾多個停車場
- 車位總數, 車位數量, 有幾多個車位
- 地區: 香港, 九龍, 新界
- 車場類型
- 月租, 時租
- Questions containing keywords: 停車場, 車場, 車位總數, 車位數量, 地區
## other
Questions that are not about parking lots, car rentals, or income.
IMPORTANT CLASSIFICATION RULES:
1. If the question mentions income, revenue, earnings, money, profit, 收入, 營收, or any income/money-related terms, classify as "income".
2. If the question mentions 出租率, 月租車位出租率, rent ratio, rented unit, or any rental rate/ratio metrics (NOT income), classify as "car_rental" or "car_rental_chinese" based on language.
3. If the question is in Chinese and asks about 停車場, 車場數量, 車位總數, classify as "parking_lot_chinese".
4. If the question is in English and asks about parking lots, classify as "parking_lot".
You need to return the question type in the format of "parking_lot", "parking_lot_chinese", "car_rental", "car_rental_chinese", "income", or "other".
""")
response = model_with_schema.invoke([system_message, *state["messages"]])
print(f"response: {response}")
return {"question_type": response.question_type}
def redefine_answer(state: MessageState):
print(f"redefine_answer state: {state}")
    system_message = SystemMessage(content=f"""
    You are a helpful assistant that refines the answer to a {state.get('question_type', 'other')} question.
    You will be given a question and an answer.
    Rewrite the answer more concisely and clearly,
    in a format that is easy to understand and that directly answers the question.
    """)
    model = ChatOpenAI(model="gpt-4.1", api_key=os.getenv("LITELLM_API_KEY"), base_url="https://ai.hld.com/lite-llm/")
response = model.invoke([system_message,
HumanMessage(content=f"Question: {state['messages'][-1].content}\nAnswer: {state['answer']}")]
)
return {"messages": [AIMessage(content=response.content)]}
def check_question_type(state: MessageState):
print(f"check_question_type state: {state}")
if state["question_type"] == "parking_lot":
return "chat_with_parking_lot"
elif state["question_type"] == "parking_lot_chinese":
return "chat_with_parking_lot_chinese"
elif state["question_type"] == "car_rental":
return "chat_with_car_rental"
elif state["question_type"] == "car_rental_chinese":
return "chat_with_car_rental_chinese"
elif state["question_type"] == "income":
return "chat_with_income"
else:
return "chat_with_other"
def clear_up_state(state: MessageState):
"""
    Clear all state attributes and keep only the last 4 messages.
Uses RemoveMessage to properly remove old messages from the state.
"""
messages = state.get("messages", [])
# Get messages to remove (all except last 4)
messages_to_remove = messages[:-4] if len(messages) > 4 else []
# Create RemoveMessage commands for old messages
delete_messages = [RemoveMessage(id=msg.id) for msg in messages_to_remove]
return {
"messages": delete_messages, # Remove old messages, keeping last 3
"question_type": None, # Clear question type
"answer": "" # Clear answer
}
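Finally, a small hedged sketch of how the add_messages reducer applies the RemoveMessage deletions that clear_up_state returns; the message contents and ids are illustrative:

from langchain_core.messages import HumanMessage, RemoveMessage
from langgraph.graph.message import add_messages

# Five dummy messages with explicit ids (ids are illustrative)
msgs = [HumanMessage(content=f"m{i}", id=str(i)) for i in range(5)]
# Drop everything except the last 4, mirroring clear_up_state
deletes = [RemoveMessage(id=m.id) for m in msgs[:-4]]
print([m.content for m in add_messages(msgs, deletes)])
# -> ['m1', 'm2', 'm3', 'm4']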