MinIO S3
MinIO S3 file consumer for cloud or locally hosted bucket storage
Overview
MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API-compatible with Amazon S3, making it well suited to efficient file processing and integration with Joule processes.
Joule's MinIO consumer transport processes S3 objects in response to bucket-level change notification events, using an efficient I/O and memory access pattern.
This enables two key features:
- Batch processing: large data files can be processed using stream processing
- Machine learning model management: ML models are loaded from S3 storage at process startup and replaced on model update
Detailed configuration examples illustrate how to set up the MinIO consumer to subscribe to specific bucket events, set processing rates, and handle data deserialisation in PARQUET format.
Additionally, guidance is provided on connection credentials, provider plugins for production-grade security, and deserialisation attributes, for streamlined configuration and integration with the Joule ecosystem.
Driver details: `io.minio:minio:8.5.4`
Examples & DSL attributes
This example sets up a MinIO consumer named `marketdata-S3Consumer` to subscribe to object creation events in the `marketdata` bucket for files named `prices` in PARQUET format. Data is ingested every 5 seconds in batches of up to 100,000 records and processed according to a specified projection on the fields `symbol` and `vol`. After processing, files are moved to a local `processed` directory, and retries are set to 3 in case of processing failures.
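The configuration itself is not reproduced in this extract. Below is a minimal sketch of what it might look like, assuming a YAML layout that uses the attribute names from the schema tables below; the top-level key, the `retries` attribute name, the placeholder key values and the `processed dir` path are assumptions.

```yaml
minio consumer:
  name: marketdata-S3Consumer
  connection:
    endpoint: https://localhost
    port: 9000
    credentials:
      access key: <ACCESS_KEY>       # non-production only, see Credentials attributes
      secret key: <SECRET_KEY>
  bucket:
    bucket Id: marketdata
    object name: prices
    notifications:
      - s3:ObjectCreated:*
  deserializer:
    format: PARQUET
    projection:
      - symbol
      - vol
  processed dir: ./processed         # processed files are moved here
  processing rate: 5                 # seconds between ingestion cycles
  batchSize: 100000                  # maximum records per cycle
  retries: 3                         # attribute name assumed; retry count on processing failure
```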
Attributes schema
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| name | Name of the source stream | String | |
| connection | Connection details | See Connection attributes section | |
| bucket | S3 bucket to ingest object data from | See Bucket attributes section | |
| deserializer | Deserialisation configuration | See Deserialisation attributes section | |
| processed dir | Directory to place processed files into. If this setting is not provided, files are removed from the temp store | String | |
| processing rate | Rate at which the consumer acts upon notifications | Integer. Default: 60 seconds | |
| batchSize | Number of records read from the file and passed to the processing pipeline in a single cycle | Long. Default: 1024 | |
Connection attributes schema
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| endpoint | S3 service endpoint | String. Default: https://localhost | |
| port | Port the S3 service is hosted on | Integer. Default: 9000 | |
| url | Fully qualified URL endpoint, e.g. AWS, GCP or Azure URLs. Used in preference to the endpoint setting if provided | URL String | |
| region | Region in which the bucket is accessed | String | |
| tls | Use a TLS connection | Boolean. Default: false | |
| credentials | IAM access credentials | See Credentials attributes section | |
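For a cloud-hosted bucket the `url` form can be used instead of `endpoint` and `port`. A hedged sketch, assuming the same YAML layout as the example above; the endpoint URL and region values are illustrative only.

```yaml
connection:
  url: https://s3.eu-west-2.amazonaws.com   # used in preference to endpoint/port when provided
  region: eu-west-2
  tls: true
  credentials:
    access key: <ACCESS_KEY>
    secret key: <SECRET_KEY>
```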
Credentials attributes
For non-production use cases, the access and secret keys can be used to prove out data ingestion functionality.
When migrating to a production environment, implement a provider plugin using the provided JouleProviderPlugin interface; see the basic example in the next section.
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| access key | IAM user access key | String | |
| secret key | IAM user secret key | String | |
| provider plugin | Custom credentials implementation, ideal for production-level deployments | JouleProviderPlugin implementation | |
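In the DSL, the credentials block would then reference the plugin rather than static keys. A hedged sketch; the key name and class-reference format are assumptions, and the class is the hypothetical implementation shown in the next section.

```yaml
credentials:
  provider plugin: com.example.joule.IamCredentialsPlugin   # hypothetical implementation class
```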
Provider plugin implementation
JouleProviderPlugin interface
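The interface definition itself is not reproduced in this extract. The following Java sketch shows the shape a basic implementation might take: the JouleProviderPlugin method names are assumptions, while the io.minio.credentials classes come from the documented driver, io.minio:minio:8.5.4.

```java
import io.minio.credentials.Credentials;
import io.minio.credentials.IamAwsProvider;
import io.minio.credentials.Provider;

// Hedged sketch: the plugin hook methods below are assumptions, not the
// actual JouleProviderPlugin contract.
public class IamCredentialsPlugin /* implements JouleProviderPlugin (assumed) */ {

    // Delegate to the MinIO SDK's IAM provider, which resolves credentials
    // from the EC2/ECS instance metadata service rather than static keys.
    private final Provider delegate = new IamAwsProvider(null, null);

    // Assumed plugin hook: return the credentials provider the consumer should use.
    public Provider getProvider() {
        return delegate;
    }

    // Assumed convenience accessor: fetch a point-in-time credential set.
    public Credentials fetch() {
        return delegate.fetch();
    }
}
```

Delegating to the SDK's IamAwsProvider keeps long-lived secrets out of the configuration file, which is the motivation for preferring a provider plugin in production.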
Bucket attributes schema
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| bucket Id | Bucket name | String | |
| object name | Object name to listen for events on | String | |
| versioning | Enables object versioning. Valid values: ENABLED or SUSPENDED | Enum | |
| notifications | Bucket notification types to subscribe to. Currently only s3:ObjectCreated:* is supported | String[] | |
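Putting these attributes together, a hedged sketch of the bucket block, reusing values from the earlier example; the versioning value is illustrative.

```yaml
bucket:
  bucket Id: marketdata
  object name: prices
  versioning: SUSPENDED            # or ENABLED
  notifications:
    - s3:ObjectCreated:*           # the only notification type currently supported
```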
Deserialisation attributes schema
This topic provides the configuration parameters available for the object deserialisation process.
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| format | Object file format. Available formats: PARQUET, ORC, ARROW, CSV | Enum. Default: PARQUET | |
| projection | String array of fields to ingest. Consider using this for wide rows; otherwise all columns are ingested | String[]. Default: none (all columns ingested) | |
Deserialiser example
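The original example is not reproduced in this extract. A minimal sketch, assuming the same YAML layout as the consumer example above and projecting the two fields used earlier:

```yaml
deserializer:
  format: PARQUET                  # PARQUET, ORC, ARROW or CSV
  projection:                      # omit to ingest all columns
    - symbol
    - vol
```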
Additional resources
- Official MinIO documentation
- MinIO Docker image