--[[ Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ]]-- -- This is elastic.lua - ElasticSearch library -- N.B. Please keep lib/elastic.lua and admin/elastic.lua synchronised -- Note re http.request: -- In case of failure, the function returns nil followed by an error message. -- If successful, the simple form returns the response body as a string -- followed by the response status code, the response headers and the response status line. -- The generic function returns the same information, -- except the first return value is just the number 1 (the body goes to the sink). 
local http = require 'socket.http'
local ltn12 = require 'ltn12'
local JSON = require 'cjson'

-- Base URLs: es_host is the server root (used by the scroll API),
-- es_url is the root of the index that holds the documents.
local config = {
    es_host = "http://localhost:9200/",
    es_url = "http://localhost:9200/helpwanted/"
}

-- Document type used when a caller does not supply one.
local default_doc = "item"

-- Decode a JSON response body without raising.
-- JSON.decode raises on nil or malformed input, which is exactly what a
-- failed HTTP request produces; the callers below all expect a nil result
-- in that case so that their `or {}` / `if json` fallbacks can apply.
-- @param body the response body (string), or nil if the request failed
-- @return the decoded table, or nil if body is not a string, cannot be
--         decoded, or does not decode to a table
local function _decode(body)
    if type(body) ~= "string" then
        return nil
    end
    local ok, value = pcall(JSON.decode, body)
    if ok and type(value) == "table" then
        return value
    end
    return nil
end

-- ES no longer supports Content-type: application/x-www-form-urlencoded
-- which is the default with the simple interface http.request(url, body)
-- we have to use the rather more complicated generic interface
-- returns the body as the first parameter (if successful), emulating the simple interface
-- @param url    full URL to send the request to
-- @param body   request body (string, sent as application/json)
-- @param method HTTP method; defaults to 'POST'
-- @return on success: body, status code, response headers, status line
-- @return on failure: nil, error message
local function _http_request(url, body, method)
    local result = {}
    local one, code, outhdrs, status = http.request {
        url = url,
        method = method or 'POST',
        headers = {
            ['content-length'] = #body,
            ['content-type'] = 'application/json'
        },
        -- kept in the request table only; must not leak into the global environment
        source = ltn12.source.string(body),
        sink = ltn12.sink.table(result)
    }
    if one then -- success
        return table.concat(result), code, outhdrs, status
    end
    return one, code
end

-- Standard ES query, returns $size results of any doc of type $doc, sorting by $sitem (desc)
-- @param query query string; spaces are replaced by '+', nothing else is escaped
-- @param size  maximum number of results; defaults to 10
-- @param doc   document type; defaults to default_doc
-- @param sitem field to sort on (descending); defaults to "created"
-- @return array of the hits' _source tables, each with request_id set to the
--         hit's _id; empty if the request failed or returned no hits
local function find(query, size, doc, sitem)
    doc = doc or default_doc
    sitem = sitem or "created"
    size = size or 10
    query = query:gsub(" ", "+")
    local url = config.es_url .. doc .. "/_search?q=" .. query
        .. "&sort=" .. sitem .. ":desc&size=" .. size
    local json = _decode((http.request(url)))
    local out = {}
    if json and json.hits and json.hits.hits then
        -- ipairs keeps the order in which ES returned the (sorted) hits
        for _, hit in ipairs(json.hits.hits) do
            hit._source.request_id = hit._id
            out[#out + 1] = hit._source
        end
    end
    return out
end

-- Get a single document
-- @param ty document type; defaults to default_doc
-- @param id document id
-- @return the document's _source table with request_id set to its _id,
--         or an empty table if the request failed or has no _source
local function get(ty, id)
    local url = config.es_url .. (ty or default_doc) .. "/" .. id
    local json = _decode((http.request(url)))
    if json and json._source then
        json._source.request_id = json._id
        return json._source
    end
    return {}
end

-- Do a raw ES query with a JSON query
-- @param query   Lua table that is JSON-encoded and sent as the request body
-- @param doctype document type; defaults to default_doc
-- @return the decoded response table, or an empty table on failure
local function raw(query, doctype)
    local js = JSON.encode(query)
    doctype = doctype or default_doc
    local url = config.es_url .. doctype .. "/_search"
    return _decode((_http_request(url, js))) or {}
end

-- scrolling query generator
-- Must run inside a coroutine: yields each decoded response that contains
-- at least one hit, then releases the scroll on the server.
-- @param query   Lua table that is JSON-encoded and sent as the request body
-- @param doctype document type; defaults to default_doc
local function scrollquerygen(query, doctype)
    -- start off the scroll
    local url = config.es_url .. (doctype or default_doc) .. "/_search?scroll=1m"
    local js = _decode((_http_request(url, JSON.encode(query))))
    -- remember the last scroll id seen, so the scroll can still be dropped
    -- when a later request fails and js becomes nil
    local scroll_id = js and js._scroll_id
    while js and js.hits and js.hits.hits and #js.hits.hits > 0 do
        -- scroll as long as we get new results
        coroutine.yield(js)
        if not scroll_id then
            break -- nothing to continue from
        end
        -- continue the scroll (N.B. does not use the doctype)
        local next_url = config.es_host .. "_search/scroll?scroll=1m&scroll_id=" .. scroll_id
        js = _decode((http.request(next_url)))
        if js and js._scroll_id then
            scroll_id = js._scroll_id
        end
    end
    -- drop the scroll
    if scroll_id then
        http.request {
            url = config.es_host .. "_search/scroll/" .. scroll_id,
            method = 'DELETE'
        }
    end
end

-- Wrap scrollquerygen as an iterator function: each call returns the next
-- decoded response yielded by the generator.
-- @param query   Lua table that is JSON-encoded and sent as the request body
-- @param doctype document type; defaults to default_doc
-- @return function that resumes the scroll on each call
local function scrollquery(query, doctype)
    return coroutine.wrap(function ()
        scrollquerygen(query, doctype)
    end)
end

-- Update a document
-- @param doctype document type; defaults to default_doc
-- @param id      document id
-- @param query   table of fields, sent as { doc = query } to the _update endpoint
-- @return the decoded response table, or an empty table on failure
local function update(doctype, id, query)
    local js = JSON.encode({ doc = query })
    doctype = doctype or default_doc
    local url = config.es_url .. doctype .. "/" .. id .. "/_update"
    return _decode((_http_request(url, js))) or {}
end

-- Put a new document somewhere
-- @param r    object providing r:sha1() and r:clock(), only used when id is
--             nil (presumably the mod_lua request object - confirm against callers)
-- @param id   document id; if nil, one is generated from a sha1 of the
--             type, a random number times the current time, and r:clock()
-- @param ty   document type; defaults to default_doc
-- @param body JSON string to store
-- @return the decoded response table, or an empty table on failure
local function index(r, id, ty, body)
    ty = ty or default_doc
    if not id then
        id = r:sha1(ty .. (math.random(1, 99999999) * os.time()) .. ':' .. r:clock())
    end
    local url = config.es_url .. ty .. "/" .. id
    return _decode((_http_request(url, body))) or {}
end

-- module defs
return {
    find = find,
    get = get,
    raw = raw,
    scrollquery = scrollquery,
    index = index,
    update = update
}