Using Kafka Connect to synchronize Kafka data to MySQL
2022-07-08 01:31:00 【W_Meng_H】
I. Background information
Kafka Connect is mainly used to stream data into and out of the Kafka message queue. Source connectors import data from third-party systems into the Kafka brokers, and sink connectors export data from the Kafka brokers to third-party systems.
Official documentation: How to use Kafka Connect - Getting Started | Confluent Documentation
II. Development environment
Middleware | Version |
ZooKeeper | 3.7.0 |
Kafka | 2.10-0.10.2.1 (Scala 2.10, Kafka 0.10.2.1) |
III. Configure Kafka Connect
1. Go to the config folder of the Kafka installation and edit connect-standalone.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=kafka-0:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# The core configuration
#plugin.path=/home/kafka/plugins
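To illustrate what key.converter.schemas.enable and value.converter.schemas.enable control, here is a minimal Python sketch of the two JSON shapes that JsonConverter reads and writes: the bare value when schemas are disabled, and a schema/payload envelope when they are enabled. The row fields (id, name), the schema name, and the helper encode are hypothetical and exist only for illustration.

```python
import json

# Hypothetical row; the field names are illustrative, not taken from the article's table.
ROW = {"id": 1, "name": "alice"}

# Connect-style schema describing ROW: a struct with an int32 and an optional string.
SCHEMA = {
    "type": "struct",
    "name": "test",
    "optional": False,
    "fields": [
        {"field": "id", "type": "int32", "optional": False},
        {"field": "name", "type": "string", "optional": True},
    ],
}


def encode(row, schemas_enable):
    """Return the JSON text a message value would carry for the given setting."""
    if schemas_enable:
        # schemas.enable=true: the value is wrapped in a schema/payload envelope.
        return json.dumps({"schema": SCHEMA, "payload": row})
    # schemas.enable=false: only the bare JSON value is written.
    return json.dumps(row)


plain = json.loads(encode(ROW, schemas_enable=False))
wrapped = json.loads(encode(ROW, schemas_enable=True))
print(sorted(plain))    # top-level keys of the schemaless form
print(sorted(wrapped))  # top-level keys of the envelope form
```

The JDBC sink connector needs schema information to map record fields to table columns, so a schemaless value is generally not enough on the sink side.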
To synchronize MySQL data to Kafka, change the following settings:
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Note: earlier versions of Kafka Connect, such as the 0.10.2.1 release used here, do not support the plugin.path setting, so the plugin location has to be specified in CLASSPATH instead:
vi /etc/profile
export CLASSPATH=/home/kafka/*
source /etc/profile
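Because a CLASSPATH wildcard silently matches nothing when the JARs are missing, a quick check of the plugin directory can save a failed startup. The sketch below is one possible way to do that in Python; the two JAR name patterns are assumptions about how the JDBC connector and MySQL driver files are named, and the demo file name is a placeholder rather than a real release.

```python
import fnmatch
import os
import tempfile

# Assumed JAR name patterns; adjust them to the files you actually downloaded.
REQUIRED_JARS = ["kafka-connect-jdbc-*.jar", "mysql-connector-java-*.jar"]


def missing_jars(directory, patterns=REQUIRED_JARS):
    """Return the patterns that match no file name in the directory."""
    names = os.listdir(directory)
    return [p for p in patterns if not fnmatch.filter(names, p)]


# Demo on a temporary directory so the sketch runs anywhere.
# On the machine from the article you would call missing_jars("/home/kafka").
with tempfile.TemporaryDirectory() as demo_dir:
    # Placeholder file name standing in for a real connector JAR.
    open(os.path.join(demo_dir, "kafka-connect-jdbc-x.y.z.jar"), "w").close()
    demo_missing = missing_jars(demo_dir)
    print(demo_missing)
```

An empty list means every pattern matched at least one file; it does not prove that the JARs are the right versions.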
2. Edit connect-mysql-source.properties (MySQL to Kafka)
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# A simple example that copies a table from a MySQL database. The first few settings are
# required for all connectors: a name, the connector class to run, and the maximum number of
# tasks to create:
name=test-source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# The remaining configs are specific to the JDBC source connector. In this example, we connect to the
# MySQL database 'demo', use an auto-incrementing column called 'id' to
# detect new rows as they are added, and output to topics prefixed with 'mysql-', e.g.
# the table 'test' will be written to the topic 'mysql-test'.
connection.url=jdbc:mysql://localhost:3306/demo?user=root&password=root
table.whitelist=test
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-
# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
# Set this to 'never' to avoid quoting fully-qualified or simple table and column names.
#quote.sql.identifiers=always
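The idea behind mode=incrementing can be sketched in a few lines of Python: remember the largest id already published and, on each poll, fetch only the rows above it. This is an illustration of the technique, not the connector's actual code. It uses an in-memory sqlite3 database as a stand-in for MySQL so that it runs without a server, and the name column and the poll_new_rows helper are hypothetical.

```python
import sqlite3

TOPIC_PREFIX = "mysql-"      # topic.prefix
TABLE = "test"               # table.whitelist
INCREMENTING_COLUMN = "id"   # incrementing.column.name


def poll_new_rows(conn, last_seen_id):
    """Fetch rows whose id is greater than the last one already published."""
    query = (
        f"SELECT id, name FROM {TABLE} "
        f"WHERE {INCREMENTING_COLUMN} > ? ORDER BY {INCREMENTING_COLUMN} ASC"
    )
    rows = conn.execute(query, (last_seen_id,)).fetchall()
    # The new offset is the highest id returned, or the old one if nothing is new.
    new_last = rows[-1][0] if rows else last_seen_id
    return rows, new_last


# sqlite3 stands in for MySQL here; the 'name' column is made up for the demo.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO test (name) VALUES (?)", [("a",), ("b",)])

topic = TOPIC_PREFIX + TABLE  # records from table 'test' go to this topic
first_batch, offset = poll_new_rows(conn, last_seen_id=-1)
conn.execute("INSERT INTO test (name) VALUES (?)", ("c",))
second_batch, offset = poll_new_rows(conn, offset)
print(topic, first_batch, second_batch, offset)
```

Since only rows with a larger id are picked up, this mode sees inserts but not updates or deletes of existing rows.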
Related driver JAR packages:
Kafka related driver packages - Java documentation resources - CSDN download
3. Edit connect-mysql-sink.properties (Kafka to MySQL)
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Confluent Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# http://www.confluent.io/confluent-community-license
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# A simple example that copies from a topic to a MySQL database.
# The first few settings are required for all connectors:
# a name, the connector class to run, and the maximum number of tasks to create:
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=mysql-test_to_kafka
# Configuration specific to the JDBC sink connector.
# We want to connect to the MySQL database 'demo' and write to an existing table (no auto-creation).
connection.url=jdbc:mysql://localhost:3306/demo?user=root&password=root
auto.create=false
pk.mode=record_value
pk.fields=id
table.name.format=test_kafka_to
#delete.enabled=true
# Write mode
insert.mode=upsert
# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
# Set this to 'never' to avoid quoting fully-qualified or simple table and column names.
#quote.sql.identifiers=always
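With insert.mode=upsert, pk.mode=record_value and pk.fields=id, each record is written so that a row with the same id is updated rather than duplicated. On MySQL this corresponds to an INSERT ... ON DUPLICATE KEY UPDATE statement. The Python sketch below builds an approximation of such a statement; the exact text the connector generates may differ, and the name column and the build_mysql_upsert helper are hypothetical.

```python
def build_mysql_upsert(table, pk_fields, value_fields):
    """Build a MySQL upsert statement with '?' placeholders.

    pk_fields correspond to pk.fields; value_fields are the remaining
    (non-key) columns of the record and must not be empty.
    """
    columns = list(pk_fields) + list(value_fields)
    col_list = ",".join(f"`{c}`" for c in columns)
    placeholders = ",".join("?" for _ in columns)
    # On a key collision, overwrite every non-key column with the new value.
    updates = ",".join(f"`{c}`=values(`{c}`)" for c in value_fields)
    return (
        f"insert into `{table}`({col_list}) values({placeholders}) "
        f"on duplicate key update {updates}"
    )


# table.name.format=test_kafka_to, pk.fields=id; 'name' is a made-up column.
sql = build_mysql_upsert("test_kafka_to", ["id"], ["name"])
print(sql)
```

Because auto.create=false, the target table test_kafka_to must already exist with id as its primary key (or a unique key) for the duplicate-key branch to take effect.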
IV. Start command
bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql-source.properties config/connect-mysql-sink.properties
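Once the worker is running, its REST interface can be used to confirm that the connectors started. The Python sketch below queries the status endpoint of a connector and reports whether the connector and all of its tasks are RUNNING. It assumes the worker listens on localhost at the default REST port 8083; the SAMPLE response is illustrative data, not output captured from the article's setup, and the helper names are hypothetical.

```python
import json
import urllib.request

# Assumed REST address: 8083 is the Connect worker's default REST port.
CONNECT_URL = "http://localhost:8083"


def status_url(connector_name, base_url=CONNECT_URL):
    """Build the URL of the connector status endpoint."""
    return f"{base_url}/connectors/{connector_name}/status"


def fetch_status(connector_name, base_url=CONNECT_URL):
    """Query the worker; requires a running Connect process."""
    with urllib.request.urlopen(status_url(connector_name, base_url), timeout=5) as resp:
        return json.load(resp)


def is_healthy(status):
    """True if the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


# Illustrative response shape (made-up values), used so the sketch runs offline.
SAMPLE = {
    "name": "test-sink",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
}

print(status_url("test-sink"))
print(is_healthy(SAMPLE))
```

Against a live worker, is_healthy(fetch_status("test-sink")) and is_healthy(fetch_status("test-source-mysql")) would check the two connectors defined above.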