
MinIO S3

MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API-compatible with the Amazon S3 cloud storage service. The consumer transport processes bucket objects in response to bucket notification events. It reads object data in batches to reduce the memory footprint of the Joule JVM.
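Batching saves memory because the whole object never has to be held in memory at once. The sketch below is a hypothetical illustration in plain Java, not Joule's implementation. It wraps any record iterator and hands out fixed-size batches lazily, so at most one batch is resident at a time. The class name `RecordBatcher` is an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper: groups records from an iterator into batches of at most batchSize.
// Only the current batch is materialised; the source iterator is consumed lazily.
final class RecordBatcher<T> implements Iterator<List<T>> {
    private final Iterator<T> source;
    private final int batchSize;

    RecordBatcher(Iterator<T> source, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.source = source;
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public List<T> next() {
        if (!source.hasNext()) {
            throw new NoSuchElementException();
        }
        List<T> batch = new ArrayList<>(batchSize);
        // Pull records until the batch is full or the source is exhausted
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }
}
```

For example, five records with a batch size of 2 are delivered as three batches of sizes 2, 2 and 1.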

Driver details

io.minio:minio:8.5.4

Example configuration

minioConsumer:
  name: "marketdata-S3Consumer"
  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"
  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3
  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]
  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
The example above subscribes to object creation events for the object named prices in the marketdata bucket. The object is read as a Parquet file, and only the symbol and vol columns are ingested. Notifications are acted on every 5 seconds, and the data is submitted to the processing pipeline as StreamEvent batches of up to 100,000 records. When ingestion completes, the processed file is moved to the local ./processed directory.
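On completion, the ingested file is either moved into the processed dir or, if no directory is configured, removed from the temporary store (see processed dir in the Core Attributes table). The following is a minimal sketch of that decision using `java.nio.file`. The class and method names are hypothetical, and this is not Joule's actual code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical post-ingestion step: move the temp file into processedDir if one is
// configured, otherwise delete it from the temporary store.
final class ProcessedFileHandler {
    private ProcessedFileHandler() {}

    static void complete(Path tempFile, Path processedDir) throws IOException {
        if (processedDir == null) {
            // No processed dir configured: remove the file from the temp store
            Files.deleteIfExists(tempFile);
            return;
        }
        Files.createDirectories(processedDir);
        // Keep the original file name; replace an older copy with the same name
        Files.move(tempFile, processedDir.resolve(tempFile.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }
}
```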

Core Attributes

Available configuration parameters
| Attribute | Description | Data Type |
|---|---|---|
| name | Name of the source stream | String |
| connection | Connection details | See Connection Attributes |
| bucket | S3 bucket to ingest object data from | See Bucket Attributes |
| deserializer | Deserialization configuration | See Deserialization Attributes |
| processed dir | Directory to move processed files into. If not provided, files are removed from the temporary store | String |
| processing rate | Rate, in seconds, at which the consumer acts on notifications | Integer, default: 60 |
| batchSize | Number of records read from the file and passed to the processing pipeline in a single cycle | Long, default: 1024 |
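When processing rate or batchSize is omitted, the defaults in the table apply (60 seconds and 1024 records). The hypothetical helper below shows that fallback and converts the processing rate into a `java.time.Duration`. The names are illustrative assumptions, not part of Joule's API.

```java
import java.time.Duration;

// Hypothetical resolution of the optional core settings to their documented defaults.
final class CoreDefaults {
    static final int DEFAULT_PROCESSING_RATE_SECONDS = 60;
    static final long DEFAULT_BATCH_SIZE = 1024L;

    private CoreDefaults() {}

    // processing rate: seconds between acting on notifications
    static Duration processingInterval(Integer processingRate) {
        int seconds = processingRate != null ? processingRate : DEFAULT_PROCESSING_RATE_SECONDS;
        return Duration.ofSeconds(seconds);
    }

    // batchSize: records read from the file and passed to the pipeline per cycle
    static long batchSize(Long configured) {
        return configured != null ? configured : DEFAULT_BATCH_SIZE;
    }
}
```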

Connection Attributes

| Attribute | Description | Data Type |
|---|---|---|
| endpoint | S3 service endpoint | String, default: https://localhost |
| port | Port the S3 service is hosted on | Integer, default: 9000 |
| url | Fully qualified URL endpoint, e.g. AWS, GCP or Azure URLs. When provided, it takes precedence over endpoint | URL String |
| region | Region in which the bucket is accessed | String |
| tls | Use a TLS connection | Boolean, default: false |
| credentials | IAM access credentials | See Credentials Attributes |
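The url setting takes precedence over endpoint. The sketch below assumes that, when url is absent, the effective address is the endpoint's scheme and host combined with the configured port. The helper name and this exact composition are assumptions for illustration.

```java
import java.net.URI;

// Hypothetical: choose the S3 service address from the connection settings.
final class EndpointResolver {
    private EndpointResolver() {}

    static URI resolve(String url, String endpoint, int port) {
        if (url != null && !url.isBlank()) {
            // A fully qualified url (AWS, GCP, Azure, ...) overrides endpoint and port
            return URI.create(url);
        }
        URI base = URI.create(endpoint);
        // Assumed composition: keep the endpoint's scheme and host, apply the configured port
        return URI.create(base.getScheme() + "://" + base.getHost() + ":" + port);
    }
}
```

With the defaults, `resolve(null, "https://localhost", 9000)` yields `https://localhost:9000`.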

Credentials Attributes

For non-production use cases, the access and secret keys can be used to verify data ingestion functionality. When migrating to a production environment, implement a provider plugin using the JouleProviderPlugin interface; see the basic example below.
| Attribute | Description | Data Type |
|---|---|---|
| access key | IAM user access key | String |
| secret key | IAM user secret key | String |
| provider plugin | Custom credentials implementation, recommended for production deployments | JouleProviderPlugin implementation |

JouleProviderPlugin Interface

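import java.util.Properties;
import io.minio.credentials.Provider; // assumed: the MinIO SDK credentials provider type
// JouleProviderPlugin, CustomPluginException and InvalidSpecificationException come from the Joule SDK

public class JWTCredentialsProvider implements JouleProviderPlugin {

    @Override
    public Provider getProvider() {
        // Stub: return the credentials provider used to authenticate against the S3 service
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // Stub: acquire any resources needed to obtain credentials
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Stub: check that the supplied configuration is complete and valid
    }

    @Override
    public void setProperties(Properties properties) {
        // Stub: receive the plugin's configuration properties
    }
}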
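Rather than hard-coding keys in the configuration, a provider plugin usually reads them from an external source. The sketch below is a hypothetical helper that a setProperties implementation could delegate to. It reads a key from the supplied Properties and falls back to environment variables. The property and variable names (access.key, MINIO_ACCESS_KEY, and so on) are assumptions, not Joule conventions.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical credential lookup for a provider plugin: Properties first, then environment.
final class CredentialLookup {
    private CredentialLookup() {}

    static String resolve(Properties props, Map<String, String> env,
                          String propertyKey, String envKey) {
        String value = props.getProperty(propertyKey);
        if (value == null || value.isBlank()) {
            // Fall back to the environment (pass System.getenv() in production)
            value = env.get(envKey);
        }
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing credential: " + propertyKey + " / " + envKey);
        }
        return value;
    }
}
```

Passing the environment in as a `Map` keeps the lookup testable; a real plugin would call it as `CredentialLookup.resolve(properties, System.getenv(), "access.key", "MINIO_ACCESS_KEY")`.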

Bucket Attributes

| Attribute | Description | Data Type |
|---|---|---|
| bucketId | Bucket name | String |
| object name | Object name to listen for events on | String |
| versioning | Whether to use object versioning. Valid values are ENABLED or SUSPENDED | Enum |
| notifications | Types of bucket notifications to subscribe to. Currently only s3:ObjectCreated:* is supported | String[] |
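A consumer subscribed to s3:ObjectCreated:* for a given object name only needs to act on matching events. The hypothetical filter below matches an event name against subscription patterns with a trailing wildcard and compares the object key with the configured object name. Exact-key matching is an assumption, since this page does not say how object name is matched.

```java
// Hypothetical event filter for bucket notifications.
final class NotificationFilter {
    private NotificationFilter() {}

    // Matches names such as "s3:ObjectCreated:Put" against "s3:ObjectCreated:*"
    static boolean eventMatches(String eventName, String pattern) {
        if (pattern.endsWith("*")) {
            return eventName.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        return eventName.equals(pattern);
    }

    // Assumption: the configured object name must equal the object key exactly
    static boolean accept(String eventName, String objectKey,
                          String[] notifications, String objectName) {
        if (!objectKey.equals(objectName)) {
            return false;
        }
        for (String pattern : notifications) {
            if (eventMatches(eventName, pattern)) {
                return true;
            }
        }
        return false;
    }
}
```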

Deserialization Attributes

This section describes the configuration parameters available for the object deserialization process.
| Attribute | Description | Data Type |
|---|---|---|
| format | Object file format. Available formats: PARQUET, ORC, ARROW, CSV | Enum, default: PARQUET |
| projection | String array of fields to ingest. If not provided, all columns are ingested, so consider using it for wide rows | String[], default: false |
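Projection keeps only the listed fields of each row. A columnar reader can typically skip unselected columns entirely; the sketch below only shows the resulting effect on an already-decoded row represented as a `Map`. The class name is hypothetical, and this is not how Joule's deserializers are implemented.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row-level projection: keep only the requested fields, in projection order.
final class Projection {
    private Projection() {}

    static Map<String, Object> apply(Map<String, Object> row, List<String> projection) {
        if (projection == null || projection.isEmpty()) {
            return row; // no projection configured: all columns are ingested
        }
        Map<String, Object> projected = new LinkedHashMap<>();
        for (String field : projection) {
            if (row.containsKey(field)) {
                projected.put(field, row.get(field));
            }
        }
        return projected;
    }
}
```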

Deserializer example

deserializer:
  format: PARQUET
  projection: ["symbol","vol"]

Additional Resources