File processing

Joule provides utility classes to load large files efficiently.


Requirements

See the Setting up the environment documentation.

Note: all Gradle commands must be executed at the root of the project directory.

Overview

Under the hood, Joule uses Apache Arrow to read files, which enables efficient handling of large files and out-of-the-box support for standard file formats. The classes that perform this work are exposed to developers as Callable tasks.

Two key classes are provided: FileProcessingTask and ReferenceDataFileProcessingTask.

Supported file formats

  • PARQUET

  • ORC

  • CSV

  • JSON

  • ARROW_IPC

The provided classes can be found under the SDK package

com.fractalworks.streams.sdk.util.file
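Because the tasks are surfaced as Callable, they can in principle be handed to a standard java.util.concurrent executor instead of being invoked directly. The following is a minimal sketch of that pattern using only the JDK. The class CallableTaskSketch, the helper runOnExecutor and the stand-in task are hypothetical names introduced for illustration and are not part of the Joule SDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTaskSketch {

    // Hypothetical helper: runs any Callable on a background thread and waits for its result.
    static <T> T runOnExecutor(Callable<T> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            return future.get(); // blocks until the task completes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a file task (assumption); a real task object would be passed here instead.
        Callable<String> standInTask = () -> "COMPLETED";
        System.out.println(runOnExecutor(standInTask)); // prints "COMPLETED"
    }
}
```

Running the task on an executor keeps the calling thread free only if the caller does not immediately block on the result; the sketch blocks for simplicity.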

FileProcessingTask

This processing task class reads a file's contents and automatically converts each logical row into a StreamEvent object. It does so using micro-batch processing, which reduces memory and processing overhead while driving stream processing throughput.

Example

The example below loads a file with FileProcessingTask and counts the StreamEvent objects passed to the listener.


FileProcessStatus consumeFile(String eventType, String filename, String absoluteFilePath, FileFormat fileFormat, AtomicLong counter) throws Exception {
    // Listener that counts every event delivered by the task
    var listener = new TransportListener() {
        @Override
        public void onEvent(Collection<StreamEvent> events) {
            counter.addAndGet(events.size());
        }
    };

    File fileuri = new File(absoluteFilePath);
    FileProcessingTask task = new FileProcessingTask(eventType, filename, fileuri.getAbsolutePath(), fileFormat, listener);
    // Run the task directly; call() returns the processing status
    FileProcessStatus status = task.call();

    // Poll every 100 ms until the expected number of events has been counted
    await()
            .pollInterval(100, TimeUnit.MILLISECONDS)
            .until(checkForEvents(counter));
    return status;
}

// Simple event handler to check for number of events received
private Callable<Boolean> checkForEvents(AtomicLong eventsSeen) {
    return () -> (eventsSeen.get() == NUM_EVENTS);
}
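The micro-batch idea behind FileProcessingTask can be illustrated without the SDK: rows are grouped into small collections and each collection is handed to a listener, so only one batch needs to be held in memory at a time. The sketch below uses plain JDK types. MicroBatchSketch, processInBatches and the batch size of 2 are assumptions made for illustration and do not reflect how Joule implements batching internally.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class MicroBatchSketch {

    // Hypothetical helper: groups rows into batches of at most batchSize and
    // passes each batch to the listener. Returns the number of batches delivered.
    static int processInBatches(Iterable<String> rows, int batchSize,
                                Consumer<Collection<String>> listener) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> batch = new ArrayList<>(batchSize);
        int batches = 0;
        for (String row : rows) {
            batch.add(row);
            if (batch.size() == batchSize) {
                listener.accept(batch);
                batch = new ArrayList<>(batchSize); // fresh list so the listener may keep the old one
                batches++;
            }
        }
        if (!batch.isEmpty()) { // deliver the final, partially filled batch
            listener.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        List<String> rows = List.of("r1", "r2", "r3", "r4", "r5");
        // The listener counts rows, mirroring the counter used in the example above
        int batches = processInBatches(rows, 2, events -> counter.addAndGet(events.size()));
        System.out.println(batches + " batches, " + counter.get() + " rows"); // prints "3 batches, 5 rows"
    }
}
```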

ReferenceDataFileProcessingTask

This processing task class reads a reference data file's contents and automatically converts each logical row into a ReferenceData object. It uses micro-batch processing to reduce memory footprint and processing overhead, which makes it possible to load large files into memory.

ReferenceData objects are stored in an in-memory data store to reduce retrieval latency and I/O overhead.

Example

This example can be found in the CellTowerCSVParserTest test class of the fractalworks-geospatial-processor project.

// Create an in-memory store that implements the Store interface
CellTowerStore cellTowerStore = new CellTowerStore(250, 250);
cellTowerStore.setMaxElementsPerLevel(10000);
cellTowerStore.setTreeLevels(5);
cellTowerStore.initialize();

// Load the cell tower file contents into the store
File f = new File(CELLTOWER_FILE);
var task = new ReferenceDataFileProcessingTask<CellTower>((Store)cellTowerStore, f.getName(), f.getAbsolutePath(), FileFormat.CSV);
task.setMoveFileAfterProcessing(false);
task.setParser(new CellTowerArrowParser());
var status = task.call();
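To show why an in-memory store helps with retrieval latency, here is a minimal keyed store built on ConcurrentHashMap: once loaded, lookups are served from memory with no file or network I/O. InMemoryStoreSketch and SimpleStore are hypothetical and deliberately simpler than the Store interface and the CellTowerStore used above, whose actual structure is not described here.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryStoreSketch {

    // Hypothetical keyed store; not the SDK's Store interface.
    static final class SimpleStore<K, V> {
        private final Map<K, V> entries = new ConcurrentHashMap<>();

        // Adds or replaces the value held for a key
        void put(K key, V value) {
            entries.put(key, value);
        }

        // Looks up a value from memory; empty if the key was never loaded
        Optional<V> get(K key) {
            return Optional.ofNullable(entries.get(key));
        }

        int size() {
            return entries.size();
        }
    }

    public static void main(String[] args) {
        SimpleStore<String, String> store = new SimpleStore<>();
        // Illustrative rows only; real reference data would come from the parsed file
        store.put("tower-1", "51.50,-0.12");
        store.put("tower-2", "48.85,2.35");

        System.out.println(store.size());                  // prints "2"
        System.out.println(store.get("tower-1").orElse("missing")); // prints "51.50,-0.12"
        System.out.println(store.get("tower-9").orElse("missing")); // prints "missing"
    }
}
```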
