写在前面
这是我扩充我的项目的一个点,有点摸着石头过河的意思,可能很多思路也不够企业化,然后技术选型什么的也不够正确。
背景介绍
因为我的项目是一个IM聊天项目,所以前端发来消息带有Uid和联系人Id,这时候后端需要进行权限验证,判断是不是好友,不是好友禁止发消息,这种验证是频繁的,所以用户的联系人要放在缓存里。但是用户可能会频繁的添加删除好友,这时候就需要维护缓存和数据库的一致性。
我一开始是采用手动删除,延迟双删的思路,用户发第一条消息的时候就会出现明显的卡顿。所以其实应该在更新完联系人之后就把新的联系人信息放到缓存里。但是手工操作容易出错忽略在哪里没有删。
刚好看八股看到了这种思路,就是通过订阅binlog,根据消息队列的消息里找出哪个用户的联系人信息被修改了,就来更新对应用户的联系人信息的缓存。
所以就有了这篇博客。
MySQL开启binlog并且设定为RAW模式
1 2 3 4 5 6 7 8 9 10 11 12 mysql> show variables like'%log_bin%'; +---------------------------------+-----------------------------+ | Variable_name | Value | +---------------------------------+-----------------------------+ | log_bin | ON | | log_bin_basename | /var/lib/mysql/binlog | | log_bin_index | /var/lib/mysql/binlog.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | | sql_log_bin | ON | +---------------------------------+-----------------------------+ 6 rows in set (0.03 sec)
这里我简单看了一下我的库,不知道为什么是自己开启的,但是还是准备去配置文件看一眼是不是配置了server_id
这里进去
加一下配置开启binlog就行了
令人烦躁时的我50块的京东云服务器的2G内存快要顶不住压力了:)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 [mysqld] log-bin = /var/log/mysql/mysql-bin.logserver-id = 1 binlog_format = ROWexpire_logs_days = 7 max_binlog_size = 100 M
Canal下载并配置
然后是下载Canal,技术选型方面,其实我能选择的不太多,主要就是Canal 和 Debezium。
我选择Canal的原因大抵如下:
轻量级:Canal专注于 MySQL 数据库的 CDC,架构相对简单,更加轻量化。
独立于 Kafka:不像 Debezium一样,最初专为Kafka设计。
易于部署:Canal 的部署相对简单,尤其是对单一数据库的监听、
下载然后一键tar-zxf之后进行一下简单的配置,在提供的样例里修改。
修改conf/example/instance.properties
mysql serverId
1 2 3 4 5 canal.instance.mysql.slaveId = 100 canal.instance.master.address = 127.0 .0.1 :3306 canal.instance.dbUsername = zhima canal.instance.dbPassword = ********
配置完之后可以自己先启动一下,就很简单,bash /bin/startup.sh
然后稍微修改一下数据库中的一行,之后看一下example输出的日志
1 2 3 4 5 6 7 8 9 10 11 12 2025-03-31 22:00:50.725 [destination = example , address = /127.0.0.1:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:example[com.alibaba.otter.canal.parse.exception.CanalParseException: java.io.IOException: connect /127.0.0.1:3306 failure Caused by: java.io.IOException: connect /127.0.0.1:3306 failure at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:85) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:104) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:89) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$1.run(AbstractEventParser.java:171) at java.lang.Thread.run(Thread.java:750) Caused by: java.io.IOException: Error When doing Client Authentication:ErrorPacket [errorNumber=1698, fieldCount=-1, message=Access denied for user 'root'@'localhost', sqlState=28000, sqlStateMarker=#] at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:325) at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:81) ... 4 more ]
如果像这样一般是配置有问题没连上,修改一下就行,之后成功连上之后就可以自己写一个客户端来进行调用啦,下面这段代码用来测试就很合适。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 package com.karlyn.dogie;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;import java.util.List;public class SimpleCanalClientExample { public static void run () { String hostname = "*.*.*.*" ; int port = 11111 ; String destination = "example" ; String username = "" ; String password = "" ; CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress (hostname, port), destination, username, password ); System.out.println("连接创立成功" ); int batchSize = 1000 ; try { connector.connect(); connector.subscribe(".*\\..*" ); connector.rollback(); while (true ) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0 ) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { } } else { System.out.printf("message[batchId=%s, size=%s] \n" , batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); } } finally { connector.disconnect(); } } private static void printEntry (List<Entry> entries) { for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue ; } RowChange rowChange = null ; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException ("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChange.getEventType(); System.out.println(String.format("binlog[%s:%s] , name[%s,%s] , eventType : %s" , entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn (List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue()); } } public static void main (String[] args) { run(); } }
之后开始和消息队列和Springboot整合啦
修改canal配置canal.properties
1 2 3 4 5 6 7 8 9 10 canal.serverMode = rabbitMQ ################################################## ######### RabbitMQ ############# ################################################## rabbitmq.host = 127.0 .0 .1 rabbitmq.virtual.host = / rabbitmq.exchange = canal-exchange rabbitmq.username = root rabbitmq.password = 123456
同时继续修改instance.properties
1 2 3 4 5 6 7 8 canal.mq.topic =canal-routing-keycanal.instance.defaultDatabaseName =dogiecanal.instance.filter.regex =dogie\\.user_contact
然后重启canal服务
Springboot集成
消息订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package com.karlyn.dogie.Canal;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.amqp.core.Queue;@Configuration public class CanalProvider { @Bean public Queue canalQueue () { return new Queue (RabbitConstant.CanalQueue, true ); } @Bean DirectExchange canalExchange () { return new DirectExchange (RabbitConstant.CanalExchange, true , false ); } @Bean Binding bindingCanal () { return BindingBuilder.bind(canalQueue()).to(canalExchange()).with(RabbitConstant.CanalRouting); } }
消息消费,我这里写的比较简单,如果消息消费失败之后我会把它重新放回消息队列,但是这时候消息队列会一直把这个消息发给消费者,所以这块还需要优化一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package com.karlyn.dogie.Canal;import com.alibaba.fastjson.JSON;import com.fasterxml.jackson.databind.ObjectMapper;import com.fasterxml.jackson.core.type.TypeReference;import com.karlyn.dogie.entity.enums.UserContactStatusEnum;import com.karlyn.dogie.entity.po.UserContact;import com.karlyn.dogie.entity.query.UserContactQuery;import com.karlyn.dogie.mappers.UserContactMapper;import com.karlyn.dogie.redis.RedisComponent;import com.karlyn.dogie.util.JsonUtils;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.messaging.handler.annotation.Header;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.LinkedHashMap;import java.util.List;import java.util.Map;import java.util.stream.Collectors;@Component @Slf4j @RabbitListener(queues = RabbitConstant.CanalQueue) public class CanalConsumer { private final UserContactMapper userContactMapper; private final RedisComponent redisComponent; public CanalConsumer (UserContactMapper userContactMapper, RedisComponent redisComponent) { this .userContactMapper = userContactMapper; this .redisComponent = redisComponent; } @RabbitHandler public void Listener (String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException { System.out.println("收到canal消息:" + message); ObjectMapper objectMapper = new ObjectMapper (); Map<String, Object> msg = objectMapper.readValue(message,new TypeReference <Map<String, Object>>() {}); boolean isDdl = (boolean ) msg.get("isDdl" ); if (isDdl) { return ; } String database = (String) msg.get("database" ); String table = (String) msg.get("table" ); String type = (String) msg.get("type" ); List<LinkedHashMap> data = (List<LinkedHashMap>) msg.get("data" ); if (database.equals("dogie" )&&table.equals("user_contact" )){ try { for (LinkedHashMap s : data) { String UserId = (String) s.get("user_id" ); log.info("更新{}的联系人缓存" ,UserId); UserContactQuery userContactQuery = new UserContactQuery (); userContactQuery.setUserId(UserId); userContactQuery.setStatus(UserContactStatusEnum.FRIEND.getStatus()); List<UserContact> userContactList = this .userContactMapper.selectList(userContactQuery); List<String> contactIds = userContactList.stream().map(item->item.getContactId()).collect(Collectors.toList()); this .redisComponent.cleanUserContact(UserId); if (!contactIds.isEmpty()){ this .redisComponent.addUserContactBatch(UserId, contactIds); } } channel.basicAck(tag,false ); }catch (Exception e){ System.out.println(e.getMessage()); channel.basicNack(tag,false ,true ); } } } }