Redis+Hbase+RocketMQ 实际使用问题案例讲解
目录
需求分析及确定方案实现部分代码踩坑总结需求
将Hbase数据,解析后推送到RocketMQ。redis使用list数据类型,存储了需要推送的数据的RowKey及表名。简单画个流程图就是:
分析及确定方案
Redis
(相关资料图)
明确list中元素结构{"rowkey":rowkey,"table":table}
解析出rowkey;一次取多个元素加快效率;取了之后放入重试队列,并删除原来的元素;处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;明确从list中取值所使用的redis命令;范围获取LRANGE
;范围删除(留下指定范围的数据)LTRIM
;判断list长度LLEN
;加入listRPUSH
;删除LREM
等等;从Hbase获取数据失败和发送到mq失败都令重试次数加一;每次碰到重试次数不为0的数据都休眠1s;设置最大重试次数,达到限制后丢弃;考虑客户redis部署方式,单机、主从、集群、哨兵等;选择合适的客户端,Jedis、Redisson、Lettuce等;编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;
Hbase
基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;当时看这个觉得不错https://www.jb51.net/article/230731.htm因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;确定批量获取数据方式为批量Get
,没用scan
;了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone
相关方法;考虑所有都没数据时休眠10s;
RocketMQ
有现成的发送代码,公司封装好的;调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);调整服务端的内存、线程数等参数;实现
配置
#server configuration server.port=8896 #log config logging.file.path=./logs #redis-standalone redis.standalone.host= redis.standalone.port=6379 redis.standalone.password= redis.standalone.enable=true #redis-cluster redis.cluster.nodes= redis.cluster.password= redis.cluster.timeout=30000 redis.cluster.enable=false # Zookeeper 集群地址,逗号分隔 hbase.zookeeper.quorum= # Zookeeper 端口 hbase.zookeeper.property.clientPort=2181 # 消息目的rocketmq地址 rocketmq.server.host= # 发送消息间隔时间,防止发送过快mq受不了 rocketmq.send.interval.millisec=10 # 每次从redis读取数据量限制。 data.access.redisDataSize=100 # 失败数据重试次数,超过的直接丢弃 data.access.retryNum=10 # 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
部分代码
获取配置,其余的直接@Value("${}")
:
@Setter @Getter @Configuration @ConfigurationProperties(prefix = "data.access") public class AccessRedisMqConfig { /** * key:topic; value:redis的key */ private MaptopicKeyMap = new HashMap<>(); /** * 一次从redis中读取数据量限制 */ private long redisDataSize = 50; /** * 失败数据重试次数 */ private int retryNum = 10; }
开启接入:
@Component public class AdapterRunner implements ApplicationRunner { @Resource private DataAccessService dataAccessService; @Override public void run(ApplicationArguments args) { System.out.println("项目已启动,开始接入数据到RocketMQ……"); dataAccessService.accessData2Mq(); } }
其他代码其实也在分析里了。
踩坑
mq发送问题
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523) at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610) at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167) at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572) at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Wo
上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。
总结
程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。
X 关闭
X 关闭
- 15G资费不大降!三大运营商谁提供的5G网速最快?中国信通院给出答案
- 2联想拯救者Y70发布最新预告:售价2970元起 迄今最便宜的骁龙8+旗舰
- 3亚马逊开始大规模推广掌纹支付技术 顾客可使用“挥手付”结账
- 4现代和起亚上半年出口20万辆新能源汽车同比增长30.6%
- 5如何让居民5分钟使用到各种设施?沙特“线性城市”来了
- 6AMD实现连续8个季度的增长 季度营收首次突破60亿美元利润更是翻倍
- 7转转集团发布2022年二季度手机行情报告:二手市场“飘香”
- 8充电宝100Wh等于多少毫安?铁路旅客禁止、限制携带和托运物品目录
- 9好消息!京东与腾讯续签三年战略合作协议 加强技术创新与供应链服务
- 10名创优品拟通过香港IPO全球发售4100万股 全球发售所得款项有什么用处?