From eaa3dbfe836d54c3251e73b720047caa6f86813a Mon Sep 17 00:00:00 2001 From: Sebastian Kippe Date: Mon, 29 Apr 2013 22:11:41 +0200 Subject: [PATCH] Use opslog instead of counters Account for eventual consistency by storing log items for create/update/delete operations instead of using counter keys. We can then map/reduce over the log items in order to extract category sizes and object counts. Furthermore, we can combine single items from time to time in order to keep things fast and tidy. --- lib/remote_storage/riak.rb | 46 ++++++--------- spec/riak_spec.rb | 114 ++++++++++++++++++------------------- spec/spec_helper.rb | 2 +- 3 files changed, 75 insertions(+), 87 deletions(-) diff --git a/lib/remote_storage/riak.rb b/lib/remote_storage/riak.rb index 66b12ef..0d3d048 100644 --- a/lib/remote_storage/riak.rb +++ b/lib/remote_storage/riak.rb @@ -30,8 +30,8 @@ module RemoteStorage @binary_bucket ||= client.bucket(settings.riak['buckets']['binaries']) end - def info_bucket - @info_bucket ||= client.bucket(LiquorCabinet.config['buckets']['info']) + def opslog_bucket + @opslog_bucket ||= client.bucket(settings.riak['buckets']['opslog']) end def authorize_request(user, directory, token, listing=false) @@ -106,8 +106,9 @@ module RemoteStorage object.store - log_object_count(user, directory, 1) unless object_exists - log_object_size(user, directory, new_object_size, existing_object_size) + log_action = object_exists ? "update" : "create" + log_operation(user, directory, log_action, new_object_size, existing_object_size) + update_all_directory_objects(user, directory, timestamp) halt 200 @@ -126,8 +127,7 @@ module RemoteStorage riak_response = data_bucket.delete("#{user}:#{directory}:#{key}") if riak_response[:code] != 404 - log_object_count(user, directory, -1) - log_object_size(user, directory, 0, existing_object_size) + log_operation(user, directory, "delete", 0, existing_object_size) end timestamp = (Time.now.to_f * 1000).to_i @@ -161,31 +161,21 @@ module RemoteStorage object end - def log_object_size(user, directory, new_size=0, old_size=0) - category = extract_category(directory) - info = info_bucket.get_or_new("usage:#{user}:#{category}") - info.content_type = "application/json" - info.data ||= {} - info.data["size"] ||= 0 - info.data["size"] += (-old_size + new_size) - info.indexes.merge!({:user_id_bin => [user]}) - info.store - end - - def log_object_count(user, directory, change) - category = extract_category(directory) - info = info_bucket.get_or_new("usage:#{user}:#{category}") - info.content_type = "application/json" - info.data ||= {} - info.data["count"] ||= 0 - info.data["count"] += change - info.indexes.merge!({:user_id_bin => [user]}) - info.store + def log_operation(user, directory, action, new_size=0, old_size=0) + log_entry = opslog_bucket.new + log_entry.content_type = "application/json" + log_entry.data = { + "action" => action, + "size" => (-old_size + new_size), + "category" => extract_category(directory) + } + log_entry.indexes.merge!({:user_id_bin => [user]}) + log_entry.store end def object_size(object) if binary_link = object.links.select {|l| l.tag == "binary"}.first - response = head(LiquorCabinet.config['buckets']['binaries'], escape(binary_link.key)) + response = head(settings.riak['buckets']['binaries'], escape(binary_link.key)) response[:headers]["content-length"].first.to_i else object.raw_data.nil? ? 0 : object.raw_data.size @@ -206,7 +196,7 @@ module RemoteStorage # A URI object that can be used with HTTP backend methods def riak_uri(bucket, key) - rc = LiquorCabinet.config['riak'].symbolize_keys + rc = settings.riak.symbolize_keys URI.parse "http://#{rc[:host]}:#{rc[:http_port]}/riak/#{bucket}/#{key}" end diff --git a/spec/riak_spec.rb b/spec/riak_spec.rb index 52f0283..a613280 100644 --- a/spec/riak_spec.rb +++ b/spec/riak_spec.rb @@ -1,14 +1,5 @@ require_relative "spec_helper" -def set_usage_info(user, category, type, value) - object = info_bucket.get_or_new("usage:#{user}:#{category}") - object.content_type = "application/json" - data = object.data || {} - data.merge!(type => value) - object.data = data - object.store -end - describe "App with Riak backend" do include Rack::Test::Methods @@ -94,7 +85,6 @@ describe "App with Riak backend" do describe "PUT" do before do header "Authorization", "Bearer 123" - set_usage_info "jimmy", "documents", "size", 23 end describe "with implicit content type" do @@ -112,12 +102,6 @@ describe "App with Riak backend" do data_bucket.get("jimmy:documents:bar").content_type.must_equal "text/plain; charset=utf-8" end - it "increases the usage size counter" do - usage = info_bucket.get("usage:jimmy:documents") - usage.data["size"].must_equal 35 - usage.indexes["user_id_bin"].must_include "jimmy" - end - it "indexes the data set" do indexes = data_bucket.get("jimmy:documents:bar").indexes indexes["user_id_bin"].must_be_kind_of Set @@ -126,10 +110,15 @@ describe "App with Riak backend" do indexes["directory_bin"].must_include "documents" end - # it "logs the operation" do - # logs = storage_client.get_index("rs_opslog", "user_id_bin", "jimmy") - # logs.count.must_equal 1 - # end + it "logs the operation" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } + + log_entry = objects.select{|o| o.data["action"] == "create"}.first + log_entry.data["size"].must_equal 12 + log_entry.data["category"].must_equal "documents" + log_entry.indexes["user_id_bin"].must_include "jimmy" + end end describe "with explicit content type" do @@ -148,10 +137,6 @@ describe "App with Riak backend" do data_bucket.get("jimmy:documents:jason").content_type.must_equal "application/json" end - it "increases the category size counter" do - info_bucket.get("usage:jimmy:documents").data["size"].must_equal 49 - end - it "delivers the data correctly" do header "Authorization", "Bearer 123" get "/jimmy/documents/jason" @@ -211,7 +196,6 @@ describe "App with Riak backend" do describe "with existing content" do before do - set_usage_info "jimmy", "documents", "size", 10 put "/jimmy/documents/archive/foo", "lorem ipsum" put "/jimmy/documents/archive/foo", "some awesome content" end @@ -221,15 +205,24 @@ describe "App with Riak backend" do data_bucket.get("jimmy:documents/archive:foo").data.must_equal "some awesome content" end - it "increases the category size counter" do - info_bucket.get("usage:jimmy:documents").data["size"].must_equal 30 + it "logs the operations" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } + + create_entry = objects.select{|o| o.data["action"] == "create"}.first + create_entry.data["size"].must_equal 11 + create_entry.data["category"].must_equal "documents" + create_entry.indexes["user_id_bin"].must_include "jimmy" + + update_entry = objects.select{|o| o.data["action"] == "update"}.first + update_entry.data["size"].must_equal 9 + update_entry.data["category"].must_equal "documents" + update_entry.indexes["user_id_bin"].must_include "jimmy" end end describe "public data" do before do - set_usage_info "jimmy", "public/documents", "size", 10 - set_usage_info "jimmy", "public/documents", "count", 100 put "/jimmy/public/documents/notes/foo", "note to self" end @@ -238,12 +231,14 @@ describe "App with Riak backend" do data_bucket.get("jimmy:public/documents/notes:foo").data.must_equal "note to self" end - it "increases the category size counter" do - info_bucket.get("usage:jimmy:public/documents").data["size"].must_equal 22 - end + it "logs the operation" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } - it "increases the category object counter" do - info_bucket.get("usage:jimmy:public/documents").data["count"].must_equal 101 + log_entry = objects.select{|o| o.data["action"] == "create"}.first + log_entry.data["size"].must_equal 12 + log_entry.data["category"].must_equal "public/documents" + log_entry.indexes["user_id_bin"].must_include "jimmy" end end @@ -270,10 +265,6 @@ describe "App with Riak backend" do last_response.body.must_equal @image end - it "increases the category size counter" do - info_bucket.get("usage:jimmy:documents").data["size"].must_equal 16067 - end - it "indexes the binary set" do indexes = binary_bucket.get("jimmy:documents:jaypeg").indexes indexes["user_id_bin"].must_be_kind_of Set @@ -281,6 +272,16 @@ describe "App with Riak backend" do indexes["directory_bin"].must_include "documents" end + + it "logs the operation" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } + + log_entry = objects.select{|o| o.data["action"] == "create"}.first + log_entry.data["size"].must_equal 16044 + log_entry.data["category"].must_equal "documents" + log_entry.indexes["user_id_bin"].must_include "jimmy" + end end context "no binary charset in content-type header" do @@ -358,8 +359,6 @@ describe "App with Riak backend" do describe "DELETE" do before do header "Authorization", "Bearer 123" - set_usage_info "jimmy", "documents", "size", 123 - set_usage_info "jimmy", "documents", "count", 1000 delete "/jimmy/documents/foo" end @@ -370,25 +369,25 @@ describe "App with Riak backend" do }.must_raise Riak::HTTPFailedRequest end - it "decreases the category size counter" do - info_bucket.get("usage:jimmy:documents").data["size"].must_equal 101 - end + it "logs the operation" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } - it "decreases the category object counter" do - info_bucket.get("usage:jimmy:documents").data["count"].must_equal 999 + log_entry = objects.select{|o| o.data["action"] == "delete"}.first + log_entry.data["size"].must_equal(-22) + log_entry.data["category"].must_equal "documents" + log_entry.indexes["user_id_bin"].must_include "jimmy" end context "non-existing object" do before do - set_usage_info "jimmy", "documents", "size", 10 - set_usage_info "jimmy", "documents", "count", 10 delete "/jimmy/documents/foozius" end - it "doesn't change the category usage info" do - usage = info_bucket.get("usage:jimmy:documents").data - usage["size"].must_equal 10 - usage["count"].must_equal 10 + it "doesn't log the operation" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } + objects.select{|o| o.data["action"] == "delete"}.size.must_equal 1 end end @@ -398,8 +397,6 @@ describe "App with Riak backend" do filename = File.join(File.expand_path(File.dirname(__FILE__)), "fixtures", "rockrule.jpeg") @image = File.open(filename, "r").read put "/jimmy/documents/jaypeg", @image - set_usage_info "jimmy", "documents", "size", 100000 - set_usage_info "jimmy", "documents", "count", 10 delete "/jimmy/documents/jaypeg" end @@ -418,12 +415,13 @@ describe "App with Riak backend" do }.must_raise Riak::HTTPFailedRequest end - it "decreases the category size counter" do - info_bucket.get("usage:jimmy:documents").data["size"].must_equal 83956 - end + it "logs the operation" do + objects = [] + opslog_bucket.keys.each { |k| objects << opslog_bucket.get(k) rescue nil } - it "decreases the category object counter" do - info_bucket.get("usage:jimmy:documents").data["count"].must_equal 9 + log_entry = objects.select{|o| o.data["action"] == "delete" && o.data["size"] == -16044}.first + log_entry.data["category"].must_equal "documents" + log_entry.indexes["user_id_bin"].must_include "jimmy" end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 063fa13..bd771d9 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -58,7 +58,7 @@ if app.settings.riak end def purge_all_buckets - [data_bucket, directory_bucket, auth_bucket, binary_bucket].each do |bucket| + [data_bucket, directory_bucket, auth_bucket, binary_bucket, opslog_bucket].each do |bucket| bucket.keys.each {|key| bucket.delete key} end end