如何统计 UV、独立访客与海量数据去重?

🎯 面试题:DAU 如何统计?40 亿数据如何去重?


一、UV 统计的核心矛盾

DAU = 一天内有多少独立用户访问了系统

❌ 直接 COUNT(DISTINCT user_id)
   → 亿级数据,全表去重,查询极慢
   → 100ms~几秒的延迟,无法实时

✅ 方案:专用数据结构,在写入时去重
   → HyperLogLog / Bitmap / HashSet
   → 查询 O(1),毫秒级返回

二、HyperLogLog(生产推荐方案)

原理

HyperLogLog 是一种概率算法:
  通过哈希函数将每个 userId 映射为一个 64 位二进制串
  统计二进制串从左开始连续出现的最大 0 前缀长度:maxZeros
  估计基数 ≈ 2^maxZeros

举例:
  哈希值 001011... → 前导零 = 2
  哈希值 0001011... → 前导零 = 3

  maxZeros = 3 → 估计基数为 2^3 = 8

误差率:约 0.81%(可接受)

空间:固定 12KB,存 2^64 个数的基数 ✅
@Service
@Slf4j
public class UvService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String UV_KEY_PREFIX = "uv:daily:";

    /**
     * 记录一次访问
     * userId 可以是登录用户 ID,也可以是匿名访客的 cookie/session ID
     */
    public void recordVisit(String userId) {
        LocalDate today = LocalDate.now();
        String key = UV_KEY_PREFIX + today;
        // PFADD 自动去重
        redisTemplate.opsForHyperLogLog().add(key, userId);
    }

    /**
     * 获取当天 DAU
     */
    public long getDailyUv() {
        String key = UV_KEY_PREFIX + LocalDate.now();
        return redisTemplate.opsForHyperLogLog().size(key);
    }

    /**
     * 获取时间段内 UV(去重合并)
     * 例:过去 7 天的去重 UV
     */
    public long getUvBetween(LocalDate start, LocalDate end) {
        List<String> keys = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            keys.add(UV_KEY_PREFIX + d.toString());
        }
        if (keys.isEmpty()) return 0;

        return redisTemplate.opsForHyperLogLog().size(
            keys.toArray(new String[0])
        );
    }

    /**
     * 设置过期时间(保留 90 天)
     */
    @Scheduled(cron = "0 5 0 * * ?") // 每天凌晨 00:05 执行
    public void setKeyExpire() {
        LocalDate today = LocalDate.now();
        String key = UV_KEY_PREFIX + today;
        redisTemplate.expire(key, 90, TimeUnit.DAYS);
    }
}

精确去重合并场景

需求:同时统计 DAU、周 UV、月 UV

实现:用 HyperLogLog 做周/月合并

周 UV = PFCOUNT 周一 Key + 周二 Key + ... + 周日 Key
       → 自动去重,返回真实周独立访客数

三、Bitmap 方案(精确版)

适用条件

用户 ID 必须满足:
  ✅ 是数字类型
  ✅ 范围已知且相对连续(如 1 ~ 1 亿)
  ✅ 可预测最大值

原理:每个 bit 代表一个用户 ID
  bit[1] = 1 → 用户 ID 1 今天访问过
  bit[2] = 1 → 用户 ID 2 今天访问过
  COUNT(bit == 1) = DAU

空间:1 亿用户 = 100,000,000 bits ≈ 12.5MB ✅
@Service
@Slf4j
public class UvBitmapService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 用户访问 → 对应 bit 位置 1
     */
    public void recordVisit(long userId) {
        LocalDate today = LocalDate.now();
        String key = String.format("uv:bitmap:%s", today);
        // userId - 1,因为 bit 从 0 开始
        redisTemplate.opsForValue().setBit(key, userId - 1, true);
    }

    /**
     * 获取当天 DAU
     */
    public long getDailyUv() {
        String key = String.format("uv:bitmap:%s", LocalDate.now());
        return redisTemplate.opsForValue().bitCount(key);
    }

    /**
     * 获取指定用户今天是否访问
     */
    public boolean hasVisited(long userId) {
        String key = String.format("uv:bitmap:%s", LocalDate.now());
        return Boolean.TRUE.equals(
            redisTemplate.opsForValue().getBit(key, userId - 1)
        );
    }

    /**
     * 跨天 UV 合并(BITOP OR)
     */
    public long getUvBetween(LocalDate start, LocalDate end) {
        String destKey = String.format("uv:bitmap:merged:%s_%s", start, end);
        List<String> srcKeys = new ArrayList<>();
        for (LocalDate d = start; !d.isAfter(end); d = d.plusDays(1)) {
            srcKeys.add(String.format("uv:bitmap:%s", d));
        }

        // BITOP OR:将所有天的 bitmap OR 合并,结果存 destKey
        redisTemplate.opsForValue().bitOp(
            RedisStringCommands.BitOperation.OR,
            destKey,
            srcKeys.toArray(new String[0])
        );

        long uv = redisTemplate.opsForValue().bitCount(destKey);
        // 合并完删除临时 key
        redisTemplate.delete(destKey);
        return uv;
    }
}

HyperLogLog vs Bitmap 对比

指标 HyperLogLog Bitmap
空间 固定 12KB userId 最大值 / 8
精度 ≈ 0.81% 误差 100% 精确
适用 DAU 亿级 百万~亿级(ID 连续)
合并去重 PFCOUNT 多 key BITOP OR
适用场景 精确度要求不高 需要 100% 精确

四、40 亿数据去重方案

问题分析

40 亿 QQ 号,1GB 内存
每个 QQ 号 8 字节(long)→ 320GB ❌

目标:在 1GB 内存内完成去重

核心思路:
  1. 用 bit 代替 byte(节省 8 倍空间)
  2. 分桶处理(将数据分散到多个桶)
  3. 外部排序(数据在外存,分批读入)

方案一:Bitmap(QQ 号范围已知)

QQ 号范围:100000000 ~ 2999999999(约 29 亿个可能的号码)
29 亿 bits ≈ 360MB ✅ 1GB 内存足够
# Python 伪代码
import bitarray

QQ_MIN = 100_000_000
QQ_MAX = 2_999_999_999
BIT_COUNT = QQ_MAX - QQ_MIN  # ≈ 29 亿

# 创建 bitmap
bits = bitarray.bitarray(BIT_COUNT)
bits.setall(0)

duplicates = 0
unique = 0

# 遍历 40 亿 QQ 号(分批从文件/数据库读取)
with open("qq_numbers.dat", "rb") as f:
    while True:
        batch = f.read(8_000_000)  # 每次读 800 万个
        if not batch: break
        for i in range(0, len(batch), 8):
            qq = int.from_bytes(batch[i:i+8], 'big')
            offset = qq - QQ_MIN
            if bits[offset]:
                duplicates += 1
            else:
                bits[offset] = 1
                unique += 1

print(f"唯一 QQ 号: {unique}, 重复: {duplicates}")
# 空间:≈ 360MB + 少量处理开销

方案二:外部排序归并(通用方案)

内存 1GB:每次读 5000 万条(5000万 × 8字节 = 400MB)

步骤:
  1. 分批读取:40亿条分 80 批读入内存
  2. 每批内部排序:QuickSort O(N log N)
  3. 每批排好序后写入临时文件(共 80 个临时文件)
  4. 多路归并:每次从 80 个文件各取一条,取最小的输出
     → 相同号码相邻输出 → 跳过重复 → 最终唯一序列
public class ExternalMergeSort {

    private static final long BATCH_SIZE = 50_000_000L; // 5000 万条

    /**
     * 第一阶段:分批排序写文件
     */
    public void phase1Sort(String inputFile, List<String> tempFiles) throws IOException {
        long totalRecords = countRecords(inputFile);
        long batches = (totalRecords + BATCH_SIZE - 1) / BATCH_SIZE;

        try (FileInputStream fis = new FileInputStream(inputFile);
             BufferedInputStream bis = new BufferedInputStream(fis, 8 * 1024 * 1024);
             DataInputStream dis = new DataInputStream(bis)) {

            for (long b = 0; b < batches; b++) {
                long[] batch = new long[(int) Math.min(BATCH_SIZE, totalRecords - b * BATCH_SIZE)];
                for (int i = 0; i < batch.length; i++) {
                    batch[i] = dis.readLong();
                }
                Arrays.sort(batch);
                String tempFile = "/tmp/sorted_" + b + ".bin";
                writeSortedBatch(tempFile, batch);
                tempFiles.add(tempFile);
            }
        }
    }

    /**
     * 第二阶段:N 路归并去重
     */
    public void phase2Merge(List<String> tempFiles, String outputFile) throws IOException {
        // 1. 初始化 N 个文件的迭代器
        PriorityQueue<RecordPointer> heap = new PriorityQueue<>();
        List<DataInputStream> inputs = new ArrayList<>();

        for (int i = 0; i < tempFiles.size(); i++) {
            DataInputStream dis = new DataInputStream(
                new BufferedInputStream(new FileInputStream(tempFiles.get(i)), 1024 * 1024));
            inputs.add(dis);
            if (dis.available() > 0) {
                heap.add(new RecordPointer(i, dis.readLong()));
            }
        }

        // 2. 归并输出
        try (BufferedOutputStream bos = new BufferedOutputStream(
                new FileOutputStream(outputFile), 8 * 1024 * 1024);
             DataOutputStream dos = new DataOutputStream(bos)) {

            long last = -1;
            while (!heap.isEmpty()) {
                RecordPointer min = heap.poll();
                long current = min.value;

                // 去重:只输出与上一个不同的
                if (current != last) {
                    dos.writeLong(current);
                    last = current;
                }

                // 从同一文件读下一条
                DataInputStream dis = inputs.get(min.fileIndex);
                if (dis.available() >= 8) {
                    min.value = dis.readLong();
                    heap.add(min);
                }
            }
        }
    }
}

方案三:布隆过滤器 + 二次确认

第一步:40 亿条全部过布隆过滤器
  - 不存在 → 一定不重复 ✅
  - 存在 → 可能重复,需要二次确认

第二步:把判断为"可能重复"的写入文件
  文件内排序去重

布隆过滤器参数:
  - n = 40 亿
  - 期望假阳性率 p = 0.01(1%)
  - m = -n * ln(p) / (ln(2)^2) ≈ 5.1 GB ≈ 480MB ✅
  - k = ln(2) * m / n ≈ 7 个哈希函数

五、DAU 亿级统计实战架构

┌──────────────────────────────────────────────────────┐
│                   DAU 统计架构                         │
│                                                       │
│  用户访问 → 写入 Kafka → Flink 实时聚合               │
│                         │                             │
│                    PFADD uv:daily:2026-03-26         │
│                         │                             │
│                    Redis HyperLogLog                   │
│                         │                             │
│               查询 → 毫秒级返回 DAU                    │
│                                                       │
│  备选:每小时预聚合 → 结果写 Redis → 查询直接读         │
└──────────────────────────────────────────────────────┘
/**
 * Flink 实时 DAU 统计(Kafka → Redis)
 */
public class DailyUvJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env
            .addSource(new FlinkKafkaConsumer<>("user_visit", new JsonSchema(), props))
            .uid("kafka-source");

        events.keyBy(e -> e.getDate())  // 按天分组
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.hours(1)))
            .apply(new UvAggregator())
            .addSink(new RedisSink<>()); // 结果写入 Redis

        env.execute("Daily UV Job");
    }

    // 聚合函数:PFADD
    public static class UvAggregator
            implements WindowFunction<String, Long, String, TimeWindow> {

        @Override
        public void apply(String key, TimeWindow window,
                         Iterable<String> records, Collector<Long> out) {
            RedisCommands<String, String> redis = jedisPool.getResource();
            String hllKey = "uv:daily:" + key;
            for (String userId : records) {
                redis.pfadd(hllKey, userId);
            }
            long uv = redis.pfcount(hllKey);
            out.collect(uv);
        }
    }
}

六、高频面试题

Q1: HyperLogLog 的误差是怎么产生的?

通过哈希后的二进制串前导零个数来估算基数,天然有统计误差。公式:误差率 ≈ 1.04 / sqrt(2^m),m=registers 数量。Redis HyperLogLog 有 16384 个 registers,误差 ≈ 0.81%,即 1 亿 DAU 的误差在 81 万以内。

Q2: 40 亿数据用 1GB 内存去重,哪种方案最优?

如果 QQ 号范围已知且跨度不大,用 Bitmap(5 亿 bit ≈ 360MB)。如果号段跨度大且不连续,用外部排序归并(分批读入内存→排序→多路归并去重)。布隆过滤器适合”先判断存在性”而非精确去重场景。

Q3: DAU 统计的冷启动怎么处理?

每天凌晨从 DB 导出全量用户 ID,PFADD 到 Redis HyperLogLog重建当日数据。如果 Redis 故障降级,直接从 DB COUNT(DISTINCT user_id WHERE login_date = today)。

Q4: 分布式 UV 怎么统计?

每个机房/服务节点维护自己的 HyperLogLog,最后合并所有节点的 Key。PFCOUNT 支持多 Key 合并,自动去重,不存在重复计数问题。


参考链接: