diff --git a/LICENSE b/LICENSE new file mode 100755 index 0000000..0a20d3b --- /dev/null +++ b/LICENSE @@ -0,0 +1,7 @@ +ISC License (ISC) + +Copyright (c) 2026 + +Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/README.md b/README.md old mode 100644 new mode 100755 index e69de29..4e51ab0 --- a/README.md +++ b/README.md @@ -0,0 +1,151 @@ +# OpenClaw WeCom Plugin (Bone Version - Multi-App Support) + +This is a basic, locally developed OpenClaw plugin for Enterprise WeChat (WeCom). It supports **concurrent integration** with multiple WeCom applications, including both Intelligent Robot and Self-Built Apps. It handles core message parsing, basic AI integration, and appropriate responses (streaming JSON or passive XML). + +## Key Features + +- **Concurrent Multi-App Support**: Integrate multiple WeCom Intelligent Bots and Self-Built Apps simultaneously. +- **Dual Mode Per-App**: Each configured application can operate as either an Intelligent Bot (JSON streaming) or a Self-Built App (XML passive replies). +- **Message Parsing**: Decrypts and parses incoming messages in both JSON (AI Bot) and XML (Self-Built App) formats, dynamically detected per application. +- **Basic AI Integration**: Routes parsed messages to OpenClaw's AI core for processing. +- **Flexible Replies**: Generates streaming JSON responses for AI Bots or passive XML replies for Self-Built Apps. +- **Security**: Utilizes WeCom's message encryption/decryption, dynamically validates the `CorpID` from inbound messages for Self-Built Apps, for secure communication. +- **Unique Webhook Paths**: Each application can be configured with its own dedicated webhook URL for clear separation. + +## Prerequisites + +- [OpenClaw](https://github.com/openclaw/openclaw) installed (version 2026.1.30+) +- Enterprise WeChat admin access to configure Intelligent Robot or Self-Built applications. +- A public-facing server address accessible from Enterprise WeChat (HTTP/HTTPS) for webhooks, with unique paths for each application. + +## Installation + +To install this plugin locally: + +1. **Copy the plugin directory** to your OpenClaw extensions folder: + + ```bash + cp -r /mnt/codeserver/project/WeComCompanyPlugin ~/.openclaw/extensions/ + ``` + +2. **Enable the plugin and configure applications in `openclaw.json`**: + + Add or modify the `channels.wecom` section in your OpenClaw configuration file (`~/.openclaw/openclaw.json`). You will define an array of `applications`, where each object configures a distinct WeCom integration. + + ```json + { + "plugins": { + "entries": { + "WeComCompanyPlugin": { // This key MUST match your plugin folder name + "enabled": true + } + // ... other plugins if any + } + }, + "channels": { + "wecom": { + "enabled": true, + "applications": [ + { + "id": "my_ai_bot", // Unique ID for your AI Bot application + "enabled": true, + "token": "YOUR_AI_BOT_TOKEN", + "encodingAesKey": "YOUR_AI_BOT_ENCODING_AES_KEY", + "isSelfBuiltApp": false, // Set to false for AI Bot + "webhookPath": "/webhooks/wecom/ai_bot" // Unique URL path for this bot + }, + { + "id": "my_self_built_app", // Unique ID for your Self-Built App application + "enabled": true, + "token": "YOUR_SELF_BUILT_TOKEN", + "encodingAesKey": "YOUR_SELF_BUILT_ENCODING_AES_KEY", + "isSelfBuiltApp": true, // Set to true for Self-Built App + "webhookPath": "/webhooks/wecom/self_built" // Unique URL path for this app + } + ] + } + } + } + ``` + + Replace placeholder values with your actual WeCom application details. Ensure `webhookPath` is unique for each application. + If `webhookPath` is omitted, it defaults to `/webhooks/wecom/`. + +3. **Restart OpenClaw** for the changes to take effect. + + ```bash + openclaw gateway restart + ``` + +## Configuration Options + +Under `channels.wecom.applications` (an array of objects): + +| Option | Type | Required | Description | +| :--------------------------------- | :------ | :------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `id` | string | Yes | A unique identifier for this WeCom application within your OpenClaw configuration. | +| `enabled` | boolean | No | Set to `true` to enable this specific WeCom application. Default: `true`. | +| `token` | string | Yes | The WeCom bot Token obtained from the application's admin console. | +| `encodingAesKey` | string | Yes | The WeCom message encryption key (43 characters) from the application's admin console. | +| `isSelfBuiltApp` | boolean | No | **Crucial**: Set to `true` if this application is a Self-Built App (uses XML, passive replies). Set to `false` for an Intelligent Bot (uses JSON, streaming replies). Default: `false`. | +| `webhookPath` | string | No | The unique URL path where this application's messages will be received by OpenClaw. If not specified, it defaults to `/webhooks/wecom/`. | + +## Enterprise WeChat Configuration + +Follow these steps in the [Enterprise WeChat Admin Console](https://work.weixin.qq.com/) for *each* application you configure: + +### For Intelligent Robot (AI Bot) Integrations + +1. Log in to the Admin Console. +2. Navigate to "Application Management" > "Applications" > "Create Application" > Select "Intelligent Robot". +3. Configure "Receive Messages": + - **URL**: Use the `webhookPath` configured for this AI Bot in `openclaw.json` (e.g., `https://your-domain.com/webhooks/wecom/ai_bot`). Ensure this URL is publicly accessible. + - **Token**: Match the `token` specified for this AI Bot in `openclaw.json`. + - **EncodingAESKey**: Match the `encodingAesKey` specified for this AI Bot in `openclaw.json`. +4. Save and enable message receiving. + +### For Self-Built App Integrations + +1. Log in to the Admin Console. +2. Navigate to "Application Management" > "Applications" > Select your existing Self-Built App or create a new one. +3. Configure "Receive Messages": + - **URL**: Use the `webhookPath` configured for this Self-Built App in `openclaw.json` (e.g., `https://your-domain.com/webhooks/wecom/self_built`). Ensure this URL is publicly accessible. + - **Token**: Match the `token` specified for this Self-Built App in `openclaw.json`. + - **EncodingAESKey**: Match the `encodingAesKey` specified for this Self-Built App in `openclaw.json`. +4. Save and enable message receiving. + +**Important**: For Self-Built Apps, ensure `isSelfBuiltApp` is set to `true` in your `openclaw.json` configuration for that specific application. + +## FAQ + +### Q: How does inbound message decryption and parsing work for multiple applications (AI Bot JSON and Self-Built App XML)? + +**A:** The plugin dynamically identifies the target application based on the incoming webhook URL. For each request, it: + +1. Uses the `token`, `encodingAesKey`, and optionally `corpId` associated with the identified application. +2. Automatically detects if the incoming message body is JSON (for AI Bots) or XML (for Self-Built Apps). +3. Extracts and decrypts the message content from the respective format. +4. **For Self-Built Apps, it extracts the `CorpID` from the decrypted inbound message and uses it for constructing secure passive replies.** +5. Parses the decrypted content (JSON or XML) into a normalized message object for consistent processing by the AI. + +### Q: How does outbound image sending work? + +**A:** The plugin automatically handles images generated by OpenClaw (such as browser screenshots): + +- **For AI Bot (streaming JSON)**: Local images (from `~/.openclaw/media/`) are automatically encoded to base64 and sent via WeCom's `msg_item` API. Images appear when the AI completes its response. +- **For Self-Built App (passive XML)**: Direct passive XML replies do not support sending images from local paths. If the AI generates an image, its local path will be converted into a text description within the passive XML text reply. + +## Project Structure + +``` +WeComCompanyPlugin/ +├── index.js # Plugin entry point (main plugin logic and HTTP handler, multi-app routing) +├── wecom-message-processor.js # Core logic for message processing and dispatching +├── webhook.js # WeCom HTTP communication handler, message/event parser (JSON/XML), and reply builder +├── crypto.js # WeCom encryption algorithms (message + media) +├── logger.js # Logging module +├── stream-manager.js # Streaming response manager (for AI Bot mode) +├── utils.js # Utility functions (TTL cache, deduplication) +├── package.json # npm package config +└── openclaw.plugin.json # OpenClaw plugin manifest +``` diff --git a/crypto.js b/crypto.js new file mode 100755 index 0000000..bf95ee4 --- /dev/null +++ b/crypto.js @@ -0,0 +1,146 @@ +import { createCipheriv, createDecipheriv, randomBytes, createHash } from "node:crypto"; +import { logger } from "./logger.js"; +import { CONSTANTS } from "./utils.js"; + +/** + * Enterprise WeChat Intelligent Robot Crypto Implementation + * Supports both AI Bot mode (no corpId validation) and Self-Built App mode (with corpId validation) + */ +export class WecomCrypto { + token; + encodingAesKey; + aesKey; + iv; + corpId; // Added for Self-Built App mode + + constructor(token, encodingAesKey, corpId) { // corpId is now an optional parameter + if (!encodingAesKey || encodingAesKey.length !== CONSTANTS.AES_KEY_LENGTH) { + throw new Error(`EncodingAESKey invalid: length must be ${CONSTANTS.AES_KEY_LENGTH}`); + } + if (!token) { + throw new Error("Token is required"); + } + this.token = token; + this.encodingAesKey = encodingAesKey; + this.aesKey = Buffer.from(encodingAesKey + "=", "base64"); + this.iv = this.aesKey.subarray(0, 16); + this.corpId = corpId; // Store corpId if provided + + // Update debug message to reflect mode + logger.debug(`WecomCrypto initialized (Mode: ${corpId ? 'Self-Built App' : 'AI Bot'})`); + } + + getSignature(timestamp, nonce, encrypt) { + const shasum = createHash("sha1"); + // WeCom requires plain lexicographic sorting before SHA1; localeCompare is locale-sensitive. + const sorted = [this.token, timestamp, nonce, encrypt] + .map((value) => String(value)) + .toSorted(); + shasum.update(sorted.join("")); + return shasum.digest("hex"); + } + + decrypt(text) { + let decipher; + try { + decipher = createDecipheriv("aes-256-cbc", this.aesKey, this.iv); + decipher.setAutoPadding(false); + } catch (e) { + throw new Error(`Decrypt init failed: ${String(e)}`, { cause: e }); + } + + let deciphered = Buffer.concat([decipher.update(text, "base64"), decipher.final()]); + deciphered = this.decodePkcs7(deciphered); + + // Format: 16 random bytes | 4 bytes msg_len | msg_content | corpId + const content = deciphered.subarray(16); + const lenList = content.subarray(0, 4); + const xmlLen = lenList.readUInt32BE(0); + const xmlContent = content.subarray(4, 4 + xmlLen).toString("utf-8"); + + const extractedCorpId = content.subarray(4 + xmlLen).toString("utf-8"); + + // If corpId is provided in the constructor, validate it + if (this.corpId && this.corpId !== extractedCorpId) { + throw new Error(`CorpID mismatch: Expected ${this.corpId}, got ${extractedCorpId}`); + } + + // Return corpId along with the message + return { message: xmlContent, corpId: extractedCorpId }; + } + + encrypt(text) { + const random16 = randomBytes(16); + const msgBuffer = Buffer.from(text); + const lenBuffer = Buffer.alloc(4); + lenBuffer.writeUInt32BE(msgBuffer.length, 0); + + // Include corpId (receiveid) in the raw message if available + const corpIdBuffer = this.corpId ? Buffer.from(this.corpId) : Buffer.from(''); + const rawMsg = Buffer.concat([random16, lenBuffer, msgBuffer, corpIdBuffer]); + + const encoded = this.encodePkcs7(rawMsg); + const cipher = createCipheriv("aes-256-cbc", this.aesKey, this.iv); + cipher.setAutoPadding(false); + const ciphered = Buffer.concat([cipher.update(encoded), cipher.final()]); + return ciphered.toString("base64"); + } + + encodePkcs7(buff) { + const blockSize = CONSTANTS.AES_BLOCK_SIZE; + const amountToPad = blockSize - (buff.length % blockSize); + const pad = Buffer.alloc(amountToPad, amountToPad); + return Buffer.concat([buff, pad]); + } + + decodePkcs7(buff) { + const pad = buff[buff.length - 1]; + if (pad < 1 || pad > CONSTANTS.AES_BLOCK_SIZE) { + throw new Error(`Invalid PKCS7 padding: ${pad}`); + } + for (let i = buff.length - pad; i < buff.length; i++) { + if (buff[i] !== pad) { + throw new Error("Invalid PKCS7 padding: inconsistent padding bytes"); + } + } + return buff.subarray(0, buff.length - pad); + } + + /** + * Decrypt image/media file from Enterprise WeChat. + * Images are encrypted with AES-256-CBC using the same key as messages. + * Note: WeCom uses PKCS7 padding to 32-byte blocks (not standard 16-byte). + * @param {Buffer} encryptedData - The encrypted image data (raw bytes, not base64) + * @returns {Buffer} - Decrypted image data + */ + decryptMedia(encryptedData) { + const decipher = createDecipheriv("aes-256-cbc", this.aesKey, this.iv); + decipher.setAutoPadding(false); + const decrypted = Buffer.concat([ + decipher.update(encryptedData), + decipher.final(), + ]); + + // Remove PKCS7 padding manually (padded to 32-byte blocks). + const padLen = decrypted[decrypted.length - 1]; + let unpadded = decrypted; + if (padLen >= 1 && padLen <= 32) { + let validPadding = true; + for (let i = decrypted.length - padLen; i < decrypted.length; i++) { + if (decrypted[i] !== padLen) { + validPadding = false; + break; + } + } + if (validPadding) { + unpadded = decrypted.subarray(0, decrypted.length - padLen); + } + } + + logger.debug("Media decrypted successfully", { + inputSize: encryptedData.length, + outputSize: unpadded.length, + }); + return unpadded; + } +} diff --git a/dynamic-agent.js b/dynamic-agent.js new file mode 100755 index 0000000..5619db3 --- /dev/null +++ b/dynamic-agent.js @@ -0,0 +1,103 @@ +/** + * Dynamic agent helpers. + * + * This plugin only computes deterministic agent ids/session keys. + * Workspace/bootstrap creation is handled by OpenClaw core. + */ + +/** + * Build a deterministic agent id for dm/group contexts. + * + * @param {string} chatType - "dm" or "group" + * @param {string} peerId - user id or group id + * @returns {string} agentId + */ +export function generateAgentId(chatType, peerId) { + const sanitizedId = String(peerId) + .toLowerCase() + .replace(/[^a-z0-9_-]/g, "_"); + if (chatType === "group") { + return `wecom-group-${sanitizedId}`; + } + return `wecom-dm-${sanitizedId}`; +} + +/** + * Resolve runtime dynamic-agent settings from config. + */ +export function getDynamicAgentConfig(config) { + const wecom = config?.channels?.wecom || {}; + return { + enabled: wecom.dynamicAgents?.enabled !== false, + dmCreateAgent: wecom.dm?.createAgentOnFirstMessage !== false, + groupEnabled: wecom.groupChat?.enabled !== false, + groupRequireMention: wecom.groupChat?.requireMention !== false, + groupMentionPatterns: wecom.groupChat?.mentionPatterns || ["@"], + }; +} + +/** + * Decide whether this message context should route to a dynamic agent. + */ +export function shouldUseDynamicAgent({ chatType, config }) { + const dynamicConfig = getDynamicAgentConfig(config); + if (!dynamicConfig.enabled) { + return false; + } + if (chatType === "group") { + return dynamicConfig.groupEnabled; + } + return dynamicConfig.dmCreateAgent; +} + +/** + * Decide whether a group message should trigger a response. + */ +export function shouldTriggerGroupResponse(content, config) { + const dynamicConfig = getDynamicAgentConfig(config); + + if (!dynamicConfig.groupEnabled) { + return false; + } + + if (!dynamicConfig.groupRequireMention) { + return true; + } + + // Match any configured mention marker in the original message content. + // Use word-boundary check to avoid false positives on email addresses. + const patterns = dynamicConfig.groupMentionPatterns; + for (const pattern of patterns) { + const escaped = escapeRegExp(pattern); + // @ must NOT be preceded by a word char (avoids user@domain false matches). + const re = new RegExp(`(?:^|(?<=\\s|[^\\w]))${escaped}`, "u"); + if (re.test(content)) { + return true; + } + } + + return false; +} + +/** + * Remove configured mention markers from group message text. + */ +export function extractGroupMessageContent(content, config) { + const dynamicConfig = getDynamicAgentConfig(config); + let cleanContent = content; + + const patterns = dynamicConfig.groupMentionPatterns; + for (const pattern of patterns) { + const escapedPattern = escapeRegExp(pattern); + // Only strip @name tokens that are NOT part of email-style addresses. + // Require the pattern to be preceded by start-of-string or whitespace. + const regex = new RegExp(`(?:^|(?<=\\s))${escapedPattern}\\S*\\s*`, "gu"); + cleanContent = cleanContent.replace(regex, ""); + } + + return cleanContent.trim(); +} + +function escapeRegExp(value) { + return String(value).replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); +} diff --git a/image-processor.js b/image-processor.js new file mode 100755 index 0000000..e73e85f --- /dev/null +++ b/image-processor.js @@ -0,0 +1,172 @@ +import { createHash } from "crypto"; +import { readFile } from "fs/promises"; +import { logger } from "./logger.js"; + +/** + * Image Processing Module for WeCom + * + * Handles loading, validating, and encoding images for WeCom msg_item + * Supports JPG and PNG formats up to 2MB + */ + +// Image format signatures (magic bytes) +const IMAGE_SIGNATURES = { + JPG: [0xff, 0xd8, 0xff], + PNG: [0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a], +}; + +// 2MB size limit (before base64 encoding) +const MAX_IMAGE_SIZE = 2 * 1024 * 1024; + +/** + * Load image file from filesystem + * @param {string} filePath - Absolute path to image file + * @returns {Promise} Image data buffer + * @throws {Error} If file not found or cannot be read + */ +export async function loadImageFromPath(filePath) { + try { + logger.debug("Loading image from path", { filePath }); + const buffer = await readFile(filePath); + logger.debug("Image loaded successfully", { + filePath, + size: buffer.length, + }); + return buffer; + } catch (error) { + if (error.code === "ENOENT") { + throw new Error(`Image file not found: ${filePath}`, { cause: error }); + } else if (error.code === "EACCES") { + throw new Error(`Permission denied reading image: ${filePath}`, { cause: error }); + } else { + throw new Error(`Failed to read image file: ${error.message}`, { cause: error }); + } + } +} + +/** + * Convert buffer to base64 string + * @param {Buffer} buffer - Image data buffer + * @returns {string} Base64-encoded string + */ +export function encodeImageToBase64(buffer) { + return buffer.toString("base64"); +} + +/** + * Calculate MD5 checksum of buffer + * @param {Buffer} buffer - Image data buffer + * @returns {string} MD5 hash in hexadecimal + */ +export function calculateMD5(buffer) { + return createHash("md5").update(buffer).digest("hex"); +} + +/** + * Validate image size is within limits + * @param {Buffer} buffer - Image data buffer + * @throws {Error} If size exceeds 2MB limit + */ +export function validateImageSize(buffer) { + const sizeBytes = buffer.length; + const sizeMB = (sizeBytes / 1024 / 1024).toFixed(2); + + if (sizeBytes > MAX_IMAGE_SIZE) { + throw new Error(`Image size ${sizeMB}MB exceeds 2MB limit (actual: ${sizeBytes} bytes)`); + } + + logger.debug("Image size validated", { sizeBytes, sizeMB }); +} + +/** + * Detect image format from magic bytes + * @param {Buffer} buffer - Image data buffer + * @returns {string} Format: "JPG" or "PNG" + * @throws {Error} If format is not supported + */ +export function detectImageFormat(buffer) { + // Check PNG signature + if (buffer.length >= IMAGE_SIGNATURES.PNG.length) { + const isPNG = IMAGE_SIGNATURES.PNG.every((byte, index) => buffer[index] === byte); + if (isPNG) { + logger.debug("Image format detected: PNG"); + return "PNG"; + } + } + + // Check JPG signature + if (buffer.length >= IMAGE_SIGNATURES.JPG.length) { + const isJPG = IMAGE_SIGNATURES.JPG.every((byte, index) => buffer[index] === byte); + if (isJPG) { + logger.debug("Image format detected: JPG"); + return "JPG"; + } + } + + // Unknown format + const header = buffer.subarray(0, 16).toString("hex"); + throw new Error( + `Unsupported image format. Only JPG and PNG are supported. File header: ${header}`, + ); +} + +/** + * Complete image processing pipeline + * + * Loads image from filesystem, validates format and size, + * then encodes to base64 and calculates MD5 checksum. + * + * @param {string} filePath - Absolute path to image file + * @returns {Promise} Processed image data + * @returns {string} return.base64 - Base64-encoded image data + * @returns {string} return.md5 - MD5 checksum + * @returns {string} return.format - Image format (JPG or PNG) + * @returns {number} return.size - Original size in bytes + * + * @throws {Error} If any step fails (file not found, invalid format, size exceeded, etc.) + * + * @example + * const result = await prepareImageForMsgItem('/path/to/image.jpg'); + * // Returns: { base64: "...", md5: "...", format: "JPG", size: 123456 } + */ +export async function prepareImageForMsgItem(filePath) { + logger.debug("Starting image processing pipeline", { filePath }); + + try { + // Step 1: Load image + const buffer = await loadImageFromPath(filePath); + + // Step 2: Validate size + validateImageSize(buffer); + + // Step 3: Detect format + const format = detectImageFormat(buffer); + + // Step 4: Encode to base64 + const base64 = encodeImageToBase64(buffer); + + // Step 5: Calculate MD5 + const md5 = calculateMD5(buffer); + + logger.info("Image processed successfully", { + filePath, + format, + size: buffer.length, + md5, + base64Length: base64.length, + }); + + return { + base64, + md5, + format, + size: buffer.length, + }; + } catch (error) { + logger.error("Image processing failed", { + filePath, + error: error.message, + }); + throw error; + } +} diff --git a/index.js b/index.js new file mode 100755 index 0000000..88d2d00 --- /dev/null +++ b/index.js @@ -0,0 +1,534 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import * as crypto from "node:crypto"; +import { logger } from "./logger.js"; +import { streamManager } from "./stream-manager.js"; +import { WecomWebhook } from "./webhook.js"; // Now acts as parser & reply builder +import { + streamContext, + setApi, + getApi, + setOpenClawConfig, + streamMeta, + responseUrls, + getActiveStreamContext, + registerActiveStream, + unregisterActiveStream, + handleStreamError, + processInboundMessage, + deliverWecomReply, + getMessageStreamKey, +} from "./wecom-message-processor.js"; + +const DEFAULT_ACCOUNT_ID = "default"; +const THINKING_PLACEHOLDER = "思考中..."; + +// Webhook targets registry +const webhookTargets = new Map(); + +function normalizeWebhookPath(raw) { + const trimmed = (raw || "").trim(); + if (!trimmed) { + return "/"; + } + const withSlash = trimmed.startsWith("/") ? trimmed : `/${trimmed}`; + if (withSlash.length > 1 && withSlash.endsWith("/")) { + return withSlash.slice(0, -1); + } + return withSlash; +} + +function registerWebhookTarget(target) { + const key = normalizeWebhookPath(target.path); + const entry = { ...target, path: key }; + const existing = webhookTargets.get(key) ?? []; + webhookTargets.set(key, [...existing, entry]); + return () => { + const updated = (webhookTargets.get(key) ?? []).filter((e) => e !== entry); + if (updated.length > 0) { + webhookTargets.set(key, updated); + } else { + webhookTargets.delete(key); + } + }; +} + +// ============================================================================= +// Channel Plugin Definition +// ============================================================================= + +const wecomChannelPlugin = { + id: "wecom", + meta: { + id: "wecom", + label: "Enterprise WeChat", + selectionLabel: "Enterprise WeChat (AI Bot & Self-Built App)", + blurb: "Enterprise WeChat AI Bot and Self-Built App channel plugin.", + aliases: ["wecom", "wework"], + }, + capabilities: { + chatTypes: ["direct", "group"], + reactions: false, + threads: false, + media: true, + nativeCommands: false, + blockStreaming: true, // WeCom AI Bot requires stream-style responses. + }, + reload: { configPrefixes: ["channels.wecom"] }, + configSchema: { + schema: { + $schema: "http://json-schema.org/draft-07/schema#", + type: "object", + additionalProperties: false, + properties: { + enabled: { + type: "boolean", + description: "Enable WeCom channel", + default: true, + }, + applications: { + type: "array", + description: "List of WeCom applications (AI Bots or Self-Built Apps) to integrate.", + items: { + type: "object", + additionalProperties: false, + required: ["id", "token", "encodingAesKey"], + properties: { + id: { + type: "string", + description: "Unique ID for this WeCom application.", + }, + enabled: { + type: "boolean", + description: "Enable this specific WeCom application.", + default: true, + }, + token: { + type: "string", + description: "WeCom bot token from admin console.", + }, + encodingAesKey: { + type: "string", + description: "WeCom message encryption key (43 characters).", + minLength: 43, + maxLength: 43, + }, + isSelfBuiltApp: { + type: "boolean", + description: "Set to true if this is a Self-Built App (uses XML, passive replies). Default is AI Bot (JSON, streaming replies).", + default: false, + }, + corpId: { + type: "string", + description: "WeCom Corp ID (required for Self-Built Apps to send active replies like 'Thinking...')", + }, + corpSecret: { + type: "string", + description: "WeCom App Secret (required for Self-Built Apps to send active replies)", + }, + webhookPath: { + type: "string", + description: "Unique webhook path for this application (e.g., /webhooks/wecom/my-ai-bot). Defaults to /webhooks/wecom/ if not specified.", + }, + }, + }, + }, + }, + }, + uiHints: { + applications: { + label: "WeCom Applications", + itemLabel: "Application: {{item.id}}", + }, + token: { sensitive: true, label: "Bot Token" }, // Fallback, prefer item-level + encodingAesKey: { sensitive: true, label: "Encoding AES Key", help: "43-character encryption key from WeCom admin console" }, // Fallback, prefer item-level + isSelfBuiltApp: { label: "Self-Built App Mode", help: "Enable for Self-Built Apps (XML input/output). Disable for AI Bots (JSON input/output)." }, // Fallback, prefer item-level + }, + }, + config: { + listAccountIds: (cfg) => { + const wecom = cfg?.channels?.wecom; + if (!wecom || !wecom.enabled || !Array.isArray(wecom.applications)) { + return []; + } + return wecom.applications.filter(app => app.enabled !== false).map(app => app.id); + }, + resolveAccount: (cfg, accountId) => { + const wecom = cfg?.channels?.wecom; + if (!wecom || !wecom.enabled || !Array.isArray(wecom.applications)) { + return null; + } + const app = wecom.applications.find(app => app.id === accountId && app.enabled !== false); + if (!app) { + return null; + } + return { + id: app.id, + accountId: app.id, + enabled: app.enabled !== false, + token: app.token || "", + encodingAesKey: app.encodingAesKey || "", + webhookPath: app.webhookPath || normalizeWebhookPath(`/webhooks/wecom/${app.id}`), + config: app, // Pass the individual app config for `isSelfBuiltApp` etc. + isSelfBuiltApp: app.isSelfBuiltApp === true, + }; + }, + defaultAccountId: (cfg) => { + const wecom = cfg?.channels?.wecom; + if (!wecom || !wecom.enabled || !Array.isArray(wecom.applications) || wecom.applications.length === 0) { + return null; + } + return wecom.applications[0].id; // Use the first enabled application as default + }, + setAccountEnabled: ({ cfg, accountId, enabled }) => { + const wecom = cfg?.channels?.wecom; + if (!wecom || !Array.isArray(wecom.applications)) { + return cfg; + } + const app = wecom.applications.find(app => app.id === accountId); + if (app) { + app.enabled = enabled; + } + return cfg; + }, + deleteAccount: ({ cfg, accountId }) => { + const wecom = cfg?.channels?.wecom; + if (wecom && Array.isArray(wecom.applications)) { + wecom.applications = wecom.applications.filter(app => app.id !== accountId); + } + return cfg; + }, + }, + directory: { + self: async () => null, + listPeers: async () => [], + listGroups: async () => [], + }, + outbound: { + sendText: async ({ cfg, to, text, accountId }) => { + const userId = to.replace(/^wecom:/, ""); + const account = wecomChannelPlugin.config.resolveAccount(cfg, accountId); + if (!account) { + logger.error("WeCom outbound: account not found", { accountId, to }); + return { channel: "wecom", messageId: `fake_error_${Date.now()}` }; + } + const isSelfBuiltAppRequest = account?.isSelfBuiltApp === true; + + const ctx = streamContext.getStore(); + // StreamKey should be derived from the target 'to' or senderId as before + const streamKey = getMessageStreamKey({ fromUser: userId }); + const streamId = ctx?.streamId ?? getActiveStreamContext(streamKey); + + const result = await deliverWecomReply({ + payload: { text }, + senderId: userId, + streamId: isSelfBuiltAppRequest ? undefined : streamId, + isSelfBuiltAppRequest, + originalMessage: { fromUser: userId, chatId: userId, ToUserName: account.id, query: { nonce: crypto.randomBytes(8).toString('hex') } }, // toUser for passive reply is fromUser, FromUser is app.id + account, + }); + + if (result?.passiveReplyXml) { + logger.warn("WeCom outbound sendText: Passive XML reply generated, but cannot be returned directly from sendText. This should be handled by httpHandler.", { userId, accountId }); + } + + return { + channel: "wecom", + messageId: `msg_${isSelfBuiltAppRequest ? 'passive' : 'stream'}_${Date.now()}`, + }; + }, + sendMedia: async ({ cfg, to, text, mediaUrl, accountId }) => { + const userId = to.replace(/^wecom:/, ""); + const account = wecomChannelPlugin.config.resolveAccount(cfg, accountId); + if (!account) { + logger.error("WeCom outbound: account not found", { accountId, to }); + return { channel: "wecom", messageId: `fake_error_${Date.now()}` }; + } + const isSelfBuiltAppRequest = account?.isSelfBuiltApp === true; + + const ctx = streamContext.getStore(); + // StreamKey should be derived from the target 'to' or senderId as before + const streamKey = getMessageStreamKey({ fromUser: userId }); + const streamId = ctx?.streamId ?? getActiveStreamContext(streamKey); + + const result = await deliverWecomReply({ + payload: { text, mediaUrl }, + senderId: userId, + streamId: isSelfBuiltAppRequest ? undefined : streamId, + isSelfBuiltAppRequest, + originalMessage: { fromUser: userId, chatId: userId, ToUserName: account.id, query: { nonce: crypto.randomBytes(8).toString('hex') } }, // toUser for passive reply is fromUser, FromUser is app.id + account, + }); + + if (result?.passiveReplyXml) { + logger.warn("WeCom outbound sendMedia: Passive XML reply generated, but cannot be returned directly from sendMedia. This should be handled by httpHandler.", { userId, accountId }); + } + + return { + channel: "wecom", + messageId: `msg_${isSelfBuiltAppRequest ? 'passive_media' : 'stream_media'}_${Date.now()}`, + }; + }, + }, + gateway: { + startAccount: async (ctx) => { + // This startAccount will now be called for each configured application (identified by accountId) + const appAccount = ctx.account; // 'account' here is already resolved by resolveAccount and contains individual app config + + logger.info("WeCom gateway starting application", { + appId: appAccount.id, + webhookPath: appAccount.webhookPath, + isSelfBuiltApp: appAccount.isSelfBuiltApp, + }); + + const unregister = registerWebhookTarget({ + path: appAccount.webhookPath, + account: appAccount, + config: ctx.cfg, // Full OpenClaw config + }); + + return { + shutdown: async () => { + logger.info("WeCom gateway shutting down application", { appId: appAccount.id }); + unregister(); + }, + }; + }, + }, +}; + +// ============================================================================= +// HTTP Webhook Handler +// ============================================================================= + +async function wecomHttpHandler(req, res) { + const url = new URL(req.url || "", "http://localhost"); + const path = normalizeWebhookPath(url.pathname); + const targets = webhookTargets.get(path); + + if (!targets || targets.length === 0) { + logger.debug("WeCom HTTP request: no target found for path", { path }); + return false; // Not handled by this plugin + } + + // In multi-app scenario, there should ideally be only one target per path. + // If multiple targets map to the same path, we'll use the first one. + const target = targets[0]; + const appAccount = target.account; // This is the specific app's configuration + const fullConfig = target.config; // The full OpenClaw config + + if (!appAccount || !appAccount.enabled) { + logger.warn("WeCom HTTP request: target app not found or disabled", { path, appId: appAccount?.id }); + res.writeHead(503, { "Content-Type": "text/plain" }); + res.end("WeCom application not found or disabled"); + return true; + } + + const query = Object.fromEntries(url.searchParams); + logger.debug("WeCom HTTP request", { method: req.method, path, appId: appAccount.id }); + + const webhook = new WecomWebhook({ + token: appAccount.token, + encodingAesKey: appAccount.encodingAesKey, + }); + + // GET: URL Verification + if (req.method === "GET") { + const echo = webhook.handleVerify(query); + if (echo) { + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end(echo); + logger.info("WeCom URL verification successful", { appId: appAccount.id }); + return true; + } + + res.writeHead(403, { "Content-Type": "text/plain" }); + res.end("Verification failed"); + logger.warn("WeCom URL verification failed", { appId: appAccount.id }); + return true; + } + + // POST: Message handling + if (req.method === "POST") { + const chunks = []; + for await (const chunk of req) { + chunks.push(chunk); + } + const rawBody = Buffer.concat(chunks).toString("utf-8"); + logger.debug("WeCom message received", { bodyLength: rawBody.length, appId: appAccount.id }); + + // handleMessage will parse JSON or XML based on content + const result = await webhook.handleMessage(query, rawBody); + if (result === WecomWebhook.DUPLICATE) { + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("success"); + return true; + } + if (!result) { + res.writeHead(400, { "Content-Type": "text/plain" }); + res.end("Bad Request"); + return true; + } + + const message = result.message || result; // Normalized message object + const { timestamp, nonce } = result.query; + + const isAIBot = !appAccount.isSelfBuiltApp; // Use app-specific flag + + if (isAIBot) { + if (result.stream) { // AI Bot stream refresh + const streamId = result.stream.id; + const stream = streamManager.getStream(streamId); + + if (!stream) { + logger.warn("Stream not found for refresh", { streamId, appId: appAccount.id }); + const streamResponse = webhook.buildStreamResponse( + streamId, + "会话已过期", + true, + timestamp, + nonce, + ); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(streamResponse); + return true; + } + + const meta = streamMeta.get(streamId); + if (meta?.mainResponseDone && !stream.finished) { + const idleMs = Date.now() - stream.updatedAt; + if (idleMs > 10000) { + logger.info("WeCom: closing stream due to idle timeout", { streamId, idleMs, appId: appAccount.id }); + try { + await streamManager.finishStream(streamId); + } catch (err) { + logger.error("WeCom: failed to finish stream", { streamId, error: err.message, appId: appAccount.id }); + } + } + } + + const streamResponse = webhook.buildStreamResponse( + streamId, + stream.content, + stream.finished, + timestamp, + nonce, + stream.finished && stream.msgItem.length > 0 ? { msgItem: stream.msgItem } : {}, + ); + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(streamResponse); + + logger.debug("Stream refresh response sent", { + streamId, + contentLength: stream.content.length, + finished: stream.finished, + appId: appAccount.id, + }); + + if (stream.finished) { + setTimeout(() => { + streamManager.deleteStream(streamId); + streamMeta.delete(streamId); + }, 30 * 1000); + } + return true; + + } else { // AI Bot initial message/event + const streamId = `stream_${crypto.randomUUID()}`; + streamManager.createStream(streamId); + streamManager.appendStream(streamId, THINKING_PLACEHOLDER); + const streamResponse = webhook.buildStreamResponse(streamId, THINKING_PLACEHOLDER, false, timestamp, nonce); + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(streamResponse); + + logger.info("Stream initiated (AI Bot)", { + streamId, + from: message.fromUser, + appId: appAccount.id, + }); + + processInboundMessage({ + message, + streamId, + timestamp, + nonce, + account: appAccount, + config: fullConfig, + }).catch(async (err) => { + logger.error("WeCom AI Bot message processing failed", { error: err.message, appId: appAccount.id }); + await handleStreamError(streamId, getMessageStreamKey(message), "处理消息时出错,请稍后再试。"); + }); + return true; + } + + } else { // Self-Built App: passive XML reply + logger.info("Self-Built App message/event received, processing for passive reply", { + from: message.fromUser, + appId: appAccount.id, + }); + + const processingResult = await processInboundMessage({ + message, + streamId: undefined, + timestamp, + nonce, + account: appAccount, + config: fullConfig, + }).catch(async (err) => { + logger.error("WeCom Self-Built App message processing failed", { error: err.message, appId: appAccount.id }); + return { passiveReplyXml: webhook.buildPassiveReplyXml( + message.fromUser, + appAccount.id, + "text", + "处理消息时出错,请稍后再试。", + Math.floor(Date.now() / 1000), + nonce, + )}; + }); + + if (processingResult?.passiveReplyXml) { + logger.info("WeCom Self-Built: returning passive XML reply", { replyXmlPreview: processingResult.passiveReplyXml.substring(0, 100), appId: appAccount.id }); + res.writeHead(200, { "Content-Type": "application/xml" }); + res.end(processingResult.passiveReplyXml); + return true; + } else { + logger.info("WeCom Self-Built: no immediate passive XML reply, returning success.", { appId: appAccount.id }); + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("success"); + return true; + } + } + } + + res.writeHead(405, { "Content-Type": "text/plain" }); + res.end("Method Not Allowed"); + return true; +} + +// ============================================================================= +// Plugin Registration +// ============================================================================= + +const plugin = { + id: "wecom", + name: "Enterprise WeChat", + description: "Enterprise WeChat AI Bot and Self-Built App channel plugin for OpenClaw (Bone Version)", + configSchema: { type: "object", additionalProperties: false, properties: {} }, + register(api) { + logger.info("WeCom plugin registering..."); + console.log('OpenClaw API structure:', JSON.stringify(api, null, 2)); + setApi(api); + setOpenClawConfig(api.config); + + api.registerChannel({ plugin: wecomChannelPlugin }); + logger.info("WeCom channel registered"); console.log("DEBUG OPENCLAW channelApi.reply keys:", Object.keys(api.runtime?.channel?.reply || api.channel?.reply || {})); console.log("DEBUG OPENCLAW channelApi.text keys:", Object.keys(api.runtime?.channel?.text || api.channel?.text || {})); + + // Register HTTP handler for webhooks. This handler will now dynamically resolve the application. + api.registerHttpHandler(wecomHttpHandler); + logger.info("WeCom HTTP handler registered"); + }, +}; + +export default plugin; +export const register = (api) => plugin.register(api); diff --git a/logger.js b/logger.js new file mode 100755 index 0000000..c4016ce --- /dev/null +++ b/logger.js @@ -0,0 +1,63 @@ +/** + * Structured logging for WeCom plugin + */ +const LEVELS = { + debug: 10, + info: 20, + warn: 30, + error: 40, + silent: 100, +}; + +function getEnvLogLevel() { + const raw = (process.env.WECOM_LOG_LEVEL || process.env.LOG_LEVEL || "info").toLowerCase(); + return Object.prototype.hasOwnProperty.call(LEVELS, raw) ? raw : "info"; +} + +export class Logger { + prefix; + level; + constructor(prefix = "[wecom]", level = getEnvLogLevel()) { + this.prefix = prefix; + this.level = level; + } + log(level, message, context) { + if (LEVELS[level] < LEVELS[this.level]) { + return; + } + const timestamp = new Date().toISOString(); + const contextStr = context ? ` ${JSON.stringify(context)}` : ""; + const logMessage = `${timestamp} ${level.toUpperCase()} ${this.prefix} ${message}${contextStr}`; + switch (level) { + case "debug": + console.debug(logMessage); + break; + case "info": + console.info(logMessage); + break; + case "warn": + console.warn(logMessage); + break; + case "error": + console.error(logMessage); + break; + } + } + debug(message, context) { + this.log("debug", message, context); + } + info(message, context) { + this.log("info", message, context); + } + warn(message, context) { + this.log("warn", message, context); + } + error(message, context) { + this.log("error", message, context); + } + child(subPrefix) { + return new Logger(`${this.prefix}:${subPrefix}`, this.level); + } +} +// Default logger instance +export const logger = new Logger(); diff --git a/openclaw.plugin.json b/openclaw.plugin.json new file mode 100755 index 0000000..b072b3f --- /dev/null +++ b/openclaw.plugin.json @@ -0,0 +1,13 @@ +{ + "id": "wecom", + "name": "OpenClaw WeCom", + "description": "Enterprise WeChat (WeCom) messaging channel plugin for OpenClaw", + "channels": [ + "wecom" + ], + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": {} + } +} diff --git a/package.json b/package.json new file mode 100755 index 0000000..70a7689 --- /dev/null +++ b/package.json @@ -0,0 +1,59 @@ +{ + "name": "@sunnoy/wecom", + "version": "1.2.1", + "description": "Enterprise WeChat AI Bot channel plugin for OpenClaw", + "type": "module", + "main": "index.js", + "files": [ + "index.js", + "crypto.js", + "dynamic-agent.js", + "image-processor.js", + "logger.js", + "README.md", + "README_ZH.md", + "LICENSE", + "CONTRIBUTING.md", + "stream-manager.js", + "utils.js", + "webhook.js", + "openclaw.plugin.json" + ], + "peerDependencies": { + "openclaw": "*" + }, + "scripts": { + "test": "node --test tests/*.test.js" + }, + "openclaw": { + "extensions": [ + "./index.js" + ], + "channel": { + "id": "wecom", + "label": "Enterprise WeChat", + "selectionLabel": "Enterprise WeChat (AI Bot)", + "docsPath": "/channels/wecom", + "docsLabel": "wecom", + "blurb": "Support for Enterprise WeChat (WeCom) AI Bot integration", + "order": 90, + "aliases": [ + "wecom", + "wework" + ] + }, + "install": { + "defaultChoice": "npm", + "npmSpec": "@sunnoy/wecom" + } + }, + "keywords": [ + "openclaw", + "wecom", + "enterprise-wechat", + "chat", + "plugin" + ], + "author": "", + "license": "ISC" +} diff --git a/stream-manager.js b/stream-manager.js new file mode 100755 index 0000000..7784a8b --- /dev/null +++ b/stream-manager.js @@ -0,0 +1,358 @@ +import { prepareImageForMsgItem } from "./image-processor.js"; +import { logger } from "./logger.js"; + +/** + * Streaming state manager for WeCom responses. + */ + +/** WeCom enforces a 20480-byte UTF-8 content limit per stream response. */ +const MAX_STREAM_BYTES = 20480; + +/** Truncate content to MAX_STREAM_BYTES if it exceeds the limit. */ +function enforceByteLimit(content) { + const contentBytes = Buffer.byteLength(content, "utf8"); + if (contentBytes <= MAX_STREAM_BYTES) { + return content; + } + logger.warn("Stream content exceeds byte limit, truncating", { bytes: contentBytes }); + // Truncate at byte boundary, then trim any broken trailing multi-byte char. + const buf = Buffer.from(content, "utf8").subarray(0, MAX_STREAM_BYTES); + return buf.toString("utf8"); +} + +class StreamManager { + constructor() { + // streamId -> { content, finished, updatedAt, feedbackId, msgItem, pendingImages } + this.streams = new Map(); + this._cleanupInterval = null; + } + + /** + * Start periodic cleanup lazily to avoid import-time side effects. + */ + startCleanup() { + if (this._cleanupInterval) { + return; + } + this._cleanupInterval = setInterval(() => this.cleanup(), 60 * 1000); + // Do not keep the process alive for this timer. + this._cleanupInterval.unref?.(); + } + + stopCleanup() { + if (!this._cleanupInterval) { + return; + } + clearInterval(this._cleanupInterval); + this._cleanupInterval = null; + } + + /** + * Create a new stream session. + * @param {string} streamId - Stream id + * @param {object} options - Optional settings + * @param {string} options.feedbackId - Optional feedback tracking id + */ + createStream(streamId, options = {}) { + this.startCleanup(); + logger.debug("Creating stream", { streamId, feedbackId: options.feedbackId }); + this.streams.set(streamId, { + content: "", + finished: false, + updatedAt: Date.now(), + feedbackId: options.feedbackId || null, + msgItem: [], + pendingImages: [], + }); + return streamId; + } + + /** + * Update stream content (replace mode). + * @param {string} streamId - Stream id + * @param {string} content - Message content (max 20480 bytes in UTF-8) + * @param {boolean} finished - Whether stream is completed + * @param {object} options - Optional settings + * @param {Array} options.msgItem - Mixed media list (supported when finished=true) + */ + updateStream(streamId, content, finished = false, options = {}) { + this.startCleanup(); + const stream = this.streams.get(streamId); + if (!stream) { + logger.warn("Stream not found for update", { streamId }); + return false; + } + + content = enforceByteLimit(content); + + stream.content = content; + stream.finished = finished; + stream.updatedAt = Date.now(); + + // Mixed media items are only valid for finished responses. + if (finished && options.msgItem && options.msgItem.length > 0) { + stream.msgItem = options.msgItem.slice(0, 10); + } + + logger.debug("Stream updated", { + streamId, + contentLength: content.length, + finished, + hasMsgItem: stream.msgItem.length > 0, + }); + + return true; + } + + /** + * Append content to an existing stream. + */ + appendStream(streamId, chunk) { + this.startCleanup(); + const stream = this.streams.get(streamId); + if (!stream) { + logger.warn("Stream not found for append", { streamId }); + return false; + } + + stream.content = enforceByteLimit(stream.content + chunk); + stream.updatedAt = Date.now(); + + logger.debug("Stream appended", { + streamId, + chunkLength: chunk.length, + totalLength: stream.content.length, + }); + + return true; + } + + /** + * Replace stream content if it currently contains only the placeholder, + * otherwise append normally. + * @param {string} streamId - Stream id + * @param {string} chunk - New content to write + * @param {string} placeholder - The placeholder text to detect and replace + * @returns {boolean} Whether the operation succeeded + */ + replaceIfPlaceholder(streamId, chunk, placeholder) { + this.startCleanup(); + const stream = this.streams.get(streamId); + if (!stream) { + logger.warn("Stream not found for replaceIfPlaceholder", { streamId }); + return false; + } + + if (stream.content.trim() === placeholder.trim()) { + stream.content = enforceByteLimit(chunk); + stream.updatedAt = Date.now(); + logger.debug("Stream placeholder replaced", { + streamId, + newContentLength: stream.content.length, + }); + return true; + } + + // Normal append behavior. + stream.content = enforceByteLimit(stream.content + chunk); + stream.updatedAt = Date.now(); + logger.debug("Stream appended (no placeholder)", { + streamId, + chunkLength: chunk.length, + totalLength: stream.content.length, + }); + return true; + } + + /** + * Queue image for inclusion when stream finishes + * @param {string} streamId - Stream id + * @param {string} imagePath - Absolute image path + * @returns {boolean} Whether enqueue succeeded + */ + queueImage(streamId, imagePath) { + this.startCleanup(); + const stream = this.streams.get(streamId); + if (!stream) { + logger.warn("Stream not found for queueImage", { streamId }); + return false; + } + + stream.pendingImages.push({ + path: imagePath, + queuedAt: Date.now(), + }); + + logger.debug("Image queued for stream", { + streamId, + imagePath, + totalQueued: stream.pendingImages.length, + }); + + return true; + } + + /** + * Process all pending images and build msgItem array + * @param {string} streamId - Stream id + * @returns {Promise} msg_item entries + */ + async processPendingImages(streamId) { + const stream = this.streams.get(streamId); + if (!stream || stream.pendingImages.length === 0) { + return []; + } + + logger.debug("Processing pending images", { + streamId, + count: stream.pendingImages.length, + }); + + const msgItems = []; + + for (const img of stream.pendingImages) { + try { + // Limit to 10 images per WeCom API spec + if (msgItems.length >= 10) { + logger.warn("Stream exceeded 10 image limit, truncating", { + streamId, + total: stream.pendingImages.length, + processed: msgItems.length, + }); + break; + } + + const processed = await prepareImageForMsgItem(img.path); + msgItems.push({ + msgtype: "image", + image: { + base64: processed.base64, + md5: processed.md5, + }, + }); + + logger.debug("Image processed successfully", { + streamId, + imagePath: img.path, + format: processed.format, + size: processed.size, + }); + } catch (error) { + logger.error("Failed to process image for stream", { + streamId, + imagePath: img.path, + error: error.message, + }); + // Keep going even when one image fails. + } + } + + logger.info("Completed processing images for stream", { + streamId, + processed: msgItems.length, + pending: stream.pendingImages.length, + }); + + return msgItems; + } + + /** + * Mark the stream as finished and process queued images. + */ + async finishStream(streamId) { + this.startCleanup(); + const stream = this.streams.get(streamId); + if (!stream) { + logger.warn("Stream not found for finish", { streamId }); + return false; + } + + if (stream.finished) { + return true; + } + + // Process pending images before finalizing the stream. + if (stream.pendingImages.length > 0) { + stream.msgItem = await this.processPendingImages(streamId); + stream.pendingImages = []; + } + + stream.finished = true; + stream.updatedAt = Date.now(); + + logger.info("Stream finished", { + streamId, + contentLength: stream.content.length, + imageCount: stream.msgItem.length, + }); + + return true; + } + + /** + * Get current stream state. + */ + getStream(streamId) { + return this.streams.get(streamId); + } + + /** + * Check whether a stream exists. + */ + hasStream(streamId) { + return this.streams.has(streamId); + } + + /** + * Delete a stream. + */ + deleteStream(streamId) { + const deleted = this.streams.delete(streamId); + if (deleted) { + logger.debug("Stream deleted", { streamId }); + } + return deleted; + } + + /** + * Remove streams that were inactive for over 10 minutes. + */ + cleanup() { + const now = Date.now(); + const timeout = 10 * 60 * 1000; + let cleaned = 0; + + for (const [streamId, stream] of this.streams.entries()) { + if (now - stream.updatedAt > timeout) { + this.streams.delete(streamId); + cleaned++; + } + } + + if (cleaned > 0) { + logger.info("Cleaned up expired streams", { count: cleaned }); + } + } + + /** + * Get in-memory stream stats. + */ + getStats() { + const total = this.streams.size; + let finished = 0; + let active = 0; + + for (const stream of this.streams.values()) { + if (stream.finished) { + finished++; + } else { + active++; + } + } + + return { total, finished, active }; + } +} + +// Shared singleton instance used by the plugin runtime. +export const streamManager = new StreamManager(); diff --git a/tests/outbound.test.js b/tests/outbound.test.js new file mode 100755 index 0000000..18982b1 --- /dev/null +++ b/tests/outbound.test.js @@ -0,0 +1,308 @@ +import { describe, it, beforeEach, afterEach } from 'node:test'; +import assert from 'node:assert'; +import { AsyncLocalStorage } from 'node:async_hooks'; + +/** + * Unit tests for outbound message delivery with three-layer fallback + * + * Test coverage: + * 1. Layer 1: Active stream delivery + * 2. Layer 2: response_url fallback + * 3. Layer 3: Warning log when no channel available + */ + +describe('outbound.sendText - three-layer fallback', () => { + // Mock dependencies + let streamManager; + let responseUrls; + let streamContext; + let fetchMock; + let mockStreams; + + beforeEach(() => { + // Reset mocks + mockStreams = new Map(); + streamManager = { + hasStream: (id) => mockStreams.has(id), + getStream: (id) => mockStreams.get(id), + replaceIfPlaceholder: () => {}, + }; + responseUrls = new Map(); + fetchMock = global.fetch; + global.fetch = async (url, options) => ({ ok: true }); + }); + + afterEach(() => { + global.fetch = fetchMock; + mockStreams.clear(); + responseUrls.clear(); + }); + + it('Layer 1: should deliver via active stream when available', async () => { + // Setup: Active stream exists + const streamId = 'stream_test_123'; + const userId = 'user_abc'; + mockStreams.set(streamId, { finished: false, content: 'thinking...' }); + + // Simulate streamContext having streamId + streamContext = new AsyncLocalStorage(); + streamContext.run({ streamId }, async () => { + // Simulate outbound.sendText + const ctx = streamContext.getStore(); + const activeStreamId = ctx?.streamId; + + assert.strictEqual(activeStreamId, streamId); + assert.strictEqual(streamManager.hasStream(streamId), true); + }); + }); + + it('Layer 2: should use response_url fallback when stream closed', async () => { + // Setup: Stream is closed, but response_url is available + const streamId = 'stream_test_123'; + const userId = 'user_abc'; + const testUrl = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=test'; + + // Stream is finished + mockStreams.set(streamId, { finished: true, content: 'done' }); + + // response_url saved + responseUrls.set(userId, { + url: testUrl, + expiresAt: Date.now() + 60 * 60 * 1000, + used: false, + }); + + // Simulate fallback logic + const stream = streamManager.getStream(streamId); + const canUseStream = stream && !stream.finished; + assert.strictEqual(canUseStream, false); + + const saved = responseUrls.get(userId); + const canUseFallback = saved && !saved.used && Date.now() < saved.expiresAt; + assert.strictEqual(canUseFallback, true); + + // Simulate fetch call + if (canUseFallback) { + const response = await fetch(saved.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ msgtype: 'text', text: { content: 'test' } }), + }); + assert.strictEqual(response.ok, true); + } + }); + + it('Layer 2: should not use response_url if already used', async () => { + // Setup: response_url was already used + const userId = 'user_abc'; + const testUrl = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=test'; + + responseUrls.set(userId, { + url: testUrl, + expiresAt: Date.now() + 60 * 60 * 1000, + used: true, // Already used + }); + + const saved = responseUrls.get(userId); + const canUseFallback = saved && !saved.used && Date.now() < saved.expiresAt; + assert.strictEqual(canUseFallback, false); + }); + + it('Layer 2: should not use response_url if expired', async () => { + // Setup: response_url has expired + const userId = 'user_abc'; + const testUrl = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=test'; + + responseUrls.set(userId, { + url: testUrl, + expiresAt: Date.now() - 1000, // Expired 1 second ago + used: false, + }); + + const saved = responseUrls.get(userId); + const canUseFallback = saved && !saved.used && Date.now() < saved.expiresAt; + assert.strictEqual(canUseFallback, false); + }); + + it('Layer 3: should log warning when no delivery channel available', async () => { + // Setup: No active stream, no response_url + const userId = 'user_abc'; + + const stream = streamManager.getStream('nonexistent'); + const saved = responseUrls.get(userId); + + const canUseStream = !!(stream && !stream.finished); + const canUseFallback = !!(saved && !saved.used && Date.now() < saved.expiresAt); + + assert.strictEqual(canUseStream, false); + assert.strictEqual(canUseFallback, false); + + // In real code, this would log: logger.warn("WeCom outbound: no delivery channel available...") + }); +}); + +describe('stream refresh handler - delayed close logic', () => { + let streamMeta; + let mockStreams; + + beforeEach(() => { + streamMeta = new Map(); + mockStreams = new Map(); + }); + + it('should close stream when main response done + idle for 10s', () => { + const streamId = 'stream_test_123'; + const now = Date.now(); + + // Setup: Main response done, stream idle for 11s + streamMeta.set(streamId, { + mainResponseDone: true, + doneAt: now - 11000, + }); + + mockStreams.set(streamId, { + finished: false, + updatedAt: now - 11000, + content: 'done', + }); + + // Simulate refresh handler logic + const stream = mockStreams.get(streamId); + const meta = streamMeta.get(streamId); + const idleMs = now - stream.updatedAt; + + const shouldClose = meta?.mainResponseDone && !stream.finished && idleMs > 10000; + assert.strictEqual(shouldClose, true); + }); + + it('should NOT close stream when idle time < 10s', () => { + const streamId = 'stream_test_123'; + const now = Date.now(); + + // Setup: Main response done, but only idle for 5s + streamMeta.set(streamId, { + mainResponseDone: true, + doneAt: now - 5000, + }); + + mockStreams.set(streamId, { + finished: false, + updatedAt: now - 5000, + content: 'done', + }); + + // Simulate refresh handler logic + const stream = mockStreams.get(streamId); + const meta = streamMeta.get(streamId); + const idleMs = now - stream.updatedAt; + + const shouldClose = meta?.mainResponseDone && !stream.finished && idleMs > 10000; + assert.strictEqual(shouldClose, false); + }); + + it('should NOT close stream when main response not done', () => { + const streamId = 'stream_test_123'; + const now = Date.now(); + + // Setup: Stream idle for 11s, but main response not done + streamMeta.set(streamId, { + mainResponseDone: false, + doneAt: now - 11000, + }); + + mockStreams.set(streamId, { + finished: false, + updatedAt: now - 11000, + content: 'processing...', + }); + + // Simulate refresh handler logic + const stream = mockStreams.get(streamId); + const meta = streamMeta.get(streamId); + + const shouldClose = meta?.mainResponseDone && !stream.finished; + assert.strictEqual(shouldClose, false); + }); + + it('should NOT close stream when already finished', () => { + const streamId = 'stream_test_123'; + const now = Date.now(); + + // Setup: Stream already finished + streamMeta.set(streamId, { + mainResponseDone: true, + doneAt: now - 15000, + }); + + mockStreams.set(streamId, { + finished: true, + updatedAt: now - 11000, + content: 'done', + }); + + // Simulate refresh handler logic + const stream = mockStreams.get(streamId); + const meta = streamMeta.get(streamId); + + const shouldClose = meta?.mainResponseDone && !stream.finished; + assert.strictEqual(shouldClose, false); + }); +}); + +describe('safety net - emergency stream cleanup', () => { + let mockStreams; + let streamMeta; + + beforeEach(() => { + mockStreams = new Map(); + streamMeta = new Map(); + }); + + it('should close idle stream after 30s safety timeout', () => { + const streamId = 'stream_test_123'; + const now = Date.now(); + + // Setup: Stream idle for 31s (exceeds safety net timeout) + streamMeta.set(streamId, { + mainResponseDone: true, + doneAt: now - 31000, + }); + + mockStreams.set(streamId, { + finished: false, + updatedAt: now - 31000, + content: 'done', + }); + + // Simulate safety net logic + const stream = mockStreams.get(streamId); + const idleMs = now - stream.updatedAt; + + const shouldForceClose = stream && !stream.finished && idleMs > 30000; + assert.strictEqual(shouldForceClose, true); + }); + + it('should NOT close stream with recent activity', () => { + const streamId = 'stream_test_123'; + const now = Date.now(); + + // Setup: Stream updated 5s ago + streamMeta.set(streamId, { + mainResponseDone: true, + doneAt: now - 5000, + }); + + mockStreams.set(streamId, { + finished: false, + updatedAt: now - 5000, + content: 'done', + }); + + // Simulate safety net logic + const stream = mockStreams.get(streamId); + const idleMs = now - stream.updatedAt; + + const shouldForceClose = stream && !stream.finished && idleMs > 30000; + assert.strictEqual(shouldForceClose, false); + }); +}); diff --git a/utils.js b/utils.js new file mode 100755 index 0000000..b5480b0 --- /dev/null +++ b/utils.js @@ -0,0 +1,89 @@ +/** + * Utility helpers for the WeCom plugin. + */ +export class TTLCache { + options; + cache = new Map(); + checkPeriod; + cleanupTimer; + constructor(options) { + this.options = options; + this.checkPeriod = options.checkPeriod || options.ttl; + this.startCleanup(); + } + set(key, value, ttl) { + const expiresAt = Date.now() + (ttl || this.options.ttl); + this.cache.set(key, { value, expiresAt }); + } + get(key) { + const entry = this.cache.get(key); + if (!entry) { + return undefined; + } + if (Date.now() > entry.expiresAt) { + this.cache.delete(key); + return undefined; + } + return entry.value; + } + has(key) { + return this.get(key) !== undefined; + } + delete(key) { + return this.cache.delete(key); + } + clear() { + this.cache.clear(); + } + size() { + this.cleanup(); + return this.cache.size; + } + cleanup() { + const now = Date.now(); + for (const [key, entry] of this.cache.entries()) { + if (now > entry.expiresAt) { + this.cache.delete(key); + } + } + } + startCleanup() { + this.cleanupTimer = setInterval(() => { + this.cleanup(); + }, this.checkPeriod); + // Don't prevent process from exiting + if (this.cleanupTimer.unref) { + this.cleanupTimer.unref(); + } + } + destroy() { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + } + this.cache.clear(); + } +} +// ============================================================================ +// Message deduplication +// ============================================================================ +export class MessageDeduplicator { + seen = new TTLCache({ ttl: 300000 }); // 5 minutes + isDuplicate(msgId) { + if (this.seen.has(msgId)) { + return true; + } + this.seen.set(msgId, true); + return false; + } + markAsSeen(msgId) { + this.seen.set(msgId, true); + } +} +// ============================================================================ +// Constants +// ============================================================================ +export const CONSTANTS = { + // AES/Crypto + AES_BLOCK_SIZE: 32, + AES_KEY_LENGTH: 43, +}; diff --git a/webhook.js b/webhook.js new file mode 100755 index 0000000..d382bae --- /dev/null +++ b/webhook.js @@ -0,0 +1,383 @@ +import { WecomCrypto } from "./crypto.js"; +import { logger } from "./logger.js"; +import { MessageDeduplicator } from "./utils.js"; + +/** + * Basic XML parser for WeCom messages. + */ +function parseWecomXml(xmlString) { + const result = {}; + const cdataRegex = /<(\w+)><\/\1>/gs; + const tagRegex = /<(\w+)>(.*?)<\/\1>/gs; + + let match; + while ((match = cdataRegex.exec(xmlString)) !== null) { + result[match[1]] = match[2]; + } + while ((match = tagRegex.exec(xmlString)) !== null) { + if (!result[match[1]]) { + result[match[1]] = match[2]; + } + } + return result; +} + +/** + * WeCom AI Bot & Self-Built App Webhook Handler + * Supports both JSON (AI Bot) and XML (Self-Built App) formats. + */ +export class WecomWebhook { + config; + crypto; + deduplicator = new MessageDeduplicator(); + + /** Sentinel returned when a message is a duplicate (caller should ACK 200). */ + static DUPLICATE = Symbol.for("wecom.duplicate"); + + constructor(config) { + this.config = config; + this.crypto = new WecomCrypto(config.token, config.encodingAesKey); + logger.debug("WecomWebhook initialized (AI Bot & Self-Built App mode)"); + } + + handleVerify(query) { + const signature = query.msg_signature; + const timestamp = query.timestamp; + const nonce = query.nonce; + const echostr = query.echostr; + + if (!signature || !timestamp || !nonce || !echostr) { + logger.warn("Missing parameters in verify request", { query }); + return null; + } + + logger.debug("Handling verify request", { timestamp, nonce }); + + const calcSignature = this.crypto.getSignature(timestamp, nonce, echostr); + if (calcSignature !== signature) { + logger.error("Signature mismatch in verify", { + expected: signature, + calculated: calcSignature, + }); + return null; + } + + try { + const result = this.crypto.decrypt(echostr); + logger.info("URL verification successful"); + return result.message; + } catch (e) { + logger.error("Decrypt failed in verify", { + error: e instanceof Error ? e.message : String(e), + }); + return null; + } + } + + async handleMessage(query, rawBody) { + const signature = query.msg_signature; + const timestamp = query.timestamp; + const nonce = query.nonce; + + if (!signature || !timestamp || !nonce) { + logger.warn("Missing parameters in message request", { query }); + return null; + } + + let encrypt; + let isXml = false; + + if (rawBody.trim().startsWith("")) { + isXml = true; + try { + const parsedXml = parseWecomXml(rawBody); + encrypt = parsedXml.Encrypt; + logger.debug("Parsed request body as XML", { hasEncrypt: !!encrypt }); + } catch (e) { + logger.error("Failed to parse request body as XML", { + error: e instanceof Error ? e.message : String(e), + body: rawBody.substring(0, 200), + }); + return null; + } + } else { + try { + const jsonBody = JSON.parse(rawBody); + encrypt = jsonBody.encrypt; + logger.debug("Parsed request body as JSON", { hasEncrypt: !!encrypt }); + } catch (e) { + logger.error("Failed to parse request body as JSON", { + error: e instanceof Error ? e.message : String(e), + body: rawBody.substring(0, 200), + }); + return null; + } + } + + if (!encrypt) { + logger.error("No encrypt field in body (JSON or XML)"); + return null; + } + + const calcSignature = this.crypto.getSignature(timestamp, nonce, encrypt); + if (calcSignature !== signature) { + logger.error("Signature mismatch in message", { + expected: signature, + calculated: calcSignature, + }); + return null; + } + + let decryptedContent; + try { + const result = this.crypto.decrypt(encrypt); + decryptedContent = result.message; + logger.debug("Decrypted content", { content: decryptedContent.substring(0, 300) }); + } catch (e) { + logger.error("Message decrypt failed", { + error: e instanceof Error ? e.message : String(e), + }); + return null; + } + + let data; + try { + if (isXml) { + data = parseWecomXml(decryptedContent); + if (data.CreateTime) data.CreateTime = parseInt(data.CreateTime, 10); + if (data.AgentID) data.AgentID = parseInt(data.AgentID, 10); + data.msgtype = data.MsgType?.toLowerCase(); + data.from = { userid: data.FromUserName }; + data.chatid = data.ToUserName; // Assuming ToUserName for self-built is the chat target for direct messages + data.msgid = data.MsgId; + data.content = data.Content; + data.image = { url: data.PicUrl, mediaId: data.MediaId }; + data.voice = { mediaId: data.MediaId, format: data.Format }; + data.video = { mediaId: data.MediaId, thumbMediaId: data.ThumbMediaId }; + data.location = { latitude: data.Location_X, longitude: data.Location_Y, scale: data.Scale, name: data.Label }; + data.link = { title: data.Title, description: data.Description, url: data.Url, picUrl: data.PicUrl }; + data.event = { event_type: data.Event?.toLowerCase(), event_key: data.EventKey, latitude: data.Latitude, longitude: data.Longitude, precision: data.Precision }; + logger.debug("Parsed decrypted content as XML", { msgtype: data.msgtype, keys: Object.keys(data) }); + } else { + data = JSON.parse(decryptedContent); + logger.debug("Parsed decrypted content as JSON", { + msgtype: data.msgtype, + keys: Object.keys(data), + text: JSON.stringify(data.text), + }); + } + } catch (e) { + logger.error("Failed to parse decrypted content (JSON/XML)", { + error: e instanceof Error ? e.message : String(e), + content: decryptedContent.substring(0, 200), + isXmlDetected: isXml, + }); + return null; + } + + const msgtype = data.msgtype; + + let normalizedMessage = null; + const commonFields = { + msgId: data.msgid || `msg_${Date.now()}`, + fromUser: data.from?.userid || data.FromUserName || "", + chatType: data.chattype || "single", + chatId: data.chatid || data.ToUserName || "", + aibotId: data.aibotid || "", + responseUrl: data.response_url || "", + query: { timestamp, nonce }, + }; + + if (msgtype === "text") { + normalizedMessage = { + ...commonFields, + msgType: "text", + content: data.text?.content || data.Content || "", + quote: data.quote + ? { + msgType: data.quote.msgtype, + content: data.quote.text?.content || data.quote.image?.url || "", + } + : null, + }; + } else if (msgtype === "stream") { + normalizedMessage = { + stream: { id: data.stream?.id }, + query: { timestamp, nonce }, + rawData: data, + }; + } else if (msgtype === "image") { + normalizedMessage = { + ...commonFields, + msgType: "image", + imageUrl: data.image?.url || data.PicUrl || "", + mediaId: data.image?.mediaId || data.MediaId || "", + }; + } else if (msgtype === "voice") { + const content = data.voice?.content || data.Recognition || ""; + if (content) { + normalizedMessage = { + ...commonFields, + msgType: "text", + content, + originalType: "voice", + mediaId: data.voice?.mediaId || data.MediaId || "", + format: data.voice?.format || data.Format || "", + }; + } else { + normalizedMessage = { + ...commonFields, + msgType: "voice", + mediaId: data.voice?.mediaId || data.MediaId || "", + format: data.voice?.format || data.Format || "", + }; + } + } else if (msgtype === "file") { + normalizedMessage = { + ...commonFields, + msgType: "file", + fileUrl: data.file?.url || "", + fileName: data.file?.name || data.file?.filename || "", + mediaId: data.file?.mediaId || data.MediaId || "", + }; + } else if (msgtype === "location") { + const latitude = data.location?.latitude || data.Location_X || ""; + const longitude = data.location?.longitude || data.Location_Y || ""; + const name = data.location?.name || data.location?.label || data.Label || ""; + const content = name + ? `[位置] ${name} (${latitude}, ${longitude})` + : `[位置] ${latitude}, ${longitude}`; + + normalizedMessage = { + ...commonFields, + msgType: "text", + content, + location: { latitude, longitude, name, scale: data.location?.scale || data.Scale || "" }, + }; + } else if (msgtype === "link") { + const title = data.link?.title || data.Title || ""; + const description = data.link?.description || data.Description || ""; + const url = data.link?.url || data.Url || ""; + const picUrl = data.link?.picUrl || data.PicUrl || ""; + + const parts = []; + if (title) parts.push(`[链接] ${title}`); + if (description) parts.push(description); + if (url) parts.push(url); + const content = parts.join("\n") || "[链接]"; + + normalizedMessage = { + ...commonFields, + msgType: "text", + content, + link: { title, description, url, picUrl }, + }; + } else if (msgtype === "event") { + logger.info("Received event", { event: data.event || data.Event, isXml }); + normalizedMessage = { + event: data.event || data.Event, + event_type: data.event?.event_type || data.Event?.toLowerCase(), + fromUser: data.from?.userid || data.FromUserName || "", + agentId: data.agentid || data.AgentID || "", + eventKey: data.event?.event_key || data.EventKey || "", + latitude: data.Latitude, + longitude: data.Longitude, + precision: data.Precision, + }; + } else if (msgtype === "mixed") { + logger.warn("Mixed messages are not fully supported in bone version. Converting to text.", { msgtype, isXmlDetected: isXml }); + const msgItems = data.mixed?.msg_item || []; + const textParts = []; + for (const item of msgItems) { + if (item.msgtype === "text" && item.text?.content) { + textParts.push(item.text.content); + } + } + const content = textParts.join("\n"); + normalizedMessage = { + ...commonFields, + msgType: "text", // Fallback to text + content: content || "[Mixed message received]", + }; + } else { + logger.warn("Unknown message type or unsupported in bone version", { msgtype, isXmlDetected: isXml }); + return null; + } + + if (normalizedMessage && normalizedMessage.msgId && this.deduplicator.isDuplicate(normalizedMessage.msgId)) { + logger.debug("Duplicate message ignored", { msgId: normalizedMessage.msgId }); + return WecomWebhook.DUPLICATE; + } + + if (normalizedMessage && normalizedMessage.message) { + normalizedMessage.message.query = { timestamp, nonce }; + } else if (normalizedMessage) { + normalizedMessage.query = { timestamp, nonce }; + } + + return normalizedMessage; + } + + buildStreamResponse(streamId, content, finish, timestamp, nonce, options = {}) { + const stream = { + id: streamId, + finish: finish, + content: content, + }; + + if (options.msgItem && options.msgItem.length > 0) { + stream.msg_item = options.msgItem; + } + + if (options.feedbackId) { + stream.feedback = { id: options.feedbackId }; + } + + const plain = { + msgtype: "stream", + stream: stream, + }; + + const plainStr = JSON.stringify(plain); + const encrypted = this.crypto.encrypt(plainStr); + const signature = this.crypto.getSignature(timestamp, nonce, encrypted); + + return JSON.stringify({ + encrypt: encrypted, + msgsignature: signature, + timestamp: timestamp, + nonce: nonce, + }); + } + + buildPassiveReplyXml(toUser, fromUser, msgType, contentOrMediaId, timestamp, nonce, options = {}) { + let innerXml = ""; + if (msgType === "text") { + innerXml = ` + + ${timestamp} + + `; + } else if (msgType === "image") { + innerXml = ` + + ${timestamp} + + `; + } else { + logger.warn("Unsupported message type for passive XML reply", { msgType }); + return null; + } + + const plainXml = `${innerXml}`; + const encrypted = this.crypto.encrypt(plainXml); + const signature = this.crypto.getSignature(timestamp, nonce, encrypted); + + return ` + + + ${timestamp} + + `; + } +} diff --git a/wecom-api.js b/wecom-api.js new file mode 100755 index 0000000..ffd66d5 --- /dev/null +++ b/wecom-api.js @@ -0,0 +1,59 @@ +import { logger } from "./logger.js"; + +class WeComApiClient { + constructor() { + this.tokens = new Map(); + } + + async getAccessToken(corpId, corpSecret) { + if (!corpId || !corpSecret) { + throw new Error("Missing corpId or corpSecret for WeCom API call"); + } + + const key = `${corpId}:${corpSecret}`; + const cached = this.tokens.get(key); + // Allow 5 minutes buffer before expiration + if (cached && cached.expiresAt > Date.now() + 5 * 60 * 1000) { + return cached.token; + } + + logger.debug("WeCom API: Fetching new access token", { corpId }); + const res = await fetch(`https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=${corpId}&corpsecret=${corpSecret}`); + const data = await res.json(); + + if (data.errcode !== 0) { + throw new Error(`WeCom gettoken failed: ${data.errmsg} (${data.errcode})`); + } + + this.tokens.set(key, { + token: data.access_token, + // expires_in is usually 7200 seconds + expiresAt: Date.now() + data.expires_in * 1000, + }); + + return data.access_token; + } + + async sendTextMessage(corpId, corpSecret, agentId, toUser, text) { + logger.debug("WeCom API: Sending async text message", { corpId, agentId, toUser, textPreview: text.substring(0, 50) }); + const token = await this.getAccessToken(corpId, corpSecret); + const res = await fetch(`https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=${token}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + touser: toUser, + msgtype: "text", + agentid: agentId, + text: { content: text }, + }), + }); + const data = await res.json(); + if (data.errcode !== 0) { + logger.error("WeCom API: message/send failed", { error: data.errmsg, code: data.errcode, corpId, agentId, toUser }); + throw new Error(`WeCom message/send failed: ${data.errmsg} (${data.errcode})`); + } + return data; + } +} + +export const wecomApi = new WeComApiClient(); diff --git a/wecom-message-processor.js b/wecom-message-processor.js new file mode 100755 index 0000000..739a602 --- /dev/null +++ b/wecom-message-processor.js @@ -0,0 +1,801 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import * as crypto from "node:crypto"; +import { mkdirSync, existsSync, writeFileSync } from "node:fs"; +import { join } from "node:path"; +import { logger } from "./logger.js"; +import { streamManager } from "./stream-manager.js"; +import { WecomWebhook } from "./webhook.js"; // WecomWebhook is now the parser +import { wecomApi } from "./wecom-api.js"; + +const DEFAULT_ACCOUNT_ID = "default"; +const THINKING_PLACEHOLDER = "思考中..."; +const MEDIA_CACHE_DIR = join(process.env.HOME || "/tmp", ".openclaw", "media", "wecom"); + +// ============================================================================= +// Media Download and Decryption +// ============================================================================= + +/** + * Download and decrypt a WeCom encrypted image. + * @param {string} imageUrl - Encrypted image URL from WeCom + * @param {string} encodingAesKey - AES key + * @param {string} token - Token + * @returns {Promise} Local path to decrypted image + */ +async function downloadAndDecryptImage(imageUrl, encodingAesKey, token) { + if (!existsSync(MEDIA_CACHE_DIR)) { + mkdirSync(MEDIA_CACHE_DIR, { recursive: true }); + } + + logger.info("Downloading encrypted image", { url: imageUrl.substring(0, 80) }); + const response = await fetch(imageUrl); + if (!response.ok) { + throw new Error(`Failed to download image: ${response.status}`); + } + const encryptedBuffer = Buffer.from(await response.arrayBuffer()); + logger.debug("Downloaded encrypted image", { size: encryptedBuffer.length }); + + const wecomCrypto = new WecomWebhook({token, encodingAesKey}).crypto; + const decryptedBuffer = wecomCrypto.decryptMedia(encryptedBuffer); + + // Detect image type via magic bytes. + let ext = "jpg"; + if (decryptedBuffer[0] === 0x89 && decryptedBuffer[1] === 0x50) { + ext = "png"; + } else if (decryptedBuffer[0] === 0x47 && decryptedBuffer[1] === 0x49) { + ext = "gif"; + } + + const filename = `wecom_${Date.now()}_${Math.random().toString(36).substring(2, 8)}.${ext}`; + const localPath = join(MEDIA_CACHE_DIR, filename); + writeFileSync(localPath, decryptedBuffer); + + const mimeType = ext === "png" ? "image/png" : ext === "gif" ? "image/gif" : "image/jpeg"; + logger.info("Image decrypted and saved", { path: localPath, size: decryptedBuffer.length, mimeType }); + return { localPath, mimeType }; +} + +/** + * Download and decrypt a file from WeCom. + * @param {string} fileUrl - File download URL + * @param {string} fileName - Original file name + * @param {string} encodingAesKey - AES key for decryption + * @param {string} token - Token for decryption + * @returns {Promise<{localPath: string, effectiveFileName: string}>} Local path and resolved filename + */ +async function downloadWecomFile(fileUrl, fileName, encodingAesKey, token) { + if (!existsSync(MEDIA_CACHE_DIR)) { + mkdirSync(MEDIA_CACHE_DIR, { recursive: true }); + } + + logger.info("Downloading encrypted file", { url: fileUrl.substring(0, 80), name: fileName }); + const response = await fetch(fileUrl); + if (!response.ok) { + throw new Error(`Failed to download file: ${response.status}`); + } + const encryptedBuffer = Buffer.from(await response.arrayBuffer()); + + let effectiveFileName = fileName; + if (!effectiveFileName) { + const contentDisposition = response.headers.get("content-disposition"); + if (contentDisposition) { + const filenameMatch = contentDisposition.match(/filename\*?=(?:UTF-8'')?["']?([^"';\n]+)["']?/i); + if (filenameMatch && filenameMatch[1]) { + effectiveFileName = decodeURIComponent(filenameMatch[1]); + logger.info("Extracted filename from Content-Disposition", { name: effectiveFileName }); + } + } + } + + const wecomCrypto = new WecomWebhook({token, encodingAesKey}).crypto; + const decryptedBuffer = wecomCrypto.decryptMedia(encryptedBuffer); + + const safeName = (effectiveFileName || `file_${Date.now()}`).replace(/[/\\:*?"<>|]/g, "_"); + const localPath = join(MEDIA_CACHE_DIR, `${Date.now()}_${safeName}`); + writeFileSync(localPath, decryptedBuffer); + + logger.info("File decrypted and saved", { path: localPath, size: decryptedBuffer.length }); + return { localPath, effectiveFileName: effectiveFileName || fileName }; +} + +/** + * Guess MIME type from file extension. + */ +function guessMimeType(fileName) { + const ext = (fileName || "").split(".").pop()?.toLowerCase() || ""; + const mimeMap = { + pdf: "application/pdf", + doc: "application/msword", + docx: "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + xls: "application/vnd.ms-excel", + xlsx: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ppt: "application/vnd.ms-powerpoint", + pptx: "application/vnd.openxmlformats-officedocument.presentationml.presentation", + txt: "text/plain", + csv: "text/csv", + zip: "application/zip", + png: "image/png", + jpg: "image/jpeg", + jpeg: "image/jpeg", + gif: "image/gif", + mp4: "video/mp4", + mp3: "audio/mpeg", + }; + return mimeMap[ext] || "application/octet-stream"; +} + +// Runtime state (module-level singleton) +let _api = null; +let _openclawConfig = null; + +// Track active stream for each user, so outbound messages (like reset confirmation) +// can be added to the correct stream instead of using response_url +const activeStreams = new Map(); +const activeStreamHistory = new Map(); + +// Store stream metadata for delayed finish (main response done flag) +export const streamMeta = new Map(); + +// Store response_url for fallback delivery after stream closes +// response_url is valid for 1 hour and can be used only once +export const responseUrls = new Map(); + +// Periodic cleanup for streamMeta and expired responseUrls to prevent memory leaks. +setInterval(() => { + const now = Date.now(); + for (const streamId of streamMeta.keys()) { + if (!streamManager.hasStream(streamId)) { + streamMeta.delete(streamId); + } + } + for (const [key, entry] of responseUrls.entries()) { + if (now > entry.expiresAt) { + responseUrls.delete(key); + } + } +}, 60 * 1000).unref(); + +// AsyncLocalStorage for propagating the correct streamId through the async +// processing chain. +export const streamContext = new AsyncLocalStorage(); + +export function getMessageStreamKey(message) { + if (!message || typeof message !== "object") { + return ""; + } + const chatType = message.chatType || "single"; + const chatId = message.chatId || ""; + if (chatType === "group" && chatId) { + return chatId; + } + return message.fromUser || ""; +} + +export function setApi(api) { + _api = api; +} + +export function getApi() { + if (!_api) { + throw new Error("[wecom] API not initialized"); + } + return _api; +} + +export function setOpenClawConfig(config) { + _openclawConfig = config; +} + +export function getActiveStreamContext(streamKey) { + if (!streamKey) { + return null; + } + + const history = activeStreamHistory.get(streamKey); + if (!history || history.length === 0) { + activeStreams.delete(streamKey); + return null; + } + + const remaining = history.filter((id) => streamManager.hasStream(id)); + if (remaining.length === 0) { + activeStreamHistory.delete(streamKey); + activeStreams.delete(streamKey); + return null; + } + + activeStreamHistory.set(streamKey, remaining); + const latest = remaining[remaining.length - 1]; + activeStreams.set(streamKey, latest); + return latest; +} + +export function registerActiveStream(streamKey, streamId) { + if (!streamKey || !streamId) { + return; + } + + const history = activeStreamHistory.get(streamKey) ?? []; + const deduped = history.filter((id) => id !== streamId); + deduped.push(streamId); + activeStreamHistory.set(streamKey, deduped); + activeStreams.set(streamKey, streamId); +} + +export function unregisterActiveStream(streamKey, streamId) { + if (!streamKey || !streamId) { + return; + } + + const history = activeStreamHistory.get(streamKey); + if (!history || history.length === 0) { + if (activeStreams.get(streamKey) === streamId) { + activeStreams.delete(streamKey); + } + return; + } + + const remaining = history.filter((id) => id !== streamId); + if (remaining.length === 0) { + activeStreamHistory.delete(streamKey); + activeStreams.delete(streamKey); + return; + } + + activeStreamHistory.set(streamKey, remaining); + activeStreams.set(streamKey, remaining[remaining.length - 1]); +} + +/** + * Handle stream error: replace placeholder with error message, finish stream, unregister. + */ +export async function handleStreamError(streamId, streamKey, errorMessage) { + if (!streamId) return; + const stream = streamManager.getStream(streamId); + if (stream && !stream.finished) { + if (stream.content.trim() === THINKING_PLACEHOLDER.trim()) { + streamManager.replaceIfPlaceholder(streamId, errorMessage, THINKING_PLACEHOLDER); + } + await streamManager.finishStream(streamId); + } + unregisterActiveStream(streamKey, streamId); +} + +export async function processInboundMessage({ + message, + streamId, + timestamp: _timestamp, + nonce: _nonce, + account, + config, +}) { + const _api = getApi(); + const channelApi = _api.runtime?.channel || _api.channel; + const routingApi = channelApi?.routing || _api.routing; + const sessionApi = channelApi?.session || _api.session; + const replyApi = channelApi?.reply || _api.reply; + + const senderId = message.fromUser; + const msgType = message.msgType || "text"; + const imageUrl = message.imageUrl || ""; + const imageUrls = message.imageUrls || []; + const fileUrl = message.fileUrl || ""; + const fileName = message.fileName || ""; + const rawContent = message.content || ""; + const chatType = message.chatType || "single"; + const chatId = message.chatid || ""; + const isGroupChat = chatType === "group" && chatId; + + const isAIBotStreamRequest = message.stream && message.stream.id; // AI Bot stream refresh + const isSelfBuiltAppRequest = !isAIBotStreamRequest && !message.responseUrl; // Heuristic: no stream id and no response_url means self-built + + const peerId = isGroupChat ? chatId : senderId; + const peerKind = isGroupChat ? "group" : "dm"; + const conversationId = isGroupChat ? `wecom:group:${chatId}` : `wecom:${senderId}`; + + const streamKey = isGroupChat ? chatId : senderId; + if (streamId) { + registerActiveStream(streamKey, streamId); + } + + if (message.responseUrl && message.responseUrl.trim()) { + responseUrls.set(streamKey, { + url: message.responseUrl, + expiresAt: Date.now() + 60 * 60 * 1000, // 1 hour + used: false, + }); + logger.debug("WeCom: saved response_url for fallback (AI Bot)", { streamKey }); + } + + let effectiveRawBody = rawContent; + + if (!effectiveRawBody.trim() && !imageUrl && imageUrls.length === 0 && !fileUrl && !message.event) { + logger.debug("WeCom: empty message or unhandled event, skipping"); + if (streamId) { + await streamManager.finishStream(streamId); + unregisterActiveStream(streamKey, streamId); + } + return; + } + + logger.info("WeCom processing message", { + from: senderId, + chatType: peerKind, + peerId, + content: effectiveRawBody.substring(0, 50), + streamId, + isAIBotStreamRequest, + isSelfBuiltAppRequest, + }); + + const route = routingApi.resolveAgentRoute({ + cfg: config, + channel: "wecom", + accountId: account.accountId, + peer: { + kind: peerKind, + id: peerId, + }, + }); + + const storePath = sessionApi.resolveStorePath(config.session?.store, { + agentId: route.agentId, + }); + const envelopeOptions = replyApi.resolveEnvelopeFormatOptions(config); + const previousTimestamp = sessionApi.readSessionUpdatedAt({ + storePath, + sessionKey: route.sessionKey, + }); + + const senderLabel = isGroupChat ? `[${senderId}]` : senderId; + const body = replyApi.formatAgentEnvelope({ + channel: isGroupChat ? "Enterprise WeChat Group" : "Enterprise WeChat", + from: senderLabel, + timestamp: Date.now(), + previousTimestamp, + envelope: envelopeOptions, + body: effectiveRawBody, + }); + + const ctxBase = { + Body: body, + RawBody: effectiveRawBody, + CommandBody: effectiveRawBody, + From: `wecom:${senderId}`, + To: conversationId, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: isGroupChat ? "group" : "direct", + ConversationLabel: isGroupChat ? `Group ${chatId}` : senderId, + SenderName: senderId, + SenderId: senderId, + GroupId: isGroupChat ? chatId : undefined, + Provider: "wecom", + Surface: "wecom", + OriginatingChannel: "wecom", + OriginatingTo: conversationId, + // CommandAuthorized: commandAuthorized, // Removed + }; + + const allImageUrls = imageUrl ? [imageUrl] : imageUrls; + + if (allImageUrls.length > 0) { + const mediaPaths = []; + const mediaTypes = []; + const fallbackUrls = []; + + for (const url of allImageUrls) { + try { + const result = await downloadAndDecryptImage(url, account.encodingAesKey, account.token); + mediaPaths.push(result.localPath); + mediaTypes.push(result.mimeType); + } catch (e) { + logger.warn("Image decryption failed, using URL fallback", { error: e.message, url: url.substring(0, 80) }); + fallbackUrls.push(url); + mediaTypes.push("image/jpeg"); + } + } + + if (mediaPaths.length > 0) { + ctxBase.MediaPaths = mediaPaths; + } + if (fallbackUrls.length > 0) { + ctxBase.MediaUrls = fallbackUrls; + } + ctxBase.MediaTypes = mediaTypes; + + logger.info("Image attachments prepared", { + decrypted: mediaPaths.length, + fallback: fallbackUrls.length, + }); + + if (!effectiveRawBody.trim()) { + const count = allImageUrls.length; + ctxBase.Body = count > 1 + ? `[用户发送了${count}张图片]` + : "[用户发送了一张图片]"; + ctxBase.RawBody = "[图片]"; + ctxBase.CommandBody = ""; + } + } + + if (fileUrl) { + try { + const { localPath: localFilePath, effectiveFileName } = await downloadWecomFile(fileUrl, fileName, account.encodingAesKey, account.token); + ctxBase.MediaPaths = [...(ctxBase.MediaPaths || []), localFilePath]; + ctxBase.MediaTypes = [...(ctxBase.MediaTypes || []), guessMimeType(effectiveFileName)]; + logger.info("File attachment prepared", { path: localFilePath, name: effectiveFileName }); + } catch (e) { + logger.warn("File download failed", { error: e.message }); + const label = fileName ? `[文件: ${fileName}]` : "[文件]"; + if (!effectiveRawBody.trim()) { + ctxBase.Body = `[用户发送了文件] ${label}`; + ctxBase.RawBody = label; + ctxBase.CommandBody = ""; + } + } + if (!effectiveRawBody.trim() && !ctxBase.Body) { + const label = fileName ? `[文件: ${fileName}]` : "[文件]"; + ctxBase.Body = `[用户发送了文件] ${label}`; + ctxBase.RawBody = label; + ctxBase.CommandBody = ""; + } + } + + if (message.event && message.event_type) { + ctxBase.MsgType = 'event'; + ctxBase.Event = message.event_type; + ctxBase.RawBody = `[Event: ${message.event_type}]`; + + if (!effectiveRawBody.trim()) { + ctxBase.Body = `[用户触发了事件: ${message.event_type}]`; + } + } + + const ctxPayload = replyApi.finalizeInboundContext(ctxBase); + + void sessionApi + .recordSessionMetaFromInbound({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + }) + .catch((err) => { + logger.error("WeCom: failed updating session meta", { error: err.message }); + }); + + if (isSelfBuiltAppRequest) { + if (account.corpId && account.corpSecret) { + // Async mode: Return "Thinking..." passively immediately + const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); + const initialReplyXml = webhook.buildPassiveReplyXml( + message.fromUser || message.chatId || message.ToUserName, + message.ToUserName, + "text", + THINKING_PLACEHOLDER, + Math.floor(Date.now() / 1000), + _nonce, + ); + + // Process reply in the background + replyApi.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: config, + dispatcherOptions: { + deliver: async (payload, info) => { + // Deliver actual reply using WeCom Active Message API + logger.info("WeCom Self-Built (Async): deliver called", { + kind: info.kind, + hasText: !!(payload.text && payload.text.trim()), + textPreview: (payload.text || "").substring(0, 50), + }); + const text = payload.text || ""; + const mediaRegex = /^MEDIA:\s*(.+)$/gm; + const mediaMatches = []; + let match; + while ((match = mediaRegex.exec(text)) !== null) { + const mediaPath = match[1].trim(); + if (mediaPath.startsWith("/")) { + mediaMatches.push({ fullMatch: match[0], path: mediaPath }); + } + } + + let processedText = text; + for (const media of mediaMatches) { + processedText = processedText.replace(media.fullMatch, `[图片: ${media.path}]`).trim(); + } + + if (processedText.trim() || mediaMatches.length > 0) { + try { + await wecomApi.sendTextMessage(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, processedText); + } catch (e) { + logger.error("WeCom Self-Built (Async): Failed to send active message", { error: e.message }); + } + } + }, + onError: async (err, info) => { + logger.error("WeCom Self-Built App (Async): error during dispatch", { error: err.message, kind: info?.kind }); + try { + await wecomApi.sendTextMessage(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, "处理消息时出错,请稍后再试。"); + } catch (e) {} + } + } + }).catch(err => logger.error("WeCom Async Dispatch Error", { error: err.message })); + + return { passiveReplyXml: initialReplyXml }; + } else { + // Sync mode: Wait for reply and then send it as passive reply + let capturedReplyXml = null; + let fullText = ""; + let errorOccurred = false; + + await replyApi.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: config, + dispatcherOptions: { + deliver: async (payload, info) => { + logger.info("WeCom Self-Built (Sync): deliver called", { + kind: info.kind, + hasText: !!(payload.text && payload.text.trim()), + }); + if (payload.text) { + fullText += payload.text; + } + }, + onError: async (err, info) => { + errorOccurred = true; + logger.error("WeCom Self-Built App (Sync): error during dispatch", { error: err.message, kind: info?.kind }); + const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); + capturedReplyXml = webhook.buildPassiveReplyXml( + message.fromUser || message.chatId || message.ToUserName, + message.ToUserName, + "text", + "处理消息时出错,请稍后再试。", + Math.floor(Date.now() / 1000), + _nonce, + ); + } + } + }); + + if (!errorOccurred) { + logger.info('WeCom Sync: Final accumulated text', { text: fullText }); + const result = await deliverWecomReply({ + payload: { text: fullText }, + senderId: streamKey, + streamId: undefined, + isSelfBuiltAppRequest, + originalMessage: message, + account, + }); + if (result?.passiveReplyXml) { + capturedReplyXml = result.passiveReplyXml; + } else if (!capturedReplyXml) { + // Fallback if deliverWecomReply didn't return XML (e.g., empty text) + const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); + capturedReplyXml = webhook.buildPassiveReplyXml( + message.fromUser || message.chatId || message.ToUserName, + message.ToUserName, + "text", + "处理完成", // fallback text + Math.floor(Date.now() / 1000), + _nonce, + ); + } + } + + return { passiveReplyXml: capturedReplyXml }; + } + } else { // AI Bot streaming response + const replyResult = await streamContext.run({ streamId, streamKey }, async () => { + const replyPromise = replyApi.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: config, + dispatcherOptions: { + deliver: async (payload, info) => { + logger.info("Dispatcher deliver called", { + kind: info.kind, + hasText: !!(payload.text && payload.text.trim()), + textPreview: (payload.text || "").substring(0, 50), + isSelfBuiltAppRequest, + }); + + return deliverWecomReply({ + payload, + senderId: streamKey, + streamId, + isSelfBuiltAppRequest, + }); + }, + onError: async (err, info) => { + logger.error("WeCom reply failed", { error: err.message, kind: info.kind }); + if (streamId) { + await handleStreamError(streamId, streamKey, "处理消息时出错,请稍后再试。"); + } + }, + }, + }); + return replyPromise; + }); + + if (streamId) { + const stream = streamManager.getStream(streamId); + if (!stream || stream.finished) { + unregisterActiveStream(streamKey, streamId); + } else { + setTimeout(async () => { + const checkStream = streamManager.getStream(streamId); + if (checkStream && !checkStream.finished) { + const meta = streamMeta.get(streamId); + const idleMs = Date.now() - checkStream.updatedAt; + if (idleMs > 30000) { + logger.warn("WeCom safety net: closing idle stream", { streamId, idleMs }); + try { + await streamManager.finishStream(streamId); + unregisterActiveStream(streamKey, streamId); + } catch (err) { + logger.error("WeCom safety net: failed to close stream", { streamId, error: err.message }); + } + } + } + }, 35000); + } + } + return replyResult; + } +} + +export async function deliverWecomReply({ payload, senderId, streamId, isSelfBuiltAppRequest, originalMessage, account }) { + logger.info('deliverWecomReply received payload', { payload: JSON.stringify(payload) }); + const text = payload.text || ""; + + logger.debug("deliverWecomReply called", { + hasText: !!text.trim(), + textPreview: text.substring(0, 50), + streamId, + senderId, + isSelfBuiltAppRequest, + }); + + const mediaRegex = /^MEDIA:\s*(.+)$/gm; + const mediaMatches = []; + let match; + while ((match = mediaRegex.exec(text)) !== null) { + const mediaPath = match[1].trim(); + if (mediaPath.startsWith("/")) { + mediaMatches.push({ + fullMatch: match[0], + path: mediaPath, + }); + logger.debug("Detected absolute path MEDIA line", { + streamId, + mediaPath, + line: match[0], + }); + } + } + + let processedText = text; + if (mediaMatches.length > 0) { + for (const media of mediaMatches) { + if (streamId && !isSelfBuiltAppRequest) { + const queued = streamManager.queueImage(streamId, media.path); + if (queued) { + processedText = processedText.replace(media.fullMatch, "").trim(); + logger.info("Queued absolute path image for stream (AI Bot)", { + streamId, + imagePath: media.path, + }); + } + } else if (isSelfBuiltAppRequest) { + processedText = processedText.replace(media.fullMatch, `[图片: ${media.path}]`).trim(); + logger.warn("WeCom Self-Built: converting local media to text for passive reply", { mediaPath: media.path }); + } + } + } + + if (!processedText.trim() && mediaMatches.length === 0) { + logger.debug("WeCom: empty block after processing, skipping stream update/reply"); + return; + } + + if (isSelfBuiltAppRequest) { + if (!originalMessage || !account) { + logger.error("WeCom Self-Built: missing originalMessage or account for passive reply"); + return; + } + const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); + const passiveReply = webhook.buildPassiveReplyXml( + originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName, + originalMessage.ToUserName, + "text", + processedText, + Math.floor(Date.now() / 1000), + originalMessage.query.nonce, + ); + logger.info("WeCom Self-Built: constructed passive XML reply", { replyXmlPreview: passiveReply.substring(0, 100) }); + return { passiveReplyXml: passiveReply }; + + } else { // AI Bot streaming response + const appendToStream = (targetStreamId, content) => { + const stream = streamManager.getStream(targetStreamId); + if (!stream) { + return false; + } + + if (stream.content.trim() === THINKING_PLACEHOLDER.trim()) { + streamManager.replaceIfPlaceholder(targetStreamId, content, THINKING_PLACEHOLDER); + return true; + } + + if (stream.content.includes(content.trim())) { + logger.debug("WeCom: duplicate content, skipping", { + streamId: targetStreamId, + contentPreview: content.substring(0, 30), + }); + return true; + } + + const separator = stream.content.length > 0 ? "\n\n" : ""; + streamManager.appendStream(targetStreamId, separator + content); + return true; + }; + + if (!streamId) { + const ctx = streamContext.getStore(); + const contextStreamId = ctx?.streamId; + const activeStreamId = contextStreamId ?? getActiveStreamContext(senderId); + + if (activeStreamId && streamManager.hasStream(activeStreamId)) { + appendToStream(activeStreamId, processedText); + logger.debug("WeCom stream appended (via context/activeStreams)", { + streamId: activeStreamId, + source: contextStreamId ? "asyncContext" : "activeStreams", + contentLength: processedText.length, + }); + return; + } + logger.warn("WeCom: no active stream for this message", { senderId }); + return; + } + + if (!streamManager.hasStream(streamId)) { + logger.warn("WeCom: stream not found, attempting response_url fallback", { streamId, senderId }); + + const saved = responseUrls.get(senderId); + if (saved && !saved.used && Date.now() < saved.expiresAt) { + saved.used = true; + try { + await fetch(saved.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ msgtype: 'text', text: { content: processedText } }), + }); + logger.info("WeCom: sent via response_url fallback (deliverWecomReply)", { + senderId, + contentPreview: processedText.substring(0, 50), + }); + return; + } catch (err) { + logger.error("WeCom: response_url fallback failed", { + senderId, + error: err.message, + }); + } + } + + logger.warn("WeCom: unable to deliver message (stream closed + response_url unavailable)", { + senderId, + contentPreview: processedText.substring(0, 50), + }); + return; + } + + appendToStream(streamId, processedText); + logger.debug("WeCom stream appended", { + streamId, + contentLength: processedText.length, + to: senderId, + }); + } +}