观测/数据上报/AI 框架/扣子工作流
扣子工作流
更新于: 2026-06-24 15:45:09
本文介绍如何通过扣子罗盘 SDK 和扣子编程 SDK,将扣子编程工作流执行过程的 Trace 数据与扣子罗盘 Trace 数据串联,并上报至扣子罗盘进行观测分析。
扣子编程支持自动上报工作流执行过程的 Trace 数据到扣子罗盘,实现可视化观测,详细说明请参考平台自动上报。同时,扣子罗盘 SDK 提供自定义上报功能,能够上报多种自定义 Trace 数据,如性能数据、执行过程数据、业务数据等。
当你需要将扣子罗盘 SDK 上报的自定义 Trace 数据与扣子编程自动上报的工作流执行过程 Trace 数据串联,形成一个 Trace 调用树时,可参考本文档集成扣子罗盘 SDK 和扣子编程 SDK 来实现。
目前,支持串联如下工作流执行接口的 Trace 数据。
|
接口 |
功能 |
对应扣子 SDK 方法 |
|---|---|---|
|
/v1/workflows/chat |
执行已发布的对话流。更多信息,请参考执行对话流。 |
CozeCli.Workflows.Chat.Stream |
|
/v1/workflow/run |
执行已发布的工作流。更多信息,请参考执行工作流。 |
CozeCli.Workflows.Runs.Create |
|
/v1/workflow/stream_run |
执行已发布的工作流,响应方式为流式响应。更多信息,请参考执行工作流(流式响应)。 |
CozeCli.Workflows.Runs.Stream |
目前支持通过 Go SDK 串联 Trace 数据。在进行相关操作前,需要先安装 Go SDK。
在上报 Trace 数据前,你需要正确配置扣子罗盘和扣子编程的环境变量,以确保数据能够正确上报。环境变量配置格式及说明如下:
|
环境变量 |
说明 |
|---|---|
|
COZELOOP_API_TOKEN |
用于设置上报数据时所需的扣子罗盘认证信息,配置为扣子罗盘的个人访问令牌或服务访问令牌,获取访问令牌的步骤,请参考配置个人访问令牌、配置服务访问令牌。 说明 设置令牌权限时,需要选择罗盘相关权限和工作流-run权限(用于调用工作流)。 |
|
COZELOOP_WORKSPACE_ID |
配置为扣子罗盘工作空间 ID。获取空间 ID 的步骤,请参考获取扣子罗盘工作空间 ID。 |
|
WORKFLOW_ID |
已发布的工作流 ID。 |
|
PUBLISHED_BOT_ID |
执行对话流或部分工作流时,需要配置关联的智能体 ID,该智能体需已发布。 |
如下示例通过扣子编程 SDK 执行对话流,实现用户与智能体的交互,并集成扣子罗盘 SDK 实现了 Trace 数据串联。Trace 数据串联的具体流程如下:
my_type 的 Span 节点。package main
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/coze-dev/coze-go"
"github.com/coze-dev/cozeloop-go"
)
type newTransport struct {
TraceContextFunc func(ctx context.Context) map[string]string
}
// pass span context to http header
func (n *newTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if n.TraceContextFunc != nil && len(n.TraceContextFunc(req.Context())) > 0 {
for key, value := range n.TraceContextFunc(req.Context()) {
req.Header.Set(key, value)
}
}
return http.DefaultTransport.RoundTrip(req)
}
var httpCli *http.Client
var cozeloopClient cozeloop.Client
func init() {
var err error
// init cozeloop client
// global env COZELOOP_WORKSPACE_ID=your workspace id
// global env COZELOOP_API_TOKEN=your token
cozeloopClient, err = cozeloop.NewClient()
if err != nil {
panic(err)
}
httpCli = &http.Client{Timeout: time.Second * 20}
httpCli.Transport = &newTransport{
TraceContextFunc: func(ctx context.Context) map[string]string {
header, err := cozeloop.GetSpanFromContext(ctx).ToHeader()
if err != nil {
fmt.Printf("cozeloop.GetSpanFromContext ToHeader err: %v", err)
return nil
}
return header
},
}
}
func main() {
// Should use the ctx passed down from upstream (for automatic concatenation of traces), and here for simplicity, we will directly use Background
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// Set the following environment variables first.
// COZELOOP_WORKSPACE_ID=your workspace id, for cozeloop client init by default
// COZELOOP_API_TOKEN=your token, for cozeloop client init by default
token := os.Getenv("COZELOOP_API_TOKEN")
workflowID := os.Getenv("WORKFLOW_ID")
botID := os.Getenv("PUBLISHED_BOT_ID")
authCli := coze.NewTokenAuth(token)
cozeApiBase := "https://api.coze.cn" // os.Getenv("COZE_API_BASE")
// This is a span of cozeloop sdk.
ctx, span := cozeloopClient.StartSpan(ctx, "my_span", "MyType")
// Init the Coze client through the access_token.
cozeCli := coze.NewCozeAPI(authCli, coze.WithBaseURL(cozeApiBase), coze.WithHttpClient(httpCli))
// Step one, create chats
req := &coze.WorkflowsChatStreamReq{
BotID: &botID,
WorkflowID: workflowID,
AdditionalMessages: []*coze.Message{
coze.BuildUserQuestionText("What can you do?", nil),
},
Parameters: map[string]any{
"name": "John",
},
}
resp, err := cozeCli.Workflows.Chat.Stream(ctx, req) // Ctx must be transmitted throughout the entire chain, otherwise trace cannot be concatenated
if err != nil {
fmt.Printf("Error starting chats: %v\n", err)
return
}
defer resp.Close()
for {
event, err := resp.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("Stream finished")
break
}
if err != nil {
fmt.Println(err)
break
}
if event.Event == coze.ChatEventConversationMessageDelta {
fmt.Print(event.Message.Content)
} else if event.Event == coze.ChatEventConversationChatCompleted {
fmt.Printf("Token usage:%d\n", event.Chat.Usage.TokenCount)
} else {
fmt.Printf("\n")
}
}
fmt.Printf("done, log:%s\n", resp.Response().LogID())
span.Finish(ctx) // span finish
cozeloop.Flush(ctx)
}
上报 Trace 数据后,你可以在扣子罗盘的 Trace 页面,筛选SDK上报类型,找到目标 Span,查看上报的 Trace 数据。如示下图中所示,扣子罗盘的 my_span Span 已与扣子编程工作流执行过程的 Span 串联,形成一个 Trace 调用树。
关于串联扣子工作流 Trace 数据的更多 Go SDK 示例代码,请参考 workflow_chat。