Newer
Older
WeComCompanyPlugin / wecom-message-processor.js
import { AsyncLocalStorage } from "node:async_hooks";
import * as crypto from "node:crypto";
import { mkdirSync, existsSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { logger } from "./logger.js";
import { streamManager } from "./stream-manager.js";
import { WecomWebhook } from "./webhook.js"; // WecomWebhook is now the parser
import { wecomApi } from "./wecom-api.js";

const DEFAULT_ACCOUNT_ID = "default";
const THINKING_PLACEHOLDER = "思考中...";
const MEDIA_CACHE_DIR = join(process.env.HOME || "/tmp", ".openclaw", "media", "wecom");

// =============================================================================
// Media Download and Decryption
// =============================================================================

/**
 * Download and decrypt a WeCom encrypted image.
 * @param {string} imageUrl - Encrypted image URL from WeCom
 * @param {string} encodingAesKey - AES key
 * @param {string} token - Token
 * @returns {Promise<string>} Local path to decrypted image
 */
async function downloadAndDecryptImage(imageUrl, encodingAesKey, token) {
  if (!existsSync(MEDIA_CACHE_DIR)) {
    mkdirSync(MEDIA_CACHE_DIR, { recursive: true });
  }

  logger.info("Downloading encrypted image", { url: imageUrl.substring(0, 80) });
  const response = await fetch(imageUrl);
  if (!response.ok) {
    throw new Error(`Failed to download image: ${response.status}`);
  }
  const encryptedBuffer = Buffer.from(await response.arrayBuffer());
  logger.debug("Downloaded encrypted image", { size: encryptedBuffer.length });

  const wecomCrypto = new WecomWebhook({token, encodingAesKey}).crypto;
  const decryptedBuffer = wecomCrypto.decryptMedia(encryptedBuffer);

  // Detect image type via magic bytes.
  let ext = "jpg";
  if (decryptedBuffer[0] === 0x89 && decryptedBuffer[1] === 0x50) {
    ext = "png";
  } else if (decryptedBuffer[0] === 0x47 && decryptedBuffer[1] === 0x49) {
    ext = "gif";
  }

  const filename = `wecom_${Date.now()}_${Math.random().toString(36).substring(2, 8)}.${ext}`;
  const localPath = join(MEDIA_CACHE_DIR, filename);
  writeFileSync(localPath, decryptedBuffer);

  const mimeType = ext === "png" ? "image/png" : ext === "gif" ? "image/gif" : "image/jpeg";
  logger.info("Image decrypted and saved", { path: localPath, size: decryptedBuffer.length, mimeType });
  return { localPath, mimeType };
}

/**
 * Download and decrypt a file from WeCom.
 * @param {string} fileUrl - File download URL
 * @param {string} fileName - Original file name
 * @param {string} encodingAesKey - AES key for decryption
 * @param {string} token - Token for decryption
 * @returns {Promise<{localPath: string, effectiveFileName: string}>} Local path and resolved filename
 */
async function downloadWecomFile(fileUrl, fileName, encodingAesKey, token) {
  if (!existsSync(MEDIA_CACHE_DIR)) {
    mkdirSync(MEDIA_CACHE_DIR, { recursive: true });
  }

  logger.info("Downloading encrypted file", { url: fileUrl.substring(0, 80), name: fileName });
  const response = await fetch(fileUrl);
  if (!response.ok) {
    throw new Error(`Failed to download file: ${response.status}`);
  }
  const encryptedBuffer = Buffer.from(await response.arrayBuffer());

  let effectiveFileName = fileName;
  if (!effectiveFileName) {
    const contentDisposition = response.headers.get("content-disposition");
    if (contentDisposition) {
      const filenameMatch = contentDisposition.match(/filename\*?=(?:UTF-8'')?["']?([^"';\n]+)["']?/i);
      if (filenameMatch && filenameMatch[1]) {
        effectiveFileName = decodeURIComponent(filenameMatch[1]);
        logger.info("Extracted filename from Content-Disposition", { name: effectiveFileName });
      }
    }
  }

  const wecomCrypto = new WecomWebhook({token, encodingAesKey}).crypto;
  const decryptedBuffer = wecomCrypto.decryptMedia(encryptedBuffer);

  const safeName = (effectiveFileName || `file_${Date.now()}`).replace(/[/\\:*?"<>|]/g, "_");
  const localPath = join(MEDIA_CACHE_DIR, `${Date.now()}_${safeName}`);
  writeFileSync(localPath, decryptedBuffer);

  logger.info("File decrypted and saved", { path: localPath, size: decryptedBuffer.length });
  return { localPath, effectiveFileName: effectiveFileName || fileName };
}

/**
 * Guess MIME type from file extension.
 */
function guessMimeType(fileName) {
  const ext = (fileName || "").split(".").pop()?.toLowerCase() || "";
  const mimeMap = {
    pdf: "application/pdf",
    doc: "application/msword",
    docx: "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    xls: "application/vnd.ms-excel",
    xlsx: "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    ppt: "application/vnd.ms-powerpoint",
    pptx: "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    txt: "text/plain",
    csv: "text/csv",
    zip: "application/zip",
    png: "image/png",
    jpg: "image/jpeg",
    jpeg: "image/jpeg",
    gif: "image/gif",
    mp4: "video/mp4",
    mp3: "audio/mpeg",
  };
  return mimeMap[ext] || "application/octet-stream";
}

// Runtime state (module-level singleton)
let _api = null;
let _openclawConfig = null;

// Track active stream for each user, so outbound messages (like reset confirmation)
// can be added to the correct stream instead of using response_url
const activeStreams = new Map();
const activeStreamHistory = new Map();

// Store stream metadata for delayed finish (main response done flag)
export const streamMeta = new Map();

// Store response_url for fallback delivery after stream closes
// response_url is valid for 1 hour and can be used only once
export const responseUrls = new Map();

// Periodic cleanup for streamMeta and expired responseUrls to prevent memory leaks.
setInterval(() => {
  const now = Date.now();
  for (const streamId of streamMeta.keys()) {
    if (!streamManager.hasStream(streamId)) {
      streamMeta.delete(streamId);
    }
  }
  for (const [key, entry] of responseUrls.entries()) {
    if (now > entry.expiresAt) {
      responseUrls.delete(key);
    }
  }
}, 60 * 1000).unref();

// AsyncLocalStorage for propagating the correct streamId through the async
// processing chain.
export const streamContext = new AsyncLocalStorage();

export function getMessageStreamKey(message) {
  if (!message || typeof message !== "object") {
    return "";
  }
  const chatType = message.chatType || "single";
  const chatId = message.chatId || "";
  if (chatType === "group" && chatId) {
    return chatId;
  }
  return message.fromUser || "";
}

export function setApi(api) {
  _api = api;
}

export function getApi() {
  if (!_api) {
    throw new Error("[wecom] API not initialized");
  }
  return _api;
}

export function setOpenClawConfig(config) {
  _openclawConfig = config;
}

export function getActiveStreamContext(streamKey) {
  if (!streamKey) {
    return null;
  }

  const history = activeStreamHistory.get(streamKey);
  if (!history || history.length === 0) {
    activeStreams.delete(streamKey);
    return null;
  }

  const remaining = history.filter((id) => streamManager.hasStream(id));
  if (remaining.length === 0) {
    activeStreamHistory.delete(streamKey);
    activeStreams.delete(streamKey);
    return null;
  }

  activeStreamHistory.set(streamKey, remaining);
  const latest = remaining[remaining.length - 1];
  activeStreams.set(streamKey, latest);
  return latest;
}

export function registerActiveStream(streamKey, streamId) {
  if (!streamKey || !streamId) {
    return;
  }

  const history = activeStreamHistory.get(streamKey) ?? [];
  const deduped = history.filter((id) => id !== streamId);
  deduped.push(streamId);
  activeStreamHistory.set(streamKey, deduped);
  activeStreams.set(streamKey, streamId);
}

export function unregisterActiveStream(streamKey, streamId) {
  if (!streamKey || !streamId) {
    return;
  }

  const history = activeStreamHistory.get(streamKey);
  if (!history || history.length === 0) {
    if (activeStreams.get(streamKey) === streamId) {
      activeStreams.delete(streamKey);
    }
    return;
  }

  const remaining = history.filter((id) => id !== streamId);
  if (remaining.length === 0) {
    activeStreamHistory.delete(streamKey);
    activeStreams.delete(streamKey);
    return;
  }

  activeStreamHistory.set(streamKey, remaining);
  activeStreams.set(streamKey, remaining[remaining.length - 1]);
}

/**
 * Handle stream error: replace placeholder with error message, finish stream, unregister.
 */
export async function handleStreamError(streamId, streamKey, errorMessage) {
  if (!streamId) return;
  const stream = streamManager.getStream(streamId);
  if (stream && !stream.finished) {
    if (stream.content.trim() === THINKING_PLACEHOLDER.trim()) {
      streamManager.replaceIfPlaceholder(streamId, errorMessage, THINKING_PLACEHOLDER);
    }
    await streamManager.finishStream(streamId);
  }
  unregisterActiveStream(streamKey, streamId);
}

export async function processInboundMessage({
  message,
  streamId,
  timestamp: _timestamp,
  nonce: _nonce,
  account,
  config,
}) {
  const _api = getApi();
  const channelApi = _api.runtime?.channel || _api.channel;
  const routingApi = channelApi?.routing || _api.routing;
  const sessionApi = channelApi?.session || _api.session;
  const replyApi = channelApi?.reply || _api.reply;

  const senderId = message.fromUser;
  const msgType = message.msgType || "text";
  const imageUrl = message.imageUrl || "";
  const imageUrls = message.imageUrls || [];
  const fileUrl = message.fileUrl || "";
  const fileName = message.fileName || "";
  const rawContent = message.content || "";
  const chatType = message.chatType || "single";
  const chatId = message.chatid || "";
  const isGroupChat = chatType === "group" && chatId;

  const isAIBotStreamRequest = message.stream && message.stream.id; // AI Bot stream refresh
  const isSelfBuiltAppRequest = !isAIBotStreamRequest && !message.responseUrl; // Heuristic: no stream id and no response_url means self-built

  const peerId = isGroupChat ? chatId : senderId;
  const peerKind = isGroupChat ? "group" : "dm";
  const conversationId = isGroupChat ? `wecom:group:${chatId}` : `wecom:${senderId}`;

  const streamKey = isGroupChat ? chatId : senderId;
  if (streamId) {
    registerActiveStream(streamKey, streamId);
  }

  if (message.responseUrl && message.responseUrl.trim()) {
    responseUrls.set(streamKey, {
      url: message.responseUrl,
      expiresAt: Date.now() + 60 * 60 * 1000, // 1 hour
      used: false,
    });
    logger.debug("WeCom: saved response_url for fallback (AI Bot)", { streamKey });
  }

  let effectiveRawBody = rawContent;

  if (!effectiveRawBody.trim() && !imageUrl && imageUrls.length === 0 && !fileUrl && !message.event) {
    logger.debug("WeCom: empty message or unhandled event, skipping");
    if (streamId) {
      await streamManager.finishStream(streamId);
      unregisterActiveStream(streamKey, streamId);
    }
    return;
  }

  logger.info("WeCom processing message", {
    from: senderId,
    chatType: peerKind,
    peerId,
    content: effectiveRawBody.substring(0, 50),
    streamId,
    isAIBotStreamRequest,
    isSelfBuiltAppRequest,
  });

  const route = routingApi.resolveAgentRoute({
    cfg: config,
    channel: "wecom",
    accountId: account.accountId,
    peer: {
      kind: peerKind,
      id: peerId,
    },
  });

  const storePath = sessionApi.resolveStorePath(config.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = replyApi.resolveEnvelopeFormatOptions(config);
  const previousTimestamp = sessionApi.readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });

  const senderLabel = isGroupChat ? `[${senderId}]` : senderId;
  const body = replyApi.formatAgentEnvelope({
    channel: isGroupChat ? "Enterprise WeChat Group" : "Enterprise WeChat",
    from: senderLabel,
    timestamp: Date.now(),
    previousTimestamp,
    envelope: envelopeOptions,
    body: effectiveRawBody,
  });

  const ctxBase = {
    Body: body,
    RawBody: effectiveRawBody,
    CommandBody: effectiveRawBody,
    From: `wecom:${senderId}`,
    To: conversationId,
    SessionKey: route.sessionKey,
    AccountId: route.accountId,
    ChatType: isGroupChat ? "group" : "direct",
    ConversationLabel: isGroupChat ? `Group ${chatId}` : senderId,
    SenderName: senderId,
    SenderId: senderId,
    GroupId: isGroupChat ? chatId : undefined,
    Provider: "wecom",
    Surface: "wecom",
    OriginatingChannel: "wecom",
    OriginatingTo: conversationId,
    // CommandAuthorized: commandAuthorized, // Removed
  };

  const allImageUrls = imageUrl ? [imageUrl] : imageUrls;

  if (allImageUrls.length > 0) {
    const mediaPaths = [];
    const mediaTypes = [];
    const fallbackUrls = [];

    for (const url of allImageUrls) {
      try {
        const result = await downloadAndDecryptImage(url, account.encodingAesKey, account.token);
        mediaPaths.push(result.localPath);
        mediaTypes.push(result.mimeType);
      } catch (e) {
        logger.warn("Image decryption failed, using URL fallback", { error: e.message, url: url.substring(0, 80) });
        fallbackUrls.push(url);
        mediaTypes.push("image/jpeg");
      }
    }

    if (mediaPaths.length > 0) {
      ctxBase.MediaPaths = mediaPaths;
    }
    if (fallbackUrls.length > 0) {
      ctxBase.MediaUrls = fallbackUrls;
    }
    ctxBase.MediaTypes = mediaTypes;

    logger.info("Image attachments prepared", {
      decrypted: mediaPaths.length,
      fallback: fallbackUrls.length,
    });

    if (!effectiveRawBody.trim()) {
      const count = allImageUrls.length;
      ctxBase.Body = count > 1
        ? `[用户发送了${count}张图片]`
        : "[用户发送了一张图片]";
      ctxBase.RawBody = "[图片]";
      ctxBase.CommandBody = "";
    }
  }

  if (fileUrl) {
    try {
      const { localPath: localFilePath, effectiveFileName } = await downloadWecomFile(fileUrl, fileName, account.encodingAesKey, account.token);
      ctxBase.MediaPaths = [...(ctxBase.MediaPaths || []), localFilePath];
      ctxBase.MediaTypes = [...(ctxBase.MediaTypes || []), guessMimeType(effectiveFileName)];
      logger.info("File attachment prepared", { path: localFilePath, name: effectiveFileName });
    } catch (e) {
      logger.warn("File download failed", { error: e.message });
      const label = fileName ? `[文件: ${fileName}]` : "[文件]";
      if (!effectiveRawBody.trim()) {
        ctxBase.Body = `[用户发送了文件] ${label}`;
        ctxBase.RawBody = label;
        ctxBase.CommandBody = "";
      }
    }
    if (!effectiveRawBody.trim() && !ctxBase.Body) {
      const label = fileName ? `[文件: ${fileName}]` : "[文件]";
      ctxBase.Body = `[用户发送了文件] ${label}`;
      ctxBase.RawBody = label;
      ctxBase.CommandBody = "";
    }
  }

  if (message.event && message.event_type) {
      ctxBase.MsgType = 'event';
      ctxBase.Event = message.event_type;
      ctxBase.RawBody = `[Event: ${message.event_type}]`;

      if (!effectiveRawBody.trim()) {
          ctxBase.Body = `[用户触发了事件: ${message.event_type}]`;
      }
  }

  const ctxPayload = replyApi.finalizeInboundContext(ctxBase);

  void sessionApi
    .recordSessionMetaFromInbound({
      storePath,
      sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
      ctx: ctxPayload,
    })
    .catch((err) => {
      logger.error("WeCom: failed updating session meta", { error: err.message });
    });

  if (isSelfBuiltAppRequest) {
    if (account.corpId && account.corpSecret) {
      // Async mode: Return "Thinking..." passively immediately
      const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey });
      const initialReplyXml = webhook.buildPassiveReplyXml(
          message.fromUser || message.chatId || message.ToUserName, 
          message.ToUserName,
          "text",
          THINKING_PLACEHOLDER,
          Math.floor(Date.now() / 1000),
          _nonce,
      );

      // Process reply in the background
      replyApi.dispatchReplyWithBufferedBlockDispatcher({
        ctx: ctxPayload,
        cfg: config,
        dispatcherOptions: {
          deliver: async (payload, info) => {
            // Deliver actual reply using WeCom Active Message API
            logger.info("WeCom Self-Built (Async): deliver called", {
              kind: info.kind,
              hasText: !!(payload.text && payload.text.trim()),
              hasMediaUrl: !!payload.mediaUrl,
              textPreview: (payload.text || "").substring(0, 50),
            });
            const text = payload.text || "";
            const mediaRegex = /^MEDIA:\s*(.+)$/gm;
            const mediaMatches = [];
            let match;
            while ((match = mediaRegex.exec(text)) !== null) {
              const mediaPath = match[1].trim();
              if (mediaPath.startsWith("/")) {
                mediaMatches.push({ fullMatch: match[0], path: mediaPath });
              }
            }

            if (payload.mediaUrl) {
                mediaMatches.push({
                    fullMatch: "", // It's not embedded in text
                    path: payload.mediaUrl,
                });
            }
            
            let processedText = text;
            if (mediaMatches.length === 1 && !text.replace(mediaMatches[0].fullMatch, "").trim()) {
              // Single media item, no other text. Send active media message.
              const media = mediaMatches[0];
              const type = getWecomMediaType(media.path);
              try {
                logger.info("WeCom Self-Built (Async): uploading single media for active reply", { path: media.path, type });
                const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path);
                const mediaId = uploadRes.media_id;
                
                // Active API sends
                if (type === 'image') {
                  await wecomApi.sendImageMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId);
                } else if (type === 'voice') {
                  await wecomApi.sendVoiceMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId);
                } else if (type === 'video') {
                  await wecomApi.sendVideoMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId);
                } else {
                  await wecomApi.sendFileMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId);
                }
                logger.info(`WeCom Self-Built (Async): sent active ${type} message`, { mediaId });
                return;
              } catch (err) {
                logger.error("Failed to upload/send media for active reply", { error: err.message, path: media.path });
                // Fallback to text below
              }
            }

            for (const media of mediaMatches) {
               const type = getWecomMediaType(media.path);
               processedText = processedText.replace(media.fullMatch, `[${type === 'voice' ? '语音' : type === 'video' ? '视频' : type === 'image' ? '图片' : '文件'}: ${media.path}]`).trim();
            }
            
            if (processedText.trim() || mediaMatches.length > 0) {
              try {
                await wecomApi.sendTextMessage(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, processedText);
              } catch (e) {
                logger.error("WeCom Self-Built (Async): Failed to send active message", { error: e.message });
              }
            }
          },
          onError: async (err, info) => {
            logger.error("WeCom Self-Built App (Async): error during dispatch", { error: err.message, kind: info?.kind });
            try {
              await wecomApi.sendTextMessage(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, "处理消息时出错,请稍后再试。");
            } catch (e) {}
          }
        }
      }).catch(err => logger.error("WeCom Async Dispatch Error", { error: err.message }));

      return { passiveReplyXml: initialReplyXml };
    } else {
      // Sync mode: Wait for reply and then send it as passive reply
            let capturedReplyXml = null;
            let fullText = "";
            let fullMediaUrl = "";
            let errorOccurred = false;
      
            await replyApi.dispatchReplyWithBufferedBlockDispatcher({
              ctx: ctxPayload,
              cfg: config,
              dispatcherOptions: {
                deliver: async (payload, info) => {
                  logger.info("WeCom Self-Built (Sync): deliver called", {
                    kind: info.kind,
                    hasText: !!(payload.text && payload.text.trim()),
                    hasMediaUrl: !!payload.mediaUrl,
                  });
                  if (payload.text) {
                    fullText += payload.text;
                  }
                  if (payload.mediaUrl) {
                    fullMediaUrl = payload.mediaUrl;
                  }
                },
                onError: async (err, info) => {
                  errorOccurred = true;
                  logger.error("WeCom Self-Built App (Sync): error during dispatch", { error: err.message, kind: info?.kind });
                  const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey });
                  capturedReplyXml = webhook.buildPassiveReplyXml(
                      message.fromUser || message.chatId || message.ToUserName,
                      message.ToUserName,
                      "text",
                      "处理消息时出错,请稍后再试。",
                      Math.floor(Date.now() / 1000),
                      _nonce,
                  );
                }
              }
            });
      
            if (!errorOccurred) {
              logger.info('WeCom Sync: Final accumulated text', { text: fullText, mediaUrl: fullMediaUrl });
              const result = await deliverWecomReply({
                payload: { text: fullText, mediaUrl: fullMediaUrl },
                senderId: streamKey,
                streamId: undefined,
                isSelfBuiltAppRequest,
                originalMessage: message,
                account,
              });        if (result?.passiveReplyXml) {
          capturedReplyXml = result.passiveReplyXml;
        }
      }

      return { passiveReplyXml: capturedReplyXml };
    }
  } else { // AI Bot streaming response
    const replyResult = await streamContext.run({ streamId, streamKey }, async () => {
      const replyPromise = replyApi.dispatchReplyWithBufferedBlockDispatcher({
        ctx: ctxPayload,
        cfg: config,
        dispatcherOptions: {
          deliver: async (payload, info) => {
            logger.info("Dispatcher deliver called", {
              kind: info.kind,
              hasText: !!(payload.text && payload.text.trim()),
              textPreview: (payload.text || "").substring(0, 50),
              isSelfBuiltAppRequest,
            });

            return deliverWecomReply({
                payload,
                senderId: streamKey,
                streamId,
                isSelfBuiltAppRequest,
            });
          },
          onError: async (err, info) => {
            logger.error("WeCom reply failed", { error: err.message, kind: info.kind });
            if (streamId) {
              await handleStreamError(streamId, streamKey, "处理消息时出错,请稍后再试。");
            }
          },
        },
      });
      return replyPromise;
    });

    if (streamId) {
      const stream = streamManager.getStream(streamId);
      if (!stream || stream.finished) {
        unregisterActiveStream(streamKey, streamId);
      } else {
        setTimeout(async () => {
          const checkStream = streamManager.getStream(streamId);
          if (checkStream && !checkStream.finished) {
            const meta = streamMeta.get(streamId);
            const idleMs = Date.now() - checkStream.updatedAt;
            if (idleMs > 30000) {
              logger.warn("WeCom safety net: closing idle stream", { streamId, idleMs });
              try {
                await streamManager.finishStream(streamId);
                unregisterActiveStream(streamKey, streamId);
              } catch (err) {
                logger.error("WeCom safety net: failed to close stream", { streamId, error: err.message });
              }
            }
          }
        }, 35000);
      }
    }
    return replyResult;
  }
}

function getWecomMediaType(filePath) {
  const ext = filePath.split('.').pop().toLowerCase();
  if (['jpg', 'jpeg', 'png'].includes(ext)) return 'image';
  if (['mp3', 'wav', 'amr', 'm4a'].includes(ext)) return 'voice';
  if (['mp4'].includes(ext)) return 'video';
  return 'file';
}

export async function deliverWecomReply({ payload, senderId, streamId, isSelfBuiltAppRequest, originalMessage, account }) {
  logger.info('deliverWecomReply received payload', { payload: JSON.stringify(payload) });
  const text = payload.text || "";

  logger.debug("deliverWecomReply called", {
    hasText: !!text.trim(),
    hasMediaUrl: !!payload.mediaUrl,
    textPreview: text.substring(0, 50),
    streamId,
    senderId,
    isSelfBuiltAppRequest,
  });

  const mediaRegex = /^MEDIA:\s*(.+)$/gm;
  const mediaMatches = [];
  let match;
  while ((match = mediaRegex.exec(text)) !== null) {
    const mediaPath = match[1].trim();
    if (mediaPath.startsWith("/")) {
      mediaMatches.push({
        fullMatch: match[0],
        path: mediaPath,
      });
      logger.debug("Detected absolute path MEDIA line", {
        streamId,
        mediaPath,
        line: match[0],
      });
    }
  }

  if (payload.mediaUrl) {
      mediaMatches.push({
          fullMatch: "", // It's not embedded in text
          path: payload.mediaUrl,
      });
  }

  let processedText = text;
  if (isSelfBuiltAppRequest && mediaMatches.length === 1 && !text.replace(mediaMatches[0].fullMatch, "").trim()) {
    // Single media item, no other text. Use passive media reply.
    const media = mediaMatches[0];
    const type = getWecomMediaType(media.path);
    try {
      logger.info("WeCom Self-Built: uploading single media for passive reply", { path: media.path, type });
      const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path);
      const mediaId = uploadRes.media_id;
      
      const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey });
      const passiveReply = webhook.buildPassiveReplyXml(
          originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName, 
          originalMessage.ToUserName,
          type,
          mediaId,
          Math.floor(Date.now() / 1000),
          originalMessage.query.nonce,
      );
      logger.info(`WeCom Self-Built: constructed passive ${type} XML reply`, { mediaId });
      return { passiveReplyXml: passiveReply };
    } catch (err) {
      logger.error("Failed to upload media for passive reply", { error: err.message, path: media.path });
      // Fallback to text placeholder logic below
    }
  }

  if (mediaMatches.length > 0) {
    for (const media of mediaMatches) {
      if (streamId && !isSelfBuiltAppRequest) {
          const type = getWecomMediaType(media.path);
          if (type === 'image') {
              const queued = streamManager.queueImage(streamId, media.path);
              if (queued) {
                processedText = processedText.replace(media.fullMatch, "").trim();
                logger.info("Queued absolute path image for stream (AI Bot)", {
                  streamId,
                  imagePath: media.path,
                });
              }
          } else {
              processedText = processedText.replace(media.fullMatch, `[${type}: ${media.path}]`).trim();
              logger.warn("WeCom AI Bot: non-image media in stream, converting to text", { type, mediaPath: media.path });
          }
      } else if (isSelfBuiltAppRequest) {
          const type = getWecomMediaType(media.path);
          processedText = processedText.replace(media.fullMatch, `[${type === 'voice' ? '语音' : type === 'video' ? '视频' : type === 'image' ? '图片' : '文件'}: ${media.path}]`).trim();
          logger.warn("WeCom Self-Built: converting local media to text for passive reply (multi-item or text mixed)", { mediaPath: media.path, type });
      }
    }
  }

  if (!processedText.trim() && mediaMatches.length === 0) {
    logger.debug("WeCom: empty block after processing, skipping stream update/reply");
    return;
  }

  if (isSelfBuiltAppRequest) {
      if (!originalMessage || !account) {
          logger.error("WeCom Self-Built: missing originalMessage or account for passive reply");
          return;
      }
      const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey });
      const passiveReply = webhook.buildPassiveReplyXml(
          originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName, 
          originalMessage.ToUserName,
          "text",
          processedText,
          Math.floor(Date.now() / 1000),
          originalMessage.query.nonce,
      );
      logger.info("WeCom Self-Built: constructed passive XML reply", { replyXmlPreview: passiveReply.substring(0, 100) });
      return { passiveReplyXml: passiveReply };

  } else { // AI Bot streaming response
      const appendToStream = (targetStreamId, content) => {
        const stream = streamManager.getStream(targetStreamId);
        if (!stream) {
          return false;
        }

        if (stream.content.trim() === THINKING_PLACEHOLDER.trim()) {
          streamManager.replaceIfPlaceholder(targetStreamId, content, THINKING_PLACEHOLDER);
          return true;
        }

        if (stream.content.includes(content.trim())) {
          logger.debug("WeCom: duplicate content, skipping", {
            streamId: targetStreamId,
            contentPreview: content.substring(0, 30),
          });
          return true;
        }

        const separator = stream.content.length > 0 ? "\n\n" : "";
        streamManager.appendStream(targetStreamId, separator + content);
        return true;
      };

      if (!streamId) {
        const ctx = streamContext.getStore();
        const contextStreamId = ctx?.streamId;
        const activeStreamId = contextStreamId ?? getActiveStreamContext(senderId);

        if (activeStreamId && streamManager.hasStream(activeStreamId)) {
          appendToStream(activeStreamId, processedText);
          logger.debug("WeCom stream appended (via context/activeStreams)", {
            streamId: activeStreamId,
            source: contextStreamId ? "asyncContext" : "activeStreams",
            contentLength: processedText.length,
          });
          return;
        }
        logger.warn("WeCom: no active stream for this message", { senderId });
        return;
      }

      if (!streamManager.hasStream(streamId)) {
        logger.warn("WeCom: stream not found, attempting response_url fallback", { streamId, senderId });

        const saved = responseUrls.get(senderId);
        if (saved && !saved.used && Date.now() < saved.expiresAt) {
          saved.used = true;
          try {
            await fetch(saved.url, {
              method: 'POST',
              headers: { 'Content-Type': 'application/json' },
              body: JSON.stringify({ msgtype: 'text', text: { content: processedText } }),
            });
            logger.info("WeCom: sent via response_url fallback (deliverWecomReply)", {
              senderId,
              contentPreview: processedText.substring(0, 50),
            });
            return;
          } catch (err) {
            logger.error("WeCom: response_url fallback failed", {
              senderId,
              error: err.message,
            });
          }
        }

        logger.warn("WeCom: unable to deliver message (stream closed + response_url unavailable)", {
          senderId,
          contentPreview: processedText.substring(0, 50),
        });
        return;
      }

      appendToStream(streamId, processedText);
      logger.debug("WeCom stream appended", {
        streamId,
        contentLength: processedText.length,
        to: senderId,
      });
  }
}