From: daurnimator Date: Mon, 19 Dec 2016 04:43:51 +0000 (+1100) Subject: Update to be compatible with lua-http 0.1 release X-Git-Tag: v1.2.0-rc1~56^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9dfd8f676f1d5b78a18bf5c2b5be5094e957e2f6;p=thirdparty%2Fknot-resolver.git Update to be compatible with lua-http 0.1 release --- diff --git a/modules/http/README.rst b/modules/http/README.rst index 7190389b2..90b603ff7 100644 --- a/modules/http/README.rst +++ b/modules/http/README.rst @@ -246,7 +246,7 @@ Dependencies .. code-block:: bash - $ luarocks install --server=https://luarocks.org/dev http CC=cc + $ luarocks install http * `mmdblua `_ available in LuaRocks diff --git a/modules/http/http.lua b/modules/http/http.lua index 997258ab9..ee1cf9009 100644 --- a/modules/http/http.lua +++ b/modules/http/http.lua @@ -147,7 +147,7 @@ end -- Web server service closure local function route(endpoints) - return function (stream) + return function (_, stream) -- HTTP/2: We're only permitted to send in open/half-closed (remote) local connection = stream.connection if connection.version >= 2 then @@ -188,7 +188,6 @@ local function route(endpoints) end end end - stream:shutdown() end end @@ -291,22 +290,20 @@ function M.interface(host, port, endpoints, crtfile, keyfile) panic('failed to load certificate "%s" - %s', crtfile, err or 'error') end end + -- Compose server handler + local routes = route(endpoints) -- Create TLS context and start listening local s, err = server.listen { + cq = cq; host = host, port = port, client_timeout = 5, ctx = crt and tlscontext(crt, key), + onstream = routes; } if not s then panic('failed to listen on %s#%d: %s', host, port, err) end - -- Compose server handler - local routes = route(endpoints) - cq:wrap(function () - assert(s:run(routes)) - s:close() - end) table.insert(M.servers, s) -- Create certificate renewal timer if ephemeral if crt and ephemeral then diff --git a/modules/http/http.mk b/modules/http/http.mk index 897a0380c..d4f4bf274 100644 --- a/modules/http/http.mk +++ b/modules/http/http.mk @@ -1,6 +1,3 @@ http_SOURCES := http.lua prometheus.lua -http_INSTALL := $(wildcard modules/http/static/*) \ - modules/http/http/h2_stream.lua \ - modules/http/http/h2_connection.lua \ - modules/http/http/server.lua +http_INSTALL := $(wildcard modules/http/static/*) $(call make_lua_module,http) diff --git a/modules/http/http/LICENSE b/modules/http/http/LICENSE deleted file mode 100644 index ec71e4921..000000000 --- a/modules/http/http/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2015-2016 Daurnimator - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. \ No newline at end of file diff --git a/modules/http/http/README b/modules/http/http/README deleted file mode 100644 index 382e159c9..000000000 --- a/modules/http/http/README +++ /dev/null @@ -1,5 +0,0 @@ -Embedded unstable APIs from https://github.com/daurnimator/lua-http under MIT license, see LICENSE. - -ChangeLog: -* Marek Vavrusa : - - Modified h2_connection to reuse current cqueue context \ No newline at end of file diff --git a/modules/http/http/h2_connection.lua b/modules/http/http/h2_connection.lua deleted file mode 100644 index 195ccaea2..000000000 --- a/modules/http/http/h2_connection.lua +++ /dev/null @@ -1,462 +0,0 @@ -local cqueues = require "cqueues" -local monotime = cqueues.monotime -local cc = require "cqueues.condition" -local ce = require "cqueues.errno" -local rand = require "openssl.rand" -local new_fifo = require "fifo" -local band = require "http.bit".band -local h2_error = require "http.h2_error" -local h2_stream = require "http.h2_stream" -local hpack = require "http.hpack" -local h2_banned_ciphers = require "http.tls".banned_ciphers -local spack = string.pack or require "compat53.string".pack -local sunpack = string.unpack or require "compat53.string".unpack - -local assert = assert -if _VERSION:match("%d+%.?%d*") < "5.3" then - assert = require "compat53.module".assert -end - -local function xor(a, b) - return (a and b) or not (a or b) -end - -local preface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" - -local default_settings = { - [0x1] = 4096; -- HEADER_TABLE_SIZE - [0x2] = true; -- ENABLE_PUSH - [0x3] = math.huge; -- MAX_CONCURRENT_STREAMS - [0x4] = 65535; -- INITIAL_WINDOW_SIZE - [0x5] = 16384; -- MAX_FRAME_SIZE - [0x6] = math.huge; -- MAX_HEADER_LIST_SIZE -} - -local function merge_settings(new, old) - return { - [0x1] = new[0x1] or old[0x1]; - [0x2] = new[0x2] or old[0x2]; - [0x3] = new[0x3] or old[0x3]; - [0x4] = new[0x4] or old[0x4]; - [0x5] = new[0x5] or old[0x5]; - [0x6] = new[0x6] or old[0x6]; - } -end - -local connection_methods = {} -local connection_mt = { - __name = "http.h2_connection"; - __index = connection_methods; -} - -function connection_mt:__tostring() - return string.format("http.h2_connection{type=%q}", - self.type) -end - -local connection_main_loop - --- An 'onerror' that doesn't throw -local function onerror(socket, op, why, lvl) -- luacheck: ignore 212 - if why == ce.EPIPE or why == ce.ETIMEDOUT then - return why - end - return string.format("%s: %s", op, ce.strerror(why)), why -end - --- Read bytes from the given socket looking for the http2 connection preface --- optionally ungets the bytes in case of failure -local function socket_has_preface(socket, unget, timeout) - local deadline = timeout and (monotime()+timeout) - local bytes = "" - local is_h2 = true - while #bytes < #preface do - -- read *up to* number of bytes left in preface - local ok, err, errno = socket:xread(#bytes-#preface, deadline and (deadline-monotime())) - if ok == nil then - return nil, err or ce.EPIPE, errno - end - bytes = bytes .. ok - if bytes ~= preface:sub(1, #bytes) then - is_h2 = false - break - end - end - if unget then - local ok, errno = socket:unget(bytes) - if not ok then - return nil, onerror(socket, "unget", errno, 2) - end - end - return is_h2 -end - -local function new_connection(socket, conn_type, settings, timeout, cq) - local deadline = timeout and (monotime()+timeout) - cq = assert(cq or cqueues.running()) - - socket:setmode("b", "bf") - socket:setvbuf("full", math.huge) -- 'infinite' buffering; no write locks needed - socket:onerror(onerror) - - local ssl = socket:checktls() - if ssl then - local cipher = ssl:getCipherInfo() - if h2_banned_ciphers[cipher.name] then - h2_error.errors.INADEQUATE_SECURITY("bad cipher: " .. cipher.name) - end - end - if conn_type == "client" then - local ok, err = socket:xwrite(preface, "f", timeout) - if ok == nil then return nil, err end - elseif conn_type == "server" then - local ok, err = socket_has_preface(socket, false, timeout) - if ok == nil then - return nil, err - end - if not ok then - h2_error.errors.PROTOCOL_ERROR("invalid connection preface. not an http2 client?") - end - else - error('invalid connection type. must be "client" or "server"') - end - - settings = settings or {} - - local self = setmetatable({ - socket = socket; - type = conn_type; - version = 2; -- for compat with h1_connection - - streams = setmetatable({}, {__mode="kv"}); - stream0 = nil; -- store separately with a strong reference - need_continuation = nil; -- stream - highest_odd_stream = -1; - highest_even_stream = -2; - recv_goaway_lowest = nil; - recv_goaway = cc.new(); - new_streams = new_fifo(); - new_streams_cond = cc.new(); - peer_settings = default_settings; - peer_settings_cond = cc.new(); -- signaled when the peer has changed their settings - acked_settings = default_settings; - send_settings = {n = 0}; - send_settings_ack_cond = cc.new(); -- for when server ACKs our settings - send_settings_acked = 0; - peer_flow_credits = 65535; -- 5.2.1 - peer_flow_credits_increase = cc.new(); - encoding_context = nil; - decoding_context = nil; - pongs = {}; -- pending pings we've sent. keyed by opaque 8 byte payload - }, connection_mt) - self:new_stream(0) - self.encoding_context = hpack.new(default_settings[0x1]) - self.decoding_context = hpack.new(default_settings[0x1]) - self.cq = cq:wrap(connection_main_loop, self) - - do -- send settings frame + wait for reply to complete connection - local ok, err = self:settings(settings, deadline and (deadline-monotime())) - if not ok then - return nil, err - end - end - - return self -end - -function connection_methods:pollfd() - return self.socket:pollfd() -end - -function connection_methods:events() - return self.socket:events() -end - -function connection_methods:timeout() - if self.cq:empty() then - return 0 - end -end - -function connection_main_loop(self) - while not self.socket:eof("r") do - local typ, flag, streamid, payload = self:read_http2_frame() - if typ == nil then - if flag == nil then -- EOF - self.socket:close() - break - else - error(flag) - end - end - local handler = h2_stream.frame_handlers[typ] - -- http2 spec section 4.1: - -- Implementations MUST ignore and discard any frame that has a type that is unknown. - if handler then - local stream = self.streams[streamid] - if stream == nil then - if xor(streamid % 2 == 1, self.type == "client") then - h2_error.errors.PROTOCOL_ERROR("Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers") - end - -- TODO: check MAX_CONCURRENT_STREAMS - stream = self:new_stream(streamid) - self.new_streams:push(stream) - self.new_streams_cond:signal(1) - end - local ok, err = handler(stream, flag, payload) - if not ok then - if h2_error.is(err) and err.stream_error then - if not stream:write_rst_stream(err.code) then - error(err) - end - else -- connection error or unknown error - error(err) - end - end - end - end - return true -end - -local function handle_step_return(self, step_ok, last_err, errno) - if step_ok then - return true - else - if not self.socket:eof("w") then - local code, message - if step_ok then - code = h2_error.errors.NO_ERROR.code - elseif h2_error.is(last_err) then - code = last_err.code - message = last_err.message - else - code = h2_error.errors.INTERNAL_ERROR.code - end - -- ignore write failure here; there's nothing that can be done - self:write_goaway_frame(nil, code, message) - end - self:shutdown() - return nil, last_err, errno - end -end - -function connection_methods:checktls() - return self.socket:checktls() -end - -function connection_methods:localname() - return self.socket:localname() -end - -function connection_methods:peername() - return self.socket:peername() -end - -function connection_methods:shutdown() - local ok, err = self:write_goaway_frame(nil, h2_error.errors.NO_ERROR.code, "connection closed") - if not ok and err == ce.EPIPE then - -- other end already closed - ok, err = true, nil - end - for _, stream in pairs(self.streams) do - stream:shutdown() - end - self.socket:shutdown("r") - return ok, err -end - -function connection_methods:close() - local ok, err = self:shutdown() - cqueues.poll() - cqueues.poll() - self.socket:close() - return ok, err -end - -function connection_methods:new_stream(id) - if id then - assert(id % 1 == 0) - else - if self.recv_goaway_lowest then - h2_error.errors.PROTOCOL_ERROR("Receivers of a GOAWAY frame MUST NOT open additional streams on the connection") - end - if self.type == "client" then - -- Pick next free odd number - id = self.highest_odd_stream + 2 - else - -- Pick next free odd number - id = self.highest_even_stream + 2 - end - -- TODO: check MAX_CONCURRENT_STREAMS - end - assert(self.streams[id] == nil, "stream id already in use") - assert(id < 2^32, "stream id too large") - if id % 2 == 0 then - assert(id > self.highest_even_stream, "stream id too small") - self.highest_even_stream = id - else - assert(id > self.highest_odd_stream, "stream id too small") - self.highest_odd_stream = id - end - local stream = h2_stream.new(self, id) - if id == 0 then - self.stream0 = stream - else - -- Add dependency on stream 0. http2 spec, 5.3.1 - self.stream0:reprioritise(stream) - end - self.streams[id] = stream - return stream -end - --- this function *should never throw* -function connection_methods:get_next_incoming_stream(timeout) - local deadline = timeout and (monotime()+timeout) - while self.new_streams:length() == 0 do - if self.socket:eof('r') or self.recv_goaway_lowest then - -- TODO? clarification required: can the sender of a GOAWAY subsequently start streams? - -- (with a lower stream id than they sent in the GOAWAY) - -- For now, assume not. - return nil, ce.EPIPE - end - local which = cqueues.poll(self.socket, self.new_streams_cond, self.recv_goaway, timeout) - if which == timeout then - return nil, ce.ETIMEDOUT - end - timeout = deadline and (deadline-monotime()) - end - - local stream = self.new_streams:pop() - return stream -end - --- On success, returns type, flags, stream id and payload --- On timeout, returns nil, ETIMEDOUT -- safe to retry --- If the socket has been shutdown for reading, and there is no data left unread, returns EPIPE --- Will raise an error on other errors, or if the frame is invalid -function connection_methods:read_http2_frame(timeout) - local deadline = timeout and (monotime()+timeout) - local frame_header, err, errno = self.socket:xread(9, timeout) - if frame_header == nil then - if err == ce.ETIMEDOUT then - return nil, err - elseif err == nil --[[EPIPE]] and self.socket:eof("r") then - return nil - else - return nil, err, errno - end - end - local size, typ, flags, streamid = sunpack(">I3 B B I4", frame_header) - if size > self.acked_settings[0x5] then - return nil, h2_error.errors.FRAME_SIZE_ERROR:new_traceback("frame too large") - end - -- reserved bit MUST be ignored by receivers - streamid = band(streamid, 0x7fffffff) - local payload, err2, errno2 = self.socket:xread(size, deadline and (deadline-monotime())) - if payload == nil then - if err2 == ce.ETIMEDOUT then - -- put frame header back into socket so a retry will work - local ok, errno3 = self.socket:unget(frame_header) - if not ok then - return nil, onerror(self.socket, "unget", errno3, 2) - end - end - return nil, err2, errno2 - end - return typ, flags, streamid, payload -end - --- If this times out, it was the flushing; not the write itself --- hence it's not always total failure. --- It's up to the caller to take some action (e.g. closing) rather than doing it here --- TODO: distinguish between nothing sent and partially sent? -function connection_methods:write_http2_frame(typ, flags, streamid, payload, timeout) - local deadline = timeout and (monotime()+timeout) - if #payload > self.peer_settings[0x5] then - return nil, h2_error.errors.FRAME_SIZE_ERROR:new_traceback("frame too large") - end - local header = spack(">I3 B B I4", #payload, typ, flags, streamid) - local ok, err, errno = self.socket:xwrite(header, "f", timeout) - if not ok then - return nil, err, errno - end - return self.socket:xwrite(payload, "n", deadline and (deadline-monotime())) -end - -function connection_methods:ping(timeout) - local deadline = timeout and (monotime()+timeout) - local payload - -- generate a random, unique payload - repeat -- keep generating until we don't have a collision - payload = rand.bytes(8) - until self.pongs[payload] == nil - local cond = cc.new() - self.pongs[payload] = cond - assert(self.stream0:write_ping_frame(false, payload, timeout)) - while self.pongs[payload] do - timeout = deadline and (deadline-monotime()) - local which = cqueues.poll(self, cond, timeout) - if which == self then - local ok, err, errno = self:step(0) - if not ok then - return nil, err, errno - end - elseif which == timeout then - return nil, ce.ETIMEDOUT - end - end - return true -end - -function connection_methods:write_window_update(...) - return self.stream0:write_window_update(...) -end - -function connection_methods:write_goaway_frame(last_stream_id, err_code, debug_msg) - if last_stream_id == nil then - last_stream_id = math.max(self.highest_odd_stream, self.highest_even_stream) - end - return self.stream0:write_goaway_frame(last_stream_id, err_code, debug_msg) -end - -function connection_methods:set_peer_settings(peer_settings) - self.peer_settings = merge_settings(peer_settings, self.peer_settings) - self.peer_settings_cond:signal() -end - -function connection_methods:ack_settings() - local n = self.send_settings_acked + 1 - self.send_settings_acked = n - local acked_settings = self.send_settings[n] - if acked_settings then - self.send_settings[n] = nil - self.acked_settings = merge_settings(acked_settings, self.acked_settings) - end - self.send_settings_ack_cond:signal(1) -end - -function connection_methods:settings(tbl, timeout) - local deadline = timeout and monotime()+timeout - local n, err, errno = self.stream0:write_settings_frame(false, tbl, timeout) - if not n then - return nil, err, errno - end - -- Now wait for ACK - while self.send_settings_acked < n do - timeout = deadline and (deadline-monotime()) - local which = cqueues.poll(self.send_settings_ack_cond, timeout) - if which ~= self.send_settings_ack_cond then - self:write_goaway_frame(nil, h2_error.errors.SETTINGS_TIMEOUT.code, "timeout exceeded") - return nil, ce.ETIMEDOUT - end - end - return true -end - -return { - preface = preface; - socket_has_preface = socket_has_preface; - new = new_connection; - methods = connection_methods; - mt = connection_mt; -} \ No newline at end of file diff --git a/modules/http/http/h2_stream.lua b/modules/http/http/h2_stream.lua deleted file mode 100644 index e20f6de5a..000000000 --- a/modules/http/http/h2_stream.lua +++ /dev/null @@ -1,1141 +0,0 @@ -local cqueues = require "cqueues" -local monotime = cqueues.monotime -local cc = require "cqueues.condition" -local ce = require "cqueues.errno" -local new_fifo = require "fifo" -local band = require "http.bit".band -local bor = require "http.bit".bor -local h2_errors = require "http.h2_error".errors -local stream_common = require "http.stream_common" -local spack = string.pack or require "compat53.string".pack -local sunpack = string.unpack or require "compat53.string".unpack -local unpack = table.unpack or unpack -- luacheck: ignore 113 - -local assert = assert -if _VERSION:match("%d+%.?%d*") < "5.3" then - assert = require "compat53.module".assert -end - -local MAX_HEADER_BUFFER_SIZE = 400*1024 -- 400 KB is max size in h2o - -local frame_handlers = {} - -local stream_methods = {} -for k, v in pairs(stream_common.methods) do - stream_methods[k] = v -end -local stream_mt = { - __name = "http.h2_stream"; - __index = stream_methods; -} - -function stream_mt:__tostring() - local dependee_list = {} - for s in pairs(self.dependees) do - dependee_list[#dependee_list+1] = string.format("%d", s.id) - end - table.sort(dependee_list) - dependee_list = table.concat(dependee_list, ",") - return string.format("http.h2_stream{connection=%s;id=%d;state=%q;parent=%s;dependees={%s}}", - tostring(self.connection), self.id, self.state, - (self.parent and tostring(self.parent.id) or "nil"), dependee_list) -end - -local function new_stream(connection, id) - assert(type(id) == "number" and id >= 0 and id <= 0x7fffffff, "invalid stream id") - local self = setmetatable({ - connection = connection; - type = connection.type; - - state = "idle"; - - id = id; - peer_flow_credits = id ~= 0 and connection.peer_settings[0x4]; - peer_flow_credits_increase = cc.new(); - parent = nil; - dependees = setmetatable({}, {__mode="kv"}); - weight = 16; -- http2 spec, section 5.3.5 - - rst_stream_error = nil; - - stats_sent_headers = 0; -- number of header blocks sent - stats_recv_headers = 0; -- number of header blocks received - stats_sent = 0; -- #bytes sent in DATA blocks - stats_recv = 0; -- #bytes received in DATA blocks - - recv_headers_fifo = new_fifo(); - recv_headers_cond = cc.new(); - - chunk_fifo = new_fifo(); - chunk_cond = cc.new(); - }, stream_mt) - return self -end - -local valid_states = { - ["idle"] = 1; -- initial - ["open"] = 2; -- have sent or received headers; haven't sent body yet - ["reserved (local)"] = 2; -- have sent a PUSH_PROMISE - ["reserved (remote)"] = 2; -- have received a PUSH_PROMISE - ["half closed (local)"] = 3; -- have sent whole body - ["half closed (remote)"] = 3; -- have received whole body - ["closed"] = 4; -- complete -} -function stream_methods:set_state(new) - local new_order = assert(valid_states[new]) - local old = self.state - if new_order <= valid_states[old] then - error("invalid state progression ('"..old.."' to '"..new.."')") - end - self.state = new -end - -function stream_methods:write_http2_frame(typ, flags, payload, timeout) - return self.connection:write_http2_frame(typ, flags, self.id, payload, timeout) -end - -function stream_methods:reprioritise(child, exclusive) - assert(child) - assert(child.id ~= 0) -- cannot reprioritise stream 0 - if self == child then - -- http2 spec, section 5.3.1 - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("A stream cannot depend on itself", true) - end - do -- Check if the child is an ancestor - local ancestor = self.parent - while ancestor do - if ancestor == child then - -- Break the loop. http spec, section 5.3.3 - local ok, err = child.parent:reprioritise(self, false) - if not ok then return nil, err end - break - end - ancestor = ancestor.parent - end - end - -- Remove old parent - if child.parent then - child.parent.dependees[child] = nil - end - -- We are now the parent - child.parent = self - if exclusive then - -- All the parent's deps are now the child's - for s, v in pairs(self.dependees) do - s.parent = child - child.dependees[s] = v - self.dependees[s] = nil - end - else - self.dependees[child] = true - end - return true -end - -local chunk_methods = {} -local chunk_mt = { - __name = "http.h2_stream.chunk"; - __index = chunk_methods; -} - -local function new_chunk(stream, original_length, data) - return setmetatable({ - stream = stream; - original_length = original_length; - data = data; - acked = false; - }, chunk_mt) -end - -function chunk_methods:ack(no_window_update) - if self.acked then - error("already acked") - end - self.acked = true - local len = self.original_length - if len > 0 and not no_window_update then - -- ignore errors - self.stream:write_window_update(len) - self.stream.connection:write_window_update(len) - end -end - --- DATA -frame_handlers[0x0] = function(stream, flags, payload) - if stream.id == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'DATA' frames MUST be associated with a stream") - end - if stream.state ~= "open" and stream.state ~= "half closed (local)" then - return nil, h2_errors.STREAM_CLOSED:new_traceback("'DATA' frame not allowed in '" .. stream.state .. "' state", true) - end - - local end_stream = band(flags, 0x1) ~= 0 - local padded = band(flags, 0x8) ~= 0 - - local original_length = #payload - - if padded then - local pad_len = sunpack("> B", payload) - if pad_len >= #payload then -- >= will take care of the pad_len itself - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("length of the padding is the length of the frame payload or greater") - elseif payload:match("[^%z]", -pad_len) then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("padding not null bytes") - end - payload = payload:sub(2, -pad_len-1) - end - - if end_stream then - if stream.state == "half closed (local)" then - stream:set_state("closed") - else - stream:set_state("half closed (remote)") - end - end - - local chunk = new_chunk(stream, original_length, payload) - stream.chunk_fifo:push(chunk) - stream.stats_recv = stream.stats_recv + #payload - if end_stream then - stream.chunk_fifo:push(nil) - end - stream.chunk_cond:signal() - - return true -end - -function stream_methods:write_data_frame(payload, end_stream, padded, timeout) - if self.id == 0 then - h2_errors.PROTOCOL_ERROR("'DATA' frames MUST be associated with a stream") - end - if self.state ~= "open" and self.state ~= "half closed (remote)" then - h2_errors.STREAM_CLOSED("'DATA' frame not allowed in '" .. self.state .. "' state", true) - end - local pad_len, padding = "", "" - local flags = 0 - if end_stream then - flags = bor(flags, 0x1) - end - if padded then - flags = bor(flags, 0x8) - pad_len = spack("> B", padded) - padding = ("\0"):rep(padded) - end - payload = pad_len .. payload .. padding - -- The entire DATA frame payload is included in flow control, - -- including Pad Length and Padding fields if present - local new_stream_peer_flow_credits = self.peer_flow_credits - #payload - local new_connection_peer_flow_credits = self.connection.peer_flow_credits - #payload - if new_stream_peer_flow_credits < 0 or new_connection_peer_flow_credits < 0 then - h2_errors.FLOW_CONTROL_ERROR("not enough flow credits") - end - local ok, err, errno = self:write_http2_frame(0x0, flags, payload, timeout) - if not ok then return nil, err, errno end - self.peer_flow_credits = new_stream_peer_flow_credits - self.connection.peer_flow_credits = new_connection_peer_flow_credits - self.stats_sent = self.stats_sent + #payload - if end_stream then - if self.state == "half closed (remote)" then - self:set_state("closed") - else - self:set_state("half closed (local)") - end - end - return ok -end - --- Map from header name to whether it belongs in a request (vs a response) -local valid_pseudo_headers = { - [":method"] = true; - [":scheme"] = true; - [":path"] = true; - [":authority"] = true; - [":status"] = false; -} -local function validate_headers(headers, is_request, nth_header, ended_stream) - do -- Validate that all colon fields are before other ones (section 8.1.2.1) - local seen_non_colon = false - for name, value in headers:each() do - if name:sub(1,1) == ":" then - --[[ Pseudo-header fields are only valid in the context in - which they are defined. Pseudo-header fields defined for - requests MUST NOT appear in responses; pseudo-header fields - defined for responses MUST NOT appear in requests. - Pseudo-header fields MUST NOT appear in trailers. - Endpoints MUST treat a request or response that contains - undefined or invalid pseudo-header fields as malformed - (Section 8.1.2.6)]] - if (is_request and nth_header ~= 1) or valid_pseudo_headers[name] ~= is_request then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("Pseudo-header fields are only valid in the context in which they are defined", true) - end - if seen_non_colon then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("All pseudo-header fields MUST appear in the header block before regular header fields", true) - end - else - seen_non_colon = true - end - if type(value) ~= "string" then - return nil, "invalid header field" - end - end - end - if headers:has("connection") then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("An endpoint MUST NOT generate an HTTP/2 message containing connection-specific header fields", true) - end - local te = headers:get_as_sequence("te") - if te.n > 0 and (te[1] ~= "trailers" or te.n ~= 1) then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback([[The TE header field, which MAY be present in an HTTP/2 request; when it is, it MUST NOT contain any value other than "trailers"]], true) - end - if is_request then - if nth_header == 1 then - --[[ All HTTP/2 requests MUST include exactly one valid value for the :method, :scheme, - and :path pseudo-header fields, unless it is a CONNECT request (Section 8.3). - An HTTP request that omits mandatory pseudo-header fields is malformed (Section 8.1.2.6).]] - local methods = headers:get_as_sequence(":method") - if methods.n ~= 1 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("requests MUST include exactly one valid value for the :method, :scheme, and :path pseudo-header fields, unless it is a CONNECT request", true) - elseif methods[1] ~= "CONNECT" then - local scheme = headers:get_as_sequence(":scheme") - local path = headers:get_as_sequence(":path") - if scheme.n ~= 1 or path.n ~= 1 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("requests MUST include exactly one valid value for the :method, :scheme, and :path pseudo-header fields, unless it is a CONNECT request", true) - end - if path[1] == "" and (scheme[1] == "http" or scheme[1] == "https") then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("The :path pseudo-header field MUST NOT be empty for http or https URIs", true) - end - else -- is CONNECT method - -- Section 8.3 - if headers:has(":scheme") or headers:has(":path") then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("For a CONNECT request, the :scheme and :path pseudo-header fields MUST be omitted", true) - end - end - elseif nth_header == 2 then - if not ended_stream then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("Trailers MUST be at end of stream", true) - end - elseif nth_header > 2 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("An HTTP request consists of maximum 2 HEADER blocks", true) - end - else - --[[ For HTTP/2 responses, a single :status pseudo-header field is - defined that carries the HTTP status code field (RFC7231, Section 6). - This pseudo-header field MUST be included in all responses; otherwise, - the response is malformed (Section 8.1.2.6)]] - if not headers:has(":status") then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback(":status pseudo-header field MUST be included in all responses", true) - end - end - return true -end - --- HEADERS -frame_handlers[0x1] = function(stream, flags, payload) - if stream.id == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'HEADERS' frames MUST be associated with a stream") - end - if stream.state ~= "idle" and stream.state ~= "open" and stream.state ~= "half closed (local)" and stream.state ~= "reserved (remote)" then - return nil, h2_errors.STREAM_CLOSED:new_traceback("'HEADERS' frame not allowed in '" .. stream.state .. "' state", true) - end - - local end_stream = band(flags, 0x1) ~= 0 - local end_headers = band(flags, 0x04) ~= 0 - local padded = band(flags, 0x8) ~= 0 - local priority = band(flags, 0x20) ~= 0 - - -- index where payload body starts - local pos = 1 - local pad_len - - if padded then - pad_len = sunpack("> B", payload, pos) - pos = 2 - else - pad_len = 0 - end - - if priority then - local exclusive, stream_dep, weight - local tmp - tmp, weight = sunpack(">I4 B", payload, pos) - exclusive = band(tmp, 0x80000000) ~= 0 - stream_dep = band(tmp, 0x7fffffff) - weight = weight + 1 - pos = pos + 5 - - local new_parent = stream.connection.streams[stream_dep] - - -- 5.3.1. Stream Dependencies - -- A dependency on a stream that is not currently in the tree - -- results in that stream being given a default priority - if new_parent then - local ok, err = new_parent:reprioritise(stream, exclusive) - if not ok then return nil, err end - stream.weight = weight - end - end - - if #payload - pos + 1 > MAX_HEADER_BUFFER_SIZE then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large") - end - - if not end_headers then - local recv_headers_buffer = { payload } - local recv_headers_buffer_items = 1 - local recv_headers_buffer_length = #payload - pos + 1 - repeat - local end_continuations, header_fragment = stream:read_continuation() - if not end_continuations then - return nil, header_fragment - end - recv_headers_buffer_items = recv_headers_buffer_items + 1 - recv_headers_buffer[recv_headers_buffer_items] = header_fragment - recv_headers_buffer_length = recv_headers_buffer_length + #header_fragment - if recv_headers_buffer_length > MAX_HEADER_BUFFER_SIZE then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large") - end - until end_continuations - - payload = table.concat(recv_headers_buffer, "", 1, recv_headers_buffer_items) - end - - if pad_len > 0 then - if pad_len + pos - 1 > #payload then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("length of the padding is the length of the frame payload or greater") - elseif payload:match("[^%z]", -pad_len) then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("padding not null bytes") - end - payload = payload:sub(1, -pad_len-1) - end - - local headers, newpos = stream.connection.decoding_context:decode_headers(payload, nil, pos) - if newpos ~= #payload + 1 then - return nil, h2_errors.COMPRESSION_ERROR:new_traceback("incomplete header fragment") - end - - stream.stats_recv_headers = stream.stats_recv_headers + 1 - - if end_stream then - if stream.state == "half closed (local)" then - stream:set_state("closed") - else - stream:set_state("half closed (remote)") - end - stream.chunk_fifo:push(nil) - stream.chunk_cond:signal() - else - if stream.state == "idle" then - stream:set_state("open") - end - end - - local ok, err = validate_headers(headers, stream.type ~= "client", stream.stats_recv_headers, stream.state == "half closed (remote)" or stream.state == "closed") - if not ok then return nil, err end - - stream.recv_headers_fifo:push(headers) - stream.recv_headers_cond:signal() - - return true -end - -function stream_methods:write_headers_frame(payload, end_stream, end_headers, padded, exclusive, stream_dep, weight, timeout) - assert(self.state ~= "closed" and self.state ~= "half closed (local)") - local pad_len, pri, padding = "", "", "" - local flags = 0 - if end_stream then - flags = bor(flags, 0x1) - end - if end_headers then - flags = bor(flags, 0x4) - end - if padded then - flags = bor(flags, 0x8) - pad_len = spack("> B", padded) - padding = ("\0"):rep(padded) - end - if weight or stream_dep then - flags = bor(flags, 0x20) - assert(stream_dep < 0x80000000) - local tmp = stream_dep - if exclusive then - tmp = bor(tmp, 0x80000000) - end - weight = weight and weight - 1 or 0 - pri = spack("> I4 B", tmp, weight) - end - payload = pad_len .. pri .. payload .. padding - local ok, err, errno = self:write_http2_frame(0x1, flags, payload, timeout) - if ok == nil then return nil, err, errno end - self.stats_sent_headers = self.stats_sent_headers + 1 - if end_stream then - if self.state == "reserved (local)" then - self:set_state("closed") - else -- self.state == "idle" or self.state == "open" then - self:set_state("half closed (local)") - end - else - if self.state == "reserved (local)" then - self:set_state("half closed (remote)") - elseif self.state == "idle" then - self:set_state("open") - end - end - return ok -end - --- PRIORITY -frame_handlers[0x2] = function(stream, flags, payload) -- luacheck: ignore 212 - if stream.id == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'PRIORITY' frames MUST be associated with a stream") - end - if #payload ~= 5 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'PRIORITY' frames must be 5 bytes", true) - end - - local exclusive, stream_dep, weight - local tmp - tmp, weight = sunpack(">I4 B", payload) - weight = weight + 1 - exclusive = band(tmp, 0x80000000) ~= 0 - stream_dep = band(tmp, 0x7fffffff) - - local new_parent = stream.connection.streams[stream_dep] - local ok, err = new_parent:reprioritise(stream, exclusive) - if not ok then return nil, err end - stream.weight = weight - - return true -end - -function stream_methods:write_priority_frame(exclusive, stream_dep, weight, timeout) - assert(stream_dep < 0x80000000) - local tmp = stream_dep - if exclusive then - tmp = bor(tmp, 0x80000000) - end - weight = weight and weight - 1 or 0 - local payload = spack("> I4 B", tmp, weight) - return self:write_http2_frame(0x2, 0, payload, timeout) -end - --- RST_STREAM -frame_handlers[0x3] = function(stream, flags, payload) -- luacheck: ignore 212 - if stream.id == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'RST_STREAM' frames MUST be associated with a stream") - end - if #payload ~= 4 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'RST_STREAM' frames must be 4 bytes") - end - if stream.state == "idle" then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback([['RST_STREAM' frames MUST NOT be sent for a stream in the "idle" state]]) - end - - local err_code = sunpack(">I4", payload) - - stream.rst_stream_error = (h2_errors[err_code] or h2_errors.INTERNAL_ERROR):new { - message = string.format("'RST_STREAM' on stream #%d (code=0x%x)", stream.id, err_code); - } - - stream:set_state("closed") - stream.recv_headers_cond:signal() - stream.chunk_cond:signal() - - return true -end - -function stream_methods:write_rst_stream(err_code, timeout) - if self.id == 0 then - h2_errors.PROTOCOL_ERROR("'RST_STREAM' frames MUST be associated with a stream") - end - if self.state == "idle" then - h2_errors.PROTOCOL_ERROR([['RST_STREAM' frames MUST NOT be sent for a stream in the "idle" state]], true) - end - local flags = 0 - local payload = spack(">I4", err_code) - local ok, err, errno = self:write_http2_frame(0x3, flags, payload, timeout) - if not ok then return nil, err, errno end - self:set_state("closed") - self:shutdown() - return ok -end - --- SETTING -frame_handlers[0x4] = function(stream, flags, payload) - if stream.id ~= 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("stream identifier for a 'SETTINGS' frame MUST be zero") - end - - local ack = band(flags, 0x1) ~= 0 - if ack then -- server is ACK-ing our settings - if #payload ~= 0 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("Receipt of a 'SETTINGS' frame with the ACK flag set and a length field value other than 0") - end - stream.connection:ack_settings() - return true - else -- settings from server - if #payload % 6 ~= 0 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'SETTINGS' frame with a length other than a multiple of 6 octets") - end - local peer_settings = {} - for i=1, #payload, 6 do - local id, val = sunpack(">I2 I4", payload, i) - if id == 0x1 then - stream.connection.encoding_context:set_max_dynamic_table_size(val) - -- Add a 'max size' element to the next outgoing header - stream.connection.encoding_context:encode_max_size(val) - elseif id == 0x2 then - -- Convert to boolean - if val == 0 then - val = false - elseif val == 1 then - val = true - else - return nil, h2_errors.PROTOCOL_ERROR:new_traceback() - end - if val and stream.type == "client" then - -- Clients MUST reject any attempt to change the SETTINGS_ENABLE_PUSH - -- setting to a value other than 0 by treating the message as a connection - -- error of type PROTOCOL_ERROR. - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_ENABLE_PUSH not allowed for clients") - end - elseif id == 0x4 then - if val >= 2^31 then - return nil, h2_errors.FLOW_CONTROL_ERROR:new_traceback("SETTINGS_INITIAL_WINDOW_SIZE must be less than 2^31") - end - elseif id == 0x5 then - if val < 16384 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_MAX_FRAME_SIZE must be greater than or equal to 16384") - elseif val >= 2^24 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_MAX_FRAME_SIZE must be less than 2^24") - end - end - peer_settings[id] = val - end - stream.connection:set_peer_settings(peer_settings) - -- Ack server's settings - return stream:write_settings_frame(true) - end -end - -local function pack_settings_payload(settings) - local i = 0 - local a = {} - local function append(k, v) - a[i*2+1] = k - a[i*2+2] = v - i = i + 1 - end - local HEADER_TABLE_SIZE = settings[0x1] - if HEADER_TABLE_SIZE ~= nil then - append(0x1, HEADER_TABLE_SIZE) - end - local ENABLE_PUSH = settings[0x2] - if ENABLE_PUSH ~= nil then - if type(ENABLE_PUSH) == "boolean" then - ENABLE_PUSH = ENABLE_PUSH and 1 or 0 - end - append(0x2, ENABLE_PUSH) - end - local MAX_CONCURRENT_STREAMS = settings[0x3] - if MAX_CONCURRENT_STREAMS ~= nil then - append(0x3, MAX_CONCURRENT_STREAMS) - end - local INITIAL_WINDOW_SIZE = settings[0x4] - if INITIAL_WINDOW_SIZE ~= nil then - if INITIAL_WINDOW_SIZE >= 2^31 then - h2_errors.FLOW_CONTROL_ERROR("SETTINGS_INITIAL_WINDOW_SIZE must be less than 2^31") - end - append(0x4, INITIAL_WINDOW_SIZE) - end - local MAX_FRAME_SIZE = settings[0x5] - if MAX_FRAME_SIZE ~= nil then - if MAX_FRAME_SIZE < 16384 then - h2_errors.PROTOCOL_ERROR("SETTINGS_MAX_FRAME_SIZE must be greater than or equal to 16384") - elseif MAX_FRAME_SIZE >= 2^24 then - h2_errors.PROTOCOL_ERROR("SETTINGS_MAX_FRAME_SIZE must be less than 2^24") - end - append(0x5, MAX_FRAME_SIZE) - end - local MAX_HEADER_LIST_SIZE = settings[0x6] - if MAX_HEADER_LIST_SIZE ~= nil then - append(0x6, MAX_HEADER_LIST_SIZE) - end - return spack(">" .. ("I2 I4"):rep(i), unpack(a, 1, i*2)) -end - -function stream_methods:write_settings_frame(ACK, settings, timeout) - if self.id ~= 0 then - h2_errors.PROTOCOL_ERROR("'SETTINGS' frames must be on stream id 0") - end - local flags, payload - if ACK then - if settings ~= nil then - h2_errors.PROTOCOL_ERROR("'SETTINGS' ACK cannot have new settings") - end - flags = 0x1 - payload = "" - else - flags = 0 - payload = pack_settings_payload(settings) - end - local ok, err, errno = self:write_http2_frame(0x4, flags, payload, timeout) - if ok and not ACK then - local n = self.connection.send_settings.n + 1 - self.connection.send_settings.n = n - self.connection.send_settings[n] = settings - ok = n - end - return ok, err, errno -end - --- PUSH_PROMISE -frame_handlers[0x5] = function(stream, flags, payload) - if not stream.connection.acked_settings[0x2] then - -- An endpoint that has both set this parameter to 0 and had it acknowledged MUST - -- treat the receipt of a PUSH_PROMISE frame as a connection error of type PROTOCOL_ERROR. - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_ENABLE_PUSH is 0") - elseif stream.type == "server" then - -- A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE - -- frame as a connection error of type PROTOCOL_ERROR. - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("A client cannot push") - end - if stream.id == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'PUSH_PROMISE' frames MUST be associated with a stream") - end - - local end_headers = band(flags, 0x04) ~= 0 - local padded = band(flags, 0x8) ~= 0 - - -- index where payload body starts - local pos = 1 - local pad_len - - if padded then - pad_len = sunpack("> B", payload, pos) - pos = 2 - else - pad_len = 0 - end - - local tmp = sunpack(">I4", payload, pos) - local promised_stream_id = band(tmp, 0x7fffffff) - pos = pos + 4 - - if #payload - pos + 1 > MAX_HEADER_BUFFER_SIZE then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large") - end - - if not end_headers then - local recv_headers_buffer = { payload } - local recv_headers_buffer_items = 1 - local recv_headers_buffer_length = #payload - pos + 1 - repeat - local end_continuations, header_fragment = stream:read_continuation() - if not end_continuations then - return nil, header_fragment - end - recv_headers_buffer_items = recv_headers_buffer_items + 1 - recv_headers_buffer[recv_headers_buffer_items] = header_fragment - recv_headers_buffer_length = recv_headers_buffer_length + #header_fragment - if recv_headers_buffer_length > MAX_HEADER_BUFFER_SIZE then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large") - end - until end_continuations - - payload = table.concat(recv_headers_buffer, "", 1, recv_headers_buffer_items) - end - - if pad_len > 0 then - if pad_len + pos - 1 > #payload then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("length of the padding is the length of the frame payload or greater") - elseif payload:match("[^%z]", -pad_len) then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("padding not null bytes") - end - payload = payload:sub(1, -pad_len-1) - end - - local headers, newpos = stream.connection.decoding_context:decode_headers(payload, nil, pos) - if newpos ~= #payload + 1 then - return nil, h2_errors.COMPRESSION_ERROR:new_traceback("incomplete header fragment") - end - - local ok, err = validate_headers(headers, true, 1, false) - if not ok then return nil, err end - - local promised_stream = stream.connection:new_stream(promised_stream_id) - stream:reprioritise(promised_stream) - promised_stream:set_state("reserved (remote)") - promised_stream.recv_headers_fifo:push(headers) - stream.connection.new_streams:push(promised_stream) - stream.connection.new_streams_cond:signal(1) - - return true -end - -function stream_methods:write_push_promise_frame(promised_stream_id, payload, end_headers, padded, timeout) - assert(self.state == "open" or self.state == "half closed (remote)") - assert(self.id ~= 0) - local pad_len, padding = "", "" - local flags = 0 - if end_headers then - flags = bor(flags, 0x4) - end - if padded then - flags = bor(flags, 0x8) - pad_len = spack("> B", padded) - padding = ("\0"):rep(padded) - end - assert(promised_stream_id > 0) - assert(promised_stream_id < 0x80000000) - assert(promised_stream_id % 2 == 0) - -- TODO: promised_stream_id must be valid for sender - promised_stream_id = spack(">I4", promised_stream_id) - payload = pad_len .. promised_stream_id .. payload .. padding - return self:write_http2_frame(0x5, flags, payload, timeout) -end - --- PING -frame_handlers[0x6] = function(stream, flags, payload) - if stream.id ~= 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'PING' must be on stream id 0") - end - if #payload ~= 8 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'PING' frames must be 8 bytes") - end - - local ack = band(flags, 0x1) ~= 0 - - if ack then - local cond = stream.connection.pongs[payload] - if cond then - cond:signal() - stream.connection.pongs[payload] = nil - end - return true - else - return stream:write_ping_frame(true, payload) - end -end - -function stream_methods:write_ping_frame(ACK, payload, timeout) - if self.id ~= 0 then - h2_errors.PROTOCOL_ERROR("'PING' frames must be on stream id 0") - end - if #payload ~= 8 then - h2_errors.FRAME_SIZE_ERROR("'PING' frames must have 8 byte payload") - end - local flags = ACK and 0x1 or 0 - return self:write_http2_frame(0x6, flags, payload, timeout) -end - --- GOAWAY -frame_handlers[0x7] = function(stream, flags, payload) -- luacheck: ignore 212 - if stream.id ~= 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'GOAWAY' frames must be on stream id 0") - end - if #payload < 8 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'GOAWAY' frames must be at least 8 bytes") - end - - local last_streamid = sunpack(">I4 I4", payload) - - if stream.connection.recv_goaway_lowest == nil or last_streamid < stream.connection.recv_goaway_lowest then - stream.connection.recv_goaway_lowest = last_streamid - stream.connection.recv_goaway:signal() - end - - return true -end - -function stream_methods:write_goaway_frame(last_streamid, err_code, debug_msg, timeout) - if self.id ~= 0 then - h2_errors.PROTOCOL_ERROR("'GOAWAY' frames MUST be on stream 0") - end - assert(last_streamid) - local flags = 0 - local payload = spack(">I4 I4", last_streamid, err_code) - if debug_msg then - payload = payload .. debug_msg - end - return self:write_http2_frame(0x7, flags, payload, timeout) -end - --- WINDOW_UPDATE -frame_handlers[0x8] = function(stream, flags, payload) -- luacheck: ignore 212 - if #payload ~= 4 then - return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'WINDOW_UPDATE' frames must be 4 bytes") - end - if stream.id ~= 0 and stream.state == "idle" then - return nil, h2_errors.PROTOCOL_ERROR([['WINDOW_UPDATE' frames not allowed in "idle" state]], true) - end - - local tmp = sunpack(">I4", payload) - assert(band(tmp, 0x80000000) == 0, "'WINDOW_UPDATE' reserved bit set") - local increment = band(tmp, 0x7fffffff) - if increment == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'WINDOW_UPDATE' MUST not have an increment of 0", stream.id ~= 0) - end - - local ob - if stream.id == 0 then -- for connection - ob = stream.connection - else - ob = stream - end - local newval = ob.peer_flow_credits + increment - if newval > 2^31-1 then - return nil, h2_errors.FLOW_CONTROL_ERROR:new_traceback("A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets", stream.id ~= 0) - end - ob.peer_flow_credits = newval - ob.peer_flow_credits_increase:signal() - - return true -end - -function stream_methods:write_window_update_frame(inc, timeout) - local flags = 0 - if self.id ~= 0 and self.state == "idle" then - h2_errors.PROTOCOL_ERROR([['WINDOW_UPDATE' frames not allowed in "idle" state]], true) - end - if inc >= 0x80000000 or inc <= 0 then - h2_errors.PROTOCOL_ERROR("invalid window update increment", true) - end - local payload = spack(">I4", inc) - return self:write_http2_frame(0x8, flags, payload, timeout) -end - -function stream_methods:write_window_update(inc) - while inc >= 0x80000000 do - local ok, err = self:write_window_update_frame(0x7fffffff) - if not ok then return nil, err end - inc = inc - 0x7fffffff - end - return self:write_window_update_frame(inc) -end - --- CONTINUATION -frame_handlers[0x9] = function(stream, flags, payload) -- luacheck: ignore 212 - if stream.id == 0 then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'CONTINUATION' frames MUST be associated with a stream") - end - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'CONTINUATION' frames MUST be preceded by a 'HEADERS', 'PUSH_PROMISE' or 'CONTINUATION' frame without the 'END_HEADERS' flag set") -end - -function stream_methods:read_continuation() - local typ, flag, streamid, payload = self:read_http2_frame() - if typ == nil then - return nil, flag, streamid - elseif typ ~= 0x9 or self.id ~= streamid then - return nil, h2_errors.PROTOCOL_ERROR:new_traceback("CONTINUATION frame expected") - end - local end_headers = band(flag, 0x04) ~= 0 - return end_headers, payload -end - -function stream_methods:write_continuation_frame(payload, end_headers, timeout) - assert(self.state == "open" or self.state == "half closed (remote)") - local flags = 0 - if end_headers then - flags = bor(flags, 0x4) - end - return self:write_http2_frame(0x9, flags, payload, timeout) -end - -------------------------------------------- - -function stream_methods:shutdown() - if self.state ~= "idle" and self.state ~= "closed" and self.id ~= 0 then - local ok, err = self:write_rst_stream(0) - if not ok and err ~= ce.EPIPE then - error(err) - end - end - local len = 0 - while self.chunk_fifo:length() > 0 do - local chunk = self.chunk_fifo:pop() - if chunk ~= nil then - chunk:ack(true) - len = len + #chunk.data - end - end - if len > 0 then - self.connection:write_window_update(len) - end -end - --- this function *should never throw* -function stream_methods:get_headers(timeout) - local deadline = timeout and (monotime()+timeout) - while self.recv_headers_fifo:length() < 1 do - if self.state == "closed" then - return nil, self.rst_stream_error - end - local which = cqueues.poll(self.connection, self.recv_headers_cond, timeout) - if which == self.connection then - local ok, err, errno = self.connection:step(0) - if not ok then - return nil, err, errno - end - elseif which == timeout then - return nil, ce.ETIMEDOUT - end - timeout = deadline and (deadline-monotime()) - end - local headers = self.recv_headers_fifo:pop() - return headers -end - -function stream_methods:get_next_chunk(timeout) - local deadline = timeout and (monotime()+timeout) - while self.chunk_fifo:length() == 0 do - if self.state == "closed" or self.state == "half closed (remote)" then - if self.rst_stream_error then - self.rst_stream_error() - end - return nil - end - local which = cqueues.poll(self.connection, self.chunk_cond, timeout) - if which == self.connection then - local ok, err, errno = self.connection:step(0) - if not ok then - return nil, err, errno - end - elseif which == timeout then - return nil, ce.ETIMEDOUT - end - timeout = deadline and (deadline-monotime()) - end - local chunk = self.chunk_fifo:pop() - if chunk == nil then - return nil, ce.EPIPE - else - local data = chunk.data - chunk:ack(false) - return data - end -end - -function stream_methods:unget(str) - local chunk = new_chunk(self, 0, str) -- 0 means :ack does nothing - self.chunk_fifo:insert(1, chunk) -end - -local function write_headers(self, func, headers, timeout) - local deadline = timeout and (monotime()+timeout) - local encoding_context = self.connection.encoding_context - encoding_context:encode_headers(headers) - local payload = encoding_context:render_data() - encoding_context:clear_data() - - local SETTINGS_MAX_FRAME_SIZE = self.connection.peer_settings[0x5] - if #payload <= SETTINGS_MAX_FRAME_SIZE then - assert(func(payload, true, deadline)) - else - do - local partial = payload:sub(1, SETTINGS_MAX_FRAME_SIZE) - assert(func(partial, false, deadline)) - end - local sent = SETTINGS_MAX_FRAME_SIZE - local max = #payload-SETTINGS_MAX_FRAME_SIZE - while sent < max do - local partial = payload:sub(sent+1, sent+SETTINGS_MAX_FRAME_SIZE) - assert(self:write_continuation_frame(partial, false, deadline and deadline-monotime())) - sent = sent + SETTINGS_MAX_FRAME_SIZE - end - do - local partial = payload:sub(sent+1) - assert(self:write_continuation_frame(partial, true, deadline and deadline-monotime())) - end - end -end - -function stream_methods:write_headers(headers, end_stream, timeout) - assert(headers, "missing argument: headers") - assert(validate_headers(headers, self.type == "client", self.stats_sent_headers+1, end_stream)) - assert(type(end_stream) == "boolean", "'end_stream' MUST be a boolean") - - local padded, exclusive, stream_dep, weight = nil, nil, nil, nil - write_headers(self, function(payload, end_headers, deadline) - return self:write_headers_frame(payload, end_stream, end_headers, padded, exclusive, stream_dep, weight, deadline and deadline-monotime()) - end, headers, timeout) - - return true -end - -function stream_methods:push_promise(headers, timeout) - assert(self.type == "server") - assert(headers, "missing argument: headers") - assert(validate_headers(headers, true, 1, false)) - assert(headers:has(":authority")) - - local promised_stream = self.connection:new_stream() - self:reprioritise(promised_stream) - local promised_stream_id = promised_stream.id - - local padded = nil - write_headers(self, function(payload, end_headers, deadline) - return self:write_push_promise_frame(promised_stream_id, payload, end_headers, padded, deadline) - end, headers, timeout) - - promised_stream:set_state("reserved (local)") - - return promised_stream -end - -function stream_methods:write_chunk(payload, end_stream, timeout) - local deadline = timeout and (monotime()+timeout) - local sent = 0 - while true do - while self.peer_flow_credits == 0 do - local which = cqueues.poll(self.connection, self.peer_flow_credits_increase, timeout) - if which == self.connection then - local ok, err, errno = self.connection:step(0) - if not ok then - return nil, err, errno - end - elseif which == timeout then - return nil, ce.ETIMEDOUT - end - timeout = deadline and (deadline-monotime()) - end - while self.connection.peer_flow_credits == 0 do - local which = cqueues.poll(self.connection, self.connection.peer_flow_credits_increase, timeout) - if which == self.connection then - local ok, err, errno = self.connection:step(0) - if not ok then - return nil, err, errno - end - elseif which == timeout then - return nil, ce.ETIMEDOUT - end - timeout = deadline and (deadline-monotime()) - end - local SETTINGS_MAX_FRAME_SIZE = self.connection.peer_settings[0x5] - local max_available = math.min(self.peer_flow_credits, self.connection.peer_flow_credits, SETTINGS_MAX_FRAME_SIZE) - if max_available < (#payload - sent) then - if max_available > 0 then - -- send partial payload - local ok, err, errno = self:write_data_frame(payload:sub(sent+1, sent+max_available), false, timeout) - if not ok then - return nil, err, errno - end - sent = sent + max_available - end - else - break - end - timeout = deadline and (deadline-monotime()) - end - local ok, err, errno = self:write_data_frame(payload:sub(sent+1), end_stream, timeout) - if not ok then - return nil, err, errno - end - return true -end - -return { - new = new_stream; - methods = stream_methods; - mt = stream_mt; - - frame_handlers = frame_handlers; - pack_settings_payload = pack_settings_payload; -} diff --git a/modules/http/http/server.lua b/modules/http/http/server.lua deleted file mode 100644 index 49d9fe67e..000000000 --- a/modules/http/http/server.lua +++ /dev/null @@ -1,339 +0,0 @@ -local cqueues = require "cqueues" -local monotime = cqueues.monotime -local cs = require "cqueues.socket" -local cc = require "cqueues.condition" -local ce = require "cqueues.errno" -local h1_connection = require "http.h1_connection" -local h2_connection = require "http.h2_connection" -local http_tls = require "http.tls" -local pkey = require "openssl.pkey" -local x509 = require "openssl.x509" -local name = require "openssl.x509.name" -local altname = require "openssl.x509.altname" - -local hang_timeout = 0.03 - -local function onerror(socket, op, why, lvl) -- luacheck: ignore 212 - if why == ce.EPIPE or why == ce.ETIMEDOUT then - return why - end - return string.format("%s: %s", op, ce.strerror(why)), why -end - --- Sense for TLS or SSL client hello --- returns `true`, `false` or `nil, err` -local function is_tls_client_hello(socket, timeout) - -- reading for 6 bytes should be safe, as no HTTP version - -- has a valid client request shorter than 6 bytes - local first_bytes, err, errno = socket:xread(6, timeout) - if first_bytes == nil then - return nil, err or ce.EPIPE, errno - end - local use_tls = not not ( - first_bytes:match("^\22\3...\1") or -- TLS - first_bytes:match("^[\128-\255][\9-\255]\1") -- SSLv2 - ) - local ok - ok, errno = socket:unget(first_bytes) - if not ok then - return nil, onerror(socket, "unget", errno, 2) - end - return use_tls -end - --- Wrap a bare cqueues socket in an HTTP connection of a suitable version --- Starts TLS if necessary --- this function *should never throw* -local function wrap_socket(self, socket, deadline) - socket:setmode("b", "b") - socket:onerror(onerror) - local use_tls = self.tls - if use_tls == nil then - local err, errno - use_tls, err, errno = is_tls_client_hello(socket, deadline and (deadline-monotime())) - if use_tls == nil then - return nil, err, errno - end - end - local is_h2 -- tri-state - if use_tls then - local ok, err, errno = socket:starttls(self.ctx, deadline and (deadline-monotime())) - if not ok then - return nil, err, errno - end - local ssl = socket:checktls() - if ssl and http_tls.has_alpn then - local proto = ssl:getAlpnSelected() - if proto == "h2" then - is_h2 = true - elseif proto == nil or proto == "http/1.1" then - is_h2 = false - else - return nil, "unexpected ALPN protocol: " .. proto - end - end - end - -- Still not sure if incoming connection is an HTTP1 or HTTP2 connection - -- Need to sniff for the h2 connection preface to find out for sure - if is_h2 == nil then - local err, errno - is_h2, err, errno = h2_connection.socket_has_preface(socket, true, deadline and (deadline-monotime())) - if is_h2 == nil then - return nil, err, errno - end - end - local conn, err, errno - if is_h2 then - conn, err, errno = h2_connection.new(socket, "server", nil, deadline and (deadline-monotime())) - else - conn, err, errno = h1_connection.new(socket, "server", 1.1) - end - if not conn then - return nil, err, errno - end - return conn, is_h2 -end - --- this function *should never throw* -local function handle_client(conn, on_stream) - while true do - local stream, err, errno = conn:get_next_incoming_stream() - if stream == nil then - if (err == ce.EPIPE or errno == ce.ECONNRESET or errno == ce.ENOTCONN) - and (conn.socket == nil or conn.socket:pending() == 0) then - break - else - return nil, err, errno - end - end - on_stream(stream) - end - -- wait for streams to complete? - return true -end - --- Prefer whichever comes first -local function alpn_select(ssl, protos) -- luacheck: ignore 212 - for _, proto in ipairs(protos) do - if proto == "h2" or proto == "http/1.1" then - return proto - end - end - return nil -end - --- create a new self signed cert -local function new_ctx(host) - local ctx = http_tls.new_server_context() - if ctx.setAlpnSelect then - ctx:setAlpnSelect(alpn_select) - end - local crt = x509.new() - -- serial needs to be unique or browsers will show uninformative error messages - crt:setSerial(os.time()) - -- use the host we're listening on as canonical name - local dn = name.new() - dn:add("CN", host) - crt:setSubject(dn) - local alt = altname.new() - alt:add("DNS", host) - crt:setSubjectAlt(alt) - -- lasts for 10 years - crt:setLifetime(os.time(), os.time()+86400*3650) - -- can't be used as a CA - crt:setBasicConstraints{CA=false} - crt:setBasicConstraintsCritical(true) - -- generate a new private/public key pair - local key = pkey.new() - crt:setPublicKey(key) - crt:sign(key) - assert(ctx:setPrivateKey(key)) - assert(ctx:setCertificate(crt)) - return ctx -end - -local server_methods = { - max_concurrent = math.huge; - client_timeout = 10; -} -local server_mt = { - __name = "http.server"; - __index = server_methods; -} - -function server_mt:__tostring() - return string.format("http.server{socket=%s;n_connections=%d}", - tostring(self.socket), self.n_connections) -end - ---[[ Creates a new server object - -Takes a table of options: - - `.socket`: A cqueues socket object - - `.tls`: `nil`: allow both tls and non-tls connections - - `true`: allows tls connections only - - `false`: allows non-tls connections only - - `.ctx`: an `openssl.ssl.context` object to use for tls connections - - ` `nil`: a self-signed context will be generated - - `.max_concurrent`: Maximum number of connections to allow live at a time (default: infinity) - - `.client_timeout`: Timeout (in seconds) to wait for client to send first bytes and/or complete TLS handshake (default: 10) -]] -local function new_server(tbl) - local socket = assert(tbl.socket) - - -- Return errors rather than throwing - socket:onerror(function(s, op, why, lvl) -- luacheck: ignore 431 212 - return why - end) - - return setmetatable({ - socket = socket; - tls = tbl.tls; - ctx = tbl.ctx; - max_concurrent = tbl.max_concurrent; - n_connections = 0; - pause_cond = cc.new(); - paused = true; - connection_done = cc.new(); -- signalled when connection has been closed - client_timeout = tbl.client_timeout; - }, server_mt) -end - ---[[ -Extra options: - - `.family`: protocol family - - `.host`: address to bind to (required if not `.path`) - - `.port`: port to bind to (optional if tls isn't `nil`, in which case defaults to 80 for `.tls == false` or 443 if `.tls == true`) - - `.path`: path to UNIX socket (required if not `.host`) - - `.v6only`: allow ipv6 only (no ipv4-mapped-ipv6) - - `.mode`: fchmod or chmod socket after creating UNIX domain socket - - `.mask`: set and restore umask when binding UNIX domain socket - - `.unlink`: unlink socket path before binding? - - `.reuseaddr`: turn on SO_REUSEADDR flag? - - `.reuseport`: turn on SO_REUSEPORT flag? -]] -local function listen(tbl) - local tls = tbl.tls - local host = tbl.host - local path = tbl.path - assert(host or path, "need host or path") - local port = tbl.port - if host and port == nil then - if tls == true then - port = "443" - elseif tls == false then - port = "80" - else - error("need port") - end - end - local ctx = tbl.ctx - if ctx == nil and tls ~= false then - if host then - ctx = new_ctx(host) - else - error("Custom OpenSSL context required when using a UNIX domain socket") - end - end - local s = assert(cs.listen{ - family = tbl.family; - host = host; - port = port; - path = path; - mode = tbl.mode; - mask = tbl.mask; - unlink = tbl.unlink; - reuseaddr = tbl.reuseaddr; - reuseport = tbl.reuseport; - v6only = tbl.v6only; - }) - return new_server { - socket = s; - tls = tls; - ctx = ctx; - max_concurrent = tbl.max_concurrent; - client_timeout = tbl.client_timeout; - } -end - --- Actually wait for and *do* the binding --- Don't *need* to call this, as if not it will be done lazily -function server_methods:listen(timeout) - return self.socket:listen(timeout) -end - -function server_methods:localname() - return self.socket:localname() -end - -function server_methods:pause() - self.paused = true - self.pause_cond:signal() -end - -function server_methods:close() - self:pause() - cqueues.poll() - cqueues.poll() - self.socket:close() -end - -function server_methods:run(on_stream, cq) - cq = assert(cq or cqueues.running()) - self.paused = false - repeat - if self.n_connections >= self.max_concurrent then - cqueues.poll(self.connection_done, self.pause_cond) - if self.paused then - break - end - end - local socket, accept_errno = self.socket:accept({nodelay = true;}, 0) - if socket == nil then - if accept_errno == ce.ETIMEDOUT then - -- Yield this thread until a client arrives or server paused - cqueues.poll(self.socket, self.pause_cond) - elseif accept_errno == ce.EMFILE then - -- Wait for another request to finish - if cqueues.poll(self.connection_done, self.pause_cond, hang_timeout) == hang_timeout then - -- If we're stuck waiting, run a garbage collection sweep - -- This can prevent a hang - collectgarbage() - end - else - return nil, ce.strerror(accept_errno), accept_errno - end - else - self.n_connections = self.n_connections + 1 - cq:wrap(function() - local ok, err - local conn, is_h2, errno = wrap_socket(self, socket) - if not conn then - err = is_h2 - socket:close() - if errno == ce.ECONNRESET or err == 'read: Connection reset by peer' then - ok = true - end - else - ok, err = handle_client(conn, on_stream) - conn:close() - end - self.n_connections = self.n_connections - 1 - self.connection_done:signal(1) - if not ok - and err ~= ce.EPIPE -- client closed connection - and err ~= ce.ETIMEDOUT -- an operation timed out - then - error(err) - end - end) - end - until self.paused - return true -end - -return { - new = new_server; - listen = listen; - mt = server_mt; -} \ No newline at end of file