import { AsyncLocalStorage } from "node:async_hooks";
import * as crypto from "node:crypto";
import { mkdirSync, existsSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import { logger } from "./logger.js";
import { streamManager } from "./stream-manager.js";
import { WecomWebhook } from "./webhook.js"; // WecomWebhook is now the parser
import { wecomApi } from "./wecom-api.js";
// Fallback account identifier (not referenced in the visible part of this file).
const DEFAULT_ACCOUNT_ID = "default";
// Placeholder text ("Thinking..."): sent as the initial template card and
// recognised in stream content so it can be swapped for the first real output.
const THINKING_PLACEHOLDER = "思考中...";
// Directory where decrypted inbound media is written:
// $HOME/.openclaw/media/wecom, rooted at /tmp when HOME is unset or empty.
const MEDIA_CACHE_DIR = join(process.env.HOME || "/tmp", ".openclaw", "media", "wecom");
// =============================================================================
// Media Download and Decryption
// =============================================================================
/**
 * Fetch an encrypted WeCom image, decrypt it, and store it in the media cache.
 *
 * The image format is sniffed from the first two decrypted bytes: 0x89 0x50
 * is treated as PNG, 0x47 0x49 as GIF, and anything else as JPEG.
 *
 * @param {string} imageUrl - Encrypted image URL from WeCom
 * @param {string} encodingAesKey - AES key handed to WecomWebhook for decryption
 * @param {string} token - Token handed to WecomWebhook
 * @returns {Promise<{localPath: string, mimeType: string}>} Path of the
 *   decrypted file inside MEDIA_CACHE_DIR and its detected MIME type
 * @throws {Error} When the HTTP response status is not OK
 */
async function downloadAndDecryptImage(imageUrl, encodingAesKey, token) {
  const cacheDirMissing = !existsSync(MEDIA_CACHE_DIR);
  if (cacheDirMissing) {
    mkdirSync(MEDIA_CACHE_DIR, { recursive: true });
  }

  logger.info("Downloading encrypted image", { url: imageUrl.substring(0, 80) });
  const httpResponse = await fetch(imageUrl);
  if (!httpResponse.ok) {
    throw new Error(`Failed to download image: ${httpResponse.status}`);
  }

  const cipherBytes = Buffer.from(await httpResponse.arrayBuffer());
  logger.debug("Downloaded encrypted image", { size: cipherBytes.length });

  const { crypto: mediaCipher } = new WecomWebhook({ token, encodingAesKey });
  const plainBytes = mediaCipher.decryptMedia(cipherBytes);

  // Sniff the format from the leading magic bytes; JPEG is the default.
  const firstByte = plainBytes[0];
  const secondByte = plainBytes[1];
  let format;
  if (firstByte === 0x89 && secondByte === 0x50) {
    format = { ext: "png", mimeType: "image/png" };
  } else if (firstByte === 0x47 && secondByte === 0x49) {
    format = { ext: "gif", mimeType: "image/gif" };
  } else {
    format = { ext: "jpg", mimeType: "image/jpeg" };
  }

  // Timestamp plus a short random suffix keeps cached names distinct.
  const stamp = Date.now();
  const suffix = Math.random().toString(36).substring(2, 8);
  const localPath = join(MEDIA_CACHE_DIR, `wecom_${stamp}_${suffix}.${format.ext}`);
  writeFileSync(localPath, plainBytes);

  const { mimeType } = format;
  logger.info("Image decrypted and saved", { path: localPath, size: plainBytes.length, mimeType });
  return { localPath, mimeType };
}
/**
 * Download a file from WeCom, decrypt it, and write it into the media cache.
 *
 * When the caller supplies no file name, one is taken from the response's
 * Content-Disposition header if present.
 *
 * @param {string} fileUrl - File download URL
 * @param {string} fileName - Original file name (may be empty)
 * @param {string} encodingAesKey - AES key for decryption
 * @param {string} token - Token for decryption
 * @returns {Promise<{localPath: string, effectiveFileName: string}>} Local path and resolved filename
 * @throws {Error} When the HTTP response status is not OK
 */
async function downloadWecomFile(fileUrl, fileName, encodingAesKey, token) {
  if (!existsSync(MEDIA_CACHE_DIR)) {
    mkdirSync(MEDIA_CACHE_DIR, { recursive: true });
  }
  logger.info("Downloading encrypted file", { url: fileUrl.substring(0, 80), name: fileName });
  const response = await fetch(fileUrl);
  if (!response.ok) {
    throw new Error(`Failed to download file: ${response.status}`);
  }
  const encryptedBuffer = Buffer.from(await response.arrayBuffer());

  let effectiveFileName = fileName;
  if (!effectiveFileName) {
    const contentDisposition = response.headers.get("content-disposition");
    if (contentDisposition) {
      const filenameMatch = contentDisposition.match(/filename\*?=(?:UTF-8'')?["']?([^"';\n]+)["']?/i);
      if (filenameMatch && filenameMatch[1]) {
        try {
          effectiveFileName = decodeURIComponent(filenameMatch[1]);
        } catch (err) {
          // decodeURIComponent throws URIError on a malformed percent-escape.
          // A bad header must not discard a file that was already downloaded,
          // so keep the raw header value as the name instead.
          effectiveFileName = filenameMatch[1];
          logger.warn("Content-Disposition filename is not valid percent-encoding, using raw value", { error: err.message });
        }
        logger.info("Extracted filename from Content-Disposition", { name: effectiveFileName });
      }
    }
  }

  const wecomCrypto = new WecomWebhook({ token, encodingAesKey }).crypto;
  const decryptedBuffer = wecomCrypto.decryptMedia(encryptedBuffer);

  // Replace path separators and other filesystem-hostile characters so the
  // name stays a single path component inside the cache directory.
  const safeName = (effectiveFileName || `file_${Date.now()}`).replace(/[/\\:*?"<>|]/g, "_");
  const localPath = join(MEDIA_CACHE_DIR, `${Date.now()}_${safeName}`);
  writeFileSync(localPath, decryptedBuffer);
  logger.info("File decrypted and saved", { path: localPath, size: decryptedBuffer.length });
  return { localPath, effectiveFileName: effectiveFileName || fileName };
}
// Extension -> MIME type table. A Map is used instead of an object literal so
// that extensions such as "constructor" or "__proto__" cannot resolve to
// members inherited from Object.prototype.
const MIME_TYPES_BY_EXTENSION = new Map([
  ["pdf", "application/pdf"],
  ["doc", "application/msword"],
  ["docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"],
  ["xls", "application/vnd.ms-excel"],
  ["xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"],
  ["ppt", "application/vnd.ms-powerpoint"],
  ["pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"],
  ["txt", "text/plain"],
  ["csv", "text/csv"],
  ["zip", "application/zip"],
  ["png", "image/png"],
  ["jpg", "image/jpeg"],
  ["jpeg", "image/jpeg"],
  ["gif", "image/gif"],
  ["mp4", "video/mp4"],
  ["mp3", "audio/mpeg"],
]);

/**
 * Guess MIME type from file extension.
 *
 * The extension is the text after the last "." (case-insensitive). Unknown
 * extensions and empty/missing names yield "application/octet-stream".
 *
 * @param {string} [fileName] - File name or path
 * @returns {string} MIME type string
 */
function guessMimeType(fileName) {
  const ext = (fileName || "").split(".").pop()?.toLowerCase() || "";
  return MIME_TYPES_BY_EXTENSION.get(ext) ?? "application/octet-stream";
}
// Runtime state (module-level singleton)
// _api is assigned by setApi() and read through getApi().
let _api = null;
// Assigned by setOpenClawConfig(); not read anywhere in the visible part of this file.
let _openclawConfig = null;
// Track active stream for each user, so outbound messages (like reset confirmation)
// can be added to the correct stream instead of using response_url
// activeStreams: streamKey -> most recently registered stream id.
const activeStreams = new Map();
// activeStreamHistory: streamKey -> array of stream ids, most recent last.
const activeStreamHistory = new Map();
// Store stream metadata for delayed finish (main response done flag)
// Keyed by stream id; entries are pruned by the periodic cleanup once the
// stream manager no longer knows the stream.
export const streamMeta = new Map();
// Store response_url for fallback delivery after stream closes
// response_url is valid for 1 hour and can be used only once
// Keyed by streamKey; values are { url, expiresAt, used }.
export const responseUrls = new Map();
// Map to store pending template card message IDs for in-place updates
// Keyed by streamKey; the value is the response_code returned when the
// initial placeholder card was sent.
export const pendingCardUpdates = new Map();
// Once a minute, drop streamMeta entries whose stream is no longer known to
// the stream manager and responseUrls entries past their expiry, so neither
// map grows without bound. The timer is unref'd so it never keeps the
// process alive on its own.
setInterval(() => {
  const sweepStartedAt = Date.now();

  [...streamMeta.keys()]
    .filter((id) => !streamManager.hasStream(id))
    .forEach((id) => {
      streamMeta.delete(id);
    });

  responseUrls.forEach((entry, key) => {
    if (entry.expiresAt < sweepStartedAt) {
      responseUrls.delete(key);
    }
  });
}, 60 * 1000).unref();
// AsyncLocalStorage for propagating the correct streamId through the async
// processing chain.
// processInboundMessage runs the AI Bot dispatch inside a store of the shape
// { streamId, streamKey }; deliverWecomReply reads it back via getStore()
// when it is called without an explicit streamId.
export const streamContext = new AsyncLocalStorage();
/**
 * Derive the key under which a message's active stream is tracked.
 *
 * Group messages that carry a chat id are keyed by that chat id; everything
 * else is keyed by the sender. Non-object input yields an empty string.
 *
 * @param {object} message - Parsed inbound message
 * @returns {string} Stream key, or "" when none can be derived
 */
export function getMessageStreamKey(message) {
  const isObject = Boolean(message) && typeof message === "object";
  if (!isObject) {
    return "";
  }

  const groupChatId = message.chatId || "";
  const isGroupMessage = message.chatType === "group";
  return isGroupMessage && groupChatId ? groupChatId : message.fromUser || "";
}
/**
 * Store the host API object in module state so getApi() can return it later.
 * @param {object} api - API object supplied by the host
 */
export function setApi(api) {
_api = api;
}
/**
 * Return the API object previously stored with setApi().
 * @returns {object} The stored API object
 * @throws {Error} When no API object has been stored yet
 */
export function getApi() {
  if (_api) {
    return _api;
  }
  throw new Error("[wecom] API not initialized");
}
/**
 * Store the given configuration object in module state.
 * @param {object} config - Configuration object to remember
 */
export function setOpenClawConfig(config) {
_openclawConfig = config;
}
/**
 * Look up the most recent still-existing stream registered for a stream key.
 *
 * Stream ids the stream manager no longer knows are pruned from the history
 * as a side effect, and the "latest" pointer is refreshed to match.
 *
 * @param {string} streamKey - Key identifying the user or group
 * @returns {string|null} Latest live stream id, or null when there is none
 */
export function getActiveStreamContext(streamKey) {
  if (!streamKey) {
    return null;
  }

  const recorded = activeStreamHistory.get(streamKey);
  if (!recorded?.length) {
    activeStreams.delete(streamKey);
    return null;
  }

  const stillLive = recorded.filter((id) => streamManager.hasStream(id));
  if (stillLive.length > 0) {
    const newest = stillLive[stillLive.length - 1];
    activeStreamHistory.set(streamKey, stillLive);
    activeStreams.set(streamKey, newest);
    return newest;
  }

  activeStreamHistory.delete(streamKey);
  activeStreams.delete(streamKey);
  return null;
}
/**
 * Record a stream as the newest active stream for a stream key.
 *
 * If the id is already in the key's history it is moved to the end, so the
 * history never holds duplicates. Calls with an empty key or id are ignored.
 *
 * @param {string} streamKey - Key identifying the user or group
 * @param {string} streamId - Stream id to register
 */
export function registerActiveStream(streamKey, streamId) {
  if (streamKey && streamId) {
    const earlier = activeStreamHistory.get(streamKey) ?? [];
    const updated = [...earlier.filter((id) => id !== streamId), streamId];
    activeStreamHistory.set(streamKey, updated);
    activeStreams.set(streamKey, streamId);
  }
}
/**
 * Remove a stream from a stream key's history.
 *
 * When other streams remain, the newest of them becomes the key's active
 * stream; when none remain, the key is forgotten entirely. Calls with an
 * empty key or id are ignored.
 *
 * @param {string} streamKey - Key identifying the user or group
 * @param {string} streamId - Stream id to remove
 */
export function unregisterActiveStream(streamKey, streamId) {
  if (!streamKey || !streamId) {
    return;
  }

  const recorded = activeStreamHistory.get(streamKey);
  const hasHistory = Boolean(recorded) && recorded.length > 0;

  if (!hasHistory) {
    // No history: only clear the pointer if it refers to this very stream.
    if (activeStreams.get(streamKey) === streamId) {
      activeStreams.delete(streamKey);
    }
    return;
  }

  const others = recorded.filter((id) => id !== streamId);
  if (others.length > 0) {
    activeStreamHistory.set(streamKey, others);
    activeStreams.set(streamKey, others[others.length - 1]);
  } else {
    activeStreamHistory.delete(streamKey);
    activeStreams.delete(streamKey);
  }
}
/**
 * Close out a stream after a failure.
 *
 * If the stream is still open and shows nothing but the thinking
 * placeholder, the placeholder is swapped for the error message. The open
 * stream is then finished, and the stream is unregistered for its key.
 *
 * @param {string} streamId - Stream to close; nothing happens when empty
 * @param {string} streamKey - Key the stream was registered under
 * @param {string} errorMessage - Text shown in place of the placeholder
 * @returns {Promise<void>}
 */
export async function handleStreamError(streamId, streamKey, errorMessage) {
  if (!streamId) {
    return;
  }

  const target = streamManager.getStream(streamId);
  const isStillOpen = Boolean(target) && !target.finished;
  if (isStillOpen) {
    const showsOnlyPlaceholder = target.content.trim() === THINKING_PLACEHOLDER.trim();
    if (showsOnlyPlaceholder) {
      streamManager.replaceIfPlaceholder(streamId, errorMessage, THINKING_PLACEHOLDER);
    }
    await streamManager.finishStream(streamId);
  }

  unregisterActiveStream(streamKey, streamId);
}
/**
 * Process one inbound WeCom message: register its stream, resolve the agent
 * route, build the inbound context (including downloaded media), and
 * dispatch the reply in one of three modes.
 *
 * - Self-built app with corpId/corpSecret ("async"): a placeholder template
 *   card is sent, the reply is dispatched in the background, and the
 *   function returns without waiting for it.
 * - Self-built app without credentials ("sync"): the reply is awaited and
 *   returned as passive reply XML.
 * - AI Bot: the reply is delivered into the stream identified by streamId.
 *
 * @param {object} params
 * @param {object} params.message - Parsed inbound message
 * @param {string} [params.streamId] - Stream id for AI Bot replies
 * @param {*} [params.timestamp] - Unused here
 * @param {string} [params.nonce] - Nonce used when building the sync-mode error reply
 * @param {object} params.account - Account settings (accountId, token,
 *   encodingAesKey, corpId, corpSecret, agentId are read)
 * @param {object} params.config - Configuration passed to routing, session and reply APIs
 * @returns {Promise<undefined|{passiveReplyXml: (string|null)}|*>} undefined for
 *   skipped messages and async mode, `{ passiveReplyXml }` for sync mode, or
 *   the dispatcher result for AI Bot requests
 */
export async function processInboundMessage({
  message,
  streamId,
  timestamp: _timestamp,
  nonce: _nonce,
  account,
  config,
}) {
  const api = getApi();
  const channelApi = api.runtime?.channel || api.channel;
  const routingApi = channelApi?.routing || api.routing;
  const sessionApi = channelApi?.session || api.session;
  const replyApi = channelApi?.reply || api.reply;

  const senderId = message.fromUser;
  const msgType = message.msgType || "text";
  const imageUrl = message.imageUrl || "";
  const imageUrls = message.imageUrls || [];
  const fileUrl = message.fileUrl || "";
  const fileName = message.fileName || "";
  const rawContent = message.content || "";
  const chatType = message.chatType || "single";
  // Every other reader in this file uses the camelCase `chatId`; the
  // lowercase `chatid` spelling is still accepted for backward compatibility.
  const chatId = message.chatId || message.chatid || "";
  const isGroupChat = chatType === "group" && chatId;
  const isAIBotStreamRequest = message.stream && message.stream.id; // AI Bot stream refresh
  const isSelfBuiltAppRequest = message.isSelfBuiltAppRequest !== undefined
    ? message.isSelfBuiltAppRequest
    : (!isAIBotStreamRequest && !message.responseUrl); // Heuristic fallback

  const peerId = isGroupChat ? chatId : senderId;
  const peerKind = isGroupChat ? "group" : "dm";
  const conversationId = isGroupChat ? `wecom:group:${chatId}` : `wecom:${senderId}`;
  const streamKey = isGroupChat ? chatId : senderId;

  if (streamId) {
    registerActiveStream(streamKey, streamId);
  }

  // Remember the response_url so a reply can still be delivered after the
  // stream has closed.
  if (message.responseUrl && message.responseUrl.trim()) {
    responseUrls.set(streamKey, {
      url: message.responseUrl,
      expiresAt: Date.now() + 60 * 60 * 1000, // 1 hour
      used: false,
    });
    logger.debug("WeCom: saved response_url for fallback (AI Bot)", { streamKey });
  }

  let effectiveRawBody = rawContent;
  if (!effectiveRawBody.trim() && !imageUrl && imageUrls.length === 0 && !fileUrl && !message.event) {
    logger.debug("WeCom: empty message or unhandled event, skipping");
    if (streamId) {
      await streamManager.finishStream(streamId);
      unregisterActiveStream(streamKey, streamId);
    }
    return;
  }

  logger.info("WeCom processing message", {
    from: senderId,
    chatType: peerKind,
    peerId,
    content: effectiveRawBody.substring(0, 50),
    streamId,
    isAIBotStreamRequest,
    isSelfBuiltAppRequest,
  });

  const route = routingApi.resolveAgentRoute({
    cfg: config,
    channel: "wecom",
    accountId: account.accountId,
    peer: {
      kind: peerKind,
      id: peerId,
    },
  });
  const storePath = sessionApi.resolveStorePath(config.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = replyApi.resolveEnvelopeFormatOptions(config);
  const previousTimestamp = sessionApi.readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });

  const senderLabel = isGroupChat ? `[${senderId}]` : senderId;
  const body = replyApi.formatAgentEnvelope({
    channel: isGroupChat ? "Enterprise WeChat Group" : "Enterprise WeChat",
    from: senderLabel,
    timestamp: Date.now(),
    previousTimestamp,
    envelope: envelopeOptions,
    body: effectiveRawBody,
  });

  const ctxBase = {
    Body: body,
    RawBody: effectiveRawBody,
    CommandBody: effectiveRawBody,
    From: `wecom:${senderId}`,
    To: conversationId,
    SessionKey: route.sessionKey,
    AccountId: route.accountId,
    ChatType: isGroupChat ? "group" : "direct",
    ConversationLabel: isGroupChat ? `Group ${chatId}` : senderId,
    SenderName: senderId,
    SenderId: senderId,
    GroupId: isGroupChat ? chatId : undefined,
    Provider: "wecom",
    Surface: "wecom",
    OriginatingChannel: "wecom",
    OriginatingTo: conversationId,
  };

  // Image attachments: decrypt each image locally; if that fails, pass the
  // original URL along instead.
  const allImageUrls = imageUrl ? [imageUrl] : imageUrls;
  if (allImageUrls.length > 0) {
    const mediaPaths = [];
    const mediaTypes = [];
    const fallbackUrls = [];
    for (const url of allImageUrls) {
      try {
        const result = await downloadAndDecryptImage(url, account.encodingAesKey, account.token);
        mediaPaths.push(result.localPath);
        mediaTypes.push(result.mimeType);
      } catch (e) {
        logger.warn("Image decryption failed, using URL fallback", { error: e.message, url: url.substring(0, 80) });
        fallbackUrls.push(url);
        mediaTypes.push("image/jpeg");
      }
    }
    if (mediaPaths.length > 0) {
      ctxBase.MediaPaths = mediaPaths;
    }
    if (fallbackUrls.length > 0) {
      ctxBase.MediaUrls = fallbackUrls;
    }
    ctxBase.MediaTypes = mediaTypes;
    logger.info("Image attachments prepared", {
      decrypted: mediaPaths.length,
      fallback: fallbackUrls.length,
    });
    if (!effectiveRawBody.trim()) {
      const count = allImageUrls.length;
      ctxBase.Body = count > 1
        ? `[用户发送了${count}张图片]`
        : "[用户发送了一张图片]";
      ctxBase.RawBody = "[图片]";
      ctxBase.CommandBody = "";
    }
  }

  // File attachment: download and decrypt; on failure, describe the file in text.
  if (fileUrl) {
    try {
      const { localPath: localFilePath, effectiveFileName } = await downloadWecomFile(fileUrl, fileName, account.encodingAesKey, account.token);
      ctxBase.MediaPaths = [...(ctxBase.MediaPaths || []), localFilePath];
      ctxBase.MediaTypes = [...(ctxBase.MediaTypes || []), guessMimeType(effectiveFileName)];
      logger.info("File attachment prepared", { path: localFilePath, name: effectiveFileName });
    } catch (e) {
      logger.warn("File download failed", { error: e.message });
      const label = fileName ? `[文件: ${fileName}]` : "[文件]";
      if (!effectiveRawBody.trim()) {
        ctxBase.Body = `[用户发送了文件] ${label}`;
        ctxBase.RawBody = label;
        ctxBase.CommandBody = "";
      }
    }
    // NOTE(review): ctxBase.Body was initialised from formatAgentEnvelope()
    // above, so this branch only runs if that call returned an empty value —
    // confirm whether a successful file-only message was meant to get this label.
    if (!effectiveRawBody.trim() && !ctxBase.Body) {
      const label = fileName ? `[文件: ${fileName}]` : "[文件]";
      ctxBase.Body = `[用户发送了文件] ${label}`;
      ctxBase.RawBody = label;
      ctxBase.CommandBody = "";
    }
  }

  if (message.event && message.event_type) {
    ctxBase.MsgType = 'event';
    ctxBase.Event = message.event_type;
    ctxBase.RawBody = `[Event: ${message.event_type}]`;
    if (!effectiveRawBody.trim()) {
      ctxBase.Body = `[用户触发了事件: ${message.event_type}]`;
    }
  }

  const ctxPayload = replyApi.finalizeInboundContext(ctxBase);

  // Session metadata is recorded in the background; a failure is only logged.
  void sessionApi
    .recordSessionMetaFromInbound({
      storePath,
      sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
      ctx: ctxPayload,
    })
    .catch((err) => {
      logger.error("WeCom: failed updating session meta", { error: err.message });
    });

  if (isSelfBuiltAppRequest) {
    if (account.corpId && account.corpSecret) {
      // Async mode: Send a "Thinking..." card and process in background
      try {
        const cardRes = await wecomApi.sendTemplateCard(account.corpId, account.corpSecret, message.agentId || account.agentId, senderId, THINKING_PLACEHOLDER);
        if (cardRes.response_code) {
          pendingCardUpdates.set(streamKey, cardRes.response_code);
          logger.info("WeCom Self-Built (Async): Sent 'Thinking...' card, pending update", { streamKey, response_code: cardRes.response_code });
        }
      } catch (err) {
        logger.error("WeCom Self-Built (Async): Failed to send initial 'Thinking...' card", { error: err.message });
        // Can't send initial card, so we can't update it later.
        // The final reply will be sent as a new message.
      }

      // Process reply in the background (deliberately not awaited).
      replyApi.dispatchReplyWithBufferedBlockDispatcher({
        ctx: ctxPayload,
        cfg: config,
        dispatcherOptions: {
          deliver: async (payload, info) => {
            logger.info("WeCom Self-Built (Async): deliver called", {
              kind: info.kind,
              hasText: !!(payload.text && payload.text.trim()),
              hasMediaUrl: !!payload.mediaUrl,
              textPreview: (payload.text || "").substring(0, 50),
            });
            await deliverWecomReply({
              payload,
              senderId: streamKey,
              streamId: undefined, // No stream for self-built
              isSelfBuiltAppRequest: true,
              originalMessage: message,
              account,
            });
          },
          onError: async (err, info) => {
            logger.error("WeCom Self-Built App (Async): error during dispatch", { error: err.message, kind: info?.kind });
            await deliverWecomReply({
              payload: { text: "处理消息时出错,请稍后再试。" },
              senderId: streamKey,
              streamId: undefined,
              isSelfBuiltAppRequest: true,
              originalMessage: message,
              account,
            });
          }
        }
      }).catch(err => logger.error("WeCom Async Dispatch Error", { error: err.message }));

      // Return nothing, as we are not using passive replies for async now
      return;
    } else {
      // Sync mode: Wait for reply and then send it as passive reply
      logger.warn("WeCom Self-Built App: Missing corpId or corpSecret. Operating in Sync mode. Cannot send immediate '思考中...' reply and may timeout if AI takes longer than 5 seconds. Configure corpId and corpSecret to enable Async mode.");
      let capturedReplyXml = null;
      let fullText = "";
      let fullMediaUrl = "";
      let errorOccurred = false;

      await replyApi.dispatchReplyWithBufferedBlockDispatcher({
        ctx: ctxPayload,
        cfg: config,
        dispatcherOptions: {
          // Text blocks are concatenated; only the last media URL is kept.
          deliver: async (payload, info) => {
            logger.info("WeCom Self-Built (Sync): deliver called", {
              kind: info.kind,
              hasText: !!(payload.text && payload.text.trim()),
              hasMediaUrl: !!payload.mediaUrl,
            });
            if (payload.text) {
              fullText += payload.text;
            }
            if (payload.mediaUrl) {
              fullMediaUrl = payload.mediaUrl;
            }
          },
          onError: async (err, info) => {
            errorOccurred = true;
            logger.error("WeCom Self-Built App (Sync): error during dispatch", { error: err.message, kind: info?.kind });
            const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey });
            capturedReplyXml = webhook.buildPassiveReplyXml(
              message.fromUser || message.chatId || message.ToUserName,
              message.ToUserName,
              "text",
              "处理消息时出错,请稍后再试。",
              Math.floor(Date.now() / 1000),
              _nonce,
            );
          }
        }
      });

      if (!errorOccurred) {
        logger.info('WeCom Sync: Final accumulated text', { text: fullText, mediaUrl: fullMediaUrl });
        const result = await deliverWecomReply({
          payload: { text: fullText, mediaUrl: fullMediaUrl },
          senderId: streamKey,
          streamId: undefined,
          isSelfBuiltAppRequest,
          originalMessage: message,
          account,
        });
        if (result?.passiveReplyXml) {
          capturedReplyXml = result.passiveReplyXml;
        }
      }
      return { passiveReplyXml: capturedReplyXml };
    }
  } else { // AI Bot streaming response
    // Run the dispatch inside streamContext so nested deliveries that lack an
    // explicit streamId can still find this request's stream.
    const replyResult = await streamContext.run({ streamId, streamKey }, async () => {
      return replyApi.dispatchReplyWithBufferedBlockDispatcher({
        ctx: ctxPayload,
        cfg: config,
        dispatcherOptions: {
          deliver: async (payload, info) => {
            logger.info("Dispatcher deliver called", {
              kind: info.kind,
              hasText: !!(payload.text && payload.text.trim()),
              textPreview: (payload.text || "").substring(0, 50),
              isSelfBuiltAppRequest,
            });
            return deliverWecomReply({
              payload,
              senderId: streamKey,
              streamId,
              isSelfBuiltAppRequest,
            });
          },
          onError: async (err, info) => {
            // `info` is guarded like in the other onError handlers so a
            // missing argument cannot mask the original error.
            logger.error("WeCom reply failed", { error: err.message, kind: info?.kind });
            if (streamId) {
              await handleStreamError(streamId, streamKey, "处理消息时出错,请稍后再试。");
            }
          },
        },
      });
    });

    if (streamId) {
      const stream = streamManager.getStream(streamId);
      if (!stream || stream.finished) {
        unregisterActiveStream(streamKey, streamId);
      } else {
        // Safety net: if the stream is still open 35s from now and has been
        // idle for more than 30s, close it so it cannot hang forever.
        setTimeout(async () => {
          const checkStream = streamManager.getStream(streamId);
          if (checkStream && !checkStream.finished) {
            const idleMs = Date.now() - checkStream.updatedAt;
            if (idleMs > 30000) {
              logger.warn("WeCom safety net: closing idle stream", { streamId, idleMs });
              try {
                await streamManager.finishStream(streamId);
                unregisterActiveStream(streamKey, streamId);
              } catch (err) {
                logger.error("WeCom safety net: failed to close stream", { streamId, error: err.message });
              }
            }
          }
        }, 35000);
      }
    }
    return replyResult;
  }
}
/**
 * Classify a file path into a WeCom media category by its extension.
 *
 * The extension is the text after the last "." (case-insensitive); a path
 * without any "." is compared as a whole.
 *
 * @param {string} filePath - File path or name
 * @returns {"image"|"voice"|"video"|"file"} Media category; "file" when the
 *   extension is not recognised
 */
function getWecomMediaType(filePath) {
  const segments = filePath.split('.');
  const extension = segments[segments.length - 1].toLowerCase();

  switch (extension) {
    case 'jpg':
    case 'jpeg':
    case 'png':
      return 'image';
    case 'mp3':
    case 'wav':
    case 'amr':
    case 'm4a':
      return 'voice';
    case 'mp4':
      return 'video';
    default:
      return 'file';
  }
}
/**
 * Deliver one reply payload to WeCom.
 *
 * Media is handled first: lines of the form `MEDIA: /absolute/path` are
 * stripped from the text, and together with `payload.mediaUrl` are either
 * queued on the stream (AI Bot images) or uploaded and sent as active
 * messages (self-built app). The remaining text is then delivered by one of:
 * updating the pending placeholder card, returning passive reply XML (no
 * corpId/corpSecret), sending a new active text message, or appending to
 * the AI Bot stream.
 *
 * @param {object} params
 * @param {{text?: string, mediaUrl?: string}} params.payload - Reply content
 * @param {string} params.senderId - Stream key of the conversation
 * @param {string} [params.streamId] - Target stream for AI Bot replies
 * @param {boolean} [params.isSelfBuiltAppRequest] - Whether to use self-built app delivery
 * @param {object} [params.originalMessage] - Inbound message being answered (self-built app paths)
 * @param {object} [params.account] - Account settings (self-built app paths)
 * @returns {Promise<undefined|{passiveReplyXml: string}>} `{ passiveReplyXml }`
 *   only on the sync passive-reply path, otherwise undefined
 */
export async function deliverWecomReply({ payload, senderId, streamId, isSelfBuiltAppRequest, originalMessage, account }) {
  logger.info('deliverWecomReply received payload', { payload: JSON.stringify(payload) });
  const text = payload.text || "";
  logger.debug("deliverWecomReply called", {
    hasText: !!text.trim(),
    hasMediaUrl: !!payload.mediaUrl,
    textPreview: text.substring(0, 50),
    streamId,
    senderId,
    isSelfBuiltAppRequest,
  });

  // Collect `MEDIA: <path>` lines; only absolute paths are treated as media.
  const mediaMatches = [];
  for (const match of text.matchAll(/^MEDIA:\s*(.+)$/gm)) {
    const mediaPath = match[1].trim();
    if (!mediaPath.startsWith("/")) {
      continue;
    }
    mediaMatches.push({
      fullMatch: match[0],
      path: mediaPath,
    });
    logger.debug("Detected absolute path MEDIA line", {
      streamId,
      mediaPath,
      line: match[0],
    });
  }
  if (payload.mediaUrl) {
    mediaMatches.push({
      fullMatch: "", // It's not embedded in text
      path: payload.mediaUrl,
    });
  }

  let processedText = text;
  // Handle media for all message types first
  for (const media of mediaMatches) {
    processedText = processedText.replace(media.fullMatch, "").trim(); // Remove media placeholder
    const type = getWecomMediaType(media.path);
    try {
      if (streamId && !isSelfBuiltAppRequest) { // AI Bot
        if (type === 'image') {
          streamManager.queueImage(streamId, media.path);
          logger.info("Queued absolute path image for stream (AI Bot)", { streamId, imagePath: media.path });
        } else {
          logger.warn("WeCom AI Bot: non-image media in stream, sending as separate message", { type, mediaPath: media.path });
          // NOTE(review): despite the log message, nothing is sent on this
          // path — the media line has been stripped from the text and the
          // media itself is dropped. Confirm the intended delivery.
        }
      } else if (isSelfBuiltAppRequest) { // Self-Built App
        const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path);
        const mediaId = uploadRes.media_id;
        const toUser = originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName;
        const agentId = originalMessage.agentId || account.agentId || 1000002;
        if (type === 'image') await wecomApi.sendImageMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId);
        else if (type === 'voice') await wecomApi.sendVoiceMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId);
        else if (type === 'video') await wecomApi.sendVideoMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId);
        else await wecomApi.sendFileMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId);
        logger.info("WeCom Self-Built: sent active media message", { mediaId, type });
      }
    } catch (err) {
      logger.error("Failed to upload/send media", { error: err.message, path: media.path });
      // Add a text placeholder back if sending failed
      processedText += ` [Failed to send ${type}: ${media.path}]`;
    }
  }

  if (!processedText.trim()) {
    logger.debug("WeCom: empty text after processing media, skipping text reply");
    return;
  }

  if (isSelfBuiltAppRequest) {
    const pendingCardId = pendingCardUpdates.get(senderId);
    if (pendingCardId) {
      try {
        await wecomApi.updateTemplateCard(account.corpId, account.corpSecret, originalMessage.agentId || account.agentId, pendingCardId, processedText);
        logger.info("WeCom Self-Built (Async): Successfully updated 'Thinking...' card with final response.", { senderId, response_code: pendingCardId });
        pendingCardUpdates.delete(senderId); // Clean up
        return;
      } catch (err) {
        logger.error("WeCom Self-Built (Async): Failed to update template card. Sending as new message.", { error: err.message, response_code: pendingCardId });
        // Fallthrough to send as a new message if update fails
      }
    }

    // Fallback for sync mode, or if card update fails, or if no initial card was sent
    if (!account.corpId || !account.corpSecret) { // Sync mode must use passive reply
      if (!originalMessage) {
        logger.error("WeCom Self-Built (Sync): missing originalMessage for passive reply");
        return;
      }
      const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey });
      const passiveReply = webhook.buildPassiveReplyXml(
        originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName,
        originalMessage.ToUserName, "text", processedText,
        // `query` may be absent on the message; read the nonce defensively
        // instead of throwing a TypeError while building the reply.
        Math.floor(Date.now() / 1000), originalMessage.query?.nonce
      );
      logger.info("WeCom Self-Built (Sync): constructed passive XML reply", { replyXmlPreview: passiveReply.substring(0, 100) });
      return { passiveReplyXml: passiveReply };
    } else { // Async mode, but no pending card - send as new active message
      try {
        const toUser = originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName;
        const agentId = originalMessage.agentId || account.agentId || 1000002;
        await wecomApi.sendTextMessage(account.corpId, account.corpSecret, agentId, toUser, processedText);
        logger.info("WeCom Self-Built (Async): Sent final response as a new text message.", { senderId });
      } catch (err) {
        logger.error("WeCom Self-Built (Async): Failed to send final active text message.", { error: err.message, senderId });
      }
    }
  } else { // AI Bot streaming response
    // Write content into a stream: replace the thinking placeholder if that
    // is all the stream holds, otherwise append after a blank line.
    const appendToStream = (targetStreamId, content) => {
      const stream = streamManager.getStream(targetStreamId);
      if (!stream) return false;
      if (stream.content.trim() === THINKING_PLACEHOLDER.trim()) {
        streamManager.replaceIfPlaceholder(targetStreamId, content, THINKING_PLACEHOLDER);
        return true;
      }
      const separator = stream.content.length > 0 ? "\n\n" : "";
      streamManager.appendStream(targetStreamId, separator + content);
      return true;
    };

    if (!streamId) {
      // No explicit stream: prefer the one from the async context, then the
      // latest active stream registered for this sender.
      const ctx = streamContext.getStore();
      const contextStreamId = ctx?.streamId;
      const activeStreamId = contextStreamId ?? getActiveStreamContext(senderId);
      if (activeStreamId && streamManager.hasStream(activeStreamId)) {
        appendToStream(activeStreamId, processedText);
      } else {
        logger.warn("WeCom: no active stream for this message", { senderId });
      }
      return;
    }

    if (!streamManager.hasStream(streamId)) {
      logger.warn("WeCom: stream not found, cannot deliver message", { streamId, senderId });
      return;
    }
    appendToStream(streamId, processedText);
    logger.debug("WeCom stream appended", {
      streamId,
      contentLength: processedText.length,
      to: senderId,
    });
  }
}