一聚教程网:一个值得你收藏的教程网站

最新下载

热门教程

Redis在批量发货状态缓存和错误信息暂存中的应用小结

时间:2026-06-23 09:02:59 编辑:袖梨 来源:一聚教程网

一、批量发货状态缓存

1.1 解决什么问题

批量发货场景下,用户选择多条待发货单据一键发货,系统需要:

Redis在批量发货状态缓存与错误信息暂存中的应用小结

  • 防止同一单据被多次发货(重复点击、MQ 重试)
  • 标记哪些单据正在处理中,让前端能轮询进度或阻止重复提交
  • 处理完成后清除标记

数据库加状态字段虽然也能做,但频繁读写、并发查询性能差。Redis 作为内存缓存,天然适合这种"短生命周期、高频读写"的临时状态管理。

1.2 核心设计思路

用户触发批量发货    ↓遍历每一条待发货单据    ↓检查 Redis key 是否存在(是否正在处理中)    ├── 存在 → 跳过,返回"正在处理中"    └── 不存在 → 写入 Redis key(标记占位) → 执行发货逻辑                                                 ↓                                         发货成功 → 删除 key 或设短过期                                         发货失败 → 删除 key,返回错误

1.3 涉及的 Redis 数据结构和命令

操作Redis 命令说明
标记正在处理SET key value EX ttl NXNX 保证互斥,EX 防死锁
检查是否处理中GET key非空则跳过
处理完成清除DEL key释放标记
批量检查MGET key1 key2 ...一次查多个单据状态

1.4 代码示例(通用)

import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Service;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;@Servicepublic class BatchTaskService {    private final StringRedisTemplate redisTemplate;    // key 前缀    private static final String PROCESSING_KEY_PREFIX = "batch:task:processing:";    // 标记过期时间(防止异常未清除导致永久阻塞)    private static final long EXPIRE_SECONDS = 300;    public BatchTaskService(StringRedisTemplate redisTemplate) {        this.redisTemplate = redisTemplate;    }    /**     * 批量处理任务.     *     * @param taskIds 待处理的任务ID列表     * @return 处理结果     */    public BatchResult processBatch(List<String> taskIds) {        List<String> successList = new ArrayList<>();        List<String> skippedList = new ArrayList<>();        List<String> failedList = new ArrayList<>();        for (String taskId : taskIds) {            String redisKey = PROCESSING_KEY_PREFIX + taskId;            // 尝试标记为处理中(SET NX EX:不存在才设置 + 过期时间)            Boolean acquired = redisTemplate.opsForValue()                .setIfAbsent(redisKey, "processing", EXPIRE_SECONDS, TimeUnit.SECONDS);            if (Boolean.FALSE.equals(acquired)) {                // key 已存在,说明正在被其他线程/实例处理                skippedList.add(taskId);                continue;            }            try {                // 执行业务逻辑                doProcess(taskId);                successList.add(taskId);            } catch (Exception e) {                failedList.add(taskId);            } finally {                // 无论成功失败都清除标记,允许后续重试                redisTemplate.delete(redisKey);            }        }        return new BatchResult(successList, skippedList, failedList);    }    /**     * 查询哪些任务正在处理中(前端轮询用).     */    public List<String> listProcessingTasks(List<String> taskIds) {        List<String> keys = taskIds.stream()            .map(id -> PROCESSING_KEY_PREFIX + id)            .collect(Collectors.toList());        List<String> values = redisTemplate.opsForValue().multiGet(keys);        List<String> processingIds = new ArrayList<>();        for (int i = 0; i < taskIds.size(); i++) {            if (values != null && values.get(i) != null) {                processingIds.add(taskIds.get(i));            }        }        return processingIds;    }    private void doProcess(String taskId) {        // 具体业务逻辑...    }}

1.5 进阶:带进度的批量状态

如果前端需要展示"已处理 3/10"这样的进度,可以用 Redis Hash:

/** * 批量任务进度跟踪. */@Servicepublic class BatchProgressService {    private final StringRedisTemplate redisTemplate;    private static final String PROGRESS_KEY_PREFIX = "batch:progress:";    public BatchProgressService(StringRedisTemplate redisTemplate) {        this.redisTemplate = redisTemplate;    }    /** 初始化批次进度. */    public String initBatch(List<String> taskIds) {        String batchId = UUID.randomUUID().toString();        String progressKey = PROGRESS_KEY_PREFIX + batchId;        // Hash 结构:field=taskId, value=状态        Map<String, String> statusMap = taskIds.stream()            .collect(Collectors.toMap(id -> id, id -> "PENDING"));        redisTemplate.opsForHash().putAll(progressKey, statusMap);        redisTemplate.expire(progressKey, 1, TimeUnit.HOURS);        return batchId;    }    /** 更新单个任务状态. */    public void updateStatus(String batchId, String taskId, String status) {        String progressKey = PROGRESS_KEY_PREFIX + batchId;        redisTemplate.opsForHash().put(progressKey, taskId, status);    }    /** 查询批次进度. */    public Map<String, String> getProgress(String batchId) {        String progressKey = PROGRESS_KEY_PREFIX + batchId;        Map<Object, Object> entries = redisTemplate.opsForHash().entries(progressKey);        return entries.entrySet().stream()            .collect(Collectors.toMap(                e -> e.getKey().toString(),                e -> e.getValue().toString()            ));    }}

1.6 关键设计考量

考量点说明
过期时间必须设置,防止应用崩溃后 key 永久残留阻塞后续操作
幂等性加锁失败时不报错而是跳过,配合重试机制实现最终一致
key 设计业务前缀:模块:唯一标识,如 batch:delivery:SO202401010001
清除时机成功/失败都删除(允许重试)或成功后保留短时间(防重复成功回调)
集群环境确保所有实例连同一个 Redis 集群,key 可见性一致

二、错误信息暂存

2.1 解决什么问题

自动发货场景下:

  • 发货是异步触发的,没有前端页面等待结果
  • 失败信息需要暂存,由定时任务定期捞取并推送告警
  • 错误信息时效性短(当天处理即可),不需要持久化到数据库
  • 可能短时间内产生大量错误,需要高性能写入

Redis List 结构完美匹配:左进右出、支持批量读取、设置过期自动清理。

2.2 核心设计思路

自动发货失败    ↓格式化错误信息(单号 + 客户码 + requestId + 失败原因)    ↓RPUSH 写入 Redis List    ↓设置 key 1天过期(EXPIRE)    ↓定时任务每分钟 LPOP/LRANGE 取出 → 推送钉钉/企微

2.3 涉及的 Redis 数据结构和命令

操作Redis 命令说明
追加错误RPUSH key messageList 尾部追加,O(1)
批量读取LRANGE key 0 -1取所有消息
逐条消费LPOP key从头部弹出一条
批量消费LPOP key count (Redis 6.2+)弹出 N 条
设置过期EXPIRE key seconds自动清理过期数据
查看队列长度LLEN key监控积压量

2.4 代码示例(通用)

import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.stereotype.Service;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.List;import java.util.concurrent.TimeUnit;/** * 异步任务错误信息暂存服务. * 用于暂存自动任务的失败信息,供定时任务消费后推送告警. */@Servicepublic class ErrorMessageBufferService {    private final StringRedisTemplate redisTemplate;    // Redis key    private static final String ERROR_BUFFER_KEY = "error:buffer:auto-task";    // 过期时间(防止无人消费时无限增长)    private static final long EXPIRE_HOURS = 24;    // 单次消费最大条数    private static final int BATCH_SIZE = 50;    public ErrorMessageBufferService(StringRedisTemplate redisTemplate) {        this.redisTemplate = redisTemplate;    }    /**     * 写入错误信息.     *     * @param taskId    任务标识     * @param errorMsg  错误原因     * @param context   上下文信息(如 requestId)     */    public void recordError(String taskId, String errorMsg, String context) {        String timestamp = LocalDateTime.now()            .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));        // 格式化消息内容        String message = String.format("[%s] 任务:%s, 上下文:%s, 失败原因:%s",            timestamp, taskId, context, errorMsg);        try {            // 追加到 List 尾部            redisTemplate.opsForList().rightPush(ERROR_BUFFER_KEY, message);            // 刷新过期时间(每次写入都续期,保证最后一条写入后24小时过期)            redisTemplate.expire(ERROR_BUFFER_KEY, EXPIRE_HOURS, TimeUnit.HOURS);        } catch (Exception e) {            // Redis 操作失败不影响主流程,仅记录日志            log.warn("错误信息写入Redis失败, taskId={}", taskId, e);        }    }    /**     * 批量消费错误信息(定时任务调用).     *     * @return 本次消费的错误消息列表     */    public List<String> consumeErrors() {        // 先查长度,取 min(length, BATCH_SIZE) 条        Long length = redisTemplate.opsForList().size(ERROR_BUFFER_KEY);        if (length == null || length == 0) {            return List.of();        }        int count = (int) Math.min(length, BATCH_SIZE);        // LRANGE 读取 + LTRIM 删除(两步操作,实际可用 Lua 保证原子性)        List<String> messages = redisTemplate.opsForList().range(ERROR_BUFFER_KEY, 0, count - 1);        redisTemplate.opsForList().trim(ERROR_BUFFER_KEY, count, -1);        return messages;    }    /**     * 查看当前积压的错误数量(监控用).     */    public long getPendingCount() {        Long size = redisTemplate.opsForList().size(ERROR_BUFFER_KEY);        return size != null ? size : 0;    }}

2.5 定时任务消费 + 推送告警

import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.util.List;/** * 定时消费错误缓存并推送告警. */@Componentpublic class ErrorAlertJob {    private final ErrorMessageBufferService errorBufferService;    private final AlertService alertService;    public ErrorAlertJob(ErrorMessageBufferService errorBufferService,                         AlertService alertService) {        this.errorBufferService = errorBufferService;        this.alertService = alertService;    }    /**     * 每分钟执行一次,消费错误消息并批量推送.     */    @Scheduled(fixedDelay = 60000)    public void consumeAndAlert() {        List<String> errors = errorBufferService.consumeErrors();        if (errors.isEmpty()) {            return;        }        // 拼接为一条告警消息        StringBuilder content = new StringBuilder();        content.append("⚠️ 自动任务失败告警(").append(errors.size()).append("条)nn");        for (String error : errors) {            content.append("• ").append(error).append("n");        }        // 推送到钉钉/企微/飞书        alertService.sendToWebhook(content.toString());    }}

2.6 原子消费的 Lua 脚本方案

LRANGE + LTRIM 两步操作在并发消费时可能丢数据或重复消费,用 Lua 脚本保证原子性:

/** * 原子性批量弹出 List 元素. */public List<String> atomicBatchPop(String key, int count) {    String luaScript =        "local result = redis.call('lrange', KEYS[1], 0, ARGV[1] - 1) " +        "if #result > 0 then " +        "  redis.call('ltrim', KEYS[1], #result, -1) " +        "end " +        "return result";    DefaultRedisScript<List> script = new DefaultRedisScript<>(luaScript, List.class);    List<String> result = redisTemplate.execute(        script, Collections.singletonList(key), String.valueOf(count));    return result != null ? result : List.of();}

2.7 使用模板(配置化消息格式)

项目中通过数据库/Redis 存储消息模板,运行时动态填充参数:

/** * 模板化错误消息. */public class ErrorMessageTemplate {    /**     * 根据模板和参数生成最终消息.     *     * @param template 模板,如 "任务{1}失败,客户{2},原因:{3}"     * @param args     参数数组     * @return 填充后的消息     */    public static String format(String template, String[] args) {        String result = template;        for (int i = 0; i < args.length; i++) {            result = result.replace("{" + (i + 1) + "}", args[i]);        }        return result;    }}// 使用String template = configService.getTemplate("auto-delivery-error");String message = ErrorMessageTemplate.format(template, new String[]{    orderCode, sellerCode, requestId, exception.getMessage()});errorBufferService.recordError(orderCode, message, requestId);

三、两种场景的对比

维度批量状态缓存错误信息暂存
数据结构String(每个任务一个 key)List(所有错误共用一个 key)
生命周期任务处理完即删除定时消费后删除,或自然过期
并发模式写少读多(前端轮询)写多读少(定时消费)
一致性要求强(标记必须准确)弱(少量丢失可接受)
失败影响Redis 写失败可能导致重复处理Redis 写失败仅丢失告警,不影响主流程
典型 key 模式batch:delivery:{orderCode}error:buffer:auto-delivery

四、注意事项汇总

问题解决方案
Redis 宕机时标记丢失状态缓存:降级为数据库 SELECT FOR UPDATE;错误暂存:降级为本地日志
List 无限增长设置 EXPIRE + 消费任务监控积压量告警
并发消费重复单消费者模式,或用 Lua 原子弹出
消息格式变更模板配置化,修改模板不需要改代码和重启
跨机房同步错误暂存场景容忍延迟,可用异步复制;状态缓存需要同机房 Redis

热门栏目