企业文化

实时获取社交媒体洞察,使用亚马逊托管的 Apache Flink 服务和亚马逊 Bedrock 大数

实时获取社交媒体洞察:使用 Amazon 管理的 Apache Flink 与 Amazon Bedrock

作者 Francisco Morillo Subham Rakshit 和 Sergio Garcs Vitale日期 2024年6月25日类别 Amazon Bedrock Amazon 管理的 Apache Flink 分析 人工智能 生成式 AI Kinesis 数据流

关键要点

重要性 随着社交媒体例如前身为 Twitter 的 X上活跃用户超过55亿,品牌需要实时分析和解释大量数据,以优化投资回报ROI。解决方案 本文介绍了如何利用 Amazon 管理的 Apache Flink 和 Amazon Bedrock,通过生成式 AI 和自然语言处理 (NLP) 有效分析推文。应用价值 该方法可以识别品牌相关的趋势、进行细致的情感分析,并在问题扩散前主动处理客户关切。

随着社交媒体的迅速发展,品牌必须有效地分析和解读数据,以最大化其投资回报ROI。在每天超过5亿条推文的环境中,实时洞察变得尤为重要。

Amazon 管理的 Apache Flink 使您能够使用 Apache Flink 实时处理和分析流数据。Apache Flink 支持状态计算,实现大规模数据的实时处理,同时保证一致性。它具备灵活的时间控制,能够实现复杂的业务逻辑。流处理和生成式人工智能AI已成为利用实时数据的强大工具。Amazon Bedrock及其基础模型如 Anthropic Claude推动了 AI 的新一轮应用,通过提供自然语言对话体验,进一步增强了分析能力。

本文章将探讨如何结合实时分析与生成式 AI 的能力,利用前沿的自然语言处理模型,通过与品牌、产品或主题相关的查询分析推文。这种方法超越了基本的情感分析,使企业能够获得可以立即应用的可操作洞察,包括:

识别与品牌相关的趋势和讨论主题进行细致的情感分析,以真正理解客户的看法检测表情符号、缩写、讽刺等细微差别主动发现并应对客户顾虑根据功能请求和反馈指导产品开发为信息活动创建有针对性的客户群体

本文章将逐步展示如何利用检索增强生成RAG方法,将实时推文用作大语言模型LLMs的上下文。RAG通过使用外部知识库优化 LLM 的输出,从而提高模型的准确性和应用效果,而无需重新训练模型。

解决方案概述

在本节中,我们将阐释应用程序的流程和架构。应用的流程分为两部分:

流程描述数据摄取从流式源摄取数据,将其转换为向量嵌入并存储到向量数据库中。洞察检索使用用户查询调用 LLM,从向量数据库检索与推文相关的洞察。

数据摄取

以下图表说明了数据摄取流程:

处理来自流式源如社交媒体、Amazon Kinesis 数据流、或 Amazon 管理的 Kafka 服务的推送数据。实时将流数据转换为向量嵌入。将向量嵌入存储在向量数据库中。

markdown![Data Ingestion](https//d2908q01vomqb2cloudfrontnet/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/06/13/BDB4053dataingestionpng)

数据从流式源例如 X被摄入并使用 Apache Flink 应用进行处理。Apache Flink 是一个开源的流处理框架,提供强大的流处理能力,实现实时处理、状态计算、容错、高吞吐量和低延迟。Apache Flink 被用于处理流数据、去重并调用嵌入模型以创建向量嵌入。

向量嵌入是捕捉单词、句子及其他数据类型之间关系与意义的数值表示。这些向量嵌入将用于将语义搜索或神经搜索的信息检索作为 LLM 评估响应的上下文。在文本数据转换为向量后,向量将被持久化到 Amazon OpenSearch Service 域,这将作为向量数据库。向量数据库与传统的关系数据库不同,数据点通过具有固定维度数的向量表示,并根据相似性聚类。

小熊加速器免费版

实时获取社交媒体洞察,使用亚马逊托管的 Apache Flink 服务和亚马逊 Bedrock 大数

OpenSearch Service 提供可扩展高效的相似性搜索功能,专为处理大量稠密向量数据而设计,并与其他 AWS 服务无缝集成,让您能够在 AWS 中构建强大的数据管道。作为一种完全托管的服务,OpenSearch Service 减轻了管理基础设施的操作负担,同时提供如 近似 k最近邻 (kNN) 搜索算法、稠密向量支持以及通过 Amazon CloudWatch 的强大监控和日志功能。这些功能使 OpenSearch Service 成为需要快速准确相似性检索的应用的理想解决方案。

此设计允许实时向量嵌入,这使得其成为 AI 驱动应用的理想选择。

洞察检索

下面的图表展示了从用户侧的流程,用户通过前端提交查询,并使用所检索的向量数据库文档作为模型的上下文获取响应。

markdown![Insights Retrieval](https//d2908q01vomqb2cloudfrontnet/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/06/13/BDB4053insightsretrievalpng)

如上图所示,要从 LLM 获取洞察,首先需要接收来自用户的查询。文本查询将使用之前用于推文的相同模型转换为向量嵌入。确保在数据摄取和搜索阶段使用相同的嵌入模型。然后,这些向量嵌入将用于在向量数据库中进行语义搜索,以获取相关向量和关联文本。这将作为提示的上下文。接下来,任何先前的对话历史如有将被添加到提示中,为模型提供上下文。最后,用户的问题也会被纳入提示中,调用 LLM 以获得响应。

在本文中,我们不考虑对话历史或将其存储以供后续使用。

解决方案架构

现在您已经了解了整体的流程,让我们逐步分析以下 AWS 服务架构。

markdown![Solution Architecture](https//d2908q01vomqb2cloudfrontnet/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/06/17/social4png)

图中前半部分展示了数据摄取过程:

用户通过 Amazon Cognito 进行身份验证。用户连接到 Streamlit 前端并配置以下参数:查询词、API 授权令牌、检索推文的频率。使用 Amazon 管理的 Apache Flink 消费和处理推文,同时在 Apache Flink 的状态中存储从前端应用接收的 API 请求参数。流式应用使用 Apache Flink 的异步 I/O 以调用 Amazon Titan Embeddings 模型通过 Amazon Bedrock API。Amazon Bedrock 返回每条推文的向量嵌入。Apache Flink 应用将推文的原始文本与向量嵌入写入 OpenSearch Service kNN 索引。

架构图的其余部分展示了洞察检索过程:

用户通过 Streamlit 前端应用发送查询。AWS Lambda 函数通过 Amazon API Gateway 被调用,并传递用户查询作为输入。Lambda 函数利用 LangChain 协调 RAG 过程。首先,函数在 Amazon Bedrock 上调用 Amazon Titan Embeddings 模型以为问题创建向量嵌入。Amazon Bedrock 返回问题的向量嵌入。在 RAG 协调过程的第二步中,Lambda 函数在 OpenSearch Service 中进行语义搜索,检索与问题相关的文档。OpenSearch Service 把包含推文文本的相关文档返回给 Lambda 函数。LangChain 协作过程的最后一步,Lambda 函数增强提示,增加上下文并使用 少量示例提示。增强的提示,包括指令、示例、上下文和查询,将通过 Amazon Bedrock API 发送至 Anthropic Claude 模型。Amazon Bedrock 向 Lambda 函数返回自然语言的答案。响应通过 API Gateway 返回给用户。API Gateway 在 Streamlit 应用中提供用户问题的答案。

该解决方案可在 GitHub 仓库 中获取,按照 README 文件进行解决方案的部署。

Amazon Bedrock 聊天机器人 UI

Amazon Bedrock 聊天机器人 Streamlit 应用旨在提供推文的洞察,无论是从 X API 实际获取的推文,还是来自我的社交媒体应用的模拟推文或消息。

在 Streamlit 应用程序中,可以提供用于发出 API 请求的参数,获取来自 X 的数据。我们开发的 Apache Flink 应用将根据提供的参数调整 API 请求。

作为参数,您需要提供以下内容:

API 授权的 Bearer 令牌:这是在您注册使用 API 时从 X 开发者平台 获取的。过滤推文的查询词:您可以使用 X 文档中的 搜索操作符。请求频率:X 基础 API 仅允许每 15 秒进行一次请求。如果设置较低的间隔,应用程序将无法提取数据。

参数会通过 API Gateway 发送至 Kinesis 数据流,并由 Apache Flink 应用进行处理。

markdown![Streamlit Application](https//d2908q01vomqb2cloudfrontnet/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/06/18/image0041png)

我的社交媒体 UI

我的社交媒体应用是另外一个 Streamlit 应用,为用户提供了一个附加的 UI。通过该应用,用户可以撰写和发送消息,模拟在社交媒体上发帖的体验。随后,这些消息会被摄取到一个由 API Gateway、Kinesis 数据流和 Apache Flink 应用构成的 AWS 数据管道中。Apache Flink 应用处理传入的消息,调用 Amazon Bedrock 嵌入模型,并将数据存储到 OpenSearch Service 集群中。

为了同时支持真实的 X 数据和来自我的社交媒体应用的模拟数据,我们在 OpenSearch Service 集群中设置了不同的索引。这种分离允许用户选择他们想要分析或查询的数据源。Streamlit 应用的侧边栏选项称为 使用 X 索引,作为切换功能。当启用此选项时,应用程序将查询和分析来自 X API 的真实推文数据。如果禁用该选项,应用程序将查询和显示通过我的社交媒体应用发送的消息的数据。

markdown![My Social Media UI](https//d2908q01vomqb2cloudfrontnet/b6692ea5df920cad691c20319a6fffd7a4a766b8/2024/06/18/image0051png)

Apache Flink 被用于处理推文数量不断增加的能力。Apache Flink 应用负责执行数据摄取,如上述所示。让我们详细了解这个流程。

从 X 消费数据

我们使用 Apache Flink 处理从 Streamlit UI 发送的 API 参数。我们将参数存储在 Apache Flink 的状态中,这使得我们可以修改和更新参数,而无需重新启动应用程序。我们使用 ProcessFunction 来使用 Apache Flink 的内部计时器调度获取推文的请求频率。在本文章中,我们使用 X 的 最近搜索 API ,允许我们访问过去 7 天内发表的过滤公共推文。API 响应为分页,最多返回 100 条推文,以逆序方式排列。如果还有更多推文等待消费,前一个请求的响应将返回一个令牌,以便在下一个 API 调用中使用。在从 API 接收到推文后,我们将应用以下转化:

过滤掉空推文没有文本的推文。按作者 ID 对推文进行分区。这有助于将处理分配给 Apache Flink 中的多个子任务。应用去重逻辑,仅处理尚未处理的推文。为此,我们将已经处理的推文 ID 存储在 Apache Flink 的状态中,并匹配和过滤已处理的推文。我们按作者 ID 分组存储推文的 ID,这可能导致应用状态大小的增加。由于 API 仅在调用时提供过去 7 天内的推文,我们设置了7天的生存时间TTL,以防止应用状态无限增长。您可以根据需求进行调整。将推文转换为 JSON 对象,以便稍后调用 Amazon Bedrock API。

创建向量嵌入

向量嵌入是通过通过 Amazon Bedrock API 调用 Amazon Titan Embeddings 模型创建的。异步调用外部 API 是构建流处理架构时重要的性能考量。同步调用会增加延迟,降低吞吐量,并可能成为整体处理的瓶颈。

为了调用 Amazon Bedrock API,您将使用 Java 中的 Amazon Bedrock Runtime 依赖项,这提供了一个异步客户端,允许我们通过 BedrockRuntimeAsyncClient 异步调用 Amazon Bedrock 模型。具体流程如下所述:

javaDataStreamltJSONObjectgt resultStream = AsyncDataStreamunorderedWait(inputJSON new BedRockEmbeddingModelAsyncTweetFunction() 15000 TimeUnitMILLISECONDS 1000)uid(tweetasyncfunction)

让我们更详细地了解 Apache Flink 的异步 I/O 函数。在 CompletableFuture Java 类中:

首先,我们创建 Amazon Bedrock Runtime 异步客户端:

javaBedrockRuntimeAsyncClient runtime = BedrockRuntimeAsyncClientbuilder()region(Regionof(region)) // 使用指定的 AWS 区域build()

然后,我们提取事件中的推文,并建立将发送给 Amazon Bedrock 的负载:

javaString stringBody = jsonObjectgetString(tweet)ArrayListltStringgt stringList = new ArrayListltgt()stringListadd(stringBody)JSONObject jsonBody = new JSONObject()put(inputText stringBody)SdkBytes body = SdkBytesfromUtf8String(jsonBodytoString())

在我们有了负载后,我们可以调用 InvokeModel API 并调用 Amazon Titan 生成推文的向量嵌入:

javaInvokeModelRequest request = InvokeModelRequestbuilder()modelId(amazontitanembedtextv1)contentType(application/json)accept(/)body(body)build()CompletableFutureltInvokeModelResponsegt futureResponse = runtimeinvokeModel(request)

接收到向量后,我们将以下字段附加到输出的 JSONObject:

1 清理后的推文 2 推文创建时间戳 3 推文的点赞数 4 推文的转发数 5 推文的展示数 6 推文 ID

这些信息将被返回,包括嵌入、原始文本和所使用的代币数量。在我们的连接器中,我们仅消费英语消息,并忽略转发的其他推文。

对于从我的

性能测试 MySQL 迁移环境:使用查询回放和流量镜像

MySQL迁移环境性能测试的两种方法关键要点数据库从本地迁移到AWS的过程中,关键步骤是确保云环境可以处理真实的生产流量。文章讨论了两种测试迁移环境的方法:查询回放和流量镜像。查询回放提供了对测试时机...