「时序数据库」Cassandra时间序列大规模数据建模
「时序数据库」Cassandra时间序列大规模数据建模
在开始使用Cassandra和时间序列数据时,人们面临的最大挑战之一是理解编写工作负载对集群的影响。过快地写入单个分区可能会创建热点,从而限制向外扩展的能力。分区太大可能会导致修复、流和读取性能方面的问题。从大分区的中间读取会带来很大的开销,并导致GC压力的增加。Cassandra 4.0应该可以提高大分区的性能,但是它不能完全解决我已经提到的其他问题。在可预见的未来,我们将需要考虑它们的性能影响,并相应地进行计划。
在这篇文章中,我将讨论一种常见的Cassandra数据建模技术,称为bucketing。Bucketing是一种策略,让我们可以控制每个分区中存储多少数据,以及将写出的数据分散到整个集群。这篇文章将讨论两种形式的攻击。当数据模型需要进一步扩展时,可以结合使用这些技术。读者应该已经熟悉了分区的解剖和基本的CQL命令。
当我们第一次使用Cassandra学习数据建模时,我们可能会看到如下内容:
CREATE TABLE raw_data ( sensor text, ts timeuuid, readint int, primary key(sensor, ts) ) WITH CLUSTERING ORDER BY (ts DESC) AND compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': 1, 'compaction_window_unit': 'DAYS'};
这是存储一些非常简单的传感器数据的一个很好的第一个数据模型。通常我们收集的数据要比整数复杂得多,但在这篇文章中,我们将关注键。我们利用TWCS作为压缩战略。TWCS将帮助我们处理压缩大分区的开销,这将使我们的CPU和I/O处于控制之下。不幸的是,它仍然有一些明显的限制。如果我们不使用TTL,那么当我们接收更多数据时,我们的分区大小将无限地持续增长。如上所述,在修复、流化或从任意时间片读取数据时,大分区会带来很大的开销。
为了分解这个大分区,我们将利用第一种形式的bucketing。我们将根据时间窗口将我们的分区分成更小的分区。理想的大小是将分区保持在100MB以下。例如,如果我们每天存储50-75MB的数据,那么每天每个传感器一个分区就是一个不错的选择。只要分区不超过100MB,我们也可以简单地使用周(从某个纪元开始)、月和年。无论选择什么,留一点增长空间是个好主意。
为此,我们将向分区键添加另一个组件。修改之前的数据模型,我们将添加一个day字段:
CREATE TABLE raw_data_by_day ( sensor text, day text, ts timeuuid, reading int, primary key((sensor, day), ts) ) WITH CLUSTERING ORDER BY (ts DESC) AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1};
插入到表中需要使用date和now()值(你也可以在你的应用代码中生成一个TimeUUID):
INSERT INTO raw_data_by_day (sensor, day, ts, reading) VALUES ('mysensor', '2017-01-01', now(), 10);
这是限制每个分区的数据量的一种方法。为了跨多天获取大量数据,您需要每天发出一个查询。这样查询的好处在于,我们可以将工作分散到整个集群,而不是要求单个节点执行大量工作。我们还可以通过依赖驱动程序中的异步调用并行地发出这些查询。对于这种用例,Python驱动程序甚至有一个方便的辅助函数:
from itertools import product from cassandra.concurrent import execute_concurrent_with_argsdays = ["2017-07-01", "2017-07-12", "2017-07-03"] # collecting three days worth of data session = Cluster(["127.0.0.1"]).connect("blog") prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")args = product(["mysensor"], days) # args: ('test', '2017-07-01'), ('test', '2017-07-12'), ('test', '2017-07-03')# driver handles concurrency for you results = execute_concurrent_with_args(session, prepared, args)# Results: #[ExecutionResult(success=True, result_or_exc=
这种技术的一种变体是每个时间窗口使用不同的表。例如,每月使用一个表意味着每年有12个表:
CREATE TABLE raw_data_may_2017 ( sensor text, ts timeuuid, reading int, primary key(sensor, ts) ) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1};
这种策略的主要好处是有助于存档和快速删除旧数据。例如,在每个月的开始,我们可以将上个月的数据以拼花的格式归档到HDFS或S3中,利用便宜的存储来进行分析。当我们不再需要Cassandra中的数据时,我们可以简单地删除表。您可能会看到,在创建和删除表时需要进行一些额外的维护,因此,这种方法实际上只有在需要归档时才有用。还有其他存档数据的方法,因此这种类型的bucketing可能是不必要的。
上面的策略主要是防止分区在长时间内变得太大。如果我们有一个可预测的工作负载和有很小变化的分区大小,这是很好的。我们可能会摄入太多的信息,以至于单个节点无法写出数据,或者一小部分对象的摄入率要高得多。Twitter就是一个很好的例子,有些人拥有数千万的追随者,但这并不常见。对于我们需要大规模使用的这些类型的账户,通常会有一个单独的代码路径
第二种技术在任何给定时间使用多个分区将插入扇出到整个集群。这个策略的好处是,我们可以使用一个分区来处理小卷,使用多个分区来处理大卷。
我们在这个设计中所做的权衡是在读取时我们需要使用散射聚集,这有明显的更高的开销。这可能会使分页更加困难。我们需要能够跟踪我们为每个小发明摄取了多少数据。这是为了确保我们可以选择正确数量的分区来使用。如果我们使用太多的桶,我们就会在很多分区上执行很多非常小的读取操作。如果桶太少,我们会得到非常大的分区,这些分区不能很好地压缩、修复、流处理,并且读取性能很差。
在这个例子中,我们将研究一个理论模型,它适用于那些在Twitter这样的社交网络上关注大量用户的人。大多数帐户都可以使用一个单独的分区来接收消息,但有些人/机器人可能会关注数百万个帐户。
CREATE TABLE tweet_stream ( account text, day text, bucket int, ts timeuuid, message text, primary key((account, day, bucket), ts) ) WITH CLUSTERING ORDER BY (ts DESC) AND COMPACTION = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'DAYS', 'compaction_window_size': 1};
这个数据模型扩展了前面的数据模型,将bucket添加到分区键中。现在,每天都可以从多个桶中获取数据。当需要读取时,我们需要从所有分区中获取所需的结果。为了演示,我们将插入一些数据到我们的分区:
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi'); cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2'); cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3'); cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');
如果我们想要十个最新的消息,我们可以这样做:
from itertools import chain from cassandra.util import unix_time_from_uuid1prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10") # let's get 10 buckets partitions = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]args = product(["jon_haddad"], ["2017-07-01"], partitions)result = execute_concurrent_with_args(session, prepared, args)# [ExecutionResult(success=True, result_or_exc=
这个例子只使用了10个项目,所以我们可以作为懒惰的程序员,合并列表,然后对它们排序。如果我们想获取更多的元素我们就需要k路归并算法。我们将在以后的博客中进一步讨论这个话题。
此时,您应该对如何围绕集群分发数据和请求有了更好的理解,这使得集群可以比使用单个分区时扩展得更大。记住每个问题都是不同的,没有万能的解决方案。
谢谢大家关注,转发,点赞。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~