MinIO S3

MinIO S3 file consumer for cloud-hosted or locally hosted bucket storage

Overview

MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API-compatible with Amazon S3, which allows efficient file processing and integration with Joule processes.

Joule's MinIO consumer transport processes S3 objects in response to bucket-level change notification events, using an efficient I/O and memory access pattern.

This enables two key features:

  • Batch processing: large data files are processed using stream processing

  • Machine learning model management: ML models are loaded from S3 storage on process startup and replaced on model update
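The model management feature can be pictured as an atomic swap of a shared model reference whenever an update notification arrives. The following is a minimal sketch of that idea in plain Java, not Joule's actual implementation: `ModelHolder`, `onModelUpdate` and the use of a `Function` as the "model" are hypothetical choices made only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical holder: keeps the currently active model and swaps it atomically.
class ModelHolder<I, O> {
    private final AtomicReference<Function<I, O>> active;

    // The initial model is supplied once, at process startup.
    ModelHolder(Function<I, O> initialModel) {
        this.active = new AtomicReference<>(initialModel);
    }

    // Called when a model-update notification arrives. Calls already in
    // flight finish with the model they fetched; later calls see the new one.
    void onModelUpdate(Function<I, O> newModel) {
        active.set(newModel);
    }

    O predict(I input) {
        return active.get().apply(input);
    }
}

class ModelHolderDemo {
    public static void main(String[] args) {
        ModelHolder<Double, Double> holder = new ModelHolder<>(x -> x * 2.0);
        System.out.println(holder.predict(3.0)); // uses the startup model
        holder.onModelUpdate(x -> x * 10.0);
        System.out.println(holder.predict(3.0)); // uses the replacement model
    }
}
```

Swapping a single reference means the event processing path never has to pause while a new model is being installed.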

Detailed configuration examples illustrate setting up the MinIO consumer to subscribe to specific bucket events, set processing rates and handle data deserialisation in PARQUET format.

Guidance is also provided on connection credentials, provider plugins for production security, and deserialisation attributes, to streamline configuration and integration with the Joule ecosystem.

Driver details: io.minio:minio:8.5.4

Examples & DSL attributes

This example sets up a MinIO consumer named marketdata-S3Consumer to subscribe to object creation events in the marketdata bucket for files named prices in PARQUET format.

Data is ingested every 5 seconds in batches of up to 100,000 records and processed according to a specified projection on the fields symbol and vol.

After processing, files are moved to the local ./processed directory, and up to 3 retries are attempted if processing fails.

minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
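To make the `batchSize` setting in the configuration above more concrete, the sketch below splits a list of records into consecutive batches of bounded size. It is an illustration in plain Java that assumes the records are already in memory; `BatchSketch` and `partition` are hypothetical names, not part of the Joule API, and how Joule schedules batches internally is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSketch {
    // Split records into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            records.add(i);
        }
        // With batchSize = 100 the 250 records form batches of 100, 100 and 50.
        for (List<Integer> batch : partition(records, 100)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

With the documented settings, the equivalent would be one batch of up to 100,000 records handed to the stream every 5 seconds.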
  

Attributes schema

Connection attributes schema

Credentials attributes

For non-production use cases, the access and secret keys can be supplied directly in the configuration to prove out data ingestion functionality.

When migrating to a production environment, implement a provider plugin using the provided JouleProviderPlugin interface. A basic example is shown in the next section.

Provider plugin implementation

JouleProviderPlugin Interface

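// Skeleton provider plugin: every method is a stub to be completed.
// Imports for the Joule plugin types are omitted, as in the original example.
public class JWTCredentialsProvider implements JouleProviderPlugin {
    @Override
    public Provider getProvider() {
        // Placeholder: return the credentials provider used for the connection.
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // Placeholder: set up any resources the provider needs.
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Placeholder: check that the supplied configuration is complete.
    }

    @Override
    public void setProperties(Properties properties) {
        // Placeholder: receive the plugin's configuration properties.
    }
}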
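The stub above leaves the credential lookup open. One possible approach, shown here only as a sketch under stated assumptions, is to resolve the keys from the environment at startup so that they never appear in the YAML file. The class `EnvCredentials` and the variable names `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY` are illustrative choices, not Joule requirements. Wiring the result into `getProvider()` depends on the Joule and MinIO SDK types and is not shown.

```java
import java.util.Map;

// Hypothetical value holder; not a Joule or MinIO SDK type.
final class EnvCredentials {
    final String accessKey;
    final String secretKey;

    private EnvCredentials(String accessKey, String secretKey) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    // Resolve both keys from the supplied environment map and fail fast
    // when either one is missing or blank.
    static EnvCredentials fromEnv(Map<String, String> env) {
        String access = env.get("MINIO_ACCESS_KEY"); // assumed variable name
        String secret = env.get("MINIO_SECRET_KEY"); // assumed variable name
        if (access == null || access.trim().isEmpty()
                || secret == null || secret.trim().isEmpty()) {
            throw new IllegalStateException(
                    "MinIO credentials are not set in the environment");
        }
        return new EnvCredentials(access, secret);
    }

    public static void main(String[] args) {
        try {
            EnvCredentials c = fromEnv(System.getenv());
            System.out.println("Loaded access key of length " + c.accessKey.length());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Passing the environment in as a `Map` keeps the lookup easy to exercise in a unit test without touching real process variables.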

Bucket attributes schema

Deserialisation attributes schema

This topic describes the configuration parameters available for the object deserialisation process.

Deserialiser example

minioConsumer:
   ...

   deserializer:
      format: PARQUET
      projection: ["symbol","vol"]
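As an illustration of what `projection` means, the sketch below keeps only the listed fields of a record and drops the rest. It works on a plain `Map` rather than on Parquet data, so it demonstrates the semantics only; a Parquet reader can typically skip unselected columns at read time. `ProjectionSketch`, `project` and the extra fields `bid` and `ask` are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ProjectionSketch {
    // Return a copy of the row holding only the projected fields,
    // in the order given by the projection list.
    static Map<String, Object> project(Map<String, Object> row, List<String> projection) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : projection) {
            if (row.containsKey(field)) {
                out.put(field, row.get(field));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("symbol", "IBM");
        row.put("bid", 101.5);
        row.put("ask", 101.7);
        row.put("vol", 2500L);
        // prints {symbol=IBM, vol=2500}
        System.out.println(project(row, Arrays.asList("symbol", "vol")));
    }
}
```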
