Kafka Connect in Action: JDBC Sink

  • Added 9 Sep 2024

Comments • 109

  • @user-vu4tj1ix8u
    @user-vu4tj1ix8u 3 years ago +7

    Simply the best instructor I have ever seen. Great job!

    • @rmoff
      @rmoff  3 years ago

      Thank you for your kind words! :)

  • @georgelza
    @georgelza 4 years ago +1

    Still learning, and getting blown away by the amazingly simple capabilities... once you've figured out how...

  • @jeffyeh4537
    @jeffyeh4537 2 years ago +1

    omg, Robin, your video saved my internship! Thank you soooooo much!

    • @rmoff
      @rmoff  2 years ago

      Happy to help! :D

  • @carlaguelpa4574
    @carlaguelpa4574 1 year ago

    @rmoff, thanks for your videos, they are the best!
    We have a situation:
    We have implemented Debezium with Kafka, and we are experiencing a performance issue: when both the source and sink connectors are working at the same time, consumer performance decreases significantly. Here are some numbers for what we have:
    1 Kafka cluster with 3 nodes (8GB RAM, -Xms: 4096m -Xmx: 4096m each) - all topics with replication factor = 3
    1 ZooKeeper cluster with 3 nodes (4GB RAM, -Xms: 2048m -Xmx: 2048m each)
    1 Connect cluster with 5 nodes (12GB RAM each, -Xms: 6168m -Xmx: 6168m) with 1 partition and 1 task, but we have tried with 1 / 5 / 100 partitions and 1 / 5 / 15 / 100 tasks
    1 source connector per table (4 tables)
    1 sink connector per table (4 tables)
    We are not using Schema Registry.
    The problem: the target DB has a delay that increases when the volume is more than 750 messages per second.
    Message size = 6kb
    Right now we are processing 2000 messages of 6kb per second (this was the best performance we got, and only with the sink connectors enabled).
    When we enable only the sink connectors the performance is good; the same with source connectors and no sinks, performance is great.
    Resources are OK; the source and target environments have plenty of CPU and memory.
    When we watch the messages consumed per second, it waits for about 30 seconds, then consumes some messages, then waits again, and so on.
    Question: what can we do to improve the consumer performance?
    Could you help me?
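
    There was no reply in the thread, but for anyone hitting similar stop-and-go consumption, here is a minimal sketch of the sort of sink-side batching and consumer overrides worth experimenting with. It assumes the Confluent JDBC sink; the connector name, URL, and values are illustrative, and the consumer.override.* settings require connector.client.config.override.policy=All on the worker.

    ```bash
    # Illustrative tuning only; benchmark each change before adopting it.
    curl -s -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
      -H "Content-Type: application/json" \
      -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics": "mysql-books",
            "connection.url": "jdbc:mysql://mysql:3306/demo",
            "batch.size": "3000",
            "consumer.override.max.poll.records": "3000",
            "consumer.override.fetch.min.bytes": "1048576",
            "tasks.max": "4"
          }'
    ```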

  • @OwenRubel
    @OwenRubel 4 years ago +1

    Great bit on schemas. Going to link to this in my next video. :)

  • @timothyvandermeer9845
    @timothyvandermeer9845 5 months ago

    FANTASTIC. Thank you, brother!

  • @mukeshdharajiya1261
    @mukeshdharajiya1261 1 year ago

    Thank you so much for your best explanation 🏆

    • @rmoff
      @rmoff  11 months ago

      Glad it was helpful!

  • @jennana8003
    @jennana8003 3 years ago +1

    Thank you for sharing!

  • @JonWolski
    @JonWolski 4 years ago +1

    Off-topic tip: if you pipe your logs into `less -r` instead of `more` you can get a pager that interprets the ANSI color sequences
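
    For example, with a Docker-based worker (the container name kafka-connect is illustrative):

    ```bash
    # Page through the Connect worker log with ANSI colour codes rendered
    docker logs kafka-connect | less -r
    ```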

  • @hamoudy41
    @hamoudy41 1 year ago +1

    My data in Kafka is schema-less (string keys, JSON values). I want to sink it to Postgres. Any tips? Do I need Schema Registry? What converters/transformers (if any) do I need?

  • @BenedictReydo
    @BenedictReydo 2 years ago

    Why do I keep getting an empty set of tables in MySQL even though the connector worked? I also already made sure that I had my MySQL connector in the directory.

  • @dswan01
    @dswan01 11 months ago

    So I assume you can use ksqlDB to transform the data and then pass the stream on to update the target database. Is this correct?

  • @niklaslehmann6551
    @niklaslehmann6551 2 years ago +1

    When creating the sink connector I get
    "Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSinkConnector ..."
    So after only 3 minutes I cannot even follow your guide. Maybe there is something wrong with the Docker containers?

    • @rmoff
      @rmoff  2 years ago

      It sounds like the JDBC connector isn't installed correctly in your connect worker. Did you restart it after installing it?
      If you're still stuck head to forum.confluent.io/ and post full details of what guide you're following and steps you've taken. Thanks.

    • @niklaslehmann6551
      @niklaslehmann6551 2 years ago

      @@rmoff Thanks, that solved it. My proxy settings were blocking the download of the JDBC connector. After disabling it, it worked.

    • @rmoff
      @rmoff  2 years ago

      @@niklaslehmann6551 Great!
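
      For reference, a minimal sketch of the install-and-restart fix described above, for a Docker-based worker (the container name and version tag are illustrative, and this assumes the confluent-hub CLI is present in the image):

      ```bash
      # Install the JDBC connector plugin, then restart the worker
      # so it is picked up from the plugin path.
      docker exec kafka-connect confluent-hub install --no-prompt \
          confluentinc/kafka-connect-jdbc:latest
      docker restart kafka-connect
      ```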

  • @JonathanLevinTKY
    @JonathanLevinTKY 4 years ago

    Can you update an existing MySQL table that has more columns than the Kafka stream has - like an id/primary key?

  • @heinhtetzaw9463
    @heinhtetzaw9463 1 year ago

    When I create a stream with Avro format, I'm getting an 'Unable to create schema from topic; connection reset' error.

    • @rmoff
      @rmoff  1 year ago

      hi, the best place to get help is at www.confluent.io/en-gb/community/ask-the-community/ :)

  • @sanket386
    @sanket386 2 years ago

    How can I create a schema with payload for nested JSON data?

  • @krishnachaitanyapulagam2769

    It's a nice video; I appreciate your efforts.
    Can we create a key schema as well for SOME_JSON_AS_AVRO? I have a similar requirement where I need to do an upsert using the JDBC Sink connector, which reads from a topic created out of streams.

    • @rmoff
      @rmoff  4 years ago

      Hi, the best place to ask this is on:
      → Slack group: cnfl.io/slack
      or
      → Mailing list: groups.google.com/forum/#!forum/confluent-platform
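
      For later readers, here is a minimal sketch of an upsert-style sink config keyed on the record key, which is the usual shape for this requirement (assuming the Confluent JDBC sink; topic, field names, and URLs are illustrative):

      ```bash
      # insert.mode=upsert with pk.mode=record_key takes the primary key
      # from the message key, which must be serialised with a schema
      # (assumed here to be Avro).
      curl -s -X PUT http://localhost:8083/connectors/sink-jdbc-upsert-01/config \
        -H "Content-Type: application/json" \
        -d '{
              "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
              "topics": "SOME_JSON_AS_AVRO",
              "connection.url": "jdbc:mysql://mysql:3306/demo",
              "insert.mode": "upsert",
              "pk.mode": "record_key",
              "pk.fields": "ID",
              "key.converter": "io.confluent.connect.avro.AvroConverter",
              "key.converter.schema.registry.url": "http://schema-registry:8081"
            }'
      ```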

  • @panjisadewo4891
    @panjisadewo4891 2 years ago

    I am using the JDBC sink connector from Confluent. How do I set this up? Please help.

    • @rmoff
      @rmoff  2 years ago

      Please ask this question at forum.confluent.io/. Thanks!

  • @vishalmalhotra2243
    @vishalmalhotra2243 3 years ago +1

    @Robin Moffatt Is it possible to have a MySQL database table as a source, where we have a topic, say "sourceTopic", and then we use this "sourceTopic" in our sink curl config so the sink happens in another MySQL table? Basically trying to set up a table and audit-table kind of scenario here.

    • @rmoff
      @rmoff  3 years ago

      I'm not quite clear - you want to read and write from the same MySQL database but from and to different tables? There's no reason why you couldn't do that, in theory. Perhaps head over to forum.confluent.io/ and ask there, with a bit more detail and context around what you're trying to do.

    • @vishalmalhotra2243
      @vishalmalhotra2243 3 years ago

      @@rmoff I have a table "books" in database motor. This is my source, and for the source connection I created a topic "mysql-books". So far all is good; I am able to see messages in the Confluent Platform UI. Now I want to sink these messages into another database called motor-audit, so that in audit I am able to see all the changes that happened to the table "books". I have given the topic "mysql-books" in my sink curl for the sink connector, since changes are being published to this topic, but it is not coming up, and it gives errors like these:
      1. This is with respect to the basic streams example you showed - Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 3
      2. Error from my objective -
      [2021-04-28 18:14:48,231] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-json-0} Error converting message value in topic 'mysql-books' partition 0 at offset 0 and timestamp 1619602271548: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
      org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

    • @rmoff
      @rmoff  3 years ago +1

      @@vishalmalhotra2243 Please post this at forum.confluent.io/ and I can help you there :)

  • @rishikesanravichandran9914

    So when converting plain JSON to JSON Schema using a ksqlDB stream, can we specify only certain fields? In my case, I have a nested JSON with more than 100 field values, so do I have to explicitly mention each of them while creating the ksqlDB stream?

    • @rmoff
      @rmoff  3 years ago

      Yes, if you want to use them. If you have 100 fields on the input message but only want to use 5 of them, you can just specify those 5; the remainder will be dropped from any output message that you create.

    • @rmoff
      @rmoff  3 years ago

      This is why using plain JSON is kind of a pain: you end up having to manually enter schemas. Whatever is producing that nested JSON of 100 values should instead serialise the data onto Kafka using Avro/Protobuf/JSON Schema (see the sketch after this thread).

    • @rishikesanravichandran9914
      @rishikesanravichandran9914 3 years ago +1

      @@rmoff Thanks for the clarification. BTW, your video was very informative.
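
      The sketch referenced above: declaring only the wanted fields from a wide JSON topic and re-serialising them as Avro, run here via the ksql CLI in Docker (container, stream, topic, and field names are all hypothetical):

      ```bash
      # Only the declared fields survive into the Avro output topic;
      # the rest of the JSON payload is simply never mapped.
      docker exec -i ksqldb ksql http://ksqldb:8088 <<'EOF'
      CREATE STREAM SOURCE_JSON (ID INT, NAME VARCHAR, CITY VARCHAR,
                                 EMAIL VARCHAR, STATUS VARCHAR)
        WITH (KAFKA_TOPIC='source-topic', VALUE_FORMAT='JSON');

      CREATE STREAM TARGET_AVRO WITH (VALUE_FORMAT='AVRO') AS
        SELECT ID, NAME, CITY, EMAIL, STATUS FROM SOURCE_JSON;
      EOF
      ```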

  • @MrNiceseb
    @MrNiceseb 4 years ago +1

    Any changes needed to connect to Postgres instead of MySQL?

    • @rmoff
      @rmoff  4 years ago

      You will need to amend the JDBC URL, as well as making sure that the Postgres JDBC driver is installed.

    • @dmitriisergeev306
      @dmitriisergeev306 3 years ago

      @@rmoff You show at 6:44 that we already have the Postgres driver. Is that not enough?

    • @rmoff
      @rmoff  3 years ago

      @@dmitriisergeev306 Correct, the Postgres JDBC Driver ships with it, so you should not need to install it again.
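
      In practice only the connection settings change, something like this (a sketch; host, database, topic, and credentials are illustrative):

      ```bash
      # Postgres variant of the sink: the bundled Postgres JDBC driver
      # means there is nothing extra to install.
      curl -s -X PUT http://localhost:8083/connectors/sink-jdbc-pg-01/config \
        -H "Content-Type: application/json" \
        -d '{
              "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
              "topics": "TEST01",
              "connection.url": "jdbc:postgresql://postgres:5432/demo",
              "connection.user": "connect_user",
              "connection.password": "secret",
              "auto.create": "true"
            }'
      ```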

  • @vinaygold20
    @vinaygold20 2 years ago

    Hi Robin, regarding 'blacklist: "COL2"': it loads NULL in the case of an insert. But if I need to load the value when inserting and do NOT want it updated, what should the configuration be?

    • @rmoff
      @rmoff  2 years ago

      Please ask this question at forum.confluent.io/. Thanks!

  • @Pinaldba
    @Pinaldba 4 months ago

    @Robin, I have the Azure Synapse sink JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc, but I still get the error: Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options; none of them worked.

  • @thetrilbies1
    @thetrilbies1 2 years ago

    I tried to replicate the steps at 1:51. When inserting a row into TEST01, I get 'column name ROWKEY does not exist'.

    • @arunbhat105
      @arunbhat105 1 year ago

      Same error for me too. Were you able to solve the issue?

  • @vishnumurali522
    @vishnumurali522 4 years ago

    Hi Robin,
    I understand this tutorial clearly; it is very clear and easy to understand. But I have a doubt.
    You are using ksqlDB to create a stream from an existing topic, change it to Avro, and create a new topic.
    I am not using ksqlDB, so how can I do this without creating streams?

    • @rmoff
      @rmoff  4 years ago

      If you're not using ksqlDB and you want to apply a schema to the data you'll need to find another route, such as writing your own Kafka Streams application to do this.
      That's why it's always better to get the *producer* of the data to use a serialisation method that supports schemas (Avro, Protobuf, etc).

    • @vishnumurali522
      @vishnumurali522 4 years ago

      @@rmoff So, as you said, if I have my Spring Boot application and configure it to use the AvroSerializer for values, then when I send normal JSON data it will be stored in the DB?
      Or do we need to add something else?

    • @rmoff
      @rmoff  4 years ago

      @@vishnumurali522 Yes, if you write Avro from your source application then you can use the AvroConverter in the JDBC sink and it will work.
      If you've got any more questions then head over to the #clients or #connect channel on cnfl.io/slack :-)
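
      For reference, the sink-side converter settings that pair with an Avro-producing application look something like this (a sketch; topic and URLs are illustrative, and the converters can equally be set worker-wide):

      ```bash
      # AvroConverter deserialises the values via Schema Registry, giving
      # the JDBC sink the schema it needs to build SQL statements.
      curl -s -X PUT http://localhost:8083/connectors/sink-jdbc-avro-01/config \
        -H "Content-Type: application/json" \
        -d '{
              "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
              "topics": "orders",
              "connection.url": "jdbc:mysql://mysql:3306/demo",
              "value.converter": "io.confluent.connect.avro.AvroConverter",
              "value.converter.schema.registry.url": "http://schema-registry:8081"
            }'
      ```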

  • @SirKober
    @SirKober 2 years ago

    Isn't there a way to make a JDBC Sink perform an INSERT IGNORE? If I use insert mode I always get duplicate errors. Can it really be that this case has not been considered? (MySQL dialect)

    • @rmoff
      @rmoff  2 years ago

      I'm not aware of this being an option. Does it work if you use `upsert` instead?

  • @naironviana
    @naironviana 3 years ago

    Hi! I'm having trouble working with timestamp (date) values. I can't reflect changes in the sink table if the source table has a timestamp (with a current_timestamp default). Is there any specific transformation to set in the sink/source connectors to solve this problem? Thanks!

    • @rmoff
      @rmoff  3 years ago

      Hi, the best place to ask this is forum.confluent.io/ :)

  • @kiran0711
    @kiran0711 4 years ago

    Hi Robin,
    I am a great fan of your Kafka JDBC source and sink connectors. We are currently facing a challenge where, using the Kafka JDBC connectors, we are unable to connect to an Oracle database which is in a cloud Kerberized environment. Any video or details would be a great help.

    • @rmoff
      @rmoff  4 years ago

      Hi Kiran, this isn't a configuration I've tried, sorry. You could try asking at cnfl.io/slack.

  • @shashanksrivastava5654

    Hi Robin, is it possible to save a single topic's data into multiple tables in Postgres?

    • @rmoff
      @rmoff  3 years ago

      Yes, you can create multiple connectors reading from the same topic and route them to different tables using a Single Message Transform like RegexRouter (see rmoff.net/2020/12/11/twelve-days-of-smt-day-4-regexrouter/)
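
      A sketch of one of those connectors (topic, regex, and naming are illustrative); the second connector would be identical apart from its name and replacement:

      ```bash
      # RegexRouter renames the topic in flight, and the JDBC sink then
      # uses the renamed topic as the target table name.
      curl -s -X PUT http://localhost:8083/connectors/sink-jdbc-audit-01/config \
        -H "Content-Type: application/json" \
        -d '{
              "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
              "topics": "mysql-books",
              "connection.url": "jdbc:postgresql://postgres:5432/demo",
              "transforms": "rename",
              "transforms.rename.type": "org.apache.kafka.connect.transforms.RegexRouter",
              "transforms.rename.regex": "mysql-(.*)",
              "transforms.rename.replacement": "$1_audit"
            }'
      ```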

  • @dmitrysergeev1845
    @dmitrysergeev1845 3 years ago

    For me it is not working. I have sink-jdbc-mysql-01 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING). How can I fix it?

    • @rmoff
      @rmoff  3 years ago

      You need to look at the Kafka Connect worker log to find out the actual error. YouTube comment threads aren't the easiest place to help debug, so head over to #connect on cnfl.io/slack

  • @aidangmccarthy
    @aidangmccarthy 4 years ago

    Hi Robin, thanks for sharing. Can you use the Protobuf format from 5.5 instead of Avro? Also for MySQL, I notice you are using version 8. Will this also work on MySQL 5.6 or 5.7? Many thanks, Aidan

    • @rmoff
      @rmoff  4 years ago +1

      Yes, as of Confluent Platform 5.5 you can use Avro, Protobuf, or JSON Schema.
      It'll work fine with earlier versions of MySQL too.

  • @shikhachawla7259
    @shikhachawla7259 4 years ago

    Thanks for sharing this; I am following your videos a lot for learning Kafka. I have tried setting up the JDBC sink connector to insert into SQL Server with a batch size of 500, but it inserts into SQL Server one by one rather than in batches, which has a negative impact on SQL Server IO. Is there something you can suggest to get batch insertion working? Will look forward to your response. Thanks

    • @rmoff
      @rmoff  4 years ago +1

      Hi, the best place to ask this is on:
      → Mailing list: groups.google.com/forum/#!forum/confluent-platform
      or
      → Slack group: cnfl.io/slack

  • @sarathbaiju6040
    @sarathbaiju6040 2 years ago

    Hi, I have a question regarding automatic table creation in the sink connector: how can we define custom column names in the sink connector configuration?
    For example, by default the column names are after_userId and after_firstName; I want to change them to UserId and FirstName. How can we do this in the connector configuration?

    • @rmoff
      @rmoff  2 years ago +1

      Hi Sarath, this is a great question to ask over at forum.confluent.io/ :)

    • @sarathbaiju6040
      @sarathbaiju6040 2 years ago

      @@rmoff Thanks for the reply, I got the answer. I used a transform (SMT) to achieve my requirement.
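
      For readers with the same requirement, a sketch of that kind of SMT using ReplaceField from Apache Kafka (connector name and topic are illustrative; the field names come from the question):

      ```bash
      # ReplaceField$Value renames value fields before the sink writes them,
      # so auto-created columns pick up the new names.
      curl -s -X PUT http://localhost:8083/connectors/sink-jdbc-rename-01/config \
        -H "Content-Type: application/json" \
        -d '{
              "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
              "topics": "users",
              "connection.url": "jdbc:mysql://mysql:3306/demo",
              "auto.create": "true",
              "transforms": "rename",
              "transforms.rename.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
              "transforms.rename.renames": "after_userId:UserId,after_firstName:FirstName"
            }'
      ```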

  • @prateekshajoshi4539
    @prateekshajoshi4539 2 years ago

    How to connect Kafka with MySQL database?

    • @rmoff
      @rmoff  2 years ago

      You're on the right video! The Kafka Connect JDBC Sink enables you to stream data from Kafka to MySQL. If you have more questions do head over to forum.confluent.io/

  • @MahmoudShash
    @MahmoudShash 3 years ago

    Hi @Robin, just asking if Kafka Connect works with Cloudera Kafka, and if I can use it in production.

    • @rmoff
      @rmoff  3 years ago

      Kafka Connect is part of Apache Kafka. It's widely used in Production. I'm not familiar with what Cloudera support; you'd need to check with them.

  • @shuchikumari8031
    @shuchikumari8031 2 years ago

    Can you please help me with how to connect MySQL to Kafka? If possible, can you please share any documentation or links about this?
    Thank you.

    • @JordanCricketMoore
      @JordanCricketMoore 2 years ago

      You may want to start at the Debezium documentation

    • @rmoff
      @rmoff  2 years ago

      If you want to get data from MySQL into Kafka then check out rmoff.dev/no-more-silos

  • @user-ow6jk5ix5q
    @user-ow6jk5ix5q 4 years ago

    Is there a good way to diagnose why a connector becomes degraded? Is it OK to use a symlink to put the jar under the JDBC drivers folder?

    • @rmoff
      @rmoff  4 years ago

      1. Diagnosing connector issues - look at the Kafka Connect worker log
      2. symlink - I don't know, try it :)

    • @rmoff
      @rmoff  4 years ago

      For further help, head to #connect on cnfl.io/slack

  • @talahootube
    @talahootube 4 years ago

    Great video, many thanks. May I get your reference document for this? Thanks

    • @rmoff
      @rmoff  4 years ago +1

      You can find the JDBC Sink connector docs here: rmoff.dev/01r

    • @talahootube
      @talahootube 4 years ago

      @@rmoff ok, thanks ya

  • @nirmaladesai6964
    @nirmaladesai6964 3 years ago

    Do we need any settings in the Oracle database to start with?

    • @rmoff
      @rmoff  3 years ago

      You'll need a user with the appropriate permissions to write to the table(s). If you've got any more questions then head over to forum.confluent.io/ :)

  • @vignesh9458
    @vignesh9458 3 years ago

    @Robin Moffatt Is it possible to store all the Kafka data in a single column as JSON or Avro? Can you please help?

    • @rmoff
      @rmoff  3 years ago

      Do you mean push the complex value of the Kafka message into a single target field in the database? If so, no I don't think this is supported.
      For more questions, head to forum.confluent.io/

    • @vignesh9458
      @vignesh9458 3 years ago

      @@rmoff Exactly, Robin. My requirement is to read the JSON data from Kafka and store it exactly as it is in a single column.

    • @rmoff
      @rmoff  3 years ago +1

      @@vignesh9458 I think you would need to pre-process the topic and embed the message in a field first (since the data has to be serialised with a schema).
      If you would like to post this as a question over at forum.confluent.io/ I can try and answer properly there.

    • @vignesh9458
      @vignesh9458 3 years ago

      @@rmoff sure I will post there.

  • @sumitpratap999
    @sumitpratap999 3 years ago

    How can I use "pk.mode": "none" with a DB sequence?

    • @rmoff
      @rmoff  3 years ago +1

      @Sumit Can you post full details of your question to forum.confluent.io? I can answer it over there. Thanks!

    • @sumitpratap999
      @sumitpratap999 3 years ago +1

      @@rmoff Done: "JDBC Sink connector using DB sequence for primary key". Thanks for the reply.

  • @abhrasarkar8040
    @abhrasarkar8040 3 years ago

    Hello sir, I couldn't find my 'mysql-connector-java-8.0.20.jar' in this folder 'confluent-hub-components/confluentinc-kafka-connect-jdbc'

    • @rmoff
      @rmoff  3 years ago +1

      You need to install it yourself. Have a look at rmoff.dev/fix-jdbc-driver-video.
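
      In outline, the manual driver install from that video looks like this (a sketch; the container name, driver version, and path are illustrative for a Docker-based worker):

      ```bash
      # The MySQL JDBC driver jar has to live inside the JDBC connector's
      # own directory so its classloader can find it; restart afterwards.
      docker cp mysql-connector-java-8.0.22.jar \
        kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
      docker restart kafka-connect
      ```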

    • @abhrasarkar8040
      @abhrasarkar8040 3 years ago

      @@rmoff I've downloaded that JDBC connector and copied it into that particular path using this command: 'docker cp /home/foldername/Downloads/mysql-connector-java-8.0.22/. kafka-connect:/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/mysql-connector-java-8.0.22.' I'm not getting any error message, but the MySQL DB is not updating, and my connector is showing 'warning 0/1' status. Is there any chat interface where I can have a brief discussion about this with you? I am new to Kafka. Please help. Thanks in advance.

    • @rmoff
      @rmoff  3 years ago +1

      ​@@abhrasarkar8040
      The best place to ask any further questions is:
      → Slack group: cnfl.io/slack
      or
      → Mailing list: groups.google.com/forum/#!forum/confluent-platform

    • @abhrasarkar8040
      @abhrasarkar8040 3 years ago +1

      @@rmoff Thank you so much. I think I found the problem. This link is not working: 'cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.19.tar.gz'. So I'm getting the file from this link: 'dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz'. Is that fine?

    • @abhrasarkar8040
      @abhrasarkar8040 3 years ago

      @@rmoff
      Hello Robin! I am getting this error
      " java.sql.SQLSyntaxErrorException: Access denied for user 'connect_user'@'%' to database 'demo'".
      Can you please help me out with this?
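
      That last error went unanswered in the thread; it means the connector's database user lacks privileges on the target database. A sketch of the kind of grant that resolves it (assuming MySQL in Docker; the user and database names come from the error message):

      ```bash
      # Run as a MySQL admin: give the Connect user write access to `demo`
      docker exec -i mysql mysql -uroot -p -e \
        "GRANT ALL PRIVILEGES ON demo.* TO 'connect_user'@'%'; FLUSH PRIVILEGES;"
      ```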

  • @ashrafkhaled9790
    @ashrafkhaled9790 4 years ago

    How would you do this with Java?

    • @rmoff
      @rmoff  4 years ago +2

      That's the point - you wouldn't. Kafka Connect is the integration API for Apache Kafka, and what you should generally use for getting data in and out of Kafka from systems such as RDBMS. Learn more: rmoff.dev/ljc-kafka-02

  • @antoxin
    @antoxin 4 years ago

    Very fast English, but anyway, thanks for the informative video.

    • @rmoff
      @rmoff  4 years ago

      Glad it was useful - sorry if it's not always easy to follow :)

  • @coltennabers634
    @coltennabers634 3 years ago

    Your keyboard noises are driving me crazy, dude. Fix your mic!

    • @rmoff
      @rmoff  3 years ago

      Yeah, it's kinda clackity isn't it. I've got a quieter one since I filmed that video :)

  • @robertoojeda3263
    @robertoojeda3263 1 year ago

    Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic storing multiple messages with different schemas, that is, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?

  • @amrish331
    @amrish331 2 years ago

    Hi, how do you handle an array in an Avro schema with the JDBC sink connector? I am stuck on this. Please help.

    • @rmoff
      @rmoff  2 years ago

      Hi, please post this at forum.confluent.io/ :)

  • @AashishOla
    @AashishOla 2 years ago

    I need your help with a Kafka Connect JDBC sink performance issue. I have sent you a message on Twitter. Looking forward to your response.

    • @rmoff
      @rmoff  2 years ago +1

      The best place to ask is www.confluent.io/en-gb/community/ask-the-community/