# MinIO S3

## Overview

MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API-compatible with Amazon S3, which enables **efficient file processing and integration** with the Joule process.

Joule's MinIO consumer transport processes S3 objects in response to **notification change events** at the bucket level, using an efficient I/O and memory access pattern.

This enables two key features:

* <mark style="color:green;">**Batch processing**</mark>\
  Large data files can be processed using stream processing
* <mark style="color:green;">**Machine learning model management**</mark>\
  ML models are loaded from S3 storage on process startup and replaced on model update

The configuration examples below show how to set up the MinIO consumer to subscribe to specific bucket events, set **processing rates and handle data deserialisation** in `PARQUET` format.

Guidance is also provided on connection credentials, provider plugins for production security and deserialisation attributes, to streamline configuration and integration with the Joule ecosystem.

{% hint style="info" %}
**Driver details:** [io.minio:minio:8.5.4](https://mvnrepository.com/artifact/io.minio/minio/8.5.4)
{% endhint %}

## Examples & DSL attributes

This example sets up a MinIO consumer named `marketdata-S3Consumer` to subscribe to object creation events in the `marketdata` bucket for files named `prices` in `PARQUET` format.

Data is ingested every 5 seconds in batches of up to 100,000 records and processed according to a specified projection on the fields `symbol` and `vol`.

After processing, files are moved to a local `processed` directory and retries are set to 3 in case of processing failures.

```yaml
minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
  
```

### Attributes schema

<table><thead><tr><th width="171">Attribute</th><th width="280">Description</th><th width="188">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>name</td><td>Name of the source stream</td><td>String</td><td>true</td></tr><tr><td>connection</td><td>Connection details</td><td>See <a href="#connection-attributes">Connection attributes</a> section</td><td>true</td></tr><tr><td>bucket</td><td>S3 bucket to ingest object data from</td><td>See <a href="#bucket-attributes">Bucket attributes</a> section</td><td>true</td></tr><tr><td>deserializer</td><td>Deserialisation configuration</td><td>See <a href="#deserialization-attributes">Deserialisation attributes</a> section</td><td>false</td></tr><tr><td>processed dir</td><td>Directory to move processed files into. If not provided, files are removed from the temporary store</td><td>String</td><td>false</td></tr><tr><td>processing rate</td><td>Rate, in seconds, at which the consumer acts upon notifications</td><td>Integer<br>Default: 60 seconds</td><td>false</td></tr><tr><td>batchSize</td><td>Number of records read from the file and passed to the processing pipeline in a single cycle</td><td>Long<br>Default: 1024</td><td>false</td></tr></tbody></table>
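
As an illustration of the required versus optional attributes above, the following is a minimal sketch that sets only `name`, `connection` and `bucket`. It assumes the documented defaults apply to everything omitted: endpoint `https://localhost`, port `9000`, a processing rate of 60 seconds, a `batchSize` of 1024 and the `PARQUET` format. The names and keys are placeholders reused from the example above.

```yaml
minioConsumer:
  name: "marketdata-S3Consumer"

  # endpoint and port are omitted, so the documented defaults are assumed
  connection:
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  # only the required bucket attributes are set
  bucket:
    bucketId: "marketdata"
    object name: "prices"
```

Because `processed dir` is not set, processed files would be removed from the temporary store rather than kept.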

### Connection attributes schema

<table><thead><tr><th width="175">Attribute</th><th width="280">Description</th><th width="190">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>endpoint</td><td>S3 service endpoint</td><td>String<br>Default: https://localhost</td><td>false</td></tr><tr><td>port</td><td>Port the S3 service is hosted on</td><td>Integer<br>Default: 9000</td><td>false</td></tr><tr><td>url</td><td>Fully qualified endpoint URL, e.g. an AWS, GCP or Azure URL. If provided, this is used instead of the endpoint setting</td><td>URL String</td><td>false</td></tr><tr><td>region</td><td>Region where the bucket is to be accessed</td><td>String</td><td>false</td></tr><tr><td>tls</td><td>Use a TLS connection</td><td>Boolean<br>Default: false</td><td>false</td></tr><tr><td>credentials</td><td>IAM access credentials</td><td>See <a href="#credentials-attributes">Credentials section</a></td><td>false</td></tr></tbody></table>
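
To show how `url`, `region` and `tls` might be combined for a hosted S3 service, here is a hedged sketch. The URL and region values are illustrative assumptions, not taken from the Joule documentation, and the keys are placeholders.

```yaml
minioConsumer:
  ...

  connection:
    # url is used instead of endpoint when both are provided
    url: "https://s3.eu-west-2.amazonaws.com"   # illustrative value
    region: "eu-west-2"                         # illustrative value
    tls: true
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"
```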

### Credentials attributes

For non-production use cases, the access and secret keys can be used to prove data ingestion functionality.

When migrating to a production environment, implement a provider plugin using the provided `JouleProviderPlugin` interface. A basic example is shown in the next section.

<table><thead><tr><th width="178">Attribute</th><th width="281">Description</th><th width="191">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>access key</td><td>IAM user access key</td><td>String</td><td>false</td></tr><tr><td>secret key</td><td>IAM user secret key</td><td>String</td><td>false</td></tr><tr><td>provider plugin</td><td>Custom implementation of credentials ideal for production level deployments</td><td>JouleProviderPlugin implementation</td><td>false</td></tr></tbody></table>

### Provider plugin implementation

The following skeleton implements the `JouleProviderPlugin` interface:

```java
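import java.util.Properties;

// Skeleton only: imports for JouleProviderPlugin, Provider, CustomPluginException
// and InvalidSpecificationException are omitted here.
public class JWTCredentialsProvider implements JouleProviderPlugin {
    @Override
    public Provider getProvider() {
        // Placeholder: return the credentials provider used for the connection
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // Placeholder: set up any resources the provider needs
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Placeholder: check that the supplied configuration is complete
    }

    @Override
    public void setProperties(Properties properties) {
        // Placeholder: receive the plugin configuration properties
    }
}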
```

### Bucket attributes schema

<table><thead><tr><th width="200">Attribute</th><th width="306">Description</th><th width="156">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>bucketId</td><td>Bucket name</td><td>String</td><td>true</td></tr><tr><td>object name</td><td>Object name to listen for events on</td><td>String</td><td>true</td></tr><tr><td>versioning</td><td>Enables object versioning. Valid values are ENABLED or SUSPENDED</td><td>Enum</td><td>false</td></tr><tr><td>notifications</td><td>Types of bucket notification to subscribe to. Currently only s3:ObjectCreated:* is supported</td><td>String[]</td><td>false</td></tr></tbody></table>

### Deserialisation attributes schema

This section lists the configuration parameters available for the object deserialisation process.

<table><thead><tr><th width="200">Attribute</th><th width="306">Description</th><th width="157">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>format</td><td>Object file format. Available formats: PARQUET, ORC, ARROW, CSV</td><td>Enum<br>Default: PARQUET</td><td>false</td></tr><tr><td>projection</td><td>String array of fields to ingest. Consider using this for wide rows; otherwise all columns are ingested</td><td><p>String[]</p><p>Default: false</p></td><td>false</td></tr></tbody></table>

### Deserialiser example

```yaml
minioConsumer:
   ...

   deserializer:
      format: PARQUET
      projection: ["symbol","vol"]
```
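
The same block can select any of the other listed formats. As a minimal sketch, the variant below picks `CSV` and leaves out `projection`, on the assumption that all columns are then ingested as the attribute description states.

```yaml
minioConsumer:
   ...

   deserializer:
      # any of PARQUET, ORC, ARROW or CSV; projection omitted
      format: CSV
```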

## Additional resources

* Official [MinIO documentation](https://min.io/docs/minio/kubernetes/upstream/)
* MinIO [Docker image](https://hub.docker.com/r/minio/minio)


