之前曾介绍过使用 Redis 集和构建唯一计数器, 并将这个计数器用于计算网站的唯一房客 IP. 虽然使用集和实现唯一计数器可以实现该功能, 但这个方法有一个明显的缺陷: 随着被计数元素的不断增多, 唯一计数器占用的内存也会越来越大; 计数器越多, 他们的体积越大, 这一情况就会越严峻.
以计算唯一访客 IP 为例:
- 存储一个 IPv4 格式的 IP 地址最多需要 15 个字节
- 根据网站的规模不同, 每天出现的唯一 IP 可能会有数十万、数百万个
- 为了记录网站在不同时期的访客, 并进行相关的数据分析, 网站可能需要次序地记录每天的唯一访客 IP 数量
综上, 如果一个网站想要长时间记录访客 IP, 就必须创建多个唯一计数器. 如果访客比较多, 那么它创建的每个唯一计数器都将包含大量元素, 并因此占用相当一部分内存.
为了高效解决计算机唯一访客 IP 数量这类问题, 其中一种方法就是 HyperLogLog.
HyperLogLog 简介
HyperLogLog 是一个专门为了计算集和的基数而创建的概率算法, 对于一个给定的集和, HyperLogLog 可以计算出这个集合的近似基数: 近似基数并非集和的实际基数, 它可能会比实际的基数大一点或者小一点, 但误差会在一个合理范围内. 因此, 那些不需要知道实际基数的程序就可以把这个近似基数当作集合的基数来使用.
HyperLogLog 的优点在于计算近似基础所需的内存并不会因为集和的大小而改变, 无论集和包含元素有多少个, HyperLogLog 进行计算所需的内存总是固定的, 无论集和包含元素多少个, HyperLogLog 进行计算所需的内存总是固定的, 并且是非常少的.
PFADD: 对集和元素进行计数
PFADD hyperloglog element [element ...]
根据给定元素是否已经进行过计数, PFADD 命令可能返回0, 也可能返回1:
- 如果给定的所有元素都已经计数, 那么 PFADD 命令返回 0, 表示 HyperLogLog 计算出是近似基数没有发生变化
- 如果给定的的元素中出现了至少一个没有过进行计数的元素, 导致 HyperLogLog 计算出的近似基数发生了变化, 那么 PFADD 命令返回 1
复杂度: O(N), N 为给定元素数量
PFCOUNT: 返回集和的近似基数
PFCOUNT hyperloglog [hyperloglog ...]
使用 PFADD 命令对元素进行计数后, 用户可以通过执行 PFCOUNT 命令来获取 HyperLogLog 为集和计算出的近似基数.
当用户给定的 HyperLogLog 不存在时, PFCOUNT 命令将返回 0 作为结果.
当用户向 PFCOUNT 传入多个 HyperLogLog 时, PFCOUNT 命令将对所有给定的 HyperLogLog 执行并行计算, 然后返回并集计算出的近似基数.
PFADD alphabets1 "a" "b" "c"
PFADD alphabets2 "c" "d" "e"
PFCOUNT alphabets1 alphabets2
> (integer) 5
上面计算类似于
PFADD temp-hyperloglog "a" "b" "c" "c" "d" "e"
PFCOUNT temphyperloglog temp-hyperloglog
> (integer) 5
示例: 优化唯一计数器
from redis import Redis
class UniqueCounter:
def __init__(self, client, key):
self.client = client
self.key = key
def count_in(self, item):
"""对给定的元素进行计数"""
self.client.pfadd(self.key, item)
def get_result(self):
"""返回计数器的值"""
return self.client.pfcount(self.key)
client = Redis(decode_responses=True)
counter = UniqueCounter(client, 'unique-ip-counter') # 创建一个唯一 IP 计数器
counter.count_in('1.1.1.1')
counter.count_in('2.2.2.2')
counter.count_in('3.3.3.3')
print(counter.get_result())
counter.count_in('3.3.3.3')
print(counter.get_result())
通过使用 HyperLogLog 实现的唯一计数器去取代集和实现的唯一计数器, 可以大副降低存储所需的内存.
示例: 检测重复信息
在构建应用程序的过程中, 经常要处理广告等垃圾信息. 而发送者经常会使用不同的帐号, 发送相同的垃圾信息, 所有一种简单的方法就是找出重复 的信息: 如果两个用户发送了完全相同的信息, 那么这些信息很可能就是垃圾短信.
判断两段信息是否相同并不是意见容易的事情, 如果使用一般的比较函数, 那复杂度就会很高: O(N*M), 其中 N 为信息的长度, M 为系统目前已有的信息数量.
为了降低复杂度, 可以使用 HyperLogLog 来记录所有以发送的信息: 每当用户发送一条信息时, 程序就使用 PFADD 命令将这条信息添加到 HyperLogLog 中:
- 如果命令返回 1, 说明这条信息未出现过
- 如果命令返回 0, 说明这条信息已出现过
由于 HyperLogLog 是概率算法, 所以即使信息长度非常长, HyperLogLog判断信息是否重复所需的时间也非常短.
from redis import Redis
class DuplicateChecker:
def __init__(self, client, key):
self.client = client
self.key = key
def is_duplicated(self, content):
"""在信息重复时返回 True, 未重复时返回 False"""
return self.client.pfadd(self.key, content) == 0
def unique_count(self):
"""返回检测器已经检查过的非重复信息数量"""
return self.client.pfcount(self.key)
client = Redis(decode_responses=True)
checker = DuplicateChecker(client, 'duplicate-message-checker')
# 输入非重复信息
checker.is_duplicated("hello world!")
checker.is_duplicated("good morning!")
checker.is_duplicated("bye bye")
print(checker.unique_count())
checker.is_duplicated("hello world!")
PFMERGE: 计算多个 HyperLogLog 的并集
PFMERGE destination hyperloglog [hyperloglog ...]
该命令可以对多个给定的 HyperLogLog 执行并行计算, 然后把计算得出的并集 HyperLogLog 保存到指定的键中.
如果指定的键已存在, 则会覆盖原有的键, 执行成功后返回 OK.
HyperLogLog 并集计算的近似基数接近于所有给定 HyperLogLog 的被计数集合的并集基数.
PFADD numbers1 128 256 512
PFADD numbers2 128 256 512
PFADD numbers3 128 512 1024
PFMERGE union-numbers numbers1 numbers2 numbers3
> OK
PFCOUNT union-numbers
> (integer) 4
PFCOUNT 与 PFMERGE
PFCOUNT 命令在计算多个 HyperLogLog 的近似基数时会执行以下操作:
- 在内部调用 PFMERGE 命令, 计算所有给定 HyperLogLog 的并集, 并将这个并集存储到一个临时的 HyperLogLog 中
- 对临时的 HyperLogLog 执行 PFCOUNT 命令, 得到它的近似基数
- 删除临时 HyperLogLog
- 向用户返回之前得到的近似基数
示例: 实现每周/月度/年度计数器
通过 PFMERGE 命令可以对多个 HyperLogLog 实现的唯一计数器执行并集计算, 从而实现每周/月度/年度计数器:
- 通过对一周内每天的唯一访客 IP 计数器执行 PFMERGE 命令, 可以计算出那一周的唯一访客 IP 数量
- 通过对一个月每天的唯一访客 IP 计数器执行 PFMERGE 命令, 可以计数器那一个月的唯一访客 IP 数量
- 年度甚至更长时间的唯一访客 IP 数量也可以按照类似的方法计算
from redis import Redis
class UniqueCounterMerger:
def __init__(self, client):
self.client = client
def merge(self, destination, *hyperloglogs):
self.client.pfmerge(destination, *hyperloglogs)
client = Redis(decode_response=True)
merger = UniqueCounterMerger(client)
# 本周 7 天的计数器排名
counters = [
'unique_ip_counter:8-10',
'unique_ip_counter:8-11',
'unique_ip_counter:8-12',
'unique_ip_counter:8-13',
'unique_ip_counter:8-14',
'unique_ip_counter:8-15',
'unique_ip_counter:8-16',
]
# 计算并存储本周的唯一访客 IP 数量
merger.merger('unique_ip_counter::No_33_week', *counters)
# 去本周的唯一访客 IP 数量
weekly_unique_visitors = client.pfcount('unique_ip_counter::No_33_week')
print(f"本周唯一访客 IP 数量: {weekly_unique_visitors}")