使用LangGraph、FastAPI和React构建AI驱动的注册系统

2025年03月04日 由 alex 发表 1913 0

在这篇文章中,我分享了一个基于FastAPI、React、LangGraph、DSPy、Guardrails AI和MLflow构建的AI驱动注册系统。该系统通过结构化对话引导用户完成注册详情的填写,同时利用AI验证确保输入准确性。它通过使用AI进行验证循环、多步骤工作流导航和实时数据驱动调整,实现了动态适应性。虽然该系统自动化了结构化交互,但在完全自主行为方面还有所保留,保持了AI驱动指导与可预测用户体验之间的平衡。


一个完全自主的系统会在此基础上更进一步,自主决定交互顺序,根据更广泛的目标适应工作流,并在没有预定义规则的情况下进行自我修正。它不会遵循结构化路径,而是会根据上下文做出决策,动态重新安排任务优先级,并不断完善其方法。


可预测性与自主性之间的权衡:

  • 可预测性确保合规性、可重复性和用户信任——在金融和医疗等受监管行业中尤为重要,因为这些行业的决策必须可审计且可解释。
  • 自主性允许AI在没有严格约束的情况下适应和优化工作流,可能会提高效率,但代价是减少了监督。
  • 混合方法(如本系统)在AI驱动的灵活性与人工监督和基于规则的约束之间取得平衡,允许动态交互的同时保持治理和控制。


虽然该系统相对简单,但它展示了AI如何在不牺牲透明度和可靠性的前提下增强结构化工作流,为在用户面向的应用中集成AI同时保持可解释性提供了一条实用路径。


关键特性:

  • 对话式UI:用户通过与聊天机器人般的界面交互,逐步提供注册详情,并可选择跳过某些步骤。
  • FastAPI后端:处理会话管理、状态转换和AI驱动的验证。
  • React前端:提供直观的用户体验,根据用户响应动态更新。
  • LangGraph用于流程控制:管理多步骤对话逻辑,确保注册步骤之间的顺畅过渡。
  • 基于OpenAI GPT-3.5 Turbo的AI驱动验证:系统使用OpenAI的GPT-3.5 Turbo验证用户输入,以两种方式实现以进行比较:
  • 使用OpenAI Python库:直接调用OpenAI的API生成结构化验证响应。
  • 使用DSPy进行提示优化:利用DSPy(一种声明式AI框架)优化提示,提高AI生成验证的可靠性。
  • 比较:虽然OpenAI库提供了直接的API调用,但DSPy通过从过去的交互中学习并优化响应生成,提高了提示效率。
  • Guardrails AI用于结构化验证:确保输入符合预期格式,纠正小错误,并在需要时请求澄清。
  • MLflow用于日志记录:跟踪验证性能,记录交互,并支持模型改进的调试。


技术栈:

  • Python后端:FastAPI、LangGraph、DSPy、MLflow、Guardrails AI
  • 前端:React(Vite)、Material UI
  • 数据库:SQLite(用于会话持久性)
  • 部署:Docker、Docker Compose


对话式UI

本文介绍的对话式注册系统将传统的表单填写转变为动态、引导式的对话,提高了用户参与度和数据质量。该系统不是呈现静态表单,而是逐步收集用户信息,并根据响应进行适应。虽然UI相当简单,但它突出了如何创建一种既有引导又开放式的体验,同时提供强大的实时验证和格式化功能。


a


本文主要关注后端功能和代码,因为它们是前端动态特性的驱动力。我提供了完整代码仓库的链接,以便你可以亲自试验前端和后端代码。


使用LangGraph的基于图的工作流

在这个示例中,LangGraph通过定义一个交互图来结构化注册流程,其中每个节点代表一个特定问题(例如,“你的电子邮件是什么?”),而边则决定了用户响应后对话如何进展。系统可以根据用户反馈动态跳过步骤、请求澄清或调整流程,确保提供上下文感知和自适应的用户体验。通过利用LangGraph,应用程序能够在交互过程中保持状态,在需要时恢复会话,并确保用户能够顺利完成注册过程,而不会遇到重复或无关的问题。下图展示了定义流程的有向循环图(DCG)。请注意,用户可以跳过ask_address和ask_phone节点(用虚线表示):


6


LangGraph是一个框架,旨在使用基于图的工作流来构建结构化、有状态和多步骤的AI应用程序。与提示链或简单的顺序API不同,LangGraph支持灵活的分支、条件逻辑和内存持久化,因此非常适合像这种基于AI的注册系统这样的复杂对话应用程序。请注意,在这个示例中,我使用SQLite来存储对话状态,但我也可以使用LangGraph的内存持久化功能。下面的Python类负责构建有向循环图(DCG),该图将被编译用于FastAPI后端。


from langgraph.graph import END
from db.sqlite_db import RegistrationState
from graph.base_graph import BaseGraphManager
import logging

class RegistrationGraphManager(BaseGraphManager):
    """Specialized graph manager with domain-specific (registration) logic."""
    def __init__(self, name: str, question_map: dict):
        # We pass RegistrationState as the state_class
        super().__init__(name, question_map, RegistrationState)
    def _build_graph(self):
        """
        Override the base build method to incorporate optional steps,
        or domain-specific edges.
        """
        def ask_question(state: RegistrationState, question_text: str):
            logging.info(f"[Registration] Transitioning to: {question_text}")
            return {
                "collected_data": state.collected_data,
                "current_question": question_text,
            }
        def create_question_node(question_text):
            return lambda s: ask_question(s, question_text)
        # Add each node
        for key, question_text in self.question_map.items():
            self.graph.add_node(key, create_question_node(question_text))
        # Set entry point
        self.graph.set_entry_point("ask_email")
        # Normal edges for the first two nodes
        self.graph.add_edge("ask_email", "ask_name")
        def path_func(state: RegistrationState):
            """Determine where to go next, considering multiple skips."""
            skip_address = state.collected_data.get("skip_ask_address", False)
            skip_phone = state.collected_data.get("skip_ask_phone", False)
            if skip_address and skip_phone:
                return "ask_username"  # Skip both address and phone
            elif skip_address:
                return "ask_phone"  # Skip address only
            else:
                return "ask_address"  # Default to asking for address first
        self.graph.add_conditional_edges(
            source="ask_name",
            path=path_func,
            path_map={
                "ask_address": "ask_address",
                "ask_phone": "ask_phone",
                "ask_username": "ask_username",
            },
        )
        self.graph.add_edge("ask_address", "ask_phone")
        self.graph.add_conditional_edges(
            source="ask_phone",
            path=lambda state: (
                "ask_username"
                if state.collected_data.get("skip_ask_phone", False)
                else "ask_username"
            ),
            path_map={"ask_username": "ask_username"},
        )
        self.graph.add_edge("ask_username", "ask_password")
        self.graph.add_edge("ask_password", END)


基于AI的用户答案验证

DSPy是一个框架,用于以编程方式定义和优化AI工作流,非常适合结构化验证任务。在这个应用程序中,DSPy定义了一个验证签名,该签名接收用户的响应和当前问题作为输入,然后生成结构化输出:

  • 状态(有效、需要澄清或错误)
  • 反馈(对任何问题的解释)
  • 格式化答案(响应的正确结构化版本)


验证逻辑由OpenAI的GPT-3.5-turbo提供支持,DSPy通过声明式的dspy.Predict函数调用它。通过利用DSPy,注册系统确保响应格式一致(例如,小写电子邮件、标准化电话号码、正确大写的姓名),同时检测不完整或模糊的输入。


此外,DSPy与Guardrails AI无缝集成,增加了额外的验证层来强制执行预期的响应模式——这在后面的部分中会详细介绍。该实现还结合了MLflow,捕获每个验证步骤的详细跟踪信息,用于审计和模型监控。下面是DSPy验证器代码,展示了这种集成。


import guardrails as gd
import dspy
from validation.base_validator import BaseValidator
from pydantic import ValidationError
from validation.validated_response import ValidatedLLMResponse
from typing import Literal
import logging
import json
import mlflow
from helpers.config import OPENAI_API_KEY, MLFLOW_ENABLED, MLFLOW_EXPERIMENT_NAME
dspy.settings.configure(lm=dspy.LM(model="gpt-3.5-turbo", api_key=OPENAI_API_KEY))
if MLFLOW_ENABLED:
    mlflow.dspy.autolog()
guard = gd.Guard.for_pydantic(ValidatedLLMResponse)
# Define DSPy Signature
class ValidateUserAnswer(dspy.Signature):
    """Validates and formats user responses. Should return 'valid', 'clarify', or 'error'."""
    question: str = dspy.InputField()
    user_answer: str = dspy.InputField()
    status: Literal["valid", "clarify", "error"] = dspy.OutputField(
        desc="Validation status: 'valid', 'clarify', or 'error'. Return 'valid' if the input contains all necessary information."
    )
    feedback: str = dspy.OutputField(
        desc="Explanation if response is incorrect or needs details."
    )
    formatted_answer: str = dspy.OutputField(
        desc="Return the response with proper formatting. Example: "
        "- Emails: Lowercase (e.g., 'John@gmail.com' → 'john@gmail.com'). "
        "- Names: Capitalize first & last name (e.g., 'john doe' → 'John Doe'). "
        "- Addresses: Capitalize & ensure complete info (e.g., '123 main st,newyork,ny' → '123 Main St, New York, NY 10001'). "
        "- Phone numbers: Format as (XXX) XXX-XXXX (e.g., '1234567890' → '(123) 456-7890'). "
        "An address must include: street number, street name, city, state, and ZIP code. "
        "Reject responses that do not meet this format with status='clarify'."
        "If the response cannot be formatted, return the original answer."
    )

run_llm_validation = dspy.Predict(ValidateUserAnswer)

class DSPyValidator(BaseValidator):
    """Uses DSPy with Guardrails AI for structured validation."""
    def validate(self, question: str, user_answer: str):
        """Validates user response, applies guardrails, and logs to MLflow."""
        try:
            raw_result = run_llm_validation(question=question, user_answer=user_answer)
            structured_validation_output = guard.parse(json.dumps(raw_result.toDict()))
            validated_dict = dict(structured_validation_output.validated_output)
            if MLFLOW_ENABLED:
                mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)
                with mlflow.start_run(nested=True):
                    mlflow.log_param("validation_engine", "DSPy + Guardrails AI")
                    mlflow.log_param("question", question)
                    mlflow.log_param("input_answer", user_answer)
                    mlflow.log_param("status", validated_dict["status"])
                    mlflow.log_param(
                        "formatted_answer", validated_dict["formatted_answer"]
                    )
            return {
                "status": validated_dict["status"],
                "feedback": validated_dict["feedback"],
                "formatted_answer": validated_dict["formatted_answer"],
            }
        except ValidationError as ve:
            logging.error(f"Validation error: {str(ve)}")
            return {
                "status": "error",
                "feedback": "Output validation failed.",
                "formatted_answer": user_answer,
            }
        except Exception as e:
            logging.error(f"Validation error: {str(e)}")
            return {
                "status": "error",
                "feedback": "An error occurred during validation.",
                "formatted_answer": user_answer,
            }


上述代码片段中的ValidateUserAnswer类定义了一个DSPy签名,该签名作为结构化AI工作流的输入和输出约束的声明性规范。这确保了大型语言模型(LLM)遵循预定义的模式,使得验证更加可预测和可解释。在这个用例中,签名遵循以下结构:


问题、用户答案 → 状态、反馈、格式化答案


这个结构化定义被传递给dspy.Predict,允许DSPy直接将其应用于用户输入。MLflow捕获并记录DSPy发送给gpt-3.5-turbo的确切请求,以及模型的结构化响应,从而实现透明度和可重复性。


值得注意的是,DSPy遵循系统、用户和助手提示结构,这是与OpenAI的GPT模型交互时的标准方法。这种格式有助于保持对话上下文,确保验证逻辑对LLM而言是一致且框架化的。


System
Your input fields are:input fields are:
question (str)
user_answer (str)
Your output fields are:
status (typing.Literal['valid', 'clarify', 'error']): Validation status: 'valid', 'clarify', or 'error'. Return 'valid' if the input contains all necessary information.
feedback (str): Explanation if response is incorrect or needs details.
formatted_answer (str): Return the response with proper formatting. Example: - Emails: Lowercase (e.g., 'John@gmail.com' → 'john@gmail.com'). - Names: Capitalize first & last name (e.g., 'john doe' → 'John Doe'). - Addresses: Capitalize & ensure complete info (e.g., '123 main st,newyork,ny' → '123 Main St, New York, NY 10001'). - Phone numbers: Format as (XXX) XXX-XXXX (e.g., '1234567890' → '(123) 456-7890'). An address must include: street number, street name, city, state, and ZIP code. Reject responses that do not meet this format with status='clarify'.If the response cannot be formatted, return the original answer.
All interactions will be structured in the following way, with the appropriate values filled in.
[[ ## question ## ]] {question}
[[ ## user_answer ## ]] {user_answer}
[[ ## status ## ]] {status} # note: the value you produce must exactly match (no extra characters) one of: valid; clarify; error
[[ ## feedback ## ]] {feedback}
[[ ## formatted_answer ## ]] {formatted_answer}
[[ ## completed ## ]]
In adhering to this structure, your objective is: Validates and formats user responses. Should return 'valid', 'clarify', or 'error'.


User
[[ ## question ## ]] What is your address?
[[ ## user_answer ## ]] 112 main st los angeles ca 90210
Respond with the corresponding output fields, starting with the field [[ ## status ## ]] (must be formatted as a valid Python typing.Literal['valid', 'clarify', 'error']), then [[ ## feedback ## ]], then [[ ## formatted_answer ## ]], and then ending with the marker for [[ ## completed ## ]].


Assistant
[[ ## status ## ]] valid
[[ ## feedback ## ]] N/A
[[ ## formatted_answer ## ]] 112 Main St, Los Angeles, CA 90210
[[ ## completed ## ]]


你可以看到,模型验证了输入的正确性,并返回了一个格式良好的地址,在显示给用户之前确保了数据的一致性。这种结构化验证通过强制执行响应的标准格式,有助于维护数据完整性。


现在,让我们使用OpenAI Python客户端(不使用DSPy)对相同的用户输入进行比较。下面是直接与OpenAI API交互的验证器代码,绕过了DSPy的声明式结构。


应用程序允许通过修改环境变量,在基于DSPy的验证器和原始的OpenAI Python实现之间无缝切换。这使得实验变得简单,让你可以亲身体验DSPy的结构化方法如何影响验证、格式化和整体可预测性。


import openai
import guardrails as gd
import json
from typing import Dict
from validation.base_validator import BaseValidator
from validation.validated_response import ValidatedLLMResponse
from helpers.config import OPENAI_API_KEY, MLFLOW_ENABLED, MLFLOW_EXPERIMENT_NAME
import mlflow
if MLFLOW_ENABLED:
    mlflow.openai.autolog()
guard = gd.Guard.for_pydantic(ValidatedLLMResponse)

class ChatGPTValidator(BaseValidator):
    """ChatGPT-based implementation of the validation strategy."""
    def validate(self, question: str, user_answer: str) -> Dict[str, str]:
        """Uses OpenAI ChatGPT to validate responses."""
        client = openai.OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a helpful assistant that validates user responses. "
                        "You must respond in JSON format with a clear validation status. "
                        "If the response is valid, return: {'status': 'valid', 'feedback': '<feedback message>', 'formatted_answer': '<formatted response>'}. "
                        "If the response needs clarification, return: {'status': 'clarify', 'feedback': '<clarification message>', 'formatted_answer': '<original response>'}."
                        "Ensure proper formatting: lowercase emails, capitalized names, standardized phone numbers and addresses."
                    ),
                },
                {
                    "role": "user",
                    "content": f"Question: {question}\nUser Answer: {user_answer}\nValidate the answer.",
                },
            ],
            response_format={"type": "json_object"},
        )
        try:
            validation_str = response.choices[0].message.content.strip()
            validation_result = json.loads(validation_str)
            print(validation_result)
            # Apply Guardrails AI
            validated_result = guard.parse(json.dumps(validation_result))
            validated_dict = validated_result.validated_output
            print(validated_dict)
        except (json.JSONDecodeError, KeyError):
            validation_result = {
                "status": "error",
                "feedback": "Error processing validation response.",
                "formatted_answer": user_answer,
            }
        if MLFLOW_ENABLED:
            mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)
            with mlflow.start_run(nested=True):
                mlflow.log_param("validation_engine", "ChatGPT")
                mlflow.log_param("question", question)
                mlflow.log_param("input_answer", user_answer)
                mlflow.log_param("status", validated_dict.get("status", "error"))
                mlflow.log_param(
                    "feedback", validated_dict.get("feedback", "No feedback")
                )
                mlflow.log_param(
                    "formatted_answer",
                    validated_dict.get("formatted_answer", user_answer),
                )
        return validated_dict


与DSPy实现中使用声明式签名来结构化验证不同,使用OpenAI Python客户端的方法需要在API调用中显式定义系统和用户角色。然而,它缺乏一种结构化方法,无法让你直接在Python代码中定义验证逻辑。


在没有DSPy的情况下,验证逻辑必须在提示中手动构建,而不是通过输入/输出字段以编程方式定义。这使得一致性执行、问题调试和集成额外的验证层(如Guardrails AI)变得更加困难。


为了说明这种差异,下面是使用OpenAI Python客户端发送给gpt-3.5-turbo的实际内容。请注意,提示结构是手动设计的,而不是像DSPy中那样以编程方式强制执行的:


[
  {
    "role": "system",
    "content": "You are a helpful assistant that validates user responses. You must respond in JSON format with a clear validation status. If the response is valid, return: {'status': 'valid', 'feedback': '<feedback message>', 'formatted_answer': '<formatted response>'}. If the response needs clarification, return: {'status': 'clarify', 'feedback': '<clarification message>', 'formatted_answer': '<original response>'}.Ensure proper formatting: lowercase emails, capitalized names, standardized phone numbers and addresses."
  },
  {
    "role": "user",
    "content": "Question: What is your address?\nUser Answer: 112 main st los angeles ca 90210\nValidate the answer."
  }
]


[
  {
    "finish_reason": "stop",
    "index": 0,
    "logprobs": null,
    "message": {
      "content": "{\n\t\"status\": \"valid\",\n\t\"feedback\": \"Address is valid.\",\n\t\"formatted_answer\": \"112 Main St, Los Angeles, CA 90210\"\n}",
      "refusal": null,
      "role": "assistant",
      "audio": null,
      "function_call": null,
      "tool_calls": null
    }
  }
]


在这个示例中,两个验证器返回了相同的响应:输入有效,且正确格式化的地址是“112 Main St, Los Angeles, CA 90210”。这是因为第二个验证器中的提示结构良好,这是经过多次迭代才实现的。


虽然在这个简单案例中,OpenAI Python客户端同样有效,但DSPy的结构化方法在更复杂场景中表现出色。它允许开发人员显式定义预期的输入-输出结构,确保多个请求之间的一致性。此外,如果模型的输出无效或模糊,DSPy可以自动重试请求或应用内置启发式方法来优化响应,从而减少手动后处理的需求。


使用Guardrails AI进行结构化验证和安全性保障

Guardrails AI对GPT-3.5-turbo模型生成的输出强制执行结构化验证和安全性约束,确保响应在到达应用程序之前符合预定义的格式、约束和业务逻辑。在这个系统中,Guardrails AI验证关键输出字段——状态、反馈和格式化答案——以保证:

  • 电子邮件地址始终为小写
  • 电话号码遵循正确的格式 (XXX) XXX-XXXX
  • 地址包含所有必需的组成部分(街道、城市、州、邮政编码)


这个验证层对于维护数据完整性至关重要,尤其是在高风险应用中,错误、不完整或格式不正确的信息可能导致:

  • 监管和合规风险
  • 运营效率低下(例如,交易失败、记录错误)
  • 用户体验不佳和信任丧失


通过将Guardrails AI与DSPy结合使用,系统受益于结构化验证和智能重试机制。这确保了错误能够早期捕获,强制执行特定领域的规则,并防止幻觉或格式错误的输出传播,从而提高了AI驱动决策的整体可靠性和可信度。


以下是Guardrails AI用于强制执行输出验证的Pydantic类。


from pydantic import BaseModel, Field, field_validator
import re
class ValidatedLLMResponse(BaseModel):
    """Validates & formats user responses using Guardrails AI & Pydantic."""
    status: str = Field(..., pattern="^(valid|clarify|error)$")
    feedback: str
    formatted_answer: str
    @field_validator("formatted_answer", mode="before")
    @classmethod
    def validate_and_format(cls, value, values):
        """Formats & validates responses based on the question type."""
        if values.get("status") == "error":
            return value  # Skip validation for errors
        question = values.get("question", "").lower()
        # Validate Email Format
        if "email" in question:
            return cls.validate_email(value)
        # Validate Name Format (Capitalize First & Last Name)
        if "name" in question:
            return cls.validate_name(value)
        # Validate Phone Number (Format: (XXX) XXX-XXXX)
        if "phone" in question:
            return cls.validate_phone(value)
        # Validate Address (Must contain street, city, state, ZIP)
        if "address" in question:
            return cls.validate_address(value)
        return value  # Default: Return unchanged
    @staticmethod
    def validate_email(email: str) -> str:
        """Validates email format and converts to lowercase."""
        return (
            email.lower() if re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email) else "clarify"
        )
    @staticmethod
    def validate_name(name: str) -> str:
        """Capitalizes first & last name."""
        return " ".join(word.capitalize() for word in name.split())
    @staticmethod
    def validate_phone(phone: str) -> str:
        """Validates & formats phone numbers as (XXX) XXX-XXXX."""
        digits = re.sub(r"\D", "", phone)  # Remove non-numeric characters
        return (
            f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
            if len(digits) == 10
            else "clarify"
        )
    @staticmethod
    def validate_address(address: str) -> str:
        """Ensures address includes street, city, state, ZIP & formats correctly."""
        components = address.split(",")
        if len(components) < 3:
            return "clarify"  # Address is incomplete
        formatted_address = ", ".join(comp.strip().title() for comp in components)
        return (
            formatted_address if re.search(r"\d{5}", formatted_address) else "clarify"
        )


将所有组件集成到FastAPI中

这个FastAPI后端作为系统的协调器,负责会话管理、用户输入验证和工作流推进。它提供了三个主要端点:/start_registration、/submit_response和/edit_field,每个端点在引导用户完成注册过程中都发挥着关键作用。


/start_registration端点用于初始化新会话,分配唯一的session_id并启动注册流程。它使用LangGraph确定工作流中的第一个问题,并将初始状态存储在SQLite数据库中。由RegistrationGraphManager定义的注册图结构化了问题序列,确保工作流按逻辑顺序进行。


当用户通过/submit_response端点提交响应时,系统从数据库中检索会话状态,使用validate_user_input验证输入,并更新存储的响应。验证过程使用DSPy和Guardrails AI,确保输入符合预期的格式和业务规则。如果输入不完整或无效,系统会提示用户进行澄清,而不是进入下一步。然后,工作流动态确定下一个问题,允许自适应排序——例如,根据之前的响应跳过可选步骤。一旦注册完成,将返回收集到的响应的结构化摘要。


/edit_field端点允许用户修改之前提交的信息。它检索会话状态,重新验证更新的输入,如果新值符合所需标准,则更新数据库。这确保了用户更正的处理与初始数据输入时应用的相同验证和格式化逻辑保持一致。


from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uuid
import logging
from validation.factory import validate_user_input
from db.sqlite_db import fetch_session_from_db, upsert_session_to_db, RegistrationState
from graph.registration_graph import RegistrationGraphManager
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost",
        "http://localhost:8080",
        "http://localhost:5173",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
registration_questions = {
    "ask_email": "What is your email address?",
    "ask_name": "What is your full name?",
    "ask_address": "What is your address?",
    "ask_phone": "What is your phone number?",
    "ask_username": "Choose a username.",
    "ask_password": "Choose a strong password.",
}
registration_graph = RegistrationGraphManager("registration", registration_questions)
registration_graph.generate_mermaid_diagram()

@app.post("/start_registration")
def start_registration():
    session_id = str(uuid.uuid4())
    # Our initial state
    initial_state: RegistrationState = {
        "collected_data": {},
        "current_question": "",
        "current_node": "ask_email",
        "session_id": session_id,
    }
    # Start the graph & get the first node
    execution = registration_graph.compiled_graph.stream(initial_state)
    try:
        steps = list(execution)  # Fully consume the generator
        if not steps:
            raise RuntimeError("Generator exited before producing any states.")
        first_step = steps[0]
    except GeneratorExit:
        raise RuntimeError("GeneratorExit detected before first transition!")
    # Extract state from the first node
    first_node_key = list(first_step.keys())[0]
    first_node_state = first_step[first_node_key]
    first_node_state["current_node"] = first_node_key
    first_node_state["session_id"] = session_id
    # Save to session
    upsert_session_to_db(
        session_id,
        first_node_state["collected_data"],
        first_node_state["current_question"],
        first_node_state["current_node"],
    )
    return {
        "session_id": session_id,
        "message": first_node_state["current_question"],
        "state": first_node_state,
    }

@app.post("/submit_response")
def submit_response(response: dict):
    session_id = response.get("session_id")
    if not session_id:
        return {"error": "Missing session_id"}
    current_state = fetch_session_from_db(session_id)
    if not current_state:
        return {"error": "Session not found. Please restart registration."}
    skip_steps = response.get("skip_steps", [])
    for node_key in skip_steps:
        logging.info(f"skip_{node_key}")
        current_state[f"skip_{node_key}"] = True
    user_answer = response.get("answer", "")
    current_question = current_state["current_question"]
    current_node = current_state["current_node"]
    # Use dspy to validate the answer with fallbacks
    if current_node in skip_steps:
        # If user is skipping this question, create a dummy validation result
        validation_result = {
            "status": "valid",
            "feedback": "Skipped this question",
            "formatted_answer": "-",
        }
        logging.info(f"Skipping validation for {current_node}")
    else:
        # Normal validation
        validation_result = validate_user_input(current_question, user_answer)
        # If there's a clarify/error
        if validation_result["status"] in ("clarify", "error"):
            return {
                "next_question": current_question,
                "validation_feedback": validation_result["feedback"],
                "user_answer": user_answer,
                "formatted_answer": validation_result["formatted_answer"],
                "state": current_state,
            }
    current_state["collected_data"][current_state["current_node"]] = validation_result[
        "formatted_answer"
    ]
    if "current_node" not in current_state or not current_state.get("collected_data"):
        return {"error": "Corrupt session state, restart registration."}
    next_step = registration_graph.resume_and_step_graph(current_state)
    if not next_step or next_step == {}:
        # Means we've hit the END node or no more steps
        return {
            "message": "Registration complete!",
            "validation_feedback": validation_result["feedback"],
            "user_answer": user_answer,
            "formatted_answer": validation_result["formatted_answer"],
            "state": current_state,
            "summary": current_state["collected_data"],
        }
    next_node_key = list(next_step.keys())[0]
    next_node_state = next_step[next_node_key]
    next_node_state["current_node"] = next_node_key
    upsert_session_to_db(
        session_id,
        current_state["collected_data"],
        next_node_state["current_question"],
        next_node_state["current_node"],
    )
    return {
        "next_question": next_node_state["current_question"],
        "validation_feedback": validation_result["feedback"],
        "user_answer": user_answer,
        "formatted_answer": validation_result["formatted_answer"],
        "state": next_node_state,
        "summary": current_state["collected_data"],
    }

@app.post("/edit_field")
def edit_field(request: dict):
    session_id = request.get("session_id")
    if not session_id:
        return {"error": "Missing session_id"}
    field_to_edit = request.get("field_to_edit")
    new_value = request.get("new_value")
    current_state = fetch_session_from_db(session_id)
    if not current_state:
        logging.error("Session not found. Please restart registration.")
        return {"error": "Session not found. Please restart registration."}
    question_text = registration_questions.get(field_to_edit)
    if not question_text:
        logging.error(f"Invalid field_to_edit: {field_to_edit}")
        return {"error": f"Invalid field_to_edit: {field_to_edit}"}
    validation_result = validate_user_input(
        question=question_text, user_answer=new_value
    )
    if validation_result["status"] == "clarify":
        return {
            "message": "Needs clarification",
            "validation_feedback": validation_result["feedback"],
            "raw_answer": new_value,
            "formatted_answer": validation_result["formatted_answer"],
        }
    current_state["collected_data"][field_to_edit] = validation_result[
        "formatted_answer"
    ]
    upsert_session_to_db(
        session_id,
        current_state["collected_data"],
        current_state["current_question"],
        current_state["current_node"],
    )
    return {
        "message": "Field updated successfully!",
        "validation_feedback": validation_result["feedback"],
        "raw_answer": new_value,
        "formatted_answer": validation_result["formatted_answer"],
        "summary": current_state["collected_data"],
    }


在整个过程中,MLflow记录交互,捕获模型的输入和输出以进行监控和确保可重复性。通过结合工作流管理、结构化验证、会话持久性和用户更正处理,这个FastAPI实现为增强型AI结构化对话注册系统提供了骨干。


总结

在这篇文章中,我介绍了使用FastAPI、React、LangGraph、DSPy、Guardrails AI和MLflow实现的AI驱动注册系统。该系统引导用户通过结构化对话,实时验证其响应,同时确保数据的准确性和一致性。通过利用AI驱动的验证,应用程序能够动态适应用户输入,从而提供更无缝和智能的入职体验。



文章来源:https://medium.com/@phurlocker/building-an-ai-powered-registration-system-with-langgraph-fastapi-and-react-3e651ce83d71
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消