MinIO S3

MinIO S3 file consumer for cloud or locally hosted bucket storage


Overview

MinIO is a high-performance object store released under the GNU Affero General Public License v3.0. It is API compatible with Amazon S3, which makes it well suited to efficient file processing and integration with the Joule process.

Joule's MinIO consumer transport processes S3 objects in response to bucket-level change notification events, using an efficient I/O and memory access pattern.

This enables two key features:

  • Batch processing: large data files are processed using the stream processing pipeline

  • Machine learning model management: ML models are loaded from S3 storage on process startup and replaced on model update

The detailed configuration examples below illustrate how to set up the MinIO consumer to subscribe to specific bucket events, set processing rates and handle data deserialisation in PARQUET format.

Additionally, guidance on connection credentials, provider plugins for production security and deserialisation attributes is provided for streamlined configuration and integration with the Joule ecosystem.

Driver details: io.minio:minio:8.5.4

Examples & DSL attributes

This example sets up a MinIO consumer named marketdata-S3Consumer to subscribe to object creation events in the marketdata bucket for files named prices in PARQUET format.

Data is ingested every 5 seconds in batches of up to 100,000 records and processed according to a specified projection on the fields symbol and vol.

After processing, files are moved to a local processed directory and retries are set to 3 in case of processing failures.

minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"

  bucket:
    bucketId: "marketdata"
    object name: "prices"
    versioning: ENABLED
    notifications: ["s3:ObjectCreated:*"]
    retries: 3

  deserializer:
    format: PARQUET
    projection: ["symbol","vol"]

  processed dir: ./processed
  processing rate: 5
  batchSize: 100000
  

Attributes schema

| Attribute | Description | Data Type |
| --- | --- | --- |
| name | Name of the source stream | String |
| connection | Connection details | See Connection attributes schema |
| bucket | S3 bucket to ingest object data from | See Bucket attributes schema |
| deserializer | Deserialisation configuration | See Deserialisation attributes schema |
| processed dir | Directory to place processed files into. If not set, files are removed from the temp store | String |
| processing rate | Rate at which the consumer acts upon notifications | Integer. Default: 60 seconds |
| batchSize | Number of records read from a file and passed to the processing pipeline in a single cycle | Long. Default: 1024 |

Connection attributes schema

| Attribute | Description | Data Type |
| --- | --- | --- |
| endpoint | S3 service endpoint | String. Default: https://localhost |
| port | Port the S3 service is hosted on | Integer. Default: 9000 |
| url | Fully qualified URL endpoint, e.g. AWS, GCP or Azure URLs. Used in preference to the endpoint setting when provided | URL string |
| region | Region where the bucket is accessed | String |
| tls | Use a TLS connection | Boolean. Default: false |
| credentials | IAM access credentials | See Credentials attributes |
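As a sketch, these connection attributes can be combined to target a cloud-hosted bucket; the url and region values below are illustrative placeholders:

minioConsumer:
  name: "marketdata-S3Consumer"

  connection:
    url: "https://s3.eu-west-1.amazonaws.com"
    region: "eu-west-1"
    tls: true
    credentials:
      access key: "XXXXXX"
      secret key: "YYYYYYYYYYYYYYY"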

Credentials attributes

For non-production use cases, the access and secret keys can be used to prove out data ingestion functionality.

When migrating to a production environment, implement a provider plugin using the provided JouleProviderPlugin interface; see the basic example in the next section.

| Attribute | Description | Data Type |
| --- | --- | --- |
| access key | IAM user access key | String |
| secret key | IAM user secret key | String |
| provider plugin | Custom implementation of credentials, ideal for production-level deployments | JouleProviderPlugin implementation |

Provider plugin implementation

A skeleton implementation of the JouleProviderPlugin interface:

import java.util.Properties;

public class JWTCredentialsProvider implements JouleProviderPlugin {

    @Override
    public Provider getProvider() {
        // Return the credentials Provider used to authenticate with the S3 service
        return null;
    }

    @Override
    public void initialize() throws CustomPluginException {
        // One-off setup, e.g. acquire or refresh an access token
    }

    @Override
    public void validate() throws InvalidSpecificationException {
        // Validate the supplied configuration before the consumer starts
    }

    @Override
    public void setProperties(Properties properties) {
        // Receive configuration properties passed from the DSL
    }
}
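Once implemented, the plugin replaces the raw access and secret keys in the credentials block. The snippet below is a minimal sketch, assuming the provider plugin attribute takes the fully qualified class name of the implementation; the class name shown is illustrative:

minioConsumer:
  connection:
    endpoint: "https://localhost"
    port: 9000
    credentials:
      provider plugin: "com.example.security.JWTCredentialsProvider"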

Bucket attributes schema

| Attribute | Description | Data Type |
| --- | --- | --- |
| bucketId | Bucket name | String |
| object name | Object name to listen for events on | String |
| versioning | Enables object versioning. Valid values are either ENABLED or SUSPENDED | Enum |
| notifications | Types of bucket notifications to subscribe to. Currently only s3:ObjectCreated:* is supported | String[] |
| retries | Number of retries in case of processing failures | Integer |

Deserialisation attributes schema

This section describes the configuration parameters available for the object deserialisation process.

| Attribute | Description | Data Type |
| --- | --- | --- |
| format | Object file format. Available formats: PARQUET, ORC, ARROW, CSV | Enum. Default: PARQUET |
| projection | String array of fields to ingest. Consider using this for wide rows; otherwise all columns are ingested | String[] |

Deserialiser example

minioConsumer:
   ...

   deserializer:
      format: PARQUET
      projection: ["symbol","vol"]

Additional resources

  • See section: Connection attributes schema
  • See section: Bucket attributes schema
  • See section: Deserialisation attributes schema
  • See section: Credentials attributes
  • Official MinIO documentation
  • MinIO Docker image