当前位置:网站首页>Project - redis message queue + worker thread fetches user operation logs and stores them (2)

Project - redis message queue + worker thread fetches user operation logs and stores them (2)

2022-06-11 02:24:00 Do My Love

1. Save operation logs to the database

1. Configuration class

The annotation @ConditionalOnProperty(prefix = "spring.redis", value = { "host", "password" }) means the annotated bean is registered only when both spring.redis.host and spring.redis.password are configured in application.properties (and neither is set to false). If either property is missing, the bean is not created.

/**
 * Declares the Redis template used by the operation-log pipeline.
 */
@Configuration
public class RestTemplateConfig {

    /**
     * Builds the {@code logRedisTemplate} bean, which reads and writes keys, values,
     * hash keys and hash values as plain strings.
     *
     * <p>The bean is only registered when both {@code spring.redis.host} and
     * {@code spring.redis.password} are configured.
     *
     * @param connectionFactory the Redis connection factory to bind the template to
     * @return a fully initialised template
     */
    @Bean(name = "logRedisTemplate")
    @ConditionalOnProperty(prefix = "spring.redis", value = {"host", "password"})
    public RedisTemplate<String, Object> redisTempalte(RedisConnectionFactory connectionFactory) {
        final RedisTemplate<String, Object> logTemplate = new RedisTemplate<>();
        logTemplate.setConnectionFactory(connectionFactory);

        // One string serializer is enough for every slot of the template.
        final StringRedisSerializer plainText = new StringRedisSerializer();
        logTemplate.setKeySerializer(plainText);
        logTemplate.setValueSerializer(plainText);
        logTemplate.setHashKeySerializer(plainText);
        logTemplate.setHashValueSerializer(plainText);

        logTemplate.afterPropertiesSet();
        return logTemplate;
    }
}

2. Use a thread pool to create a worker thread that fetches operation logs from Redis

@Component
public class LogProcessUtil {
    

    private static RedisTemplate<String, Object> redisTemplate;
    private static LogHandlerDelegate oprLogHandler;

    //  Using thread pool to create 2 Threads 
    private static final ThreadPoolExecutor THREADPOOL 
        = new ThreadPoolExecutor(
            2, 2, 0L,
            TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactory(LogProcessUtil.class.getSimpleName() + "-", true),
            new ThreadPoolExecutor.DiscardOldestPolicy()
    );

    //  Dependency injection 
    @Autowired
    public LogProcessUtil(
            @Qualifier("logRedisTemplate") RedisTemplate<String, Object> redisTemplate, 
            LogHandlerDelegate oprLogHandler
    ) {
    
        LogProcessUtil.redisTemplate = redisTemplate;
        LogProcessUtil.oprLogHandler = oprLogHandler;
    }

    /** *  Listen message queue  */
    @PostConstruct
    public static void listenMessage() {
    
        startOprLogHandler();
    }

    /** *  Start thread listening opr_log_channel Message of message queue  */
    public static void startOprLogHandler() {
    
        THREADPOOL.execute(() -> {
    
            List<String> messages;
            while (true) {
    
                try {
    
                    messages = rightLeftMessages(Constants.RedisChannel.OPR_LOG_CHANEL, 1000);
                    if (!CollectionUtils.isEmpty(messages)) {
    
                        //  The processing operation log is saved to the database 
                        oprLogHandler.handleMessage(messages);
                    }
                } catch (Exception e) {
    
                    e.printStackTrace();
                }
            }
        });
    }

    /** *  Bulk read redis Data in the  * */
    public static List<String> rightLeftMessages(String key, long length) {
    
        List<String> result = new ArrayList<>();
        String value;
        try {
    
            for (int i = 0; i < length; i++) {
    
                value = (String) redisTemplate.opsForList().leftPop(key, 5, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(value)) {
    
                    break;
                }
                result.add(value);
            }
            return result;
        } catch (Exception e) {
    
            e.getMessage();
        }
        return result;
    }

}

3. Save log to database - Service layer

/**
 * Callback for processing a batch of log messages.
 */
public interface LogHandlerDelegate {
    
    /**
     * Processes a batch of log messages.
     *
     * @param messages the messages to process
     */
    void handleMessage(List<String> messages);
}
/** *  The operation log is written to the delegate class of the database  * */
@Slf4j
@Component(value = "oprLogHandler")
public class OprLogHandlerDelegate implements LogHandlerDelegate {
    

    private static ObjectMapper objectMapper = new ObjectMapper();
    private static final String DEFAULT_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    static {
    
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
        JavaTimeModule javaTimeModule = new JavaTimeModule();
        javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
        javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
        objectMapper.registerModule(javaTimeModule);
    }

    @Autowired
    private IOprLogDao oprLogDao;

    /** *  Delegate function , from redis Listen for container calls  * * @param messages  The operation log (json Format ) */
    @Override
    public void handleMessage(List<String> messages) {
    
        List<OprLog> oprLogList = new ArrayList<>();
        messages.stream().forEach(message -> {
    
            oprLogList.add(jsonToClassInstance(message, OprLog.class));
        });
        saveToDatabase(oprLogList);
    }

    private <T> T jsonToClassInstance(String jsonBody, Class<T> clazz) {
    
        try {
    
            return objectMapper.readValue(jsonBody, clazz);
        } catch (JsonProcessingException e) {
    
            log.error("operation log json type cast to {} error: {}", clazz, e.getMessage());
            return null;
        }
    }

    private void saveToDatabase(List<OprLog> oprLogs) {
    
        try {
    
            oprLogDao.batchInsertOprLog(oprLogs);
        } catch (Exception e) {
    
            log.error("Insert failed");
        }
    }
}

4. Save log to database - Dao layer

/**
 * MyBatis mapper for operation-log records.
 */
@Mapper
public interface IOprLogDao {
    

    /**
     * Inserts a batch of operation logs.
     *
     * @param list the operation logs to insert
     */
    void batchInsertOprLog(List<OprLog> list);

}
<!--
  Batch insert into t_opr_log: emits one VALUES tuple per element of the "list" parameter,
  separated by commas. The id column is not listed here.
  NOTE(review): an empty list renders "VALUES" with no tuples; callers should not pass one.
-->
<insert id="batchInsertOprLog" parameterType="entity.OprLog">
    INSERT INTO `t_opr_log`
    (
    username,
    userIp,
    target,
    action,
    actionTime,
    result
    )
    VALUES
    <foreach collection ="list" item="oprlog" separator =",">
        (
        #{oprlog.username},
        #{oprlog.userIp},
        #{oprlog.target},
        #{oprlog.action},
        #{oprlog.actionTime},
        #{oprlog.result}
        )
    </foreach >
</insert>

5. Log entity class

-- Operation-log table: one row per recorded operation, keyed by an auto-increment id.
-- actionTime defaults to the insertion time when no value is supplied.
CREATE TABLE  IF NOT EXISTS `t_opr_log`  (
  `id` int UNSIGNED NOT NULL AUTO_INCREMENT COMMENT ' operation id',
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT ' Operation user ',
  `userIp` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT ' Operate where the user is ip, If it is the operation of background scheduled tasks or other operations that do not ip The operation of , by null',
  `target` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT ' The manipulated object ',
  `action` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT ' Operation action ',
  `actionTime` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT ' Operating time ',
  `result` tinyint(1) NOT NULL COMMENT ' result , Success or failure ,1 It means success ,0 It means failure ',

  PRIMARY KEY (`id`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
/**
 * Operation-log entry. Unknown JSON properties are ignored during deserialization.
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class OprLog implements Serializable {
    

    private static final long serialVersionUID = 1L;

    /** Operation id; excluded from JSON. */
    @JsonIgnore
    private Integer id;

    /** User who performed the operation. */
    private String username;

    /**
     * IP address the user operated from; null for background scheduled tasks or other
     * operations that have no IP.
     */
    private String userIp;

    /** Target of the operation. */
    private String target;

    /** Action that was performed. */
    private String action;

    /** Time of the operation, formatted as {@code yyyy-MM-dd HH:mm:ss} in JSON. */
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
    private LocalDateTime actionTime;

    /** Outcome of the operation: 1 means success, 0 means failure. */
    private Boolean result;
}

6. Start class

/**
 * Spring Boot entry point of the application.
 */
@SpringBootApplication
// Turns on Spring's asynchronous method execution support.
@EnableAsync
public class PlatformApp {
    
    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
    
        SpringApplication.run(PlatformApp.class, args);
    }
}

2. Query operation log

1. Controller layer

/**
 * REST controller exposing operation-log queries under {@code /oprLog}.
 */
@RestController
@ResponseResult
@RequestMapping("/oprLog")
public class OprLogController {
    

    @Autowired
    private IOperationLogService operationLogService;

    /**
     * Handles {@code GET /oprLog/search} by delegating to the operation-log service.
     *
     * @param oprLogQo search criteria, validated via {@code @Valid}
     * @return the page of matching operation logs produced by the service
     */
    @GetMapping("/search")
    public PageData<OprLog> searchOperationLog(@Valid OprLogQo oprLogQo) {
    
        return operationLogService.searchOperationLog(oprLogQo);
    }
}

2. Service layer

/**
 * Service for querying operation logs.
 */
public interface IOperationLogService {
    

    /**
     * Searches operation logs.
     *
     * @param oprLogQo the search field parameters
     * @return one page of matching operation logs
     */
    PageData<OprLog> searchOperationLog(OprLogQo oprLogQo);
}
/**
 * Default {@link IOperationLogService}: runs a paged, time-descending search over the
 * operation-log table.
 */
@Service
public class OperationLogServiceImpl implements IOperationLogService {

    /** Sort clause handed to the paging plug-in. */
    private static final String ORDER_BY = "actionTime desc";

    @Autowired
    private IOprLogDao oprLogDao;

    /**
     * Searches operation logs page by page, newest first.
     *
     * @param oprLogQo the search criteria, including paging parameters
     * @return the requested page of operation logs
     */
    @Override
    public PageData<OprLog> searchOperationLog(OprLogQo oprLogQo) {
        // endTime only has day precision and the SQL range is half-open
        // (start inclusive, end exclusive), so push endTime forward by one day.
        oprLogQo.plusOneDayOnEndTime();

        // Arm the paging plug-in right before the query it should apply to.
        PageHelper.startPage(oprLogQo.getPageNum(), oprLogQo.getPageSize(), ORDER_BY);
        List<OprLog> rows = oprLogDao.searchOperationLog(oprLogQo);

        return toPageData(new PageInfo<>(rows));
    }

    /** Copies the paging metadata and rows from the plug-in result into a {@link PageData}. */
    private PageData<OprLog> toPageData(PageInfo<OprLog> source) {
        PageData<OprLog> page = new PageData<>();
        page.setData(source.getList());
        page.setPageNum(source.getPageNum());
        page.setPageSize(source.getPageSize());
        page.setTotalCount(source.getTotal());
        return page;
    }
}

3. Dao layer

/**
 * MyBatis mapper for operation-log records.
 */
@Mapper
public interface IOprLogDao {
    

    /**
     * Searches operation logs.
     *
     * @param oprLogQo the search criteria
     * @return the matching operation logs
     */
    List<OprLog> searchOperationLog(OprLogQo oprLogQo);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="dao.IOprLogDao">
    <!-- Table name, shared by the statements below. -->
    <sql id="table_opr_log">
        t_opr_log
    </sql>

    <!-- Maps each t_opr_log column to the OprLog property of the same name. -->
    <resultMap id="BaseResultMap" type="entity.OprLog">
        <id column="id" property="id"/>
        <result column="username" property="username"/>
        <result column="userIp" property="userIp"/>
        <result column="target" property="target"/>
        <result column="action" property="action"/>
        <result column="actionTime" property="actionTime"/>
        <result column="result" property="result"/>
    </resultMap>

    <!-- Full column list for select statements. -->
    <sql id="Base_Column_List">
        id, username, userIp, target, action, actionTime, result
    </sql>

    <!--
      Operation-log search. Every filter is optional and is appended only when the
      corresponding query field is set:
        - startTime / endTime: half-open range, startTime inclusive and endTime exclusive
        - result, userIp: exact match
        - username, target, action: substring match via LIKE '%value%'
    -->
    <select id="searchOperationLog" resultMap="BaseResultMap" parameterType="qo.OprLogQo">
        select
        <include refid="Base_Column_List"/>
        from
        <include refid="table_opr_log"/>
        where 1=1
        <if test="startTime != null">
            and actionTime &gt;= #{startTime}
        </if>
        <if test="endTime != null">
            and actionTime &lt; #{endTime}
        </if>
        <if test="result != null">
            and result=#{result}
        </if>
        <if test="userIp != null and userIp != ''">
            and userIp=#{userIp}
        </if>
        <if test="username != null and username != ''">
            and username like CONCAT('%', #{username}, '%')
        </if>
        <if test="target != null and target != ''">
            and target like CONCAT('%', #{target}, '%')
        </if>
        <if test="action != null and action != ''">
            and action like CONCAT('%', #{action}, '%')
        </if>
    </select>
</mapper>
原网站

版权声明
本文为[Do My Love]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/162/202206110123100253.html