diff --git a/index.js b/index.js index 040366b..7390778 100755 --- a/index.js +++ b/index.js @@ -283,10 +283,8 @@ gateway: { startAccount: async (ctx) => { try { - console.log("[DEBUG] startAccount called for appId:", ctx.accountId); const appAccount = ctx.account; - - console.log("[DEBUG] appAccount data:", JSON.stringify(appAccount)); + logger.info("WeCom gateway starting application", { appId: appAccount.id }); const unregister = registerWebhookTarget({ path: appAccount.webhookPath, @@ -294,25 +292,23 @@ config: ctx.cfg, }); - console.log("[DEBUG] startAccount succeeded for appId:", ctx.accountId); + const interval = setInterval(() => {}, 60 * 60 * 1000); // Keep alive - let resolveTask; - const task = new Promise((resolve) => { resolveTask = resolve; }); + logger.info("WeCom gateway application started successfully", { appId: appAccount.id }); return { shutdown: async () => { - console.log("[DEBUG] shutting down appId:", ctx.accountId); + logger.info("WeCom gateway shutting down application", { appId: appAccount.id }); + clearInterval(interval); unregister(); - resolveTask(); }, - task, }; } catch (err) { - console.error("[DEBUG] startAccount FAILED for appId:", ctx.accountId, err); + logger.error("WeCom gateway startAccount failed", { error: err.message, appId: ctx.accountId }); throw err; } }, - }, + }, }; // ============================================================================= @@ -482,16 +478,23 @@ return true; } } else { - // Self-Built App: passive XML reply - logger.info("Self-Built App message/event received, processing for passive reply", { + // Self-Built App: Immediately acknowledge, then process async + logger.info("Self-Built App message/event received, acknowledging and processing in background", { from: message.fromUser, appId: appAccount.id, }); + + // For async mode, we MUST reply immediately to WeCom's server to prevent timeouts. + // The actual reply will be sent via an active API call (now a Template Card). + res.writeHead(200, { "Content-Type": "text/plain" }); + res.end("success"); // Ensure isSelfBuiltAppRequest is set explicitly message.isSelfBuiltAppRequest = true; - const processingResult = await processInboundMessage({ + // Process the message in the background. This will now send a "Thinking" card + // and later update it with the final response. + processInboundMessage({ message, streamId: undefined, timestamp, @@ -499,30 +502,11 @@ account: appAccount, config: fullConfig, }).catch(async (err) => { - logger.error("WeCom Self-Built App message processing failed", { error: err.message, appId: appAccount.id }); - return { - passiveReplyXml: webhook.buildPassiveReplyXml( - message.fromUser, - appAccount.id, - "text", - "处理消息时出错,请稍后再试。", - Math.floor(Date.now() / 1000), - nonce, - ) - }; + // This is a top-level error handler in case processInboundMessage itself throws. + // The logic inside processInboundMessage has its own error handling for the AI turn. + logger.error("WeCom Self-Built App message processing failed catastrophically", { error: err.message, appId: appAccount.id }); }); - - if (processingResult?.passiveReplyXml) { - logger.info("WeCom Self-Built: returning passive XML reply", { replyXmlPreview: processingResult.passiveReplyXml.substring(0, 100), appId: appAccount.id }); - res.writeHead(200, { "Content-Type": "application/xml" }); - res.end(processingResult.passiveReplyXml); - return true; - } else { - logger.info("WeCom Self-Built: no immediate passive XML reply, returning success.", { appId: appAccount.id }); - res.writeHead(200, { "Content-Type": "text/plain" }); - res.end("success"); - return true; - } + return true; } } diff --git a/wecom-api.js b/wecom-api.js index ebdee94..0dd9508 100755 --- a/wecom-api.js +++ b/wecom-api.js @@ -73,7 +73,7 @@ body: JSON.stringify({ touser: toUser, msgtype: "text", - agentid: agentId, + agentid: agentId || 1000002, text: { content: text }, }), }); @@ -94,7 +94,7 @@ body: JSON.stringify({ touser: toUser, msgtype: "image", - agentid: agentId, + agentid: agentId || 1000002, image: { media_id: mediaId }, }), }); @@ -114,7 +114,7 @@ body: JSON.stringify({ touser: toUser, msgtype: "voice", - agentid: agentId, + agentid: agentId || 1000002, voice: { media_id: mediaId }, }), }); @@ -134,7 +134,7 @@ body: JSON.stringify({ touser: toUser, msgtype: "video", - agentid: agentId, + agentid: agentId || 1000002, video: { media_id: mediaId }, }), }); @@ -154,7 +154,7 @@ body: JSON.stringify({ touser: toUser, msgtype: "file", - agentid: agentId, + agentid: agentId || 1000002, file: { media_id: mediaId }, }), }); @@ -164,6 +164,59 @@ } return data; } + + async sendTemplateCard(corpId, corpSecret, agentId, toUser, text) { + logger.debug("WeCom API: Sending template card", { corpId, agentId, toUser, text }); + const token = await this.getAccessToken(corpId, corpSecret); + const res = await fetch(`https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=${token}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + touser: toUser, + msgtype: "template_card", + agentid: agentId || 1000002, + template_card: { + card_type: "text_notice", + source: { + desc: "OpenClaw AI", + }, + main_title: { + title: "AI Agent", + desc: text, + }, + }, + }), + }); + const data = await res.json(); + if (data.errcode !== 0) { + throw new Error(`WeCom message/send template_card failed: ${data.errmsg} (${data.errcode})`); + } + return data; + } + + async updateTemplateCard(corpId, corpSecret, agentId, messageId, text) { + logger.debug("WeCom API: Updating template card", { corpId, agentId, messageId, text }); + const token = await this.getAccessToken(corpId, corpSecret); + const res = await fetch(`https://qyapi.weixin.qq.com/cgi-bin/message/update_template_card?access_token=${token}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + response_code: messageId, + agentid: agentId || 1000002, + template_card: { + main_title: { + title: "AI Agent", + desc: text, + }, + }, + }), + }); + const data = await res.json(); + if (data.errcode !== 0) { + throw new Error(`WeCom message/update_template_card failed: ${data.errmsg} (${data.errcode})`); + } + return data; + } } export const wecomApi = new WeComApiClient(); diff --git a/wecom-message-processor.js b/wecom-message-processor.js index 0f409eb..769d1a8 100755 --- a/wecom-message-processor.js +++ b/wecom-message-processor.js @@ -140,6 +140,9 @@ // response_url is valid for 1 hour and can be used only once export const responseUrls = new Map(); +// Map to store pending template card message IDs for in-place updates +export const pendingCardUpdates = new Map(); + // Periodic cleanup for streamMeta and expired responseUrls to prevent memory leaks. setInterval(() => { const now = Date.now(); @@ -468,16 +471,18 @@ if (isSelfBuiltAppRequest) { if (account.corpId && account.corpSecret) { - // Async mode: Return "Thinking..." passively immediately - const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); - const initialReplyXml = webhook.buildPassiveReplyXml( - message.fromUser || message.chatId || message.ToUserName, - message.ToUserName, - "text", - THINKING_PLACEHOLDER, - Math.floor(Date.now() / 1000), - _nonce, - ); + // Async mode: Send a "Thinking..." card and process in background + try { + const cardRes = await wecomApi.sendTemplateCard(account.corpId, account.corpSecret, message.agentId || account.agentId, senderId, THINKING_PLACEHOLDER); + if (cardRes.response_code) { + pendingCardUpdates.set(streamKey, cardRes.response_code); + logger.info("WeCom Self-Built (Async): Sent 'Thinking...' card, pending update", { streamKey, response_code: cardRes.response_code }); + } + } catch (err) { + logger.error("WeCom Self-Built (Async): Failed to send initial 'Thinking...' card", { error: err.message }); + // Can't send initial card, so we can't update it later. + // The final reply will be sent as a new message. + } // Process reply in the background replyApi.dispatchReplyWithBufferedBlockDispatcher({ @@ -485,100 +490,37 @@ cfg: config, dispatcherOptions: { deliver: async (payload, info) => { - // Deliver actual reply using WeCom Active Message API logger.info("WeCom Self-Built (Async): deliver called", { kind: info.kind, hasText: !!(payload.text && payload.text.trim()), hasMediaUrl: !!payload.mediaUrl, textPreview: (payload.text || "").substring(0, 50), }); - const text = payload.text || ""; - const mediaRegex = /^MEDIA:\s*(.+)$/gm; - const mediaMatches = []; - let match; - while ((match = mediaRegex.exec(text)) !== null) { - const mediaPath = match[1].trim(); - if (mediaPath.startsWith("/")) { - mediaMatches.push({ fullMatch: match[0], path: mediaPath }); - } - } - - if (payload.mediaUrl) { - mediaMatches.push({ - fullMatch: "", // It's not embedded in text - path: payload.mediaUrl, - }); - } - - let processedText = text; - if (mediaMatches.length === 1 && !text.replace(mediaMatches[0].fullMatch, "").trim()) { - // Single media item, no other text. Send active media message. - const media = mediaMatches[0]; - const type = getWecomMediaType(media.path); - try { - logger.info("WeCom Self-Built (Async): uploading single media for active reply", { path: media.path, type }); - const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path); - const mediaId = uploadRes.media_id; - - // Active API sends - if (type === 'image') { - await wecomApi.sendImageMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId); - } else if (type === 'voice') { - await wecomApi.sendVoiceMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId); - } else if (type === 'video') { - await wecomApi.sendVideoMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId); - } else { - await wecomApi.sendFileMessage?.(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, mediaId); - } - logger.info(`WeCom Self-Built (Async): sent active ${type} message`, { mediaId }); - return; - } catch (err) { - logger.error("Failed to upload/send media for active reply", { error: err.message, path: media.path }); - // Fallback to text below - } - } - - for (const media of mediaMatches) { - const type = getWecomMediaType(media.path); - try { - const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path); - const mediaId = uploadRes.media_id; - const agentId = message.agentId || account.agentId || 1000002; - if (type === 'image') { - await wecomApi.sendImageMessage?.(account.corpId, account.corpSecret, agentId, senderId, mediaId); - } else if (type === 'voice') { - await wecomApi.sendVoiceMessage?.(account.corpId, account.corpSecret, agentId, senderId, mediaId); - } else if (type === 'video') { - await wecomApi.sendVideoMessage?.(account.corpId, account.corpSecret, agentId, senderId, mediaId); - } else { - await wecomApi.sendFileMessage?.(account.corpId, account.corpSecret, agentId, senderId, mediaId); - } - logger.info("WeCom Self-Built (Async): sent active media message (mixed with text)", { mediaId, type }); - processedText = processedText.replace(media.fullMatch, "").trim(); - } catch (err) { - logger.error("WeCom Self-Built (Async): Failed to upload/send media actively", { error: err.message, path: media.path }); - processedText = processedText.replace(media.fullMatch, `[${type === 'voice' ? '语音' : type === 'video' ? '视频' : type === 'image' ? '图片' : '文件'}: ${media.path}]`).trim(); - } - } - - if (processedText.trim()) { - try { - await wecomApi.sendTextMessage(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, processedText); - } catch (e) { - logger.error("WeCom Self-Built (Async): Failed to send active message", { error: e.message }); - } - } + await deliverWecomReply({ + payload, + senderId: streamKey, + streamId: undefined, // No stream for self-built + isSelfBuiltAppRequest: true, + originalMessage: message, + account, + }); }, onError: async (err, info) => { logger.error("WeCom Self-Built App (Async): error during dispatch", { error: err.message, kind: info?.kind }); - try { - await wecomApi.sendTextMessage(account.corpId, account.corpSecret, message.agentId || account.agentId || 1000002, senderId, "处理消息时出错,请稍后再试。"); - } catch (e) {} + await deliverWecomReply({ + payload: { text: "处理消息时出错,请稍后再试。" }, + senderId: streamKey, + streamId: undefined, + isSelfBuiltAppRequest: true, + originalMessage: message, + account, + }); } } }).catch(err => logger.error("WeCom Async Dispatch Error", { error: err.message })); - return { passiveReplyXml: initialReplyXml }; + // Return nothing, as we are not using passive replies for async now + return; } else { // Sync mode: Wait for reply and then send it as passive reply logger.warn("WeCom Self-Built App: Missing corpId or corpSecret. Operating in Sync mode. Cannot send immediate '思考中...' reply and may timeout if AI takes longer than 5 seconds. Configure corpId and corpSecret to enable Async mode."); @@ -629,9 +571,10 @@ isSelfBuiltAppRequest, originalMessage: message, account, - }); if (result?.passiveReplyXml) { - capturedReplyXml = result.passiveReplyXml; - } + }); + if (result?.passiveReplyXml) { + capturedReplyXml = result.passiveReplyXml; + } } return { passiveReplyXml: capturedReplyXml }; @@ -742,118 +685,96 @@ } let processedText = text; - if (isSelfBuiltAppRequest && mediaMatches.length === 1 && !text.replace(mediaMatches[0].fullMatch, "").trim()) { - // Single media item, no other text. Use passive media reply. - const media = mediaMatches[0]; + + // Handle media for all message types first + for (const media of mediaMatches) { + processedText = processedText.replace(media.fullMatch, "").trim(); // Remove media placeholder const type = getWecomMediaType(media.path); + try { - logger.info("WeCom Self-Built: uploading single media for passive reply", { path: media.path, type }); - const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path); - const mediaId = uploadRes.media_id; - - const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); - const passiveReply = webhook.buildPassiveReplyXml( - originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName, - originalMessage.ToUserName, - type, - mediaId, - Math.floor(Date.now() / 1000), - originalMessage.query.nonce, - ); - logger.info(`WeCom Self-Built: constructed passive ${type} XML reply`, { mediaId }); - return { passiveReplyXml: passiveReply }; - } catch (err) { - logger.error("Failed to upload media for passive reply", { error: err.message, path: media.path }); - // Fallback to text placeholder logic below - } - } + if (streamId && !isSelfBuiltAppRequest) { // AI Bot + if (type === 'image') { + streamManager.queueImage(streamId, media.path); + logger.info("Queued absolute path image for stream (AI Bot)", { streamId, imagePath: media.path }); + } else { + logger.warn("WeCom AI Bot: non-image media in stream, sending as separate message", { type, mediaPath: media.path }); + // Fallthrough to send as active message + } + } else if (isSelfBuiltAppRequest) { // Self-Built App + const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path); + const mediaId = uploadRes.media_id; + const toUser = originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName; + const agentId = originalMessage.agentId || account.agentId || 1000002; + + if (type === 'image') await wecomApi.sendImageMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); + else if (type === 'voice') await wecomApi.sendVoiceMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); + else if (type === 'video') await wecomApi.sendVideoMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); + else await wecomApi.sendFileMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); - if (mediaMatches.length > 0) { - for (const media of mediaMatches) { - if (streamId && !isSelfBuiltAppRequest) { - const type = getWecomMediaType(media.path); - if (type === 'image') { - const queued = streamManager.queueImage(streamId, media.path); - if (queued) { - processedText = processedText.replace(media.fullMatch, "").trim(); - logger.info("Queued absolute path image for stream (AI Bot)", { - streamId, - imagePath: media.path, - }); - } - } else { - processedText = processedText.replace(media.fullMatch, `[${type}: ${media.path}]`).trim(); - logger.warn("WeCom AI Bot: non-image media in stream, converting to text", { type, mediaPath: media.path }); - } - } else if (isSelfBuiltAppRequest) { - const type = getWecomMediaType(media.path); - try { - const uploadRes = await wecomApi.uploadMedia(account.corpId, account.corpSecret, type, media.path); - const mediaId = uploadRes.media_id; - const toUser = originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName; - const agentId = originalMessage.agentId || account.agentId || 1000002; - - if (type === 'image') { - await wecomApi.sendImageMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); - } else if (type === 'voice') { - await wecomApi.sendVoiceMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); - } else if (type === 'video') { - await wecomApi.sendVideoMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); - } else { - await wecomApi.sendFileMessage?.(account.corpId, account.corpSecret, agentId, toUser, mediaId); - } - logger.info("WeCom Self-Built: sent active media message (mixed with text)", { mediaId, type }); - processedText = processedText.replace(media.fullMatch, "").trim(); - } catch (err) { - logger.error("Failed to upload/send media actively", { error: err.message, path: media.path }); - processedText = processedText.replace(media.fullMatch, `[${type === 'voice' ? '语音' : type === 'video' ? '视频' : type === 'image' ? '图片' : '文件'}: ${media.path}]`).trim(); - } + logger.info("WeCom Self-Built: sent active media message", { mediaId, type }); } + } catch (err) { + logger.error("Failed to upload/send media", { error: err.message, path: media.path }); + // Add a text placeholder back if sending failed + processedText += ` [Failed to send ${type}: ${media.path}]`; } } - if (!processedText.trim() && mediaMatches.length === 0) { - logger.debug("WeCom: empty block after processing, skipping stream update/reply"); + + if (!processedText.trim()) { + logger.debug("WeCom: empty text after processing media, skipping text reply"); return; } if (isSelfBuiltAppRequest) { - if (!originalMessage || !account) { - logger.error("WeCom Self-Built: missing originalMessage or account for passive reply"); - return; + const pendingCardId = pendingCardUpdates.get(senderId); + if (pendingCardId) { + try { + await wecomApi.updateTemplateCard(account.corpId, account.corpSecret, originalMessage.agentId || account.agentId, pendingCardId, processedText); + logger.info("WeCom Self-Built (Async): Successfully updated 'Thinking...' card with final response.", { senderId, response_code: pendingCardId }); + pendingCardUpdates.delete(senderId); // Clean up + return; + } catch (err) { + logger.error("WeCom Self-Built (Async): Failed to update template card. Sending as new message.", { error: err.message, response_code: pendingCardId }); + // Fallthrough to send as a new message if update fails } - const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); - const passiveReply = webhook.buildPassiveReplyXml( - originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName, - originalMessage.ToUserName, - "text", - processedText, - Math.floor(Date.now() / 1000), - originalMessage.query.nonce, - ); - logger.info("WeCom Self-Built: constructed passive XML reply", { replyXmlPreview: passiveReply.substring(0, 100) }); - return { passiveReplyXml: passiveReply }; + } + + // Fallback for sync mode, or if card update fails, or if no initial card was sent + if (!account.corpId || !account.corpSecret) { // Sync mode must use passive reply + if (!originalMessage) { + logger.error("WeCom Self-Built (Sync): missing originalMessage for passive reply"); + return; + } + const webhook = new WecomWebhook({ token: account.token, encodingAesKey: account.encodingAesKey }); + const passiveReply = webhook.buildPassiveReplyXml( + originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName, + originalMessage.ToUserName, "text", processedText, + Math.floor(Date.now() / 1000), originalMessage.query.nonce + ); + logger.info("WeCom Self-Built (Sync): constructed passive XML reply", { replyXmlPreview: passiveReply.substring(0, 100) }); + return { passiveReplyXml: passiveReply }; + } else { // Async mode, but no pending card - send as new active message + try { + const toUser = originalMessage.fromUser || originalMessage.chatId || originalMessage.ToUserName; + const agentId = originalMessage.agentId || account.agentId || 1000002; + await wecomApi.sendTextMessage(account.corpId, account.corpSecret, agentId, toUser, processedText); + logger.info("WeCom Self-Built (Async): Sent final response as a new text message.", { senderId }); + } catch (err) { + logger.error("WeCom Self-Built (Async): Failed to send final active text message.", { error: err.message, senderId }); + } + } } else { // AI Bot streaming response const appendToStream = (targetStreamId, content) => { const stream = streamManager.getStream(targetStreamId); - if (!stream) { - return false; - } + if (!stream) return false; if (stream.content.trim() === THINKING_PLACEHOLDER.trim()) { streamManager.replaceIfPlaceholder(targetStreamId, content, THINKING_PLACEHOLDER); return true; } - if (stream.content.includes(content.trim())) { - logger.debug("WeCom: duplicate content, skipping", { - streamId: targetStreamId, - contentPreview: content.substring(0, 30), - }); - return true; - } - const separator = stream.content.length > 0 ? "\n\n" : ""; streamManager.appendStream(targetStreamId, separator + content); return true; @@ -866,46 +787,14 @@ if (activeStreamId && streamManager.hasStream(activeStreamId)) { appendToStream(activeStreamId, processedText); - logger.debug("WeCom stream appended (via context/activeStreams)", { - streamId: activeStreamId, - source: contextStreamId ? "asyncContext" : "activeStreams", - contentLength: processedText.length, - }); - return; + } else { + logger.warn("WeCom: no active stream for this message", { senderId }); } - logger.warn("WeCom: no active stream for this message", { senderId }); return; } if (!streamManager.hasStream(streamId)) { - logger.warn("WeCom: stream not found, attempting response_url fallback", { streamId, senderId }); - - const saved = responseUrls.get(senderId); - if (saved && !saved.used && Date.now() < saved.expiresAt) { - saved.used = true; - try { - await fetch(saved.url, { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ msgtype: 'text', text: { content: processedText } }), - }); - logger.info("WeCom: sent via response_url fallback (deliverWecomReply)", { - senderId, - contentPreview: processedText.substring(0, 50), - }); - return; - } catch (err) { - logger.error("WeCom: response_url fallback failed", { - senderId, - error: err.message, - }); - } - } - - logger.warn("WeCom: unable to deliver message (stream closed + response_url unavailable)", { - senderId, - contentPreview: processedText.substring(0, 50), - }); + logger.warn("WeCom: stream not found, cannot deliver message", { streamId, senderId }); return; }