Reading and Writing Arrow Data

About RecordBatches

Arrow tables are typically split into record batches, allowing them to be loaded or written incrementally, and the Arrow API provides classes to facilitate reading and writing data this way.
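
A Table is the logical concatenation of its RecordBatches: the row counts of the individual batches add up to the row count of the whole table. Below is a minimal sketch of this relationship, assuming the same Table.from and RecordBatchReader APIs used in the examples on this page:

import { readFileSync } from 'fs';
import { Table, RecordBatchReader } from 'apache-arrow';

const bytes = readFileSync('simple.arrow');
const table = Table.from([bytes]);

let rows = 0;
const reader = await RecordBatchReader.from(bytes);
for await (const batch of reader) {
  rows += batch.length; // each batch holds a contiguous slice of the table's rows
}
console.log(rows === table.length); // true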

Reading Arrow Data

The Table class provides a Table.from convenience method for reading an Arrow-formatted data file into Arrow data structures:

import { readFileSync } from 'fs';
import { Table } from 'apache-arrow';
const arrow = readFileSync('simple.arrow');
const table = Table.from([arrow]);
console.log(table.toString());
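
Once loaded, the table can be inspected directly. A short usage sketch continuing from the snippet above (the getColumn and schema accessors shown here are assumptions about the apache-arrow release this page targets):

console.log(table.length);                           // total number of rows
console.log(table.schema.fields.map((f) => f.name)); // column names
const column = table.getColumn(table.schema.fields[0].name);
console.log(column.get(0));                          // first value of the first column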

Using RecordBatchReader to read from a Data Source

To read Arrow tables incrementally, you use the RecordBatchReader class.

If you only have one table in your file (the normal case), then you'll only need one RecordBatchReader:

import { RecordBatchReader } from 'apache-arrow';

const reader = await RecordBatchReader.from(fetch(path, {credentials: 'omit'}));
for await (const batch of reader) {
  console.log(batch.length);
}
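
RecordBatchReader.from also accepts source types other than a fetch Response, such as in-memory buffers and Node streams. A sketch reading the same data from a local file on Node (assuming the file contains a single Arrow table):

import { createReadStream } from 'fs';
import { RecordBatchReader } from 'apache-arrow';

const reader = await RecordBatchReader.from(createReadStream('simple.arrow'));
for await (const batch of reader) {
  console.log(batch.length);
}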

Reading Multiple Tables from a Data Source

The JavaScript Arrow API supports Arrow data streams that contain multiple tables (this is an "extension" to the Arrow spec). Each Table comes with its own set of record batches, so to read all batches from all tables in the data source you will need a nested loop:

import { RecordBatchReader } from 'apache-arrow';

const readers = RecordBatchReader.readAll(fetch(path, {credentials: 'omit'}));
for await (const reader of readers) {
  for await (const batch of reader) {
    console.log(batch.length);
  }
}

Note: this code also works if there is only one table in the data source, in which case the outer loop will only execute once.

Writing Arrow Data

The RecordBatchStreamWriter class allows you to write Arrow Table and RecordBatch instances out as a binary Arrow stream (to a file, a socket, or any other sink).
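
A minimal sketch of writing a Table out to an Arrow stream file, assuming the writer exposes the static writeAll helper and a toUint8Array method:

import { readFileSync, writeFileSync } from 'fs';
import { Table, RecordBatchStreamWriter } from 'apache-arrow';

// Load a table (as shown above), then serialize it back out as an Arrow stream
const table = Table.from([readFileSync('simple.arrow')]);
const writer = RecordBatchStreamWriter.writeAll(table);
writeFileSync('table.arrow', await writer.toUint8Array());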

Using Transform Streams
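
On Node, both the readers and writers can also act as transform streams via their throughNode() helpers (RecordBatchStreamWriter.throughNode() is used the same way in the example below). A sketch that copies the record batches from one Arrow file to another purely with stream pipes, assuming the Node build of apache-arrow:

const fs = require('fs');
const { RecordBatchReader, RecordBatchStreamWriter } = require('apache-arrow');

fs.createReadStream('simple.arrow')
  .pipe(RecordBatchReader.throughNode())       // bytes in, RecordBatches out
  .pipe(RecordBatchStreamWriter.throughNode()) // RecordBatches in, bytes out
  .pipe(fs.createWriteStream('table.arrow'));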

Connecting to Python Processes

A more complicated example of using Arrow to go from node -> python -> node:

const {spawn} = require('child_process');
const {child} = require('event-stream');
const {AsyncIterable} = require('ix');
const {RecordBatchStreamWriter} = require('apache-arrow');

const compute_degrees_via_gpu_accelerated_sql = (
  (scriptPath) => (edgeListColumnName) =>
    // wrap the Python child process in a duplex stream (event-stream's child()):
    // writes go to the process's stdin, its stdout becomes the readable side
    child(spawn('python3', [scriptPath, edgeListColumnName], {
      env: process.env,
      stdio: ['pipe', 'pipe', 'inherit']
    }))
)(require('path').resolve(__dirname, 'compute_degrees.py'));

function compute_degrees(colName, recordBatchReaders) {
  // serialize all incoming record batches as a single Arrow stream and pipe
  // it through the Python process; the returned stream carries the results
  return AsyncIterable.as(recordBatchReaders)
    .mergeAll()
    .pipe(RecordBatchStreamWriter.throughNode())
    .pipe(compute_degrees_via_gpu_accelerated_sql(colName));
}

module.exports = compute_degrees;

This example constructs a pipeline of streams: the Python process reads the Arrow stream from stdin, performs a GPU-dataframe operation, and writes the results back to stdout. (The example uses Rx/IxJS-style functional streaming pipelines.)

compute_degrees_via_gpu_accelerated_sql returns the Python child process wrapped in a duplex stream (via the event-stream child() method), so writes to the stream go to the process's stdin and its stdout can be read back out of the same stream.
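
A hypothetical usage sketch (the file name edges.arrow and column name src are placeholders, and it assumes compute_degrees.py writes its results back to stdout as an Arrow stream):

const fs = require('fs');
const { RecordBatchReader } = require('apache-arrow');
const compute_degrees = require('./compute_degrees');

(async () => {
  const edges = await RecordBatchReader.from(fs.createReadStream('edges.arrow'));
  // the duplex stream returned by compute_degrees carries the Python process's
  // stdout, so it can be read back as another Arrow stream
  const results = await RecordBatchReader.from(compute_degrees('src', [edges]));
  for await (const batch of results) {
    console.log(batch.length);
  }
})();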