
February 24, 2019

Realtime Immunizations: Rust, Kafka, WebSockets, FHIR

Recently I had occasion to learn about FHIR: Fast Healthcare Interoperability Resources. Lacking my own hospital, I was in need of some examples of FHIR data: how it’s structured, what kinds of patient information it includes, etc. I soon located the smart-on-fhir/sample-patients repository containing a randomized FHIR record generator written in Python. One of the subrecords pouring out of that firehose concerns immunizations; it seemed to lend itself well to a visualization that eventually became this:



In this post I’ll walk through how this particular visualization works, moving from the back-end to the front.

FHIR Data and Kafka

I picked immunizations because the corresponding data element is relatively simple; there’s a reference to a patient, the name of an immunization the patient received, and the date the patient received the immunization, as seen in this example:

<Immunization>
    <id value="-Immunization-12"/>
    <text>
        <status value="generated"/>
        <div xmlns="http://www.w3.org/1999/xhtml">pneumonia</div>
    </text>
    <patient>
        <reference value="Patient/2042917"/>
    </patient>
    <date value="2011-09-20"/>
    <vaccineCode>
        <coding>
            <system value="http://www2a.cdc.gov/vaccines/IIS/IISStandards/vaccines.asp?rpt=cvx"/>
            <code value="141"/>
            <display value="pneumonia"/>
        </coding>
    </vaccineCode>
    <reported value="false"/>
    <wasNotGiven value="false"/>
</Immunization>
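The rest of this post only cares about three of those fields, so it can help to picture them as a small Rust type. This is a minimal sketch: the Immunization struct, its from_values constructor, and patient_id_from_reference are hypothetical names of my own, not part of any FHIR crate. The date is kept as plain text rather than parsed.

```rust
/// Hypothetical model of the three Immunization fields this post uses.
/// The struct and field names are illustrative, not from any FHIR library.
#[derive(Debug, Clone, PartialEq)]
pub struct Immunization {
    pub patient_id: String, // from <patient><reference value="Patient/..."/>
    pub date: String,       // from <date value="YYYY-MM-DD"/>, kept as text here
    pub vaccine: String,    // from <vaccineCode><coding><display value="..."/>
}

/// Extracts the ID from a relative reference such as "Patient/2042917".
/// Returns None if the reference does not name a Patient or has an empty ID.
pub fn patient_id_from_reference(reference: &str) -> Option<&str> {
    reference.strip_prefix("Patient/").filter(|id| !id.is_empty())
}

impl Immunization {
    /// Builds a record from raw attribute values, rejecting non-Patient references.
    pub fn from_values(reference: &str, date: &str, vaccine: &str) -> Option<Immunization> {
        let patient_id = patient_id_from_reference(reference)?;
        Some(Immunization {
            patient_id: patient_id.to_string(),
            date: date.to_string(),
            vaccine: vaccine.to_string(),
        })
    }
}
```

With the sample record above, the reference "Patient/2042917" yields the ID "2042917"; a reference to any other resource type is rejected.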

This <Immunization> element has been extracted from a larger XML context containing all relevant information about the patient; I’ve omitted that here to save space. In fact, the smart-on-fhir/sample-patients generator spits out a ton of information per record: each randomized record is about 1 MB. I modified my copy of the generator to include only immunizations alongside the patient’s personal info prelude, resulting in a much smaller record. I then modified it to run indefinitely, sending the FHIR records to a topic in Apache Kafka instead of writing them to disk. Needing an intermediary to read the Kafka topic, I turned to…

Rust

I use Rust any chance I get, but this was a natural choice given that Rust crates for both Kafka and WebSockets exist. Here I’ll walk through main.rs section by section; at only 78 lines, it’s really quite impressive how much can be accomplished with so few lines of Rust.

With dependencies for ws, eventual, xml-rs, and kafka defined in Cargo.toml, the prelude for main.rs is:

extern crate ws;
extern crate eventual;
extern crate xml;
extern crate kafka;

use ws::{listen, Sender};
use std::thread;
use std::sync::{Arc, RwLock};
use std::sync::mpsc::channel;
use kafka::consumer::{Consumer, FetchOffset};
use xml::reader::{EventReader, XmlEvent};

We then open the main function and define lslock, an atomically reference-counted (Arc) read-write lock around an optional Sender, where Sender is the ws crate’s handle for sending messages to a connected WebSocket client:

fn main() {

  let lslock : Arc<RwLock<Option<Sender>>> = Arc::new(RwLock::new(None));

We’ll be updating this as new WebSocket clients connect, then using it to broadcast new immunizations to all clients as they are read from Kafka.
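The Arc<RwLock<Option<T>>> shape is easier to see without the ws types in the way. Here is a stdlib-only sketch that assumes a hypothetical Client type standing in for ws::Sender: one thread overwrites the shared slot on each “connection”, and afterwards a reader sees only the most recent value. record_connection, last_seen, and demo are illustrative helpers, not part of the original program.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for ws::Sender: a client handle identified by a number.
#[derive(Debug, Clone, PartialEq)]
pub struct Client(pub u32);

/// Shared "last seen client" slot: None until the first client connects.
pub type LastSeen = Arc<RwLock<Option<Client>>>;

/// Writer side: called whenever a new client connects (mirrors the listen callback).
pub fn record_connection(slot: &LastSeen, client: Client) {
    let mut guard = slot.write().unwrap();
    *guard = Some(client);
}

/// Reader side: returns a copy of the last seen client, if any.
pub fn last_seen(slot: &LastSeen) -> Option<Client> {
    slot.read().unwrap().clone()
}

/// A clone of the Arc moved into another thread updates the very same slot.
pub fn demo() -> Option<Client> {
    let slot: LastSeen = Arc::new(RwLock::new(None));
    let writer_slot = Arc::clone(&slot);
    let handle = thread::spawn(move || {
        for id in 1..=3 {
            record_connection(&writer_slot, Client(id));
        }
    });
    handle.join().unwrap();
    last_seen(&slot)
}
```

Each write replaces the previous value, so after three simulated connections only Client(3) remains in the slot.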

Next we need a thread that performs the broadcast, and a way to pass vaccine names to it; Rust’s multi-producer, single-consumer mpsc::channel works well for this. Below, the channel is created, a clone of lslock is made, and ownership of both that clone and the receiving end of the channel is moved into a new thread that waits for vaccine names. Upon receiving one, the thread acquires a read lock on the last seen sender, checks whether a client has in fact connected yet, and if so broadcasts the vaccine name to all WebSocket clients via that sender:

  let (immtx, immrx) = channel();
  let lslock1 = lslock.clone();
  let _child = thread::spawn(move || {
      // Block on the channel; the loop ends if every sending end is dropped
      for imm_msg in immrx {
          // Take one read lock and broadcast only if some client has connected
          if let Some(ref last_sender) = *lslock1.read().unwrap() {
              last_sender.broadcast(imm_msg).unwrap();
          }
      }
  });
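To see the channel hand-off in isolation, here is a stdlib-only sketch under stated assumptions: each WebSocket client is modeled by a hypothetical mpsc channel of its own, and spawn_broadcaster and run_pipeline are illustrative names. Iterating the Receiver, rather than calling recv().unwrap() in a loop, lets the broadcaster exit cleanly once every sending end has been dropped.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Forwards every message received on `rx` to each client channel.
/// The client channels are hypothetical stand-ins for WebSocket connections.
pub fn spawn_broadcaster(rx: Receiver<String>, clients: Vec<Sender<String>>) -> thread::JoinHandle<usize> {
    thread::spawn(move || {
        let mut forwarded = 0;
        // The loop ends once every producer-side Sender has been dropped.
        for msg in rx {
            for client in &clients {
                // Ignore clients whose receiving end has gone away.
                let _ = client.send(msg.clone());
            }
            forwarded += 1;
        }
        forwarded
    })
}

/// Sends `names` through the pipeline; returns what each of `n_clients` clients received.
pub fn run_pipeline(names: &[&str], n_clients: usize) -> Vec<Vec<String>> {
    let (imm_tx, imm_rx) = channel::<String>();
    let (client_txs, client_rxs): (Vec<_>, Vec<_>) =
        (0..n_clients).map(|_| channel::<String>()).unzip();
    let broadcaster = spawn_broadcaster(imm_rx, client_txs);
    for name in names {
        imm_tx.send(format!("{{\"vaccine\":\"{}\"}}", name)).unwrap();
    }
    drop(imm_tx); // closing the channel lets the broadcaster's loop finish
    broadcaster.join().unwrap();
    // The broadcaster dropped the client senders on exit, so these iterators terminate.
    client_rxs.into_iter().map(|rx| rx.iter().collect()).collect()
}
```

Every simulated client receives the same messages in the same order, which is the behavior the post’s broadcast thread depends on.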

Now we’ll set up a consumer to read the topic in Apache Kafka to which the Python generator is sending messages, and a thread to own that consumer and continually read from it:

  let mut consumer =
   Consumer::from_hosts(vec!("localhost:9092".to_owned()))
      .with_topic("fhir".to_string())
      .with_fallback_offset(FetchOffset::Latest)
      .create()
      .unwrap();
  let _child = thread::spawn(move || {
    loop {

The body of this thread loop needs to parse each FHIR message as it is read from Kafka. Here we care only about the <display> element inside the <vaccineCode> element; if it’s present in a message, we extract just that vaccine name and send it out as simple JSON on the mpsc channel, where it will eventually be broadcast to all WebSocket clients by the thread detailed above:

        for ms in consumer.poll().unwrap().iter() {
            for m in ms.messages() {
                let msg = String::from_utf8(m.value.to_vec()).unwrap();
                let parser = EventReader::from_str(&msg[..]);
                let mut in_vaccine = false;
                for e in parser {
                    match e {
                        Ok(XmlEvent::StartElement { name, attributes, .. }) => {
                            if name.local_name == "vaccineCode" {
                                in_vaccine = true;
                            }
                            // Only a <display> nested inside <vaccineCode> names the vaccine
                            if name.local_name == "display" && in_vaccine {
                                // Look up the `value` attribute by name, not by position
                                if let Some(attr) = attributes.iter().find(|a| a.name.local_name == "value") {
                                    let v = attr.value.clone();
                                    println!("Saw vaccine: {}", v);
                                    immtx.send(format!("{{\"vaccine\":\"{}\"}}", v)).unwrap();
                                }
                            }
                        },
                        Ok(XmlEvent::EndElement { name }) => {
                            if name.local_name == "vaccineCode" {
                                in_vaccine = false;
                            }
                        },
                        _ => {}
                    }
                }
            }
            consumer.consume_messageset(ms).unwrap();
        }
        consumer.commit_consumed().unwrap();
    }
  });

(Note that the above is SAX style: it could probably be simplified greatly using a well-crafted XPath selector, but I didn’t take the time to do so.)
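Stripped of the xml-rs types, that loop is a small state machine driven by one boolean flag. The sketch below assumes a hypothetical, simplified Event enum in place of xml-rs’s XmlEvent (just an element’s local name and its attribute pairs), and shows why a <display> outside <vaccineCode> is ignored. vaccine_names is an illustrative name, not from the original program.

```rust
/// Hypothetical, simplified stand-ins for xml-rs's StartElement/EndElement events:
/// an element's local name plus its attributes as (name, value) pairs.
#[derive(Debug, Clone)]
pub enum Event<'a> {
    Start(&'a str, Vec<(&'a str, &'a str)>),
    End(&'a str),
}

/// Collects the `value` attribute of every <display> nested inside <vaccineCode>,
/// using a single boolean flag just like the post's parsing loop.
pub fn vaccine_names<'a>(events: &[Event<'a>]) -> Vec<&'a str> {
    let mut in_vaccine = false;
    let mut names = Vec::new();
    for event in events {
        match event {
            Event::Start(name, attrs) => {
                if *name == "vaccineCode" {
                    in_vaccine = true;
                } else if *name == "display" && in_vaccine {
                    // Find the attribute by name rather than assuming it comes first.
                    if let Some(&(_, value)) = attrs.iter().find(|(k, _)| *k == "value") {
                        names.push(value);
                    }
                }
            }
            Event::End(name) => {
                if *name == "vaccineCode" {
                    in_vaccine = false;
                }
            }
        }
    }
    names
}
```

Fed the start and end events of the sample record shown earlier, this returns just "pneumonia"; a <display> appearing after </vaccineCode> would not be picked up.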

The final missing piece simply listens for new WebSocket connections; when we get one, we obtain the lock to the last seen sender as a writer and update it with the client that just connected:

  listen("0.0.0.0:3012", |sender| {
      let mut last_sender = lslock.write().unwrap();
      *last_sender = Some(sender.clone());
      move |_msg| {
         Ok(())
      }
  }).unwrap()
}
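Keeping only the last seen sender works because, in the ws crate, Sender::broadcast delivers a message to every connection on the same WebSocket instance, not just to the connection that sender belongs to. The sketch below models that behavior with the standard library only; Hub, Handle, and connect are hypothetical names, and each client’s inbox is an mpsc channel rather than a real socket.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical model of a ws-style server: every connection's handle shares one
/// registry, so broadcasting through any handle reaches every connected client.
#[derive(Clone, Default)]
pub struct Hub {
    clients: Arc<Mutex<Vec<Sender<String>>>>,
}

/// A per-connection handle, playing the role of ws::Sender in this sketch.
#[derive(Clone)]
pub struct Handle {
    hub: Hub,
}

impl Hub {
    /// Registers a new client and returns its handle plus the client's inbox.
    pub fn connect(&self) -> (Handle, Receiver<String>) {
        let (tx, rx) = channel();
        self.clients.lock().unwrap().push(tx);
        (Handle { hub: self.clone() }, rx)
    }
}

impl Handle {
    /// Sends `msg` to all registered clients, not only this handle's own connection.
    pub fn broadcast(&self, msg: &str) {
        for client in self.hub.clients.lock().unwrap().iter() {
            // A disconnected client is simply skipped.
            let _ = client.send(msg.to_string());
        }
    }
}
```

Broadcasting through the most recent handle reaches the earlier client’s inbox too, which is the property the listen callback relies on when it overwrites lslock.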

D3

I adapted this D3 bar chart to open a WebSocket connection to port 3012, on which the above Rust program was listening. After that it was relatively straightforward to make it update continuously: every time a WebSocket message is received, update a dictionary that maps vaccine names to counts, then redraw by calling the D3 initialization code, which I wrapped in a function that accepts such a dictionary so that it can be called repeatedly. (I learned the hard way that the entire <svg> element that D3 writes to needs to be deleted from the DOM each time; I found no clean way to simply “reload” data from the dictionary instead.)
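The page does its bookkeeping in JavaScript, but the per-message logic is small enough to sketch in Rust for consistency with the rest of the post. This assumes messages look exactly like the {"vaccine":"<name>"} strings the server emits; vaccine_from_message and tally are hypothetical helpers, and the string matching is deliberately not a general JSON parser.

```rust
use std::collections::BTreeMap;

/// Pulls the vaccine name out of a message shaped exactly like {"vaccine":"<name>"},
/// the format produced by the Rust server. This is not a general JSON parser.
pub fn vaccine_from_message(msg: &str) -> Option<&str> {
    msg.trim()
        .strip_prefix("{\"vaccine\":\"")?
        .strip_suffix("\"}")
}

/// Updates the name -> count tally for one incoming message.
/// Returns false (and changes nothing) if the message was not recognized.
pub fn tally(counts: &mut BTreeMap<String, u32>, msg: &str) -> bool {
    match vaccine_from_message(msg) {
        Some(name) => {
            *counts.entry(name.to_string()).or_insert(0) += 1;
            true
        }
        None => false,
    }
}
```

A BTreeMap keeps the vaccine names sorted, which would give the bars a stable order between redraws; a HashMap would count just as well.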

The end result is what you see above at the top of this post. Does this particular visualization have much utility? Not really, unless you’re an interior designer at the CDC who runs in the same circles as the designers of the FAA’s Potomac Consolidated TRACON. Even better than a bar chart here would be to observe immunizations over time on a daily summary basis. Ideally prior to the next outbreak of swine flu, of course.