Skip to content

Latest commit

 

History

History
296 lines (223 loc) · 7.79 KB

File metadata and controls

296 lines (223 loc) · 7.79 KB

ArkFlow

English | 中文

Rust License

最新版文档 | 开发版文档

ArkFlow - High-performance rust stream processing engine | Product Hunt

高性能Rust流处理引擎,无缝集成AI能力,提供强大的实时数据处理与智能分析。 它不仅支持多种输入/输出源和处理器,更能轻松加载和执行机器学习模型,实现流式数据和推理、异常检测和复杂事件处理。

CNCF 云原生技术全景图

   

ArkFlow 已收录在 CNCF Cloud Native 云原生技术全景图中。

特性

  • 高性能:基于Rust和Tokio异步运行时构建,提供卓越的性能和低延迟
  • 智能分析:无缝集成AI模型,提供强大的智能分析功能
  • 多种数据源:支持Kafka、MQTT、HTTP、文件等多种输入输出源
  • 强大的处理能力:内置SQL查询、Python脚本、JSON处理、Protobuf编解码、批处理等多种处理器
  • 可扩展:模块化设计,易于扩展新的输入、缓冲区、输出和处理器组件

安装

从源码构建

# 克隆仓库
git clone https://github.com/arkflow-rs/arkflow.git
cd arkflow

# 构建项目
cargo build --release

# 运行测试
cargo test

快速开始

  1. 创建配置文件 config.yaml
logging:
  level: info
streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1s
      batch_size: 10

    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT * FROM flow WHERE value >= 10"

    output:
      type: "stdout"
    error_output:
      type: "stdout"
  1. 运行ArkFlow:
./target/release/arkflow --config config.yaml

配置说明

ArkFlow使用YAML格式的配置文件,支持以下主要配置项:

顶级配置

logging:
  level: info  # 日志级别:debug, info, warn, error

streams: # 流定义列表
  - input:      # 输入配置
    # ...
    pipeline:   # 处理管道配置
    # ...
    output:     # 输出配置
    # ...
    error_output: # 错误输出配置
    # ...
    buffer:     # 缓冲配置
    # ...

输入组件

ArkFlow支持多种输入源:

  • Kafka:从Kafka主题读取数据
  • MQTT:从MQTT主题订阅消息
  • HTTP:通过HTTP接收数据
  • 文件:使用SQL从文件(Csv、Json、Parquet、Avro、Arrow)读取数据
  • 生成器:生成测试数据
  • 数据库:从数据库(MySQL、PostgreSQL、SQLite、Duckdb)查询数据
  • Nats: 订阅来自 Nats 主题的消息
  • Redis: 订阅来自 Redis 频道或列表的消息
  • Websocket: 订阅来自 WebSocket 连接的消息
  • Modbus: 从 Modbus 设备读取数据

示例:

input:
  type: kafka
  brokers:
    - localhost:9092
  topics:
    - test-topic
  consumer_group: test-group
  client_id: arkflow
  start_from_latest: true

处理器

ArkFlow提供多种数据处理器:

  • JSON:JSON数据处理和转换
  • SQL:使用SQL查询处理数据
  • Protobuf:Protobuf编解码
  • 批处理:将消息批量处理
  • Vrl: 使用VRL进行处理数据

示例:

pipeline:
  thread_num: 4
  processors:
    - type: json_to_arrow
    - type: sql
      query: "SELECT * FROM flow WHERE value >= 10"

输出组件

ArkFlow支持多种输出目标:

  • Kafka:将数据写入Kafka主题
  • MQTT:将消息发布到MQTT主题
  • HTTP:通过HTTP发送数据
  • 标准输出:将数据输出到控制台
  • Drop: 丢弃数据
  • Nats: 将消息发布到 Nats 主题

示例:

output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: output-topic
  client_id: arkflow-producer

错误输出组件

  • Kafka:将错误数据写入 Kafka 主题
  • MQTT:将错误消息发布到 MQTT 主题
  • HTTP:通过 HTTP 发送错误数据
  • 标准输出:将错误数据输出到控制台
  • 丢弃:丢弃错误数据
  • Nats: 将消息发布到 Nats 主题

示例:

error_output:
  type: kafka
  brokers:
    - localhost:9092
  topic:
    type: value
    value: error-topic
  client_id: error-arkflow-producer

缓冲组件

ArkFlow 提供缓冲能力,以处理消息的背压和临时存储:

  • 内存缓冲: 内存缓冲区,用于高吞吐量场景和窗口聚合。
  • 会话窗口 (Session Window):会话窗口缓冲组件提供了一种基于会话的消息分组机制,其中消息根据活动间隙进行分组。它实现了一个会话窗口,在可配置的非活动期后关闭。
  • 滑动窗口 (Sliding Window):滑动窗口缓冲组件提供了一种基于时间的分批处理消息的窗口机制。它实现了一种滑动窗口算法,具有可配置的窗口大小、滑动间隔和滑动大小。
  • 滚动窗口 (Tumbling Window):滚动窗口缓冲组件提供了一种固定大小、不重叠的批处理消息的窗口机制。它实现了一种滚动窗口算法,具有可配置的间隔设置。

示例:

buffer:
  type: memory
  capacity: 10000  # Maximum number of messages to buffer
  timeout: 10s  # Maximum time to buffer messages

示例

Kafka到Kafka的数据处理

streams:
  - input:
      type: kafka
      brokers:
        - localhost:9092
      topics:
        - test-topic
      consumer_group: test-group

    pipeline:
      thread_num: 4
      processors:
        - type: json_to_arrow
        - type: sql
          query: "SELECT * FROM flow WHERE value > 100"

    output:
      type: kafka
      brokers:
        - localhost:9092
      topic:
        type: value
        value: processed-topic

生成测试数据并处理

streams:
  - input:
      type: "generate"
      context: '{ "timestamp": 1625000000000, "value": 10, "sensor": "temp_1" }'
      interval: 1ms
      batch_size: 10000

    pipeline:
      thread_num: 4
      processors:
        - type: "json_to_arrow"
        - type: "sql"
          query: "SELECT count(*) FROM flow WHERE value >= 10 group by sensor"

    output:
      type: "stdout"

用户

  • Conalog(国家: 韩国)

ArkFlow 插件

ArkFlow 插件示例

许可证

ArkFlow 使用 Apache License 2.0 许可证。

社区

Discord: https://discord.gg/CwKhzb8pux

微信社区群:

wx

您可以在群内提出任何需要改进的地方,我们会考虑合理性并尽快修改。 如果您发现 bug 请及时提 issue,我们会尽快确认并修改。

如果你喜欢或正在使用这个项目来学习或开始你的解决方案,请给它一个star⭐。谢谢!