Preliminary Exploration of Data Heterogeneity in Canal (Tips)
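This article takes a first look at Canal: what canal is used for, how it works, and how to run it locally.
1 Background
Canal subscribes to the MySQL binlog, so changes made in MySQL can be synced to other stores such as Elasticsearch.
Typical cases for Canal:
Queries that would otherwise need a JOIN across tables.
Data stored by buyer ID (buy_id) that also has to be looked up by shop ID (shop_id) without a join.
In these cases canal reads the binlog, and the data is rebuilt under the other ID.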
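To make the buy_id and shop_id case concrete, here is a minimal sketch of a consumer that rebuilds order data under shop_id from a stream of row changes. It uses only the JDK. The `OrderChange` class, its fields, and the in-memory map standing in for the target store are assumptions made for illustration; they are not part of Canal's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: re-key order rows from buy_id to shop_id.
class ShopViewSketch {

    /** One decoded row change (hypothetical shape, not a Canal type). */
    static final class OrderChange {
        final String type;   // "INSERT" or "DELETE"
        final long orderId;
        final long buyId;
        final long shopId;

        OrderChange(String type, long orderId, long buyId, long shopId) {
            this.type = type;
            this.orderId = orderId;
            this.buyId = buyId;
            this.shopId = shopId;
        }
    }

    /** shop_id -> order ids; stands in for the heterogeneous target store. */
    private final Map<Long, List<Long>> ordersByShop = new LinkedHashMap<>();

    /** Apply one change to the shop-keyed view. */
    void apply(OrderChange change) {
        if ("INSERT".equals(change.type)) {
            ordersByShop.computeIfAbsent(change.shopId, k -> new ArrayList<>())
                        .add(change.orderId);
        } else if ("DELETE".equals(change.type)) {
            List<Long> ids = ordersByShop.get(change.shopId);
            if (ids != null) {
                ids.remove(Long.valueOf(change.orderId)); // remove by value, not by index
            }
        }
    }

    /** Look up orders by shop without touching the source tables. */
    List<Long> ordersOf(long shopId) {
        return ordersByShop.getOrDefault(shopId, new ArrayList<>());
    }

    public static void main(String[] args) {
        ShopViewSketch view = new ShopViewSketch();
        view.apply(new OrderChange("INSERT", 1L, 100L, 7L));
        view.apply(new OrderChange("INSERT", 2L, 101L, 7L));
        view.apply(new OrderChange("DELETE", 1L, 100L, 7L));
        System.out.println(view.ordersOf(7L));
    }
}
```

In a real setup the changes would come from the binlog and the map would be replaced by the target store; the point is only that the lookup by shop_id no longer needs a join.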
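2 How Canal works
canal builds on MySQL replication, so it helps to look at that before turning to Canal itself.
MySQL master-slave replication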
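- The master records data changes in its binary log (the records are called binary log events and can be viewed with show binlog events).
- The slave copies the master's binary log events to its relay log.
- The slave replays the events in the relay log, applying the changes to its own data.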
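The three replication steps can be pictured with a toy model. This is a sketch under simplifying assumptions: the `Master`, `Slave`, and `Event` classes are hypothetical, everything lives in memory, and nothing here talks to a real MySQL server.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of MySQL replication: binlog -> relay log -> replay.
class ReplicationSketch {

    /** A single change; a stand-in for a binary log event. */
    static final class Event {
        final String key;
        final String value;

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class Master {
        final Map<String, String> data = new HashMap<>();
        final List<Event> binlog = new ArrayList<>();

        /** Step 1: every write is also recorded in the binary log. */
        void write(String key, String value) {
            data.put(key, value);
            binlog.add(new Event(key, value));
        }
    }

    static final class Slave {
        final Map<String, String> data = new HashMap<>();
        final List<Event> relayLog = new ArrayList<>();
        private int fetched = 0; // number of master events copied so far
        private int applied = 0; // number of relay log events replayed so far

        /** Step 2: copy new binary log events into the relay log. */
        void fetchFrom(Master master) {
            while (fetched < master.binlog.size()) {
                relayLog.add(master.binlog.get(fetched++));
            }
        }

        /** Step 3: replay relay log events against the local data. */
        void replay() {
            while (applied < relayLog.size()) {
                Event e = relayLog.get(applied++);
                data.put(e.key, e.value);
            }
        }
    }

    public static void main(String[] args) {
        Master master = new Master();
        Slave slave = new Slave();
        master.write("order:1", "created");
        master.write("order:1", "paid");
        slave.fetchFrom(master);
        slave.replay();
        System.out.println(slave.data.equals(master.data));
    }
}
```

Because the slave only ever reads the master's log, anything that can read the same log can follow the master's changes, which is the opening Canal uses.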
How Canal uses MySQL replication
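- canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master.
- The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, canal).
- canal parses the binary log objects, which arrive as a raw byte stream.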
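To give a feel for what parsing a raw byte stream involves, the sketch below decodes the 19-byte common header that starts each event in the MySQL v4 binlog format (timestamp, type code, server id, event length, next position, flags, all little-endian). It is not canal's parser: the class and method names are hypothetical, the input is a hand-made sample, and it assumes the array starts exactly at the event header with any network packet framing already removed.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: decode the common header of one binlog event.
class BinlogHeaderSketch {

    static final int HEADER_LENGTH = 19;

    static final class Header {
        final long timestamp;    // seconds since the epoch
        final int typeCode;      // event type
        final long serverId;     // id of the server that wrote the event
        final long eventLength;  // header plus body, in bytes
        final long nextPosition; // offset of the next event in the log file
        final int flags;

        Header(long timestamp, int typeCode, long serverId,
               long eventLength, long nextPosition, int flags) {
            this.timestamp = timestamp;
            this.typeCode = typeCode;
            this.serverId = serverId;
            this.eventLength = eventLength;
            this.nextPosition = nextPosition;
            this.flags = flags;
        }
    }

    /** Read the fixed-size header; unsigned fields are widened to avoid sign issues. */
    static Header parse(byte[] raw) {
        if (raw.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("need at least " + HEADER_LENGTH + " bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);
        long timestamp = buf.getInt() & 0xFFFFFFFFL;
        int typeCode = buf.get() & 0xFF;
        long serverId = buf.getInt() & 0xFFFFFFFFL;
        long eventLength = buf.getInt() & 0xFFFFFFFFL;
        long nextPosition = buf.getInt() & 0xFFFFFFFFL;
        int flags = buf.getShort() & 0xFFFF;
        return new Header(timestamp, typeCode, serverId, eventLength, nextPosition, flags);
    }

    /** Build a made-up header so the sketch runs without a MySQL server. */
    static byte[] sampleHeader() {
        return ByteBuffer.allocate(HEADER_LENGTH)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putInt(1_700_000_000) // timestamp
                .put((byte) 30)        // type code (sample value)
                .putInt(1234)          // server id
                .putInt(64)            // event length
                .putInt(4096)          // next position
                .putShort((short) 0)   // flags
                .array();
    }

    public static void main(String[] args) {
        Header h = parse(sampleHeader());
        System.out.println("type=" + h.typeCode + " serverId=" + h.serverId
                + " length=" + h.eventLength + " next=" + h.nextPosition);
    }
}
```

The event body that follows the header depends on the type code, which is where most of the real parsing work lies.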
Canal architecture
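- server: one running canal instance, corresponding to one JVM.
- instance: one data queue (1 server holds 1..n instances).
Modules within an instance:
- eventParser (data source access: emulates the slave protocol to talk to the master, and parses the protocol)
- eventSink (the link between Parser and Store: filters, processes, and distributes data)
- eventStore (data storage)
- metaManager (manager of incremental subscription & consumption information)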
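The division of labor among the four modules can be sketched as a small in-memory pipeline. Everything below is a simplification under stated assumptions: the class, its methods, and the "table:payload" event format are hypothetical, and canal's real modules are separate components with far richer behavior (for example, explicit ack and rollback of batches, which this sketch leaves out).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of one instance: parser -> sink -> store, with a meta cursor.
class InstancePipelineSketch {

    // eventStore stand-in: an append-only in-memory list of events.
    private final List<String> store = new ArrayList<>();

    // metaManager stand-in: how far each client has consumed.
    private final Map<String, Integer> positions = new HashMap<>();

    // Only events whose name starts with this prefix pass the sink.
    private final String tablePrefix;

    InstancePipelineSketch(String tablePrefix) {
        this.tablePrefix = tablePrefix;
    }

    /** eventParser stand-in: turn raw input into an event (here: just trim it). */
    String parse(String raw) {
        return raw.trim();
    }

    /** eventSink stand-in: filter events, then hand the survivors to the store. */
    void sink(String event) {
        if (event.startsWith(tablePrefix)) {
            store.add(event);
        }
    }

    /** Entry point for raw data arriving from the master. */
    void onRaw(String raw) {
        sink(parse(raw));
    }

    /** Return the next batch for a client and advance that client's cursor. */
    List<String> get(String clientId, int batchSize) {
        int from = positions.getOrDefault(clientId, 0);
        int to = Math.min(store.size(), from + batchSize);
        positions.put(clientId, to);
        return new ArrayList<>(store.subList(from, to));
    }

    public static void main(String[] args) {
        InstancePipelineSketch instance = new InstancePipelineSketch("shop.");
        instance.onRaw(" shop.orders:insert id=1 ");
        instance.onRaw("audit.log:insert id=9");   // dropped by the sink filter
        instance.onRaw("shop.items:update id=3");
        System.out.println(instance.get("client-1", 10));
        System.out.println(instance.get("client-1", 10)); // nothing new for this client
    }
}
```

Keeping the consume position outside the store is what lets several clients read the same queue at their own pace.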
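3 Running the Canal Demo in IntelliJ IDEA
Deploying canal on Linux makes it awkward to step through, so here the Canal demo is run and debugged locally.
Start by getting the canal source code and opening the Canal project in IDEA.
The canal demo lives in the example module.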
The canal client has test code in the source tree (as does the server).
Running the Canal demo in IDEA
1) Canal Server Demo
package com.alibaba.otter.canal.server;

import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;

public class CanalServerTestMain {

    protected static final String ZK_CLUSTER_ADDRESS = "127.0.0.1:2181";
    protected static final String DESTINATION = "example";
    protected static final String DETECTING_SQL = "select 1";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME = "canal";
    protected static final String PASSWORD = "canal";
    protected static final String FILTER = ".*\\..*";

    /** How long the server stays up before shutting down: 120 s. */
    protected static final long RUN_TIME = 120 * 1000;

    private final ByteBuffer header = ByteBuffer.allocate(4);
    private CanalServerWithNetty nettyServer;

    public static void main(String[] args) {
        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("sleep");
            try {
                // Keep the server alive so that a client can connect.
                Thread.sleep(RUN_TIME);
            } catch (Throwable ee) {
                // ignored
            }
            test.tearDown();
            System.out.println("end");
        }
    }

    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

            public CanalInstance generate(String destination) {
                Canal canal = buildCanal();
                return new CanalInstanceWithManager(canal, FILTER);
            }
        });

        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(11111);
        nettyServer.start();
        // Start the instance
        embeddedServer.start("example");
    }

    public void tearDown() {
        nettyServer.stop();
    }

    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");

        CanalParameter parameter = new CanalParameter();

        // ZooKeeper is not used here; meta, index, and storage are all kept in memory.
        //parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);

        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);

        // MySQL source and credentials
        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, 3306),
            new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);

        // Connection settings
        parameter.setSlaveId(1234L);
        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);

        // Heartbeat detection (disabled)
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);

        canal.setCanalParameter(parameter);
        return canal;
    }
}
2) Canal Client Demo
package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // Create the connection to the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 3000;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // confirm the batch
                // connector.rollback(batchId); // on failure, roll the batch back
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
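The string passed to subscribe is a regular expression that is matched against names of the form schema.table. The sketch below shows why the dot between the two parts should be escaped. It is an illustration only: it uses java.util.regex as a stand-in, which is an assumption and may differ in detail from the regex engine canal itself uses, and the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: compare an escaped and an unescaped table filter.
class FilterRegexSketch {

    /** True if the whole name matches the filter expression. */
    static boolean matches(String regex, String schemaAndTable) {
        return Pattern.matches(regex, schemaAndTable);
    }

    public static void main(String[] args) {
        String escaped = ".*\\..*"; // regex .*\..* : requires a literal dot
        String unescaped = ".*..*"; // regex .*..*  : any non-empty string

        // A schema.table name passes both filters.
        System.out.println(matches(escaped, "shop.orders"));
        System.out.println(matches(unescaped, "shop.orders"));

        // A name without a dot is rejected only by the escaped form.
        System.out.println(matches(escaped, "orders"));
        System.out.println(matches(unescaped, "orders"));
    }
}
```

For the catch-all case both forms accept every schema.table name, so the difference only shows once the filter is narrowed, for example to a single schema such as shop\\..* in Java source.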
With the server running, start the client; it prints its output to the console.
After data is changed in MySQL, the client prints the corresponding binlog entries.
Running the demo in IDEA makes it convenient to debug canal and to see how canal works.