您现在的位置是:首页 > 技术教程 正文

风控系统指标计算/特征提取分析与实现01,Redis、Zset、模版方法

admin 阅读: 2024-03-14
后台-插件-广告管理-内容页头部广告(手机)

个人博客无奈何杨(wnhyang)

个人语雀wnhyang

共享语雀:在线知识共享

Githubwnhyang - Overview


引用AI对于风控系统的介绍

风控系统是一种用于在线业务的安全管理系统,它帮助企业和平台防范潜在的欺诈、信用风险以及不合规行为。简单来说,它的核心作用就是“保安全、防欺诈、控风险”。

最近也一直在研究风控系统体系、功能等,看了一些有关的文章,并且也在实践尝试中。

其实前一篇可配置“输入参数的接口如何设计”就是实践尝试的一部分,未来还会有更多的。

而本篇文章就风控系统的指标计算,或者说是特征提取做一些探讨,以下统一称呼为“指标”。

指标不仅可以作为风控系统的一部分配合风控规则或是模型/机器学习使用,而且可以用于离线分析、事后追查、用户画像标签等方面。

参考文章

风控笔记06:一个完整的风控引擎,需要有哪些功能?

风控笔记07:最常用的风控工具-特征库

指标分类

指标是由数据流支撑的,指标是时间纬度的数据提取计算。

根据指标分类举几个例子:

  • 次数统计:最近24小时 客户号向 {客户号}向 客户号{银行卡卡号}转账笔数
  • 求和:最近2天 客户号向 {客户号}向 客户号{银行卡卡号}转账金额之和
  • 平均:最近1个月 客户号向 {客户号}向 客户号{银行卡卡号}转账金额的平均数
  • 关联次数:最近72小时 客户号关联 {客户号}关联 客户号关联{设备mac地址}的次数
  • 等等

指标类型枚举

/**
 * @author wnhyang
 * @date 2024/3/13
 **/
@AllArgsConstructor
@Getter
public enum IndicatorType {
    COUNT(0, "count", "次数统计"),
    SUM(1, "sum", "求和"),
    AVG(2, "avg", "平均"),
    MAX(3, "max", "最大值"),
    MIN(4, "min", "最小值"),
    ASS(5, "ass", "关联次数");

    private final Integer code;

    private final String name;

    private final String desc;
}
 

指标实体类

目前filterScript条件还未确实如何做,使用Groovy脚本还是什么,之后再定吧。

/**
 * @author wnhyang
 * @date 2024/3/13
 **/
@Data
@EqualsAndHashCode(callSuper = true)
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("de_indicator")
public class IndicatorPO extends BasePO {

    @Serial
    private static final long serialVersionUID = 1L;

    /**
     * 自增编号
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /**
     * 指标名
     */
    @TableField("name")
    private String name;

    /**
     * 状态
     */
    @TableField("status")
    private Boolean status;

    /**
     * 类型
     */
    @TableField("type")
    private Integer type;

    /**
     * 计算字段
     */
    @TableField("calc_field")
    private String calcField;

    /**
     * 窗口大小
     */
    @TableField("win_size")
    private String winSize;

    /**
     * 窗口类型
     */
    @TableField("win_type")
    private String winType;

    /**
     * 窗口数量
     */
    @TableField("win_count")
    private Integer winCount;

    /**
     * 时间片
     */
    @TableField("time_slice")
    private Long timeSlice;

    /**
     * 主字段
     */
    @TableField("master_field")
    private String masterField;

    /**
     * 从字段
     */
    @TableField("slave_fields")
    private String slaveFields;

    /**
     * 过滤脚本
     */
    @TableField("filter_script")
    private String filterScript;

    /**
     * 版本号
     */
    @TableField("version")
    private Integer version;

    /**
     * 描述
     */
    @TableField("description")
    private String description;
}
 

指标窗口枚举

/**
 * @author wnhyang
 * @date 2024/3/13
 **/
@AllArgsConstructor
@Getter
public enum WinType {

    LAST(0, "last", "最近"),
    CUR(1, "cur", "本");

    private final Integer code;

    private final String name;

    private final String desc;
}
 

指标存储

指标数据如何存储呢?下面是Redis方案。

1、使用Redis的有序集合(Sorted Set)结构,有序集合中的每个元素都有一个分数(score),这里的分数我们可以设置为时间戳。

2、添加数据:每当有新的请求到来时,将当前时间戳作为score,用一个固定的字符串(如"request")或者其他唯一标识符作为member,插入到有序集合中。

3、清理过期数据:每次添加新数据后,通过ZRANGEBYSCORE命令获取并删除窗口范围之外的数据。

4、统计指标:要得到窗口内的请求次数,可以直接使用ZCARD命令获取有序集合中元素的数量。

5、定时任务:为了确保过期数据能够自动清除,可以结合RedisKey空间通知机制(Keyspace Notifications)或者外部定时任务定期执行上述清理操作。

计算类指标

计算类指标比较通用,需要存储的数据很容易分析。

但还是有些差别的,次数统计可以在zset中存储value为${事件id},score为时间戳,计算时为zsetsize,但是对于平均、最大值、最小值、求和不能这么做,因为这些都是有计算字段的,并不是如次数统计那样取size就行,所以对于这类指标数据存储是不一样的。

zsetzset+hash两种方案。

单zset

zset作为时间窗口,value为${事件id}+{计算字段},score为时间戳,变化就是value变为事件id与计算字段的组合,计算字段这样就存储下来了,之后计算平均、最大值、最小值、求和取出来再算就可以了,事件id是为了在zset中存储时防重,另外也方便找到原始数据。

优点:直接从zset中获取计算字段,可以独立手动过期删除。

缺点:value存和取需要设计,数据冗余大。

zset+hash

zset作为时间窗口,value为${事件id},score为时间戳,这里没有变化,另外需要hash存储对应事件下需要计算字段的数据。

优点:hash可以存储多项数据,数据冗余少。

缺点:无法确定事件过期删除时间,每次需要多步查询。

指标计算与查询

指标计算可以简单梳理如下。

以下根据此流程分析,其实编程(也不全是指编程)有趣的地方是设计和实现的过程。

模版方法

设计模式-模版方法可以在这应用,从上面的流程并结合指标的分类来分析,主流程中的判断指标状态、指标条件、获取当前时间戳、获取redis数据设置过期、清理过期数据都是通用的,根据指标类型变化的只有添加事件这个步骤,所以定义模版抽象类如下:

@Setter
@Getter
@Slf4j
public abstract class AbstractIndicator {

    /**
     * 指标
     */
    protected IndicatorPO indicator;

    /**
     * 指标类型
     */
    protected final IndicatorType INDICATOR_TYPE;

    /**
     * redisson客户端
     */
    protected final RedissonClient redissonClient;

    protected AbstractIndicator(IndicatorType indicatorType, RedissonClient redissonClient) {
        INDICATOR_TYPE = indicatorType;
        this.redissonClient = redissonClient;
    }

    /**
     * 获取指标类型
     *
     * @return 指标类型
     */
    public Integer getType() {
        return INDICATOR_TYPE.getCode();
    }

    /**
     * 获取指标状态
     *
     * @return true/false
     */
    public boolean getStatus() {
        return indicator.getStatus();
    }

    /**
     * 指标过滤
     *
     * @return true/false
     */
    public boolean filter(Map<String, String> eventDetail) {
        // 1、主属性、从属性不为空
        if (indicator.getMasterField() != null && eventDetail.get(indicator.getMasterField()) != null) {
            if (indicator.getSlaveFields() != null) {
                String[] split = indicator.getSlaveFields().split(",");
                for (String s : split) {
                    if (eventDetail.get(s) == null) {
                        return false;
                    }
                }
                // 2、过滤脚本
                if (indicator.getFilterScript() == null) {
                    return true;
                } else {
                    // TODO 脚本过滤
                    return true;
                }
            }
        }

        return false;
    }

    /**
     * 获取redis key
     *
     * @param eventDetail 事件详情
     * @return redis key
     */
    public String getRedisKey(Map<String, String> eventDetail) {
        return RedisKeys.INDICATOR + indicator.getId() + ":" + INDICATOR_TYPE.getName() + ":" + eventDetail.get(indicator.getMasterField()) + "-" + eventDetail.get(indicator.getSlaveFields());
    }

    /**
     * 获取计算指标结果
     *
     * @param currentTime 当前时间戳
     * @param set         redis set
     * @return 计算指标结果
     */
    public abstract BigDecimal getResult(long currentTime, RScoredSortedSet<String> set);

    /**
     * 获取计算指标结果
     *
     * @param eventDetail 事件详情
     * @return 计算指标结果
     */
    public BigDecimal getResult(Map<String, String> eventDetail) {
        // 1、获取当前时间戳
        long currentTime = System.currentTimeMillis();
        // 2、获取redis中数据
        RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(getRedisKey(eventDetail));
        // 3、清理过期数据
        if ("last".equals(indicator.getWinType())) {
            set.removeRangeByScore(-1, true, currentTime - Duration.ofSeconds(indicator.getTimeSlice()).toMillis(), false);

        } else {
            set.removeRangeByScore(-1, true, calculateEpochMilli(LocalDateTime.now()), false);
        }

        return getResult(currentTime, set);
    }

    /**
     * 计算指标
     *
     * @param indicator   指标
     * @param eventDetail 事件详情
     */
    public void compute(IndicatorPO indicator, Map<String, String> eventDetail) {
        if (indicator == null) {
            return;
        } else {
            this.indicator = indicator;
        }
        // 1、状态检查和过滤
        if (getStatus() && filter(eventDetail)) {

            // 2、获取当前时间戳
            long currentTime = System.currentTimeMillis();
            // 3、获取redis中数据
            log.info("redisKey:{}", getRedisKey(eventDetail));
            RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(getRedisKey(eventDetail));
            if ("last".equals(this.indicator.getWinType())) {
                set.expire(Duration.ofSeconds(this.indicator.getTimeSlice() * this.indicator.getWinCount()));
            } else {
                set.expire(Duration.ofSeconds(this.indicator.getTimeSlice()));
            }

            // 4、添加事件
            addEvent(currentTime, set, eventDetail);

            // 5、清理过期数据
            cleanExpiredDate(currentTime, set);
        }

    }

    /**
     * 添加事件
     *
     * @param currentTime 当前时间戳
     * @param set         redis set
     * @param eventDetail 事件详情
     */
    public abstract void addEvent(long currentTime, RScoredSortedSet<String> set, Map<String, String> eventDetail);

    /**
     * 清理过期数据
     *
     * @param currentTime 当前时间戳
     * @param set         redis set
     */
    public void cleanExpiredDate(long currentTime, RScoredSortedSet<String> set) {
        if ("last".equals(indicator.getWinType())) {
            set.removeRangeByScore(-1, true, currentTime - Duration.ofSeconds(indicator.getTimeSlice()).toMillis(), false);
        } else {
            set.removeRangeByScore(-1, true, calculateEpochMilli(LocalDateTime.now()), false);
        }
    }

    /**
     * 计算时间戳
     *
     * @param now 当前时间
     * @return 时间戳
     */
    public long calculateEpochMilli(LocalDateTime now) {
        ZoneId zoneId = ZoneId.systemDefault();
        // 这个default分支仅处理WindowSize枚举中未包含的情况
        return switch (indicator.getWinSize()) {
            case "M" -> now.withDayOfMonth(1).with(LocalTime.MIN).atZone(zoneId).toInstant().toEpochMilli();
            case "d" -> now.with(LocalTime.MIN).atZone(zoneId).toInstant().toEpochMilli();
            case "H" -> now.withMinute(0).withSecond(0).withNano(0).atZone(zoneId).toInstant().toEpochMilli();
            case "m" -> now.withSecond(0).withNano(0).atZone(zoneId).toInstant().toEpochMilli();
            case "s" -> now.withNano(0).atZone(zoneId).toInstant().toEpochMilli();
            default -> throw new IllegalArgumentException("Unsupported window size: " + indicator.getWinSize());
        };
    }
}
 

次数统计指标

因为篇幅原因,这里只贴次数统计指标实现类了。

/**
 * @author wnhyang
 * @date 2024/3/11
 **/
@Component
public class CountIndicator extends AbstractIndicator {

    public CountIndicator(RedissonClient redissonClient) {
        super(IndicatorType.COUNT, redissonClient);
    }

    @Override
    public BigDecimal getResult(long currentTime, RScoredSortedSet<String> set) {

        return BigDecimal.valueOf(set.size());
    }

    @Override
    public void addEvent(long currentTime, RScoredSortedSet<String> set, Map<String, String> eventDetail) {

        set.add(currentTime, eventDetail.get("seqId"));
    }

}
 

总结

先到这里吧,还有其他内容到下次吧。

写在最后

拙作艰辛,字句心血,望诸君垂青,多予支持,不胜感激。


个人博客:无奈何杨(wnhyang)

个人语雀:wnhyang

共享语雀:在线知识共享

Github:wnhyang - Overview

标签:
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

在线投稿:投稿 站长QQ:1888636

后台-插件-广告管理-内容页尾部广告(手机)
关注我们

扫一扫关注我们,了解最新精彩内容

搜索