工作流操作节点
操作节点是工作流中执行具体任务的组件,它们处理数据、进行计算、调用服务、发送通知等。GeniSpace提供了丰富的预定义操作节点,同时支持自定义操作节点,满足各种复杂业务需求。
操作节点概述
每个操作节点都有:
- 输入数据:节点处理的数据来源
- 配置参数:控制节点行为的设置
- 输出数据:处理后产生的结果
- 错误处理:异常情况的处理方式
数据处理节点
数据转换
将数据从一种格式转换为另一种格式:
{
"type": "transform",
"config": {
"mappings": [
{
"source": "$.input.customer.name",
"target": "$.output.userName",
"transform": "uppercase"
},
{
"source": "$.input.order.items",
"target": "$.output.productList",
"transform": {
"type": "map",
"expression": "item => ({ id: item.productId, quantity: item.qty })"
}
}
],
"outputSchema": {
"type": "object",
"properties": {
"userName": { "type": "string" },
"productList": { "type": "array" }
}
}
}
}
数据筛选
过滤和精简数据:
{
"type": "filter",
"config": {
"source": "$.input.products",
"condition": "item => item.price > 100 && item.stock > 0",
"limit": 10,
"sort": {
"field": "popularity",
"order": "desc"
}
}
}
数据合并
合并多个数据源:
{
"type": "merge",
"config": {
"sources": [
{ "name": "customerData", "path": "$.input.customer" },
{ "name": "orderData", "path": "$.input.order" },
{ "name": "productData", "path": "$.context.products" }
],
"mergeStrategy": "deep",
"conflictResolution": "lastWins"
}
}
数据验证
验证数据是否符合要求:
{
"type": "validate",
"config": {
"schema": {
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": "number",
"minimum": 18
}
},
"required": ["email", "age"]
},
"customValidators": [
{
"name": "domainCheck",
"expression": "data.email.endsWith('@company.com')",
"message": "必须使用公司邮箱"
}
],
"onValidationError": "throw" // throw, continue, branch
}
}
数据库操作节点
数据查询
从数据库查询数据:
{
"type": "dbQuery",
"config": {
"connection": "main_db",
"queryType": "select",
"query": "SELECT * FROM customers WHERE status = :status AND created_at > :date",
"parameters": {
"status": "$.input.status",
"date": "$.input.startDate"
},
"pagination": {
"enabled": true,
"pageSize": 100,
"maxItems": 1000
}
}
}
数据更新
更新数据库记录:
{
"type": "dbUpdate",
"config": {
"connection": "main_db",
"table": "orders",
"updates": {
"status": "$.input.newStatus",
"updated_at": "${ new Date().toISOString() }"
},
"condition": "id = :orderId",
"parameters": {
"orderId": "$.input.orderId"
},
"returnUpdated": true
}
}
事务操作
在事务中执行多个数据库操作:
{
"type": "dbTransaction",
"config": {
"connection": "main_db",
"operations": [
{
"type": "insert",
"table": "orders",
"data": "$.input.order",
"returnId": true,
"idField": "orderId"
},
{
"type": "insert",
"table": "order_items",
"data": "$.input.items.map(item => ({ ...item, order_id: $.context.orderId }))"
},
{
"type": "update",
"table": "inventory",
"updates": {
"quantity": "inventory.quantity - item.quantity"
},
"condition": "product_id = :productId",
"multipleUpdates": {
"source": "$.input.items",
"parameterField": "productId"
}
}
],
"isolation": "serializable"
}
}
HTTP 和 API 节点
HTTP 请求
调用外部 API:
{
"type": "httpRequest",
"config": {
"url": "https://api.example.com/v1/orders",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer ${$.context.apiToken}"
},
"body": "$.input.orderData",
"timeout": 5000,
"retry": {
"maxAttempts": 3,
"initialDelay": 1000,
"backoffMultiplier": 2,
"retryOn": [500, 502, 503]
},
"responseMapping": {
"orderId": "$.response.data.id",
"status": "$.response.data.status"
}
}
}
GraphQL 请求
执行 GraphQL 查询:
{
"type": "graphqlRequest",
"config": {
"endpoint": "https://api.example.com/graphql",
"query": `
query GetProductDetails($id: ID!) {
product(id: $id) {
id
name
price
availability {
inStock
leadTime
}
}
}
`,
"variables": {
"id": "$.input.productId"
},
"authentication": {
"type": "header",
"headerName": "Authorization",
"value": "Bearer ${$.context.apiToken}"
}
}
}
API 组合
顺序调用多个 API 并处理结果:
{
"type": "apiComposite",
"config": {
"operations": [
{
"name": "getUserProfile",
"type": "httpRequest",
"config": {
"url": "https://api.example.com/users/${$.input.userId}",
"method": "GET"
}
},
{
"name": "getUserOrders",
"type": "httpRequest",
"dependsOn": ["getUserProfile"],
"config": {
"url": "https://api.example.com/users/${$.input.userId}/orders",
"method": "GET"
}
},
{
"name": "getProductDetails",
"type": "httpRequest",
"dependsOn": ["getUserOrders"],
"config": {
"url": "https://api.example.com/products/details",
"method": "POST",
"body": {
"productIds": "$.context.getUserOrders.data.map(order => order.productId)"
}
}
}
],
"outputMapping": {
"user": "$.context.getUserProfile.data",
"orders": "$.context.getUserOrders.data",
"products": "$.context.getProductDetails.data"
}
}
}
AI 和机器学习节点
文本分析
分析文本内容:
{
"type": "textAnalysis",
"config": {
"input": "$.input.customerMessage",
"operations": [
{
"type": "sentiment",
"field": "sentiment"
},
{
"type": "entityExtraction",
"field": "entities",
"entityTypes": ["PERSON", "LOCATION", "ORGANIZATION"]
},
{
"type": "languageDetection",
"field": "language"
},
{
"type": "categorization",
"field": "category",
"categories": ["投诉", "咨询", "反馈", "请求"]
}
],
"modelSettings": {
"provider": "openai",
"model": "gpt-4"
}
}
}
智能总结
生成内容摘要:
{
"type": "contentSummary",
"config": {
"content": "$.input.document",
"maxLength": 200,
"format": "bullet",
"targetLanguage": "zh-CN",
"focusAreas": ["主要观点", "关键数据", "行动项"],
"modelSettings": {
"provider": "geniTask",
"model": "summarizer-v2"
}
}
}
内容生成
生成新的内容:
{
"type": "contentGeneration",
"config": {
"prompt": "为以下产品创建一个吸引人的营销描述:\n产品名称: ${$.input.product.name}\n特点: ${$.input.product.features.join(', ')}\n目标受众: ${$.input.audience}\n",
"parameters": {
"maxTokens": 500,
"temperature": 0.7,
"topP": 0.9
},
"outputFormat": "html",
"systemContext": "你是一位有创意的营销文案撰写者,擅长撰写引人注目的产品描述。",
"modelSettings": {
"provider": "openai",
"model": "gpt-4-turbo"
}
}
}
图像分析
分析图片内容:
{
"type": "imageAnalysis",
"config": {
"image": "$.input.productImage",
"operations": [
{
"type": "objectDetection",
"field": "objects",
"minConfidence": 0.7
},
{
"type": "sceneClassification",
"field": "scene"
},
{
"type": "colorAnalysis",
"field": "dominantColors",
"maxColors": 5
},
{
"type": "textExtraction",
"field": "textContent"
}
],
"modelSettings": {
"provider": "geniTask",
"model": "vision-analyzer-v1"
}
}
}
通知和消息节点
电子邮件
发送电子邮件:
{
"type": "email",
"config": {
"connection": "company_smtp",
"to": ["${$.input.customer.email}"],
"cc": ["support@company.com"],
"subject": "您的订单 #${$.input.order.id} 已确认",
"template": "order_confirmation",
"templateData": {
"customer": "$.input.customer",
"order": "$.input.order",
"products": "$.context.productDetails"
},
"attachments": [
{
"name": "receipt.pdf",
"content": "$.context.receiptPdf",
"contentType": "application/pdf"
}
],
"trackOpens": true,
"priority": "high"
}
}
短信
发送短信通知:
{
"type": "sms",
"config": {
"provider": "twilio",
"to": "${$.input.customer.phone}",
"message": "您好 ${$.input.customer.firstName},您的订单 #${$.input.order.id} 已发货,预计送达时间: ${$.input.shipment.estimatedDelivery}",
"messageType": "transactional"
}
}
推送通知
发送推送通知:
{
"type": "pushNotification",
"config": {
"provider": "firebase",
"recipients": {
"type": "tokens",
"tokens": "$.input.user.deviceTokens"
},
"notification": {
"title": "订单状态更新",
"body": "您的订单 #${$.input.order.id} 状态已更新为: ${$.input.order.status}",
"icon": "order_icon"
},
"data": {
"orderId": "$.input.order.id",
"status": "$.input.order.status",
"deepLink": "app://orders/${$.input.order.id}"
},
"priority": "high",
"ttl": 3600
}
}
消息队列
向消息队列发送消息:
{
"type": "queueMessage",
"config": {
"provider": "rabbitmq",
"connection": "mq_main",
"queue": "order_processing",
"message": "$.input.orderData",
"properties": {
"contentType": "application/json",
"messageType": "order.created",
"priority": 5
},
"persistent": true
}
}
文件操作节点
文件生成
生成文件:
{
"type": "generateFile",
"config": {
"fileType": "pdf",
"template": "invoice_template",
"data": {
"invoice": "$.input.invoice",
"company": "$.context.companyDetails",
"customer": "$.input.customer"
},
"options": {
"paperSize": "A4",
"orientation": "portrait",
"headerTemplate": "<div>发票 #{{invoiceNumber}}</div>",
"footerTemplate": "<div>第 <span class='pageNumber'></span> 页,共 <span class='totalPages'></span> 页</div>"
},
"output": {
"filename": "Invoice_${$.input.invoice.number}.pdf",
"saveToStorage": true,
"storagePath": "invoices/${$.input.customer.id}/"
}
}
}
文件处理
处理上传的文件:
{
"type": "processFile",
"config": {
"input": "$.input.file",
"operations": [
{
"type": "extract",
"fileType": "csv",
"options": {
"delimiter": ",",
"header": true,
"skipEmptyLines": true
},
"output": "extractedData"
},
{
"type": "transform",
"source": "$.context.extractedData",
"transformation": "item => ({
customerId: item.customer_id,
total: parseFloat(item.amount)
})",
"output": "transformedData"
},
{
"type": "validate",
"source": "$.context.transformedData",
"validation": "item => item.total > 0",
"output": "validData"
}
]
}
}
文件存储
存储和管理文件:
{
"type": "fileStorage",
"config": {
"operation": "store",
"file": "$.input.document",
"storage": "aws_s3",
"path": "customers/${$.input.customerId}/documents/${$.input.documentType}/",
"filename": "${$.input.documentName}_${Date.now()}.pdf",
"metadata": {
"documentType": "$.input.documentType",
"uploadedBy": "$.context.currentUser.id",
"tags": "$.input.tags"
},
"accessControl": {
"visibility": "private",
"expiryTime": 86400
}
}
}
集成节点
第三方服务集成
集成外部服务:
{
"type": "serviceIntegration",
"config": {
"service": "salesforce",
"operation": "createLead",
"authentication": {
"type": "oauth2",
"credential": "salesforce_oauth"
},
"input": {
"firstName": "$.input.customer.firstName",
"lastName": "$.input.customer.lastName",
"email": "$.input.customer.email",
"company": "$.input.customer.company",
"source": "Website Form",
"status": "New"
},
"outputMapping": {
"leadId": "$.result.id",
"success": "$.result.success"
}
}
}
支付处理
处理支付交易:
{
"type": "payment",
"config": {
"provider": "stripe",
"operation": "createCharge",
"authentication": {
"secretKey": "${$.context.secrets.stripeApiKey}"
},
"input": {
"amount": "$.input.order.total * 100", // Stripe使用分作为单位
"currency": "$.input.order.currency",
"source": "$.input.paymentMethod.id",
"description": "订单 #${$.input.order.id}",
"metadata": {
"orderId": "$.input.order.id",
"customerId": "$.input.customer.id"
}
},
"onSuccess": {
"updateOrder": {
"status": "paid",
"paymentId": "$.result.id",
"paidAt": "${new Date().toISOString()}"
}
},
"onError": {
"updateOrder": {
"status": "payment_failed",
"failureReason": "$.error.message"
}
}
}
}
自定义节点
自定义代码
执行自定义代码逻辑:
{
"type": "customCode",
"config": {
"runtime": "nodejs16",
"code": `
module.exports = async function(input, context) {
const { customer, order } = input;
// 自定义业务逻辑
let discountRate = 0;
if (customer.vipLevel === 'gold') {
discountRate = 0.1;
} else if (customer.vipLevel === 'platinum') {
discountRate = 0.15;
}
// 计算折扣
const originalTotal = order.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
const discount = originalTotal * discountRate;
const finalTotal = originalTotal - discount;
return {
originalTotal,
discount,
finalTotal,
discountRate,
customerLevel: customer.vipLevel
};
}
`,
"inputMapping": {
"customer": "$.input.customer",
"order": "$.input.order"
},
"timeout": 3000,
"memoryLimit": 128
}
}
外部服务函数
调用外部函数服务:
{
"type": "function",
"config": {
"provider": "aws_lambda",
"function": "order-processor",
"input": "$.input",
"authentication": {
"type": "iam",
"role": "workflow-execution-role"
},
"timeout": 10000,
"asyncExecution": false
}
}
错误处理与重试
错误处理器
处理执行过程中的错误:
{
"type": "errorHandler",
"config": {
"strategies": [
{
"errorType": "validation",
"action": "resolve",
"value": {
"status": "invalid",
"errors": "$.error.details"
}
},
{
"errorType": "http4xx",
"action": "retry",
"maxRetries": 3,
"delay": 1000,
"backoffMultiplier": 2,
"conditions": [
{
"field": "$.error.status",
"operator": "equals",
"value": 429
}
]
},
{
"errorType": "timeout",
"action": "fallback",
"fallbackNode": "timeoutFallback"
},
{
"errorType": "*",
"action": "throw"
}
],
"onExhaustedRetries": "fallback",
"fallbackNode": "defaultFallback",
"logLevel": "warning"
}
}
超时控制
控制节点执行时间:
{
"type": "timeout",
"config": {
"node": "longRunningOperation",
"timeout": 5000,
"onTimeout": "abort", // abort, continue, fallback
"fallbackNode": "timeoutFallback"
}
}
节点组合与重用
子工作流
创建可重用的子工作流:
{
"type": "subworkflow",
"config": {
"workflowId": "payment-processor",
"version": "latest",
"input": {
"paymentMethod": "$.input.paymentMethod",
"amount": "$.input.order.total",
"currency": "$.input.order.currency",
"description": "订单 #${$.input.order.id}"
},
"waitForCompletion": true,
"timeout": 30000,
"outputMapping": {
"paymentResult": "$.result"
}
}
}
节点组
将多个节点作为一个组:
{
"type": "nodeGroup",
"config": {
"name": "orderProcessing",
"nodes": [
{
"name": "validateOrder",
"type": "validate",
"config": { /* ... */ }
},
{
"name": "calculateTax",
"type": "customCode",
"config": { /* ... */ }
},
{
"name": "processPayment",
"type": "payment",
"config": { /* ... */ }
}
],
"inputMapping": {
"order": "$.input.order",
"customer": "$.input.customer"
},
"outputMapping": {
"result": {
"orderValid": "$.context.validateOrder.valid",
"taxAmount": "$.context.calculateTax.taxAmount",
"paymentStatus": "$.context.processPayment.status"
}
}
}
}
调试与监控
日志节点
记录调试和审计信息:
{
"type": "log",
"config": {
"level": "info",
"message": "处理订单: ${$.input.order.id} - 金额: ${$.input.order.total} ${$.input.order.currency}",
"data": {
"order": "$.input.order",
"customer": {
"id": "$.input.customer.id",
"email": "$.input.customer.email"
},
"processingTime": "$.context.metrics.processingTime"
},
"tags": ["order", "payment"]
}
}
指标收集
收集执行指标:
{
"type": "metrics",
"config": {
"measurements": [
{
"name": "order_processing_time",
"value": "$.context.metrics.processingTime",
"unit": "milliseconds",
"dimensions": {
"orderId": "$.input.order.id",
"orderType": "$.input.order.type",
"customerId": "$.input.customer.id"
}
},
{
"name": "order_value",
"value": "$.input.order.total",
"unit": "currency",
"dimensions": {
"currency": "$.input.order.currency",
"orderType": "$.input.order.type"
}
}
],
"destination": "cloudwatch"
}
}
最佳实践
节点命名与组织
- 使用清晰描述性的节点名称
- 将相关节点组织在节点组中
- 为复杂的节点添加描述和注释
- 使用一致的命名约定
节点配置优化
- 只处理必要的数据,避免冗余操作
- 使用数据映射减少数据传输量
- 配置适当的超时和重试策略
- 为重要节点添加错误处理
安全最佳实践
- 使用环境变量和保密存储管理敏感信息
- 遵循最小权限原则配置集成权限
- 验证和清理所有用户输入
- 使用审计日志记录敏感操作
常见问题
Details
如何创建自定义操作节点?
创建自定义操作节点有两种主要方式:-
使用自定义代码节点 - 对于简单的自定义逻辑,可以使用内置的自定义代码节点,编写JavaScript/Python代码。
-
开发独立节点组件 - 对于更复杂的功能或需要重复使用的组件:
- 使用GeniSpace SDK创建自定义节点包
- 实现节点接口,定义输入/输出模式
- 打包并注册节点
- 发布到组织的节点仓库
Details
如何处理大数据量工作流?
处理大数据量时:- 使用分页和批处理节点
- 配置流式处理而非一次性加载全部数据
- 使用数据过滤节点减少处理量
- 考虑使用专用的大数据处理节点
- 对于特别大的数据集,配置异步执行并实现结果通知机制
Details
如何优化节点性能?
优化节点性能的主要方法:- 仅选择必要的字段进行处理
- 对于数据库操作,优化查询并添加适当索引
- 使用缓存减少重复计算和API调用
- 配置并发执行无相互依赖的节点
- 使用正确的数据结构和算法
- 监控节点执行时间,识别性能瓶颈