Standardizing and Automating the Review Middleware: Protocols, Processes, and Implementation
This article describes how a review middle‑platform for a video service standardizes communication protocols with RocketMQ, defines unified data processing flows, and implements an automated business‑access detector with Java code to reduce duplication and improve development efficiency.
1. Background
1.1 Terminology
1.1.1 Business End
Business ends refer to concrete downstream services of a product line, such as video reporting, user avatar, comment capture, etc. Currently 58 business ends are integrated, each requiring specific review strategies regarding timeliness, priority, and strictness.
1.1.2 Review Middle‑Platform
Business ends send data to the review middle‑platform, which first performs machine review and then human review. After review, results are sent back to the business end for either exposure or deletion, ensuring user‑generated content safety and legality.
1.1.3 Domain Model
The domain model (MBusiness) bridges requirement analysis and object‑oriented design. It captures attributes such as inTopic, inGroup, outTopic, outGroup, and type, representing the flow of data entering and leaving the review platform.
1.2 Review Middle‑Platform Overview
The platform serves as a centralized hub for the Sohu video matrix, handling review requests from 58 business ends. Each new business end must send its data to the platform for standardized review.
1.3 Business Pain Points
Two main issues exist: (1) Inconsistent communication protocols (HTTP vs. MQ) and data formats; (2) Repetitive development of similar review‑process code for each new business end, leading to high maintenance cost and low code quality.
2. Standardization Refactor
2.1 Standardize Communication Protocol
To decouple business services, the platform adopts RocketMQ as the message middleware, offering high availability, performance, and reliability suitable for inter‑service messaging.
2.2 Standardize Processing Flow
Three standard review flows (video, album, text‑image) are predefined. The MBusiness domain model defines key attributes (inTopic, inGroup, outTopic, outGroup, type). Data flow rules are:
a. inTopic is the entry topic where business ends produce messages and the platform consumes them.
b. outTopic is the exit topic where the platform produces review results and business ends consume them.When a new business is onboarded, its MBusiness record is created in the database. The business end produces messages to inTopic and later consumes results from outTopic.
The platform continuously scans for new MBusiness entries, automatically creates consumers for inTopic, processes messages according to the predefined flow, and creates producers for outTopic to deliver results.
3. Automation Refactor
3.1 Business Access Detector
The detector starts with the system, holding two maps: consumerMap (key: inTopic, value: consumer instance) and producerMap (key: business name, value: producer instance). Every three minutes it reads all MBusiness records, creates missing consumers/producers, and registers them in the maps.
3.2 Automated Data Flow
1) Business ends produce entry messages to inTopic. 2) The platform consumes these messages, applies the appropriate standard flow, and processes the data. 3) After review, the platform uses the corresponding producer to send results to outTopic. 4) Business ends consume outTopic to obtain review outcomes.
3.3 Code Samples
3.3.1 Automatic Review (Consumer) Example
@Slf4j
@Component
public class ConsumerManager implements ApplicationRunner, DisposableBean {
private Map<String, Consumer> consumerMap = Collections.synchronizedMap(new HashMap<>());
@Resource
private VideoConsumerCallback videoConsumerCallback;
@Resource
private ContentConsumerCallback contentConsumerCallback;
@Resource
private PlayListConsumerCallback playListConsumerCallback;
@Resource
private BroadlistConsumerCallback broadlistConsumerCallback;
@Resource
private MBusinessDao businessDao;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
flush();
}
private void flush() {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
init();
}
}, 1000 * 60 * 5L, 1000 * 60 * 5L);
}
private void init() {
List<MBusiness> businessList = businessDao.listValid();
if (CollectionUtils.isEmpty(businessList)) {
return;
}
for (MBusiness business : businessList) {
if (consumerMap.containsKey(business.getInTopic())) {
continue;
}
Consumer consumer = initConsumer(business);
if (consumer == null) {
continue;
}
consumer.start();
consumerMap.put(business.getInTopic(), consumer);
log.info("topic:{} group:{} start consumer.", business.getInTopic(), business.getInGroup());
}
}
@Override
public void destroy() throws Exception {
if (consumerMap.isEmpty()) {
return;
}
for (Consumer consumer : consumerMap.values()) {
consumer.shutdown();
}
}
private Consumer initConsumer(MBusiness business) {
if (StringUtils.isBlank(business.getInGroup()) || StringUtils.isBlank(business.getInTopic())) {
return null;
}
Consumer consumer = new Consumer(business.getInGroup(), business.getInTopic());
if (EnterType.VIDEO.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(videoConsumerCallback);
}
if (EnterType.CONTENT.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(contentConsumerCallback);
}
if (EnterType.PLAYLIST.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(playListConsumerCallback);
}
if (EnterType.BROADLIST.getType().equals(business.getEnterType())) {
consumer.setConsumerCallback(broadlistConsumerCallback);
}
consumer.setPullThresholdForQueue(1);
return consumer;
}
public Set<String> listTopic() {
return consumerMap.keySet();
}
public Consumer getConsumer(String topic) {
return consumerMap.get(topic);
}
}3.3.2 Automatic Result Publishing (Producer) Example
@Slf4j
@Component
public class ProducerManager implements ApplicationRunner, DisposableBean {
private Map<String, Producer> producerMap = Collections.synchronizedMap(new HashMap<>());
private Map<String, Producer> producerTypeMap = Collections.synchronizedMap(new HashMap<>());
private Map<String, String> webhookMap = Collections.synchronizedMap(new HashMap<>());
@Resource
private MBusinessDao businessDao;
@Override
public void run(ApplicationArguments args) throws Exception {
init();
flush();
}
private void flush() {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
init();
}
}, 1000 * 10 * 3L, 1000 * 60 * 3L);
}
private void init() {
List<MBusiness> businessList = businessDao.listValid();
if (CollectionUtils.isEmpty(businessList)) {
return;
}
for (MBusiness business : businessList) {
if (StringUtils.isNotBlank(business.getOutWebhook()) && !webhookMap.containsKey(business.getType())) {
webhookMap.put(business.getType(), business.getOutWebhook());
}
if (producerMap.containsKey(business.getOutTopic())) {
if (!producerTypeMap.containsKey(business.getType())) {
producerTypeMap.put(business.getType(), producerMap.get(business.getOutTopic()));
log.info("business type:{} topic:{} group:{} init producer.", business.getType(), business.getOutTopic(), business.getOutGroup());
}
continue;
}
Producer producer = initProducer(business);
if (producer == null) {
continue;
}
producer.start();
producerMap.put(business.getOutTopic(), producer);
producerTypeMap.put(business.getType(), producer);
log.info("business type:{} topic:{} group:{} init producer.", business.getType(), business.getOutTopic(), business.getOutGroup());
}
}
@Override
public void destroy() throws Exception {
if (producerMap.isEmpty()) {
return;
}
for (Producer producer : producerMap.values()) {
producer.shutdown();
}
}
public Producer initProducer(MBusiness business) {
if (StringUtils.isBlank(business.getOutGroup()) || StringUtils.isBlank(business.getOutTopic())) {
return null;
}
return new Producer(business.getOutGroup(), business.getOutTopic());
}
public String getWebhook(String type) {
if (StringUtils.isBlank(type)) {
return null;
}
return webhookMap.get(type);
}
public Producer getProducer(String type) {
if (StringUtils.isBlank(type)) {
return null;
}
return producerTypeMap.get(type);
}
}4. Summary
By standardizing communication protocols, data formats, and processing flows, and by introducing an automated business‑access detector, the platform abstracts diverse business ends into three core types (video, album, text‑image), eliminates repetitive development, and significantly improves development efficiency and system maintainability.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
