Row-wise Callbacks for ReadStat using Coroutines

My last post involving ReadStat was about setting up a Postgres FDW for SAS data, using ReadStat to parse the data files. A solution that doesn't convert to CSV on the fly needs a callback that executes after each row has been read. Specifically, the FDW's IterateForeignScan callback fills and returns a TupleTableSlot (i.e. a row of data). ReadStat's callback API works at the level of individual values, but through a context object the value callback can also know which column is being read (in other words, we can tell when we're reading the last column in a row).

What I would like to do is hand control back to the Postgres FDW after a row has been read so it can do its thing, then pass control back to ReadStat to continue reading data. This requires row callback functionality on the ReadStat side, as well as the ability to track both FDW state and ReadStat state. From my various searches, coroutines seem like they may meet my needs, as they can "suspend" and "resume" execution at different points in running code. minicoro, a coroutine library implemented in a single header file, appeared to be the easiest to get started with. The goal of this post is essentially to build a row callback API that can execute an arbitrary function after each row is read.

Docker container setup

Set up a docker container to work in. This is essentially the same as my prior postgres post.

FROM debian:latest
RUN apt-get update && \
    apt-get -y install gcc gdb clangd make git autotools-dev libtool gettext && \
    git clone https://github.com/WizardMac/ReadStat.git /home/ReadStat && \
    git clone https://github.com/edubart/minicoro.git /home/minicoro

# setup and install ReadStat
WORKDIR /home/ReadStat
RUN git checkout v1.1.9
RUN ./autogen.sh && ./configure --prefix /usr/local
RUN make && make install
RUN echo "/usr/local/lib/" >> /etc/ld.so.conf 
RUN ldconfig

Build the image and start a container:

docker build . -t readstat_rowwise:latest
docker container run --rm --name readstat_rowwise -dti -v ${PWD}:/home/src/ readstat_rowwise
ccfce81cab1e03a2c70b3abeb11837d86ea455fb5cfb240acd0c3ef3e1ae0cd6

C source

Headers and a Context Object

Here we set up a struct that holds ReadStat's context information, in this case the number of columns in the given SAS dataset and a pointer to an mco_coro coroutine object. With the number of columns known, the ReadStat value callback will know when it is reading the last column.

#include <stdio.h> /* printf, used by the handlers and main */

#include <readstat.h>

#define MINICORO_IMPL
#include "../minicoro/minicoro.h"

typedef struct {
  int col_count;
  mco_coro *co;
} rs_ctx;
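
To make the "last column" idea concrete before ReadStat and minicoro enter the picture, here is a minimal sketch with no dependencies. The names demo_ctx, is_last_column, on_value and feed_values are made up for illustration; they only mimic the shape of a per-value callback that receives a context pointer, and they are not part of ReadStat.

```c
#include <stdio.h>

/* Hypothetical stand-in for rs_ctx: no ReadStat or minicoro types. */
typedef struct {
  int col_count; /* set once, the way handle_metadata does; must be > 0 */
  int rows_done; /* incremented each time a row is completed */
} demo_ctx;

/* Returns 1 when var_index is the last column of a row, 0 otherwise. */
static int is_last_column(const demo_ctx *ctx, int var_index) {
  return var_index == ctx->col_count - 1;
}

/* Per-value callback: called once for every cell, in row-major order. */
static void on_value(demo_ctx *ctx, int var_index, double value) {
  int last = is_last_column(ctx, var_index);
  printf("%.2f%c", value, last ? '\n' : '\t');
  if (last) {
    ctx->rows_done++; /* this is the spot where a row-level hook can run */
  }
}

/* Feed n_values cells through on_value, the way a parser would.
   Returns the number of complete rows seen so far. */
static int feed_values(demo_ctx *ctx, const double *values, int n_values) {
  for (int i = 0; i < n_values; i++) {
    on_value(ctx, i % ctx->col_count, values[i]);
  }
  return ctx->rows_done;
}
```

With col_count set to 3, feeding seven cells completes two rows and leaves the third row unfinished, so the row-level hook fires only at real row boundaries.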

ReadStat Callbacks

These handler functions are largely adapted from ReadStat's README.

  • Metadata Handler

    The metadata handler runs once after one of the readstat_parse_* functions is called. Here it stores the number of columns in the context object that is passed to all handlers.

    int handle_metadata(readstat_metadata_t *metadata, void *ctx) {
      rs_ctx *my_count = (rs_ctx *)ctx;
      /* `var_count` corresponds to column count */
      my_count->col_count = readstat_get_var_count(metadata);
    
      return READSTAT_HANDLER_OK;
    }
    
  • Variable Handler

    A "variable" in ReadStat is a column, and the variable handler runs once for each column. Here it prints the tab-separated column names.

    int handle_variable(int index, readstat_variable_t *variable,
                              const char *val_labels, void *ctx) {
      /* this loops through column names to print the first line */
      rs_ctx *meta = (rs_ctx *)ctx;
      printf("%s", readstat_variable_get_name(variable));
      if (index == meta->col_count - 1) {
        printf("\n");
      } else {
        printf("\t");
      }
      return READSTAT_HANDLER_OK;
    }
    
  • Value Handler

    The value handler runs once for each value being read. Through the context object (meta) it has access to the column count, so after the last column of a row it can call mco_resume to hand control to the coroutine.

    int handle_value(int obs_index, readstat_variable_t *variable,
                     readstat_value_t value, void *ctx) {
      /* this loops through the rest of the rows to print values */
      rs_ctx *meta = (rs_ctx *)ctx;
      int var_index = readstat_variable_get_index(variable);
      readstat_type_t type = readstat_value_type(value);
      if (!readstat_value_is_system_missing(value)) {
        if (type == READSTAT_TYPE_STRING) {
          printf("%s", readstat_string_value(value));
        } else if (type == READSTAT_TYPE_INT8) {
          printf("%.2hhd", readstat_int8_value(value));
        } else if (type == READSTAT_TYPE_INT16) {
          printf("%.2hd", readstat_int16_value(value));
        } else if (type == READSTAT_TYPE_INT32) {
          printf("%.2d", readstat_int32_value(value));
        } else if (type == READSTAT_TYPE_FLOAT) {
          printf("%.2f", readstat_float_value(value));
        } else if (type == READSTAT_TYPE_DOUBLE) {
          printf("%.2lf", readstat_double_value(value));
        }
      }

      if (var_index == meta->col_count - 1) {
        printf("\n");
        mco_resume(meta->co);
      } else {
        printf("\t");
      }
      return READSTAT_HANDLER_OK;
    }
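
The value handler above prints each cell immediately, but a row consumer such as an FDW would need the whole row available at once. A minimal sketch of that idea, assuming numeric columns only and a hypothetical fixed-size buffer (row_buf, row_buf_init, row_buf_put and MAX_COLS are all invented for this illustration and are not ReadStat names):

```c
#include <string.h>

#define MAX_COLS 64 /* hypothetical upper bound on the number of columns */

/* Holds the cells of the row currently being read. */
typedef struct {
  int col_count;
  double cells[MAX_COLS];
} row_buf;

static void row_buf_init(row_buf *rb, int col_count) {
  memset(rb, 0, sizeof *rb);
  rb->col_count = col_count;
}

/* Store one cell at its column position.
   Returns 1 when the row is complete (last column just stored),
   0 when more cells are expected, and -1 for an out-of-range index. */
static int row_buf_put(row_buf *rb, int var_index, double value) {
  if (var_index < 0 || var_index >= rb->col_count || var_index >= MAX_COLS) {
    return -1;
  }
  rb->cells[var_index] = value;
  return var_index == rb->col_count - 1;
}
```

In a handler shaped like handle_value, the call to row_buf_put would replace the printf calls, and a return value of 1 would be the cue to resume the coroutine, which could then read the finished row from the buffer.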
    

Coroutine Entry Function

This basic coroutine function initializes the running row count and then suspends execution by calling mco_yield. Each later resume continues from the most recent mco_yield: the loop prints a message with the row count and yields again so the next row can be read.

void coro_entry(mco_coro *co) {
  int current_row_count = 1;
  // stop after initializing state
  mco_yield(co);

  while (1) {
    printf("finished reading row #: %d\n", current_row_count++);
    mco_yield(co);
  }
}
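
For comparison, here is roughly what the same control flow looks like without a coroutine, written as an explicit state machine. This is only a sketch to show what the coroutine saves us from writing by hand; row_task, step_state and row_task_resume are hypothetical names, where each call plays the role of one mco_resume and each return plays the role of one mco_yield.

```c
#include <stdio.h>

/* Where the "function" should pick up on its next call. */
typedef enum { STEP_INIT, STEP_ROWS } step_state;

typedef struct {
  step_state state;
  int current_row_count; /* a local variable in coro_entry, a field here */
} row_task;

/* Returns 0 for the initialization call, otherwise the row number that
   was just reported. */
static int row_task_resume(row_task *t) {
  switch (t->state) {
  case STEP_INIT:
    /* matches the code before the first mco_yield in coro_entry */
    t->current_row_count = 1;
    t->state = STEP_ROWS;
    return 0;
  case STEP_ROWS:
    /* matches one iteration of the while loop in coro_entry */
    printf("finished reading row #: %d\n", t->current_row_count);
    return t->current_row_count++;
  }
  return -1; /* unreachable with a valid state */
}
```

The local variable and the position in the function both have to be moved into a struct by hand. With a coroutine, the suspended stack keeps them, which is why coro_entry can stay an ordinary loop.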

Main

A small example taking the filename of a sas7bdat file to parse and print.

int main(int argc, char *argv[]) {
  if (argc != 2) {
    printf("Usage: %s <filename>\n", argv[0]);
    return 1;
  }

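  // setup the coroutine object
  mco_desc desc = mco_desc_init(coro_entry, 0);
  desc.user_data = NULL;
  mco_coro *co;
  mco_result res = mco_create(&co, &desc);
  if (res != MCO_SUCCESS) {
    printf("Failed to create coroutine: %s\n", mco_result_description(res));
    return 1;
  }
  // the first resume runs coro_entry up to its initial mco_yield
  res = mco_resume(co);
  if (res != MCO_SUCCESS) {
    printf("Failed to start coroutine: %s\n", mco_result_description(res));
    mco_destroy(co);
    return 1;
  }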

  // initialize ReadStat's context
  rs_ctx meta = {.col_count = 0, .co = co};
  readstat_error_t error = READSTAT_OK;
  readstat_parser_t *parser = readstat_parser_init();

  // set the handlers
  readstat_set_metadata_handler(parser, &handle_metadata);
  readstat_set_variable_handler(parser, &handle_variable);
  readstat_set_value_handler(parser, &handle_value);

  error = readstat_parse_sas7bdat(parser, argv[1], &meta);
  res = mco_destroy(co);
  readstat_parser_free(parser);

  if (error != READSTAT_OK) {
    printf("Error processing %s: %d\n", argv[1], error);
    return 1;
  }
  return 0;
}

Compile

gcc src/readstat_rowwise.c -o readstat_rowwise.o -g -lreadstat

Run

./readstat_rowwise.o src/mtcars_head.sas7bdat
mpg	cyl	disp	hp	drat	wt	qsec	vs	am	gear	carb
21.00	6.00	160.00	110.00	3.90	2.62	16.46	0.00	1.00	4.00	4.00
finished reading row #: 1
21.00	6.00	160.00	110.00	3.90	2.88	17.02	0.00	1.00	4.00	4.00
finished reading row #: 2
22.80	4.00	108.00	93.00	3.85	2.32	18.61	1.00	1.00	4.00	1.00
finished reading row #: 3
21.40	6.00	258.00	110.00	3.08	3.21	19.44	1.00	0.00	3.00	1.00
finished reading row #: 4
18.70	8.00	360.00	175.00	3.15	3.44	17.02	0.00	0.00	3.00	2.00
finished reading row #: 5
18.10	6.00	225.00	105.00	2.76	3.46	20.22	1.00	0.00	3.00	1.00
finished reading row #: 6

This demonstrates how the coroutine makes it easy to pass execution control around at any point, here emulating the behavior of a row callback without having to modify ReadStat's internals in any way. Only a simple message is printed in this example, but appropriately modifying coro_entry should enable me to write a proper FDW for sas7bdat files.
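
One possible direction for the FDW, sketched under heavy assumptions: since IterateForeignScan returns one row per call, the parse loop itself would probably have to live inside the coroutine and yield after each row, so the caller can pull rows one at a time. The sketch below uses POSIX ucontext (getcontext, makecontext, swapcontext) instead of minicoro purely so it compiles with nothing but glibc, and it replaces ReadStat with a hard-coded table. scan_state, scan_begin, scan_next, parse_all and fake_file are all hypothetical names.

```c
#include <ucontext.h>

#define N_ROWS 3
#define N_COLS 2

/* Hypothetical stand-in for a parsed data file. */
static const double fake_file[N_ROWS][N_COLS] = {
    {21.0, 6.0}, {22.8, 4.0}, {21.4, 6.0}};

typedef struct {
  ucontext_t caller;  /* whoever asked for the next row */
  ucontext_t parser;  /* the suspended parse loop */
  double row[N_COLS]; /* most recently completed row */
  int done;           /* set once the parse loop has finished */
} scan_state;

/* makecontext passes only int arguments portably, so the state is
   handed to the parse loop through a file-scope pointer. */
static scan_state *g_scan;
static char parser_stack[64 * 1024];

/* Stand-in for the parse call: fills one row, then suspends. */
static void parse_all(void) {
  scan_state *s = g_scan;
  for (int r = 0; r < N_ROWS; r++) {
    for (int c = 0; c < N_COLS; c++) {
      s->row[c] = fake_file[r][c];
    }
    swapcontext(&s->parser, &s->caller); /* yield the finished row */
  }
  s->done = 1; /* returning continues at uc_link, i.e. the caller */
}

static void scan_begin(scan_state *s) {
  s->done = 0;
  g_scan = s;
  getcontext(&s->parser);
  s->parser.uc_stack.ss_sp = parser_stack;
  s->parser.uc_stack.ss_size = sizeof parser_stack;
  s->parser.uc_link = &s->caller;
  makecontext(&s->parser, parse_all, 0);
}

/* Pull-style API: returns 1 and fills s->row if a row was read,
   0 once the data is exhausted. */
static int scan_next(scan_state *s) {
  if (s->done) {
    return 0;
  }
  swapcontext(&s->caller, &s->parser); /* resume the parse loop */
  return !s->done;
}
```

In this arrangement scan_next has the shape an IterateForeignScan implementation would want: each call resumes the parser just long enough to produce one row. Whether minicoro and ReadStat's handlers can be combined this way is something I have not verified here.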