File processing
Joule provides utility classes for loading large files efficiently.
Overview
Under the hood, Joule uses Apache Arrow to read files, which enables efficient handling of large files and out-of-the-box support for standard file formats. The classes that perform this work are exposed to developers as Callable tasks.
Two key classes are provided: FileProcessingTask and ReferenceDataFileProcessingTask.
Supported file formats
PARQUET
ORC
CSV
JSON
ARROW_IPC
The provided classes can be found in the SDK package com.fractalworks.streams.sdk.util.file.
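Because the processing classes are exposed as Callable tasks, they can presumably be run on a worker thread with the standard java.util.concurrent executors rather than by invoking call() directly. The following is a minimal sketch of that pattern only: standInTask is a hypothetical placeholder for a Joule task, and the class and method names are illustrative, not part of the SDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallableTaskSketch {

    // Hypothetical stand-in for a Joule file task; any Callable is handled the same way.
    public static Callable<String> standInTask(String filename) {
        return () -> "PROCESSED:" + filename;
    }

    // Submits the task to a single worker thread and waits for its result.
    public static <T> T runInBackground(Callable<T> task, long timeoutSeconds) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(task);
            // Blocks until the task completes or the timeout elapses
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String status = runInBackground(standInTask("trades.parquet"), 30);
        System.out.println(status);
    }
}
```

Running the task through an executor keeps the calling thread free while a large file is read. Calling call() directly, as the examples on this page do, is the simpler choice when blocking is acceptable.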
FileProcessingTask
This processing task class reads a file's contents and automatically converts each logical row into a StreamEvent object. Rows are processed in micro-batches, which reduces memory and processing overhead while sustaining stream processing throughput.
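To illustrate the micro-batch idea in isolation, here is a minimal sketch that groups rows into fixed-size batches and hands each batch to a listener, so that only one batch is held in memory at a time. It does not reproduce Joule's internal implementation: MicroBatchSketch, processInBatches, and the batch size are hypothetical names and values chosen for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class MicroBatchSketch {

    // Reads rows one at a time, delivers them to the listener in batches of at most
    // batchSize, and returns the total number of rows delivered.
    public static <T> long processInBatches(Iterator<T> rows, int batchSize,
                                            Consumer<Collection<T>> listener) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long total = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (rows.hasNext()) {
            batch.add(rows.next());
            if (batch.size() == batchSize) {
                listener.accept(batch);
                total += batch.size();
                // Start a fresh list so the listener may safely keep the previous one
                batch = new ArrayList<>(batchSize);
            }
        }
        // Deliver any remaining rows that did not fill a complete batch
        if (!batch.isEmpty()) {
            listener.accept(batch);
            total += batch.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2", "r3", "r4", "r5");
        long delivered = processInBatches(rows.iterator(), 2,
                batch -> System.out.println("batch of " + batch.size()));
        System.out.println(delivered + " rows delivered");
    }
}
```

The listener plays the same role as the TransportListener in the example below: it receives a collection of events per call rather than one event at a time.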
Example
The example below loads a file and counts the events delivered to the listener.
// Loads the file and counts the StreamEvent objects delivered to the listener.
// await() is presumably Awaitility's static import (org.awaitility.Awaitility.await).
FileProcessStatus consumeFile(String eventType, String filename, String absoluteFilePath, FileFormat fileFormat, AtomicLong counter) throws Exception {
    // Listener invoked with each micro-batch of converted events
    var listener = new TransportListener() {
        @Override
        public void onEvent(Collection<StreamEvent> events) {
            counter.addAndGet(events.size());
        }
    };

    File fileuri = new File(absoluteFilePath);
    FileProcessingTask task = new FileProcessingTask(eventType, filename, fileuri.getAbsolutePath(), fileFormat, listener);
    FileProcessStatus status = task.call();

    // Wait until the expected number of events has been received
    await()
        .pollInterval(100, TimeUnit.MILLISECONDS)
        .until(checkForEvents(counter));
    return status;
}

// Simple check that the expected number of events (NUM_EVENTS) has been received
private Callable<Boolean> checkForEvents(AtomicLong eventsSeen) {
    return () -> (eventsSeen.get() == NUM_EVENTS);
}
ReferenceDataFileProcessingTask
This processing task class reads a reference data file's contents and automatically converts each logical row into a ReferenceData object. Rows are processed in micro-batches to reduce memory footprint and processing overhead, which makes it possible to load large files into memory.
ReferenceData objects are stored in an in-memory data store to reduce retrieval latency and I/O overhead.
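The benefit of an in-memory store is that lookups become plain map reads with no disk or network access. The sketch below shows that idea with a thread-safe map. It is an illustration only: InMemoryStoreSketch and its methods are hypothetical and do not represent Joule's Store interface, whose actual methods are not shown on this page.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical key-value store for illustration; not Joule's Store interface.
public class InMemoryStoreSketch<K, V> {

    // ConcurrentHashMap allows a loader thread to write while other threads read
    private final Map<K, V> entries = new ConcurrentHashMap<>();

    // Adds or replaces the reference data entry for the given key
    public void put(K key, V value) {
        entries.put(key, value);
    }

    // Returns the entry for the key, or an empty Optional if none is stored
    public Optional<V> get(K key) {
        return Optional.ofNullable(entries.get(key));
    }

    public int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        InMemoryStoreSketch<String, String> store = new InMemoryStoreSketch<>();
        store.put("tower-1", "51.5074,-0.1278");
        System.out.println(store.get("tower-1").orElse("unknown"));
        System.out.println(store.get("tower-2").orElse("unknown"));
    }
}
```

A real store may index entries differently, for example spatially, as the CellTowerStore configuration in the example below suggests with its tree levels.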
Example
This example can be found in the CellTowerCSVParserTest test class of the fractalworks-geospatial-processor project.
// Create an in-memory store that implements the Store interface
CellTowerStore cellTowerStore = new CellTowerStore(250, 250);
cellTowerStore.setMaxElementsPerLevel(10000);
cellTowerStore.setTreeLevels(5);
cellTowerStore.initialize();

// Load the cell tower file contents into the store
File f = new File(CELLTOWER_FILE);
var task = new ReferenceDataFileProcessingTask<CellTower>((Store) cellTowerStore, f.getName(), f.getAbsolutePath(), FileFormat.CSV);
task.setMoveFileAfterProcessing(false);          // leave the source file in place
task.setParser(new CellTowerArrowParser());      // converts each row into a CellTower
var status = task.call();