Designing a Robust AI Agent Safety Module: Principles, Architecture, and Implementation
The article outlines three foundational safety principles for AI agents—inseparability, intent over keywords, and immutable meta‑instructions—then details a multi‑layer content‑moderation architecture, intent‑classification data pipelines, logical‑hijacking signals, model choices, threshold policies, guard integration, privacy‑PII detection, attack‑intent filters, professional‑domain safeguards, and structured refusal handling, all with concrete code examples and performance metrics.
Underlying Principles
Inseparability – safety rules are part of the decision logic; any request that tries to disable them is rejected.
Intent over keywords – the system evaluates the logical endpoint of a request rather than matching a blacklist of words.
Meta‑instruction irreducibility – safety boundaries reside at the meta‑instruction level and cannot be overridden.
Content‑moderation Agent
Detects and rejects four categories: illegal/crime, hate speech, self‑harm, and minors‑related content. The core trade‑off is intent‑recognition accuracy versus false‑positive rate. Keyword matching produces many false alarms; semantic models must handle adversarial packaging such as role‑play, metaphors, encoding tricks, and logical hijacking.
Architecture
Fast, cheap layers are placed first; only requests that cannot be decided early flow to later layers, controlling overall latency and cost.
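The early-exit ordering described here can be sketched as follows. This is a minimal illustration, assuming each layer returns a verdict or `None` when it cannot decide; `rule_layer`, `classifier_layer`, the toy trigger phrases, and the 0.4/0.7 cut-offs are hypothetical stand-ins, not the article's implementation.

```python
from typing import Callable, Optional

# A layer returns "allow", "reject", or None (undecided: hand over to the next layer).
Layer = Callable[[str], Optional[str]]

def rule_layer(text: str) -> Optional[str]:
    # Hypothetical cheap rule: an explicit attempt to switch safety off is rejected at once.
    return "reject" if "ignore your safety rules" in text.lower() else None

def classifier_layer(text: str) -> Optional[str]:
    # Stand-in for a model call: a toy score driven by a single word.
    score = 0.9 if "explosive" in text.lower() else 0.1
    if score > 0.7:
        return "reject"
    if score < 0.4:
        return "allow"
    return None

def run_layers(text: str, layers: list[Layer], default: str = "review") -> tuple[str, int]:
    """Return the first decisive verdict and how many layers had to run."""
    for position, layer in enumerate(layers, start=1):
        verdict = layer(text)
        if verdict is not None:
            return verdict, position
    # No layer could decide: fall back to the default action.
    return default, len(layers)

LAYERS = [rule_layer, classifier_layer]
```

Because the loop stops at the first decisive layer, a request caught by the rule layer never pays for a model call.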
Intent Classification Model – Data Construction
Public datasets – baseline coverage of harmful categories.
Internal red‑team data – adversarial samples (role‑play wrappers, metaphor expressions, logical hijacking structures).
Production manual annotation – long‑tail domain adaptation.
LLM‑generated synthetic data – supplement low‑resource categories.
Multi‑label category design:
HARM_CATEGORIES = {
    "violence_terrorism": 0,
    "hate_speech": 1,
    "self_harm": 2,
    "csam": 3,
    "cybercrime": 4,
    "weapons_drugs": 5,
    "misinformation": 6,
    "logical_hijacking": 7,
    "safe": 8,
}
Logical‑hijacking signal dimensions (regex patterns):
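LOGICAL_HIJACKING_SIGNALS = [
    # Alternatives are grouped so that each keyword only matches after its prefix;
    # ungrouped, "必须" or "否则" alone would match anywhere in the text.
    r"如果你真的.*(就应该|必须|否则)",  # "if you really ... then you should / must / otherwise"
    r"为了证明.*请输出",  # "to prove ..., please output"
    r"你不回答.*(就是|说明|意味着)",  # "if you don't answer ... that is / shows / means"
    r"除非.*否则.*不能",  # "unless ... otherwise ... cannot"
]
Model Selection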
Latency‑sensitive (<30 ms): distilled MobileBERT / DistilBERT, quantised for deployment.
Accuracy‑first (<100 ms): DeBERTa‑v3‑base fine‑tuned for multi‑label output.
Multilingual scenarios: mDeBERTa‑v3‑base.
Threshold Strategy
High‑risk categories (e.g., terrorism, logical hijacking) use a low rejection threshold of 0.3 to tolerate more false positives. Low‑risk categories use a high threshold of 0.7 to reduce unnecessary blocks.
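A minimal sketch of this per-category policy, using the 0.3 and 0.7 thresholds from the text. Which categories count as high-risk beyond terrorism and logical hijacking, and whether the comparison is `>=` or `>`, are assumptions made for illustration.

```python
# Categories treated as high-risk (assumption: only the two named in the text).
HIGH_RISK = {"violence_terrorism", "logical_hijacking"}
HIGH_RISK_THRESHOLD = 0.3   # reject early, tolerate more false positives
LOW_RISK_THRESHOLD = 0.7    # reject late, avoid unnecessary blocks

def categories_to_reject(scores: dict[str, float]) -> list[str]:
    """Return the harm categories whose score reaches their rejection threshold."""
    rejected = []
    for category, score in scores.items():
        if category == "safe":
            continue  # the "safe" label is not a harm category
        threshold = HIGH_RISK_THRESHOLD if category in HIGH_RISK else LOW_RISK_THRESHOLD
        if score >= threshold:
            rejected.append(category)
    return rejected
```

With the same score of 0.35, a terrorism label triggers a rejection while a misinformation label does not.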
Guard Model Integration
Open‑source option – Llama Guard 3:
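from transformers import AutoTokenizer, AutoModelForCausalLM

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-Guard-3-8B")
guard_model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-Guard-3-8B")

def guard_check(user_message: str) -> dict:
    # format_guard_prompt and parse_guard_output are application-defined helpers (not shown).
    prompt = format_guard_prompt(user_message)
    inputs = tokenizer(prompt, return_tensors="pt")  # generate() expects token IDs, not a string
    output_ids = guard_model.generate(**inputs, max_new_tokens=32)
    new_tokens = output_ids[0][inputs["input_ids"].shape[-1]:]  # drop the echoed prompt
    return parse_guard_output(tokenizer.decode(new_tokens, skip_special_tokens=True))
API‑based guard services (OpenAI Moderation, Azure Content Safety) can be used for cold‑start scenarios.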
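The `parse_guard_output` helper used by `guard_check` is not shown in the article. A possible version is sketched below, assuming the guard's reply starts with `safe` or `unsafe` and, when unsafe, lists comma-separated category codes on the next line. The returned dictionary shape is an assumption.

```python
def parse_guard_output(raw: str) -> dict:
    """Turn a guard model's text verdict into a structured result."""
    lines = [line.strip() for line in raw.strip().splitlines() if line.strip()]
    verdict = lines[0].lower() if lines else ""
    if verdict == "safe":
        return {"safe": True, "categories": []}
    categories = []
    if verdict == "unsafe" and len(lines) > 1:
        categories = [code.strip() for code in lines[1].split(",") if code.strip()]
    # Anything other than an explicit "safe" is treated as unsafe (fail closed).
    return {"safe": False, "categories": categories}
```

Failing closed on empty or malformed output keeps a decoding glitch from silently passing a request.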
Adversarial Packaging Detection
Role‑play attacks – detect role‑setting directives (e.g., "play", "pretend", "you are now") and treat the underlying request independently. Rule: role‑setting + high‑risk request = reject.
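The rule can be read as "a role-play wrapper never lowers the risk of the request inside it". A minimal sketch under that reading follows; the English `ROLE_SETTING` pattern and the `is_high_risk` callable are hypothetical stand-ins for the system's own detector and classifier.

```python
import re
from typing import Callable

# Hypothetical role-setting cues taken from the examples in the text.
ROLE_SETTING = re.compile(r"\b(play|pretend|you are now)\b", re.IGNORECASE)

def judge_role_play(request: str, is_high_risk: Callable[[str], bool]) -> dict:
    """Judge the request on its risk alone and record whether a role wrapper was used."""
    role_setting = bool(ROLE_SETTING.search(request))
    high_risk = is_high_risk(request)
    if role_setting and high_risk:
        reason = "role_play_wrapper"   # role-setting + high-risk request = reject
    elif high_risk:
        reason = "high_risk"
    else:
        reason = ""
    return {"role_setting": role_setting, "reject": high_risk, "reason": reason}
```

A harmless role-play ("pretend you are a pirate") passes, because the wrapper alone is not grounds for refusal.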
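import re

def is_inseparable_role_play(request: str) -> bool:
    # Flags requests that try to separate the model from its safety rules.
    SEPARATION_PATTERNS = [
        r"(忽略|忘记|暂时抛开)(.*)(规则|限制|指令|安全)",  # "ignore / forget / set aside ... rules / limits / instructions / safety"
        r"扮演.*(没有|无|不受).*(限制|约束|规则)",  # "play a role ... with no / free of ... limits / constraints / rules"
        r"现在.*你.*(可以|能够).*做任何",  # "now ... you ... can ... do anything"
    ]
    return any(re.search(pattern, request, re.IGNORECASE) for pattern in SEPARATION_PATTERNS)
Step‑by‑step guidance attacks – in multi‑turn dialogues each turn may look benign while the cumulative intent becomes harmful. A ConversationIntentTracker maintains a sliding window of recent classifications to detect escalation.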
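The article does not spell out the escalation check itself. One possible heuristic is sketched here as a standalone function, assuming each turn's classification carries a numeric risk score. The function name, `min_turns`, and `rise` are hypothetical.

```python
def detect_escalation(risk_scores: list[float], min_turns: int = 3, rise: float = 0.3) -> bool:
    """True when scores in the window never drop and climb by at least `rise` overall."""
    if len(risk_scores) < min_turns:
        return False  # too little history to call it a trend
    non_decreasing = all(later >= earlier for earlier, later in zip(risk_scores, risk_scores[1:]))
    return non_decreasing and (risk_scores[-1] - risk_scores[0]) >= rise
```

A steady climb such as 0.1, 0.2, 0.35, 0.5 is flagged even though no single turn crosses a rejection threshold, which is the case the sliding window exists to catch.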
class ConversationIntentTracker:
    def __init__(self, window_size=5):
        self.history = []
        self.window_size = window_size

    def update_and_check(self, turn_result: dict) -> bool:
        self.history.append(turn_result)
        # Keep only the most recent window_size turns.
        if len(self.history) > self.window_size:
            self.history.pop(0)
        # _detect_escalation_pattern is not shown in the source; it is expected to inspect self.history.
        return self._detect_escalation_pattern()
Human Review Loop
Requests with classifier confidence between 0.4 and 0.7 are routed to human review; the reviewers' labels are fed back into the training data for iterative improvement.
# Automatic reject: confidence > 0.7
# Human review: 0.4 ≤ confidence ≤ 0.7
# Automatic pass: confidence < 0.4 (plus other conditions)
Privacy‑Protection Agent
The threat model covers four risks: sensitive information supplied by users, system‑assisted attacks, leakage from model memory, and cross‑user leakage through RAG.
Rule‑based PII Detection
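import re

# Lookarounds replace \b: in Python, \b does not fire between a CJK character and a
# digit or letter, so "电话13812345678" would otherwise go undetected.
PII_PATTERNS = {
    "id_card_cn": r"(?<!\d)[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx](?!\d)",
    "bank_card": r"(?<!\d)[3-6]\d{15,18}(?!\d)",
    "phone_cn": r"(?<!\d)1[3-9]\d{9}(?!\d)",
    "email": r"(?<![A-Za-z0-9._%+-])[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}(?![A-Za-z])",
    # 密码 = "password"; the separator may be an ASCII or full-width colon, 是 ("is"), or 为 ("is set to").
    "password_hint": r"(?i)(密码|password|passwd)\s*[::是为]\s*\S+",
}

def detect_pii(text: str) -> list[dict]:
    """Return every regex match with its PII type, matched value, and character span."""
    findings = []
    for pii_type, pattern in PII_PATTERNS.items():
        for m in re.finditer(pattern, text):
            findings.append({"type": pii_type, "value": m.group(), "span": (m.start(), m.end())})
    return findings
NER‑based Semantic Detection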
from transformers import pipeline

ner = pipeline("ner", model="shibing624/bert-base-chinese-ner", aggregation_strategy="simple")

def detect_pii_semantic(text: str) -> list[dict]:
    entities = ner(text)
    # Entity group names depend on the NER model's label scheme; adjust this set to match the model in use.
    sensitive = {"ID", "CARD", "PHONE", "EMAIL", "PERSON"}
    return [e for e in entities if e["entity_group"] in sensitive]
Agent‑Assisted Network Attack Prevention
Intent detection uses keyword dictionaries for various attack types.
ATTACK_INTENT_KEYWORDS = {
    "sql_injection": ["sqlmap", "union select", "' or 1=1", "sql注入"],  # "SQL injection"
    "phishing": ["钓鱼页面", "伪造登录", "假冒网站", "捕获密码"],  # phishing page, fake login, spoofed website, capture passwords
    "credential_theft": ["爆破密码", "暴力破解", "字典攻击", "撞库"],  # password cracking, brute force, dictionary attack, credential stuffing
    "malware": ["木马", "后门", "keylogger", "勒索软件", "病毒代码"],  # trojan, backdoor, ransomware, virus code
    "social_engineering": ["诈骗话术", "冒充客服", "虚假短信模板"],  # scam scripts, impersonating customer service, fake SMS templates
}
The agent distinguishes legitimate security research from attack assistance along four dimensions: target (own system vs. third‑party), request granularity (principle explanation vs. ready‑to‑run code), context (defensive purpose vs. no justification), and code usability (educational pseudocode vs. executable script).
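One way to combine the four dimensions is to count how many point toward attack assistance. The sketch below is illustrative only: the field names, the equal weighting, and the cut-offs (two signals restrict the answer, three or more refuse) are assumptions, not values from the article.

```python
from dataclasses import dataclass

@dataclass
class SecurityRequestSignals:
    third_party_target: bool          # target: third-party rather than own system
    wants_ready_to_run_code: bool     # granularity: runnable code rather than principles
    lacks_defensive_context: bool     # context: no defensive justification given
    output_would_be_executable: bool  # usability: executable script rather than pseudocode

def triage_security_request(signals: SecurityRequestSignals) -> str:
    """Map the number of attack-leaning signals to a handling decision."""
    attack_signals = sum([
        signals.third_party_target,
        signals.wants_ready_to_run_code,
        signals.lacks_defensive_context,
        signals.output_would_be_executable,
    ])
    if attack_signals >= 3:
        return "refuse"
    if attack_signals == 2:
        return "explain_principles_only"
    return "assist"
```

The middle outcome reflects the granularity dimension: a borderline request can still get a conceptual explanation without ready-to-run code.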
RAG Permission Isolation
In enterprise RAG scenarios each user can retrieve only documents they are authorised to access.
class SecureRAGRetriever:
    def __init__(self, vector_store, acl_service):
        self.vector_store = vector_store
        self.acl = acl_service

    def retrieve(self, query: str, user_id: str, top_k: int = 5) -> list:
        # Restrict the search to documents this user is authorised to read.
        allowed_doc_ids = self.acl.get_accessible_docs(user_id)
        results = self.vector_store.similarity_search(
            query=query,
            filter={"doc_id": {"$in": allowed_doc_ids}},
            k=top_k,
        )
        return results

    def generate_with_context(self, query: str, user_id: str) -> str:
        docs = self.retrieve(query, user_id)
        context = "\n".join(d.page_content for d in docs)
        # llm is the application's generation client (not defined in this snippet).
        return llm.generate(query=query, context=context)
Multi‑tenant isolation is achieved via namespace or metadata filtering in vector stores such as Pinecone, Weaviate, or Qdrant.
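The effect of filtering before ranking can be shown with an in-memory stand-in. This is a sketch, not the API of Pinecone, Weaviate, or Qdrant: `Doc`, `InMemoryACL`, `retrieve_authorised`, and the word-overlap scoring are hypothetical and exist only to make the example self-contained.

```python
from dataclasses import dataclass

@dataclass
class Doc:
    doc_id: str
    text: str

class InMemoryACL:
    """Toy access-control service: maps a user ID to the document IDs they may read."""
    def __init__(self, grants: dict[str, set[str]]):
        self._grants = grants

    def get_accessible_docs(self, user_id: str) -> set[str]:
        # Unknown users get an empty set, so they retrieve nothing.
        return self._grants.get(user_id, set())

def retrieve_authorised(query: str, user_id: str, docs: list[Doc],
                        acl: InMemoryACL, top_k: int = 5) -> list[Doc]:
    allowed = acl.get_accessible_docs(user_id)
    # Filter first: unauthorised documents never reach ranking or the prompt context.
    candidates = [d for d in docs if d.doc_id in allowed]
    words = query.lower().split()
    # Toy relevance score: number of query words found in the document text.
    ranked = sorted(candidates, key=lambda d: -sum(w in d.text.lower() for w in words))
    return ranked[:top_k]
```

Applying the permission filter before ranking, rather than trimming results afterwards, means a highly relevant but unauthorised document cannot crowd out authorised ones or leak into the context.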
Harmful‑Information Filtering Agent
Weapon/dangerous‑substance tutorials are handled by combining intent‑first judgement with an output safety guard.
import re

def intent_first_judge(request: str, context: dict) -> dict:
    # extract_core_request and classifier are application-defined (not shown in the source).
    core_request = extract_core_request(request, context)
    safety_result = classifier.predict(core_request)
    return safety_result

class OutputSafetyGuard:
    def __init__(self, classifier, dangerous_output_patterns):
        self.classifier = classifier
        self.patterns = dangerous_output_patterns  # mapping: pattern name -> regex

    def check_output(self, generated_text: str) -> dict:
        # Cheap regex screen first; the classifier runs only if no pattern matches.
        for name, pattern in self.patterns.items():
            if re.search(pattern, generated_text):
                return {"safe": False, "reason": name}
        result = self.classifier.predict(generated_text)
        return {"safe": result["label"] == "safe", "score": result["score"]}
Intent‑first input judgement and the output guard complement each other: the former filters on the request's intent, the latter on the generated content.
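How the two checks might wrap a generation call is sketched below. The three callables are hypothetical stand-ins for the intent judge, the LLM, and the output guard, and the plain-string refusal is a simplification.

```python
from typing import Callable

def guarded_generate(
    request: str,
    input_is_safe: Callable[[str], bool],
    generate: Callable[[str], str],
    output_is_safe: Callable[[str], bool],
    refusal: str = "[refused]",
) -> str:
    """Check intent before generating, then check the draft before returning it."""
    if not input_is_safe(request):
        return refusal          # harmful intent: the model is never called
    draft = generate(request)
    if not output_is_safe(draft):
        return refusal          # benign-looking request, harmful output
    return draft
```

The second check matters because a request can pass intent screening and still lead the model to produce content the output patterns would block.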
Conspiracy & Misinformation Handling
Detects conspiracy framing using keyword signals and a semantic classifier.
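CONSPIRACY_SIGNALS = [
    "他们不想让你知道的",  # "what they don't want you to know"
    "主流媒体隐瞒了",  # "the mainstream media covered up"
    "官方数据造假",  # "official data is faked"
    "深层政府",  # "deep state"
    "疫苗芯片",  # "vaccine microchips"
]

def detect_conspiracy_framing(text: str) -> float:
    # Share of signal phrases present in the text (0 to 1).
    keyword_score = sum(1 for kw in CONSPIRACY_SIGNALS if kw in text) / len(CONSPIRACY_SIGNALS)
    # conspiracy_classifier is an application-defined model (not shown in the source).
    semantic_score = conspiracy_classifier.predict(text)["conspiracy"]
    return 0.4 * keyword_score + 0.6 * semantic_score
Professional‑Domain Boundary & Disclaimer Injection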
Meta‑instruction irreducibility applies to regulated domains (medical, legal, financial). Even when the model has the relevant knowledge, it must not give professional advice.
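PROFESSIONAL_DOMAINS = {
    "medical": {
        # diagnosis, symptoms, medication, dosage, surgery, condition, test results
        "keywords": ["诊断", "症状", "用药", "剂量", "手术", "病情", "检查结果"],
        # "For reference only; not medical advice. Consult a licensed physician for diagnosis and treatment."
        "disclaimer": "以上内容仅供参考,不构成医疗建议。具体诊疗请咨询执业医师。",
        "risk_level": "high",
    },
    "legal": {
        # contract, sue, litigation, legal liability, judgment, arbitration, defence
        "keywords": ["合同", "起诉", "诉讼", "法律责任", "判决", "仲裁", "辩护"],
        # "For reference only; not legal advice. Consult a licensed lawyer for specific legal questions."
        "disclaimer": "以上内容仅供参考,不构成法律意见。具体法律问题请咨询执业律师。",
        "risk_level": "high",
    },
    "financial": {
        # invest, buy, sell, yield, fund, stock, wealth-management advice
        "keywords": ["投资", "买入", "卖出", "收益率", "基金", "股票", "理财建议"],
        # "Not investment advice. Investing carries risk; consult a licensed financial adviser before deciding."
        "disclaimer": "以上内容不构成投资建议。投资有风险,决策请咨询持牌金融顾问。",
        "risk_level": "medium",
    },
}

def inject_professional_disclaimer(response: str, domain: str) -> str:
    # Append the domain's disclaimer below a separator; unknown domains pass through unchanged.
    if domain in PROFESSIONAL_DOMAINS:
        disclaimer = PROFESSIONAL_DOMAINS[domain]["disclaimer"]
        return f"{response}\n---\n{disclaimer}"
    return response
Two handling modes: passive disclaimer injection after a safe answer, or active refusal for high‑risk queries (e.g., direct disease diagnosis).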
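Choosing between the two modes could look like the sketch below. The English `DIRECT_ADVICE_PATTERNS`, the sample disclaimer, and the refusal wording are hypothetical; a production system would more likely use a classifier than a regex to spot direct-advice queries.

```python
import re

DISCLAIMERS = {
    "medical": "For reference only; not medical advice. Please consult a licensed physician.",
}
# Hypothetical cues for queries that ask for a direct professional judgement.
DIRECT_ADVICE_PATTERNS = {
    "medical": re.compile(r"\b(diagnose me|do i have|what dose should i take)\b", re.IGNORECASE),
}

def respond_in_domain(query: str, draft_answer: str, domain: str) -> dict:
    """Pick active refusal, passive disclaimer, or the unmodified answer."""
    pattern = DIRECT_ADVICE_PATTERNS.get(domain)
    if pattern and pattern.search(query):
        # Active refusal: the draft answer is discarded.
        return {"mode": "active_refusal",
                "text": "I can't assess this for you. Please consult a qualified professional."}
    disclaimer = DISCLAIMERS.get(domain)
    if disclaimer:
        # Passive mode: keep the answer and append the disclaimer.
        return {"mode": "passive_disclaimer", "text": f"{draft_answer}\n---\n{disclaimer}"}
    return {"mode": "none", "text": draft_answer}
```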
Academic‑Cheating Detection
Pattern‑based detection for essay writing, exam answers, and plagiarism assistance.
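ACADEMIC_CHEAT_PATTERNS = {
    # "write me a paper about ...", "ghostwrite homework", "write a ...-word ... course assignment"
    "essay_writing": ["帮我写一篇关于.*的论文", "代写作业", "写一篇.*字的.*课作业"],
    # "this is an exam question", "answer the questions for me", "how to answer ... during an exam"
    "exam_answers": ["这是考试题", "帮我答题", "考试中.*怎么回答"],
    # "rewrite this article so it passes the plagiarism check", "lower the similarity score", "get past plagiarism detection"
    "plagiarism": ["帮我改写这篇文章让它过查重", "降低查重率", "让抄袭检测通过"],
}
Instead of a binary reject/allow decision, the system can guide users toward compliant learning assistance.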
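A redirect-instead-of-refuse response might be produced as in the sketch below. The English patterns and the guidance messages are hypothetical stand-ins, used so the example runs on English input.

```python
import re

# Hypothetical English equivalents of the detection patterns.
CHEAT_PATTERNS = {
    "essay_writing": [r"write (me )?(an?|my) (essay|paper)", r"do my homework"],
    "exam_answers": [r"this is an exam question", r"answer (this|the) exam"],
}
# What the assistant offers in place of the requested shortcut.
GUIDANCE = {
    "essay_writing": "I can't write the assignment for you, but I can help you build an outline and review your draft.",
    "exam_answers": "I can't answer exam questions for you, but I can explain the underlying concepts afterwards.",
}

def guide_academic_request(text: str) -> dict:
    """Return a redirect with a compliant alternative, or signal a normal answer."""
    for category, patterns in CHEAT_PATTERNS.items():
        if any(re.search(p, text, re.IGNORECASE) for p in patterns):
            return {"category": category, "action": "redirect", "message": GUIDANCE[category]}
    return {"category": None, "action": "answer", "message": ""}
```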
Regulatory Compliance Policy
Region‑specific policies adjust allowed content and required disclaimers.
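REGIONAL_POLICIES = {
    "CN": {"gambling_discussion": False, "financial_advice_disclaimer": True},
    "EU": {"gambling_discussion": True, "gdpr_disclaimer": True},
    "US": {"gambling_discussion": True, "state_law_disclaimer": True},
}
# Assumption: the source does not define GLOBAL_DEFAULT_POLICY; an empty policy denies every category.
GLOBAL_DEFAULT_POLICY = {}

class CompliancePolicy:
    def __init__(self, region: str):
        self.region = region
        self.policy = REGIONAL_POLICIES.get(region, GLOBAL_DEFAULT_POLICY)

    def is_allowed(self, content_category: str) -> bool:
        # Categories missing from the policy are denied by default.
        return self.policy.get(content_category, False)

    def get_required_disclaimers(self, content_type: str) -> list[str]:
        # Reads an optional "disclaimers" mapping; the sample policies above carry only
        # boolean flags, so this returns [] for them.
        return self.policy.get("disclaimers", {}).get(content_type, [])
Refusal‑Response Engineering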
Refusals are structured objects rather than free‑form text.
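from dataclasses import dataclass
from enum import Enum

class RefusalReason(Enum):
    HARMFUL_CONTENT = "harmful_content"
    PRIVACY_RISK = "privacy_risk"
    PROFESSIONAL_BOUNDARY = "professional_boundary"
    POLICY_VIOLATION = "policy_violation"
    AMBIGUOUS = "ambiguous"

@dataclass
class RefusalDecision:
    should_refuse: bool
    reason: RefusalReason
    confidence: float  # 0-1
    explanation: str = ""
    alternative: str = ""
    log_category: str = ""

def generate_refusal_response(decision: RefusalDecision) -> str:
    REFUSAL_TEMPLATES = {
        RefusalReason.HARMFUL_CONTENT: {
            # "This request involves content that could cause harm, so I can't help with it."
            "message": "这个请求涉及可能造成伤害的内容,我无法提供帮助。",
            # "If you have other questions, I'm happy to help."
            "alternative": "如果你有其他问题,我很乐意协助。",
        },
        RefusalReason.PRIVACY_RISK: {
            # "To protect your information, I suggest not sharing this kind of sensitive data in the conversation."
            "message": "为了保护你的信息安全,我建议不要在对话中分享这类敏感信息。",
            # "If you have a specific security question, tell me what you'd like to know."
            "alternative": "如果你有具体的安全疑问,可以告诉我你想了解什么。",
        },
        RefusalReason.PROFESSIONAL_BOUNDARY: {
            # "This question is beyond what I can answer reliably."
            "message": "这个问题超出了我可以可靠回答的范围。",
            # "Consider consulting a professional in the field. I can help you find suitable resources."
            "alternative": "建议咨询相关领域的专业人士。我可以帮你了解如何找到合适的资源。",
        },
    }
    # POLICY_VIOLATION and AMBIGUOUS have no template in the source. Assumption: fall back
    # to the decision's own explanation and alternative instead of raising KeyError.
    fallback = {"message": decision.explanation, "alternative": decision.alternative}
    tmpl = REFUSAL_TEMPLATES.get(decision.reason, fallback)
    return f"{tmpl['message']}\n{tmpl['alternative']}"
Partial refusals are supported by decomposing a request into sub‑requests, evaluating each, and answering only the safe parts.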
def decompose_request(request: str) -> list[dict]:
    # llm, safety_classifier, and generate are application-defined (not shown in the source).
    sub_requests = llm.decompose(request)
    results = []
    for sub in sub_requests:
        safety = safety_classifier.check(sub)
        results.append({
            "sub_request": sub,
            "safe": safety.is_safe,
            # Generate content only for sub-requests judged safe.
            "content": generate(sub) if safety.is_safe else None,
        })
    return results
Metrics & Monitoring
Key indicators include false‑positive rate, false‑negative rate, jailbreak detection rates, logical‑hijack hit rate, refusal distribution, and post‑refusal user behaviour.
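Several of these indicators can be computed directly from labelled decision logs. The sketch below assumes a hypothetical record shape with a ground-truth `harmful` flag, the system's `refused` decision, and an optional `reason`; the article does not define a log schema.

```python
from collections import Counter

def safety_metrics(records: list[dict]) -> dict:
    """Compute false-positive rate, false-negative rate, and refusal distribution."""
    benign = [r for r in records if not r["harmful"]]
    harmful = [r for r in records if r["harmful"]]
    false_positives = sum(1 for r in benign if r["refused"])       # benign but refused
    false_negatives = sum(1 for r in harmful if not r["refused"])  # harmful but answered
    return {
        "false_positive_rate": false_positives / len(benign) if benign else 0.0,
        "false_negative_rate": false_negatives / len(harmful) if harmful else 0.0,
        "refusal_distribution": dict(Counter(r["reason"] for r in records if r["refused"])),
    }
```

The false-positive rate is taken over benign requests and the false-negative rate over harmful ones, so the two do not share a denominator.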
Performance Reference
Rule filter (including inseparability detection): <1 ms (regex only, no model call).
Intent classification model (with logical‑hijack features): <30 ms (quantised BERT on GPU).
PII detection: <20 ms (rule + lightweight NER).
Guard model (optional): <200 ms (triggered only on ambiguous cases).
Post‑generation output check: <50 ms (runs in parallel with streaming).
Total safety overhead (excluding LLM inference): <100 ms.
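The per-stage bounds can be added up to see which paths fit the stated total. The sketch assumes the stages run sequentially and uses each upper bound from the list above; that the sub-100 ms figure refers to the path without the guard model, with the output check overlapping streaming, is one reading of the figures, not something the article states.

```python
# Upper bounds in milliseconds, copied from the list above.
STAGE_BUDGET_MS = {
    "rule_filter": 1,
    "intent_classifier": 30,
    "pii_detection": 20,
    "guard_model": 200,    # optional: ambiguous cases only
    "output_check": 50,    # runs in parallel with streaming
}

def worst_case_overhead_ms(ambiguous: bool, output_check_overlaps: bool = True) -> int:
    """Sum the stage bounds that add latency on a given path."""
    total = (STAGE_BUDGET_MS["rule_filter"]
             + STAGE_BUDGET_MS["intent_classifier"]
             + STAGE_BUDGET_MS["pii_detection"])
    if ambiguous:
        total += STAGE_BUDGET_MS["guard_model"]
    if not output_check_overlaps:
        total += STAGE_BUDGET_MS["output_check"]
    return total
```

On this reading the common path is bounded by 51 ms, while an ambiguous request that triggers the guard model can reach 251 ms.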
System Integration Flow
The pipeline proceeds from fast rule‑based filters, through intent classification, optional guard checks, human review (when needed), and finally response generation or structured refusal.
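The flow can be condensed into one function, sketched below. The four callables are hypothetical stand-ins for the components described earlier, and the 0.4 and 0.7 bands are reused from the human-review section. A real refusal would be a structured `RefusalDecision` rather than a plain dictionary.

```python
from typing import Callable

def handle_request(
    text: str,
    rule_hit: Callable[[str], bool],
    classify: Callable[[str], float],
    guard: Callable[[str], str],      # returns "safe", "unsafe", or anything else if unsure
    generate: Callable[[str], str],
) -> dict:
    """Run the safety stages in order and stop at the first decisive one."""
    if rule_hit(text):
        return {"action": "refuse", "stage": "rules"}
    score = classify(text)
    if score > 0.7:
        return {"action": "refuse", "stage": "classifier"}
    if score >= 0.4:
        # Ambiguous band: only now is the slower guard model consulted.
        verdict = guard(text)
        if verdict == "unsafe":
            return {"action": "refuse", "stage": "guard"}
        if verdict != "safe":
            return {"action": "human_review", "stage": "guard"}
    return {"action": "respond", "stage": "generate", "text": generate(text)}
```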
AI Engineer Programming
In the AI era, defining problems is often more important than solving them; here we explore AI's contradictions, boundaries, and possibilities.