Handling Late Arriving Data with Apache Beam and Apache Airflow

Ryan Berti
Google Cloud - Community
Jun 16, 2021


Late in 2019, I was hired at a streaming service focused on short-form content to help build the data platform. A key aspect of the service was its focus on mobile consumption, which meant the data platform had to handle late arriving data gracefully. In a presentation, I used the example of offline viewing during a plane flight to communicate the problem: a user watches downloaded content mid-flight at hour 7, but the playback events don't reach our servers until hour 9, after landing.

Do you include this user's playback in the metrics for hour 7 (event time) or hour 9 (receipt time)? If you associate the playback with event time, how does the system handle this late arriving data?

This problem can be solved in various ways, using various technologies, and each solution has tradeoffs. The simplest solution is efficient but potentially misleading: drop the late arriving data. A more correct but delay-incurring solution holds up processing for an arbitrary amount of time, so that late arriving data within the threshold is processed and anything arriving after is (hopefully) negligible and discarded. A more correct but compute-inefficient solution reprocesses all events within some look-back window at a regular interval. A more complex, more correct, and compute-efficient solution tracks which partitions contain late arriving data and reprocesses them as needed.

(Oversimplified) Processing Patterns

I had hoped our streaming service would grow to the point where the volume of events and the requirements of data products and stakeholders would demand the solution that was correct, compute efficient, and free of unnecessary delay. Unfortunately, the streaming service never scaled up, but I believe the architecture I proposed and POC'd could be useful in other environments. This post outlines the proposed architecture, provides an example implementation, and offers insight into our final production architecture.

Streaming data processing can be daunting, especially for small teams whose stakeholders don't have real-time requirements. That said, there are clear advantages to architecting the data platform to be event-centric (see: Kappa Architecture). Given the tradeoffs, the team adopted Apache Beam for its ability to handle batch and streaming jobs with the same API, as well as its availability as a managed service in GCP. Another benefit of using Beam as a core component of the data platform is its ability to group application events together in real time via session windows, rather than landing events and grouping them at some later point. Beam's session windows let you tune how long to wait for events before closing a session (gap duration), and also how long after the window has closed to accept late-arriving events (allowed lateness).
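
To make those knobs concrete, here's a minimal Scio sketch of the windowing configuration; the PlaybackEvent type and the 30-minute/2-hour durations are illustrative assumptions, not our production schema or settings.

```scala
import com.spotify.scio.values.{SCollection, WindowOptions}
import org.joda.time.Duration

// Hypothetical event type for illustration only.
case class PlaybackEvent(userId: String, assetId: String, positionMs: Long)

// Group per-user playback events into sessions: a session closes after 30
// minutes of inactivity (gap duration), and the window remains receptive to
// late-arriving events for a further 2 hours (allowed lateness).
def sessionize(
  events: SCollection[(String, PlaybackEvent)]
): SCollection[(String, Iterable[PlaybackEvent])] =
  events
    .withSessionWindows(
      Duration.standardMinutes(30),
      WindowOptions(allowedLateness = Duration.standardHours(2))
    )
    .groupByKey
```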

Beam also lets you configure when a window triggers and emits its grouped events for downstream processing (for example, once the gap duration has been reached, and again at the end of the allowed lateness threshold). Support for multiple triggers allows implementations to write out partial results and re-write complete results at a later processing time. This feature drives the question: how do I write this partial output for downstream consumption? The 'Late arriving facts' section of Maxime Beauchemin's foundational post on Functional Data Engineering provides a solution: write data into event time + processing time partitions.
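
As a sketch of that trigger configuration (again, the durations are illustrative, not production values), the options below fire once when the watermark passes the end of the session and then again for any late-arriving elements; they would be passed to withSessionWindows in place of the defaults used in the earlier sketch.

```scala
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.transforms.windowing.{AfterPane, AfterWatermark}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration

// Emit the "on time" pane when the watermark passes the end of the session,
// then emit an updated pane for each batch of late elements until allowed
// lateness expires. ACCUMULATING_FIRED_PANES means every late pane carries
// the full session so far, so the last write supersedes the partial ones.
val lateFiringOptions = WindowOptions(
  trigger = AfterWatermark
    .pastEndOfWindow()
    .withLateFirings(AfterPane.elementCountAtLeast(1)),
  accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
  allowedLateness = Duration.standardHours(2)
)
```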

Here's an example Beam implementation that utilizes session windows and writes output to event time / receipt time partitions. The idea is that the pipeline writes one or more receipt time partitions for the same event time partition, where the receipt time partitions contain the partial and complete pipeline results for sessions with late arriving data. Downstream consumers of the event time partition are responsible for de-duplicating the records found across its receipt time partitions. The implementation uses Spotify's Scio, a Scala API for Beam that we found far more succinct than the Java API, while retaining type safety and other JVM/functional advantages not available in the Python API.
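
The heart of that implementation, compressed into a sketch: derive an event-time partition from the session's own timestamp and a receipt-time partition from the wall clock at the moment the pane fires. The bucket layout, hour format, and helper names below are assumptions for illustration, not the exact implementation.

```scala
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}
import com.spotify.scio.values.SCollection

// Hour-granularity partition labels, e.g. "2021-06-16T07".
val hourFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH").withZone(ZoneOffset.UTC)

// Late firings for the same session land under the same event_hour but a
// newer receipt_hour, so a consumer can always reconstruct the latest state.
def outputPath(bucket: String, eventTimeMs: Long, receiptTimeMs: Long): String = {
  val eventHour   = hourFmt.format(Instant.ofEpochMilli(eventTimeMs))
  val receiptHour = hourFmt.format(Instant.ofEpochMilli(receiptTimeMs))
  s"gs://$bucket/sessions/event_hour=$eventHour/receipt_hour=$receiptHour"
}

// Tag each emitted session with its destination path; the element timestamp
// supplies event time, the wall clock approximates the pane's firing time.
def tagPartitions[S](sessions: SCollection[S], bucket: String): SCollection[(String, S)] =
  sessions.withTimestamp.map { case (session, ts) =>
    (outputPath(bucket, ts.getMillis, System.currentTimeMillis()), session)
  }
```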

Example Beam Pipeline Output

Consuming streaming outputs and further processing them via traditional batch pipelines is an inevitable next step in any event-centric architecture. This architecture uses PubSub as a mechanism for advertising data availability, with the goal of decoupling downstream processing from the streaming data producer. After writing event time/receipt time partitions, the pipeline publishes JSON events to a PubSub topic. At a minimum, the JSON events identify the event time partitions where the streaming pipeline has written receipt time inserts/updates. This pattern lets data consumers be flexible in how they consume streaming outputs: in some cases the consumer may be another streaming job that reads event time partitions as soon as they're available, while in other cases the updates can be consumed in batch via a traditional cron-based workflow.
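
As a concrete (and entirely hypothetical) example of what such a payload might look like, with field names invented for illustration:

```json
{
  "dataset": "playback_sessions",
  "event_time_partition": "2021-06-16T07",
  "receipt_time_partition": "2021-06-16T09",
  "gcs_prefix": "gs://example-bucket/sessions/event_hour=2021-06-16T07/receipt_hour=2021-06-16T09/"
}
```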

While a cron-based workflow is likely sufficient, I wanted to experiment with driving Airflow asynchronously via events, so this architecture uses Airflow's REST API to trigger DAG runs. This lets users build batch-style processing DAGs in a familiar way, and assuming the DAGs are idempotent, they can be re-run against the same source partition multiple times as late arriving data becomes available (and that processing can happen in parallel, without the artificial delay a cron introduces). The implementation uses a Cloud Function to consume events from PubSub and trigger the Airflow DAG for a given execution time value. It is a slight modification of the GCP example that processes data as it becomes available in GCS, and uses Spark SQL to do simple de-duplication of sessions within the event time partition.
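
The de-duplication itself is a routine window-function query. A minimal Spark (Scala) sketch, where the sessions_raw table and the session_id/receipt_time/event_hour columns are assumed names rather than the actual schema:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// For a given event-time partition, keep only the most recently received copy
// of each session, so a complete late pane supersedes earlier partial panes
// written by the streaming pipeline. Assumes sessions_raw is registered as a
// table/view over the partitioned session output in GCS.
def dedupeEventHour(spark: SparkSession, eventHour: String): DataFrame =
  spark.sql(
    s"""
       |SELECT * FROM (
       |  SELECT *,
       |         ROW_NUMBER() OVER (
       |           PARTITION BY session_id
       |           ORDER BY receipt_time DESC
       |         ) AS rn
       |  FROM sessions_raw
       |  WHERE event_hour = '$eventHour'
       |) ranked
       |WHERE rn = 1
       |""".stripMargin
  ).drop("rn")
```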

Example Event Driven Airflow Output

I liked the idea of building our data processing platform using this architecture due to its flexibility and extensibility:

  • Downstream consumption via Airflow DAGs could be tuned for performance/cost/correctness by adjusting the cadence at which the upstream Beam pipeline emits data availability events, as well as how far in the past late arriving data is advertised. For example, adding a filter and a fixed window to the pipeline before emitting availability events to PubSub could replicate a more traditional cron-based workflow (see the sketch after this list).
  • Configuring PubSub topics and subscriptions is well understood, and I'd expected downstream teams to be able to 'self-serve' by ingesting events from these topics (assuming their use case wasn't handled via event-driven Airflow). Centralizing data advertisement and consumption through this mechanism might also have provided good insight into cross-team data utilization and lineage.
  • Beam’s shared streaming and batch API hinted at the ability to re-use code between the streaming session window job and the processing jobs driven by the downstream Airflow DAGs.
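
On the first point above, a rough sketch of what that throttling could look like upstream; AvailabilityEvent, the hourly window, and the lexicographic partition comparison are all assumptions made for illustration:

```scala
import com.spotify.scio.values.SCollection
import org.joda.time.Duration

// Hypothetical availability notification published to PubSub.
case class AvailabilityEvent(eventTimePartition: String, receiptTimePartition: String)

// Rather than advertising every receipt-time write immediately, cap how far
// back we advertise and batch notifications into hourly fixed windows,
// approximating a cron-style cadence for the downstream Airflow DAGs.
// Partition labels like "2021-06-16T07" sort lexicographically, so a plain
// string comparison works as an age cutoff.
def throttleAvailability(
  events: SCollection[AvailabilityEvent],
  oldestAdvertisedPartition: String
): SCollection[AvailabilityEvent] =
  events
    .filter(_.eventTimePartition >= oldestAdvertisedPartition)
    .withFixedWindows(Duration.standardHours(1))
    .distinct
```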

Unfortunately, this architecture also had some weak points, which eventually led us to investigate other technologies and architectures:

  • The complexity associated with running and debugging an asynchronous, JVM-centric streaming system was at best unknown, and at worst would result in significant documentation, cross-training, and late-night PagerDuty calls.
  • The cost of running Dataflow in GCP was not well understood, especially compared with more traditional data processing technologies. It was difficult to sell the team on the time investment associated with implementing this architecture when there was a distinct possibility that running everything in a more traditional data warehouse centric architecture would be cheaper.
  • There'd inevitably be cases where Beam's session window mechanism split events from the same session into different partitions (i.e., whatever our lateness threshold was, there'd always be later-arriving data; replays also caused events that were previously grouped together to be split). Handling these cases would no doubt increase complexity and cost.

In the end, we utilized components of the example architecture for some specific use cases, but decided to embrace a modern data warehouse architecture for most others. We used BigQuery as both the data lake and the data warehouse, and relied heavily on dbt to structure and apply complex transformations. The shift to a BigQuery-centric architecture meant less focus on data storage and partition-processing patterns, and more focus on data modeling and quick iteration. While the architecture I proposed was theoretically flexible, processing efficient, and correct, the modern data warehousing approach proved to excel at these traits with a much lower time-to-impact. This was a big learning for me, given the market-wide shift away from traditional data warehouses over the last twenty years. That said, we never had enough data to necessitate the focus on processing efficiency I had honed during my time in ad-tech. Overall, I'm excited to see how the big data landscape continues to change, and I believe learnings from the example architecture could benefit large-scale implementations of the modern data warehouse.

Big thanks to all of the members of the Quibi data engineering team: it was an amazing experience building a data platform from scratch, and I can't wait to see everyone in person again one day soon!
