基于AgentRun构建舆情分析专家

更新时间:
复制为 MD 格式

在舆情分析领域,如何高效处理海量数据、保证分析的实时性,是开发者面临的关键技术挑战。传统系统在数据处理时效性上的不足,可能影响决策的及时性。本文将指导您使用函数计算 AgentRun 平台,搭建一套自动化、流式处理的舆情分析系统,以应对这些挑战。

效果展示

ezgif

开始体验

准备工作

在开始之前,请确保您已完成以下准备工作:

  1. 开通云服务并完成授权:

  2. 获取模型访问凭证 (API-KEY):

    • 本应用依赖大语言模型进行对话理解。请登录百炼控制台,在密钥管理页面创建或复制一个API-KEY,后续步骤将使用此密钥。

      image

  3. 通过模型管理创建LLM模型

    下文以配置百炼中的qwen3-max模型为例,如果您想配置其他模型或自定义模型,请参见:大语言模型

    • 进入模型管理页面,点击添加模型

    • 选择API模型,并配置以下参数:

      • 名称qwen3-model

      • 服务提供商:从下拉框中选择阿里云

      • API端点:保持默认;

      • 配置具体模型:搜索并选择qwen3-max

      • 凭证管理:选择API密钥,并粘贴上一步获取的百炼API Key;您也可以选择使用已有凭证,并从下拉框中选择您提前配置好的凭证,详情可参见:凭证管理

  4. 通过Sandbox沙箱创建浏览器沙箱实例

    下文以控制台快速配置浏览器沙箱为例,如果您对其中的一些具体参数感兴趣或想使用沙箱的API,请参见:BrowserTool浏览器

    • 进入Sandbox沙箱页面,点击创建沙箱模板

    • 选择浏览器,点击立即创建。在跳转到的配置页面中,填写以下参数,其他保持默认即可:

      • 名称:sandbox-in-opinion_analysis

      • 执行角色ARN:AliyunAgentRunDefaultRole;

    • 点击创建浏览器,等待沙箱创建成功。

选择模板并部署

  1. AgentRun控制台上方点击探索按钮,在热门Agent项目模板中找到舆情分析专家,点击部署。您也可以点击卡片详情查看该模板概述。

    image

  2. 在部署页面,参考以下示例填写配置参数:

    • 应用名称:为您的应用取一个易于识别的名称,便于后续管理和识别,如:opinion_analysis

    • 描述(可选):简要描述应用的功能,如:基于舆情分析专家模板创建的Agent

    • 权限配置:为AgentRun应用授权,以便其能访问模型、工具等云资源。

      • (推荐)点击快速创建,系统将引导您创建一个名为 AliyunAgentRunDefaultRole 的默认角色,该角色已包含所需权限;

        image

      • 如需自定义权限,您也可以点击下拉框右侧的添加按钮image,通过创建角色手动配置,并为其添加相应的权限策略,详情可参见按需配置执行角色

    • 大语言模型:从下拉框中选择您刚刚配置的qwen3-model,并选择qwen3-max模型。如果您未提前配置,可点击下拉框右侧的添加按钮image,按照准备工作步骤3立即配置。

    • 浏览器沙箱:从下拉框中选择您刚刚配置的sandbox-in-opinion_analysis,如果您未提前配置,可点击下拉框右侧的添加按钮image,按照准备工作步骤4立即配置。

      image

    • 点击确认创建,等待应用部署完成。

    • 部署成功后,页面会显示应用的访问链接。点击链接即可进入WebUI体验页面。后续您也可以通过在Agent控制台找到该应用,点击应用详情,在概览与配置页面右侧的集成配置中找到访问地址。

      image

应用体验

  1. 进入体验页面后,可以看到舆情分析系统页面,此时可以输入一个关键词或主题进行舆情分析,示例输入:新能源汽车。分析过程中,系统会调用您配置的浏览器沙箱sandbox-in-opinion_analysis,AI控制云上的浏览器进行数据检索。

    image

  2. 分析完成之后,系统会整理所有采集到的数据和信息,最终生成文字+图表的可视化报告

    image

    image

查看应用详情

应用部署成功后,您可以在AgentRun控制台对应用进行全方位的管理和运维。在应用列表中找到您创建的应用,点击进入详情页面,左侧导航栏提供了以下核心功能:

  • 概览与配置:快速查看应用的基本信息、运行时配置(如模型、内存规格),并在此管理环境变量。

  • 代码与调试:支持在线查看、编辑和调试,并进行在线二次开发。

  • 版本与灰度:提供版本管理能力,支持通过灰度发布将少量流量切换到新版本进行测试,验证通过后再全量上线。每个版本都有一个临时域名供测试使用。

  • 集成与发布:可参考Agent集成与发布,将开发的Agent快速集成到您的前端网页、后端应用等。

  • 弹性与实例:查看当前正在运行的实例列表和状态,并配置灵活的弹性伸缩策略,让应用根据业务负载自动增减实例。

  • 可观测性:通过集成阿里云应用实时监控服务(ARMS),可以获得代码级的性能诊断、请求追踪、异常监控和告警能力,保障应用的稳定运行。

计费说明

部署“舆情分析专家”应用,将为您创建并启用以下阿里云服务。您仅需为实际使用的资源付费,最终费用以您的阿里云账单为准。

  • 核心计算:函数计算 (FC)

    • 说明:部署此应用将创建Agent实例和浏览器沙箱。这两个组件将作为应用的核心运行单元,部署在阿里云函数计算(FC)平台上。函数计算采用按量付费模式,仅根据组件的实际调用次数和资源消耗(如执行时长、内存等)计费。

    • 计费文档:函数计算计费概述

  • 大语言模型:阿里云百炼

    • 说明:应用中的智能分析与内容生成功能,依赖阿里云百炼提供的大语言模型服务。模型调用费用根据输入和输出文本的Token数量计算。

    • 计费文档:百炼模型调用计费说明

  • 日志与监控:SLS & ARMS

    • 说明:为保障应用稳定运行,系统将自动开通日志服务(SLS)和应用实时监控服务(ARMS)。两项服务均提供免费额度,超出部分将按量计费。

    • 计费文档:SLS计费概述ARMS产品计费

成本管理提示:建议您在部署前详细阅读各服务的计费文档,并根据业务需求在阿里云控制台设置消费预警,以有效管理成本。

应用架构解析

系统架构设计

整个舆情分析系统采用分层架构设计,核心思想是通过代码严格控制流程执行顺序,而非依赖 LLM 的自主决策。

image

核心优势

  1. 安全隔离的执行环境

    传统舆情系统通常直接在服务器上运行爬虫程序,面临着安全风险和环境污染问题。当某个网站的反爬机制触发时,可能影响整个服务器的稳定性。而 AgentRun Sandbox 提供了完全隔离的浏览器环境,即使单个采集任务出现问题,也不会影响系统的整体运行。

    async def create_browser_sandbox() -> Optional[BrowserSandbox]:
        """创建隔离的浏览器环境,避免环境污染"""
        try:
            sandbox = await Sandbox.create_async(
                template_type=TemplateType.BROWSER,
                template_name=agentrun_browser_sandbox_name,
            )
            _sandboxes[sandbox.sandbox_id] = sandbox
            return sandbox
        except Exception as e:
            # 单个Sandbox失败不影响其他实例
            raise SandboxCreationError(f"创建 Sandbox 失败: {e}")
  2. 真实浏览器环境模拟

    传统爬虫方案通常使用简单的HTTP请求库,容易被现代网站的反爬机制识别和拦截。AgentRun Sandbox 提供的是真实的 Chrome 浏览器环境,能够完整执行JavaScript、处理复杂的页面交互,大大提高了数据采集的成功率。从代码中可以看到,系统通过 Playwright 连接到真实的 Chrome 实例。

    async with async_playwright() as playwright:
        browser = await playwright.chromium.connect_over_cdp(sandbox.get_cdp_url())
        context = browser.contexts[0] if browser.contexts else await browser.new_context()
        page = context.pages[0] if context.pages else await context.new_page()
  3. 可视化调试能力

    函数计算 AgentRun 最独特的优势是提供了实时的 VNC 预览功能,开发者和用户可以实时观察浏览器的操作过程。这种透明性在传统方案中是无法实现的,它不仅有助于调试和优化采集逻辑,还能让用户直观地了解系统的工作状态。

    image

  4. 弹性扩展和故障恢复

    传统系统在面临大规模采集任务时,往往需要复杂的分布式架构设计。而 函数计算 AgentRun 天然支持多 Sandbox 并行处理,系统可以根据需要动态创建和销毁浏览器实例。更重要的是,当某个实例出现故障时,系统能够自动检测并重建。

    async def recreate_sandbox_if_closed(sandbox_id: str, error_message: str):
        """智能故障检测和自动重建机制"""
        closed_error_patterns = [
            "Target page, context or browser has been closed",
            "Browser has been closed",
            "Connection closed",
        ]
        
        is_closed_error = any(pattern.lower() in error_message.lower()
                              for pattern in closed_error_patterns)
        
        if is_closed_error:
            await remove_sandbox(sandbox_id)
            new_sandbox = await create_browser_sandbox()
            return new_sandbox

前端VNC集成实现

  1. 动态库加载机制

    前端 VNC 客户端需要动态加载 noVNC 库,系统实现了智能的加载机制,支持本地资源和 CDN 回退。

    function loadScript(url) {
        return new Promise(function(resolve, reject) {
            var script = document.createElement('script');
            script.src = baseUrl + url;
            script.onload = resolve;
            script.onerror = function() {
                // 本地加载失败,尝试 CDN
                var fallbackUrl = url.includes('wordcloud') ?
                    'https://cdn.jsdelivr.net/npm/echarts-wordcloud@2.1.0/dist/echarts-wordcloud.min.js' :
                    'https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js';
                var fallbackScript = document.createElement('script');
                fallbackScript.src = fallbackUrl;
                fallbackScript.onload = resolve;
                fallbackScript.onerror = reject;
                document.head.appendChild(fallbackScript);
            };
            document.head.appendChild(script);
        });
    }
  2. 多协议适配

    考虑到部署环境的复杂性,VNC 组件实现了 HTTP/HTTPS 环境下的 WebSocket 协议自适应。

    import { useCallback } from 'react';
    
    const adjustWebSocketUrl = useCallback((url: string): string => {
        const isHttps = window.location.protocol === 'https:';
    
        if (!isHttps && url.startsWith('wss://')) {
            return url.replace('wss://', 'ws://');
        }
    
        if (isHttps && url.startsWith('ws://')) {
            return url.replace('ws://', 'wss://');
        }
    
        return url;
    }, []);

后端核心实现

  1. Agent工具链设计

    系统的核心是一个基于 PydanticAI 的智能体,该智能体包含四个关键工具,每个工具负责舆情分析的不同阶段。Agent 的设计遵循严格的执行顺序,确保数据收集的完整性和分析的准确性。

    opinion_agent = Agent(
        agentrun_model,
        deps_type=StateDeps,
        system_prompt="""你是舆情分析系统的执行者。你的任务是按照以下严格流程执行舆情分析:
    【流程】
    1. 收到关键词后,调用 collect_data 工具收集数据
    2. 数据收集完成后,调用 analyze_data 工具分析数据
    3. 分析完成后,调用 write_report 工具撰写报告
    4. 报告完成后,调用 render_html 工具生成 HTML
    
    【重要规则】
    - 必须按顺序调用工具
    - 每个工具只调用一次
    - 不要跳过任何步骤
    - 不要编造数据""",
        retries=3,
    )
  2. 流式输出与实时反馈

    传统舆情系统通常采用批处理模式,用户需要等待很长时间才能看到结果。而基于 函数计算 AgentRun 的系统实现了真正的流式输出,用户可以实时观察每个处理步骤的进展。这种实时性不仅提升了用户体验,也便于及时发现和解决问题。

    import time
    
    async def push_state_event(run_id: str, state: OpinionState):
        """实时推送状态更新,用户无需等待"""
        event = StateSnapshotEvent(
            type=EventType.STATE_SNAPSHOT,
            snapshot=state.model_dump(),
            timestamp=int(time.time() * 1000)
        )
        await event_manager.push_event(run_id, event)
  3. 智能数据质量控制

    系统实现了严格的数据质量控制机制,通过多维度评估确保收集到的数据具有较高的相关性和价值。这种质量控制在传统系统中往往是缺失的,导致大量噪音数据影响分析结果。

    async def evaluate_relevance(keyword: str, title: str, snippet: str) -> float:
        """多维度相关性评估,确保数据质量"""
        text = f"{title} {snippet}"
        text_lower = text.lower()
    
        # 检测关键词匹配度
        has_chinese_keyword = any('\u4e00' <= char <= '鿿' for char in keyword)
        result_has_chinese = any('\u4e00' <= char <= '鿿' for char in text)
    
        # 中文关键词必须在结果中有中文内容
        if has_chinese_keyword and not result_has_chinese:
            return 0.0
    
        # 排除明显的无关网站
        irrelevant_patterns = [
            "calculator", "deepseek", "chegg", "stackoverflow",
            "翻译", "dictionary", "词典"
        ]
        if any(pattern in text_lower for pattern in irrelevant_patterns):
            return 0.0
    
        # 计算相关性得分
        score = 0.0
        if keyword in text:
            score += 0.6  # 基础分
    
        # 时效性加分
        time_keywords = ["最新", "今日", "近日", "2024", "2025"]
        if any(tk in text for tk in time_keywords):
            score += 0.1
    
        return max(0.0, min(1.0, score))

深度内容抓取技术

  1. 平台适配策略

    不同的社交媒体平台具有不同的页面结构和内容组织方式,传统系统往往采用统一的抓取策略,导致数据质量参差不齐。AgentRun 系统针对不同平台实现了定制化的抓取逻辑。

    async def explore_page_with_llm(page, keyword: str, url: str, source: str, initial_content: str):
        """基于平台特性的智能内容抓取"""
        if "weibo.com" in url:
            # 微博特定的评论和转发抓取
            available_actions = [
                {"action": "view_comments", "selector": ".WB_feed_expand, [class*='comment']"},
                {"action": "view_retweets", "selector": ".WB_feed_expand, [class*='repost']"},
            ]
        elif "zhihu.com" in url:
            # 知乎回答和评论抓取
            available_actions = [
                {"action": "view_more_answers", "selector": ".AnswerItem, .List-item"},
                {"action": "view_comments", "selector": ".Comments-container, .CommentItem"},
            ]
        elif "bilibili.com" in url:
            # B站视频评论抓取
            available_actions = [
                {"action": "view_comments", "selector": ".reply-item, .root-reply"},
                {"action": "view_related", "selector": ".video-page-card, .recommend-list"},
            ]
  2. LLM 驱动的智能探索

    系统创新性地引入了 LLM 驱动的智能探索机制,让 AI 决定是否需要深入抓取某个页面的额外内容,如评论区、相关推荐等。这种智能决策大大提高了数据采集的效率和针对性。

    import json
    
    async def llm_decide_exploration(keyword: str, page_url: str, page_content: str, source: str):
        """LLM 智能决策是否进行深度探索"""
        prompt = f"""请根据以下信息决定是否需要进一步探索页面获取更多舆情数据。
    【搜索关键词】{keyword}
    【当前页面】{page_url}
    【已获取内容预览】{page_content[:500]}
    【决策标准】
    1. 如果当前内容已经足够丰富,可能不需要进一步探索
    2. 如果是微博/B站等平台,评论区通常包含重要的舆情信息
    3. 权衡时间成本,每个页面最多探索1-2个操作
    
    请返回 JSON 格式的决策结果。"""
    
        result = await explorer.run(prompt)
        return json.loads(result.output)
    

智能分析与报告生成

  1. 标准化情感分析

    系统实现了基于关键词词典的情感分析算法,相比传统的机器学习模型,这种方法更加透明和可控。

    class SentimentStandards:
        """情感倾向标准化计算"""
    
        POSITIVE_KEYWORDS = [
            "优秀", "卓越", "创新", "领先", "突破", "成功", "赞", "好评", "支持",
            "认可", "满意", "信赖", "期待", "看好", "值得", "推荐", "喜欢"
        ]
    
        NEGATIVE_KEYWORDS = [
            "差", "糟糕", "失败", "落后", "问题", "缺陷", "批评", "质疑", "担忧",
            "失望", "不满", "抱怨", "投诉", "差评", "垃圾", "骗局"
        ]
    
        @staticmethod
        def calculate_sentiment_score(text: str) -> float:
            """计算情感得分 (-1.0 到 1.0)"""
            positive_count = sum(1 for word in SentimentStandards.POSITIVE_KEYWORDS if word in text)
            negative_count = sum(1 for word in SentimentStandards.NEGATIVE_KEYWORDS if word in text)
    
            total_count = positive_count + negative_count
            if total_count == 0:
                return 0.0
    
            return (positive_count - negative_count) / total_count
  2. 流式报告生成

    报告生成过程采用流式输出,用户可以实时观察报告的撰写过程,这种体验是传统系统无法提供的。

    import asyncio
    
    async with writer.run_stream(report_prompt) as result:
        async for text in result.stream_text():
            report_content = text
            state.report_text = report_content
    
            current_time = asyncio.get_event_loop().time()
            content_delta = len(report_content) - last_event_length
            time_delta = current_time - last_event_time
    
            # 每 100 字符或每 0.3 秒发送一次更新
            if content_delta >= 100 or time_delta >= 0.3:
                await push_state_event(run_id, state)

部署与运维优势

  1. 简化的部署流程

    相比传统舆情系统需要复杂的分布式爬虫集群部署,AgentRun 系统的部署相对简单。只需要配置好环境变量和 AgentRun Sandbox 模板,系统就能自动管理浏览器实例的创建和销毁。

    # 核心配置
    AGENTRUN_MODEL_NAME=your_model_name
    MODEL_NAME=qwen3-max
    AGENTRUN_BROWSER_SANDBOX_NAME=your_browser_template
    TIMEOUT=180
  2. 自动化运维能力

    系统内置了完善的监控和自恢复机制,大大降低了运维复杂度。当检测到异常时,系统能够自动重建资源,保证服务的连续性。

    import { useEffect, useRef } from 'react';
    
    // 连接失败时自动重连(每 10 秒尝试一次)
    useEffect(() => {
        if (status === 'error' && active && rfbLoaded) {
            reconnectTimerRef.current = setTimeout(() => {
                cleanupRfb();
                lastUrlRef.current = null;
                fetchVncUrl(true);
            }, RECONNECT_INTERVAL);
        }
    }, [status, active, rfbLoaded]);

性能与扩展性分析

  1. 并发处理能力

    传统系统的并发能力往往受限于单机资源,而 函数计算 AgentRun 系统可以根据需要动态创建多个 Sandbox 实例,实现真正的水平扩展。系统通过异步编程模型和连接池管理,能够高效处理大量并发请求。

    import uvicorn
    
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        log_level="info",
        timeout_keep_alive=120,
        limit_concurrency=100,  # 支持高并发
    )
  2. 资源弹性管理

    系统实现了智能的资源管理策略,能够根据任务负载动态调整 Sandbox 实例数量。这种弹性扩展能力是传统固定架构难以实现的。

    from typing import Any, Dict, List
    
    async def get_all_sandboxes() -> List[Dict[str, Any]]:
        """动态获取所有可用的Sandbox实例"""
        result = []
        async with _sandbox_lock:
            for sandbox_id, sandbox in _sandboxes.items():
                try:
                    # 检查实例健康状态
                    vnc_url = sandbox.get_vnc_url()
                    result.append({
                        "sandbox_id": sandbox_id,
                        "vnc_url": vnc_url,
                        "active": True,
                    })
                except Exception:
                    # 自动清理失效实例
                    result.append({
                        "sandbox_id": sandbox_id,
                        "active": False,
                    })
        return result

总结

基于函数计算 AgentRun 构建的舆情分析系统,在安全性、可靠性、可观测性和扩展性方面,均展现出相较于传统方案的显著优势。它通过隔离的 Sandbox 环境解决了安全与环境污染的核心痛点;以实时的 VNC 预览提供了前所未有的过程透明度;并借助内置的故障检测与自愈机制,极大降低了运维复杂度。更重要的是,该系统实现了从数据采集、智能分析到流式报告生成的端到端自动化。用户可以实时观察分析全过程,获得了远超传统批处理模式的卓越体验。这种将复杂流程高度自动化的能力,结合 AI 技术的持续进步,将为企业和机构的舆情洞察与决策支持带来深刻的价值提升。