ArkFlow: High-performance Rust stream processing engine

High-performance Rust stream processing engine, providing powerful data stream processing capabilities, supporting multiple input/output sources and processors.

  • High Performance: Built on Rust and Tokio async runtime, offering excellent performance and low latency
  • Multiple Data Sources: Support for Kafka, MQTT, HTTP, files, and other input/output sources
  • Powerful Processing Capabilities: Built-in SQL queries, JSON processing, Protobuf encoding/decoding, batch processing, and other processors
  • Extensible: Modular design, easy to extend with new input, output, and processor components
```shell
# Clone the repository
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow

# Build the project
cargo build --release

# Run tests
cargo test
```
  1. Create a configuration file config.yaml:
```yaml
logging:
  level: info
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1s
      batch_size: 10
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT * FROM flow WHERE value >= 10"
    output:
      type: "stdout"
    error_output:
      type: "stdout"
```
  2. Run ArkFlow:
```shell
./target/release/arkflow --config config.yaml
```

ArkFlow uses YAML configuration files that support the following main configuration items:

```yaml
logging:
  level: info        # Log level: debug, info, warn, error
streams:             # Stream definition list
  - input:           # Input configuration
      # ...
    pipeline:        # Processing pipeline configuration
      # ...
    output:          # Output configuration
      # ...
    error_output:    # Error output configuration
      # ...
    buffer:          # Buffer configuration
      # ...
```

ArkFlow supports multiple input sources:

  • Kafka: Read data from Kafka topics
  • MQTT: Subscribe to messages from MQTT topics
  • HTTP: Receive data via HTTP
  • File: Read data from files (CSV, JSON, Parquet, Avro, Arrow) using SQL
  • Generator: Generate test data
  • Database: Query data from databases (MySQL, PostgreSQL, SQLite, DuckDB)

Example:

```yaml
input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true
```
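Other input types follow the same pattern. As a sketch only, an MQTT input might look like the following; the field names (host, port, qos) are assumptions for illustration and are not confirmed against ArkFlow's actual configuration schema:

```yaml
# Hypothetical MQTT input sketch -- field names below are assumptions,
# check the ArkFlow documentation for the real schema
input:
  type: mqtt
  host: localhost          # assumed field name
  port: 1883               # assumed field name
  topics:
    - sensors/temperature
  client_id: arkflow-mqtt
  qos: 1                   # assumed field name
```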

ArkFlow provides multiple data processors:

  • JSON: JSON data processing and transformation
  • SQL: Process data using SQL queries
  • Protobuf: Protobuf encoding/decoding
  • Batch Processing: Process messages in batches

Example:

```yaml
pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"
```
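Processors run in the order listed, so a typical pipeline first converts JSON batches into Arrow and then queries the batch as the table named flow. For example, an aggregation over the generated sensor data (the sensor and value fields come from the quick-start generator config):

```yaml
pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow     # convert JSON messages into an Arrow batch
    - type: sql               # query the batch as a table named "flow"
      query: "SELECT sensor, avg(value) AS avg_value FROM flow GROUP BY sensor"
```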

ArkFlow supports multiple output targets:

  • Kafka: Write data to Kafka topics
  • MQTT: Publish messages to MQTT topics
  • HTTP: Send data via HTTP
  • Standard Output: Output data to the console
  • Drop: Discard data

Example:

```yaml
output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: test-topic
  client_id: arkflow-producer
```
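For local debugging, the console and discard targets need no connection settings. The stdout type appears in the quick-start config; the drop type name below is an assumption based on the output list above:

```yaml
output:
  type: stdout      # print processed batches to the console
error_output:
  type: drop        # assumed type name for the Drop target; discards failed messages
```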

ArkFlow supports multiple error output targets:

  • Kafka: Write error data to Kafka topics
  • MQTT: Publish error messages to MQTT topics
  • HTTP: Send error data via HTTP
  • Standard Output: Output error data to the console
  • Drop: Discard error data

Example:

```yaml
error_output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: error-topic
  client_id: error-arkflow-producer
```

ArkFlow provides buffer capabilities to handle backpressure and temporary storage of messages:

  • Memory Buffer: In-memory buffering for high-throughput scenarios and window aggregation

Example:

```yaml
buffer:
  type: memory
  capacity: 10000   # Maximum number of messages to buffer
  timeout: 10s      # Maximum time to buffer messages
```

Kafka to Kafka Data Processing

```yaml
streams:
  - input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group
    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value > 100"
    output:
      type: kafka
      brokers:
        - localhost:9092
      topic: processed-topic
```

Generate Test Data and Process

```yaml
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1ms
      batch_size: 10000
    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT count(*) FROM flow WHERE value >= 10 GROUP BY sensor"
    output:
      type: "stdout"
```

ArkFlow Plugin Examples

ArkFlow is licensed under the Apache License 2.0.

Discord: https://discord.gg/CwKhzb8pux

If you like this project, or are using it to learn or to start your own solution, please give it a star ⭐. Thanks!
