Preliminary Exploration of Data Heterogeneity in Canal (Tips)
AD |
Canal canal 1 Canal MySQL binlog MySQL Elasticsearch Canal JOIN (buy_id)(shop_id)joinCanal canal binlogIDID2 Canal canal Canal MySQL master(binary log ) binary log eventsshow binlog eventsslavemasterbinary log events(relay log)slave MySQL Canal canal mysql slave mysql slave mysql master dump mysql master dump binary log slave (canal)canal binary log (byte) Canale servercanaljvminstance 1server1.
Canal canal
1
Canal MySQL binlog MySQL Elasticsearch
Canal
JOIN
(buy_id)(shop_id)join
Canal canal binlogIDID
2
Canal canal Canal
MySQL
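- The master records data changes in its binary log; these records are called binary log events and can be viewed with `show binlog events`.
- The slave copies the master's binary log events into its relay log.
- The slave replays the events in the relay log, applying the changes to its own data.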
MySQL Canal
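- Canal emulates the MySQL slave protocol: it presents itself as a MySQL slave and sends a dump request to the MySQL master.
- The MySQL master receives the dump request and starts pushing its binary log to the slave (that is, to Canal).
- Canal parses the binary log objects (originally a byte stream).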
Canal architecture
- server: one running Canal process, corresponding to one JVM.
- instance: one data queue (1 server corresponds to 1..n instances).
instance
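- eventParser (data source access; emulates the slave protocol to interact with the master and parses the protocol)
- eventSink (the connector between Parser and Store; filters, processes and dispatches data)
- eventStore (data storage)
- metaManager (manager of incremental subscription & consumption metadata)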
3 IntelliJ IDEA Canal Demo
Linux canal Canal Demo Debug
canal Canal
canal Demo example
canal client (server)
IDEA Canal Demo
1. Canal Server Demo
package com.alibaba.otter.canal.server;

import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.CanalServerWithNetty;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Stand-alone demo that boots a Canal server from an IDE.
 *
 * <p>{@link #main(String[])} wires a {@link CanalServerWithEmbedded} into the
 * {@link CanalServerWithNetty} singleton, listens on port {@value #PORT},
 * starts the instance named {@link #DESTINATION}, keeps the process alive for
 * {@link #RUN_TIME} milliseconds and then stops the Netty server.
 */
public class CanalServerTestMain {

    protected static final String ZK_CLUSTER_ADDRESS = "127.0.0.1:2181";
    protected static final String DESTINATION = "example";
    protected static final String DETECTING_SQL = "select 1";
    protected static final String MYSQL_ADDRESS = "127.0.0.1";
    protected static final String USERNAME = "canal";
    protected static final String PASSWORD = "canal";
    /** Table filter handed to the instance: any schema, any table. */
    protected static final String FILTER = ".*\\..*";

    /** How long the demo server stays up, in milliseconds (120 seconds). */
    protected static final long RUN_TIME = 120 * 1000;

    /** Port the Netty front end listens on; clients connect to this port. */
    private static final int PORT = 11111;

    /** Currently unused within this class. */
    private final ByteBuffer header = ByteBuffer.allocate(4);

    /** Assigned by {@link #setUp()}; stays {@code null} if set-up fails early. */
    private CanalServerWithNetty nettyServer;

    /**
     * Starts the server, waits {@link #RUN_TIME} milliseconds and shuts down.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        CanalServerTestMain test = new CanalServerTestMain();
        try {
            test.setUp();
            System.out.println("start");
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            System.out.println("sleep");
            try {
                Thread.sleep(RUN_TIME);
            } catch (InterruptedException interrupted) {
                // Stop waiting, but keep the interrupt visible to whoever runs this thread.
                Thread.currentThread().interrupt();
            }
            test.tearDown();
            System.out.println("end");
        }
    }

    /**
     * Builds the embedded server, attaches it to the Netty server and starts both.
     */
    public void setUp() {
        CanalServerWithEmbedded embeddedServer = new CanalServerWithEmbedded();
        embeddedServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

            @Override
            public CanalInstance generate(String destination) {
                // Every destination gets the same demo configuration.
                Canal canal = buildCanal();
                return new CanalInstanceWithManager(canal, FILTER);
            }
        });

        nettyServer = CanalServerWithNetty.instance();
        nettyServer.setEmbeddedServer(embeddedServer);
        nettyServer.setPort(PORT);
        nettyServer.start();

        // Start the instance for the demo destination.
        embeddedServer.start(DESTINATION);
    }

    /**
     * Stops the Netty server if {@link #setUp()} got far enough to obtain it.
     */
    public void tearDown() {
        // main() calls this from a finally block, so setUp() may have failed
        // before nettyServer was assigned.
        if (nettyServer != null) {
            nettyServer.stop();
        }
    }

    /**
     * Creates the {@link Canal} configuration used by the demo instance: in-memory
     * meta/index/storage modes and a MySQL source at {@link #MYSQL_ADDRESS}:3306.
     *
     * @return a fully populated configuration named {@link #DESTINATION}
     */
    private Canal buildCanal() {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(DESTINATION);
        canal.setDesc("test");

        CanalParameter parameter = new CanalParameter();

        // ZooKeeper is not configured in this demo; to enable it use:
        // parameter.setZkClusters(Arrays.asList(ZK_CLUSTER_ADDRESS));
        parameter.setMetaMode(CanalParameter.MetaMode.MEMORY);
        parameter.setHaMode(CanalParameter.HAMode.HEARTBEAT);
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY);

        parameter.setStorageMode(CanalParameter.StorageMode.MEMORY);
        parameter.setMemoryStorageBufferSize(32 * 1024);

        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        // Both entries point at the same host and port.
        // NOTE(review): presumably a master/standby pair in a real deployment - confirm.
        parameter.setDbAddresses(Arrays.asList(
            new InetSocketAddress(MYSQL_ADDRESS, 3306),
            new InetSocketAddress(MYSQL_ADDRESS, 3306)));
        parameter.setDbUsername(USERNAME);
        parameter.setDbPassword(PASSWORD);

        parameter.setSlaveId(1234L);

        parameter.setDefaultConnectionTimeoutInSeconds(30);
        parameter.setConnectionCharset("UTF-8");
        // NOTE(review): 33 is presumably the MySQL collation id for utf8_general_ci - confirm.
        parameter.setConnectionCharsetNumber((byte) 33);
        parameter.setReceiveBufferSize(8 * 1024);
        parameter.setSendBufferSize(8 * 1024);

        // Detection is switched off; the remaining detecting* values are still set.
        parameter.setDetectingEnable(false);
        parameter.setDetectingIntervalInSeconds(10);
        parameter.setDetectingRetryTimes(3);
        parameter.setDetectingSQL(DETECTING_SQL);

        canal.setCanalParameter(parameter);
        return canal;
    }
}
2. Canal Client Demo
package com.alibaba.otter.canal.example;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;

/**
 * Minimal Canal client: connects to a Canal server on this host's port 11111,
 * subscribes to every table of the "example" destination and prints each row
 * change it receives. It exits after 3000 consecutive empty polls or when the
 * polling thread is interrupted.
 */
public class SimpleCanalClientExample {

    /**
     * Runs the polling loop described on the class.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Create the connection.
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // "schema.table" filter matching everything; the dot must be escaped.
            connector.subscribe(".*\\..*");
            connector.rollback();

            int totalEmptyCount = 3000;
            boolean keepPolling = true;
            while (keepPolling && emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();

                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    keepPolling = pause(1000);
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] %n", batchId, size);
                    printEntry(message.getEntries());
                }

                // Acknowledge the batch.
                connector.ack(batchId);
                // On a processing failure the batch could be rolled back instead:
                // connector.rollback(batchId);
            }

            if (keepPolling) {
                System.out.println("empty too many times, exit");
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} if the sleep completed, {@code false} if the thread was
     *         interrupted (the interrupt flag is restored in that case)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Prints every row change contained in the given entries, skipping
     * transaction begin/end markers.
     *
     * @param entrys entries of one fetched batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a row change
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                    "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format(
                "================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(),
                entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(),
                entry.getHeader().getTableName(),
                eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: name, value and whether it was updated.
     *
     * @param columns columns of a single row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(
                column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
client
binlog
IDEA Demo canal canal
Disclaimer: The content of this article is sourced from the internet. The copyright of the text, images, and other materials belongs to the original author. The platform reprints the materials for the purpose of conveying more information. The content of the article is for reference and learning only, and should not be used for commercial purposes. If it infringes on your legitimate rights and interests, please contact us promptly and we will handle it as soon as possible! We respect copyright and are committed to protecting it. Thank you for sharing.(Email:[email protected])
Mobile advertising space rental |
Tag: Preliminary Exploration of Data Heterogeneity in Canal Tips
Founder of ofo Xiaohuangche, Dai Wei, went to the United States to start a business: registered users drink coffee for free, with an estimated value of $200 million
Next21 rules for dividing databases and tables, hold!
Guess you like
-
The 2025 Chinese New Year (Spring Festival) film box office has exploded, exceeding 3 billion RMB and setting a new record for presales!Detail
2025-01-29 11:55:06 1
-
Seres and Beihang University Join Hands to Build an Innovative Ecosystem, Deepening Industry-Academia-Research Collaboration and Promoting Technological TransformationDetail
2025-01-28 14:46:18 1
-
Douyin 2024 Platform Governance Report: Safeguarding Security, Building a Better CommunityDetail
2025-01-28 14:25:55 1
-
Chinese Scientists Develop a Lightweight Bionic Dexterous Hand with 19 Degrees of Freedom, Promising to Revolutionize Prosthetic and Robotics TechnologyDetail
2025-01-28 14:16:39 1
-
DeepSeek: A Chinese AI Startup's Meteoric Rise Shakes Up Global Tech and Sends US Stocks PlungingDetail
2025-01-28 14:13:23 1
-
WeChat's New Year's Red Envelope Feature Gets a Voice Message Upgrade for Warmer Wishes!Detail
2025-01-26 11:37:36 1
-
360 Digital Security Group and Zhibangyang Education Technology Join Forces to Build a New Ecosystem for Cybersecurity and AI Talent CultivationDetail
2025-01-24 15:09:51 1
-
Visionox Achieves Mass Production of AMOLED with Solid-State Laser Annealing (SLA) Technology, Ushering in a New Era for the Display IndustryDetail
2025-01-24 14:34:23 1
-
Seres at the Davos Forum: The Path to Globalizing New Energy Vehicles Through Cooperation in the Intelligent EraDetail
2025-01-23 13:28:12 1
-
Amazon to Close All French-Speaking Quebec Warehouses, Laying Off Nearly 2,000 EmployeesDetail
2025-01-23 10:51:23 1
-
The official launch of the 2025 Electric Bicycle Trade-in Policy: Upgraded Subsidy Standards, Procedures, and PromotionDetail
2025-01-23 10:48:52 1
-
Xbox Series X|S Officially Supports External Hard Drives Larger Than 16TB: Saying Goodbye to Storage WorriesDetail
2025-01-23 10:39:19 1
-
Leaders from the Beijing Chaoyang District CPPCC Visited Quantum Leap Group, Affirming its Contributions and Future Prospects in the Silver Hair EconomyDetail
2025-01-22 17:06:56 1
-
China's Car Imports Remain Sluggish in 2024: 12% Decline, Sharp Drop in New Energy VehiclesDetail
2025-01-22 11:37:25 1
-
China Railway Group Limited (CRGL) officially debunks "speed-up" ticket booking software: Not a shortcut, but a pathway to riskDetail
2025-01-22 11:36:09 1
-
Dago Bio Completes Over $20 Million A+ Round Funding to Accelerate Novel Molecular Glue Drug DevelopmentDetail
2025-01-22 11:34:05 11
-
Rapid Degradation of Global Lake Submerged Vegetation: Satellite Observations Reveal a Critical Period of Ecosystem ShiftDetail
2025-01-22 11:29:03 1
-
Star Ace Capital Group and Abu Dhabi Investment Office Partner to Build a Global Esports Industry BenchmarkDetail
2025-01-22 11:27:50 1
-
Hisense Television Leads the 100-Inch Large-Screen Market in 2024, Achieving an Unparalleled Industry LegacyDetail
2025-01-22 11:12:49 1
-
WeChat Launches "Gifts" Feature: Streamlining Gift-Giving and Powering Social Commerce GrowthDetail
2025-01-21 16:05:45 1