MinIO S3

MinIO S3 file consumer for cloud or locally hosted bucket storage

Overview

MinIO is a high-performance object storage system released under the GNU Affero General Public License v3.0. It is API compatible with Amazon S3, enabling efficient file processing and integration with the Joule process.

Joule's MinIO consumer transport processes S3 objects by subscribing to bucket-level change notification events, using an efficient I/O and memory access pattern.

This enables two key features:

  • Batch processing: large data files are processed using stream processing

  • Machine learning model management: ML models are loaded from S3 storage on process startup and replaced on model update

Detailed configuration examples illustrate how to set up the MinIO consumer to subscribe to specific bucket events, set processing rates and handle data deserialisation in PARQUET format.

Additionally, guidance on connection credentials, provider plugins for production security and deserialisation attributes is provided for streamlined configuration and integration with the Joule ecosystem.

Driver details: io.minio:minio:8.5.4

Examples & DSL attributes

This example sets up a MinIO consumer named marketdata-S3Consumer to subscribe to object creation events in the marketdata bucket for files named prices in PARQUET format.

Data is ingested every 5 seconds in batches of up to 100,000 records and processed according to a specified projection on the fields symbol and vol.

After processing, files are moved to a local processed directory and retries are set to 3 in case of processing failures.

minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
  

Attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| name | Name of the source stream | String | |
| connection | Connection details. See Connection attributes schema below | | |
| bucket | S3 bucket to ingest object data from. See Bucket attributes schema below | | |
| deserializer | Deserialisation configuration. See Deserialisation attributes schema below | | |
| processed dir | Directory to place processed files into. If this setting is not provided, files are removed from the temp store | String | |
| processing rate | Rate at which the consumer will act upon notifications | Integer. Default: 60 seconds | |
| batchSize | Number of records read from a file and passed to the processing pipeline in a single cycle | Long. Default: 1024 | |
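
Assuming the attributes with documented defaults can simply be omitted, a minimal configuration sketch might look like the following (stream and bucket names are illustrative). Under the defaults above, notifications would be polled every 60 seconds, up to 1,024 records passed to the pipeline per cycle, and processed files removed from the temp store:

minioConsumer:
  name: "minimal-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    notifications: ["s3:ObjectCreated:*"]

  deserializer:
    format: PARQUET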

Connection attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| endpoint | S3 service endpoint | String. Default: https://localhost | |
| port | Port the S3 service is hosted on | Integer. Default: 9000 | |
| url | A fully qualified URL endpoint, e.g. AWS, GCP or Azure URLs. Takes precedence over the endpoint setting if provided | URL String | |
| region | Region where the bucket is to be accessed | String | |
| tls | Use a TLS connection | Boolean. Default: false | |
| credentials | IAM access credentials. See Credentials attributes below | | |
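
For a cloud-hosted bucket, the url, region and tls attributes can be used in place of the endpoint and port pair; url takes precedence over endpoint when both are set. A sketch with an illustrative AWS-style URL and region:

minioConsumer:
  ...

  connection:
    url: "https://s3.eu-west-2.amazonaws.com"
    region: "eu-west-2"
    tls: true
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"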

Credentials attributes

For non-production use cases the access and secret keys can be used to prove out data ingestion functionality.

When migrating to a production environment, implement a provider plugin using the provided JouleProviderPlugin interface; see the basic example in the next section.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| access key | IAM user access key | String | |
| secret key | IAM user secret key | String | |
| provider plugin | Custom credentials implementation, ideal for production-level deployments | JouleProviderPlugin implementation | |
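
The schema above does not show how a provider plugin is declared in the DSL. As a purely hypothetical sketch, assuming the value is the fully qualified class name of the implementation (the com.example class name is made up for illustration), it might be wired as:

minioConsumer:
  ...

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      provider plugin: "com.example.security.JWTCredentialsProvider"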

Provider plugin implementation

Skeleton implementation of the JouleProviderPlugin interface:

import java.util.Properties;

// JouleProviderPlugin and the exception types are supplied by the Joule SDK
public class JWTCredentialsProvider implements JouleProviderPlugin {
    @Override
    public Provider getProvider() {
        // Return the credentials provider to be used by the MinIO client
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // Acquire or refresh credentials, e.g. exchange a JWT for temporary keys
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Validate the plugin configuration before the consumer starts
    }

    @Override
    public void setProperties(Properties properties) {
        // Receive plugin properties declared in the DSL configuration
    }
}
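
To make the contract concrete, below is a minimal sketch of a working plugin that resolves keys from the process environment so they never appear in a DSL file. It assumes the Provider return type is io.minio.credentials.Provider from the bundled MinIO driver and that InvalidSpecificationException accepts a message string; the class and environment variable names are illustrative.

import java.util.Properties;

import io.minio.credentials.Provider;
import io.minio.credentials.StaticProvider;

public class EnvCredentialsProvider implements JouleProviderPlugin {

    private Provider provider;

    @Override
    public Provider getProvider() {
        return provider;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // Resolve keys from the environment rather than the configuration file
        provider = new StaticProvider(
                System.getenv("MINIO_ACCESS_KEY"),   // illustrative variable name
                System.getenv("MINIO_SECRET_KEY"),   // illustrative variable name
                null);                               // no session token
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        if (System.getenv("MINIO_ACCESS_KEY") == null || System.getenv("MINIO_SECRET_KEY") == null) {
            // Assumes a message-string constructor on the Joule exception type
            throw new InvalidSpecificationException("MINIO_ACCESS_KEY and MINIO_SECRET_KEY must be set");
        }
    }

    @Override
    public void setProperties(Properties properties) {
        // No plugin-specific properties required for this sketch
    }
}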

Bucket attributes schema

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| bucketId | Bucket name | String | |
| object name | Object name to listen for events on | String | |
| versioning | Ability to use object versioning. Valid values are either ENABLED or SUSPENDED | Enum | |
| notifications | Type of bucket notifications to subscribe to. Currently only s3:ObjectCreated:* is supported | String[] | |

Deserialisation attributes schema

This topic describes the configuration parameters available for the object deserialisation process.

| Attribute | Description | Data Type | Required |
| --- | --- | --- | --- |
| format | Object file format. Available formats: PARQUET, ORC, ARROW, CSV | Enum. Default: PARQUET | |
| projection | String array of fields to ingest. Consider using this for wide rows; otherwise all columns are ingested | String[] | |

Deserialiser example

minioConsumer:
  ...

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

Additional resources
