Modern data system and its components in the leading IT companies
This came from an article for my graduation(B.S). I currently worked as a Data Engineer at an eCommerce company in Korea. Here I wanted to give an overview of the modern data system to grasp the general pattern and direction in real use cases.
Contents
- Abstract
- Introduction
- Data Pipeline Stages and Components (Collect, Move, Store, Process, Use Orchestrate)
- Conclusions
- References
Abstract
In the recent development of the data system in IT companies, many tools are devised individually to satisfy the diverse requirements. By suggesting a structured frame to understand those interactive processes and tools, we also want to aggregate popular design patterns and tools for technical comparison among things that share a conceptually similar purpose.
Here we define ‘Collect, Move, Store, Process, Use, Orchestrate’ part to demonstrate the stages of the data pipeline, which is the more verbose version of the commonly known ETL [3] to address more typical cases in the industry. And we focus on the tools and differences between those not on the distinction between infrastructure-level environments like on-premise vs cloud. Some are hardly fit into those distinctions but we think that it could give a comprehensive overview.
And we mainly refer to the cases from companies like Google, Amazon, Microsoft, Uber, Netflix, LinkedIn, Facebook, Huawei. And the following is written on the base hypothesis of an application service company.
Introduction
Historically, the data system started from the plain RDBMS to get statistics from the transactional data or to analyze the data in it. The concepts like ‘Data Warehousing’, ‘OLAP’ [10] and ‘Enterprise Information Management’ [4] were there with the requirements from analysts and business intelligence managers and appropriate tools like Vertica [5], Sybase IQ [6]. As the data size grows and Hadoop support large-scale data storage with preceding functionality, the mainstream of the data system started to heavily depend on the Hadoop Environments. The conceptual change the Hadoop brings together is scaling-out rather than scaling-up for the data system. The traditional approach to performing computations on datasets was to invest in a few extremely powerful servers with lots of processors and lots of RAM, slurp the data in from a storage layer (e.g., NAS or SAN), crunch through computation, and write the results back to storage [1].
Figure 1–1. Standard representation of technologies and dependencies in the Hadoop stack [1]
While the former change was triggered by the BI and traditional analysis side of requests, the next one is by A/B testing [7] and deep learning [8]. Needs on larger, faster, and available storage, transformation arise many services. And they bring to the distinguishing concepts like “Data Lake” [14], “HTAP” [15].
Altogether, there are many tools on the list: Filebeat, Fluentd, Scribe, Sqoop, Kafka, Kinesis, ElasticSearch, HDFS, HBase, Spanner, Dynamo, Colossus, Delta Lake, FI-MPPDB, BigTable, Druid, Kudu, Dremel, Hive, MapReduce, Impala, Spark, Presto, Flink, Zeppelin, Supersets, Jupyter Notebook, Zookeeper, Hive Catalog, Yarn, Mesos, Helix, Slider, Nifi, Ozzie, etc.
Figure 1–2. The Google infrastructure stack. [31]
In Section 2, we orderly address the pipeline stages and the above components related to each stage. And in Section 3, we summarize our conclusions.
Data Pipeline Stages and Components
The following subsections consist of 6 parts: Collect, Move, Store, Process, Use, Orchestrate.
- Collect: Gather the transient data into the intra-system for temporary or permanent usage
- Move: Move(and pre-process) the collected data to the long-term main storage or buffer it before writing it
- Store: Store the data in distributed storages
- Process: Process or transform the data to store or extract it using processing engines
- Use: Use the data assets or offer it to the end-users through various services
- Orchestrate: Control applications, resources, and tasks
And one point that generally pops up in most of the stages of the pipeline is ‘Batch and Stream’ [9]:
- Batch: to process the blocks of data that have already been stored over a period of time
- Stream: to process data in real-time as they arrive and quickly detect conditions within a small time period from the point of receiving the data
In the following, we use batch or stream in the context of the above definitions.
Collect
The sources [2] of the data falls in the three groups:
- Production data and its changelogs: Transactional DB(e.g. Order, Product, User tables) and changelogs of the data which are captured by the CDC [11]
- Server Logs [12]: Typically the historical logs of client’s request for pages to servers
- Client Event Logs: The results of event tracking [13] on the client device, which send to servers or other systems to be stored
OLTP DB on the backend would be the industry standard [16] although new architectural patterns like event-sourcing appear. When the data size is relatively small, it seems that simple RDBMS could satisfy the requirements but the growth of the size and complex requirements make them equipped with more sophisticated tools like CDC [17]. Here, CDC is used for two purposes: to replicate and to record all changelogs. And, CDC capture methods could be grouped into 3 categories which are Log reader, Query, Trigger [11](e.g. Oracle’s OGG [18], Striim [19]).
The logging put its purpose on security[20] and visualizing-focused monitoring for administration [21]. And, as the size of logs grows, more integrated and efficient handling of the logs are needed in the concept of log management [22]. Many tools on the market and most of them integrated with visually explicit UI for monitoring. This piled server logs could be used for security [23], business analytics [24], etc. There are various products from the in-house(Facebook’s Scribe) to the paid or open-source(Filebeat, Fluentd) almost with the agent-aggregator structure.
Client event tracking puts its origin on the web tracking which is mostly accompanied by fingerprinting for identification [25]. That identification could be used for digital marketing, security, validating traffic [26]. As the privacy issues have been popped up, it’s a bit trickier to collect the data from the device than before though [27]. With the process for defining the ‘user log’ and the format [28], the tracker plant tracking codes and set up APIs endpoint to receive the data[29, 30].
Move
We define this stage with the two distinct types of software: migration tools like Apache Sqoop and buffering tools with some stream processing abilities like Kafka, AWS Kinesis & Firehose or Google’s MillWheel.
Realtime data flow would be nice with CDC-like service but the batch for migration still exists in many companies [32]. Though all the processing tools have the ability to move the data from operational DB to the long-term storage, the specialized ones for this purpose could be Apache Sqoop, AWS DMS, etc. Sometimes a service can replace the part of the data pipeline including this stage [33].
Kafka originally targeted log processing and realtime analytics by building a scalable and high-performance messaging system [34]. We could realize that it was the coincidental needs from the industry when we check the similar tools made at that period [35]. The requirements on those tools are scalability with distribution support, fault-tolerance, check-pointing, rich consumer & producer connectors, and large queue size and throughput compared to one that exists before. Frequently emerging concepts in this stage are offset, watermark [36], delivery guarantee [37], etc.
Store
The problems that Google File Systems tried to solve were reliability on thousands of inexpensive commodity parts, data-size scalability, optimization for append-heavy tasks, and last the flexibility to ease application-side complexity [38]. And, its descendant HDFS inherits most of the features, except some differences like multiple-writers support.
Figure 2–1. Distributed storage system genealogy [2]
There are many alternatives like Microsoft’s TidyFS [39] and Netflix’s NMDB [52] or Facebook’s Haystack for a special case [40].
Here, we use the following criteria to analyze diverse distributed storage systems [2]:
- Partitioning: Management distribution of data across nodes
- Mutation: Support on modifying data
- Read Paths: Ways of data access
- Availability and Consistency: Trade-offs between availability and consistency
- Use Cases
Partitioning, the mechanism used to distribute data across nodes, is usually combined with replication so that copies of each partition are stored on multiple nodes [41]. Limited options for data partitioning exist for distributed data systems: centralized, range, and hash partitioning. GFS, HDFS falls in the centralized case with advantages like integrity, evenly partitioned data(rebalancing), though the centralized node could be the bottleneck. Range partitioning is used in systems like HBase[42], BigTable [43], RethinkDB, and MongoDB(before version 2.4). Hash partitioning [44] uses the hash function to avoid the defects of the range partitioning: skew and hot spots. Cassandra, MongoDB, DynamoDB [45] fit into this case.
Figure 2–2. BigTable’s Tablet location hierarchy [43]
Data systems are highly tuned for specific purposes so that they support a diverse range of mutations. Mutation methods(append, update, etc), level(file or record), size, latency could be considered. HDFS prefers rewriting than append and not support update as it is designed for the specific data access pattern [38] but most of NoSQL systems support it. So, GFS, HDFS, S3 support file/object level mutation whereas the other storages support record-level one. HBase and Cassandra use data chunks under or around 10 KB, BigTable around 64 KB, while HDFS is 128 MB.
‘Read Paths’ is about how the system access data: indexing-level, column or row-oriented. HDFS supports indexing at the file level using Hive(Hive’s partitioning) [46]. Key-value storage all support indexing at record level though they show different strategies on the multiple indexing [47]. And systems like Solr and ElasticSearch support Lucene-based inverted indexing [48]. For the column or row-oriented, some systems have its own data model while the other outsource it so that it can handle various data formats: ORC, Parquet, Avro, etc.
The traditional CAP Theorem offers a base ground frame for this part but there are many variations in each concept of it so that it does not match well with ACID, isolation concepts [49]. Figure 2–3 shows the unavailability statistics using GFS with 1000 to 7000 nodes for a year. Many reasons are there: a storage node or networking switch can be overloaded, a node binary or operating system may crash or restart, a machine may experience a hardware error, automated repair processes may temporarily remove disks or machines, or the whole cluster could be brought down for maintenance [50].
Figure 2–3. Cumulative distribution function of the duration of node unavailability periods [50]
To guarantee high availability(HA), the systems usually layer(or sacrifice) the consistency [47, 51]: relaxing consistency will allow the system to remain highly available under the partitionable conditions, whereas making consistency a priority means that under certain conditions the system will not be available [53].
Many variations of GFS(HDFS, S3, TidyFS) seems to be located at the core of the data system, though some are more for a specific purpose(Haystack, f4). And, key-value type storage(BigTable, HBase, Cassandra, DynamoDB) is usually equipped to handle OLTP-like requests. And the needs for aggregating tasks also bring some storages like Redshift, Druid [54]. Diverse tools in a data system seem to be merged in the future. Many services emerge with the HTAP concept [55]: Huawei’s FI-MPPDB and Databrcks’ Delta Lake [57].
Process
MapReduce was this primary means to implement distributed processing, though the concept has existed for 20 years as parallel SQL data management systems and overall performance is superior on SQL DBMS [58]. Processing engines share the following attributes:
- Concurrency and compute isolation
- Performance
MapReduce implements external dag management which is made up of different processing stages referred to as map and reduce. A master node handles the whole dag as shown in Figure 2–4. MapReduce was usually wrapped with SQL-like interfaces: Hive [60] or Tenzing [61].
Figure 2–3. Mapreduce Execution overview [59]
MapReduce is increasingly being replaced by modern processing engines like Hive with Tez, Impala, Apache Spark, Apache Flink, Presto, Dremel, Druid [64]. DAG creation of MapReduce is difficult to code so that Hive is widely used. And, compute isolation consists of 3 levels: node, container, task. Node-level isolation became popular with the rise of cloud offerings such as Amazon Elastic MapReduce (EMR) [2]. Container-level isolation needs orchestrators(schedulers) like Yarn, Mesos, or Apache Slider in Hadoop environments or the other increasing use of solutions like Docker and Kubernetes. Task-level isolation, which is supported by a number of modern processing engines, allows for different workloads to run. Traditional queue-level isolation would seat between the container and task one [62].
Performance is the most important factor when choosing the engine, though they show partially different superiority in different contexts. Relatively recent tools like Spark, Presto, Impala, especially Dremio [63] show faster execution time compared to MapReduce, Hive. Also, the ability to disk spill, ‘batch or stream’, data format support, connector support could be the checkpoint when applying the engine.
Use
There are numerous applications based on the data system so here we group them by for which it would be used: API, dashboard & visualization, web elements [65], internal platform.
API follows common deployment processes and toolsets in other departments, whereas many specialized toolsets are on the list for visualization: Grafana, Supersets, Tableau, Google Analytics or in-house tools using D3 [66] or Uber’s deck.gl for special cases [67]. Columnar storages like Redshift, Druid would suffice the burden from it [68]. Web elements are the front of data products and they have large backend support. Feature store and ML platform [69, 70] make training and deploying of the data products faster and manageable. And, Jupyter notebook, Zeppelin are used to supporting analysts or data scientists.
Orchestrate
This part covers the schedulers, coordinators, cluster management services in the context of the application, resource, workflow.
- Application: centralized services for distributed applications
- Resource: services that handle the management of partitioned, replicated and distributed resources hosted on a cluster of nodes
- Workflow: services to programmatically author, schedule and monitor workflows
In the Hadoop environment, it seems that it’s hard to find any other alternatives except Zookeeper.
Figure 2–4. Using ZooKeeper to keep track of the assignment of partitions to nodes [41]
Many distributed data systems rely on a separate coordination service such as ZooKeeper to keep track of this cluster metadata and its state. As it focuses on configuration and synchronization, the resource orchestrators are developed to achieve high utilization of the cluster [71]. There are many variations: Google’s Borg, Google’s Omega[72], Facebook’s Bistro [73], Apache Yarn, Apache Mesos [74], Apache Helix.
Figure 2–5. The high-level architecture of Borg. Only a tiny fraction of the thousands of worker nodes are shown [71]
Most of them share a master-slave architecture combining admission control, efficient task-packing, over-commitment, and machine sharing [71, 74] though the level of isolation varies.
Workflow orchestrators manage directed graphs of data routing, transformation, and system mediation through code or GUI-based web UI. Apache Nifi [75] and StreamSets [76] support GUI to control the dataflow while Oozie offers highly Hadoop-integrated service [77] and Airflow gives a broad range of connectors including the cloud environments [78].
Conclusions
We address typical data pipeline stages and its components. Data systems are changing and new services are incessantly launched resolving the previous limitation or defects. We hope that it gives the general architecture in the data field and helps someone who tries to grasp a broad landscape of the modern data systems.
References
[1] Jan Kunigk, Ian Buss, et al.: “Architecting Modern Data Platforms: A guide to enterprise Hadoop at scale,” O’Reilly Media, Inc., 2019.
[2] Ted Malaska, Jonathan Seidman: “Foundations for Architecting Data Solutions,” O’Reilly Media, Inc., 2018.
[3] https://cloud.google.com/solutions/migration/dw2bq/dw-bq-data-pipelines.
[4] John Ladley: “Data Governance: How to design, deploy, and sustain an effective data governance program,” Elsevier Inc., 2012.
[5] https://en.wikipedia.org/wiki/Vertica.
[6] https://en.wikipedia.org/wiki/SAP_IQ.
[7] https://hbr.org/2017/09/the-surprising-power-of-online-experiments.
[8] Zhu, X: “Do we Need More Training Data?,” 2015.
[9] https://medium.com/@gowthamy/big-data-battle-batch-processing-vs-stream-processing-5d94600d8103.
[10] Michael Stonebraker and Uğur Çetintemel: “‘One Size Fits All’: An Idea Whose Time Has Come and Gone,” at 21st International Conference on Data Engineering (ICDE), April 2005.
[11] Kevin Petrie, Dan Potter, et al.: “Streaming Change Data Capture,” O’Reilly Media, Inc., 2018.
[12] https://en.wikipedia.org/wiki/Server_log.
[13] https://developers.google.com/analytics/devguides/collection/gajs/eventTrackerGuide.
[14] https://en.wikipedia.org/wiki/Data_lake.
[15] https://en.wikipedia.org/wiki/Hybrid_transactional/analytical_processing_(HTAP).
[16] https://insights.stackoverflow.com/survey/2018#technology-_-databases.
[17] https://eng.uber.com/uber-big-data-platform/.
[18] https://docs.oracle.com/cd/E21043_01/integrate.1111/e12644/ogg.htm.
[19] https://www.striim.com/change-data-capture/.
[20] https://en.wikipedia.org/wiki/Log_management.
[21] L. Girardin and D. Brodbeck. ‘‘A Visual Approach for Monitoring Logs.’’ Proc. of the 12th Large Installation Systems Administration (LISA) Conf., 1998.
[22] Adam Sah: “A New Architecture for Managing Enterprise Log Data,” Sixteenth Systems Administration Conference, 2002.
[23] https://www.snowflake.com/security-data-analytics/.
[24] https://www.joe0.com/2017/02/05/applying-big-data-analytics-to-logging/.
[25] Cao, Yinzhi: “(Cross-)Browser Fingerprinting via OS and Hardware Level Features,” 2017–03–07.
[26] Elie Bursztein, Artem Malyshey, et al.: “Picasso: Lightweight Device Class Fingerprinting for Web Clients,” 2016.
[28] https://github.com/snowplow/snowplow/wiki/snowplow-tracker-protocol.
[29] https://en.wikipedia.org/wiki/Google_Analytics.
[30] https://github.com/awslabs/amazon-kinesis-producer.
[31] Malte Schwarzkopf, “Operating system support for warehouse-scale computing,” University of Cambridge Computer Laboratory, 2015.
[32] https://keen.io/blog/architecture-of-giants-data-stacks-at-facebook-netflix-airbnb-and-pinterest/.
[33] https://www.linkedin.com/pulse/nifi-vs-sqoopflumeoozie-birender-saini/.
[34] Jay Kreps, Neha Narkhede, et al.: “Kafka: a Distributed Messaging System for Log Processing,” LinkedIn Corp.
[35] Tyler Akidau, Alex Balikov, et al.: “MillWheel: Fault-Tolerant Stream Processing at Internet Scale.” Google.
[36] https://rongxinblog.wordpress.com/2016/07/29/kafka-high-watermark/.
[37] https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/.
[38] Sanjay Ghemawat, Howard Gobioff, et al.: “The Google File System,” Google, 2003.
[39] Dennis Fetterly, Maya Haridasan, et al.: “TidyFS: A Simple and Small Distributed File System,”, Microsoft.
[40] Doug Beaver, Sanjeev Kumar, et al.: “Finding a needle in Haystack: Facebook’s photo storage,” Facebook.
[41] Martin Kleppmann: “Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems,” Oreily, 2017.
[42] https://hbase.apache.org/.
[43] Fay Chang, Jeffrey Dean, et al.: “Bigtable: A Distributed Storage System for Structured Data,” Google.
[44] David Karge, Eric Lehma, et al.: “Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web,” In Proceedings of the Twenty-Ninth Annual ACM Symposium on theory of Computing, 1997.
[45] Giuseppe DeCandia, Deniz Hastorun, et al.: “Dynamo: Amazon’s Highly Available Key-value Store,” amazon.com.
[46] Eduarda Costa, Carlos Costa, et al.: “Evaluating partitioning and bucketing strategies for Hive-based Big Data Warehousing systems,” Journal of Big Data, 2019.
[47] “Comparing the Use of Amazon DynamoDB and Apache HBase for NoSQL,” Amazon Web Service, 2018.
[48] https://sease.io/2015/07/exploring-solr-internals-lucene.html.
[49] Peter Bailis, Alan Fekete, et al.: “HAT, not CAP: Towards Highly Available Transactions,” at 14th USENIX Workshop on Hot Topics in Operating Systems (HotOS), 2013.
[50] Daniel Ford, Franc¸ois Labelle, et al.: “Availability in Globally Distributed Storage Systems,” Google.
[51] https://en.wikipedia.org/wiki/Eventual_consistency.
[52] https://medium.com/netflix-techblog/implementing-the-netflix-media-database-53b5a840b42a.
[53] https://queue.acm.org/detail.cfm?id=1466448.
[54] https://medium.com/airbnb-engineering/druid-airbnb-data-platform-601c312f2a4c.
[55] https://en.wikipedia.org/wiki/Hybrid_transactional/analytical_processing.
[56] Jianjun Chen, Yu Chen, et al.: “Data Management at Huawei: Recent Accomplishments and Future Challenges,” IEEE 35th International Conference on Data Engineering, 2019.
[57] https://delta.io/.
[58] Andrew Pavlo, Eric Paulson, et al.: “A Comparison of Approaches to Large-Scale Data Analysis,” ACM SIGMOD International Conference on Management of data, 2009.
[59] Jeffrey Dean, Sanjay Ghemawat: “MapReduce: Simplified Data Processing on Large Clusters,” Google.
[60] Ashish Thusoo, Joydeep Sen Sarma, et al.: “Hive — A Warehousing Solution Over a Map-Reduce Framework,” Facebook.
[61] Biswapesh Chattopadhyay, Liang Lin, et al.: “Tenzing A SQL Implementation On The MapReduce Framework,” Google.
[62] https://docs.aws.amazon.com/redshift/latest/dg/cm-c-defining-query-queues.html.
[63] https://community.dremio.com/t/performances-comparisons/1144/9.
[64] https://en.wikipedia.org/wiki/Comparison_of_OLAP_servers.
[65] https://towardsdatascience.com/designing-data-products-b6b93edf3d23.
[66] https://www.youtube.com/watch?v=nMyuCdqzpZc.
[67] https://deck.gl/.
[68] https://www.tableau.com/solutions/amazon-redshift.
[70] https://databricks.com/product/managed-mlflow.
[71] Abhishek Verma, Luis Pedrosa, et al.: “Large-scale cluster management at Google with Borg,” Google.
[72] Malte Schwarzkopf, Andy Konwinski, et al.: “Omega: flexible, scalable schedulers for large compute clusters,” Google.
[73] Andrey Goder, Alexey Spiridonov, et al.: Bistro: “Scheduling Data-Parallel Jobs Against Live Production Systems,” USENIX Annual Technical Conference, 2015.
[74] Benjamin Hindman, Andy Konwinski, et al.: “Mesos: A Platform for Fine-Grained Resource Sharing in the Data Center,” University of California, Berkeley.
[75] https://nifi.apache.org/.
[76] https://streamsets.com/products/dataops-platform.
[77] https://oozie.apache.org.