OpenCSG DataFlow is a one-stop data processing platform that uses large-model technology and advanced algorithms to optimize the entire data processing lifecycle. It aims to improve efficiency and precision while addressing enterprise data-management challenges such as inefficiency, adaptability gaps, and security and compliance issues.
DataFlow is an open-source platform engineered to streamline end-to-end data processing within the AI/ML lifecycle. By unifying data workflows and model optimization, it turns fragmented pipelines into a cohesive, automated system suited to enterprises tackling data complexity at scale.
🔑 Key Features
- Full Lifecycle Management
  - Unified handling of data ingestion, transformation, modeling, and evaluation.
- Seamless CSGHub Integration
  - Directly ingest datasets from CSGHub and push refined data back for model retraining, creating a continuous feedback loop.
- Modular & Extensible Design
  - Plug-and-play operators for custom pipelines (e.g., NLP, image, audio processing).
- Distributed Computing
  - Scale workloads across clusters via Kubernetes integration.
- Multi-Agent Task Orchestration
  - Dynamically allocate complex tasks (e.g., data validation, anomaly detection) to collaborative agents.
- MinerU Engine
  - Convert PDFs to structured Markdown/JSON for LLM-friendly datasets.
- Growing Operator Library
  - Expandable support for multimodal data (text, image, video) and domain-specific transformations.
This project is built upon Data Juicer. We sincerely thank the Data Juicer team for their impactful work in data engineering.
This project inherits the Apache License 2.0 from Data Juicer.
DataFlow uses two images with different roles:
| Image | Dockerfile | Purpose |
|---|---|---|
| API Server | Dockerfile | Runs data_server API; submits jobs to CSGHub/Argo |
| Argo Execution | Dockerfile-argo | Runs inside Argo Workflow pods; executes datasource / formatify / pipeline / tool tasks via run_dataflow_task.py |
docker build -t dataflow . -f Dockerfile
docker buildx build --provenance false --platform linux/amd64 -t dataflow . -f Dockerfile
docker buildx build --provenance false --platform linux/arm64 -t dataflow . -f Dockerfile
Argo Execution Image
The Argo image bundles full operator dependencies (docker/dataflow_requirements.txt) and runtime code (data_server, data_engine, run_dataflow_task.py). CSGHub creates Argo Workflow pods from this image.
Build locally (no push):
docker build -f Dockerfile-argo \
--build-arg BUILD_CN=true \
--build-arg PRELOAD_ASSETS=true \
-t opencsg_public/dataflow:argo-latest .
Build and push (recommended):
# ./scripts/build-push-argo.sh [registry] [tag]
./scripts/build-push-argo.sh 192.168.2.98:8140 argo-latest
./scripts/build-push-argo.sh opencsg-registry.cn-beijing.cr.aliyuncs.com argo-20260529
Multi-platform build:
docker buildx build --provenance false --platform linux/amd64 \
-f Dockerfile-argo \
--build-arg BUILD_CN=true \
--build-arg PRELOAD_ASSETS=true \
-t opencsg_public/dataflow:argo-latest .
Build args:
- BUILD_CN=true: use Aliyun apt/pip mirrors (recommended in China)
- PRELOAD_ASSETS=true: preload Data Juicer assets/models into the image (recommended for production)
Configure the API server to use the image:
Set CSGHUB_DATAFLOW_TEMPLATE_IMAGE on the DataFlow API service (also in .env / docker run). CSGHub prepends the registry prefix automatically — pass only the repository path, for example:
CSGHUB_DATAFLOW_TEMPLATE_IMAGE=opencsg_public/dataflow:argo-latest
When a job is submitted, each Argo pod runs:
python run_dataflow_task.py --task-type <datasource|formatify|pipeline|tool> --task-params '<json>'
Ensure DATA_DIR on the API server matches the workflow-data volume mountPath configured in CSGHub (default /data/dataflow_data).
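The per-pod invocation above passes its parameters as a single JSON string, so quoting matters when the command is assembled programmatically. The following is a minimal sketch of one way to build that argument list. The helper name `build_task_command` and the `job_id` key are hypothetical; the real schema of `--task-params` is not documented here.

```python
import json
import shlex

# Task types accepted by run_dataflow_task.py, as listed in this README.
TASK_TYPES = ("datasource", "formatify", "pipeline", "tool")


def build_task_command(task_type, task_params):
    """Return the argv list for one task invocation (hypothetical helper)."""
    if task_type not in TASK_TYPES:
        raise ValueError(f"unknown task type: {task_type!r}")
    # Serialize once, compactly, so the JSON travels as a single argument.
    params_json = json.dumps(task_params, separators=(",", ":"), sort_keys=True)
    return [
        "python", "run_dataflow_task.py",
        "--task-type", task_type,
        "--task-params", params_json,
    ]


if __name__ == "__main__":
    # "job_id" is an illustrative key only, not a documented parameter.
    argv = build_task_command("pipeline", {"job_id": 42})
    # shlex.join produces a copy-pasteable, shell-quoted command line.
    print(shlex.join(argv))
```

Passing the list form to a process launcher avoids shell quoting entirely; `shlex.join` is only needed when the command has to be displayed or pasted into a shell.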
Launch the PostgreSQL container:
docker run -d --name dataflow-pg \
-p 5433:5432 \
-v /tmp/data_flow/pgdata:/var/lib/postgresql/data \
-e POSTGRES_DB=data_flow \
-e POSTGRES_USER=postgres \
-e POSTGRES_PASSWORD=postgres \
opencsg-registry.cn-beijing.cr.aliyuncs.com/opencsghq/csghub/postgres:15.10
Launch the API server container:
docker run -d --name dataflow-api -p 8000:8000 \
-v /tmp/data_flow/apidata:/data/dataflow_data \
-e DATA_DIR=/data/dataflow_data \
-e CSGHUB_ENDPOINT=https://hub.opencsg.com \
-e MAX_WORKERS=99 \
-e RAY_ADDRESS=auto \
-e RAY_ENABLE=False \
-e RAY_LOG_DIR=/data/ray_output \
-e API_SERVER=0.0.0.0 \
-e API_PORT=8000 \
-e ENABLE_OPENTELEMETRY=False \
-e DATABASE_DB=data_flow \
-e DATABASE_USERNAME=postgres \
-e DATABASE_PASSWORD=postgres \
-e DATABASE_HOSTNAME=127.0.0.1 \
-e DATABASE_PORT=5433 \
-e STUDIO_JUMP_URL=https://data-label.opencsg.com \
-e CSGHUB_DATAFLOW_TEMPLATE_IMAGE=opencsg_public/dataflow:argo-latest \
dataflow \
uvicorn data_server.main:app --host 0.0.0.0 --port 8000
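The DATABASE_* variables passed to the API container above describe a PostgreSQL connection. As a rough illustration, they could be combined into a connection URL as sketched below. How data_server actually reads these variables is not shown in this README, and `build_database_url` and its defaults are hypothetical.

```python
import os
from urllib.parse import quote


def build_database_url(env=None):
    """Assemble a PostgreSQL URL from DATABASE_* variables (hypothetical helper)."""
    env = os.environ if env is None else env
    # Percent-encode credentials so characters like "@" or ":" cannot break the URL.
    user = quote(env.get("DATABASE_USERNAME", "postgres"), safe="")
    password = quote(env.get("DATABASE_PASSWORD", ""), safe="")
    host = env.get("DATABASE_HOSTNAME", "127.0.0.1")
    port = int(env.get("DATABASE_PORT", "5432"))  # fails early on a non-numeric port
    database = env.get("DATABASE_DB", "data_flow")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"


if __name__ == "__main__":
    print(build_database_url({
        "DATABASE_USERNAME": "postgres",
        "DATABASE_PASSWORD": "postgres",
        "DATABASE_HOSTNAME": "127.0.0.1",
        "DATABASE_PORT": "5433",
        "DATABASE_DB": "data_flow",
    }))
```

Note that inside a container 127.0.0.1 refers to the container itself, so the hostname has to be one that the API container can actually reach.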
DataFlow submits job execution to CSGHub/Argo. The API server builds a DAG and references the Argo execution image via CSGHUB_DATAFLOW_TEMPLATE_IMAGE. The old standalone data-flow-celery worker deployment is retired and should not be started anymore.
See Argo Execution Image above for build and configuration details.
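To make the DAG idea concrete, here is a minimal sketch of an Argo Workflow manifest in which each step runs run_dataflow_task.py in the execution image. This is an illustration only: the manifest that DataFlow and CSGHub actually generate is not shown in this README, and the function name, template names, step names, and empty parameter dictionaries are all assumptions.

```python
import json


def build_workflow(image, steps):
    """Build an Argo Workflow manifest as a dict (illustrative sketch).

    `steps` is a list of (name, task_type, params, dependencies) tuples and
    must be ordered so that every dependency appears before its dependents.
    """
    seen = set()
    tasks = []
    for name, task_type, params, deps in steps:
        unknown = [d for d in deps if d not in seen]
        if unknown:
            raise ValueError(f"{name} depends on undefined task(s): {unknown}")
        seen.add(name)
        tasks.append({
            "name": name,
            "template": "run-task",
            "dependencies": list(deps),
            "arguments": {"parameters": [
                {"name": "task-type", "value": task_type},
                {"name": "task-params", "value": json.dumps(params)},
            ]},
        })
    return {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": "dataflow-"},
        "spec": {
            "entrypoint": "main",
            "templates": [
                # The DAG template wires the steps together.
                {"name": "main", "dag": {"tasks": tasks}},
                # One reusable container template, parameterized per step.
                {
                    "name": "run-task",
                    "inputs": {"parameters": [
                        {"name": "task-type"},
                        {"name": "task-params"},
                    ]},
                    "container": {
                        "image": image,
                        "command": ["python", "run_dataflow_task.py"],
                        "args": [
                            "--task-type", "{{inputs.parameters.task-type}}",
                            "--task-params", "{{inputs.parameters.task-params}}",
                        ],
                    },
                },
            ],
        },
    }


# Step names and empty params are placeholders, not real DataFlow jobs.
workflow = build_workflow("opencsg_public/dataflow:argo-latest", [
    ("load", "datasource", {}, []),
    ("convert", "formatify", {}, ["load"]),
    ("process", "pipeline", {}, ["convert"]),
])

if __name__ == "__main__":
    print(json.dumps(workflow, indent=2))
```

Using a single parameterized container template keeps the manifest small: only the DAG task list grows with the number of steps.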
uv venv --python 3.10
source .venv/bin/activate
# or
conda create -n dataflow python=3.10
conda activate dataflow
# Install dependencies
#pip install '.[dist]' -i https://pypi.tuna.tsinghua.edu.cn/simple/
#pip install '.[tools]' -i https://pypi.tuna.tsinghua.edu.cn/simple/
#pip install '.[sci]' -i https://pypi.tuna.tsinghua.edu.cn/simple/
#pip install -r docker/requirements.txt
uv pip install -r docker/dataflow_requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
# Run the server locally
uvicorn data_server.main:app --reload
Notes:
- kenlm, simhash-pybind, opencc==1.1.8, and imagededup in the file environments/science_requires.txt only support the x86 platform. Remove them if you are on an ARM platform.
- DataFlow no longer relies on standalone Celery workers for task scheduling. Use the DataFlow API together with the CSGHub/Argo execution chain.
- If you want to use the data annotation service, install and enable the Label Studio service, then set the STUDIO_JUMP_URL variable of the data-flow service to the address of the Label Studio service.
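For ARM machines, removing the x86-only packages can be scripted rather than done by hand. The sketch below assumes the requirements file lists one requirement per line; the helper names are hypothetical, and only the four package names come from the note above.

```python
import platform
import re

# Packages that the notes above list as x86-only.
X86_ONLY = {"kenlm", "simhash-pybind", "opencc", "imagededup"}


def requirement_name(line):
    """Return the lowercased package name of a requirement line, or None."""
    stripped = line.split("#", 1)[0].strip()
    if not stripped or stripped.startswith("-"):
        return None  # blank line, comment, or pip option such as "-r other.txt"
    match = re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", stripped)
    return match.group(0).lower().replace("_", "-") if match else None


def filter_requirements(lines, machine=None):
    """Drop x86-only requirements unless running on an x86-64 machine."""
    machine = (machine or platform.machine()).lower()
    if machine in ("x86_64", "amd64"):
        return list(lines)
    return [line for line in lines if requirement_name(line) not in X86_ONLY]


if __name__ == "__main__":
    sample = ["kenlm", "opencc==1.1.8", "numpy>=1.0", "imagededup"]
    print(filter_requirements(sample, machine="arm64"))
```

The filtered lines can be written to a temporary file and installed from there, leaving the original requirements file unchanged.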
Upcoming:
- Enhanced real-time data streaming
- AutoML integration for automated model tuning
- Cross-cloud synchronization
- Support for more data sources
We welcome contributions!
For support or queries:
- Email: community@opencsg.com
- GitHub: OpenCSG/DataFlow