198 lines
5.9 KiB
Ruby

require "riak"
require "json"
require "cgi"
module RemoteStorage
module Riak
def client
@client ||= ::Riak::Client.new(LiquorCabinet.config['riak'].symbolize_keys)
end
def data_bucket
@data_bucket ||= client.bucket("user_data")
end
def directory_bucket
@directory_bucket ||= client.bucket("rs_directories")
end
def authorize_request(user, category, token)
request_method = env["REQUEST_METHOD"]
return true if category.split("/").first == "public" && request_method == "GET"
authorizations = client.bucket("authorizations").get("#{user}:#{token}").data
permission = category_permission(authorizations, category)
halt 403 unless permission
if ["PUT", "DELETE"].include? request_method
halt 403 unless permission == "rw"
end
rescue ::Riak::HTTPFailedRequest
halt 403
end
def get_data(user, category, key)
object = data_bucket.get("#{user}:#{category}:#{key}")
headers["Content-Type"] = object.content_type
headers["Last-Modified"] = object.last_modified.to_s(:rfc822)
case object.content_type[/^[^;\s]+/]
when "application/json"
return object.data.to_json
else
return serializer_for(object.content_type) ? object.data : object.raw_data
end
rescue ::Riak::HTTPFailedRequest
halt 404
end
def get_directory_listing(user, directory)
directory_object = directory_bucket.get("#{user}:#{directory}")
headers["Content-Type"] = "application/json"
headers["Last-Modified"] = directory_object.last_modified.to_s(:rfc822)
listing = directory_listing(user, directory)
return listing.to_json
rescue ::Riak::HTTPFailedRequest
headers["Content-Type"] = "application/json"
return "{}"
end
def put_data(user, category, key, data, content_type=nil)
object = data_bucket.new("#{user}:#{category}:#{key}")
object.content_type = content_type || "text/plain; charset=utf-8"
data = JSON.parse(data) if content_type[/^[^;\s]+/] == "application/json"
if serializer_for(object.content_type)
object.data = data
else
object.raw_data = data
end
object.indexes.merge!({:user_id_bin => [user],
:directory_bin => [category]})
object.store
create_missing_directory_objects(user, category)
update_directory_object(user, category)
rescue ::Riak::HTTPFailedRequest
halt 422
end
def delete_data(user, category, key)
riak_response = data_bucket.delete("#{user}:#{category}:#{key}")
if directory_entries(user, category).empty?
directory_bucket.delete "#{user}:#{category}"
end
halt riak_response[:code]
rescue ::Riak::HTTPFailedRequest
halt 404
end
private
def serializer_for(content_type)
::Riak::Serializers[content_type[/^[^;\s]+/]]
end
def category_permission(authorizations, category)
authorizations = authorizations.map do |auth|
auth.index(":") ? auth.split(":") : [auth, "rw"]
end
authorizations = Hash[*authorizations.flatten]
permission = authorizations[""]
authorizations.each do |key, value|
if category.match key
if permission.nil? || permission == "r"
permission = value
end
end
end
permission
end
def directory_listing(user, directory)
listing = {}
sub_directories(user, directory).each do |entry|
timestamp = DateTime.rfc2822(entry["last_modified"]).to_time.to_i
directory_name = CGI.unescape(entry["name"]).split("/").last
listing.merge!({ "#{directory_name}/" => timestamp })
end
directory_entries(user, directory).each do |entry|
timestamp = DateTime.rfc2822(entry["last_modified"]).to_time.to_i
listing.merge!({ entry["name"] => timestamp })
end
listing
end
def directory_entries(user, directory)
map_query = <<-EOH
function(v){
keys = v.key.split(':');
key_name = keys[keys.length-1];
last_modified_date = v.values[0]['metadata']['X-Riak-Last-Modified'];
return [{
name: key_name,
last_modified: last_modified_date,
}];
}
EOH
objects = ::Riak::MapReduce.new(client).
index("user_data", "user_id_bin", user).
index("user_data", "directory_bin", directory).
map(map_query, :keep => true).
run
end
def sub_directories(user, directory)
map_query = <<-EOH
function(v){
keys = v.key.split(':');
key_name = keys[keys.length-1];
last_modified_date = v.values[0]['metadata']['X-Riak-Last-Modified'];
return [{
name: key_name,
last_modified: last_modified_date,
}];
}
EOH
objects = ::Riak::MapReduce.new(client).
index("rs_directories", "user_id_bin", user).
index("rs_directories", "directory_bin", directory).
map(map_query, :keep => true).
run
end
def create_missing_directory_objects(user, category)
parent_directories = category.split("/")
parent_directories.pop
while parent_directories.any?
directory = parent_directories.join("/")
unless directory_bucket.exist?("#{user}:#{directory}")
update_directory_object(user, directory)
end
parent_directories.pop
end
end
def update_directory_object(user, category)
if category.match /\//
parent_directory = category[0..category.rindex("/")-1]
end
directory = directory_bucket.new("#{user}:#{category}")
directory.raw_data = ""
directory.indexes.merge!({:user_id_bin => [user]})
if parent_directory
directory.indexes.merge!({:directory_bin => [parent_directory]})
end
directory.store
end
end
end