工作流上下文管理器
概述
工作流上下文管理器 (ContextManager) 是工作流引擎的核心组件之一,负责统一管理工作流执行过程中的上下文状态。它提供了完整的上下文生命周期管理,包括环境变量解析、子工作流支持、节点结果跟踪、数据继承等功能。
核心特性
- 🔧 统一上下文管理: 集中管理工作流执行期间的所有状态信息
- 🌐 环境变量支持: 自动解析和替换环境变量引用 (
{{varName}}格式) - 🔄 子工作流支持: 完整的多层级工作流嵌套和数据继承机制
- 📋 节点状态跟踪: 实时跟踪节点执行状态和结果
- 🧬 数据继承: 支持父子工作流之间的数据传递和隔离
- ⚡ 高性能: 优化的数据结构确保大规模工作流的高效执行
上下文结构
基础上下文结构
{
// 环境变量
env: {
API_URL: "https://api.example.com",
DATABASE_URL: "mongodb://localhost:27017",
DEBUG_MODE: "true"
},
// 执行信息
execution: {
teamId: "team_123",
userId: "user_456",
workflowId: "workflow_789",
executionId: "exec_abc"
},
// 团队和用户信息
teamId: "team_123",
userId: "user_456",
// 节点执行结果
"node-1": { output: "result1" },
"node-2": { output: "result2" },
// 内部状态管理
_processedNodes: Set, // 已处理的节点集合
_isSubflow: false, // 是否为子工作流
_parentContext: null, // 父上下文引用
_level: 0 // 嵌套层级
}
特殊属性说明
| 属性 | 类型 | 说明 |
|---|---|---|
_processedNodes | Set | 已处理节点的集合,用于防重复处理 |
_isSubflow | boolean | 标识当前上下文是否属于子工作流 |
_parentContext | Object | 父上下文的引用,用于数据继承 |
_level | number | 嵌套层级,根工作流为0,子工作流递增 |
API 参考
构造函数
new ContextManager(options)
创建新的上下文管理器实例。
参数:
{
execution: { // ✅ 必填 - 执行对象
teamId: string, // 团队ID
userId: string, // 用户ID
workflowId: string, // 工作流ID
executionId: string // 执行ID
},
envVars: Object, // ❌ 可选 - 环境变量字典
parentContext: Object // ❌ 可选 - 父级上下文(子工作流使用)
}
示例:
const contextManager = new ContextManager({
execution: {
teamId: 'team_123',
userId: 'user_456',
workflowId: 'workflow_789',
executionId: 'exec_abc'
},
envVars: {
API_URL: 'https://api.example.com',
DATABASE_URL: 'mongodb://localhost:27017'
}
});
核心方法
createExecutionContext(initialContext)
创建完整的执行上下文,合并基础上下文、父上下文继承数据和初始输入数据。
参数:
initialContext(Object): 初始上下文数据
返回值:
- (Object): 完整的执行上下文
示例:
const executionContext = contextManager.createExecutionContext({
'input-node': { data: 'initial data' },
'config-node': { settings: { timeout: 30 } }
});
updateNodeResult(nodeId, result)
更新指定节点的执行结果并标记为已处理。
参数:
nodeId(string): 节点标识符result(any): 节点执行结果
示例:
contextManager.updateNodeResult('data-processor', {
processedData: 'transformed data',
status: 'success'
});
getNodeResult(nodeId)
获取指定节点的执行结果。
参数:
nodeId(string): 节点标识符
返回值:
- (any): 节点执行结果,如果节点未执行则返回 undefined
示例:
const result = contextManager.getNodeResult('data-processor');
if (result) {
console.log('节点结果:', result);
}
isNodeProcessed(nodeId)
检查指定节点是否已经被处理。
参数:
nodeId(string): 节点标识符
返回值:
- (boolean): 如果节点已处理返回 true,否则返回 false
示例:
if (!contextManager.isNodeProcessed('data-processor')) {
// 节点尚未处理,可以执行
await executeNode('data-processor');
}
环境变量管理
环境变量语法
上下文管理器支持使用 {{variableName}} 语法在任何字符串值中引用环境变量。
支持的格式:
// 简单变量替换
"{{API_URL}}/users"
// 在复杂字符串中使用
"连接到数据库: {{DATABASE_URL}}"
// 在配置对象中使用
{
url: "{{API_URL}}",
timeout: "{{REQUEST_TIMEOUT}}"
}
resolveEnvironmentVariables(input)
递归解析输入数据中的所有环境变量引用。
参数:
input(any): 需要解析的输入数据
返回值:
- (any): 解析后的数据
支持的数据类型:
- ✅ 字符串: 直接替换变量引用
- ✅ 数组: 递归处理每个元素
- ✅ 对象: 递归处理所有属性值
- ✅ 其他类型: 直接返回原值
示例:
// 设置环境变量
const contextManager = new ContextManager({
execution: { /* ... */ },
envVars: {
API_URL: 'https://api.example.com',
DATABASE_NAME: 'production_db',
TIMEOUT: '5000'
}
});
// 解析包含环境变量的配置
const config = {
endpoint: "{{API_URL}}/data",
database: {
name: "{{DATABASE_NAME}}",
timeout: "{{TIMEOUT}}"
},
urls: [
"{{API_URL}}/users",
"{{API_URL}}/orders"
]
};
const resolved = contextManager.resolveEnvironmentVariables(config);
console.log(resolved);
// 输出:
// {
// endpoint: "https://api.example.com/data",
// database: {
// name: "production_db",
// timeout: "5000"
// },
// urls: [
// "https://api.example.com/users",
// "https://api.example.com/orders"
// ]
// }
子工作流支持
多层级工作流架构
根工作流 (Level 0)
├── 节点A
├── 控制节点B (触发子工作流)
│ └── 子工作流 (Level 1)
│ ├── 子节点C
│ ├── 子控制节点D (触发子子工作流)
│ │ └── 子子工作流 (Level 2)
│ │ └── 子子节点E
│ └── 子节点F
└── 节点G
数据继承机制
inheritFromParent()
从父上下文继承数据,自动过滤内部状态属性。
继承规则:
- ✅ 继承所有节点执行结果
- ✅ 继承用户自定义数据
- ❌ 不继承以
_开头的内部属性 - ❌ 不继承
env和execution对象
示例:
// 父工作流上下文
const parentContext = {
'node-1': { output: 'parent result' },
'node-2': { data: 'shared data' },
customData: { shared: true },
_processedNodes: new Set(['node-1', 'node-2']),
env: { /* 环境变量 */ },
execution: { /* 执行信息 */ }
};
// 创建子工作流上下文管理器
const childContextManager = new ContextManager({
execution: { /* 子工作流执行信息 */ },
parentContext: parentContext
});
// 子工作流将继承:
// - 'node-1': { output: 'parent result' }
// - 'node-2': { data: 'shared data' }
// - customData: { shared: true }
mergeSubflowResults(subflowResult, controlNodeId)
将子工作流的执行结果合并回父工作流上下文。
参数:
subflowResult(Object): 子工作流执行结果controlNodeId(string): 触发子工作流的控制节点ID
合并规则:
- ✅ 合并所有节点执行结果
- ✅ 合并已处理节点集合
- ❌ 不合并内部状态属性
示例:
// 子工作流执行结果
const subflowResult = {
'sub-node-1': { output: 'sub result 1' },
'sub-node-2': { output: 'sub result 2' },
_processedNodes: new Set(['sub-node-1', 'sub-node-2']),
_level: 1,
_isSubflow: true
};
// 合并到父工作流
contextManager.mergeSubflowResults(subflowResult, 'if-else-control');
// 父工作流上下文现在包含:
// - 原有的节点结果
// - 'sub-node-1': { output: 'sub result 1' }
// - 'sub-node-2': { output: 'sub result 2' }
// - 更新的 _processedNodes 集合
节点状态管理
节点生命周期
待处理 → 执行中 → 已完成
↓ ↓ ↓
未记录 处理中 记录结果
状态跟踪最佳实践
// 1. 检查节点是否已处理(避免重复执行)
if (contextManager.isNodeProcessed('data-processor')) {
console.log('节点已处理,跳过执行');
return contextManager.getNodeResult('data-processor');
}
// 2. 执行节点逻辑
const result = await executeNodeLogic();
// 3. 更新节点结果(自动标记为已处理)
contextManager.updateNodeResult('data-processor', result);
// 4. 后续节点可以获取结果
const processedData = contextManager.getNodeResult('data-processor');
实用工具方法
getFinalOutputs(nodes)
提取工作流的最终输出结果。
参数:
nodes(Object): 工作流节点定义
返回值:
- (Object): 包含所有已执行节点结果的对象
示例:
const nodes = {
'input-processor': { /* 节点定义 */ },
'data-transformer': { /* 节点定义 */ },
'output-formatter': { /* 节点定义 */ }
};
const outputs = contextManager.getFinalOutputs(nodes);
// 输出示例:
// {
// 'input-processor': { processedInput: '...' },
// 'data-transformer': { transformedData: '...' },
// 'output-formatter': { formattedOutput: '...' }
// }
getStatus()
获取上下文管理器的当前状态信息。
返回值:
{
level: number, // 嵌套层级
isSubflow: boolean, // 是否为子工作流
processedNodesCount: number, // 已处理节点数量
envVarsCount: number, // 环境变量数量
hasParent: boolean, // 是否有父上下文
contextKeysCount: number // 上下文键数量
}
示例:
const status = contextManager.getStatus();
console.log('上下文状态:', status);
// 输出:
// {
// level: 0,
// isSubflow: false,
// processedNodesCount: 3,
// envVarsCount: 5,
// hasParent: false,
// contextKeysCount: 12
// }
完整使用示例
示例 1: 基础工作流执行
const ContextManager = require('./src/core/ContextManager');
// 1. 创建上下文管理器
const contextManager = new ContextManager({
execution: {
teamId: 'team_123',
userId: 'user_456',
workflowId: 'data-processing-workflow',
executionId: 'exec_789'
},
envVars: {
API_URL: 'https://api.example.com',
DATABASE_URL: 'mongodb://localhost:27017',
TIMEOUT: '5000'
}
});
// 2. 创建执行上下文
const context = contextManager.createExecutionContext({
'input-node': {
data: 'raw data to process'
}
});
// 3. 模拟节点执行
async function executeWorkflow() {
// 执行数据处理节点
if (!contextManager.isNodeProcessed('data-processor')) {
const inputData = contextManager.getNodeResult('input-node');
const processedData = await processData(inputData);
contextManager.updateNodeResult('data-processor', processedData);
}
// 执行数据转换节点
if (!contextManager.isNodeProcessed('data-transformer')) {
const processedData = contextManager.getNodeResult('data-processor');
const transformedData = await transformData(processedData);
contextManager.updateNodeResult('data-transformer', transformedData);
}
// 获取最终结果
const finalOutputs = contextManager.getFinalOutputs({
'input-node': {},
'data-processor': {},
'data-transformer': {}
});
return finalOutputs;
}
示例 2: 环境变量使用
// 节点配置包含环境变量引用
const nodeConfig = {
apiEndpoint: "{{API_URL}}/data",
database: {
connectionString: "{{DATABASE_URL}}",
timeout: "{{TIMEOUT}}"
},
headers: {
authorization: "Bearer {{API_TOKEN}}"
}
};
// 解析环境变量
const resolvedConfig = contextManager.resolveEnvironmentVariables(nodeConfig);
// resolvedConfig 将包含实际的环境变量值
console.log(resolvedConfig);
// {
// apiEndpoint: "https://api.example.com/data",
// database: {
// connectionString: "mongodb://localhost:27017",
// timeout: "5000"
// },
// headers: {
// authorization: "Bearer your-actual-token"
// }
// }
示例 3: 子工作流处理
// 父工作流上下文管理器
const parentContextManager = new ContextManager({
execution: { /* 父工作流执行信息 */ },
envVars: { /* 环境变量 */ }
});
// 父工作流执行一些节点
parentContextManager.updateNodeResult('parent-node-1', { data: 'parent result' });
// 创建子工作流上下文管理器
const childContextManager = new ContextManager({
execution: { /* 子工作流执行信息 */ },
parentContext: parentContextManager.getContext()
});
// 子工作流可以访问父工作流的结果
const childContext = childContextManager.createExecutionContext();
console.log(childContext['parent-node-1']); // { data: 'parent result' }
// 子工作流执行
childContextManager.updateNodeResult('child-node-1', { data: 'child result' });
// 将子工作流结果合并回父工作流
const childResult = childContextManager.getContext();
parentContextManager.mergeSubflowResults(childResult, 'control-node');
// 父工作流现在可以访问子工作流的结果
console.log(parentContextManager.getNodeResult('child-node-1')); // { data: 'child result' }
性能优化
大规模工作流优化
1. 节点状态检查优化
// ✅ 推荐:使用 Set 进行快速查找
if (contextManager.isNodeProcessed(nodeId)) {
return contextManager.getNodeResult(nodeId);
}
// ❌ 避免:重复获取结果进行判断
const result = contextManager.getNodeResult(nodeId);
if (result !== undefined) {
return result;
}
2. 环境变量解析优化
// ✅ 推荐:批量解析配置
const resolvedConfigs = contextManager.resolveEnvironmentVariables(allNodeConfigs);
// ❌ 避免:逐个解析
Object.keys(nodeConfigs).forEach(nodeId => {
nodeConfigs[nodeId] = contextManager.resolveEnvironmentVariables(nodeConfigs[nodeId]);
});
3. 内存管理
// 对于长时间运行的工作流,定期清理不需要的节点结果
function cleanupOldResults(contextManager, keepNodeIds) {
const context = contextManager.getContext();
Object.keys(context).forEach(key => {
if (!key.startsWith('_') && !keepNodeIds.includes(key)) {
delete context[key];
}
});
}
调试和监控
启用详细日志
// 通过环境变量启用调试模式
process.env.LOG_LEVEL = 'debug';
const contextManager = new ContextManager({
// ... 配置
});
// 上下文管理器会输出详细的执行日志
// 包括: 节点结果更新、环境变量解析、子工作流合并等
状态监控
// 定期检查上下文状态
setInterval(() => {
const status = contextManager.getStatus();
console.log('上下文状态监控:', {
processedNodes: status.processedNodesCount,
level: status.level,
memoryUsage: process.memoryUsage()
});
}, 5000);
常见问题排查
1. 环境变量未解析
症状: 输出中仍包含 {{variableName}} 格式的字符串
排查步骤:
// 检查环境变量是否正确设置
console.log('环境变量:', contextManager.getContext().env);
// 检查变量名是否匹配
const testResolve = contextManager.resolveEnvironmentVariables("{{API_URL}}");
console.log('解析结果:', testResolve);
2. 子工作流数据未继承
症状: 子工作流无法访问父工作流的节点结果
排查步骤:
// 检查父上下文是否正确传递
console.log('是否为子工作流:', contextManager.getContext()._isSubflow);
console.log('父上下文:', contextManager.getContext()._parentContext);
// 检查继承的数据
const inherited = contextManager.inheritFromParent();
console.log('继承的数据:', inherited);
3. 节点重复执行
症状: 同一个节点被执行多次
排查步骤:
// 在节点执行前检查状态
console.log('节点是否已处理:', contextManager.isNodeProcessed(nodeId));
console.log('已处理节点列表:', Array.from(contextManager.getContext()._processedNodes));
最佳实践
1. 上下文设计原则
- 单一职责: 每个上下文管理器只负责一个工作流实例
- 状态隔离: 不同工作流实例之间的上下文完全隔离
- 数据最小化: 只在上下文中保存必要的数据,避免内存泄漏
2. 环境变量管理
// ✅ 推荐:集中管理环境变量
const envVars = {
// API 配置
API_URL: process.env.API_URL || 'https://api.default.com',
API_TOKEN: process.env.API_TOKEN,
// 数据库配置
DATABASE_URL: process.env.DATABASE_URL,
DATABASE_TIMEOUT: process.env.DATABASE_TIMEOUT || '5000',
// 功能开关
ENABLE_CACHE: process.env.ENABLE_CACHE || 'false',
DEBUG_MODE: process.env.DEBUG_MODE || 'false'
};
const contextManager = new ContextManager({
execution: executionInfo,
envVars: envVars
});
3. 错误处理
// 节点执行错误处理
try {
const result = await executeNode(nodeId, inputs);
contextManager.updateNodeResult(nodeId, result);
} catch (error) {
// 记录错误状态
contextManager.updateNodeResult(nodeId, {
error: error.message,
status: 'failed',
timestamp: new Date().toISOString()
});
throw error;
}
4. 资源清理
// 工作流执行完成后清理资源
function cleanupContext(contextManager) {
const context = contextManager.getContext();
// 清理大对象引用
Object.keys(context).forEach(key => {
if (typeof context[key] === 'object' && context[key] !== null) {
if (context[key].largeData) {
delete context[key].largeData;
}
}
});
}