Design and Performance Optimization of a Measurement Platform with Multi‑Level Caching

This article describes the background, architecture, encountered performance bottlenecks, and a three‑level caching strategy (Redis‑based) implemented in Python to accelerate a measurement platform that aggregates bug, issue, and code metrics for a rapidly growing development team.

转转QA
转转QA
转转QA
Design and Performance Optimization of a Measurement Platform with Multi‑Level Caching

The rapid expansion of the ZuanZuan team caused a geometric increase in both personnel and demand counts, making it difficult for managers to track project progress and spot operational issues. To provide clear visibility and efficiency metrics across the organization, a measurement platform was built to aggregate data from multiple dimensions such as bugs, requirements, and code.

Overall Structure

Data sources: bug and issue data from the Iwork platform (excluding Teambition), and code from ZuanZuan's SVN/Git repositories.

Technology stack: Front‑end – Vue, Semantic UI, jQuery, ECharts, WdatePicker; Back‑end – Python Tornado; Database – MySQL and Redis.

Business architecture focuses on human‑resource metrics.

The platform visualizes data layers, personnel grouping (using RTX relationships), and aggregation zones, with future work planned for deployment‑level analysis and test‑case metrics.

Practical Issues and Performance Optimization

Fast‑changing requirements, coarse data accuracy, and poor user experience (display and performance) were identified as early pain points.

Performance bottlenecks were traced to slow Iwork APIs (over 10 s for yearly bug/issue queries), lack of data reuse, and heavy front‑end JavaScript.

To mitigate these issues, a three‑level caching scheme was introduced:

Level‑1 Cache: Directly cache the raw Iwork API response in Redis using the request URL as the key.

Level‑2 Cache: Cache the aggregated results of the three product lines (ZZ, ZZYP, ES) to avoid repeated API calls.

Level‑3 Cache: Cache the final computed aggregation (functions) to eliminate repeated heavy calculations.

Performance tests showed a dramatic reduction from >11 s without caching to <25 ms with Redis caching.

Cache Pre‑warming Strategies

Interface‑level pre‑warming by programmatically invoking all API URLs once.

UI‑level pre‑warming using Selenium to simulate user interactions and populate caches.

Future topics include Redis sharding, cache invalidation UI, lightweight pre‑warming methods, and Redis configuration optimizations (disabling RDB/AOF for pure cache scenarios).

Code Implementation

import urllib2
import urllib
import json
import time
import sys
from common import onlineBugUrl, onlineIssueUrl, prodKeyYOUPIN, prodKeyZZ, prodKeyListing, key
from clog import *
from Redis import Redis

logging.root.setLevel(logging.DEBUG)

class iworkMethod:
    def getIssues4ZZ(self, startDate='2016-11-01', endDate='2016-12-31'):
        """Fetch issues for ZZ product line and cache in Redis (Level‑1)."""
        tempDict = {}
        posturl = onlineIssueUrl
        data = {'startDate': startDate, 'endDate': endDate, 'prodKey': prodKeyZZ, 'key': key}
        redisKey = posturl + str(data)
        try:
            redisResult = Redis().get(redisKey)
            if redisResult is None:
                respData = self.postUrl(posturl, data)
                tempDict = json.loads(respData)
                if tempDict.get('result') == '1' and 'issue' in tempDict:
                    if not Redis().set(redisKey, repr(tempDict['issue'])):
                        logging.error('set redis failed:key' + redisKey)
                    else:
                        logging.info('set redis succeed:key' + redisKey)
                return tempDict
            else:
                tempDict['result'] = '1'
                tempDict['issue'] = eval(redisResult)
                return tempDict
        except Exception as e:
            logging.error('post ' + posturl + str(data) + ' request error: ' + str(e))
            return tempDict
    # Similar methods getIssues4ES, getIssues4YP omitted for brevity
    def getIssues(self, startDate='2016-11-01', endDate='2016-12-31', product=""):
        """Merge issues from all product lines and apply Level‑2/3 caching."""
        tempDictIssuesMerge = {}
        if product == prodKeyZZ:
            tempDictIssuesMerge = self.getIssues4ZZ(startDate, endDate)
        elif product == prodKeyListing:
            tempDictIssuesMerge = self.getIssues4ES(startDate, endDate)
        elif product == prodKeyYOUPIN:
            tempDictIssuesMerge = self.getIssues4YP(startDate, endDate)
        else:
            redisKey = str(self.__class__) + sys._getframe().f_code.co_name + startDate + endDate + product
            try:
                redisResult = Redis().get(redisKey)
                if redisResult is None:
                    # fetch each line and merge
                    dZZ = self.getIssues4ZZ(startDate, endDate)
                    dES = self.getIssues4ES(startDate, endDate)
                    dYP = self.getIssues4YP(startDate, endDate)
                    if dZZ.get('result') == '1' and dES.get('result') == '1' and dYP.get('result') == '1':
                        tempDictIssuesMerge['result'] = '1'
                        tempDictIssuesMerge['issue'] = []
                        tempDictIssuesMerge['issue'].extend(json.loads(dZZ['issue']))
                        tempDictIssuesMerge['issue'].extend(json.loads(dES['issue']))
                        tempDictIssuesMerge['issue'].extend(json.loads(dYP['issue']))
                        if not Redis().set(redisKey, repr(tempDictIssuesMerge['issue'])):
                            logging.error('set redis failed:key' + redisKey)
                            Redis().delete(redisKey)
                        else:
                            logging.info('set redis succeed:key' + redisKey)
                    else:
                        logging.debug('One of the sources failed')
                else:
                    tempDictIssuesMerge['result'] = '1'
                    tempDictIssuesMerge['issue'] = eval(redisResult)
                return tempDictIssuesMerge
            except Exception as e:
                logging.error('Redis get error for key:' + redisKey + ' error:' + str(e))
                tempDictIssuesMerge['result'] = '0'
                return tempDictIssuesMerge
    def postUrl(self, url, data):
        req = urllib2.Request(url)
        data = urllib.urlencode(data)
        logging.debug('url:' + url + ' data:' + data)
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor())
        response = opener.open(req, data)
        return response.read()

In conclusion, the multi‑level caching approach dramatically improves response times for the measurement platform, while pre‑warming techniques and Redis configuration tweaks are proposed to further enhance reliability and scalability.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

BackendmonitoringperformanceredisMetricscaching
转转QA
Written by

转转QA

In the era of knowledge sharing, discover 转转QA from a new perspective.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.