重庆微信网站开发公司,互联网下的网络营销,哈尔滨建筑工程招聘信息,广告制作行业发展前景一、上线结论
实现了将用户线上实时浏览的沉浸式视频信息#xff0c;保存在Redis中这样一个功能。为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景#xff0c;后续也能扩展到其他所有场景。用于两个场景#xff1a;#xff08;1#xf…一、上线结论
实现了将用户线上实时浏览的沉浸式视频信息保存在Redis中这样一个功能。为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景后续也能扩展到其他所有场景。用于两个场景1根据用户近期观看物料匹配相似物料2过滤用户近期观看物料
二、实现效果展示
用户在线上刷一个视频redis就会将用户的视频信息保存在用户历史浏览的队列中。 队列大小为100。具体保存的信息如下所示
一、Redis存储KEYkafka:user_short_video_streaming:_5c91e0cf0cf2f3d119f92774 二、Redis存储value:[{duration:4,resourceId:28808,appType:DOCTOR,actionCode:1006,resourceType:VIDEO}, {duration:9,resourceId:24262,appType:DOCTOR,actionCode:1006,resourceType:VIDEO}, {duration:5,resourceId:25330,appType:DOCTOR,actionCode:1006,resourceType:VIDEO}]
三、实现策略
采用Java语言实现先监听kafka然后解析kafka消息进行解码再解析从解析后的结果中提取user_id, resource_id和resource_type字段。连接Redis构造用户队列队列长度设置为100用户刷的视频个数将数据写入Redis队列大小为100超过100顺序pop
代码Githttp://gitlab.dzj.com/applied_algorithm/data_analysis/kafka_streaming_immersive.git
四、项目后续规划
扩展到Feed流搜索召回等全部的场景jar包后台运行方式改为CICD部署
五、附录
5.1 BUG分享
在实现的过程中遇到一个序列化问题就是写入的key和value乱码导致用Python查询的定义好的KEY的时候查询不到解决方案如下
自定义RedisTemplete进行重写, 用jackson进行序列化将这个类注册到Spring Boot中 折叠源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.dzj.kafka_streaming.Config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * Author : wangyongpeng * Date : 2022/12/16 14:34 * Description : 重写RedisTemplate 进行序列化 */ Configuration public class RedisConfig { Bean SuppressWarnings(all) public RedisTemplateString, Object redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplateString, Object template new RedisTemplate(); template.setConnectionFactory(redisConnectionFactory); // JSON序列化配置 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // String 的序列化 StringRedisSerializer stringRedisSerializer new StringRedisSerializer(); // key 采用String的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // valuex序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash的序列化也用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
5.2 项目核心代码 折叠源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 package com.dzj.kafka_streaming.listener; import com.dzj.kafka_streaming.dto.TagNameTypeInfo; import com.dzj.kafka_streaming.service.ContentTagRelationService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Base64; import java.util.List; /** * immersive_streaming_ userId; 这是旧的key需要清除 */ Component public class MessageListener { Autowired private ContentTagRelationService relationService; Resource private RedisTemplateString, Object redisTemplate; private final String TOPIC_NAME event-trace-log; // KafkaListener(topics {TOPIC_NAME},groupId itmentuGroup) KafkaListener(topics {TOPIC_NAME}) public void listener(ConsumerRecordString,String record) { //获取消息 String message record.value(); //消息偏移量 long offset record.offset(); String redisKeyPrefix kafka:user_short_video_streaming:_; JSONObject dataJson parseJson(message); String eventCode dataJson.getString(eventCode); if (145001.equals(eventCode)){ // 测试环境------------------------------------------------------------------------------------------ // 目前只关注沉浸式中得数据 String resourceId dataJson.getJSONObject(eventBody).getString(resourceId); String resourceType dataJson.getJSONObject(eventBody).getString(resourceType); Integer duration dataJson.getJSONObject(eventBody).getInteger(duration); String actionCode dataJson.getJSONObject(eventBody).getString(actionCode); String userId dataJson.getJSONObject(eventBody).getString(userId); String appType dataJson.getJSONObject(eventBody).getString(appType); // System.out.println(________kafka msg: eventCode eventCode eventBody dataJson.getJSONObject(eventBody)); /** * 写入Redis * redis存储结构 key List(5),是一个定长为5右进左出的队列 * 首先查询该key的list长度如果长度超过5就先左边出队列一个再右边进一个否则右边进一个 */ String key redisKeyPrefix userId; // String key immersive_streaming_wyp0001; // 定义Redis队列写入的结构 JSONObject redisListItem new JSONObject(); redisListItem.put(resourceId,resourceId); redisListItem.put(resourceType,resourceType); redisListItem.put(duration,duration); redisListItem.put(actionCode,actionCode); redisListItem.put(appType,appType); String redisListItemString redisListItem.toJSONString(); if (redisTemplate.opsForList().size(key) 100){ Object leftPop redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().rightPush(key, redisListItemString); System.out.println([pop]redis key : redisKeyPrefix userId now contains: redisTemplate.opsForList().range(key,0, -1)); }else { if (!resourceId.isEmpty() !resourceType.isEmpty()){ redisTemplate.opsForList().rightPush(key, redisListItemString); Long size redisTemplate.opsForList().size(key); System.out.println(redis key : redisKeyPrefix userId pushed one: size redisListItemString); System.out.println(redis key : redisKeyPrefix userId now contains: redisTemplate.opsForList().range(key,0, -1)); } } } } /** * 解析json,解码功能 */ public JSONObject parseJson(String message) { JSONObject messageJson JSONObject.parseObject(message); String dataString messageJson.getString(data); // --------------------base64解码字符串-------------------- String data_string ; final Base64.Decoder decoder Base64.getDecoder(); try{ data_string new String(decoder.decode(dataString), UTF-8); }catch (Exception e){ System.out.println(【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson e); } // string转换为json,只取eventCode 145001沉浸式的 JSONObject dataJson JSONObject.parseObject(data_string); return dataJson; } /** * 从数据库查询 * param resourceId * param resourceType * return */ public ListTagNameTypeInfo queryByIdAndType(String resourceId, String resourceType ){ ListTagNameTypeInfo tagNameTypeInfos new ArrayList(); try { tagNameTypeInfos relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType); } catch (Exception e){ System.out.println(【ERROR】 resourceId resourceType 在数据库中查询不到.......); } return tagNameTypeInfos; } }