MinIO S3
MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API-compatible with the Amazon S3 cloud storage service. The consumer transport processes bucket objects in response to notification-driven events, using batch processing to reduce the memory footprint of the Joule JVM.
The transport uses the MinIO Java client:

```
io.minio:minio:8.5.4
```
```yaml
minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
```
The above example subscribes to bucket object creation events for a specific object name, ingested as a Parquet file. Data is processed every 5 seconds as batches of StreamEvents which are submitted to the processing pipeline. On completion of data ingestion the processed file is moved to a local processed directory.

Available configuration parameters
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| name | Name of source stream | String | |
| connection | Connection details | | |
| bucket | S3 bucket to ingest object data from | | |
| deserializer | Deserialization configuration | | |
| processed dir | Directory to place processed files into. If this setting is not provided, files are removed from the temp store | String | |
| processing rate | Rate at which the consumer acts upon notifications | Integer (default: 60 seconds) | |
| batchSize | Number of records to be read in from file and passed to the processing pipeline in a single cycle | Long (default: 1024) | |
Connection attributes

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| endpoint | S3 service endpoint | String (default: https://localhost) | |
| port | Port the S3 service is hosted on | Integer (default: 9000) | |
| url | Fully qualified URL endpoint, i.e. AWS, GCP or Azure URLs. Used in preference to the endpoint setting when provided | URL String | |
| region | Region where the bucket is to be accessed | String | |
| tls | Use a TLS connection | Boolean (default: false) | |
| credentials | IAM access credentials | | |
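The url, region and tls settings are not shown in the main example above. A minimal sketch of a connection block targeting a cloud-hosted bucket might look as follows, using the key names from the table; the URL and region values are illustrative:

```yaml
connection:
  url: "https://s3.eu-west-2.amazonaws.com"
  region: "eu-west-2"
  tls: true
  credentials:
    access key: "XXXXXX"
    secret key: "YYYYYYYYYYYYYYY"
```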
For non-production use cases the access/secret keys can be used to prove data ingestion functionality. When migrating to a production environment, implement a provider plugin using the provided JouleProviderPlugin interface; see the basic example below.

Credentials attributes

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| access key | IAM user access key | String | |
| secret key | IAM user secret key | String | |
| provider plugin | Custom credentials implementation, ideal for production-level deployments | JouleProviderPlugin implementation | |
```java
public class JWTCredentialsProvider implements JouleProviderPlugin {

    @Override
    public Provider getProvider() {
        // Return the credentials provider to be used by the connection,
        // e.g. one that exchanges a JWT for access credentials
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // One-off setup, e.g. establishing a session with the token issuer
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Validate the supplied properties before the plugin is used
    }

    @Override
    public void setProperties(Properties properties) {
        // Receives the properties defined in the consumer specification
    }
}
```
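How the plugin is referenced from the consumer specification is not shown above. Assuming the provider plugin key accepts a fully qualified class name, the credentials block might look like this; the class name is illustrative:

```yaml
credentials:
  provider plugin: "com.example.security.JWTCredentialsProvider"
```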
Bucket attributes

| Attribute | Description | Data Type | Required |
|---|---|---|---|
| bucketId | Bucket name | String | |
| object name | Object name to listen for events on | String | |
| versioning | Enables object versioning. Valid values are either ENABLED or SUSPENDED | Enum | |
| notifications | Type of bucket notifications to subscribe to. Currently only s3:ObjectCreated:* is supported | String[] | |
This section describes the configuration parameters available for the object deserialization process.
| Attribute | Description | Data Type | Required |
|---|---|---|---|
| format | Object file format. Available formats: PARQUET, ORC, ARROW, CSV | Enum (default: PARQUET) | |
| projection | String array of fields to ingest. Consider using this for wide rows, otherwise all columns are ingested | String[] | |
```yaml
deserializer:
  format: PARQUET
  projection: ["symbol","vol"]
```