Store binary objects in Riak CS

This commit is contained in:
Garret Alfert 2013-10-07 15:19:46 +02:00
parent e2d69af020
commit c10fd893c6
2 changed files with 54 additions and 39 deletions

View File

@ -40,8 +40,15 @@ module RemoteStorage
server.headers["Content-Type"] = object.content_type server.headers["Content-Type"] = object.content_type
server.headers["Last-Modified"] = last_modified_date_for(object) server.headers["Last-Modified"] = last_modified_date_for(object)
if binary_link = object.links.select {|l| l.tag == "binary"}.first if binary_key = object.meta["binary_key"]
object = client[binary_link.bucket].get(binary_link.key) object = cs_binary_bucket.files.get(binary_key[0])
case object.content_type[/^[^;\s]+/]
when "application/json"
return object.body.to_json
else
return object.body
end
end end
case object.content_type[/^[^;\s]+/] case object.content_type[/^[^;\s]+/]
@ -86,7 +93,7 @@ module RemoteStorage
new_object_size = object.raw_data.size new_object_size = object.raw_data.size
end end
object.store response = object.store
log_count = object_exists ? 0 : 1 log_count = object_exists ? 0 : 1
log_operation(user, directory, log_count, new_object_size, existing_object_size) log_operation(user, directory, log_count, new_object_size, existing_object_size)
@ -102,8 +109,9 @@ module RemoteStorage
object = data_bucket.get("#{user}:#{directory}:#{key}") object = data_bucket.get("#{user}:#{directory}:#{key}")
existing_object_size = object_size(object) existing_object_size = object_size(object)
if binary_link = object.links.select {|l| l.tag == "binary"}.first if binary_key = object.meta["binary_key"]
client[binary_link.bucket].delete(binary_link.key) object = cs_binary_bucket.files.get(binary_key[0])
object.destroy
end end
riak_response = data_bucket.delete("#{user}:#{directory}:#{key}") riak_response = data_bucket.delete("#{user}:#{directory}:#{key}")
@ -155,9 +163,9 @@ module RemoteStorage
end end
def object_size(object) def object_size(object)
if binary_link = object.links.select {|l| l.tag == "binary"}.first if binary_key = object.meta["binary_key"]
response = head(settings['buckets']['binaries'], escape(binary_link.key)) response = cs_client.head_object cs_binary_bucket.key, binary_key[0]
response[:headers]["content-length"].first.to_i response.headers["Content-Length"].to_i
else else
object.raw_data.nil? ? 0 : object.raw_data.size object.raw_data.nil? ? 0 : object.raw_data.size
end end
@ -167,14 +175,6 @@ module RemoteStorage
::Riak.escaper.escape(string).gsub("+", "%20").gsub('/', "%2F") ::Riak.escaper.escape(string).gsub("+", "%20").gsub('/', "%2F")
end end
# Perform a HEAD request via the backend method
def head(bucket, key)
client.http do |h|
url = riak_uri(bucket, key)
h.head [200], url
end
end
# A URI object that can be used with HTTP backend methods # A URI object that can be used with HTTP backend methods
def riak_uri(bucket, key) def riak_uri(bucket, key)
rc = settings.symbolize_keys rc = settings.symbolize_keys
@ -354,14 +354,13 @@ module RemoteStorage
end end
def save_binary_data(object, data) def save_binary_data(object, data)
binary_object = binary_bucket.new(object.key) cs_binary_object = cs_binary_bucket.files.create(
binary_object.content_type = object.content_type :key => object.key,
binary_object.raw_data = data :body => data,
binary_object.indexes = object.indexes :content_type => object.content_type
binary_object.store )
link = ::Riak::Link.new(binary_bucket.name, binary_object.key, "binary") object.meta["binary_key"] = cs_binary_object.key
object.links << link
object.raw_data = "" object.raw_data = ""
end end
@ -412,5 +411,19 @@ module RemoteStorage
def opslog_bucket def opslog_bucket
@opslog_bucket ||= client.bucket(settings['buckets']['opslog']) @opslog_bucket ||= client.bucket(settings['buckets']['opslog'])
end end
def cs_client
@cs_client ||= Fog::Storage.new({
:provider => 'AWS',
:aws_access_key_id => settings['riak_cs']['access_key'],
:aws_secret_access_key => settings['riak_cs']['secret_key'],
:endpoint => settings['riak_cs']['endpoint']
})
end
def cs_binary_bucket
@cs_binary_bucket ||= cs_client.directories.create(:key => settings['buckets']['cs_binaries'])
end
end end
end end

View File

@ -282,13 +282,13 @@ describe "App with Riak backend" do
last_response.body.must_equal @image last_response.body.must_equal @image
end end
it "indexes the binary set" do # it "indexes the binary set" do
indexes = binary_bucket.get("jimmy:documents:jaypeg").indexes # indexes = binary_bucket.get("jimmy:documents:jaypeg").indexes
indexes["user_id_bin"].must_be_kind_of Set # indexes["user_id_bin"].must_be_kind_of Set
indexes["user_id_bin"].must_include "jimmy" # indexes["user_id_bin"].must_include "jimmy"
indexes["directory_bin"].must_include "documents" # indexes["directory_bin"].must_include "documents"
end # end
it "logs the operation" do it "logs the operation" do
objects = [] objects = []
@ -323,13 +323,13 @@ describe "App with Riak backend" do
last_response.body.must_equal @image last_response.body.must_equal @image
end end
it "indexes the binary set" do # it "indexes the binary set" do
indexes = binary_bucket.get("jimmy:documents:jaypeg").indexes # indexes = binary_bucket.get("jimmy:documents:jaypeg").indexes
indexes["user_id_bin"].must_be_kind_of Set # indexes["user_id_bin"].must_be_kind_of Set
indexes["user_id_bin"].must_include "jimmy" # indexes["user_id_bin"].must_include "jimmy"
indexes["directory_bin"].must_include "documents" # indexes["directory_bin"].must_include "documents"
end # end
end end
end end
@ -427,9 +427,11 @@ describe "App with Riak backend" do
it "removes the binary object" do it "removes the binary object" do
last_response.status.must_equal 204 last_response.status.must_equal 204
lambda {
binary_bucket.get("jimmy:documents:jaypeg") skip "check if object is removed from Riak CS"
}.must_raise Riak::HTTPFailedRequest # lambda {
# binary_bucket.get("jimmy:documents:jaypeg")
# }.must_raise Riak::HTTPFailedRequest
end end
it "logs the operation" do it "logs the operation" do