Row-wise Callbacks for ReadStat using Coroutines
My last post involving ReadStat was about how I'd like to set up a Postgres FDW
for SAS data using ReadStat to parse the data files. Part of achieving a
solution that doesn't require converting to CSV on the fly would require having
a callback that executes after a row has been read. Namely, this would be for
the IterateForeignScan
callback of a postgres FDW which fills up and returns a
TupleTableSlot
(i.e. a row of data). ReadStat has a callback api at the
individual values which also can know which column is being read (in other
words, we can know if we're reading the last column in a row) through the use of
a context object.
What I would like to do is be able to hand control back to the postgres FDW after the row has been read for it to do its thing, then pass control back to ReadStat to continue reading data. This requires a row callback functionality on the ReadStat side, as well as the ability to track FDW state and ReadStat state. From my various searches, coroutines seems like they may meet my needs as they may "suspend" and "resume" execution in different parts of running code. minicoro, being a coroutine library implemented in a single header file, appeared to be easiest to get started with. The goal of this post is to essentially have a row callback api that can execute an arbitrary function after reading a row.
Docker container setup
Set up a docker container to work in. This is essentially the same as my prior postgres post.
FROM debian:latest RUN apt-get update && \ apt-get -y install gcc gdb clangd make git autotools-dev libtool gettext && \ git clone https://github.com/WizardMac/ReadStat.git /home/ReadStat && \ git clone https://github.com/edubart/minicoro.git /home/minicoro # setup and install ReadStat WORKDIR /home/ReadStat RUN git checkout v1.1.9 RUN ./autogen.sh && ./configure --prefix /usr/local RUN make && make install RUN echo "/usr/local/lib/" >> /etc/ld.so.conf RUN ldconfig
docker build . -t readstat_rowwise:latest
docker container run --rm --name readstat_rowwise -dti -v ${PWD}:/home/src/ readstat_rowwise
ccfce81cab1e03a2c70b3abeb11837d86ea455fb5cfb240acd0c3ef3e1ae0cd6
C source
Headers and a Context Object
Here we set up a struct that holds ReadStat's context information, in this case
the number of columns in the given SAS dataset and a pointer to an mco_coro
coroutine object. With the number of columns known, the ReadStat value callback
will know when it is reading the last column.
#include <readstat.h> #define MINICORO_IMPL #include "../minicoro/minicoro.h" typedef struct { int col_count; mco_coro *co; } rs_ctx;
ReadStat Callbacks
These methods are largely adapted from ReadStat's README.
- Metadata Handler
The metadata handler is run once after calling one of the
readstat_parse_*
methods, and here will set the number of columns in the context object that is passed to all handlers.int handle_metadata(readstat_metadata_t *metadata, void *ctx) { rs_ctx *my_count = (rs_ctx *)ctx; /* `var_count` corresponds to column count */ my_count->col_count = readstat_get_var_count(metadata); return READSTAT_HANDLER_OK; }
- Variable Handler
"Variable" in ReadStat refers to a column name and is run once for each column. Here it prints tab-separated column names.
int handle_variable(int index, readstat_variable_t *variable, const char *val_labels, void *ctx) { /* this loops through column names to print the first line */ rs_ctx *meta = (rs_ctx *)ctx; printf("%s", readstat_variable_get_name(variable)); if (index == meta->col_count - 1) { printf("\n"); } else { printf("\t"); } return READSTAT_HANDLER_OK; }
- Value Handler
The value handler will run once for each value being read. Through the
meta
object it has access to the column count and can callmco_resume
to pass control back to the coroutine.int handle_value(int obs_index, readstat_variable_t *variable, readstat_value_t value, void *ctx) { /* this loops through the rest of the rows to print values */ rs_ctx *meta = (rs_ctx *)ctx; int var_index = readstat_variable_get_index(variable); readstat_type_t type = readstat_value_type(value); if (!readstat_value_is_system_missing(value)) { if (type == READSTAT_TYPE_STRING) { printf("%s", readstat_string_value(value)); } else if (type == READSTAT_TYPE_INT8) { printf("%.2hhd", readstat_int8_value(value)); } else if (type == READSTAT_TYPE_INT16) { printf("%.2hd", readstat_int16_value(value)); } else if (type == READSTAT_TYPE_INT32) { printf("%.2d", readstat_int32_value(value)); } else if (type == READSTAT_TYPE_FLOAT) { printf("%.2f", readstat_float_value(value)); } else if (type == READSTAT_TYPE_DOUBLE) { printf("%.2lf", readstat_double_value(value)); } } if (var_index == meta->col_count - 1) { printf("\n"); mco_resume(meta->co); } else { printf("\t"); } return READSTAT_HANDLER_OK; }
Coroutine Entry Function
This basic coroutine function will initialize the running row count and suspend
execution by calling mco_yield
. When execution returns, it will resume from
the first mco_yield
and continually print a message with the rowcount and
yielding back so the next row may be read.
void coro_entry(mco_coro *co) { int current_row_count = 1; // stop after initializing state mco_yield(co); while (1) { printf("finished reading row #: %d\n", current_row_count++); mco_yield(co); } }
Main
A small example taking the filename of a sas7bdat file to parse and print.
int main(int argc, char *argv[]) { if (argc != 2) { printf("Usage: %s <filename>\n", argv[0]); return 1; } // setup the coroutine object mco_desc desc = mco_desc_init(coro_entry, 0); desc.user_data = NULL; mco_coro *co; mco_result res = mco_create(&co, &desc); res = mco_resume(co); // initialize ReadStat's context rs_ctx meta = {.col_count = 0, .co = co}; readstat_error_t error = READSTAT_OK; readstat_parser_t *parser = readstat_parser_init(); // set the handlers readstat_set_metadata_handler(parser, &handle_metadata); readstat_set_variable_handler(parser, &handle_variable); readstat_set_value_handler(parser, &handle_value); error = readstat_parse_sas7bdat(parser, argv[1], &meta); res = mco_destroy(co); readstat_parser_free(parser); if (error != READSTAT_OK) { printf("Error processing %s: %d\n", argv[1], error); return 1; } return 0; }
Compile
gcc src/readstat_rowwise.c -o readstat_rowwise.o -g -lreadstat
Run
./readstat_rowwise.o src/mtcars_head.sas7bdat
mpg cyl disp hp drat wt qsec vs am gear carb 21.00 6.00 160.00 110.00 3.90 2.62 16.46 0.00 1.00 4.00 4.00 finished reading row #: 1 21.00 6.00 160.00 110.00 3.90 2.88 17.02 0.00 1.00 4.00 4.00 finished reading row #: 2 22.80 4.00 108.00 93.00 3.85 2.32 18.61 1.00 1.00 4.00 1.00 finished reading row #: 3 21.40 6.00 258.00 110.00 3.08 3.21 19.44 1.00 0.00 3.00 1.00 finished reading row #: 4 18.70 8.00 360.00 175.00 3.15 3.44 17.02 0.00 0.00 3.00 2.00 finished reading row #: 5 18.10 6.00 225.00 105.00 2.76 3.46 20.22 1.00 0.00 3.00 1.00 finished reading row #: 6
This demonstrates how the coroutine makes it easy to pass execution control
around at any point, here emulating the behavior of a row callback without
having to modify ReadStat's internals in any way. Only a simple message is
printed in this example, but appropriately modifying coro_entry
should enable
me to write a proper FDW for sas7bdat files.