This article describes how I implemented a UI with Streamlit to make the workflow more user-friendly. The UI displays progress updates as the workflow executes, integrates user input, enables real-time user feedback, and presents the final generated slides.
Backend enhancements:
Frontend UI features:
Putting it all together:
When launching the workflow from the terminal, we can directly see which step it is executing and the logs we record inside those steps.
We can also use user_feedback = input() in the workflow to enable human-in-the-loop interaction: this pauses the workflow and waits for user input. However, to achieve the same behavior in a user-friendly interface, we need additional modifications to the original workflow.
Sending streaming events from the workflow
A workflow run can take a long time, so for a better user experience, LlamaIndex provides a way to send streaming events that surface the workflow's progress, as shown in the notebook. In my workflow, I define a WorkflowStreamingEvent class that carries useful information about each event message, such as the event type and which step it was sent from:
from typing import Any, Dict, Literal

from pydantic import BaseModel, Field


class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")
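For illustration, here is a minimal sketch (plain dicts and the stdlib json module only, so it runs without Pydantic) of what a serialized event line looks like on the wire and how a client might dispatch on event_type; the concrete values are hypothetical:

```python
import json

# Hypothetical serialized event, mirroring WorkflowStreamingEvent.model_dump()
line = json.dumps({
    "event_type": "server_message",
    "event_sender": "tavily_query",
    "event_content": {"message": "Querying Tavily with: 'arxiv papers ...'"},
})

# Client-side dispatch on the event type
event = json.loads(line)
if event["event_type"] == "request_user_input":
    action = "pause and collect feedback"
else:
    action = "display progress update"
print(action)  # prints "display progress update"
```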
To enable streaming, a workflow step needs access to the shared context, which is done by adding the @step(pass_context=True) decorator to the step definition. Inside the step, we can then send event messages about the progress through the context. For example, in the tavily_query() step:
@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )
In this example, we set the event type to "server_message", meaning it is an update message that requires no user action. There is another event type, "request_user_input", which indicates that user input is needed. For example, in the gather_feedback_outline() step of the workflow, after generating the slide text outline from the original paper summary, a message is sent prompting the user to approve the outline or provide feedback on it:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present user the original paper summary and the outlines generated, gather feedback from user"""
    ...
    # Send a special event indicating that user input is needed
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )
    ...
These events are handled differently in the backend API and in the frontend logic, which I will cover in detail later in this article.
Pausing the workflow to wait for user input
When a "request_user_input" event is sent to the user, we only want to proceed to the next step once the user input has been received. As shown in the workflow diagram above, if the user approves the outline, the workflow proceeds to the outlines_with_layout() step; if not, it goes back to the summary2outline() step.
This is achieved with the Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that can be awaited in the gather_feedback_outline() step. The subsequent execution of the workflow is conditioned on the content of the user feedback:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...
    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

        # Process user_response, which should be a JSON string
        try:
            response_data = json.loads(user_response)
            approval = response_data.get("approval", "").lower().strip()
            feedback = response_data.get("feedback", "").strip()
        except json.JSONDecodeError:
            # Handle invalid JSON
            logger.error("Invalid user response format")
            raise Exception("Invalid user response format")

        if approval == ":material/thumb_up:":
            return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
        else:
            return OutlineFeedbackEvent(
                summary=ev.summary, outline=ev.outline, feedback=feedback
            )
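The pause-and-resume mechanism can be sketched in isolation with the standard library alone. This is a simplified illustration, not the workflow's actual code: one coroutine awaits a Future while another thread delivers the result, which must go through call_soon_threadsafe because Future.set_result() is not thread-safe:

```python
import asyncio
import threading

async def wait_for_user(fut: asyncio.Future) -> str:
    # The workflow step pauses here until another thread sets the result
    return await fut

async def main() -> str:
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    # Simulate an API endpoint delivering user input from another thread
    threading.Timer(
        0.1, lambda: loop.call_soon_threadsafe(fut.set_result, "approved")
    ).start()

    return await wait_for_user(fut)

print(asyncio.run(main()))  # prints "approved"
```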
FastAPI backend
We set up the backend with FastAPI, exposing a POST endpoint that handles requests and kicks off the workflow run. The asynchronous function run_workflow_endpoint() takes a ResearchTopic as input. Inside it, the asynchronous generator event_generator() is defined, which creates a task that runs the workflow and streams events to the client as the workflow progresses. When the workflow finishes, it also streams the final file results to the client.
class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")


@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())
    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )
    # Register the workflow so other endpoints can look it up by id
    workflows[workflow_id] = wf

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URLs
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"
            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }
            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
Besides this endpoint, there are endpoints for receiving user input from the client and for handling file download requests. Since each workflow is created with a unique workflow ID, we can map user input received from the client to the correct workflow. By calling set_result() on the pending Future, the waiting workflow resumes execution.
@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )
The download endpoints also locate the final files based on the workflow ID.
@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")
Streamlit frontend
On the frontend page, after the user submits a research topic through st.text_input(), a long-running process is started in a background thread with its own event loop. It receives the streamed events from the backend without blocking the rest of the page:
def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():
    ...
    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

        if submit_button:
            # Reset the workflow_complete flag for a new workflow
            st.session_state.workflow_complete = False
            # Start the long-running task in a separate thread
            if (
                st.session_state.workflow_thread is None
                or not st.session_state.workflow_thread.is_alive()
            ):
                st.write("Starting the background thread...")
                st.session_state.workflow_thread = threading.Thread(
                    target=start_long_running_task,
                    args=(
                        "http://backend:80/run-slide-gen",
                        {"query": query},
                        st.session_state.message_queue,
                        st.session_state.user_input_event,
                    ),
                )
                st.session_state.workflow_thread.start()
                st.session_state.received_lines = []
            else:
                st.write("Background thread is already running.")
httpx.AsyncClient fetches the event data streamed from the backend and puts it into a message queue for further processing. Different information is extracted depending on the event type. For the "request_user_input" event type, the thread also pauses until the user input is provided.
async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line


async def get_stream_data(url, payload, message_queue, user_input_event):
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:
                # Forward the raw line if it is not valid JSON
                message_queue.put(("message", data))
        if (data_json and "final_result" in data_json) or "final_result" in str(data):
            break  # Stop processing after receiving the final result
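The hand-off between the streaming thread and the Streamlit main thread boils down to a queue.Queue plus a threading.Event. Here is a minimal, self-contained sketch of that pattern with simplified names, not the app's actual code:

```python
import queue
import threading

def streaming_worker(message_queue: queue.Queue, user_input_event: threading.Event) -> None:
    # Post a prompt to the main thread, then block until the user responds
    message_queue.put(("user_input_required", {"message": "Approve outline?"}))
    user_input_event.wait()
    user_input_event.clear()
    message_queue.put(("message", "resumed after user input"))

message_queue: queue.Queue = queue.Queue()
user_input_event = threading.Event()
t = threading.Thread(target=streaming_worker, args=(message_queue, user_input_event))
t.start()

kind, payload = message_queue.get(timeout=5)  # worker is now blocked on the event
user_input_event.set()                        # simulate the user submitting feedback
t.join(timeout=5)
second = message_queue.get(timeout=5)[1]
print(kind, "->", second)
```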
We store the information in st.session_state and use st.expander() to display and update the streamed data.
if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()
To keep the UI responsive and display messages while the background thread processes event messages, we use a custom autorefresh component to refresh the page at a set interval:
if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")
When the streamed event type is "request_user_input", we display the relevant information in a separate container and collect user feedback. Since a single workflow run can contain multiple events that require user input, we put them into a message queue and make sure each st.feedback(), st.text_area(), and st.button() widget associated with an event gets a unique key, so that these widgets don't interfere with each other:
def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return
    ...
Finally, when the workflow run eventually finishes, the frontend client receives a response containing the paths to the final generated files (the same slide deck as a PDF for rendering in the UI and as a PPTX for downloading the final result). We display the PDF file and create a button for downloading the PPTX file:
if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
    download_url_pdf = st.session_state.download_url_pdf
    try:
        # Fetch the PDF content
        pdf_response = requests.get(download_url_pdf)
        pdf_response.raise_for_status()
        st.session_state.pdf_data = pdf_response.content

        st.markdown("### Generated Slide Deck:")
        # Display the PDF using an iframe
        st.markdown(
            f'<iframe src="data:application/pdf;base64,{base64.b64encode(st.session_state.pdf_data).decode()}" width="100%" height="600px" type="application/pdf"></iframe>',
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Failed to load the PDF file: {str(e)}")

# Provide the download button for PPTX if available
if (
    "download_url_pptx" in st.session_state
    and st.session_state.download_url_pptx
):
    download_url_pptx = st.session_state.download_url_pptx
    try:
        # Fetch the PPTX content
        pptx_response = requests.get(download_url_pptx)
        pptx_response.raise_for_status()
        pptx_data = pptx_response.content

        st.download_button(
            label="Download Generated PPTX",
            data=pptx_data,
            file_name="generated_slides.pptx",
            mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
        )
    except Exception as e:
        st.error(f"Failed to load the PPTX file: {str(e)}")
Putting it all together with docker-compose
We use docker-compose to create a multi-service Docker application that runs the frontend and backend applications.
version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure

  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:
That's it! With a single docker-compose run, we now have an application that runs a research workflow based on the user's query, prompts the user for feedback during execution, and displays the final results to the user.