# MinIO S3

## Overview

MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API compatible with Amazon S3, enabling **efficient file processing and integration** with the Joule process.

Joule's MinIO consumer transport processes S3 objects using **bucket-level notification change events**, with an efficient I/O and memory access pattern.

This enables two key features:

* <mark style="color:green;">**Batch processing**</mark>\
  Large data files are processed using stream processing
* <mark style="color:green;">**Machine learning model management**</mark>\
  ML models are loaded from S3 storage on process startup and replaced on model update

Detailed configuration examples illustrate how to set up the MinIO consumer to subscribe to specific bucket events, set **processing rates, and handle data deserialisation** in `PARQUET` format.

Additionally, guidance on connection credentials, provider plugins for production security, and deserialisation attributes is provided for streamlined configuration and integration with the Joule ecosystem.

{% hint style="info" %}
**Driver details:** [io.minio:minio:8.5.4](https://mvnrepository.com/artifact/io.minio/minio/8.5.4)
{% endhint %}

## Examples & DSL attributes

This example sets up a MinIO consumer named `marketdata-S3Consumer` to subscribe to object creation events in the `marketdata` bucket for files named `prices` in `PARQUET` format.

Data is ingested every 5 seconds in batches of up to 100,000 records and processed according to a specified projection on the fields `symbol` and `vol`.

After processing, files are moved to a local `processed` directory and retries are set to 3 in case of processing failures.

```yaml
minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
  
```

### Attributes schema

<table><thead><tr><th width="171">Attribute</th><th width="280">Description</th><th width="188">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>name</td><td>Name of source stream</td><td>String</td><td>true</td></tr><tr><td>connection</td><td>Connection details</td><td>See <a href="#connection-attributes">Connection attributes</a> section</td><td>true</td></tr><tr><td>bucket</td><td>S3 bucket to ingest object data from</td><td>See <a href="#bucket-attributes">Bucket attributes</a> section</td><td>true</td></tr><tr><td>deserializer</td><td>Deserialisation configuration</td><td>See <a href="#deserialization-attributes">Deserialisation attributes</a> section</td><td>false</td></tr><tr><td>processed dir</td><td>Directory to place processed files into. If this setting is not provided, files are removed from the temp store</td><td>String</td><td>false</td></tr><tr><td>processing rate</td><td>Rate at which the consumer acts upon notifications</td><td>Integer<br>Default: 60 seconds</td><td>false</td></tr><tr><td>batchSize</td><td>Number of records read from the file and passed to the processing pipeline in a single cycle</td><td>Long<br>Default: 1024</td><td>false</td></tr></tbody></table>
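Only `name`, `connection` and `bucket` are required, so a minimal consumer can lean on the defaults above. The fragment below is a sketch of such a configuration (stream name, bucket and credential values are placeholders):

```yaml
minioConsumer:
  name: "minimal-S3Consumer"

  connection:
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
```

With this configuration the defaults apply: the consumer connects to `https://localhost` on port `9000`, acts on notifications every 60 seconds, and passes records to the pipeline in batches of 1024.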

### **Connection** attributes schema

<table><thead><tr><th width="175">Attribute</th><th width="280">Description</th><th width="190">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>endpoint</td><td>S3 service endpoint</td><td>String<br>Default: https://localhost</td><td>false</td></tr><tr><td>port</td><td>Port the S3 service is hosted on</td><td>Integer<br>Default: 9000</td><td>false</td></tr><tr><td>url</td><td>A fully qualified URL endpoint, e.g. AWS, GCP or Azure URLs. If provided, this is used instead of the endpoint setting</td><td>URL String</td><td>false</td></tr><tr><td>region</td><td>Region where the bucket is accessed</td><td>String</td><td>false</td></tr><tr><td>tls</td><td>Use a TLS connection</td><td>Boolean<br>Default: false</td><td>false</td></tr><tr><td>credentials</td><td>IAM access credentials</td><td>See <a href="#credentials-attributes">Credentials section</a></td><td>false</td></tr></tbody></table>
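When targeting a managed cloud object store, the `url` attribute replaces the `endpoint`/`port` pair. The fragment below is a sketch of such a connection; the URL and region shown are illustrative values, not prescribed ones:

```yaml
minioConsumer:
   ...

   connection:
      url: "https://s3.eu-west-2.amazonaws.com"
      region: "eu-west-2"
      tls: true
      credentials:
         access key: "XXXXXX"
         secret key: "YYYYYYYYYYYYYYY"
```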

### Credentials attributes

For non-production use cases, the access / secret keys can be used to validate data ingestion functionality.

When migrating to a production environment, implement a provider plugin using the provided `JouleProviderPlugin` interface; see the basic example in the next section.

<table><thead><tr><th width="178">Attribute</th><th width="281">Description</th><th width="191">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>access key</td><td>IAM user access key</td><td>String</td><td>false</td></tr><tr><td>secret key</td><td>IAM user secret key</td><td>String</td><td>false</td></tr><tr><td>provider plugin</td><td>Custom implementation of credentials ideal for production level deployments</td><td>JouleProviderPlugin implementation</td><td>false</td></tr></tbody></table>

### Provider plugin implementation

A skeleton implementation of the `JouleProviderPlugin` interface:

```java
// Skeleton credentials provider. Replace the method bodies with your own
// token-retrieval logic (e.g. fetching a JWT from an identity service).
public class JWTCredentialsProvider implements JouleProviderPlugin {
    @Override
    public Provider getProvider() {
        // Return the credentials Provider backing this plugin
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // Acquire initial credentials and set up any token refresh here
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Verify that all required properties have been supplied
    }

    @Override
    public void setProperties(Properties properties) {
        // Receive configuration properties supplied via the DSL
    }
}
```
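The plugin could then be referenced from the `credentials` section in place of raw keys. The exact value format is an assumption here; a fully qualified class name seems the most likely convention:

```yaml
minioConsumer:
   ...

   connection:
      credentials:
         provider plugin: "com.example.JWTCredentialsProvider"
```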

### Bucket attributes schema

<table><thead><tr><th width="200">Attribute</th><th width="306">Description</th><th width="156">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>bucketId</td><td>Bucket name</td><td>String</td><td>true</td></tr><tr><td>object name</td><td>Object name to listen for events on</td><td>String</td><td>true</td></tr><tr><td>versioning</td><td>Enables object versioning. Valid values are ENABLED or SUSPENDED</td><td>ENUM</td><td>false</td></tr><tr><td>notifications</td><td>Type of bucket notifications to subscribe to. Currently only s3:ObjectCreated:* is supported</td><td>String[]</td><td>false</td></tr></tbody></table>

### **Deserialisation** attributes schema

This topic describes the configuration parameters available for the object deserialisation process.

<table><thead><tr><th width="200">Attribute</th><th width="306">Description</th><th width="157">Data Type</th><th data-type="checkbox">Required</th></tr></thead><tbody><tr><td>format</td><td>Object file format. Available formats: PARQUET, ORC, ARROW, CSV</td><td>Enum<br>Default: PARQUET</td><td>false</td></tr><tr><td>projection</td><td>String array of fields to ingest. Consider using this for wide rows; otherwise all columns are ingested</td><td><p>String[]</p><p>Default: all columns</p></td><td>false</td></tr></tbody></table>

### **Deserialiser example**

```yaml
minioConsumer:
   ...

   deserializer:
      format: PARQUET
      projection: ["symbol","vol"]
```

## Additional resources

* Official [MinIO documentation](https://min.io/docs/minio/kubernetes/upstream/)
* MinIO [Docker image](https://hub.docker.com/r/minio/minio)
