跳到主要内容

工作流操作节点

操作节点是工作流中执行具体任务的组件,它们处理数据、进行计算、调用服务、发送通知等。GeniSpace提供了丰富的预定义操作节点,同时支持自定义操作节点,满足各种复杂业务需求。

操作节点概述

每个操作节点都有:

  • 输入数据:节点处理的数据来源
  • 配置参数:控制节点行为的设置
  • 输出数据:处理后产生的结果
  • 错误处理:异常情况的处理方式

数据处理节点

数据转换

将数据从一种格式转换为另一种格式:

{
"type": "transform",
"config": {
"mappings": [
{
"source": "$.input.customer.name",
"target": "$.output.userName",
"transform": "uppercase"
},
{
"source": "$.input.order.items",
"target": "$.output.productList",
"transform": {
"type": "map",
"expression": "item => ({ id: item.productId, quantity: item.qty })"
}
}
],
"outputSchema": {
"type": "object",
"properties": {
"userName": { "type": "string" },
"productList": { "type": "array" }
}
}
}
}

数据筛选

过滤和精简数据:

{
"type": "filter",
"config": {
"source": "$.input.products",
"condition": "item => item.price > 100 && item.stock > 0",
"limit": 10,
"sort": {
"field": "popularity",
"order": "desc"
}
}
}

数据合并

合并多个数据源:

{
"type": "merge",
"config": {
"sources": [
{ "name": "customerData", "path": "$.input.customer" },
{ "name": "orderData", "path": "$.input.order" },
{ "name": "productData", "path": "$.context.products" }
],
"mergeStrategy": "deep",
"conflictResolution": "lastWins"
}
}

数据验证

验证数据是否符合要求:

{
"type": "validate",
"config": {
"schema": {
"type": "object",
"properties": {
"email": {
"type": "string",
"format": "email"
},
"age": {
"type": "number",
"minimum": 18
}
},
"required": ["email", "age"]
},
"customValidators": [
{
"name": "domainCheck",
"expression": "data.email.endsWith('@company.com')",
"message": "必须使用公司邮箱"
}
],
"onValidationError": "throw" // throw, continue, branch
}
}

数据库操作节点

数据查询

从数据库查询数据:

{
"type": "dbQuery",
"config": {
"connection": "main_db",
"queryType": "select",
"query": "SELECT * FROM customers WHERE status = :status AND created_at > :date",
"parameters": {
"status": "$.input.status",
"date": "$.input.startDate"
},
"pagination": {
"enabled": true,
"pageSize": 100,
"maxItems": 1000
}
}
}

数据更新

更新数据库记录:

{
"type": "dbUpdate",
"config": {
"connection": "main_db",
"table": "orders",
"updates": {
"status": "$.input.newStatus",
"updated_at": "${ new Date().toISOString() }"
},
"condition": "id = :orderId",
"parameters": {
"orderId": "$.input.orderId"
},
"returnUpdated": true
}
}

事务操作

在事务中执行多个数据库操作:

{
"type": "dbTransaction",
"config": {
"connection": "main_db",
"operations": [
{
"type": "insert",
"table": "orders",
"data": "$.input.order",
"returnId": true,
"idField": "orderId"
},
{
"type": "insert",
"table": "order_items",
"data": "$.input.items.map(item => ({ ...item, order_id: $.context.orderId }))"
},
{
"type": "update",
"table": "inventory",
"updates": {
"quantity": "inventory.quantity - item.quantity"
},
"condition": "product_id = :productId",
"multipleUpdates": {
"source": "$.input.items",
"parameterField": "productId"
}
}
],
"isolation": "serializable"
}
}

HTTP 和 API 节点

HTTP 请求

调用外部 API:

{
"type": "httpRequest",
"config": {
"url": "https://api.example.com/v1/orders",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"Authorization": "Bearer ${$.context.apiToken}"
},
"body": "$.input.orderData",
"timeout": 5000,
"retry": {
"maxAttempts": 3,
"initialDelay": 1000,
"backoffMultiplier": 2,
"retryOn": [500, 502, 503]
},
"responseMapping": {
"orderId": "$.response.data.id",
"status": "$.response.data.status"
}
}
}

GraphQL 请求

执行 GraphQL 查询:

{
"type": "graphqlRequest",
"config": {
"endpoint": "https://api.example.com/graphql",
"query": `
query GetProductDetails($id: ID!) {
product(id: $id) {
id
name
price
availability {
inStock
leadTime
}
}
}
`,
"variables": {
"id": "$.input.productId"
},
"authentication": {
"type": "header",
"headerName": "Authorization",
"value": "Bearer ${$.context.apiToken}"
}
}
}

API 组合

顺序调用多个 API 并处理结果:

{
"type": "apiComposite",
"config": {
"operations": [
{
"name": "getUserProfile",
"type": "httpRequest",
"config": {
"url": "https://api.example.com/users/${$.input.userId}",
"method": "GET"
}
},
{
"name": "getUserOrders",
"type": "httpRequest",
"dependsOn": ["getUserProfile"],
"config": {
"url": "https://api.example.com/users/${$.input.userId}/orders",
"method": "GET"
}
},
{
"name": "getProductDetails",
"type": "httpRequest",
"dependsOn": ["getUserOrders"],
"config": {
"url": "https://api.example.com/products/details",
"method": "POST",
"body": {
"productIds": "$.context.getUserOrders.data.map(order => order.productId)"
}
}
}
],
"outputMapping": {
"user": "$.context.getUserProfile.data",
"orders": "$.context.getUserOrders.data",
"products": "$.context.getProductDetails.data"
}
}
}

AI 和机器学习节点

文本分析

分析文本内容:

{
"type": "textAnalysis",
"config": {
"input": "$.input.customerMessage",
"operations": [
{
"type": "sentiment",
"field": "sentiment"
},
{
"type": "entityExtraction",
"field": "entities",
"entityTypes": ["PERSON", "LOCATION", "ORGANIZATION"]
},
{
"type": "languageDetection",
"field": "language"
},
{
"type": "categorization",
"field": "category",
"categories": ["投诉", "咨询", "反馈", "请求"]
}
],
"modelSettings": {
"provider": "openai",
"model": "gpt-4"
}
}
}

智能总结

生成内容摘要:

{
"type": "contentSummary",
"config": {
"content": "$.input.document",
"maxLength": 200,
"format": "bullet",
"targetLanguage": "zh-CN",
"focusAreas": ["主要观点", "关键数据", "行动项"],
"modelSettings": {
"provider": "geniTask",
"model": "summarizer-v2"
}
}
}

内容生成

生成新的内容:

{
"type": "contentGeneration",
"config": {
"prompt": "为以下产品创建一个吸引人的营销描述:\n产品名称: ${$.input.product.name}\n特点: ${$.input.product.features.join(', ')}\n目标受众: ${$.input.audience}\n",
"parameters": {
"maxTokens": 500,
"temperature": 0.7,
"topP": 0.9
},
"outputFormat": "html",
"systemContext": "你是一位有创意的营销文案撰写者,擅长撰写引人注目的产品描述。",
"modelSettings": {
"provider": "openai",
"model": "gpt-4-turbo"
}
}
}

图像分析

分析图片内容:

{
"type": "imageAnalysis",
"config": {
"image": "$.input.productImage",
"operations": [
{
"type": "objectDetection",
"field": "objects",
"minConfidence": 0.7
},
{
"type": "sceneClassification",
"field": "scene"
},
{
"type": "colorAnalysis",
"field": "dominantColors",
"maxColors": 5
},
{
"type": "textExtraction",
"field": "textContent"
}
],
"modelSettings": {
"provider": "geniTask",
"model": "vision-analyzer-v1"
}
}
}

通知和消息节点

电子邮件

发送电子邮件:

{
"type": "email",
"config": {
"connection": "company_smtp",
"to": ["${$.input.customer.email}"],
"cc": ["support@company.com"],
"subject": "您的订单 #${$.input.order.id} 已确认",
"template": "order_confirmation",
"templateData": {
"customer": "$.input.customer",
"order": "$.input.order",
"products": "$.context.productDetails"
},
"attachments": [
{
"name": "receipt.pdf",
"content": "$.context.receiptPdf",
"contentType": "application/pdf"
}
],
"trackOpens": true,
"priority": "high"
}
}

短信

发送短信通知:

{
"type": "sms",
"config": {
"provider": "twilio",
"to": "${$.input.customer.phone}",
"message": "您好 ${$.input.customer.firstName},您的订单 #${$.input.order.id} 已发货,预计送达时间: ${$.input.shipment.estimatedDelivery}",
"messageType": "transactional"
}
}

推送通知

发送推送通知:

{
"type": "pushNotification",
"config": {
"provider": "firebase",
"recipients": {
"type": "tokens",
"tokens": "$.input.user.deviceTokens"
},
"notification": {
"title": "订单状态更新",
"body": "您的订单 #${$.input.order.id} 状态已更新为: ${$.input.order.status}",
"icon": "order_icon"
},
"data": {
"orderId": "$.input.order.id",
"status": "$.input.order.status",
"deepLink": "app://orders/${$.input.order.id}"
},
"priority": "high",
"ttl": 3600
}
}

消息队列

向消息队列发送消息:

{
"type": "queueMessage",
"config": {
"provider": "rabbitmq",
"connection": "mq_main",
"queue": "order_processing",
"message": "$.input.orderData",
"properties": {
"contentType": "application/json",
"messageType": "order.created",
"priority": 5
},
"persistent": true
}
}

文件操作节点

文件生成

生成文件:

{
"type": "generateFile",
"config": {
"fileType": "pdf",
"template": "invoice_template",
"data": {
"invoice": "$.input.invoice",
"company": "$.context.companyDetails",
"customer": "$.input.customer"
},
"options": {
"paperSize": "A4",
"orientation": "portrait",
"headerTemplate": "<div>发票 #{{invoiceNumber}}</div>",
"footerTemplate": "<div>第 <span class='pageNumber'></span> 页,共 <span class='totalPages'></span> 页</div>"
},
"output": {
"filename": "Invoice_${$.input.invoice.number}.pdf",
"saveToStorage": true,
"storagePath": "invoices/${$.input.customer.id}/"
}
}
}

文件处理

处理上传的文件:

{
"type": "processFile",
"config": {
"input": "$.input.file",
"operations": [
{
"type": "extract",
"fileType": "csv",
"options": {
"delimiter": ",",
"header": true,
"skipEmptyLines": true
},
"output": "extractedData"
},
{
"type": "transform",
"source": "$.context.extractedData",
"transformation": "item => ({
customerId: item.customer_id,
total: parseFloat(item.amount)
})",
"output": "transformedData"
},
{
"type": "validate",
"source": "$.context.transformedData",
"validation": "item => item.total > 0",
"output": "validData"
}
]
}
}

文件存储

存储和管理文件:

{
"type": "fileStorage",
"config": {
"operation": "store",
"file": "$.input.document",
"storage": "aws_s3",
"path": "customers/${$.input.customerId}/documents/${$.input.documentType}/",
"filename": "${$.input.documentName}_${Date.now()}.pdf",
"metadata": {
"documentType": "$.input.documentType",
"uploadedBy": "$.context.currentUser.id",
"tags": "$.input.tags"
},
"accessControl": {
"visibility": "private",
"expiryTime": 86400
}
}
}

集成节点

第三方服务集成

集成外部服务:

{
"type": "serviceIntegration",
"config": {
"service": "salesforce",
"operation": "createLead",
"authentication": {
"type": "oauth2",
"credential": "salesforce_oauth"
},
"input": {
"firstName": "$.input.customer.firstName",
"lastName": "$.input.customer.lastName",
"email": "$.input.customer.email",
"company": "$.input.customer.company",
"source": "Website Form",
"status": "New"
},
"outputMapping": {
"leadId": "$.result.id",
"success": "$.result.success"
}
}
}

支付处理

处理支付交易:

{
"type": "payment",
"config": {
"provider": "stripe",
"operation": "createCharge",
"authentication": {
"secretKey": "${$.context.secrets.stripeApiKey}"
},
"input": {
"amount": "$.input.order.total * 100", // Stripe使用分作为单位
"currency": "$.input.order.currency",
"source": "$.input.paymentMethod.id",
"description": "订单 #${$.input.order.id}",
"metadata": {
"orderId": "$.input.order.id",
"customerId": "$.input.customer.id"
}
},
"onSuccess": {
"updateOrder": {
"status": "paid",
"paymentId": "$.result.id",
"paidAt": "${new Date().toISOString()}"
}
},
"onError": {
"updateOrder": {
"status": "payment_failed",
"failureReason": "$.error.message"
}
}
}
}

自定义节点

自定义代码

执行自定义代码逻辑:

{
"type": "customCode",
"config": {
"runtime": "nodejs16",
"code": `
module.exports = async function(input, context) {
const { customer, order } = input;

// 自定义业务逻辑
let discountRate = 0;

if (customer.vipLevel === 'gold') {
discountRate = 0.1;
} else if (customer.vipLevel === 'platinum') {
discountRate = 0.15;
}

// 计算折扣
const originalTotal = order.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
const discount = originalTotal * discountRate;
const finalTotal = originalTotal - discount;

return {
originalTotal,
discount,
finalTotal,
discountRate,
customerLevel: customer.vipLevel
};
}
`,
"inputMapping": {
"customer": "$.input.customer",
"order": "$.input.order"
},
"timeout": 3000,
"memoryLimit": 128
}
}

外部服务函数

调用外部函数服务:

{
"type": "function",
"config": {
"provider": "aws_lambda",
"function": "order-processor",
"input": "$.input",
"authentication": {
"type": "iam",
"role": "workflow-execution-role"
},
"timeout": 10000,
"asyncExecution": false
}
}

错误处理与重试

错误处理器

处理执行过程中的错误:

{
"type": "errorHandler",
"config": {
"strategies": [
{
"errorType": "validation",
"action": "resolve",
"value": {
"status": "invalid",
"errors": "$.error.details"
}
},
{
"errorType": "http4xx",
"action": "retry",
"maxRetries": 3,
"delay": 1000,
"backoffMultiplier": 2,
"conditions": [
{
"field": "$.error.status",
"operator": "equals",
"value": 429
}
]
},
{
"errorType": "timeout",
"action": "fallback",
"fallbackNode": "timeoutFallback"
},
{
"errorType": "*",
"action": "throw"
}
],
"onExhaustedRetries": "fallback",
"fallbackNode": "defaultFallback",
"logLevel": "warning"
}
}

超时控制

控制节点执行时间:

{
"type": "timeout",
"config": {
"node": "longRunningOperation",
"timeout": 5000,
"onTimeout": "abort", // abort, continue, fallback
"fallbackNode": "timeoutFallback"
}
}

节点组合与重用

子工作流

创建可重用的子工作流:

{
"type": "subworkflow",
"config": {
"workflowId": "payment-processor",
"version": "latest",
"input": {
"paymentMethod": "$.input.paymentMethod",
"amount": "$.input.order.total",
"currency": "$.input.order.currency",
"description": "订单 #${$.input.order.id}"
},
"waitForCompletion": true,
"timeout": 30000,
"outputMapping": {
"paymentResult": "$.result"
}
}
}

节点组

将多个节点作为一个组:

{
"type": "nodeGroup",
"config": {
"name": "orderProcessing",
"nodes": [
{
"name": "validateOrder",
"type": "validate",
"config": { /* ... */ }
},
{
"name": "calculateTax",
"type": "customCode",
"config": { /* ... */ }
},
{
"name": "processPayment",
"type": "payment",
"config": { /* ... */ }
}
],
"inputMapping": {
"order": "$.input.order",
"customer": "$.input.customer"
},
"outputMapping": {
"result": {
"orderValid": "$.context.validateOrder.valid",
"taxAmount": "$.context.calculateTax.taxAmount",
"paymentStatus": "$.context.processPayment.status"
}
}
}
}

调试与监控

日志节点

记录调试和审计信息:

{
"type": "log",
"config": {
"level": "info",
"message": "处理订单: ${$.input.order.id} - 金额: ${$.input.order.total} ${$.input.order.currency}",
"data": {
"order": "$.input.order",
"customer": {
"id": "$.input.customer.id",
"email": "$.input.customer.email"
},
"processingTime": "$.context.metrics.processingTime"
},
"tags": ["order", "payment"]
}
}

指标收集

收集执行指标:

{
"type": "metrics",
"config": {
"measurements": [
{
"name": "order_processing_time",
"value": "$.context.metrics.processingTime",
"unit": "milliseconds",
"dimensions": {
"orderId": "$.input.order.id",
"orderType": "$.input.order.type",
"customerId": "$.input.customer.id"
}
},
{
"name": "order_value",
"value": "$.input.order.total",
"unit": "currency",
"dimensions": {
"currency": "$.input.order.currency",
"orderType": "$.input.order.type"
}
}
],
"destination": "cloudwatch"
}
}

最佳实践

节点命名与组织

  • 使用清晰描述性的节点名称
  • 将相关节点组织在节点组中
  • 为复杂的节点添加描述和注释
  • 使用一致的命名约定

节点配置优化

  • 只处理必要的数据,避免冗余操作
  • 使用数据映射减少数据传输量
  • 配置适当的超时和重试策略
  • 为重要节点添加错误处理

安全最佳实践

  • 使用环境变量和保密存储管理敏感信息
  • 遵循最小权限原则配置集成权限
  • 验证和清理所有用户输入
  • 使用审计日志记录敏感操作

常见问题

Details

如何创建自定义操作节点? 创建自定义操作节点有两种主要方式:

  1. 使用自定义代码节点 - 对于简单的自定义逻辑,可以使用内置的自定义代码节点,编写JavaScript/Python代码。

  2. 开发独立节点组件 - 对于更复杂的功能或需要重复使用的组件:

    • 使用GeniSpace SDK创建自定义节点包
    • 实现节点接口,定义输入/输出模式
    • 打包并注册节点
    • 发布到组织的节点仓库
Details

如何处理大数据量工作流? 处理大数据量时:

  1. 使用分页和批处理节点
  2. 配置流式处理而非一次性加载全部数据
  3. 使用数据过滤节点减少处理量
  4. 考虑使用专用的大数据处理节点
  5. 对于特别大的数据集,配置异步执行并实现结果通知机制
Details

如何优化节点性能? 优化节点性能的主要方法:

  1. 仅选择必要的字段进行处理
  2. 对于数据库操作,优化查询并添加适当索引
  3. 使用缓存减少重复计算和API调用
  4. 配置并发执行无相互依赖的节点
  5. 使用正确的数据结构和算法
  6. 监控节点执行时间,识别性能瓶颈

下一步