diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index ff5827e..e9224e3 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -31,6 +31,7 @@ couch_file_collection = \ couch_btree.erl \ couch_db.erl \ couch_db_update_notifier.erl \ + couch_doc_update_notifier.erl \ couch_doc.erl \ couch_event_sup.erl \ couch_file.erl \ @@ -52,6 +53,7 @@ couchebin_DATA = \ couch_btree.beam \ couch_db.beam \ couch_db_update_notifier.beam \ + couch_doc_update_notifier.beam \ couch_doc.beam \ couch_event_sup.beam \ couch_file.beam \ diff --git a/src/couchdb/Makefile.in b/src/couchdb/Makefile.in index 4d2df26..4be4426 100644 --- a/src/couchdb/Makefile.in +++ b/src/couchdb/Makefile.in @@ -247,6 +247,7 @@ couch_file_collection = \ couch_btree.erl \ couch_db.erl \ couch_db_update_notifier.erl \ + couch_doc_update_notifier.erl \ couch_doc.erl \ couch_event_sup.erl \ couch_file.erl \ @@ -268,6 +269,7 @@ couchebin_DATA = \ couch_btree.beam \ couch_db.beam \ couch_db_update_notifier.beam \ + couch_doc_update_notifier.beam \ couch_doc.beam \ couch_event_sup.beam \ couch_file.beam \ diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in index 96e5afc..0d51748 100644 --- a/src/couchdb/couch.app.tpl.in +++ b/src/couchdb/couch.app.tpl.in @@ -16,6 +16,7 @@ couch_httpd, couch_event_sup, couch_db_update_notifier, + couch_doc_update_notifier, couch_ft_query, couch_log, couch_rep]}, diff --git a/src/couchdb/couch_doc_update_notifier.erl b/src/couchdb/couch_doc_update_notifier.erl new file mode 100644 index 0000000..cd8e7b8 --- /dev/null +++ b/src/couchdb/couch_doc_update_notifier.erl @@ -0,0 +1,66 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +% +% This causes an OS process to spawned and it is notified every time a database +% is updated. +% +% The notifications are in the form of a the database name sent as a line of +% text to the OS processes stdout. +% + +-module(couch_doc_update_notifier). + +-behaviour(gen_event). + +-export([start_link/1, notify/1]). +-export([init/1, terminate/2, handle_event/2, handle_call/2, handle_info/2, code_change/3,stop/1]). + +-define(ERR_HANDLE, {Port, {exit_status, Status}} -> {stop, {unknown_error, Status}, {unknown_error, Status}, Port}). + +start_link(Exec) -> + couch_event_sup:start_link(couch_doc_update, {couch_doc_update_notifier, make_ref()}, Exec). + +notify(Event) -> + gen_event:notify(couch_doc_update, Event). + +stop(Pid) -> + couch_event_sup:stop(Pid). + +init(Exec) when is_list(Exec) -> % an exe + Port = open_port({spawn, Exec}, [stream, exit_status, hide]), + {ok, Port}; +init(Else) -> + {ok, Else}. + +terminate(_Reason, _Port) -> + ok. + +handle_event(Event, Fun) when is_function(Fun, 1) -> + Fun(Event), + {ok, Fun}; +handle_event(Event, {Fun, FunAcc}) -> + FunAcc2 = Fun(Event, FunAcc), + {ok, {Fun, FunAcc2}}; +handle_event({EventAtom, DbName, DocId, OldDoc, NewDoc}, Port) -> + Obj = {obj, [{type, atom_to_list(EventAtom)}, {db, DbName}, {id, DocId}, {old, OldDoc}, {new, NewDoc}]}, + true = port_command(Port, cjson:encode(Obj) ++ "\n"), + {ok, Port}. + +handle_call(_Request, State) -> + {ok, ok, State}. + +handle_info({'EXIT', _, _Reason}, _Port) -> + remove_handler. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index df8c473..c4fd278 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -184,12 +184,14 @@ handle_db_request(Req, 'GET', {DbName, Db, []}) -> {ok, DbInfo} = couch_db:get_db_info(Db), send_json(Req, {obj, [{db_name, DbName} | DbInfo]}); -handle_db_request(Req, 'POST', {_DbName, Db, []}) -> +handle_db_request(Req, 'POST', {DbName, Db, []}) -> % TODO: Etag handling Json = cjson:decode(Req:recv_body(?MAX_DOC_SIZE)), Doc = couch_doc:from_json_obj(Json), DocId = couch_util:new_uuid(), {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=[]}, []), + OldDoc = #doc{}, + couch_doc_update_notifier:notify({create, DbName, DocId, OldDoc#doc.body, Doc#doc.body}), send_json(Req, 201, {obj, [ {ok, true}, {id, DocId}, @@ -487,7 +489,7 @@ output_reduce_view(Req, View) -> Resp:write_chunk("]}"), end_json_response(Resp). -handle_doc_request(Req, 'DELETE', _DbName, Db, DocId) -> +handle_doc_request(Req, 'DELETE', DbName, Db, DocId) -> QueryRev = proplists:get_value("rev", Req:parse_qs()), Etag = case Req:get_header_value("If-Match") of undefined -> @@ -507,6 +509,17 @@ handle_doc_request(Req, 'DELETE', _DbName, Db, DocId) -> _ -> throw({bad_request, "Document rev and etag have different values"}) end, + % get old document + case couch_db:open_doc_revs(Db, DocId, [RevToDelete], []) of + {ok, [{ok, Doc}]} -> + Doc; + {ok, [Else]} -> + Doc = #doc{}, + throw(Else) + end, + % notify delete + NewDoc = #doc{}, + couch_doc_update_notifier:notify({delete, DbName, DocId, Doc#doc.body, NewDoc#doc.body}), {ok, NewRev} = couch_db:delete_doc(Db, DocId, [RevToDelete]), send_json(Req, 200, {obj, [ {ok, true}, @@ -571,7 +584,7 @@ handle_doc_request(Req, 'GET', _DbName, Db, DocId) -> end_json_response(Resp) end; -handle_doc_request(Req, 'PUT', _DbName, Db, DocId) -> +handle_doc_request(Req, 'PUT', DbName, Db, DocId) -> Json = {obj, DocProps} = cjson:decode(Req:recv_body(?MAX_DOC_SIZE)), DocRev = proplists:get_value("_rev", DocProps), Etag = case Req:get_header_value("If-Match") of @@ -595,7 +608,22 @@ handle_doc_request(Req, 'PUT', _DbName, Db, DocId) -> Doc = couch_doc:from_json_obj(Json), + % get old document + case couch_db:open_doc_revs(Db, DocId, Revs, []) of + {ok, [{ok, OldDoc}]} -> + OldDoc, + Operation = update; + {ok, []} -> + OldDoc = #doc{}, + Operation = create; + {ok, [Else]} -> + OldDoc = #doc{}, + Operation = error, + throw(Else) + end, {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, []), + % notify update + couch_doc_update_notifier:notify({Operation, DbName, DocId, OldDoc#doc.body, Doc#doc.body}), send_json(Req, 201, [{"Etag", "\"" ++ NewRev ++ "\""}], {obj, [ {ok, true}, {id, DocId}, diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl index 7a628eb..4b9501a 100644 --- a/src/couchdb/couch_server_sup.erl +++ b/src/couchdb/couch_server_sup.erl @@ -73,6 +73,7 @@ start_server(InputIniFilename) -> LogFile = proplists:get_value({"Couch", "LogFile"}, Ini, "couchdb.log"), UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""), UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini), + DocUpdateNotifierExes = proplists:get_all_values({"Couch", "DocUpdateNotificationProcess"}, Ini), FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""), RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")), ServerOptions = [{remote_restart, RemoteRestart}], @@ -91,6 +92,12 @@ start_server(InputIniFilename) -> 1000, supervisor, dynamic}, + {couch_doc_update_event, + {gen_event, start_link, [{local, couch_doc_update}]}, + permanent, + 1000, + supervisor, + dynamic}, {couch_server, {couch_server, sup_start_link, [DbRootDir, ServerOptions]}, permanent, @@ -125,6 +132,15 @@ start_server(InputIniFilename) -> [couch_db_update_notifier]} end, UpdateNotifierExes) ++ + lists:map(fun(DocUpdateNotifierExe) -> + {DocUpdateNotifierExe, + {couch_doc_update_notifier, start_link, [DocUpdateNotifierExe]}, + permanent, + 1000, + supervisor, + [couch_doc_update_notifier]} + end, DocUpdateNotifierExes) + ++ case FtSearchQueryServer of "" -> []; @@ -158,6 +174,7 @@ start_server(InputIniFilename) -> "\tLogFile=~s~n" ++ "\tUtilDriverDir=~s~n" ++ "\tDbUpdateNotificationProcesses=~s~n" ++ + "\tDocUpdateNotificationProcesses=~s~n" ++ "\tFullTextSearchQueryServer=~s~n" ++ "~s", [IniFilename, @@ -169,6 +186,7 @@ start_server(InputIniFilename) -> LogFile, UtilDriverDir, UpdateNotifierExes, + DocUpdateNotifierExes, FtSearchQueryServer, [lists:flatten(io_lib:format("\t~s=~s~n", [Lang, QueryExe])) || {Lang, QueryExe} <- QueryServers]]), ?LOG_DEBUG("~s", [ConfigInfo]),