风控系统指标计算/特征提取分析与实现01,Redis、Zset、模版方法
后台-插件-广告管理-内容页头部广告(手机) |
共享语雀:在线知识共享
引用AI
对于风控系统的介绍
风控系统是一种用于在线业务的安全管理系统,它帮助企业和平台防范潜在的欺诈、信用风险以及不合规行为。简单来说,它的核心作用就是“保安全、防欺诈、控风险”。
最近也一直在研究风控系统体系、功能等,看了一些有关的文章,并且也在实践尝试中。
其实前一篇可配置“输入参数的接口如何设计”就是实践尝试的一部分,未来还会有更多的。
而本篇文章就风控系统的指标计算,或者说是特征提取做一些探讨,以下统一称呼为“指标”。
指标不仅可以作为风控系统的一部分配合风控规则或是模型/机器学习使用,而且可以用于离线分析、事后追查、用户画像标签等方面。
参考文章
指标分类
指标是由数据流支撑的,指标是时间纬度的数据提取计算。
根据指标分类举几个例子:
- 次数统计:最近24小时 客户号向 {客户号}向 客户号向{银行卡卡号}转账笔数
- 求和:最近2天 客户号向 {客户号}向 客户号向{银行卡卡号}转账金额之和
- 平均:最近1个月 客户号向 {客户号}向 客户号向{银行卡卡号}转账金额的平均数
- 关联次数:最近72小时 客户号关联 {客户号}关联 客户号关联{设备mac地址}的次数
- 等等
指标类型枚举
/**
* @author wnhyang
* @date 2024/3/13
**/
@AllArgsConstructor
@Getter
public enum IndicatorType {
COUNT(0, "count", "次数统计"),
SUM(1, "sum", "求和"),
AVG(2, "avg", "平均"),
MAX(3, "max", "最大值"),
MIN(4, "min", "最小值"),
ASS(5, "ass", "关联次数");
private final Integer code;
private final String name;
private final String desc;
}
指标实体类
目前filterScript
条件还未确实如何做,使用Groovy
脚本还是什么,之后再定吧。
/**
* @author wnhyang
* @date 2024/3/13
**/
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("de_indicator")
public class IndicatorPO extends BasePO {
@Serial
private static final long serialVersionUID = 1L;
/**
* 自增编号
*/
@TableId(value = "id", type = IdType.AUTO)
private Long id;
/**
* 指标名
*/
@TableField("name")
private String name;
/**
* 状态
*/
@TableField("status")
private Boolean status;
/**
* 类型
*/
@TableField("type")
private Integer type;
/**
* 计算字段
*/
@TableField("calc_field")
private String calcField;
/**
* 窗口大小
*/
@TableField("win_size")
private String winSize;
/**
* 窗口类型
*/
@TableField("win_type")
private String winType;
/**
* 窗口数量
*/
@TableField("win_count")
private Integer winCount;
/**
* 时间片
*/
@TableField("time_slice")
private Long timeSlice;
/**
* 主字段
*/
@TableField("master_field")
private String masterField;
/**
* 从字段
*/
@TableField("slave_fields")
private String slaveFields;
/**
* 过滤脚本
*/
@TableField("filter_script")
private String filterScript;
/**
* 版本号
*/
@TableField("version")
private Integer version;
/**
* 描述
*/
@TableField("description")
private String description;
}
指标窗口枚举
/**
* @author wnhyang
* @date 2024/3/13
**/
@AllArgsConstructor
@Getter
public enum WinType {
LAST(0, "last", "最近"),
CUR(1, "cur", "本");
private final Integer code;
private final String name;
private final String desc;
}
指标存储
指标数据如何存储呢?下面是Redis
方案。
1、使用Redis
的有序集合(Sorted Set
)结构,有序集合中的每个元素都有一个分数(score
),这里的分数我们可以设置为时间戳。
2、添加数据:每当有新的请求到来时,将当前时间戳作为score
,用一个固定的字符串(如"request
")或者其他唯一标识符作为member
,插入到有序集合中。
3、清理过期数据:每次添加新数据后,通过ZRANGEBYSCORE
命令获取并删除窗口范围之外的数据。
4、统计指标:要得到窗口内的请求次数,可以直接使用ZCARD
命令获取有序集合中元素的数量。
5、定时任务:为了确保过期数据能够自动清除,可以结合Redis
的Key
空间通知机制(Keyspace Notifications
)或者外部定时任务定期执行上述清理操作。
计算类指标
计算类指标比较通用,需要存储的数据很容易分析。
但还是有些差别的,次数统计可以在zset
中存储value
为${事件id},score
为时间戳,计算时为zset
的size
,但是对于平均、最大值、最小值、求和不能这么做,因为这些都是有计算字段的,并不是如次数统计那样取size
就行,所以对于这类指标数据存储是不一样的。
单zset
和zset+hash
两种方案。
单zset
zset
作为时间窗口,value
为${事件id}+{计算字段},score
为时间戳,变化就是value
变为事件id
与计算字段的组合,计算字段这样就存储下来了,之后计算平均、最大值、最小值、求和取出来再算就可以了,事件id
是为了在zset
中存储时防重,另外也方便找到原始数据。
优点:直接从zset
中获取计算字段,可以独立手动过期删除。
缺点:value
存和取需要设计,数据冗余大。
zset+hash
zset
作为时间窗口,value
为${事件id},score
为时间戳,这里没有变化,另外需要hash
存储对应事件下需要计算字段的数据。
优点:hash
可以存储多项数据,数据冗余少。
缺点:无法确定事件过期删除时间,每次需要多步查询。
指标计算与查询
指标计算可以简单梳理如下。
以下根据此流程分析,其实编程(也不全是指编程)有趣的地方是设计和实现的过程。
模版方法
设计模式-模版方法可以在这应用,从上面的流程并结合指标的分类来分析,主流程中的判断指标状态、指标条件、获取当前时间戳、获取redis
数据设置过期、清理过期数据都是通用的,根据指标类型变化的只有添加事件这个步骤,所以定义模版抽象类如下:
@Setter
@Getter
@Slf4j
public abstract class AbstractIndicator {
/**
* 指标
*/
protected IndicatorPO indicator;
/**
* 指标类型
*/
protected final IndicatorType INDICATOR_TYPE;
/**
* redisson客户端
*/
protected final RedissonClient redissonClient;
protected AbstractIndicator(IndicatorType indicatorType, RedissonClient redissonClient) {
INDICATOR_TYPE = indicatorType;
this.redissonClient = redissonClient;
}
/**
* 获取指标类型
*
* @return 指标类型
*/
public Integer getType() {
return INDICATOR_TYPE.getCode();
}
/**
* 获取指标状态
*
* @return true/false
*/
public boolean getStatus() {
return indicator.getStatus();
}
/**
* 指标过滤
*
* @return true/false
*/
public boolean filter(Map<String, String> eventDetail) {
// 1、主属性、从属性不为空
if (indicator.getMasterField() != null && eventDetail.get(indicator.getMasterField()) != null) {
if (indicator.getSlaveFields() != null) {
String[] split = indicator.getSlaveFields().split(",");
for (String s : split) {
if (eventDetail.get(s) == null) {
return false;
}
}
// 2、过滤脚本
if (indicator.getFilterScript() == null) {
return true;
} else {
// TODO 脚本过滤
return true;
}
}
}
return false;
}
/**
* 获取redis key
*
* @param eventDetail 事件详情
* @return redis key
*/
public String getRedisKey(Map<String, String> eventDetail) {
return RedisKeys.INDICATOR + indicator.getId() + ":" + INDICATOR_TYPE.getName() + ":" + eventDetail.get(indicator.getMasterField()) + "-" + eventDetail.get(indicator.getSlaveFields());
}
/**
* 获取计算指标结果
*
* @param currentTime 当前时间戳
* @param set redis set
* @return 计算指标结果
*/
public abstract BigDecimal getResult(long currentTime, RScoredSortedSet<String> set);
/**
* 获取计算指标结果
*
* @param eventDetail 事件详情
* @return 计算指标结果
*/
public BigDecimal getResult(Map<String, String> eventDetail) {
// 1、获取当前时间戳
long currentTime = System.currentTimeMillis();
// 2、获取redis中数据
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(getRedisKey(eventDetail));
// 3、清理过期数据
if ("last".equals(indicator.getWinType())) {
set.removeRangeByScore(-1, true, currentTime - Duration.ofSeconds(indicator.getTimeSlice()).toMillis(), false);
} else {
set.removeRangeByScore(-1, true, calculateEpochMilli(LocalDateTime.now()), false);
}
return getResult(currentTime, set);
}
/**
* 计算指标
*
* @param indicator 指标
* @param eventDetail 事件详情
*/
public void compute(IndicatorPO indicator, Map<String, String> eventDetail) {
if (indicator == null) {
return;
} else {
this.indicator = indicator;
}
// 1、状态检查和过滤
if (getStatus() && filter(eventDetail)) {
// 2、获取当前时间戳
long currentTime = System.currentTimeMillis();
// 3、获取redis中数据
log.info("redisKey:{}", getRedisKey(eventDetail));
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(getRedisKey(eventDetail));
if ("last".equals(this.indicator.getWinType())) {
set.expire(Duration.ofSeconds(this.indicator.getTimeSlice() * this.indicator.getWinCount()));
} else {
set.expire(Duration.ofSeconds(this.indicator.getTimeSlice()));
}
// 4、添加事件
addEvent(currentTime, set, eventDetail);
// 5、清理过期数据
cleanExpiredDate(currentTime, set);
}
}
/**
* 添加事件
*
* @param currentTime 当前时间戳
* @param set redis set
* @param eventDetail 事件详情
*/
public abstract void addEvent(long currentTime, RScoredSortedSet<String> set, Map<String, String> eventDetail);
/**
* 清理过期数据
*
* @param currentTime 当前时间戳
* @param set redis set
*/
public void cleanExpiredDate(long currentTime, RScoredSortedSet<String> set) {
if ("last".equals(indicator.getWinType())) {
set.removeRangeByScore(-1, true, currentTime - Duration.ofSeconds(indicator.getTimeSlice()).toMillis(), false);
} else {
set.removeRangeByScore(-1, true, calculateEpochMilli(LocalDateTime.now()), false);
}
}
/**
* 计算时间戳
*
* @param now 当前时间
* @return 时间戳
*/
public long calculateEpochMilli(LocalDateTime now) {
ZoneId zoneId = ZoneId.systemDefault();
// 这个default分支仅处理WindowSize枚举中未包含的情况
return switch (indicator.getWinSize()) {
case "M" -> now.withDayOfMonth(1).with(LocalTime.MIN).atZone(zoneId).toInstant().toEpochMilli();
case "d" -> now.with(LocalTime.MIN).atZone(zoneId).toInstant().toEpochMilli();
case "H" -> now.withMinute(0).withSecond(0).withNano(0).atZone(zoneId).toInstant().toEpochMilli();
case "m" -> now.withSecond(0).withNano(0).atZone(zoneId).toInstant().toEpochMilli();
case "s" -> now.withNano(0).atZone(zoneId).toInstant().toEpochMilli();
default -> throw new IllegalArgumentException("Unsupported window size: " + indicator.getWinSize());
};
}
}
次数统计指标
因为篇幅原因,这里只贴次数统计指标实现类了。
/**
* @author wnhyang
* @date 2024/3/11
**/
@Component
public class CountIndicator extends AbstractIndicator {
public CountIndicator(RedissonClient redissonClient) {
super(IndicatorType.COUNT, redissonClient);
}
@Override
public BigDecimal getResult(long currentTime, RScoredSortedSet<String> set) {
return BigDecimal.valueOf(set.size());
}
@Override
public void addEvent(long currentTime, RScoredSortedSet<String> set, Map<String, String> eventDetail) {
set.add(currentTime, eventDetail.get("seqId"));
}
}
总结
先到这里吧,还有其他内容到下次吧。
写在最后
拙作艰辛,字句心血,望诸君垂青,多予支持,不胜感激。
个人博客:无奈何杨(wnhyang)
个人语雀:wnhyang
共享语雀:在线知识共享
Github:wnhyang - Overview
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。
在线投稿:投稿 站长QQ:1888636
后台-插件-广告管理-内容页尾部广告(手机) |