Project - redis message queue + worker thread fetches user operation logs and stores them (2)
2022-06-11 02:24:00 【Do My Love】
1. Save the operation log to the database
1. Configuration class
The annotation @ConditionalOnProperty(prefix = "spring.redis", value = { "host", "password" }) means the logRedisTemplate bean is only registered when both spring.redis.host and spring.redis.password are set in application.properties (and neither is set to false).
@Configuration
public class RestTemplateConfig {
@Bean(name = "logRedisTemplate")
@ConditionalOnProperty(prefix = "spring.redis", value = {
"host", "password" })
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new StringRedisSerializer());
template.setConnectionFactory(connectionFactory);
template.afterPropertiesSet();
return template;
}
}
2. Use a thread pool to create worker threads that fetch operation logs from Redis
@Component
public class LogProcessUtil {
private static RedisTemplate<String, Object> redisTemplate;
private static LogHandlerDelegate oprLogHandler;
// Thread pool with 2 worker threads
private static final ThreadPoolExecutor THREADPOOL
= new ThreadPoolExecutor(
2, 2, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(LogProcessUtil.class.getSimpleName() + "-", true),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
// Dependency injection
@Autowired
public LogProcessUtil(
@Qualifier("logRedisTemplate") RedisTemplate<String, Object> redisTemplate,
LogHandlerDelegate oprLogHandler
) {
LogProcessUtil.redisTemplate = redisTemplate;
LogProcessUtil.oprLogHandler = oprLogHandler;
}
/** Start listening to the message queue once the bean has been constructed. */
@PostConstruct
public static void listenMessage() {
startOprLogHandler();
}
/** Start a worker thread that polls the opr_log_channel message queue. */
public static void startOprLogHandler() {
THREADPOOL.execute(() -> {
List<String> messages;
while (true) {
try {
messages = rightLeftMessages(Constants.RedisChannel.OPR_LOG_CHANEL, 1000);
if (!CollectionUtils.isEmpty(messages)) {
// Process the operation logs and save them to the database
oprLogHandler.handleMessage(messages);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/** Read up to length entries from the Redis list at key in one batch. */
public static List<String> rightLeftMessages(String key, long length) {
List<String> result = new ArrayList<>();
String value;
try {
for (int i = 0; i < length; i++) {
value = (String) redisTemplate.opsForList().leftPop(key, 5, TimeUnit.SECONDS);
if (StringUtils.isEmpty(value)) {
break;
}
result.add(value);
}
return result;
} catch (Exception e) {
// Do not swallow the exception silently; the entries read so far are still returned
e.printStackTrace();
}
return result;
}
}
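The polling loop above can be sketched without Redis. This is a minimal illustration, assuming a java.util.concurrent.BlockingQueue as a stand-in for the Redis list; BatchDrainSketch and drainBatch are hypothetical names, not part of the project. It shows the same two exit conditions as rightLeftMessages: the batch limit is reached, or the timed pop returns nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a BlockingQueue plays the role of the Redis list.
class BatchDrainSketch {

    /** Reads up to maxBatch items; stops early when the queue stays empty for the timeout. */
    static List<String> drainBatch(BlockingQueue<String> queue, int maxBatch,
                                   long timeout, TimeUnit unit) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < maxBatch; i++) {
            String value;
            try {
                // Like the blocking leftPop: waits up to the timeout, null if nothing arrives
                value = queue.poll(timeout, unit);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and return what has been read so far
                Thread.currentThread().interrupt();
                break;
            }
            if (value == null) {
                break;
            }
            batch.add(value);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add("log-" + i);
        }
        // First read is capped by the batch limit, second read ends at the timeout
        System.out.println(drainBatch(queue, 3, 50, TimeUnit.MILLISECONDS));
        System.out.println(drainBatch(queue, 3, 50, TimeUnit.MILLISECONDS));
    }
}
```

With a batch limit of 1000 and a 5 second timeout, as in the article's code, a quiet queue makes each call return after roughly 5 seconds with whatever was collected, so the worker thread never spins on an empty list.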
3. Save log to database - Service layer
public interface LogHandlerDelegate {
/** Process log data. @param messages operation logs as JSON strings */
void handleMessage(List<String> messages);
}
/** Delegate class that writes operation logs to the database. */
@Slf4j
@Component(value = "oprLogHandler")
public class OprLogHandlerDelegate implements LogHandlerDelegate {
private static ObjectMapper objectMapper = new ObjectMapper();
private static final String DEFAULT_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
static {
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.disable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
JavaTimeModule javaTimeModule = new JavaTimeModule();
javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(DEFAULT_TIME_FORMAT)));
objectMapper.registerModule(javaTimeModule);
}
@Autowired
private IOprLogDao oprLogDao;
/** Delegate method, called by the Redis listener thread. @param messages operation logs (JSON format) */
@Override
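public void handleMessage(List<String> messages) {
// Drop messages that failed to parse (jsonToClassInstance returns null for them).
// Requires java.util.Objects and java.util.stream.Collectors.
List<OprLog> oprLogList = messages.stream()
.map(message -> jsonToClassInstance(message, OprLog.class))
.filter(Objects::nonNull)
.collect(Collectors.toList());
// An empty list would produce an INSERT statement with no VALUES rows
if (!oprLogList.isEmpty()) {
saveToDatabase(oprLogList);
}
}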
private <T> T jsonToClassInstance(String jsonBody, Class<T> clazz) {
try {
return objectMapper.readValue(jsonBody, clazz);
} catch (JsonProcessingException e) {
log.error("operation log json type cast to {} error: {}", clazz, e.getMessage());
return null;
}
}
private void saveToDatabase(List<OprLog> oprLogs) {
try {
oprLogDao.batchInsertOprLog(oprLogs);
} catch (Exception e) {
log.error("Failed to insert operation logs", e);
}
}
}
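The JavaTimeModule setup above means actionTime must arrive as text matching yyyy-MM-dd HH:mm:ss. A minimal sketch using only java.time shows what that pattern accepts; ActionTimeFormatSketch and parseOrNull are hypothetical names for illustration, and the null return only mirrors how jsonToClassInstance signals a failed parse.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class ActionTimeFormatSketch {
    // Same pattern string as DEFAULT_TIME_FORMAT in OprLogHandlerDelegate
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns the parsed time, or null when the text does not match the pattern. */
    static LocalDateTime parseOrNull(String text) {
        try {
            return LocalDateTime.parse(text, FORMATTER);
        } catch (DateTimeParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Space-separated date and time matches the pattern
        System.out.println(parseOrNull("2022-06-11 02:24:05"));
        // ISO-8601 text with a 'T' separator does not match and yields null
        System.out.println(parseOrNull("2022-06-11T02:24:05"));
    }
}
```

A producer that serializes timestamps in ISO-8601 form would therefore cause parse failures on the consumer side, so both ends should agree on the pattern.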
4. Save log to database - Dao layer
@Mapper
public interface IOprLogDao {
/** Batch insert operation logs. @param list operation logs to insert */
void batchInsertOprLog(List<OprLog> list);
}
<insert id="batchInsertOprLog" parameterType="entity.OprLog">
INSERT INTO `t_opr_log`
(
username,
userIp,
target,
action,
actionTime,
result
)
VALUES
<foreach collection="list" item="oprlog" separator=",">
(
#{oprlog.username},
#{oprlog.userIp},
#{oprlog.target},
#{oprlog.action},
#{oprlog.actionTime},
#{oprlog.result}
)
</foreach>
</insert>
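The foreach element repeats the parenthesized value group once per list element, joined by commas, and MyBatis binds each #{...} expression as a ? placeholder of a prepared statement. The sketch below only illustrates the shape of the resulting statement; BatchInsertSqlSketch and buildSql are hypothetical names, and the exact whitespace MyBatis generates will differ.

```java
import java.util.Collections;

class BatchInsertSqlSketch {
    static final String COLUMNS = "username, userIp, target, action, actionTime, result";

    /** Builds a multi-row INSERT with one "(?, ?, ?, ?, ?, ?)" group per row. */
    static String buildSql(int rowCount) {
        if (rowCount <= 0) {
            // With zero rows the statement would end at VALUES and be invalid SQL
            throw new IllegalArgumentException("rowCount must be positive");
        }
        String row = "(?, ?, ?, ?, ?, ?)";
        return "INSERT INTO `t_opr_log` (" + COLUMNS + ") VALUES "
                + String.join(",", Collections.nCopies(rowCount, row));
    }

    public static void main(String[] args) {
        // Two log entries become a single statement with two value groups
        System.out.println(buildSql(2));
    }
}
```

This is also why an empty list should never reach batchInsertOprLog: the statement would have no value groups after VALUES.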
5. Log table and entity class
CREATE TABLE IF NOT EXISTS `t_opr_log` (
`id` int UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'Operation id',
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'Operating user',
`userIp` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT 'IP address of the operating user; null for background scheduled tasks or other operations without an IP',
`target` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'Operation target',
`action` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'Operation action',
`actionTime` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT 'Operation time',
`result` tinyint(1) NOT NULL COMMENT 'Result: 1 means success, 0 means failure',
PRIMARY KEY (`id`) USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class OprLog implements Serializable {
private static final long serialVersionUID = 1L;
/** Operation id */
@JsonIgnore
private Integer id;
/** Operating user */
private String username;
/** IP address of the operating user; null for background scheduled tasks or other operations without an IP */
private String userIp;
/** Operation target */
private String target;
/** Operation action */
private String action;
/** Operation time */
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private LocalDateTime actionTime;
/** Result: true (1) means success, false (0) means failure */
private Boolean result;
}
6. Startup class
@SpringBootApplication
// Enable asynchronous method execution (@Async)
@EnableAsync
public class PlatformApp {
public static void main(String[] args) {
SpringApplication.run(PlatformApp.class, args);
}
}
2. Query operation logs
1. Controller layer
@RestController
@ResponseResult
@RequestMapping("/oprLog")
public class OprLogController {
@Autowired
private IOperationLogService operationLogService;
@GetMapping("/search")
public PageData<OprLog> searchOperationLog(@Valid OprLogQo oprLogQo) {
return operationLogService.searchOperationLog(oprLogQo);
}
}
2. Service layer
public interface IOperationLogService {
/** Search operation logs. @param oprLogQo search field parameters @return one page of matching operation logs */
PageData<OprLog> searchOperationLog(OprLogQo oprLogQo);
}
@Service
public class OperationLogServiceImpl implements IOperationLogService {
@Autowired
private IOprLogDao oprLogDao;
private static final String ORDER_BY = "actionTime desc";
@Override
public PageData<OprLog> searchOperationLog(OprLogQo oprLogQo) {
// endTime is only accurate to the day and the SQL condition is a half-open interval [startTime, endTime), so endTime needs one extra day
oprLogQo.plusOneDayOnEndTime();
// Start pagination with the PageHelper plugin
PageHelper.startPage(oprLogQo.getPageNum(), oprLogQo.getPageSize(), ORDER_BY);
PageInfo<OprLog> pageInfo = new PageInfo<>(oprLogDao.searchOperationLog(oprLogQo));
return assembleOprLogPageData(pageInfo);
}
private PageData<OprLog> assembleOprLogPageData(PageInfo<OprLog> pageInfo) {
PageData<OprLog> pageData = new PageData<>();
pageData.setData(pageInfo.getList());
pageData.setPageNum(pageInfo.getPageNum());
pageData.setPageSize(pageInfo.getPageSize());
pageData.setTotalCount(pageInfo.getTotal());
return pageData;
}
}
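The OprLogQo class and its plusOneDayOnEndTime method are not shown in this article. The sketch below is therefore only an assumption about the intent: if the end of the range is a day-precision date, shifting it by one day and comparing with a strict "less than" includes every record from that last day. EndTimeRangeSketch and inRange are hypothetical names.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;

class EndTimeRangeSketch {

    /** True when actionTime falls in [startDay 00:00, endDay + 1 day 00:00). */
    static boolean inRange(LocalDateTime actionTime, LocalDate startDay, LocalDate endDay) {
        LocalDateTime from = startDay.atStartOfDay();
        // Assumed equivalent of plusOneDayOnEndTime: move the exclusive bound to the next midnight
        LocalDateTime toExclusive = endDay.plusDays(1).atStartOfDay();
        return !actionTime.isBefore(from) && actionTime.isBefore(toExclusive);
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2022, 6, 10);
        LocalDate end = LocalDate.of(2022, 6, 11);
        // Last second of the end day is inside the range
        System.out.println(inRange(LocalDateTime.of(2022, 6, 11, 23, 59, 59), start, end));
        // Midnight of the following day is outside the range
        System.out.println(inRange(LocalDateTime.of(2022, 6, 12, 0, 0, 0), start, end));
    }
}
```

Without the extra day, a query ending on 2022-06-11 would stop at 2022-06-11 00:00:00 and miss nearly all logs written on that date.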
3. Dao layer
@Mapper
public interface IOprLogDao {
/** Search operation logs. */
List<OprLog> searchOperationLog(OprLogQo oprLogQo);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="dao.IOprLogDao">
<sql id="table_opr_log">
t_opr_log
</sql>
<resultMap id="BaseResultMap" type="entity.OprLog">
<id column="id" property="id"/>
<result column="username" property="username"/>
<result column="userIp" property="userIp"/>
<result column="target" property="target"/>
<result column="action" property="action"/>
<result column="actionTime" property="actionTime"/>
<result column="result" property="result"/>
</resultMap>
<sql id="Base_Column_List">
id, username, userIp, target, action, actionTime, result
</sql>
<select id="searchOperationLog" resultMap="BaseResultMap" parameterType="qo.OprLogQo">
select
<include refid="Base_Column_List"/>
from
<include refid="table_opr_log"/>
where 1=1
<if test="startTime != null">
and actionTime >= #{startTime}
</if>
<if test="endTime != null">
and actionTime &lt; #{endTime}
</if>
<if test="result != null">
and result=#{result}
</if>
<if test="userIp != null and userIp != ''">
and userIp=#{userIp}
</if>
<if test="username != null and username != ''">
and username like CONCAT('%', #{username}, '%')
</if>
<if test="target != null and target != ''">
and target like CONCAT('%', #{target}, '%')
</if>
<if test="action != null and action != ''">
and action like CONCAT('%', #{action}, '%')
</if>
</select>
</mapper>