跳到主要内容

工作流上下文管理器

概述

工作流上下文管理器 (ContextManager) 是工作流引擎的核心组件之一,负责统一管理工作流执行过程中的上下文状态。它提供了完整的上下文生命周期管理,包括环境变量解析、子工作流支持、节点结果跟踪、数据继承等功能。

核心特性

  • 🔧 统一上下文管理: 集中管理工作流执行期间的所有状态信息
  • 🌐 环境变量支持: 自动解析和替换环境变量引用 ({{varName}} 格式)
  • 🔄 子工作流支持: 完整的多层级工作流嵌套和数据继承机制
  • 📋 节点状态跟踪: 实时跟踪节点执行状态和结果
  • 🧬 数据继承: 支持父子工作流之间的数据传递和隔离
  • 高性能: 优化的数据结构确保大规模工作流的高效执行

上下文结构

基础上下文结构

{
// 环境变量
env: {
API_URL: "https://api.example.com",
DATABASE_URL: "mongodb://localhost:27017",
DEBUG_MODE: "true"
},

// 执行信息
execution: {
teamId: "team_123",
userId: "user_456",
workflowId: "workflow_789",
executionId: "exec_abc"
},

// 团队和用户信息
teamId: "team_123",
userId: "user_456",

// 节点执行结果
"node-1": { output: "result1" },
"node-2": { output: "result2" },

// 内部状态管理
_processedNodes: Set, // 已处理的节点集合
_isSubflow: false, // 是否为子工作流
_parentContext: null, // 父上下文引用
_level: 0 // 嵌套层级
}

特殊属性说明

属性类型说明
_processedNodesSet已处理节点的集合,用于防重复处理
_isSubflowboolean标识当前上下文是否属于子工作流
_parentContextObject父上下文的引用,用于数据继承
_levelnumber嵌套层级,根工作流为0,子工作流递增

API 参考

构造函数

new ContextManager(options)

创建新的上下文管理器实例。

参数:

{
execution: { // ✅ 必填 - 执行对象
teamId: string, // 团队ID
userId: string, // 用户ID
workflowId: string, // 工作流ID
executionId: string // 执行ID
},
envVars: Object, // ❌ 可选 - 环境变量字典
parentContext: Object // ❌ 可选 - 父级上下文(子工作流使用)
}

示例:

const contextManager = new ContextManager({
execution: {
teamId: 'team_123',
userId: 'user_456',
workflowId: 'workflow_789',
executionId: 'exec_abc'
},
envVars: {
API_URL: 'https://api.example.com',
DATABASE_URL: 'mongodb://localhost:27017'
}
});

核心方法

createExecutionContext(initialContext)

创建完整的执行上下文,合并基础上下文、父上下文继承数据和初始输入数据。

参数:

  • initialContext (Object): 初始上下文数据

返回值:

  • (Object): 完整的执行上下文

示例:

const executionContext = contextManager.createExecutionContext({
'input-node': { data: 'initial data' },
'config-node': { settings: { timeout: 30 } }
});

updateNodeResult(nodeId, result)

更新指定节点的执行结果并标记为已处理。

参数:

  • nodeId (string): 节点标识符
  • result (any): 节点执行结果

示例:

contextManager.updateNodeResult('data-processor', {
processedData: 'transformed data',
status: 'success'
});

getNodeResult(nodeId)

获取指定节点的执行结果。

参数:

  • nodeId (string): 节点标识符

返回值:

  • (any): 节点执行结果,如果节点未执行则返回 undefined

示例:

const result = contextManager.getNodeResult('data-processor');
if (result) {
console.log('节点结果:', result);
}

isNodeProcessed(nodeId)

检查指定节点是否已经被处理。

参数:

  • nodeId (string): 节点标识符

返回值:

  • (boolean): 如果节点已处理返回 true,否则返回 false

示例:

if (!contextManager.isNodeProcessed('data-processor')) {
// 节点尚未处理,可以执行
await executeNode('data-processor');
}

环境变量管理

环境变量语法

上下文管理器支持使用 {{variableName}} 语法在任何字符串值中引用环境变量。

支持的格式:

// 简单变量替换
"{{API_URL}}/users"

// 在复杂字符串中使用
"连接到数据库: {{DATABASE_URL}}"

// 在配置对象中使用
{
url: "{{API_URL}}",
timeout: "{{REQUEST_TIMEOUT}}"
}

resolveEnvironmentVariables(input)

递归解析输入数据中的所有环境变量引用。

参数:

  • input (any): 需要解析的输入数据

返回值:

  • (any): 解析后的数据

支持的数据类型:

  • ✅ 字符串: 直接替换变量引用
  • ✅ 数组: 递归处理每个元素
  • ✅ 对象: 递归处理所有属性值
  • ✅ 其他类型: 直接返回原值

示例:

// 设置环境变量
const contextManager = new ContextManager({
execution: { /* ... */ },
envVars: {
API_URL: 'https://api.example.com',
DATABASE_NAME: 'production_db',
TIMEOUT: '5000'
}
});

// 解析包含环境变量的配置
const config = {
endpoint: "{{API_URL}}/data",
database: {
name: "{{DATABASE_NAME}}",
timeout: "{{TIMEOUT}}"
},
urls: [
"{{API_URL}}/users",
"{{API_URL}}/orders"
]
};

const resolved = contextManager.resolveEnvironmentVariables(config);
console.log(resolved);
// 输出:
// {
// endpoint: "https://api.example.com/data",
// database: {
// name: "production_db",
// timeout: "5000"
// },
// urls: [
// "https://api.example.com/users",
// "https://api.example.com/orders"
// ]
// }

子工作流支持

多层级工作流架构

根工作流 (Level 0)
├── 节点A
├── 控制节点B (触发子工作流)
│ └── 子工作流 (Level 1)
│ ├── 子节点C
│ ├── 子控制节点D (触发子子工作流)
│ │ └── 子子工作流 (Level 2)
│ │ └── 子子节点E
│ └── 子节点F
└── 节点G

数据继承机制

inheritFromParent()

从父上下文继承数据,自动过滤内部状态属性。

继承规则:

  • ✅ 继承所有节点执行结果
  • ✅ 继承用户自定义数据
  • ❌ 不继承以 _ 开头的内部属性
  • ❌ 不继承 envexecution 对象

示例:

// 父工作流上下文
const parentContext = {
'node-1': { output: 'parent result' },
'node-2': { data: 'shared data' },
customData: { shared: true },
_processedNodes: new Set(['node-1', 'node-2']),
env: { /* 环境变量 */ },
execution: { /* 执行信息 */ }
};

// 创建子工作流上下文管理器
const childContextManager = new ContextManager({
execution: { /* 子工作流执行信息 */ },
parentContext: parentContext
});

// 子工作流将继承:
// - 'node-1': { output: 'parent result' }
// - 'node-2': { data: 'shared data' }
// - customData: { shared: true }

mergeSubflowResults(subflowResult, controlNodeId)

将子工作流的执行结果合并回父工作流上下文。

参数:

  • subflowResult (Object): 子工作流执行结果
  • controlNodeId (string): 触发子工作流的控制节点ID

合并规则:

  • ✅ 合并所有节点执行结果
  • ✅ 合并已处理节点集合
  • ❌ 不合并内部状态属性

示例:

// 子工作流执行结果
const subflowResult = {
'sub-node-1': { output: 'sub result 1' },
'sub-node-2': { output: 'sub result 2' },
_processedNodes: new Set(['sub-node-1', 'sub-node-2']),
_level: 1,
_isSubflow: true
};

// 合并到父工作流
contextManager.mergeSubflowResults(subflowResult, 'if-else-control');

// 父工作流上下文现在包含:
// - 原有的节点结果
// - 'sub-node-1': { output: 'sub result 1' }
// - 'sub-node-2': { output: 'sub result 2' }
// - 更新的 _processedNodes 集合

节点状态管理

节点生命周期

待处理 → 执行中 → 已完成
↓ ↓ ↓
未记录 处理中 记录结果

状态跟踪最佳实践

// 1. 检查节点是否已处理(避免重复执行)
if (contextManager.isNodeProcessed('data-processor')) {
console.log('节点已处理,跳过执行');
return contextManager.getNodeResult('data-processor');
}

// 2. 执行节点逻辑
const result = await executeNodeLogic();

// 3. 更新节点结果(自动标记为已处理)
contextManager.updateNodeResult('data-processor', result);

// 4. 后续节点可以获取结果
const processedData = contextManager.getNodeResult('data-processor');

实用工具方法

getFinalOutputs(nodes)

提取工作流的最终输出结果。

参数:

  • nodes (Object): 工作流节点定义

返回值:

  • (Object): 包含所有已执行节点结果的对象

示例:

const nodes = {
'input-processor': { /* 节点定义 */ },
'data-transformer': { /* 节点定义 */ },
'output-formatter': { /* 节点定义 */ }
};

const outputs = contextManager.getFinalOutputs(nodes);
// 输出示例:
// {
// 'input-processor': { processedInput: '...' },
// 'data-transformer': { transformedData: '...' },
// 'output-formatter': { formattedOutput: '...' }
// }

getStatus()

获取上下文管理器的当前状态信息。

返回值:

{
level: number, // 嵌套层级
isSubflow: boolean, // 是否为子工作流
processedNodesCount: number, // 已处理节点数量
envVarsCount: number, // 环境变量数量
hasParent: boolean, // 是否有父上下文
contextKeysCount: number // 上下文键数量
}

示例:

const status = contextManager.getStatus();
console.log('上下文状态:', status);
// 输出:
// {
// level: 0,
// isSubflow: false,
// processedNodesCount: 3,
// envVarsCount: 5,
// hasParent: false,
// contextKeysCount: 12
// }

完整使用示例

示例 1: 基础工作流执行

const ContextManager = require('./src/core/ContextManager');

// 1. 创建上下文管理器
const contextManager = new ContextManager({
execution: {
teamId: 'team_123',
userId: 'user_456',
workflowId: 'data-processing-workflow',
executionId: 'exec_789'
},
envVars: {
API_URL: 'https://api.example.com',
DATABASE_URL: 'mongodb://localhost:27017',
TIMEOUT: '5000'
}
});

// 2. 创建执行上下文
const context = contextManager.createExecutionContext({
'input-node': {
data: 'raw data to process'
}
});

// 3. 模拟节点执行
async function executeWorkflow() {
// 执行数据处理节点
if (!contextManager.isNodeProcessed('data-processor')) {
const inputData = contextManager.getNodeResult('input-node');
const processedData = await processData(inputData);
contextManager.updateNodeResult('data-processor', processedData);
}

// 执行数据转换节点
if (!contextManager.isNodeProcessed('data-transformer')) {
const processedData = contextManager.getNodeResult('data-processor');
const transformedData = await transformData(processedData);
contextManager.updateNodeResult('data-transformer', transformedData);
}

// 获取最终结果
const finalOutputs = contextManager.getFinalOutputs({
'input-node': {},
'data-processor': {},
'data-transformer': {}
});

return finalOutputs;
}

示例 2: 环境变量使用

// 节点配置包含环境变量引用
const nodeConfig = {
apiEndpoint: "{{API_URL}}/data",
database: {
connectionString: "{{DATABASE_URL}}",
timeout: "{{TIMEOUT}}"
},
headers: {
authorization: "Bearer {{API_TOKEN}}"
}
};

// 解析环境变量
const resolvedConfig = contextManager.resolveEnvironmentVariables(nodeConfig);

// resolvedConfig 将包含实际的环境变量值
console.log(resolvedConfig);
// {
// apiEndpoint: "https://api.example.com/data",
// database: {
// connectionString: "mongodb://localhost:27017",
// timeout: "5000"
// },
// headers: {
// authorization: "Bearer your-actual-token"
// }
// }

示例 3: 子工作流处理

// 父工作流上下文管理器
const parentContextManager = new ContextManager({
execution: { /* 父工作流执行信息 */ },
envVars: { /* 环境变量 */ }
});

// 父工作流执行一些节点
parentContextManager.updateNodeResult('parent-node-1', { data: 'parent result' });

// 创建子工作流上下文管理器
const childContextManager = new ContextManager({
execution: { /* 子工作流执行信息 */ },
parentContext: parentContextManager.getContext()
});

// 子工作流可以访问父工作流的结果
const childContext = childContextManager.createExecutionContext();
console.log(childContext['parent-node-1']); // { data: 'parent result' }

// 子工作流执行
childContextManager.updateNodeResult('child-node-1', { data: 'child result' });

// 将子工作流结果合并回父工作流
const childResult = childContextManager.getContext();
parentContextManager.mergeSubflowResults(childResult, 'control-node');

// 父工作流现在可以访问子工作流的结果
console.log(parentContextManager.getNodeResult('child-node-1')); // { data: 'child result' }

性能优化

大规模工作流优化

1. 节点状态检查优化

// ✅ 推荐:使用 Set 进行快速查找
if (contextManager.isNodeProcessed(nodeId)) {
return contextManager.getNodeResult(nodeId);
}

// ❌ 避免:重复获取结果进行判断
const result = contextManager.getNodeResult(nodeId);
if (result !== undefined) {
return result;
}

2. 环境变量解析优化

// ✅ 推荐:批量解析配置
const resolvedConfigs = contextManager.resolveEnvironmentVariables(allNodeConfigs);

// ❌ 避免:逐个解析
Object.keys(nodeConfigs).forEach(nodeId => {
nodeConfigs[nodeId] = contextManager.resolveEnvironmentVariables(nodeConfigs[nodeId]);
});

3. 内存管理

// 对于长时间运行的工作流,定期清理不需要的节点结果
function cleanupOldResults(contextManager, keepNodeIds) {
const context = contextManager.getContext();
Object.keys(context).forEach(key => {
if (!key.startsWith('_') && !keepNodeIds.includes(key)) {
delete context[key];
}
});
}

调试和监控

启用详细日志

// 通过环境变量启用调试模式
process.env.LOG_LEVEL = 'debug';

const contextManager = new ContextManager({
// ... 配置
});

// 上下文管理器会输出详细的执行日志
// 包括: 节点结果更新、环境变量解析、子工作流合并等

状态监控

// 定期检查上下文状态
setInterval(() => {
const status = contextManager.getStatus();
console.log('上下文状态监控:', {
processedNodes: status.processedNodesCount,
level: status.level,
memoryUsage: process.memoryUsage()
});
}, 5000);

常见问题排查

1. 环境变量未解析

症状: 输出中仍包含 {{variableName}} 格式的字符串

排查步骤:

// 检查环境变量是否正确设置
console.log('环境变量:', contextManager.getContext().env);

// 检查变量名是否匹配
const testResolve = contextManager.resolveEnvironmentVariables("{{API_URL}}");
console.log('解析结果:', testResolve);

2. 子工作流数据未继承

症状: 子工作流无法访问父工作流的节点结果

排查步骤:

// 检查父上下文是否正确传递
console.log('是否为子工作流:', contextManager.getContext()._isSubflow);
console.log('父上下文:', contextManager.getContext()._parentContext);

// 检查继承的数据
const inherited = contextManager.inheritFromParent();
console.log('继承的数据:', inherited);

3. 节点重复执行

症状: 同一个节点被执行多次

排查步骤:

// 在节点执行前检查状态
console.log('节点是否已处理:', contextManager.isNodeProcessed(nodeId));
console.log('已处理节点列表:', Array.from(contextManager.getContext()._processedNodes));

最佳实践

1. 上下文设计原则

  • 单一职责: 每个上下文管理器只负责一个工作流实例
  • 状态隔离: 不同工作流实例之间的上下文完全隔离
  • 数据最小化: 只在上下文中保存必要的数据,避免内存泄漏

2. 环境变量管理

// ✅ 推荐:集中管理环境变量
const envVars = {
// API 配置
API_URL: process.env.API_URL || 'https://api.default.com',
API_TOKEN: process.env.API_TOKEN,

// 数据库配置
DATABASE_URL: process.env.DATABASE_URL,
DATABASE_TIMEOUT: process.env.DATABASE_TIMEOUT || '5000',

// 功能开关
ENABLE_CACHE: process.env.ENABLE_CACHE || 'false',
DEBUG_MODE: process.env.DEBUG_MODE || 'false'
};

const contextManager = new ContextManager({
execution: executionInfo,
envVars: envVars
});

3. 错误处理

// 节点执行错误处理
try {
const result = await executeNode(nodeId, inputs);
contextManager.updateNodeResult(nodeId, result);
} catch (error) {
// 记录错误状态
contextManager.updateNodeResult(nodeId, {
error: error.message,
status: 'failed',
timestamp: new Date().toISOString()
});
throw error;
}

4. 资源清理

// 工作流执行完成后清理资源
function cleanupContext(contextManager) {
const context = contextManager.getContext();

// 清理大对象引用
Object.keys(context).forEach(key => {
if (typeof context[key] === 'object' && context[key] !== null) {
if (context[key].largeData) {
delete context[key].largeData;
}
}
});
}