Summary
Bài viết này khám phá cách xử lý dữ liệu IoT từ Kafka đến Apache Paimon thông qua Flink CDC, mang đến cho người đọc cái nhìn sâu sắc và thực tiễn về quy trình chuyển đổi dữ liệu. Qua đó, tôi cảm nhận được tầm quan trọng của việc tối ưu hóa luồng dữ liệu để tăng cường khả năng ra quyết định trong các hệ thống IoT hiện đại. Key Points:
- **Tối ưu hóa luồng dữ liệu với Flink CEP:** Bằng cách sử dụng Flink CEP, bạn có thể phát hiện các sự kiện phức tạp trong dòng dữ liệu như nhiệt độ bất thường từ cảm biến. Tôi đã thử nghiệm điều này và thấy rằng nó giúp tạo ra những thông tin giá trị hơn rất nhiều.
- **Chuyển đổi sang kiến trúc microservices:** Việc áp dụng kiến trúc microservices không chỉ cải thiện tính mở rộng mà còn giúp dễ dàng bảo trì và tái sử dụng ứng dụng IoT. Mỗi microservice đảm nhiệm một chức năng cụ thể, giúp hệ thống linh hoạt hơn khi đáp ứng nhu cầu ngày càng cao.
- **Sử dụng Kafka Schema Registry:** Để đảm bảo tính toàn vẹn của dữ liệu, việc quản lý schema bằng Kafka Schema Registry là thiết yếu. Tôi nhận thấy rằng điều này giúp tránh được nhiều lỗi không tương thích và duy trì sự nhất quán trong toàn bộ hệ thống.
Tổng quan về việc sử dụng Kafka và Flink CDC để lưu trữ dữ liệu IoT
Dòng dữ liệu IoT từ Kafka qua khung Flink CDC Action, lưu trữ vào Apache Paimon trên S3 - Phần 1
Dữ liệu IoT đóng gói dạng JSON được đẩy vào Topic Kafka, trích xuất bằng Flink CDC thông qua khung công việc Flink action rồi lưu vào Apache Paimon. (17/03/2025)
**Tổng quan**
Trong một [blog] trước đây, chúng tôi từng hướng dẫn cách đưa dữ liệu IoT đóng gói JSON vào MongoDB Collection, sau đó sử dụng khung [flink_paimon_action] của Apache Flink để CDC nguồn dữ liệu và chèn vào cơ sở dữ liệu [Apache Paimon]. Lần này, chúng ta sẽ thử nghiệm cách khác: đẩy cùng loại dữ liệu đó lên Topic Kafka của Confluent, rồi tận dụng chính khung [flink_paimon_action] để [CDC] trích xuất trực tiếp từ Topic và lưu thẳng vào file [Apache Parquet] trong bảng [Apache Paimon] được lưu trữ trên S3 thông qua server [MinIO].
Một chút giải thích thêm cho rõ: Kafka ở đây hoạt động như hệ thống thu thập và phân phối dữ liệu theo thời gian thực, trong khi Flink CDC có khả năng bắt các thay đổi dữ liệu (change data capture) cực nhạy. Cách tiếp cận này đặc biệt hữu ích cho các ứng dụng như giám sát môi trường hay quản lý chuỗi cung ứng, nơi cần xử lý luồng dữ liệu liên tục với độ trễ thấp.
Đoạn trên chỉ là phần mở đầu thôi, chúng tôi sẽ đi sâu vào chi tiết kỹ thuật ở các phần sau.
Dữ liệu IoT đóng gói dạng JSON được đẩy vào Topic Kafka, trích xuất bằng Flink CDC thông qua khung công việc Flink action rồi lưu vào Apache Paimon. (17/03/2025)
**Tổng quan**
Trong một [blog] trước đây, chúng tôi từng hướng dẫn cách đưa dữ liệu IoT đóng gói JSON vào MongoDB Collection, sau đó sử dụng khung [flink_paimon_action] của Apache Flink để CDC nguồn dữ liệu và chèn vào cơ sở dữ liệu [Apache Paimon]. Lần này, chúng ta sẽ thử nghiệm cách khác: đẩy cùng loại dữ liệu đó lên Topic Kafka của Confluent, rồi tận dụng chính khung [flink_paimon_action] để [CDC] trích xuất trực tiếp từ Topic và lưu thẳng vào file [Apache Parquet] trong bảng [Apache Paimon] được lưu trữ trên S3 thông qua server [MinIO].
Một chút giải thích thêm cho rõ: Kafka ở đây hoạt động như hệ thống thu thập và phân phối dữ liệu theo thời gian thực, trong khi Flink CDC có khả năng bắt các thay đổi dữ liệu (change data capture) cực nhạy. Cách tiếp cận này đặc biệt hữu ích cho các ứng dụng như giám sát môi trường hay quản lý chuỗi cung ứng, nơi cần xử lý luồng dữ liệu liên tục với độ trễ thấp.
Đoạn trên chỉ là phần mở đầu thôi, chúng tôi sẽ đi sâu vào chi tiết kỹ thuật ở các phần sau.
Quá trình tạo dữ liệu IoT từ ứng dụng Python
Đây sẽ là một quy trình IoT rất thực tế mà bạn thường gặp trong thực tế. Dữ liệu của chúng tôi được tạo ra thông qua chương trình IoT bằng Python (nằm trong thư mục _app_iot1/_ và đã được container hóa) được phát triển từ bài viết trước.
- Ứng dụng đầu tiên _app_iot1_ tạo ra payload JSON IoT đơn giản nhất bằng cách đặt các giá trị _TSHUMAN, STRUCMOD & DEVICETYPE_ = 0.
- Trong _app_iot2_, chúng tôi mở rộng payload để bao gồm _TSHUMAN_=1 và _STRUCMOD_=1. Điều này bổ sung thêm trường ngày tháng dễ đọc cho con người và đối tượng vị trí vào phần metadata.
Một số điểm có thể cải thiện trong quá trình này bao gồm: tối ưu hóa việc thu thập dữ liệu, sử dụng các thư viện như `paho-mqtt` để truyền dữ liệu qua giao thức MQTT giúp giảm độ trễ, hay áp dụng các giải pháp nén dữ liệu để tiết kiệm băng thông. Việc xác thực và ghi log dữ liệu cũng rất quan trọng để đảm bảo tính toàn vẹn của hệ thống.
- Ứng dụng đầu tiên _app_iot1_ tạo ra payload JSON IoT đơn giản nhất bằng cách đặt các giá trị _TSHUMAN, STRUCMOD & DEVICETYPE_ = 0.
- Trong _app_iot2_, chúng tôi mở rộng payload để bao gồm _TSHUMAN_=1 và _STRUCMOD_=1. Điều này bổ sung thêm trường ngày tháng dễ đọc cho con người và đối tượng vị trí vào phần metadata.
Một số điểm có thể cải thiện trong quá trình này bao gồm: tối ưu hóa việc thu thập dữ liệu, sử dụng các thư viện như `paho-mqtt` để truyền dữ liệu qua giao thức MQTT giúp giảm độ trễ, hay áp dụng các giải pháp nén dữ liệu để tiết kiệm băng thông. Việc xác thực và ghi log dữ liệu cũng rất quan trọng để đảm bảo tính toàn vẹn của hệ thống.
Extended Perspectives Comparison:
Tiêu đề | Nội dung |
---|---|
Cơ chế vận hành | Hệ thống tự động giám sát các thay đổi về cấu trúc dữ liệu khi có dữ liệu mở rộng. |
Tùy chỉnh tham số | Có thể điều chỉnh thời gian trễ và ngưỡng lỗi tùy theo nhu cầu cụ thể. |
Dữ liệu tham khảo | Hiểu rõ cách các cảm biến IoT phát sinh nhiều loại dữ liệu khác nhau để xử lý hiệu quả hơn. |
Quy trình đăng ký nhận dữ liệu | Kích hoạt quy trình từ topic nguồn (_factory_iot_) và đẩy vào bảng Apache Paimon. |
Schema evolution | Hệ thống vẫn đọc được dữ liệu cũ mà không cần migrate khi thêm trường mới. |
Cách cấu hình và khởi động các ứng dụng IoT khác nhau
Trong ứng dụng _app_iot3_, chúng tôi tiến thêm một bước bằng cách thêm trường _DEVICETYPE=1_ vào payload. Thao tác này bổ sung chuỗi văn bản định nghĩa loại thiết bị, cho thấy tính linh hoạt của JSON trong việc xử lý dữ liệu IoT cũng như khả năng thích ứng với cấu trúc dữ liệu động xuyên suốt quy trình từ nguồn đến kho lưu trữ.
Đối với dịch vụ danh mục, chúng tôi sẽ sử dụng tính năng [Metastore] của [Apache Hive] - vốn đã được thiết lập trong các bài viết trước đó (có điều gần đây đã được nâng cấp phiên bản một chút).
*Gợi ý mở rộng*: Khi triển khai, bạn có thể cân nhắc điều chỉnh các tham số như `checkpoint.interval` hay `max.parallelism` để tối ưu hiệu suất xử lý real-time. Việc lựa chọn giao thức truyền thông (MQTT/HTTP) cũng nên phù hợp với đặc thù từng ứng dụng IoT cụ thể.
Đối với dịch vụ danh mục, chúng tôi sẽ sử dụng tính năng [Metastore] của [Apache Hive] - vốn đã được thiết lập trong các bài viết trước đó (có điều gần đây đã được nâng cấp phiên bản một chút).
*Gợi ý mở rộng*: Khi triển khai, bạn có thể cân nhắc điều chỉnh các tham số như `checkpoint.interval` hay `max.parallelism` để tối ưu hiệu suất xử lý real-time. Việc lựa chọn giao thức truyền thông (MQTT/HTTP) cũng nên phù hợp với đặc thù từng ứng dụng IoT cụ thể.
Xử lý luồng dữ liệu JSON từ Kafka vào Apache Paimon
Chắc hẳn những ai đã theo dõi các blog trước đây của tôi đều nhận ra tôi vừa nâng cấp cụm [Confluent] Kafka (lên phiên bản 7.7.1) và hệ sinh thái [Apache Flink] (phiên bản 1.19.1). Cả stack [Apache Paimon] cũng được đẩy lên 0.9.0 rồi đấy. Như mọi khi, toàn bộ code vẫn có thể tìm thấy trên [kho GIT], và đúng vậy - chúng tôi vẫn đang sử dụng kha khá các file Makefile, Docker-compose.yml cùng Dockerfile. Lần này tôi quyết định không lan man như thường lệ để làm mấy thứ như aggregation nữa, vì chúng ta đã demo khá nhiều về chủ đề đó rồi mà.

Hướng dẫn chạy Flink job manager và lấy dữ liệu từ topic Kafka
Chúng ta sẽ sử dụng ứng dụng Python để tạo ra một luồng dữ liệu IoT giống như đang đọc cảm biến trong các nhà máy được lắp đặt trên máy móc. Những tài liệu dạng "[JSON]" này sẽ được gửi lên chủ đề Kafka có tên là _factory_iot_. Chìa khóa của chủ đề "_key"_ sẽ được thiết lập theo _siteId_ của các nhà máy khác nhau, và chúng ta đã chia các nhà máy thành ba nhóm: Bắc, Nam và Đông, nhằm mô phỏng sự phân phối khu vực. Sau khi hoàn thiện cấu trúc cơ bản, chúng ta sẽ sử dụng khả năng CDC của Kafka thông qua khung hành động để sao chép trực tiếp vào bảng Apache Paimon. Khi mọi thứ đã sẵn sàng thì cũng chính là lúc bắt đầu. Tất nhiên, bạn có thể làm cho nó phức tạp hơn rất nhiều. Đầu tiên, hãy chạy _app_iot_. Điều này sẽ bắt đầu xuất bản tải trọng rất đơn giản của chúng ta lên chủ đề _factory_iot_.
Kiểm tra sự tiến hóa của schema với payload mở rộng
Bạn có thể chạy ứng dụng _app_iot1_ bằng cách vào thư mục _app_iot1_ rồi thực thi file _run1.bsh_, hoặc đơn giản hơn là chạy lệnh _make iot1_datagen_ khi đang ở trong thư mục _devlab_. (Nếu dùng cách chạy bằng bash script, bạn cũng có thể khởi động _app_iot2_ và _app_iot3_ tương tự bằng các file _run2.bsh_ và _run3.bsh_ ngay trong thư mục _app_iot1_). Ngoài ra, hai ứng dụng này cũng có thể được kích hoạt bằng lệnh _make iot2_datagen_ và _make iot3_datagen_.
{
"ts" : 123421452622,
"metadata" : {
"siteId" : 1009,
"deviceId" : 1042,
"sensorId" : 10180,
"unit" : "Psi"
},
"measurement" : 1013.3997
}
Sau khi mọi thứ đã chạy ổn định, bạn chỉ cần gõ lệnh _make jm_ trong thư mục _devlab_ là có thể mở được Flink job manager và bắt đầu làm việc với dòng lệnh.
Một vài lưu ý nhỏ khi triển khai:
1. **Cơ chế vận hành**: Hệ thống sẽ tự động giám sát các thay đổi về cấu trúc dữ liệu (schema) khi có dữ liệu mở rộng được thêm vào.
2. **Tùy chỉnh**: Bạn hoàn toàn có thể điều chỉnh các thông số như thời gian trễ hay ngưỡng lỗi cho phép tùy theo nhu cầu cụ thể.
3. **Dữ liệu tham khảo**: Các cảm biến IoT thường phát sinh nhiều loại dữ liệu khác nhau, việc hiểu rõ cách chúng ảnh hưởng đến cấu trúc sẽ giúp xử lý hiệu quả hơn.
Thêm các trường mới vào payload và xác minh tính tương thích
Bạn có thể dễ dàng sao chép/dán đoạn mã dưới đây rồi nhấn Enter. Thao tác này sẽ kích hoạt quy trình "đăng ký nhận dữ liệu" từ topic nguồn (_factory_iot_) và đẩy dữ liệu vào bảng Apache Paimon đích.
> LƯU Ý: Nhớ khởi động trình tạo dữ liệu như hướng dẫn ở trên trước nhé, để khi chạy lệnh này sẽ có ngay payload mẫu giúp hệ thống tự suy ra cấu trúc bảng. Nếu bỏ qua bước này, chúng ta buộc phải tự tay định nghĩa cấu trúc bảng trước khi thực thi.
Một số điểm cần lưu ý thêm để hiểu rõ hơn quy trình:
1. **Cơ chế đồng bộ**: Flink sẽ liên tục giám sát thay đổi trên topic Kafka, tự động điều chỉnh schema khi phát hiện trường mới trong payload JSON
2. **Tính nhất quán dữ liệu**: Cấu hình checkpoint mỗi 10s đảm bảo không mất dữ liệu khi có sự cố
3. **Tối ưu hiệu năng**: Thiết lập parallelism=4 giúp phân tải xử lý trên nhiều luồng
> LƯU Ý: Nhớ khởi động trình tạo dữ liệu như hướng dẫn ở trên trước nhé, để khi chạy lệnh này sẽ có ngay payload mẫu giúp hệ thống tự suy ra cấu trúc bảng. Nếu bỏ qua bước này, chúng ta buộc phải tự tay định nghĩa cấu trúc bảng trước khi thực thi.
/opt/flink/bin/flink run \
/opt/flink/lib/paimon/paimon-flink-action-0.9.0.jar \
kafka_sync_table \
-Dexecution.checkpointing.interval=10s \
-Dexecution.checkpointing.num-retained=5 \
-Dstate.checkpoints.num-retained=10 \
-Dpipeline.name='sync-kafka-topic-to-paimon-s3' \
--kafka_conf properties.bootstrap.servers=broker:29092 \
--kafka_conf topic=factory_iot \
--kafka_conf properties.group.id=123456 \
--kafka_conf value.format=json \
--kafka_conf scan.startup.mode=earliest-offset \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://metastore:9083 \
--warehouse s3a://warehouse/paimon/ \
--database iot \
--table factory_iot \
--table_conf sink.parallelism=4
Một số điểm cần lưu ý thêm để hiểu rõ hơn quy trình:
1. **Cơ chế đồng bộ**: Flink sẽ liên tục giám sát thay đổi trên topic Kafka, tự động điều chỉnh schema khi phát hiện trường mới trong payload JSON
2. **Tính nhất quán dữ liệu**: Cấu hình checkpoint mỗi 10s đảm bảo không mất dữ liệu khi có sự cố
3. **Tối ưu hiệu năng**: Thiết lập parallelism=4 giúp phân tải xử lý trên nhiều luồng
Lưu trữ dữ liệu dưới dạng tệp Parquet trong Apache Paimon trên S3
"Lưu ý quan trọng: Dù tài liệu không yêu cầu scan.startup.mode=earliest-offset nhưng cứ thêm vào nhé, kể cả khi xử lý topic mới. Nghe có vẻ lạ nhưng thực ra đơn giản thôi. Giờ cùng kiểm tra schema evolution nào. Chạy ứng dụng _app_iot2_ đi - như bạn thấy ở trên, nó sẽ bổ sung object _TSHUMAN_ và _LOCATION_ vào payload.
{
"ts" : 123421452622,
"metadata" : {
"siteId" : 1009,
"deviceId" : 1042,
"sensorId" : 10180,
"unit" : "Psi",
"ts_human" : "2024-10-02T00:00:00.869Z",
"location": {
"latitude": -26.195246,
"longitude": 28.034088
}
},
"measurement" : 1013.3997
}
Có vài điểm thú vị: 1. **Nguyên tắc schema evolution**: Khi thêm trường mới như ts_human hay location, hệ thống vẫn đọc được dữ liệu cũ mà không cần migrate 2. **Xử lý nested data**: Object location được serialize nguyên dạng theo cấu trúc JSON lồng nhau 3. **Backward compatibility**: Dữ liệu vẫn giữ nguyên định dạng timestamp số (ts) song song với phiên bản có thể đọc được (ts_human)"

Các cảnh báo cần lưu ý khi làm việc với pipeline này
"Và cuối cùng khi chúng ta đang cảm thấy cực kỳ may mắn, hãy chạy _app_iot3_ để thêm trường _DEVICETYPE_ vào thẻ metadata. Dữ liệu JSON sau khi cập nhật sẽ trông như thế này nhé:
{
"timestamp" : "2024-10-02T00:00:00.869Z",
"metadata" : {
"siteId" : 1009,
"deviceId" : 1042,
"sensorId" : 10180,
"unit" : "Psi",
"ts_human" : "2024-10-02T00:00:00.869Z",
"location": {
"latitude": -26.195246,
"longitude": 28.034088
},
"deviceType" : "Oil Pump",
},
"measurement" : 1013.3997
}
Thế là xong... blog lần này ngắn hơn hẳn so với mấy bài trước ha ;). Tóm lại thì chúng ta đã xây dựng một data pipeline từ Kafka Topic, có khả năng xử lý được cả schema evolution, đưa dữ liệu vào kho lưu trữ Apache Paimon dưới dạng file Parquet.
Một vài lưu ý nho nhỏ khi triển khai pipeline loại này:
1. **Đồng bộ dữ liệu**: Cần đảm bảo dữ liệu từ Kafka được xử lý đúng thứ tự và không bị thất lạc.
2. **Hiệu năng**: Theo dõi tốc độ xử lý của Flink để tránh tình trạng nghẽn cổ chai.
3. **Xử lý lỗi**: Cài đặt cơ chế ghi log chi tiết sẽ giúp phát hiện sự cố nhanh hơn.
4. **Tài nguyên**: Cân nhắc điều chỉnh cấu hình bộ nhớ và CPU cho phù hợp với lưu lượng dữ liệu.
5. **Kiểm tra**: Thi thoảng nên kiểm tra chéo dữ liệu đầu ra trong Paimon để đảm bảo không có sai sót.
Pipeline kiểu này khi chạy ổn định rồi thì khá là tiện, vừa linh hoạt lại vừa đỡ phải lo về vấn đề schema thay đổi sau này."
Kết luận về quy trình xây dựng pipeline dữ liệu hiệu quả
Tôi nghĩ điều này thật thú vị... Hy vọng bạn đã thích việc khám phá. Chúc bạn may mắn, vì luôn có rất nhiều ngã rẽ và bạn có thể dễ dàng lạc vào những con đường khác nhau... nhưng đó cũng là một phần của niềm vui. > Lưu ý: Để bắt đầu blog này, hãy mở tệp README.md ở thư mục gốc và làm theo hướng dẫn từ đó; nó sẽ chỉ cho bạn chính xác những gì cần thực hiện theo thứ tự để tải xuống tất cả các phụ thuộc và xây dựng mọi thứ.
Reference Articles
Related Discussions