Build a Real‑Time Semantic Search with EventBridge, DashVector, and FunctionCompute

This tutorial walks through constructing a zero‑to‑one RAG pipeline that ingests OSS text files via EventBridge, transforms them into embeddings with DashScope, stores vectors in DashVector, and performs semantic search using FunctionCompute and a Qwen‑Turbo LLM, complete with code samples and configuration steps.

Alibaba Cloud Native
Alibaba Cloud Native
Alibaba Cloud Native
Build a Real‑Time Semantic Search with EventBridge, DashVector, and FunctionCompute

RAG Background

Large language models (LLMs) excel at general language tasks but lack domain‑specific knowledge. Converting queries and documents into high‑quality embeddings and performing vector similarity search enables efficient retrieval of relevant domain information.

Prerequisites

Enable Lingji model service in DashScope and obtain an API key.

Enable DashVector vector search service and obtain an API key.

Enable OSS, FunctionCompute, and EventBridge services on Alibaba Cloud.

Ingestion Pipeline (EventBridge → OSS → DashVector)

Create a new rule in the EventBridge console.

Configure the OSS source:

Bucket: select or create a bucket.

Prefix: optional; leave empty to ingest the whole bucket.

Document loader: TextLoader.

Load mode: "single document load" for this demo.

Set the filter to "match all events".

Define a transformation that calls DashScope’s TextEmbedding API to convert raw text into vectors.

FunctionCompute embedding function

# -*- coding: utf-8 -*-
import os, json, logging
import dashscope
from dashscope import TextEmbedding
from http import HTTPStatus

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dashscope.api_key = os.getenv('DASHSCOPE_API_KEY')

def handler(event, context):
    evt = json.loads(event)
    text = evt['data']
    resp = TextEmbedding.call(
        model=TextEmbedding.Models.text_embedding_v1,
        input=text)
    if resp.status_code == HTTPStatus.OK:
        print(resp)
    else:
        print(resp)
    return resp

Install third‑party dependencies in the function environment:

pip3 install dashvector dashscope -t .

Sample embedding response

{
  "code": "",
  "message": "",
  "output": {
    "embeddings": [{
      "embedding": [-2.1928, -0.7031, ... , -0.4715],
      "text_index": 0
    }]
  },
  "request_id": "e9f9a555-85f2-9d15-ada8-133af54352b8",
  "status_code": 200,
  "usage": {"total_tokens": 3}
}

DashVector collection configuration

Create a collection with dimension 1536, distance metric Cosine, and upsert mode for data insertion. Map the vector field to $.output.embeddings[0].embedding and provide the DashVector API key.

DashVector collection configuration
DashVector collection configuration

Search Task

Workflow: embed the user question, query DashVector for the most similar vectors (top‑k), retrieve the raw document, and feed it to an LLM prompt.

embedding.py

import os, dashscope
from dashscope import TextEmbedding

def generate_embeddings(text):
    rsp = TextEmbedding.call(
        model=TextEmbedding.Models.text_embedding_v1,
        input=text)
    embeddings = [rec['embedding'] for rec in rsp.output['embeddings']]
    return embeddings if isinstance(text, list) else embeddings[0]

if __name__ == '__main__':
    dashscope.api_key = '{your-dashscope-api-key}'

search.py

from dashvector import Client
from embedding import generate_embeddings

def search_relevant_news(question):
    client = Client(
        api_key='{your-dashvector-api-key}',
        endpoint='{your-dashvector-cluster-endpoint}'
    )
    collection = client.get('news_embedings')
    rsp = collection.query(
        generate_embeddings(question),
        output_fields=['raw'],
        topk=1
    )
    return rsp.output[0].fields['raw']

answer.py

from dashscope import Generation

def answer_question(question, context):
    prompt = f'''请基于```内的内容回答问题。
```
{context}
```
我的问题是:{question}。'''
    rsp = Generation.call(model='qwen-turbo', prompt=prompt)
    return rsp.output.text

run.py (end‑to‑end)

import dashscope
from search import search_relevant_news
from answer import answer_question

if __name__ == '__main__':
    dashscope.api_key = '{your-dashscope-api-key}'
    question = 'EventBridge 是什么,它有哪些能力?'
    context = search_relevant_news(question)
    answer = answer_question(question, context)
    print(f'question: {question}
answer: {answer}')

Upload knowledge‑base files to the configured OSS bucket before running the script.

Key URLs

Lingji Model Service: https://dashscope.aliyun.com/

Embedding API Docs: https://help.aliyun.com/zh/dashscope/developer-reference/text-embedding-api-details

DashScope Console: https://dashscope.console.aliyun.com/overview

DashVector Cluster Creation: https://dashvector.console.aliyun.com/cn-hangzhou/cluster

EventBridge Console: https://eventbridge.console.aliyun.com/

Install third‑party dependencies for FunctionCompute: https://help.aliyun.com/zh/functioncompute/fc-3-0/user-guide/install-third-party-dependencies-for-a-function

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

LLMRAGEmbeddingsemantic searchEventBridgeDashVectorFunctionCompute
Alibaba Cloud Native
Written by

Alibaba Cloud Native

We publish cloud-native tech news, curate in-depth content, host regular events and live streams, and share Alibaba product and user case studies. Join us to explore and share the cloud-native insights you need.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.