Robin Moffatt
[Devoxx UK] 🚂 On Track with Apache Kafka: Building a Streaming ETL solution with Rail Data
As data engineers, we frequently need to build scalable systems working with data from a variety of sources and with various ingest rates, sizes, and formats. This talk takes an in-depth look at how Apache Kafka can be used to provide a common platform on which to build data infrastructure driving both real-time analytics as well as event-driven applications.
Using a public feed of railway data, it will show how to ingest data from message queues such as ActiveMQ with Kafka Connect, as well as from static sources such as S3 and REST endpoints. We’ll then see how to use stream processing to transform the data into a form useful for streaming to analytics tools such as Elasticsearch and Neo4j. The same data will be used to drive a real-time notifications service through Telegram.
If you’re wondering how to build your next scalable data platform, how to reconcile the impedance mismatch between stream and batch, and how to wrangle streams of data, then this talk is for you!
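To make the shape of that pipeline concrete, here is a minimal ksqlDB sketch of the transform-and-sink steps described above. The topic, stream, and field names are illustrative assumptions rather than the actual demo code (which is linked below):

    -- Declare a stream over an ingested topic (schema and names assumed for illustration)
    CREATE STREAM TRAIN_MOVEMENTS_RAW (TRAIN_ID VARCHAR, EVENT_TYPE VARCHAR, EVENT_TS BIGINT)
      WITH (KAFKA_TOPIC='rail-movements', VALUE_FORMAT='JSON');

    -- Filter and reshape it into a cleaned stream, reserialised as Avro
    CREATE STREAM TRAIN_CANCELLATIONS WITH (VALUE_FORMAT='AVRO') AS
      SELECT TRAIN_ID, EVENT_TYPE, EVENT_TS
      FROM TRAIN_MOVEMENTS_RAW
      WHERE EVENT_TYPE='CANCELLATION';

    -- Stream the result to Elasticsearch with Kafka Connect, managed from ksqlDB
    CREATE SINK CONNECTOR SINK_ELASTIC_TRAINS WITH (
      'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
      'connection.url'  = 'http://elasticsearch:9200',
      'topics'          = 'TRAIN_CANCELLATIONS',
      'key.ignore'      = 'true',
      'schema.ignore'   = 'true'
    );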
⏱️Time codes
===========
00:00:00 Introduction
00:01:14 Trains as a source of event streams
00:01:48 Streaming ETL - overview
00:02:47 Streaming ETL with rail data - live visualisation demo
00:06:27 ksqlDB introduction and brief overview
00:07:24 Streaming ETL - ingest - detail
00:07:39 Data sources
00:08:13 Streaming data from ActiveMQ into Kafka
00:09:34 Ingesting data from Amazon S3 into Kafka
00:10:29 Ingesting data from CSV into Kafka
00:11:54 Ingest - recap
00:12:11 Streaming ETL - Transformation overview
00:12:45 Modeling rail data in an Entity-Relationship-Diagram (ERD)
00:14:01 Extracting and wrangling ActiveMQ messages in Kafka
00:15:12 Using ksqlDB to process messages from ActiveMQ
00:16:03 Splitting multiple message types out from a single topic in Kafka using ksqlDB
00:16:58 Joining events to lookup data with ksqlDB (stream-table joins)
00:18:03 Building composite keys in ksqlDB
00:18:11 CASE statement in ksqlDB for decoding values
00:18:36 Schemas in stream processing, and using ksqlDB to define and apply them
00:20:37 The role of the Schema Registry in streaming ETL
00:21:18 Transformation - recap
00:21:40 Using the transformed data
00:22:33 Kafka Connect overview
00:22:59 Streaming from Kafka to Elasticsearch
00:23:36 Kafka to RDBMS (PostgreSQL)
00:24:28 Do you *actually* need a database?
00:25:00 Building materialised views in ksqlDB
00:25:42 Kafka to S3
00:27:02 Kafka to Neo4j
00:27:36 Building real-time alerting using Kafka
00:29:33 Monitoring and Maintenance
00:30:26 Conclusion & Summary
Confluent Cloud
============
Confluent Cloud provides fully managed Apache Kafka, connectors, Schema Registry, and ksqlDB. Try it out and use code RMOFF200 for money off your bill: www.confluent.io/confluent-cloud/tryfree/?.devx_ch.rmoff_xHV1mGXV5Ds&
Resources & Links
============
📓Slides: talks.rmoff.net/6GsyFX/on-track-with-apache-kafka-building-a-streaming-etl-solution-with-rail-data
👾Demo code: rmoff.dev/kafka-trains-code-01
Views: 1,886

Videos

[Kafka Summit] 🚢 All at Sea with Streams - Using Kafka to Detect Patterns in the Behaviour of Ships
1.1K views · 2 years ago
This talk is from #KafkaSummit Americas 2021 📝 Abstract: The great thing about streams of real-time events is that they can be used to spot behaviours as they happen and respond to them as needed. Instead of waiting until tomorrow to find out what happened yesterday, we can act on things straight away. This talk will show a real-life example of one particular pattern that it's useful to detect-...
[DevSum 2021] Kafka as a Platform: the Ecosystem from the Ground Up
753 views · 3 years ago
Presented at DevSum 2021: www.devsum.se Kafka has become a key data infrastructure technology, and we all have at least a vague sense that it is a messaging system, but what else is it? How can an overgrown message bus be getting this much buzz? Well, because Kafka is merely the center of a rich streaming data platform that invites detailed exploration. In this talk, we’ll look at the entire st...
Kafka Connect JDBC sink deep-dive: Working with Primary Keys
12K views · 3 years ago
The Kafka Connect JDBC Sink can be used to stream data from a Kafka topic to a database such as Oracle, Postgres, MySQL, DB2, etc. This video explains how to configure it to handle primary keys based on your data using the `pk.mode` and `pk.fields` configuration options. ✍️ [Blog] Kafka Connect JDBC Sink deep-dive: Working with Primary Keys rmoff.net/2021/03/12/kafka-connect-jdbc-sink-deep-dive...
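As a rough sketch of what the video covers, a JDBC sink that takes its primary key from a field in the message value might be configured along these lines (shown here via ksqlDB's CREATE SINK CONNECTOR syntax; the connection details and field names are placeholder assumptions):

    CREATE SINK CONNECTOR SINK_JDBC_ORDERS WITH (
      'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
      'connection.url'      = 'jdbc:postgresql://postgres:5432/demo',
      'connection.user'     = 'postgres',
      'connection.password' = 'postgres',
      'topics'              = 'ORDERS',
      -- take the key from a field of the message value
      'pk.mode'             = 'record_value',
      'pk.fields'           = 'ORDER_ID',
      -- use that key to update existing rows rather than always inserting
      'insert.mode'         = 'upsert',
      'auto.create'         = 'true'
    );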
ksqlDB HOWTO: Handling Time
2.4K views · 3 years ago
When you do processing in ksqlDB that is based on time (such as windowed aggregations, or stream-stream joins) it is important that you define correctly the timestamp by which you want your data to be processed. This could be the timestamp that's part of the Kafka message metadata, or it could be a field in the value of the Kafka message itself. By default ksqlDB will use the timestamp of the K...
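For example, a stream can be told to use a timestamp from the message value rather than the Kafka record timestamp, roughly like this (the topic and field names are assumptions for illustration):

    CREATE STREAM ORDERS WITH (
        KAFKA_TOPIC='orders',
        VALUE_FORMAT='AVRO',
        -- use this field from the message value as the event time...
        TIMESTAMP='ORDER_TS',
        -- ...parsing it with this format (only needed when the field is a string)
        TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'
      );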
ksqlDB HOWTO: Split and Merge Kafka Topics
3.6K views · 3 years ago
Using ksqlDB you can split streams of data in Apache Kafka based on values in a field. You can also merge separate streams of data together into one. ksqlDB uses SQL to describe the stream processing that you want to do. For example: Splitting a stream: CREATE STREAM ORDERS_UK AS SELECT * FROM ORDERS WHERE COUNTRY='UK'; CREATE STREAM ORDERS_OTHER AS SELECT * FROM ORDERS WHERE COUNTRY!='UK'; Mer...
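Written out in full, the splitting statements follow the description; the merge shown after them is one possible form, with assumed stream names:

    -- Split one stream into two based on a predicate
    CREATE STREAM ORDERS_UK    AS SELECT * FROM ORDERS WHERE COUNTRY='UK';
    CREATE STREAM ORDERS_OTHER AS SELECT * FROM ORDERS WHERE COUNTRY!='UK';

    -- Merge separate streams back into a single one
    CREATE STREAM ORDERS_COMBINED AS SELECT * FROM ORDERS_UK;
    INSERT INTO ORDERS_COMBINED SELECT * FROM ORDERS_OTHER;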
ksqlDB HOWTO: Reserialising data in Apache Kafka
2K views · 3 years ago
Using ksqlDB you can reserialise data in Apache Kafka topics. For example, you can take a stream of CSV data and write it to a new topic in Avro. ksqlDB supports many serialisation formats including Avro, Protobuf, JSON Schema, JSON, and Delimited (CSV, TSV, etc). ksqlDB uses SQL to describe the stream processing that you want to do. For example: CREATE STREAM ORDERS_CSV WITH (VALUE_FORMAT='DEL...
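A fuller version of the reserialisation example might look like this (topic names and schema are assumptions for illustration):

    -- Declare the existing CSV-formatted topic, giving it a schema
    CREATE STREAM ORDERS_CSV (ORDER_ID INT, ITEM VARCHAR, AMOUNT DOUBLE)
      WITH (KAFKA_TOPIC='orders_csv', VALUE_FORMAT='DELIMITED');

    -- Write the same data to a new topic, serialised as Avro
    CREATE STREAM ORDERS_AVRO WITH (KAFKA_TOPIC='orders_avro', VALUE_FORMAT='AVRO') AS
      SELECT * FROM ORDERS_CSV;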
ksqlDB HOWTO: Integration with other systems
1.5K views · 3 years ago
Using ksqlDB you can pull data in from other systems (e.g. databases, JMS message queues, etc etc), and push data down to other systems (NoSQL stores, Elasticsearch, databases, Neo4j, etc etc). This is done using Kafka Connect, which can be run embedded within ksqlDB or as a separate cluster of workers. ksqlDB can be used to create and control the connectors. For example: CREATE SINK CONNECTOR ...
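For illustration, both directions might look roughly like this when the connectors are created from ksqlDB (the connector classes are real ones; the connection details and topic names are placeholder assumptions):

    -- Pull data in from a database with a source connector
    CREATE SOURCE CONNECTOR SOURCE_JDBC_01 WITH (
      'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
      'connection.url'           = 'jdbc:postgresql://postgres:5432/demo',
      'mode'                     = 'incrementing',
      'incrementing.column.name' = 'id',
      'topic.prefix'             = 'db-'
    );

    -- Push a ksqlDB-populated topic out to Elasticsearch with a sink connector
    CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
      'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
      'connection.url'  = 'http://elasticsearch:9200',
      'topics'          = 'ORDERS_ENRICHED'
    );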
ksqlDB HOWTO: Stateful Aggregates
2.9K views · 3 years ago
Using ksqlDB you can build stateful aggregations over events in Apache Kafka topics. These are persisted as Kafka topics and held in a state store within ksqlDB that you can query directly or from an external application using the Java client or REST API. ksqlDB uses SQL to describe the stream processing that you want to do. For example: CREATE TABLE ORDERS_BY_MAKE AS SELECT MAKE, COUNT(...
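Completing that example with assumed column names, plus a pull query against the resulting state store:

    -- Continuously maintained count of orders per make, backed by a state store
    CREATE TABLE ORDERS_BY_MAKE AS
      SELECT MAKE, COUNT(*) AS ORDER_CT
      FROM ORDERS
      GROUP BY MAKE;

    -- Pull query: look up the current value for one key directly from ksqlDB
    SELECT ORDER_CT FROM ORDERS_BY_MAKE WHERE MAKE='Ford';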
ksqlDB HOWTO: Joins
2.8K views · 3 years ago
Using ksqlDB you can enrich messages on a Kafka topic with reference data held in another topic. This could come from a database, message queue, producer API, etc. ksqlDB uses SQL to describe the stream processing that you want to do. With the JOIN clause you can define relationships between streams and/or tables in ksqlDB (which are built on topics in Kafka). For example: CREATE STREAM ORDERS_ENRIC...
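A fuller stream-table join along the lines of the truncated example, with assumed streams, tables, and columns:

    -- Reference data as a table (latest value per key)
    CREATE TABLE CUSTOMERS (ID VARCHAR PRIMARY KEY, NAME VARCHAR, RATING INT)
      WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');

    -- Enrich each order event with the matching customer details
    CREATE STREAM ORDERS_ENRICHED AS
      SELECT O.ORDER_ID, O.AMOUNT, C.NAME, C.RATING
      FROM ORDERS O
      LEFT JOIN CUSTOMERS C ON O.CUSTOMER_ID = C.ID;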
ksqlDB HOWTO: Schema Manipulation
1.7K views · 3 years ago
Using ksqlDB you can manipulate a stream of data in Apache Kafka and write it to a new topic with transformations including: * Remove/drop fields * CAST datatypes * Reformat timestamps from BIGINT epoch to human-readable strings * Flatten nested objects (STRUCT) 💾 Run ksqlDB yourself: ksqldb.io?.devx_ch.rmoff_7pH5KEQiYYo& ☁️ Use ksqlDB as a managed service: www.confluent.io/confluent-cloud/tryf...
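Putting those manipulations together in one statement (the field names are assumptions for illustration):

    CREATE STREAM ORDERS_CLEAN AS
      SELECT ORDER_ID,
             -- change the datatype of a field
             CAST(AMOUNT AS DECIMAL(10,2))                      AS AMOUNT,
             -- reformat an epoch-millis BIGINT as a human-readable string
             TIMESTAMPTOSTRING(ORDER_TS, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TS_STR,
             -- flatten a nested STRUCT into a top-level field
             ADDRESS->CITY                                      AS CITY
             -- any field not selected is dropped from the new topic
      FROM ORDERS;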
ksqlDB HOWTO: Filtering
1.8K views · 3 years ago
Using ksqlDB you can filter streams of data in Apache Kafka and write new topics in Kafka populated with a subset of another stream's data. ksqlDB uses SQL to describe the stream processing that you want to do. With the WHERE clause you can define predicates to filter the data as you require. For example: CREATE STREAM ORDERS_NY AS SELECT * FROM ORDERS WHERE ADDRESS_STATE='New York'; 💾 Run ksqlDB yourself: ksq...
🎄Twelve Days of SMT 🎄 - Day 12: Community transformations
2K views · 3 years ago
Apache Kafka ships with many Single Message Transformations included - but the great thing about it being an open API is that people can, and do, write their own transformations. Many of these are shared with the wider community, and in this final installment of the series I’m going to look at some of the transformations written by Jeremy Custenborder and available in kafka-connect-transform-co...
🎄Twelve Days of SMT 🎄 - Day 11: Filter and Predicate
2.7K views · 3 years ago
Apache Kafka 2.6 added support for defining predicates against which transforms are conditionally executed, as well as a Filter Single Message Transform to drop messages - which in combination means that you can conditionally drop messages. The predicates that ship with Apache Kafka are: * RecordIsTombstone - The value part of the message is null (denoting a tombstone message) * HasHeaderKey- M...
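For illustration, conditionally dropping tombstone records with the Filter transform and the RecordIsTombstone predicate might be configured like this (shown via ksqlDB's connector syntax for consistency; the connector and connection details are placeholder assumptions, while the transform and predicate classes ship with Apache Kafka):

    CREATE SINK CONNECTOR SINK_WITH_FILTER WITH (
      'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
      'connection.url'  = 'http://elasticsearch:9200',
      'topics'          = 'ORDERS',
      -- run the Filter SMT only on records that match the named predicate
      'transforms'                          = 'dropTombstones',
      'transforms.dropTombstones.type'      = 'org.apache.kafka.connect.transforms.Filter',
      'transforms.dropTombstones.predicate' = 'isTombstone',
      'predicates'                          = 'isTombstone',
      'predicates.isTombstone.type'         = 'org.apache.kafka.connect.transforms.predicates.RecordIsTombstone'
    );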
🎄Twelve Days of SMT 🎄 - Day 10: ReplaceField
1.1K views · 3 years ago
The ReplaceField Single Message Transform has three modes of operation on fields of data passing through Kafka Connect, either in a Source connector or Sink connector. * Include *only* the fields specified in the list (`whitelist`) * Include all fields *except* the ones specified (`blacklist`) * Rename field(s) (`renames`) 👾 Demo code and details: github.com/confluentinc/demo-scene/blob/master/...
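As a sketch of the exclude (`blacklist`) and rename (`renames`) modes - the connector, topic, and field names are placeholder assumptions, and the same properties apply however the connector is created:

    CREATE SINK CONNECTOR SINK_REPLACEFIELD_DEMO WITH (
      'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
      'connection.url'  = 'http://elasticsearch:9200',
      'topics'          = 'ORDERS',
      'transforms'      = 'dropAndRename',
      'transforms.dropAndRename.type'      = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
      -- drop a field entirely...
      'transforms.dropAndRename.blacklist' = 'CREDIT_CARD_NO',
      -- ...and rename another (old:new)
      'transforms.dropAndRename.renames'   = 'CUST_ID:CUSTOMER_ID'
    );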
🎄Twelve Days of SMT 🎄 - Day 9: Cast
768 views · 3 years ago
🎄Twelve Days of SMT 🎄 - Day 8: TimestampConverter
1.7K views · 3 years ago
🎄 Twelve Days of SMT 🎄 - Day 7: TimestampRouter
570 views · 3 years ago
🎄Twelve Days of SMT 🎄 - Day 6: InsertField II
874 views · 3 years ago
🎄Twelve Days of SMT 🎄 - Day 5: MaskField
750 views · 3 years ago
🎄Twelve Days of SMT 🎄- Day 4: RegexRouter
1.4K views · 3 years ago
🎄Twelve Days of SMT 🎄 - Day 3: Flatten
1.7K views · 3 years ago
🎄Twelve Days of SMT 🎄 - Day 2: ValueToKey and ExtractField
2.7K views · 3 years ago
🎄 Twelve Days of SMT 🎄 - Day 1: InsertField (timestamp)
10K views · 3 years ago
Exploring the Kafka Connect REST API
11K views · 3 years ago
From Zero to Hero with Kafka Connect
28K views · 3 years ago
Kafka Connect in 60 seconds
14K views · 3 years ago
Apache Kafka and ksqlDB in Action: Let's Build a Streaming Data Pipeline!
7K views · 4 years ago
Building a Telegram bot with Apache Kafka, ksqlDB, and Go
2.6K views · 4 years ago
Kafka Connect in Action: Loading a CSV file into Kafka
25K views · 4 years ago

Comments

  • @mirshahrukh2934
    @mirshahrukh2934 · 12 days ago

    I have configured Oracle as a source with Kafka using Debezium and LogMiner. It takes the snapshot but doesn't stream any changes made to the database, and I also don't find any consumer offsets related to the data taken through the snapshot. What am I missing here? Can anyone tell me please @Robin Moffat

  • @lalitshukla6585
    @lalitshukla6585 · 1 month ago

    Thanks for this video, it really helped. Just one thing I’m curious to know… is this S3 connector free to use? I mean, is that sink connector open source?

  • @jorgeluiscarhuaricaaguilar4849

    Hello Robin, I was trying to follow your video but the Voluble connector is no longer supported, so the step at the 9-minute mark is not possible to follow. Can you suggest other options to produce data like that connector did? I also tried to use the kafka-console-producer command but I could not get data inserted into my bucket yet.

    • @rmoff
      @rmoff · 2 months ago

      Check out shadowtraffic.io - it's what Voluble was initially conceived as.

  • @walterferreiradossantos2378

    Robin, I need your help. I'm new to development with Kafka. I have Kafka Connect with CDC and the JDBC sink connector. I can do an upsert correctly, but I can't get the delete operation to work. Is it possible to use the JDBC sink connector to make all operations work, like insert, update and delete...? Can you please help me with an example of Kafka Connect SQL Server to SQL Server without using Debezium?

  • @jeremykenn
    @jeremykenn · 2 months ago

    create stream test02 (col1 int, col2 varchar) with (kafka_topic='test02', partitions=1, value_format='AVRO'); only returned col1 and col2 - I don't see the default ROWTIME and ROWKEY (ksql 7.3.2).

  • @georgelza
    @georgelza · 3 months ago

    Realized this is an "old-ish" video... you don't show at any point how you started your kafkacat container; also, of course, kafkacat has now been replaced/renamed to kcat.

  • @Saikrishnabollabathini
    @Saikrishnabollabathini · 3 months ago

    Hi, is there a way to modify the payload as needed? For example, if the Kafka message is {"C":1, "C2":"Test"}, the document in Elasticsearch needs to be created as {"C":1, "searchFields":{"C2":"Test"}}. Is there a way to achieve this?

  • @user-ld8op1lb1p
    @user-ld8op1lb1p · 3 months ago

    So Amazing

  • @georgelza
    @georgelza · 3 months ago

    Why oh why have I not found this before... it would have saved me so much grief, hehehe... now to figure out why my local CP Kafka Connect does not want to sink to a MongoDB Atlas hosted collection...

  • @user-zc7vb8mc8s
    @user-zc7vb8mc8s · 3 months ago

    Awesome content Robin, thanks. One question, if possible: in a topic where I can't guarantee the order of the messages, does changing the timestamp for a TABLE solve the problem of late messages? The table records should reflect the last event, based not on topic arrival but on the event itself...

  • @amuse_me
    @amuse_me · 3 months ago

    Where can I find the documentation that you used? I really appreciate any help you can provide.

  • @LMGaming0
    @LMGaming0 · 4 months ago

    Just wondering if there is already a video on creating a custom partitioner and using it in my connector? I've been struggling today to implement that since I'm not a Java guy... thanks.

  • @Pinaldba
    @Pinaldba · 4 months ago

    @Robin, I have the Azure Synapse Sync JDBC driver kafka-connect-azure-sql-dw-1.0.7.jar in the path /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc but I still get the error: Caused by: java.sql.SQLException: No suitable driver found for "jdbc:sqlserver. I have tried all possible options; none of them worked.

  • @AjayMatai
    @AjayMatai · 4 months ago

    Is there a Kafka connector that would read from Kafka, do some transforms and write back to another Kafka topic?

  • @LMGaming0
    @LMGaming0 · 4 months ago

    4 years later, still useful.

  • @shibimukesh9369
    @shibimukesh9369 · 4 months ago

    Using the MongoDB sink connector I tried to flatten the message but I am getting an error.

  • @shibimukesh9369
    @shibimukesh9369 · 4 months ago

    How to handle nested documents... only the root level is working.

  • @vivekshah1664
    @vivekshah1664 · 5 months ago

    Hi Robin, I am a software engineer at a startup. Last year we built a pipeline to sync our Postgres data to Elasticsearch and Cassandra. It was all custom Java code with a lot of operational handling. Thank you for this video; I am planning to use Connect for those pipelines.

  • @timothyvandermeer9845
    @timothyvandermeer9845 · 5 months ago

    FANTASTIC. thank you brother

  • @Abhijit_Patra
    @Abhijit_Patra · 5 months ago

    I am trying to do this but I get a null value.

  • @chintanthakker4129
    @chintanthakker4129 · 5 months ago

    I tried this with the MongoSinkConnector and I am seeing a failure with this message: DataException: Only Map objects supported in absence of schema for [field insertion], found java.lang.String in requireMap. Correct me - do I need to use value.converter with the value as JsonConverter?

  • @user-qd4nw5mi6y
    @user-qd4nw5mi6y · 5 months ago

    I'm attempting to create a stream from a multi schema topic, similar to the train example at the end of this video. I would then split into multiple streams/topics for each message type. However, I can't seem to create the stream in a way that it's populated with the messages from the multi schema topic. Is there an actual example you can reference for this scenario?

  • @wongsiewwai
    @wongsiewwai · 6 months ago

    thank you.

  • @nesreenmohd665
    @nesreenmohd665 · 6 months ago

    Thanks

  • @sivakumarm1381
    @sivakumarm1381 · 7 months ago

    Thank you Robin for the excellent explanation. In all your examples you're using the auto.create option set to true, whereas I was trying auto.create set to false, creating the table beforehand, but I am getting the error below. I have tried every possible option. Will you be able to help? Thanks in advance. Caused by: io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table "table_name" is missing and auto-creation is disabled

  • @Agrossio
    @Agrossio · 9 months ago

    Great video!! Will this work for processing a CSV with 1,000,000 records? Would it take less than an hour to save it into an Oracle database?

  • @karthikb.s.k.4486
    @karthikb.s.k.4486 · 9 months ago

    Thank you for the tutorials Robin. If the schema changes frequently based on business requirements, do we then have to drop the stream and recreate it each time? Please let me know.

  • @raghavankasthuri2908
    @raghavankasthuri2908 · 11 months ago

    Hi @Robin Moffatt, many thanks. Requesting clarification on whether I'm thinking about the Kafka implementation correctly in the example below: say a train route is A-B-C-D-E-F-G-H. Train 1 starts at station A and stops only at stations C and F before reaching destination H. If the dashboard displays at stations C, F and H should show where the train is (for example: the train has already left station A and is between stations B and C), do we use the Kafka Streams API to display this highly volatile information while the train is on the move? In other words, as Kafka updates the topic (where is the train currently), the implementation should call the subscribe() and poll() methods to pull data from the partition every so often and display the message in real time: "Train is presently between stations B and C and is running 6 minutes late". I would much appreciate your confirmation that I am thinking of the right example.

    • @rmoff
      @rmoff · 11 months ago

      Hi, yes I think you would use some stream processing logic to do this, I guess with some kind of timer to trigger since there'd be no event as such until it was next received from the source.

    • @raghavankasthuri2908
      @raghavankasthuri2908 · 11 months ago

      Hi @@rmoff, thank you. I would be grateful if you could confirm that Kafka streaming is really about an event that can change its state/value over a period of time. In other words, the event here is "where is the train": the dashboard displays between stations B and C and later between C and D; due to arrive in 5 minutes, and later due in 2 min/1 min. Am I right in my assumption that Kafka streaming is used for streaming data pertaining to an event which changes state and value every so often? Many thanks.

    • @RobertMetzger
      @RobertMetzger · 10 months ago

      @@raghavankasthuri2908 You'd probably model this in a way that each train is emitting events regularly, or when they arrive in stations. Kafka streams (or any other stream processor) would subscribe to those stream events and emit arrival time estimations for all the relevant stations.

    • @raghavankasthuri2908
      @raghavankasthuri2908 · 10 months ago

      @@RobertMetzger Many thanks, much appreciated.

  • @dswan01
    @dswan01 · 11 months ago

    So I assume you can use ksql to transform the data and then pass it to a stream to update the target database. Is this correct?

  • @nejbahadnane7517
    @nejbahadnane7517 · 11 months ago

    Thanks 🙏 great video

    • @rmoff
      @rmoff · 11 months ago

      Thanks, glad you liked it :)

  • @sdon1011
    @sdon1011 · 1 year ago

    Very interesting series of videos. Very helpful. A little remark: at 38:58, it seems that the order value to be inserted was way higher than the currently displayed maximum (22190899.73 vs 216233.09), and yet this value was not updated.

  • @shibilpm9873
    @shibilpm9873 · 1 year ago

    Why is everyone using Confluent Kafka this and that? I wanted to do it in production and Confluent Kafka is not open source. Can anyone suggest an article or video to refer to? I want to load a CSV or JSON file into Kafka as a table.

  • @user-br1tt1wf7v
    @user-br1tt1wf7v · 1 year ago

    Great stuff, you helped me understand it easily. Thx

  • @hamoudy41
    @hamoudy41 · 1 year ago

    My data in Kafka is schema-less (string key, JSON values). I want to sink it to Postgres. Any tips? Do I need the Schema Registry? What converters/transformers (if any) do I need?

  • @mukeshdharajiya1261

    Thank you so much for your best explanation 🏆

    • @rmoff
      @rmoff · 11 months ago

      Glad it was helpful!

  • @MohamedAyman-wu6gk
    @MohamedAyman-wu6gk · 1 year ago

    Hi Robin, I'm working with Strimzi Kafka on OpenShift. When I pulled the images for the KSQL server and KSQL CLI at version 0.15.0 I got this error when writing any command: The server has encountered an incompatible entry in its log and cannot process further DDL statements. This is most likely due to the service being rolled back to an earlier version. The problem is that when I use newer versions or the latest version I get the error below when trying to identify fields as KEY or PRIMARY KEY: KSQL currently only supports KEY columns named ROWKEY, extraneous input 'PRIMARY'. So I'm struggling to deal with keys, and I have to go live soon, so please kindly support. Thanks in advance.

    • @rmoff
      @rmoff · 1 year ago

      Hi, A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.

  • @piusono
    @piusono · 1 year ago

    Thanks for the video. Though it is about 2 years old, I still find it useful. However, any idea why I get the error "Failed to create connector: {"error_code":409,"message":"Connector SINK_FOO_01_0 already exists"}" when I run CREATE SINK CONNECTOR SINK_FOO_01_0 WITH (...) for the first time, following your steps? Everything else worked up to that point.

    • @rmoff
      @rmoff · 1 year ago

      Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.

  • @scr33tch
    @scr33tch · 1 year ago

    Thanks Robin. Your videos are by far the best, most detailed kafka resources.

    • @rmoff
      @rmoff · 11 months ago

      Thank you! :)

  • @marcorossi2268
    @marcorossi2268 · 1 year ago

    What if I want to shred the JSON into sub-messages and publish each section to a different topic that ends up in a different destination table?

    • @rmoff
      @rmoff · 1 year ago

      Hi, I don't work with Kafka directly anymore. A good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.

  • @Evkayne
    @Evkayne · 1 year ago

    thank you

  • @user-rd4ih1uw8p
    @user-rd4ih1uw8p · 1 year ago

    Great tutorial and very helpful! Thanks for this

  • @emmanuelharel
    @emmanuelharel · 1 year ago

    Hello, does anyone know if it is possible to write to the WAL of a postgres database from ksql ?

    • @emmanuelharel
      @emmanuelharel · 1 year ago

      @@RobinNMoffatt To implement the outbox pattern from SQL within a transaction. It would mean, for example with Postgres, using the pg_logical_emit_message function to store the message at the same time as storing an entity into a table - but all of this from ksql or Flink SQL. Does it make sense?

  • @robertoojeda3263
    @robertoojeda3263 · 1 year ago

    Hello Robin! Good video. Do you have a video that uses a Dead Letter Queue topic storing several messages with different schemas - or rather, one that uses the properties confluent.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy and TopicRecordNameStrategy?

  • @alexandersk.8963
    @alexandersk.8963 · 1 year ago

    Amazing materials for me, Robin, thank you a lot!

  • @tanakpek6270
    @tanakpek6270 · 1 year ago

    Hi Robin, awesome video! I really appreciate this. I was wondering: I did these two SMTs and had JSON_SR configured everywhere, but the key was always prepended with a \" and appended with an "\ between the quotes. What is causing this?

    • @rmoff
      @rmoff · 1 year ago

      Hi, glad you liked the video! a good place to go for help is www.confluent.io/en-gb/community/ask-the-community/. thanks.

  • @rushij6874
    @rushij6874 · 1 year ago

    "Great video, [Robin]! I really appreciated the clear and concise way you explained concepts . Your examples were also really helpful in solidifying my understanding of the kafka use case. Overall, I think you did an excellent job and I look forward to seeing more of your videos in the future. Keep up the great work!"

  • @mathiasyeremiaaryadi9097

    Can we make the insert query persistent, so that it runs automatically rather than only at the time we write the query?

  • @bonolomokgadi9025
    @bonolomokgadi9025 · 1 year ago

    Hi Robin. I'm seeing the below error. The key converter is StringConverter.

    org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
        at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:357)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: ENTITY_NO
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
        ... 12 more

    Below is the config:

    "key.ignore": "false",
    "transforms": "CastToInt64,CastToString,copyIdToKey,extractKeyFromStruct",
    "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyIdToKey.fields": "ENTITY_NO",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyFromStruct.field": "ENTITY_NO",
    "transforms.CastToInt64.spec": "REQUESTING_ENTITY_NO:int64",
    "transforms.CastToInt64.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.CastToString.spec": "REQUESTING_ENTITY_NO:string",
    "transforms.CastToString.type": "org.apache.kafka.connect.transforms.Cast$Value",

  • @carlaguelpa4574
    @carlaguelpa4574 · 1 year ago

    @rmoff, thanks for your videos, they are the best! We have a situation: we have implemented Debezium with Kafka, and we are experiencing a performance issue. When both source and sink connectors are working at the same time, the consumer performance decreases significantly. Here is what we have:
    * 1 Kafka cluster with 3 nodes (8GB RAM, -Xms: 4096m -Xmx: 4096m each); all topics with replication factor = 3
    * 1 ZooKeeper cluster with 3 nodes (4GB RAM, -Xms: 2048m -Xmx: 2048m each)
    * 1 Connect cluster with 5 nodes (12GB RAM each, -Xms: 6168m -Xmx: 6168m) with 1 partition and 1 task, though we have tried with 1 / 5 / 100 partitions and 1 / 5 / 15 / 100 tasks
    * 1 source connector per table (4 tables)
    * 1 sink connector per table (4 tables)
    * We are not using Schema Registry
    The problem: the target DB has a delay that increases when the rate goes above 750 messages per second. Message size = 6kb. We are now processing 2000 messages of 6kb per second (this was the best performance we got, and it was with ONLY the sink connectors enabled). When we enable only the sink connectors the performance is good; the same with only source connectors and no sinks, performance is great. Resources are OK, source and target environments have plenty of CPU and memory. When we watch the messages consumed per second, there is a wait of around 30 seconds, then it consumes some messages, then waits, and then again. Question: what can we do to improve the consumer performance? Could you help me?
