Kafka Connect: Syncing Kafka Data to MySQL
2022-07-07 23:23:00 【W_Meng_H】
1. Background
Kafka Connect is used to stream data into and out of Kafka. Source connector implementations ingest data from third-party systems into Kafka brokers, while sink connector implementations export data from Kafka brokers to third-party systems.
Official documentation: How to use Kafka Connect - Getting Started | Confluent Documentation
2. Development Environment
| Middleware | Version |
| --- | --- |
| ZooKeeper | 3.7.0 |
| Kafka | 2.10-0.10.2.1 |
3. Configuring Kafka Connect
1. Go to Kafka's config directory and edit connect-standalone.properties:
```properties
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=kafka-0:9092

# The converters specify the format of data in Kafka and how to translate it
# into Connect data. Every Connect user will need to configure these based on
# the format they want their data in when loaded from or stored into Kafka.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the converter's
# setting with the converter we want to apply it to.
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and
# must be specified, but most users will always want to use the built-in
# default. Offset and config data is never visible outside of Kafka Connect
# in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Core setting: where Connect looks for plugin JARs
#plugin.path=/home/kafka/plugins
```
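Note that with `schemas.enable=false`, the JsonConverter produces schemaless records. The JDBC sink connector used later needs to know the field types, so for the Kafka-to-MySQL path the messages generally have to carry a schema; with `schemas.enable=true`, each JSON message embeds the schema alongside the payload. A minimal sketch of that envelope, assuming a table with `id` and `name` columns:

```json
{
  "schema": {
    "type": "struct",
    "name": "test",
    "optional": false,
    "fields": [
      {"field": "id",   "type": "int64",  "optional": false},
      {"field": "name", "type": "string", "optional": true}
    ]
  },
  "payload": {"id": 1, "name": "hello"}
}
```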
If you are syncing MySQL data to Kafka, modify the converter settings as follows:
```properties
# The converters specify the format of data in Kafka and how to translate it
# into Connect data. Every Connect user will need to configure these based on
# the format they want their data in when loaded from or stored into Kafka.
key.converter=io.confluent.connect.json.JsonSchemaConverter
value.converter=io.confluent.connect.json.JsonSchemaConverter
# Converter-specific settings can be passed in by prefixing the converter's
# setting with the converter we want to apply it to.
key.converter.schemas.enable=true
value.converter.schemas.enable=true
```
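JsonSchemaConverter ships with the Confluent Schema Registry converters rather than with plain Apache Kafka, and it resolves schemas through the registry. A sketch of the extra settings this usually requires, assuming a registry running at localhost:8081:

```properties
# Assumption: Schema Registry is reachable at this address.
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
```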
Note: early versions of Kafka Connect do not support the plugin.path setting (it was added in Kafka 0.11), so with the 0.10.2.1 build used here you must put the plugin location on the CLASSPATH instead:
```bash
vi /etc/profile
# append:
export CLASSPATH=/home/kafka/*
source /etc/profile
```
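Before starting the worker, it can help to confirm the JARs are actually where the CLASSPATH points. A quick sanity check, assuming the kafka-connect-jdbc connector and MySQL driver JARs were copied into /home/kafka:

```bash
# Hypothetical layout: connector and driver JARs placed directly in /home/kafka.
ls /home/kafka | grep -Ei 'kafka-connect-jdbc|mysql-connector'
```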
2. Edit connect-mysql-source.properties (MySQL → Kafka):
```properties
# A simple example that copies all rows from a MySQL table. The first few
# settings are required for all connectors: a name, the connector class to
# run, and the maximum number of tasks to create:
name=test-source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# The remaining configs are specific to the JDBC source connector. Here we
# connect to the MySQL database 'demo', use an auto-incrementing column
# called 'id' to detect new rows as they are added, and output to topics
# prefixed with 'mysql-', e.g. the table 'test' will be written to the
# topic 'mysql-test'.
connection.url=jdbc:mysql://localhost:3306/demo?user=root&password=root
table.whitelist=test
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-

# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
# Set this to 'never' to avoid quoting fully-qualified or simple table and column names.
#quote.sql.identifiers=always
```
Required driver JARs: the kafka-connect-jdbc connector JAR and the MySQL JDBC driver (mysql-connector-java) must be on the plugin path or CLASSPATH.
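In incrementing mode the JDBC source tracks new rows through a strictly increasing, non-null column. A hypothetical source table matching the config above (database demo, table test, incrementing column id):

```bash
# Assumed schema for the source table; adjust the other columns to your data.
mysql -uroot -proot demo -e "
CREATE TABLE test (
  id   INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255)
);
INSERT INTO test (name) VALUES ('hello');"
```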
3. Edit connect-mysql-sink.properties (Kafka → MySQL):
```properties
# A simple example that copies data from a topic to a MySQL table.
# The first few settings are required for all connectors:
# a name, the connector class to run, and the maximum number of tasks to create:
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# The topics to consume from - required for sink connectors like this one
topics=mysql-test_to_kafka

# Configuration specific to the JDBC sink connector. We connect to the MySQL
# database 'demo'; auto.create is disabled, so the target table must already exist.
connection.url=jdbc:mysql://localhost:3306/demo?user=root&password=root
auto.create=false
pk.mode=record_value
pk.fields=id
table.name.format=test_kafka_to
#delete.enabled=true
# Write mode: upsert inserts new rows and updates existing ones by primary key
insert.mode=upsert

# Define when identifiers should be quoted in DDL and DML statements.
# The default is 'always' to maintain backward compatibility with prior versions.
# Set this to 'never' to avoid quoting fully-qualified or simple table and column names.
#quote.sql.identifiers=always
```
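Since auto.create=false, the target table must exist before the sink starts. With pk.mode=record_value and pk.fields=id, upsert mode on MySQL typically resolves to INSERT ... ON DUPLICATE KEY UPDATE, so the table needs a primary key on id. A hypothetical target table matching the config above:

```bash
# Assumed schema for the sink's target table (table.name.format=test_kafka_to).
mysql -uroot -proot demo -e "
CREATE TABLE test_kafka_to (
  id   INT NOT NULL PRIMARY KEY,
  name VARCHAR(255)
);"
```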
4. Startup Command
```bash
bin/connect-standalone.sh config/connect-standalone.properties \
  config/connect-mysql-source.properties \
  config/connect-mysql-sink.properties
```
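Once the standalone worker is up, it exposes a REST API (port 8083 by default) that can confirm both connectors are registered and running, and the console consumer can confirm rows are flowing into the prefixed topic. A quick check, assuming defaults:

```bash
# List registered connectors and check the source connector's status.
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/test-source-mysql/status

# Watch rows from table 'test' arrive on the topic 'mysql-test'.
bin/kafka-console-consumer.sh --bootstrap-server kafka-0:9092 \
  --topic mysql-test --from-beginning
```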