Shared Memory Principles and a Practical VCS Data Collection Implementation
This article explains Linux shared‑memory fundamentals, why shared memory outperforms file‑based IPC, and the mmap() system call, then presents a concrete use case: a complete Go implementation that creates, synchronizes, reads, and protobuf‑serializes advertising‑tracking metrics in the VCS (vivo control system) monitoring platform. Shared memory is widely used in high‑performance components such as Redis, Kafka, and RabbitMQ.
1. Shared Memory Theory
In Linux, each process has its own address space and page tables. When two processes map different virtual addresses to the same physical region, that region becomes shared memory, enabling inter‑process communication (IPC) without copying data between processes. Synchronization is typically achieved with semaphores to avoid read‑while‑write conflicts. The lifecycle of a shared‑memory segment follows a reference‑count model: the counter is incremented when a process attaches and decremented when it detaches; the segment is destroyed when the count reaches zero (for System V segments, once it has also been marked for removal).
2. Comparison with Traditional File‑Based IPC
Shared memory is the fastest form of IPC because it minimizes copying between kernel and user space. Moving data from one file to another via read()/write() or a pipe takes four copies (input file → kernel buffer → user buffer → kernel buffer → output file), while shared memory needs only two (input file → shared memory and shared memory → output file). Moreover, a segment can stay mapped for the whole communication session, avoiding repeated mapping/unmapping overhead.
3. mmap() System Call
The mmap() call maps a file (or an anonymous region) into a process’s address space, allowing the file to be accessed like regular memory. Its prototype is:
void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
Key parameters:
addr : suggested start address of the mapping, usually NULL to let the kernel choose.
fd : file descriptor returned by open(), or -1 for an anonymous mapping (requires the MAP_ANON/MAP_ANONYMOUS flag).
len : number of bytes to map.
prot : access rights (e.g., PROT_READ , PROT_WRITE ).
flags : MAP_SHARED or MAP_PRIVATE (mutually exclusive), optionally combined with MAP_FIXED .
offset : start offset within the file, usually 0; must be a multiple of the page size.
On success, the call returns the starting address of the mapped region in the caller’s address space; on failure it returns MAP_FAILED.
4. VCS Shared‑Memory Data Collection (Practical Implementation)
The VCS system gathers monitoring metrics from the whole network. The shared‑memory layout is designed as follows (simplified):
|4K protect|MagicNum1|idx|OssMapSz (1024×128 bytes) ×2|reserved|MagicNum2|4K protect|
The key Go source files are described below.
4.1 general.proto
syntax = "proto2";
package general;
message Data {
map<string, string> kv = 1;
}
message GeneralData {
optional string rule_id = 1;
repeated Data data = 2;
optional int64 count = 3;
optional int64 left_size = 4;
optional int32 version = 5;
}
4.2 constant.go
package moni_shm
const (
OssShmId uint32 = 0x3eeff00
MagicNum1 uint32 = 0x650a218
MagicNum2 uint32 = 0x138a4f2
CreateShmLock = "/var/run/.oss_shm_lock"
OssMapOneAttrCnt = 1024 * 128 // 1024 rules
OssOneAttrEntryCnt = 128 // 128 metrics per rule
EntrySz = 4
OssMapCnt = 2
OneAttrSz = OssOneAttrEntryCnt * EntrySz
OssMapSz = OssMapOneAttrCnt * OneAttrSz
OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4
defaultIntervalSec = 60
defaultTopic = "moni_general_shared_memory"
)
4.3 util.go (memory utilities)
package moni_shm
import "unsafe"
// align rounds up 'actual' to the nearest multiple of 'to'.
func align(actual, to uint64) uint64 {
return (actual + to - 1) / to * to
}
// zero clears 'bts' bytes starting at 'ptr'. It works page‑by‑page (4 KB).
func zero(ptr uintptr, bts uint64) {
if bts == 0 {
return
}
const sz = 4096
var next uint64
for next+sz <= bts {
arr := (*[sz]byte)(unsafe.Pointer(ptr))
for i := range *arr {
(*arr)[i] = 0
}
next += sz
ptr += uintptr(sz)
}
if next == bts {
return
}
for i := uintptr(0); i < uintptr(bts-next); i++ {
*(*byte)(unsafe.Pointer(ptr + i)) = 0
}
}
4.4 mgr.go (collection logic)
package moni_shm
import (
"strconv"
"time"
"github.com/golang/protobuf/proto"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
// plus the internal "general" (generated protobuf) and "moni_report" packages
)
var (
_basePtr uintptr = 0
_shmUtil = NewShmUtil(OssShmId, OssAttrSz)
_intervalSec = defaultIntervalSec
_topic = defaultTopic
_on bool = false
)
func Stat(on bool) { _on = on }
func Start() { go collect() }
func tryInitBaseptr() error {
if _basePtr == 0 {
var err error
_basePtr, err = _shmUtil.GetData()
if err != nil {
logrus.Warnf("init base ptr failed, retrying: %v", err)
}
return err
}
return nil
}
func collect() {
var (
cost time.Duration
start time.Time
first = true
)
for {
if !first {
time.Sleep(time.Second*time.Duration(_intervalSec) - cost)
}
first = false
start = time.Now()
if !_on { cost = time.Since(start); continue }
if _basePtr == 0 {
if err := tryInitBaseptr(); err != nil { cost = time.Since(start); continue }
}
d := collectOnce()
for _, v := range d { moni_report.ProductReportData(*v) }
cost = time.Since(start)
}
}
func collectOnce() []*moni_report.ReportData {
now := time.Now()
data := make(map[uint32]*general.GeneralData)
d := SwitchAndFetch(_basePtr)
logrus.Infof("sending %d data from shm", len(d))
for _, v := range d {
ruleId := strconv.FormatUint(uint64(v[0]), 10)
dim := strconv.FormatUint(uint64(v[1]), 10)
value := strconv.FormatUint(uint64(v[2]), 10)
if _, ok := data[v[0]]; !ok {
data[v[0]] = &general.GeneralData{RuleId: proto.String(ruleId), Data: []*general.Data{}}
}
data[v[0]].Data = append(data[v[0]].Data, &general.Data{Kv: map[string]string{dim: value, "timestamp": strconv.FormatInt(now.Unix()*1000, 10), "ip": viper.GetString("host.inner_ip")}})
}
var ret []*moni_report.ReportData
for _, v := range data {
bts, err := proto.Marshal(v)
if err != nil { logrus.Errorf("marshal shm data failed: %v", err); continue }
ret = append(ret, &moni_report.ReportData{DataBytes: bts, Topic: _topic})
}
return ret
}
4.5 shmutil.go (shared‑memory operations)
package moni_shm
import (
"fmt"
"log"
"os"
"syscall"
"unsafe"
"github.com/sirupsen/logrus"
)
const IpcCreate = 00001000 // IPC_CREAT
var (
ErrNotCreated = fmt.Errorf("shm not created")
ErrCreateFailed = fmt.Errorf("shm create failed")
)
type shmOpt func(*ShmUtil)
func WithCreate(b bool) shmOpt { return func(u *ShmUtil) { u.create = b } }
type ShmUtil struct {
pageSz int
dataSz uint64
total uint64
shmKey uint32
create bool
base uintptr
data uintptr
}
func NewShmUtil(key uint32, sz uint64, cfgs ...shmOpt) *ShmUtil {
if key == 0 { panic("invalid shm key: 0") }
ret := &ShmUtil{dataSz: sz, shmKey: key}
ret.pageSz = os.Getpagesize()
ret.dataSz = align(ret.dataSz, uint64(ret.pageSz))
ret.total = ret.dataSz + uint64(ret.pageSz)*2
for _, c := range cfgs { c(ret) }
return ret
}
func (s *ShmUtil) attachShm(flag int) error {
created := false
// shmget() fails with ENOENT when the segment does not exist and IPC_CREAT is not set,
// so a non-zero errno must be inspected before deciding whether to create the segment.
shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag))
if errno != 0 {
if errno != syscall.ENOENT { return errno }
if !s.create { return ErrNotCreated }
shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate))
if errno != 0 { return fmt.Errorf("shm create: %v", errno) }
created = true
}
addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0)
if errno != 0 { return fmt.Errorf("shmat: %v", errno) }
if created { zero(addr, s.total) }
s.base = addr
s.data = s.base + uintptr(s.pageSz)
// protect head and tail pages
_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)
if errno != 0 { s.detach(); return fmt.Errorf("mprotect head: %v", errno) }
_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0)
if errno != 0 { s.detach(); return fmt.Errorf("mprotect tail: %v", errno) }
return nil
}
func (s *ShmUtil) detach() { if s.base != 0 { syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0); s.base = 0; s.data = 0 } }
func (s *ShmUtil) GetData() (uintptr, error) {
if s.data != 0 { return s.data, nil }
if err := s.attachShm(0666); err != nil { return 0, err }
return s.data, nil
}
func SwitchAndFetch(ptr uintptr) [][3]uint32 {
if ptr == 0 { return nil }
m1 := (*uint32)(unsafe.Pointer(ptr))
m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))
if MagicNum1 != *m1 || MagicNum2 != *m2 { logrus.Errorf("magic mismatch"); return nil }
idx := (*uint32)(unsafe.Pointer(ptr + 4))
old := *idx
*idx = 1 - *idx
ret := PartialRead(ptr, old)
zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz)
return ret
}
func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { startPtr := ptr + 8 + uintptr(idx)*OssMapSz; return ReadOssMap(startPtr) }
func ReadOssMap(ptr uintptr) [][3]uint32 {
var ret [][3]uint32
for i := uint32(0); i < OssMapOneAttrCnt; i++ {
for _, v := range ReadOneAttr(ptr) { ret = append(ret, [3]uint32{i, v[0], v[1]}) }
ptr += OneAttrSz
}
return ret
}
func ReadOneAttr(ptr uintptr) [][2]uint32 {
var ret [][2]uint32
for i := uint32(0); i < OssOneAttrEntryCnt; i++ {
v := *(*uint32)(unsafe.Pointer(ptr))
if v != 0 { ret = append(ret, [2]uint32{i, v}) }
ptr += EntrySz
}
return ret
}
5. Summary
The article explains the underlying principles of shared memory, contrasts it with traditional file‑based IPC, details the usage of mmap(), and provides a complete Go implementation that creates, reads, and reports shared‑memory metrics for a production monitoring system. The source code demonstrates low‑level memory management, synchronization via a double‑buffered map switch, and protobuf‑based data serialization, offering a practical reference for engineers building high‑performance, low‑latency data pipelines.
vivo Internet Technology
Sharing practical vivo Internet technology insights and salon events, plus the latest industry news and hot conferences.