+++ /dev/null
-local cqueues = require "cqueues"
-local monotime = cqueues.monotime
-local cc = require "cqueues.condition"
-local ce = require "cqueues.errno"
-local rand = require "openssl.rand"
-local new_fifo = require "fifo"
-local band = require "http.bit".band
-local h2_error = require "http.h2_error"
-local h2_stream = require "http.h2_stream"
-local hpack = require "http.hpack"
-local h2_banned_ciphers = require "http.tls".banned_ciphers
-local spack = string.pack or require "compat53.string".pack
-local sunpack = string.unpack or require "compat53.string".unpack
-
-local assert = assert
-if _VERSION:match("%d+%.?%d*") < "5.3" then
- assert = require "compat53.module".assert
-end
-
-local function xor(a, b)
- return (a and b) or not (a or b)
-end
-
-local preface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
-
-local default_settings = {
- [0x1] = 4096; -- HEADER_TABLE_SIZE
- [0x2] = true; -- ENABLE_PUSH
- [0x3] = math.huge; -- MAX_CONCURRENT_STREAMS
- [0x4] = 65535; -- INITIAL_WINDOW_SIZE
- [0x5] = 16384; -- MAX_FRAME_SIZE
- [0x6] = math.huge; -- MAX_HEADER_LIST_SIZE
-}
-
-local function merge_settings(new, old)
- return {
- [0x1] = new[0x1] or old[0x1];
- [0x2] = new[0x2] or old[0x2];
- [0x3] = new[0x3] or old[0x3];
- [0x4] = new[0x4] or old[0x4];
- [0x5] = new[0x5] or old[0x5];
- [0x6] = new[0x6] or old[0x6];
- }
-end
-
-local connection_methods = {}
-local connection_mt = {
- __name = "http.h2_connection";
- __index = connection_methods;
-}
-
-function connection_mt:__tostring()
- return string.format("http.h2_connection{type=%q}",
- self.type)
-end
-
-local connection_main_loop
-
--- An 'onerror' that doesn't throw
-local function onerror(socket, op, why, lvl) -- luacheck: ignore 212
- if why == ce.EPIPE or why == ce.ETIMEDOUT then
- return why
- end
- return string.format("%s: %s", op, ce.strerror(why)), why
-end
-
--- Read bytes from the given socket looking for the http2 connection preface
--- optionally ungets the bytes in case of failure
-local function socket_has_preface(socket, unget, timeout)
- local deadline = timeout and (monotime()+timeout)
- local bytes = ""
- local is_h2 = true
- while #bytes < #preface do
- -- read *up to* number of bytes left in preface
- local ok, err, errno = socket:xread(#bytes-#preface, deadline and (deadline-monotime()))
- if ok == nil then
- return nil, err or ce.EPIPE, errno
- end
- bytes = bytes .. ok
- if bytes ~= preface:sub(1, #bytes) then
- is_h2 = false
- break
- end
- end
- if unget then
- local ok, errno = socket:unget(bytes)
- if not ok then
- return nil, onerror(socket, "unget", errno, 2)
- end
- end
- return is_h2
-end
-
-local function new_connection(socket, conn_type, settings, timeout, cq)
- local deadline = timeout and (monotime()+timeout)
- cq = assert(cq or cqueues.running())
-
- socket:setmode("b", "bf")
- socket:setvbuf("full", math.huge) -- 'infinite' buffering; no write locks needed
- socket:onerror(onerror)
-
- local ssl = socket:checktls()
- if ssl then
- local cipher = ssl:getCipherInfo()
- if h2_banned_ciphers[cipher.name] then
- h2_error.errors.INADEQUATE_SECURITY("bad cipher: " .. cipher.name)
- end
- end
- if conn_type == "client" then
- local ok, err = socket:xwrite(preface, "f", timeout)
- if ok == nil then return nil, err end
- elseif conn_type == "server" then
- local ok, err = socket_has_preface(socket, false, timeout)
- if ok == nil then
- return nil, err
- end
- if not ok then
- h2_error.errors.PROTOCOL_ERROR("invalid connection preface. not an http2 client?")
- end
- else
- error('invalid connection type. must be "client" or "server"')
- end
-
- settings = settings or {}
-
- local self = setmetatable({
- socket = socket;
- type = conn_type;
- version = 2; -- for compat with h1_connection
-
- streams = setmetatable({}, {__mode="kv"});
- stream0 = nil; -- store separately with a strong reference
- need_continuation = nil; -- stream
- highest_odd_stream = -1;
- highest_even_stream = -2;
- recv_goaway_lowest = nil;
- recv_goaway = cc.new();
- new_streams = new_fifo();
- new_streams_cond = cc.new();
- peer_settings = default_settings;
- peer_settings_cond = cc.new(); -- signaled when the peer has changed their settings
- acked_settings = default_settings;
- send_settings = {n = 0};
- send_settings_ack_cond = cc.new(); -- for when server ACKs our settings
- send_settings_acked = 0;
- peer_flow_credits = 65535; -- 5.2.1
- peer_flow_credits_increase = cc.new();
- encoding_context = nil;
- decoding_context = nil;
- pongs = {}; -- pending pings we've sent. keyed by opaque 8 byte payload
- }, connection_mt)
- self:new_stream(0)
- self.encoding_context = hpack.new(default_settings[0x1])
- self.decoding_context = hpack.new(default_settings[0x1])
- self.cq = cq:wrap(connection_main_loop, self)
-
- do -- send settings frame + wait for reply to complete connection
- local ok, err = self:settings(settings, deadline and (deadline-monotime()))
- if not ok then
- return nil, err
- end
- end
-
- return self
-end
-
-function connection_methods:pollfd()
- return self.socket:pollfd()
-end
-
-function connection_methods:events()
- return self.socket:events()
-end
-
-function connection_methods:timeout()
- if self.cq:empty() then
- return 0
- end
-end
-
-function connection_main_loop(self)
- while not self.socket:eof("r") do
- local typ, flag, streamid, payload = self:read_http2_frame()
- if typ == nil then
- if flag == nil then -- EOF
- self.socket:close()
- break
- else
- error(flag)
- end
- end
- local handler = h2_stream.frame_handlers[typ]
- -- http2 spec section 4.1:
- -- Implementations MUST ignore and discard any frame that has a type that is unknown.
- if handler then
- local stream = self.streams[streamid]
- if stream == nil then
- if xor(streamid % 2 == 1, self.type == "client") then
- h2_error.errors.PROTOCOL_ERROR("Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers")
- end
- -- TODO: check MAX_CONCURRENT_STREAMS
- stream = self:new_stream(streamid)
- self.new_streams:push(stream)
- self.new_streams_cond:signal(1)
- end
- local ok, err = handler(stream, flag, payload)
- if not ok then
- if h2_error.is(err) and err.stream_error then
- if not stream:write_rst_stream(err.code) then
- error(err)
- end
- else -- connection error or unknown error
- error(err)
- end
- end
- end
- end
- return true
-end
-
-local function handle_step_return(self, step_ok, last_err, errno)
- if step_ok then
- return true
- else
- if not self.socket:eof("w") then
- local code, message
- if step_ok then
- code = h2_error.errors.NO_ERROR.code
- elseif h2_error.is(last_err) then
- code = last_err.code
- message = last_err.message
- else
- code = h2_error.errors.INTERNAL_ERROR.code
- end
- -- ignore write failure here; there's nothing that can be done
- self:write_goaway_frame(nil, code, message)
- end
- self:shutdown()
- return nil, last_err, errno
- end
-end
-
-function connection_methods:checktls()
- return self.socket:checktls()
-end
-
-function connection_methods:localname()
- return self.socket:localname()
-end
-
-function connection_methods:peername()
- return self.socket:peername()
-end
-
-function connection_methods:shutdown()
- local ok, err = self:write_goaway_frame(nil, h2_error.errors.NO_ERROR.code, "connection closed")
- if not ok and err == ce.EPIPE then
- -- other end already closed
- ok, err = true, nil
- end
- for _, stream in pairs(self.streams) do
- stream:shutdown()
- end
- self.socket:shutdown("r")
- return ok, err
-end
-
-function connection_methods:close()
- local ok, err = self:shutdown()
- cqueues.poll()
- cqueues.poll()
- self.socket:close()
- return ok, err
-end
-
-function connection_methods:new_stream(id)
- if id then
- assert(id % 1 == 0)
- else
- if self.recv_goaway_lowest then
- h2_error.errors.PROTOCOL_ERROR("Receivers of a GOAWAY frame MUST NOT open additional streams on the connection")
- end
- if self.type == "client" then
- -- Pick next free odd number
- id = self.highest_odd_stream + 2
- else
- -- Pick next free odd number
- id = self.highest_even_stream + 2
- end
- -- TODO: check MAX_CONCURRENT_STREAMS
- end
- assert(self.streams[id] == nil, "stream id already in use")
- assert(id < 2^32, "stream id too large")
- if id % 2 == 0 then
- assert(id > self.highest_even_stream, "stream id too small")
- self.highest_even_stream = id
- else
- assert(id > self.highest_odd_stream, "stream id too small")
- self.highest_odd_stream = id
- end
- local stream = h2_stream.new(self, id)
- if id == 0 then
- self.stream0 = stream
- else
- -- Add dependency on stream 0. http2 spec, 5.3.1
- self.stream0:reprioritise(stream)
- end
- self.streams[id] = stream
- return stream
-end
-
--- this function *should never throw*
-function connection_methods:get_next_incoming_stream(timeout)
- local deadline = timeout and (monotime()+timeout)
- while self.new_streams:length() == 0 do
- if self.socket:eof('r') or self.recv_goaway_lowest then
- -- TODO? clarification required: can the sender of a GOAWAY subsequently start streams?
- -- (with a lower stream id than they sent in the GOAWAY)
- -- For now, assume not.
- return nil, ce.EPIPE
- end
- local which = cqueues.poll(self.socket, self.new_streams_cond, self.recv_goaway, timeout)
- if which == timeout then
- return nil, ce.ETIMEDOUT
- end
- timeout = deadline and (deadline-monotime())
- end
-
- local stream = self.new_streams:pop()
- return stream
-end
-
--- On success, returns type, flags, stream id and payload
--- On timeout, returns nil, ETIMEDOUT -- safe to retry
--- If the socket has been shutdown for reading, and there is no data left unread, returns EPIPE
--- Will raise an error on other errors, or if the frame is invalid
-function connection_methods:read_http2_frame(timeout)
- local deadline = timeout and (monotime()+timeout)
- local frame_header, err, errno = self.socket:xread(9, timeout)
- if frame_header == nil then
- if err == ce.ETIMEDOUT then
- return nil, err
- elseif err == nil --[[EPIPE]] and self.socket:eof("r") then
- return nil
- else
- return nil, err, errno
- end
- end
- local size, typ, flags, streamid = sunpack(">I3 B B I4", frame_header)
- if size > self.acked_settings[0x5] then
- return nil, h2_error.errors.FRAME_SIZE_ERROR:new_traceback("frame too large")
- end
- -- reserved bit MUST be ignored by receivers
- streamid = band(streamid, 0x7fffffff)
- local payload, err2, errno2 = self.socket:xread(size, deadline and (deadline-monotime()))
- if payload == nil then
- if err2 == ce.ETIMEDOUT then
- -- put frame header back into socket so a retry will work
- local ok, errno3 = self.socket:unget(frame_header)
- if not ok then
- return nil, onerror(self.socket, "unget", errno3, 2)
- end
- end
- return nil, err2, errno2
- end
- return typ, flags, streamid, payload
-end
-
--- If this times out, it was the flushing; not the write itself
--- hence it's not always total failure.
--- It's up to the caller to take some action (e.g. closing) rather than doing it here
--- TODO: distinguish between nothing sent and partially sent?
-function connection_methods:write_http2_frame(typ, flags, streamid, payload, timeout)
- local deadline = timeout and (monotime()+timeout)
- if #payload > self.peer_settings[0x5] then
- return nil, h2_error.errors.FRAME_SIZE_ERROR:new_traceback("frame too large")
- end
- local header = spack(">I3 B B I4", #payload, typ, flags, streamid)
- local ok, err, errno = self.socket:xwrite(header, "f", timeout)
- if not ok then
- return nil, err, errno
- end
- return self.socket:xwrite(payload, "n", deadline and (deadline-monotime()))
-end
-
-function connection_methods:ping(timeout)
- local deadline = timeout and (monotime()+timeout)
- local payload
- -- generate a random, unique payload
- repeat -- keep generating until we don't have a collision
- payload = rand.bytes(8)
- until self.pongs[payload] == nil
- local cond = cc.new()
- self.pongs[payload] = cond
- assert(self.stream0:write_ping_frame(false, payload, timeout))
- while self.pongs[payload] do
- timeout = deadline and (deadline-monotime())
- local which = cqueues.poll(self, cond, timeout)
- if which == self then
- local ok, err, errno = self:step(0)
- if not ok then
- return nil, err, errno
- end
- elseif which == timeout then
- return nil, ce.ETIMEDOUT
- end
- end
- return true
-end
-
-function connection_methods:write_window_update(...)
- return self.stream0:write_window_update(...)
-end
-
-function connection_methods:write_goaway_frame(last_stream_id, err_code, debug_msg)
- if last_stream_id == nil then
- last_stream_id = math.max(self.highest_odd_stream, self.highest_even_stream)
- end
- return self.stream0:write_goaway_frame(last_stream_id, err_code, debug_msg)
-end
-
-function connection_methods:set_peer_settings(peer_settings)
- self.peer_settings = merge_settings(peer_settings, self.peer_settings)
- self.peer_settings_cond:signal()
-end
-
-function connection_methods:ack_settings()
- local n = self.send_settings_acked + 1
- self.send_settings_acked = n
- local acked_settings = self.send_settings[n]
- if acked_settings then
- self.send_settings[n] = nil
- self.acked_settings = merge_settings(acked_settings, self.acked_settings)
- end
- self.send_settings_ack_cond:signal(1)
-end
-
-function connection_methods:settings(tbl, timeout)
- local deadline = timeout and monotime()+timeout
- local n, err, errno = self.stream0:write_settings_frame(false, tbl, timeout)
- if not n then
- return nil, err, errno
- end
- -- Now wait for ACK
- while self.send_settings_acked < n do
- timeout = deadline and (deadline-monotime())
- local which = cqueues.poll(self.send_settings_ack_cond, timeout)
- if which ~= self.send_settings_ack_cond then
- self:write_goaway_frame(nil, h2_error.errors.SETTINGS_TIMEOUT.code, "timeout exceeded")
- return nil, ce.ETIMEDOUT
- end
- end
- return true
-end
-
-return {
- preface = preface;
- socket_has_preface = socket_has_preface;
- new = new_connection;
- methods = connection_methods;
- mt = connection_mt;
-}
\ No newline at end of file
+++ /dev/null
-local cqueues = require "cqueues"
-local monotime = cqueues.monotime
-local cc = require "cqueues.condition"
-local ce = require "cqueues.errno"
-local new_fifo = require "fifo"
-local band = require "http.bit".band
-local bor = require "http.bit".bor
-local h2_errors = require "http.h2_error".errors
-local stream_common = require "http.stream_common"
-local spack = string.pack or require "compat53.string".pack
-local sunpack = string.unpack or require "compat53.string".unpack
-local unpack = table.unpack or unpack -- luacheck: ignore 113
-
-local assert = assert
-if _VERSION:match("%d+%.?%d*") < "5.3" then
- assert = require "compat53.module".assert
-end
-
-local MAX_HEADER_BUFFER_SIZE = 400*1024 -- 400 KB is max size in h2o
-
-local frame_handlers = {}
-
-local stream_methods = {}
-for k, v in pairs(stream_common.methods) do
- stream_methods[k] = v
-end
-local stream_mt = {
- __name = "http.h2_stream";
- __index = stream_methods;
-}
-
-function stream_mt:__tostring()
- local dependee_list = {}
- for s in pairs(self.dependees) do
- dependee_list[#dependee_list+1] = string.format("%d", s.id)
- end
- table.sort(dependee_list)
- dependee_list = table.concat(dependee_list, ",")
- return string.format("http.h2_stream{connection=%s;id=%d;state=%q;parent=%s;dependees={%s}}",
- tostring(self.connection), self.id, self.state,
- (self.parent and tostring(self.parent.id) or "nil"), dependee_list)
-end
-
-local function new_stream(connection, id)
- assert(type(id) == "number" and id >= 0 and id <= 0x7fffffff, "invalid stream id")
- local self = setmetatable({
- connection = connection;
- type = connection.type;
-
- state = "idle";
-
- id = id;
- peer_flow_credits = id ~= 0 and connection.peer_settings[0x4];
- peer_flow_credits_increase = cc.new();
- parent = nil;
- dependees = setmetatable({}, {__mode="kv"});
- weight = 16; -- http2 spec, section 5.3.5
-
- rst_stream_error = nil;
-
- stats_sent_headers = 0; -- number of header blocks sent
- stats_recv_headers = 0; -- number of header blocks received
- stats_sent = 0; -- #bytes sent in DATA blocks
- stats_recv = 0; -- #bytes received in DATA blocks
-
- recv_headers_fifo = new_fifo();
- recv_headers_cond = cc.new();
-
- chunk_fifo = new_fifo();
- chunk_cond = cc.new();
- }, stream_mt)
- return self
-end
-
-local valid_states = {
- ["idle"] = 1; -- initial
- ["open"] = 2; -- have sent or received headers; haven't sent body yet
- ["reserved (local)"] = 2; -- have sent a PUSH_PROMISE
- ["reserved (remote)"] = 2; -- have received a PUSH_PROMISE
- ["half closed (local)"] = 3; -- have sent whole body
- ["half closed (remote)"] = 3; -- have received whole body
- ["closed"] = 4; -- complete
-}
-function stream_methods:set_state(new)
- local new_order = assert(valid_states[new])
- local old = self.state
- if new_order <= valid_states[old] then
- error("invalid state progression ('"..old.."' to '"..new.."')")
- end
- self.state = new
-end
-
-function stream_methods:write_http2_frame(typ, flags, payload, timeout)
- return self.connection:write_http2_frame(typ, flags, self.id, payload, timeout)
-end
-
-function stream_methods:reprioritise(child, exclusive)
- assert(child)
- assert(child.id ~= 0) -- cannot reprioritise stream 0
- if self == child then
- -- http2 spec, section 5.3.1
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("A stream cannot depend on itself", true)
- end
- do -- Check if the child is an ancestor
- local ancestor = self.parent
- while ancestor do
- if ancestor == child then
- -- Break the loop. http spec, section 5.3.3
- local ok, err = child.parent:reprioritise(self, false)
- if not ok then return nil, err end
- break
- end
- ancestor = ancestor.parent
- end
- end
- -- Remove old parent
- if child.parent then
- child.parent.dependees[child] = nil
- end
- -- We are now the parent
- child.parent = self
- if exclusive then
- -- All the parent's deps are now the child's
- for s, v in pairs(self.dependees) do
- s.parent = child
- child.dependees[s] = v
- self.dependees[s] = nil
- end
- else
- self.dependees[child] = true
- end
- return true
-end
-
-local chunk_methods = {}
-local chunk_mt = {
- __name = "http.h2_stream.chunk";
- __index = chunk_methods;
-}
-
-local function new_chunk(stream, original_length, data)
- return setmetatable({
- stream = stream;
- original_length = original_length;
- data = data;
- acked = false;
- }, chunk_mt)
-end
-
-function chunk_methods:ack(no_window_update)
- if self.acked then
- error("already acked")
- end
- self.acked = true
- local len = self.original_length
- if len > 0 and not no_window_update then
- -- ignore errors
- self.stream:write_window_update(len)
- self.stream.connection:write_window_update(len)
- end
-end
-
--- DATA
-frame_handlers[0x0] = function(stream, flags, payload)
- if stream.id == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'DATA' frames MUST be associated with a stream")
- end
- if stream.state ~= "open" and stream.state ~= "half closed (local)" then
- return nil, h2_errors.STREAM_CLOSED:new_traceback("'DATA' frame not allowed in '" .. stream.state .. "' state", true)
- end
-
- local end_stream = band(flags, 0x1) ~= 0
- local padded = band(flags, 0x8) ~= 0
-
- local original_length = #payload
-
- if padded then
- local pad_len = sunpack("> B", payload)
- if pad_len >= #payload then -- >= will take care of the pad_len itself
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("length of the padding is the length of the frame payload or greater")
- elseif payload:match("[^%z]", -pad_len) then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("padding not null bytes")
- end
- payload = payload:sub(2, -pad_len-1)
- end
-
- if end_stream then
- if stream.state == "half closed (local)" then
- stream:set_state("closed")
- else
- stream:set_state("half closed (remote)")
- end
- end
-
- local chunk = new_chunk(stream, original_length, payload)
- stream.chunk_fifo:push(chunk)
- stream.stats_recv = stream.stats_recv + #payload
- if end_stream then
- stream.chunk_fifo:push(nil)
- end
- stream.chunk_cond:signal()
-
- return true
-end
-
-function stream_methods:write_data_frame(payload, end_stream, padded, timeout)
- if self.id == 0 then
- h2_errors.PROTOCOL_ERROR("'DATA' frames MUST be associated with a stream")
- end
- if self.state ~= "open" and self.state ~= "half closed (remote)" then
- h2_errors.STREAM_CLOSED("'DATA' frame not allowed in '" .. self.state .. "' state", true)
- end
- local pad_len, padding = "", ""
- local flags = 0
- if end_stream then
- flags = bor(flags, 0x1)
- end
- if padded then
- flags = bor(flags, 0x8)
- pad_len = spack("> B", padded)
- padding = ("\0"):rep(padded)
- end
- payload = pad_len .. payload .. padding
- -- The entire DATA frame payload is included in flow control,
- -- including Pad Length and Padding fields if present
- local new_stream_peer_flow_credits = self.peer_flow_credits - #payload
- local new_connection_peer_flow_credits = self.connection.peer_flow_credits - #payload
- if new_stream_peer_flow_credits < 0 or new_connection_peer_flow_credits < 0 then
- h2_errors.FLOW_CONTROL_ERROR("not enough flow credits")
- end
- local ok, err, errno = self:write_http2_frame(0x0, flags, payload, timeout)
- if not ok then return nil, err, errno end
- self.peer_flow_credits = new_stream_peer_flow_credits
- self.connection.peer_flow_credits = new_connection_peer_flow_credits
- self.stats_sent = self.stats_sent + #payload
- if end_stream then
- if self.state == "half closed (remote)" then
- self:set_state("closed")
- else
- self:set_state("half closed (local)")
- end
- end
- return ok
-end
-
--- Map from header name to whether it belongs in a request (vs a response)
-local valid_pseudo_headers = {
- [":method"] = true;
- [":scheme"] = true;
- [":path"] = true;
- [":authority"] = true;
- [":status"] = false;
-}
-local function validate_headers(headers, is_request, nth_header, ended_stream)
- do -- Validate that all colon fields are before other ones (section 8.1.2.1)
- local seen_non_colon = false
- for name, value in headers:each() do
- if name:sub(1,1) == ":" then
- --[[ Pseudo-header fields are only valid in the context in
- which they are defined. Pseudo-header fields defined for
- requests MUST NOT appear in responses; pseudo-header fields
- defined for responses MUST NOT appear in requests.
- Pseudo-header fields MUST NOT appear in trailers.
- Endpoints MUST treat a request or response that contains
- undefined or invalid pseudo-header fields as malformed
- (Section 8.1.2.6)]]
- if (is_request and nth_header ~= 1) or valid_pseudo_headers[name] ~= is_request then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("Pseudo-header fields are only valid in the context in which they are defined", true)
- end
- if seen_non_colon then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("All pseudo-header fields MUST appear in the header block before regular header fields", true)
- end
- else
- seen_non_colon = true
- end
- if type(value) ~= "string" then
- return nil, "invalid header field"
- end
- end
- end
- if headers:has("connection") then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("An endpoint MUST NOT generate an HTTP/2 message containing connection-specific header fields", true)
- end
- local te = headers:get_as_sequence("te")
- if te.n > 0 and (te[1] ~= "trailers" or te.n ~= 1) then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback([[The TE header field, which MAY be present in an HTTP/2 request; when it is, it MUST NOT contain any value other than "trailers"]], true)
- end
- if is_request then
- if nth_header == 1 then
- --[[ All HTTP/2 requests MUST include exactly one valid value for the :method, :scheme,
- and :path pseudo-header fields, unless it is a CONNECT request (Section 8.3).
- An HTTP request that omits mandatory pseudo-header fields is malformed (Section 8.1.2.6).]]
- local methods = headers:get_as_sequence(":method")
- if methods.n ~= 1 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("requests MUST include exactly one valid value for the :method, :scheme, and :path pseudo-header fields, unless it is a CONNECT request", true)
- elseif methods[1] ~= "CONNECT" then
- local scheme = headers:get_as_sequence(":scheme")
- local path = headers:get_as_sequence(":path")
- if scheme.n ~= 1 or path.n ~= 1 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("requests MUST include exactly one valid value for the :method, :scheme, and :path pseudo-header fields, unless it is a CONNECT request", true)
- end
- if path[1] == "" and (scheme[1] == "http" or scheme[1] == "https") then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("The :path pseudo-header field MUST NOT be empty for http or https URIs", true)
- end
- else -- is CONNECT method
- -- Section 8.3
- if headers:has(":scheme") or headers:has(":path") then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("For a CONNECT request, the :scheme and :path pseudo-header fields MUST be omitted", true)
- end
- end
- elseif nth_header == 2 then
- if not ended_stream then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("Trailers MUST be at end of stream", true)
- end
- elseif nth_header > 2 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("An HTTP request consists of maximum 2 HEADER blocks", true)
- end
- else
- --[[ For HTTP/2 responses, a single :status pseudo-header field is
- defined that carries the HTTP status code field (RFC7231, Section 6).
- This pseudo-header field MUST be included in all responses; otherwise,
- the response is malformed (Section 8.1.2.6)]]
- if not headers:has(":status") then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback(":status pseudo-header field MUST be included in all responses", true)
- end
- end
- return true
-end
-
--- HEADERS
-frame_handlers[0x1] = function(stream, flags, payload)
- if stream.id == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'HEADERS' frames MUST be associated with a stream")
- end
- if stream.state ~= "idle" and stream.state ~= "open" and stream.state ~= "half closed (local)" and stream.state ~= "reserved (remote)" then
- return nil, h2_errors.STREAM_CLOSED:new_traceback("'HEADERS' frame not allowed in '" .. stream.state .. "' state", true)
- end
-
- local end_stream = band(flags, 0x1) ~= 0
- local end_headers = band(flags, 0x04) ~= 0
- local padded = band(flags, 0x8) ~= 0
- local priority = band(flags, 0x20) ~= 0
-
- -- index where payload body starts
- local pos = 1
- local pad_len
-
- if padded then
- pad_len = sunpack("> B", payload, pos)
- pos = 2
- else
- pad_len = 0
- end
-
- if priority then
- local exclusive, stream_dep, weight
- local tmp
- tmp, weight = sunpack(">I4 B", payload, pos)
- exclusive = band(tmp, 0x80000000) ~= 0
- stream_dep = band(tmp, 0x7fffffff)
- weight = weight + 1
- pos = pos + 5
-
- local new_parent = stream.connection.streams[stream_dep]
-
- -- 5.3.1. Stream Dependencies
- -- A dependency on a stream that is not currently in the tree
- -- results in that stream being given a default priority
- if new_parent then
- local ok, err = new_parent:reprioritise(stream, exclusive)
- if not ok then return nil, err end
- stream.weight = weight
- end
- end
-
- if #payload - pos + 1 > MAX_HEADER_BUFFER_SIZE then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large")
- end
-
- if not end_headers then
- local recv_headers_buffer = { payload }
- local recv_headers_buffer_items = 1
- local recv_headers_buffer_length = #payload - pos + 1
- repeat
- local end_continuations, header_fragment = stream:read_continuation()
- if not end_continuations then
- return nil, header_fragment
- end
- recv_headers_buffer_items = recv_headers_buffer_items + 1
- recv_headers_buffer[recv_headers_buffer_items] = header_fragment
- recv_headers_buffer_length = recv_headers_buffer_length + #header_fragment
- if recv_headers_buffer_length > MAX_HEADER_BUFFER_SIZE then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large")
- end
- until end_continuations
-
- payload = table.concat(recv_headers_buffer, "", 1, recv_headers_buffer_items)
- end
-
- if pad_len > 0 then
- if pad_len + pos - 1 > #payload then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("length of the padding is the length of the frame payload or greater")
- elseif payload:match("[^%z]", -pad_len) then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("padding not null bytes")
- end
- payload = payload:sub(1, -pad_len-1)
- end
-
- local headers, newpos = stream.connection.decoding_context:decode_headers(payload, nil, pos)
- if newpos ~= #payload + 1 then
- return nil, h2_errors.COMPRESSION_ERROR:new_traceback("incomplete header fragment")
- end
-
- stream.stats_recv_headers = stream.stats_recv_headers + 1
-
- if end_stream then
- if stream.state == "half closed (local)" then
- stream:set_state("closed")
- else
- stream:set_state("half closed (remote)")
- end
- stream.chunk_fifo:push(nil)
- stream.chunk_cond:signal()
- else
- if stream.state == "idle" then
- stream:set_state("open")
- end
- end
-
- local ok, err = validate_headers(headers, stream.type ~= "client", stream.stats_recv_headers, stream.state == "half closed (remote)" or stream.state == "closed")
- if not ok then return nil, err end
-
- stream.recv_headers_fifo:push(headers)
- stream.recv_headers_cond:signal()
-
- return true
-end
-
-function stream_methods:write_headers_frame(payload, end_stream, end_headers, padded, exclusive, stream_dep, weight, timeout)
- assert(self.state ~= "closed" and self.state ~= "half closed (local)")
- local pad_len, pri, padding = "", "", ""
- local flags = 0
- if end_stream then
- flags = bor(flags, 0x1)
- end
- if end_headers then
- flags = bor(flags, 0x4)
- end
- if padded then
- flags = bor(flags, 0x8)
- pad_len = spack("> B", padded)
- padding = ("\0"):rep(padded)
- end
- if weight or stream_dep then
- flags = bor(flags, 0x20)
- assert(stream_dep < 0x80000000)
- local tmp = stream_dep
- if exclusive then
- tmp = bor(tmp, 0x80000000)
- end
- weight = weight and weight - 1 or 0
- pri = spack("> I4 B", tmp, weight)
- end
- payload = pad_len .. pri .. payload .. padding
- local ok, err, errno = self:write_http2_frame(0x1, flags, payload, timeout)
- if ok == nil then return nil, err, errno end
- self.stats_sent_headers = self.stats_sent_headers + 1
- if end_stream then
- if self.state == "reserved (local)" then
- self:set_state("closed")
- else -- self.state == "idle" or self.state == "open" then
- self:set_state("half closed (local)")
- end
- else
- if self.state == "reserved (local)" then
- self:set_state("half closed (remote)")
- elseif self.state == "idle" then
- self:set_state("open")
- end
- end
- return ok
-end
-
--- PRIORITY
-frame_handlers[0x2] = function(stream, flags, payload) -- luacheck: ignore 212
- if stream.id == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'PRIORITY' frames MUST be associated with a stream")
- end
- if #payload ~= 5 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'PRIORITY' frames must be 5 bytes", true)
- end
-
- local exclusive, stream_dep, weight
- local tmp
- tmp, weight = sunpack(">I4 B", payload)
- weight = weight + 1
- exclusive = band(tmp, 0x80000000) ~= 0
- stream_dep = band(tmp, 0x7fffffff)
-
- local new_parent = stream.connection.streams[stream_dep]
- local ok, err = new_parent:reprioritise(stream, exclusive)
- if not ok then return nil, err end
- stream.weight = weight
-
- return true
-end
-
-function stream_methods:write_priority_frame(exclusive, stream_dep, weight, timeout)
- assert(stream_dep < 0x80000000)
- local tmp = stream_dep
- if exclusive then
- tmp = bor(tmp, 0x80000000)
- end
- weight = weight and weight - 1 or 0
- local payload = spack("> I4 B", tmp, weight)
- return self:write_http2_frame(0x2, 0, payload, timeout)
-end
-
--- RST_STREAM
-frame_handlers[0x3] = function(stream, flags, payload) -- luacheck: ignore 212
- if stream.id == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'RST_STREAM' frames MUST be associated with a stream")
- end
- if #payload ~= 4 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'RST_STREAM' frames must be 4 bytes")
- end
- if stream.state == "idle" then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback([['RST_STREAM' frames MUST NOT be sent for a stream in the "idle" state]])
- end
-
- local err_code = sunpack(">I4", payload)
-
- stream.rst_stream_error = (h2_errors[err_code] or h2_errors.INTERNAL_ERROR):new {
- message = string.format("'RST_STREAM' on stream #%d (code=0x%x)", stream.id, err_code);
- }
-
- stream:set_state("closed")
- stream.recv_headers_cond:signal()
- stream.chunk_cond:signal()
-
- return true
-end
-
-function stream_methods:write_rst_stream(err_code, timeout)
- if self.id == 0 then
- h2_errors.PROTOCOL_ERROR("'RST_STREAM' frames MUST be associated with a stream")
- end
- if self.state == "idle" then
- h2_errors.PROTOCOL_ERROR([['RST_STREAM' frames MUST NOT be sent for a stream in the "idle" state]], true)
- end
- local flags = 0
- local payload = spack(">I4", err_code)
- local ok, err, errno = self:write_http2_frame(0x3, flags, payload, timeout)
- if not ok then return nil, err, errno end
- self:set_state("closed")
- self:shutdown()
- return ok
-end
-
--- SETTING
-frame_handlers[0x4] = function(stream, flags, payload)
- if stream.id ~= 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("stream identifier for a 'SETTINGS' frame MUST be zero")
- end
-
- local ack = band(flags, 0x1) ~= 0
- if ack then -- server is ACK-ing our settings
- if #payload ~= 0 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("Receipt of a 'SETTINGS' frame with the ACK flag set and a length field value other than 0")
- end
- stream.connection:ack_settings()
- return true
- else -- settings from server
- if #payload % 6 ~= 0 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'SETTINGS' frame with a length other than a multiple of 6 octets")
- end
- local peer_settings = {}
- for i=1, #payload, 6 do
- local id, val = sunpack(">I2 I4", payload, i)
- if id == 0x1 then
- stream.connection.encoding_context:set_max_dynamic_table_size(val)
- -- Add a 'max size' element to the next outgoing header
- stream.connection.encoding_context:encode_max_size(val)
- elseif id == 0x2 then
- -- Convert to boolean
- if val == 0 then
- val = false
- elseif val == 1 then
- val = true
- else
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback()
- end
- if val and stream.type == "client" then
- -- Clients MUST reject any attempt to change the SETTINGS_ENABLE_PUSH
- -- setting to a value other than 0 by treating the message as a connection
- -- error of type PROTOCOL_ERROR.
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_ENABLE_PUSH not allowed for clients")
- end
- elseif id == 0x4 then
- if val >= 2^31 then
- return nil, h2_errors.FLOW_CONTROL_ERROR:new_traceback("SETTINGS_INITIAL_WINDOW_SIZE must be less than 2^31")
- end
- elseif id == 0x5 then
- if val < 16384 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_MAX_FRAME_SIZE must be greater than or equal to 16384")
- elseif val >= 2^24 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_MAX_FRAME_SIZE must be less than 2^24")
- end
- end
- peer_settings[id] = val
- end
- stream.connection:set_peer_settings(peer_settings)
- -- Ack server's settings
- return stream:write_settings_frame(true)
- end
-end
-
-local function pack_settings_payload(settings)
- local i = 0
- local a = {}
- local function append(k, v)
- a[i*2+1] = k
- a[i*2+2] = v
- i = i + 1
- end
- local HEADER_TABLE_SIZE = settings[0x1]
- if HEADER_TABLE_SIZE ~= nil then
- append(0x1, HEADER_TABLE_SIZE)
- end
- local ENABLE_PUSH = settings[0x2]
- if ENABLE_PUSH ~= nil then
- if type(ENABLE_PUSH) == "boolean" then
- ENABLE_PUSH = ENABLE_PUSH and 1 or 0
- end
- append(0x2, ENABLE_PUSH)
- end
- local MAX_CONCURRENT_STREAMS = settings[0x3]
- if MAX_CONCURRENT_STREAMS ~= nil then
- append(0x3, MAX_CONCURRENT_STREAMS)
- end
- local INITIAL_WINDOW_SIZE = settings[0x4]
- if INITIAL_WINDOW_SIZE ~= nil then
- if INITIAL_WINDOW_SIZE >= 2^31 then
- h2_errors.FLOW_CONTROL_ERROR("SETTINGS_INITIAL_WINDOW_SIZE must be less than 2^31")
- end
- append(0x4, INITIAL_WINDOW_SIZE)
- end
- local MAX_FRAME_SIZE = settings[0x5]
- if MAX_FRAME_SIZE ~= nil then
- if MAX_FRAME_SIZE < 16384 then
- h2_errors.PROTOCOL_ERROR("SETTINGS_MAX_FRAME_SIZE must be greater than or equal to 16384")
- elseif MAX_FRAME_SIZE >= 2^24 then
- h2_errors.PROTOCOL_ERROR("SETTINGS_MAX_FRAME_SIZE must be less than 2^24")
- end
- append(0x5, MAX_FRAME_SIZE)
- end
- local MAX_HEADER_LIST_SIZE = settings[0x6]
- if MAX_HEADER_LIST_SIZE ~= nil then
- append(0x6, MAX_HEADER_LIST_SIZE)
- end
- return spack(">" .. ("I2 I4"):rep(i), unpack(a, 1, i*2))
-end
-
-function stream_methods:write_settings_frame(ACK, settings, timeout)
- if self.id ~= 0 then
- h2_errors.PROTOCOL_ERROR("'SETTINGS' frames must be on stream id 0")
- end
- local flags, payload
- if ACK then
- if settings ~= nil then
- h2_errors.PROTOCOL_ERROR("'SETTINGS' ACK cannot have new settings")
- end
- flags = 0x1
- payload = ""
- else
- flags = 0
- payload = pack_settings_payload(settings)
- end
- local ok, err, errno = self:write_http2_frame(0x4, flags, payload, timeout)
- if ok and not ACK then
- local n = self.connection.send_settings.n + 1
- self.connection.send_settings.n = n
- self.connection.send_settings[n] = settings
- ok = n
- end
- return ok, err, errno
-end
-
--- PUSH_PROMISE
-frame_handlers[0x5] = function(stream, flags, payload)
- if not stream.connection.acked_settings[0x2] then
- -- An endpoint that has both set this parameter to 0 and had it acknowledged MUST
- -- treat the receipt of a PUSH_PROMISE frame as a connection error of type PROTOCOL_ERROR.
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("SETTINGS_ENABLE_PUSH is 0")
- elseif stream.type == "server" then
- -- A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
- -- frame as a connection error of type PROTOCOL_ERROR.
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("A client cannot push")
- end
- if stream.id == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'PUSH_PROMISE' frames MUST be associated with a stream")
- end
-
- local end_headers = band(flags, 0x04) ~= 0
- local padded = band(flags, 0x8) ~= 0
-
- -- index where payload body starts
- local pos = 1
- local pad_len
-
- if padded then
- pad_len = sunpack("> B", payload, pos)
- pos = 2
- else
- pad_len = 0
- end
-
- local tmp = sunpack(">I4", payload, pos)
- local promised_stream_id = band(tmp, 0x7fffffff)
- pos = pos + 4
-
- if #payload - pos + 1 > MAX_HEADER_BUFFER_SIZE then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large")
- end
-
- if not end_headers then
- local recv_headers_buffer = { payload }
- local recv_headers_buffer_items = 1
- local recv_headers_buffer_length = #payload - pos + 1
- repeat
- local end_continuations, header_fragment = stream:read_continuation()
- if not end_continuations then
- return nil, header_fragment
- end
- recv_headers_buffer_items = recv_headers_buffer_items + 1
- recv_headers_buffer[recv_headers_buffer_items] = header_fragment
- recv_headers_buffer_length = recv_headers_buffer_length + #header_fragment
- if recv_headers_buffer_length > MAX_HEADER_BUFFER_SIZE then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("headers too large")
- end
- until end_continuations
-
- payload = table.concat(recv_headers_buffer, "", 1, recv_headers_buffer_items)
- end
-
- if pad_len > 0 then
- if pad_len + pos - 1 > #payload then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("length of the padding is the length of the frame payload or greater")
- elseif payload:match("[^%z]", -pad_len) then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("padding not null bytes")
- end
- payload = payload:sub(1, -pad_len-1)
- end
-
- local headers, newpos = stream.connection.decoding_context:decode_headers(payload, nil, pos)
- if newpos ~= #payload + 1 then
- return nil, h2_errors.COMPRESSION_ERROR:new_traceback("incomplete header fragment")
- end
-
- local ok, err = validate_headers(headers, true, 1, false)
- if not ok then return nil, err end
-
- local promised_stream = stream.connection:new_stream(promised_stream_id)
- stream:reprioritise(promised_stream)
- promised_stream:set_state("reserved (remote)")
- promised_stream.recv_headers_fifo:push(headers)
- stream.connection.new_streams:push(promised_stream)
- stream.connection.new_streams_cond:signal(1)
-
- return true
-end
-
-function stream_methods:write_push_promise_frame(promised_stream_id, payload, end_headers, padded, timeout)
- assert(self.state == "open" or self.state == "half closed (remote)")
- assert(self.id ~= 0)
- local pad_len, padding = "", ""
- local flags = 0
- if end_headers then
- flags = bor(flags, 0x4)
- end
- if padded then
- flags = bor(flags, 0x8)
- pad_len = spack("> B", padded)
- padding = ("\0"):rep(padded)
- end
- assert(promised_stream_id > 0)
- assert(promised_stream_id < 0x80000000)
- assert(promised_stream_id % 2 == 0)
- -- TODO: promised_stream_id must be valid for sender
- promised_stream_id = spack(">I4", promised_stream_id)
- payload = pad_len .. promised_stream_id .. payload .. padding
- return self:write_http2_frame(0x5, flags, payload, timeout)
-end
-
--- PING
-frame_handlers[0x6] = function(stream, flags, payload)
- if stream.id ~= 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'PING' must be on stream id 0")
- end
- if #payload ~= 8 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'PING' frames must be 8 bytes")
- end
-
- local ack = band(flags, 0x1) ~= 0
-
- if ack then
- local cond = stream.connection.pongs[payload]
- if cond then
- cond:signal()
- stream.connection.pongs[payload] = nil
- end
- return true
- else
- return stream:write_ping_frame(true, payload)
- end
-end
-
-function stream_methods:write_ping_frame(ACK, payload, timeout)
- if self.id ~= 0 then
- h2_errors.PROTOCOL_ERROR("'PING' frames must be on stream id 0")
- end
- if #payload ~= 8 then
- h2_errors.FRAME_SIZE_ERROR("'PING' frames must have 8 byte payload")
- end
- local flags = ACK and 0x1 or 0
- return self:write_http2_frame(0x6, flags, payload, timeout)
-end
-
--- GOAWAY
-frame_handlers[0x7] = function(stream, flags, payload) -- luacheck: ignore 212
- if stream.id ~= 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'GOAWAY' frames must be on stream id 0")
- end
- if #payload < 8 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'GOAWAY' frames must be at least 8 bytes")
- end
-
- local last_streamid = sunpack(">I4 I4", payload)
-
- if stream.connection.recv_goaway_lowest == nil or last_streamid < stream.connection.recv_goaway_lowest then
- stream.connection.recv_goaway_lowest = last_streamid
- stream.connection.recv_goaway:signal()
- end
-
- return true
-end
-
-function stream_methods:write_goaway_frame(last_streamid, err_code, debug_msg, timeout)
- if self.id ~= 0 then
- h2_errors.PROTOCOL_ERROR("'GOAWAY' frames MUST be on stream 0")
- end
- assert(last_streamid)
- local flags = 0
- local payload = spack(">I4 I4", last_streamid, err_code)
- if debug_msg then
- payload = payload .. debug_msg
- end
- return self:write_http2_frame(0x7, flags, payload, timeout)
-end
-
--- WINDOW_UPDATE
-frame_handlers[0x8] = function(stream, flags, payload) -- luacheck: ignore 212
- if #payload ~= 4 then
- return nil, h2_errors.FRAME_SIZE_ERROR:new_traceback("'WINDOW_UPDATE' frames must be 4 bytes")
- end
- if stream.id ~= 0 and stream.state == "idle" then
- return nil, h2_errors.PROTOCOL_ERROR([['WINDOW_UPDATE' frames not allowed in "idle" state]], true)
- end
-
- local tmp = sunpack(">I4", payload)
- assert(band(tmp, 0x80000000) == 0, "'WINDOW_UPDATE' reserved bit set")
- local increment = band(tmp, 0x7fffffff)
- if increment == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'WINDOW_UPDATE' MUST not have an increment of 0", stream.id ~= 0)
- end
-
- local ob
- if stream.id == 0 then -- for connection
- ob = stream.connection
- else
- ob = stream
- end
- local newval = ob.peer_flow_credits + increment
- if newval > 2^31-1 then
- return nil, h2_errors.FLOW_CONTROL_ERROR:new_traceback("A sender MUST NOT allow a flow-control window to exceed 2^31-1 octets", stream.id ~= 0)
- end
- ob.peer_flow_credits = newval
- ob.peer_flow_credits_increase:signal()
-
- return true
-end
-
-function stream_methods:write_window_update_frame(inc, timeout)
- local flags = 0
- if self.id ~= 0 and self.state == "idle" then
- h2_errors.PROTOCOL_ERROR([['WINDOW_UPDATE' frames not allowed in "idle" state]], true)
- end
- if inc >= 0x80000000 or inc <= 0 then
- h2_errors.PROTOCOL_ERROR("invalid window update increment", true)
- end
- local payload = spack(">I4", inc)
- return self:write_http2_frame(0x8, flags, payload, timeout)
-end
-
-function stream_methods:write_window_update(inc)
- while inc >= 0x80000000 do
- local ok, err = self:write_window_update_frame(0x7fffffff)
- if not ok then return nil, err end
- inc = inc - 0x7fffffff
- end
- return self:write_window_update_frame(inc)
-end
-
--- CONTINUATION
-frame_handlers[0x9] = function(stream, flags, payload) -- luacheck: ignore 212
- if stream.id == 0 then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'CONTINUATION' frames MUST be associated with a stream")
- end
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("'CONTINUATION' frames MUST be preceded by a 'HEADERS', 'PUSH_PROMISE' or 'CONTINUATION' frame without the 'END_HEADERS' flag set")
-end
-
-function stream_methods:read_continuation()
- local typ, flag, streamid, payload = self:read_http2_frame()
- if typ == nil then
- return nil, flag, streamid
- elseif typ ~= 0x9 or self.id ~= streamid then
- return nil, h2_errors.PROTOCOL_ERROR:new_traceback("CONTINUATION frame expected")
- end
- local end_headers = band(flag, 0x04) ~= 0
- return end_headers, payload
-end
-
-function stream_methods:write_continuation_frame(payload, end_headers, timeout)
- assert(self.state == "open" or self.state == "half closed (remote)")
- local flags = 0
- if end_headers then
- flags = bor(flags, 0x4)
- end
- return self:write_http2_frame(0x9, flags, payload, timeout)
-end
-
--------------------------------------------
-
-function stream_methods:shutdown()
- if self.state ~= "idle" and self.state ~= "closed" and self.id ~= 0 then
- local ok, err = self:write_rst_stream(0)
- if not ok and err ~= ce.EPIPE then
- error(err)
- end
- end
- local len = 0
- while self.chunk_fifo:length() > 0 do
- local chunk = self.chunk_fifo:pop()
- if chunk ~= nil then
- chunk:ack(true)
- len = len + #chunk.data
- end
- end
- if len > 0 then
- self.connection:write_window_update(len)
- end
-end
-
--- this function *should never throw*
-function stream_methods:get_headers(timeout)
- local deadline = timeout and (monotime()+timeout)
- while self.recv_headers_fifo:length() < 1 do
- if self.state == "closed" then
- return nil, self.rst_stream_error
- end
- local which = cqueues.poll(self.connection, self.recv_headers_cond, timeout)
- if which == self.connection then
- local ok, err, errno = self.connection:step(0)
- if not ok then
- return nil, err, errno
- end
- elseif which == timeout then
- return nil, ce.ETIMEDOUT
- end
- timeout = deadline and (deadline-monotime())
- end
- local headers = self.recv_headers_fifo:pop()
- return headers
-end
-
-function stream_methods:get_next_chunk(timeout)
- local deadline = timeout and (monotime()+timeout)
- while self.chunk_fifo:length() == 0 do
- if self.state == "closed" or self.state == "half closed (remote)" then
- if self.rst_stream_error then
- self.rst_stream_error()
- end
- return nil
- end
- local which = cqueues.poll(self.connection, self.chunk_cond, timeout)
- if which == self.connection then
- local ok, err, errno = self.connection:step(0)
- if not ok then
- return nil, err, errno
- end
- elseif which == timeout then
- return nil, ce.ETIMEDOUT
- end
- timeout = deadline and (deadline-monotime())
- end
- local chunk = self.chunk_fifo:pop()
- if chunk == nil then
- return nil, ce.EPIPE
- else
- local data = chunk.data
- chunk:ack(false)
- return data
- end
-end
-
-function stream_methods:unget(str)
- local chunk = new_chunk(self, 0, str) -- 0 means :ack does nothing
- self.chunk_fifo:insert(1, chunk)
-end
-
-local function write_headers(self, func, headers, timeout)
- local deadline = timeout and (monotime()+timeout)
- local encoding_context = self.connection.encoding_context
- encoding_context:encode_headers(headers)
- local payload = encoding_context:render_data()
- encoding_context:clear_data()
-
- local SETTINGS_MAX_FRAME_SIZE = self.connection.peer_settings[0x5]
- if #payload <= SETTINGS_MAX_FRAME_SIZE then
- assert(func(payload, true, deadline))
- else
- do
- local partial = payload:sub(1, SETTINGS_MAX_FRAME_SIZE)
- assert(func(partial, false, deadline))
- end
- local sent = SETTINGS_MAX_FRAME_SIZE
- local max = #payload-SETTINGS_MAX_FRAME_SIZE
- while sent < max do
- local partial = payload:sub(sent+1, sent+SETTINGS_MAX_FRAME_SIZE)
- assert(self:write_continuation_frame(partial, false, deadline and deadline-monotime()))
- sent = sent + SETTINGS_MAX_FRAME_SIZE
- end
- do
- local partial = payload:sub(sent+1)
- assert(self:write_continuation_frame(partial, true, deadline and deadline-monotime()))
- end
- end
-end
-
-function stream_methods:write_headers(headers, end_stream, timeout)
- assert(headers, "missing argument: headers")
- assert(validate_headers(headers, self.type == "client", self.stats_sent_headers+1, end_stream))
- assert(type(end_stream) == "boolean", "'end_stream' MUST be a boolean")
-
- local padded, exclusive, stream_dep, weight = nil, nil, nil, nil
- write_headers(self, function(payload, end_headers, deadline)
- return self:write_headers_frame(payload, end_stream, end_headers, padded, exclusive, stream_dep, weight, deadline and deadline-monotime())
- end, headers, timeout)
-
- return true
-end
-
-function stream_methods:push_promise(headers, timeout)
- assert(self.type == "server")
- assert(headers, "missing argument: headers")
- assert(validate_headers(headers, true, 1, false))
- assert(headers:has(":authority"))
-
- local promised_stream = self.connection:new_stream()
- self:reprioritise(promised_stream)
- local promised_stream_id = promised_stream.id
-
- local padded = nil
- write_headers(self, function(payload, end_headers, deadline)
- return self:write_push_promise_frame(promised_stream_id, payload, end_headers, padded, deadline)
- end, headers, timeout)
-
- promised_stream:set_state("reserved (local)")
-
- return promised_stream
-end
-
-function stream_methods:write_chunk(payload, end_stream, timeout)
- local deadline = timeout and (monotime()+timeout)
- local sent = 0
- while true do
- while self.peer_flow_credits == 0 do
- local which = cqueues.poll(self.connection, self.peer_flow_credits_increase, timeout)
- if which == self.connection then
- local ok, err, errno = self.connection:step(0)
- if not ok then
- return nil, err, errno
- end
- elseif which == timeout then
- return nil, ce.ETIMEDOUT
- end
- timeout = deadline and (deadline-monotime())
- end
- while self.connection.peer_flow_credits == 0 do
- local which = cqueues.poll(self.connection, self.connection.peer_flow_credits_increase, timeout)
- if which == self.connection then
- local ok, err, errno = self.connection:step(0)
- if not ok then
- return nil, err, errno
- end
- elseif which == timeout then
- return nil, ce.ETIMEDOUT
- end
- timeout = deadline and (deadline-monotime())
- end
- local SETTINGS_MAX_FRAME_SIZE = self.connection.peer_settings[0x5]
- local max_available = math.min(self.peer_flow_credits, self.connection.peer_flow_credits, SETTINGS_MAX_FRAME_SIZE)
- if max_available < (#payload - sent) then
- if max_available > 0 then
- -- send partial payload
- local ok, err, errno = self:write_data_frame(payload:sub(sent+1, sent+max_available), false, timeout)
- if not ok then
- return nil, err, errno
- end
- sent = sent + max_available
- end
- else
- break
- end
- timeout = deadline and (deadline-monotime())
- end
- local ok, err, errno = self:write_data_frame(payload:sub(sent+1), end_stream, timeout)
- if not ok then
- return nil, err, errno
- end
- return true
-end
-
-return {
- new = new_stream;
- methods = stream_methods;
- mt = stream_mt;
-
- frame_handlers = frame_handlers;
- pack_settings_payload = pack_settings_payload;
-}
+++ /dev/null
-local cqueues = require "cqueues"
-local monotime = cqueues.monotime
-local cs = require "cqueues.socket"
-local cc = require "cqueues.condition"
-local ce = require "cqueues.errno"
-local h1_connection = require "http.h1_connection"
-local h2_connection = require "http.h2_connection"
-local http_tls = require "http.tls"
-local pkey = require "openssl.pkey"
-local x509 = require "openssl.x509"
-local name = require "openssl.x509.name"
-local altname = require "openssl.x509.altname"
-
-local hang_timeout = 0.03
-
-local function onerror(socket, op, why, lvl) -- luacheck: ignore 212
- if why == ce.EPIPE or why == ce.ETIMEDOUT then
- return why
- end
- return string.format("%s: %s", op, ce.strerror(why)), why
-end
-
--- Sense for TLS or SSL client hello
--- returns `true`, `false` or `nil, err`
-local function is_tls_client_hello(socket, timeout)
- -- reading for 6 bytes should be safe, as no HTTP version
- -- has a valid client request shorter than 6 bytes
- local first_bytes, err, errno = socket:xread(6, timeout)
- if first_bytes == nil then
- return nil, err or ce.EPIPE, errno
- end
- local use_tls = not not (
- first_bytes:match("^\22\3...\1") or -- TLS
- first_bytes:match("^[\128-\255][\9-\255]\1") -- SSLv2
- )
- local ok
- ok, errno = socket:unget(first_bytes)
- if not ok then
- return nil, onerror(socket, "unget", errno, 2)
- end
- return use_tls
-end
-
--- Wrap a bare cqueues socket in an HTTP connection of a suitable version
--- Starts TLS if necessary
--- this function *should never throw*
-local function wrap_socket(self, socket, deadline)
- socket:setmode("b", "b")
- socket:onerror(onerror)
- local use_tls = self.tls
- if use_tls == nil then
- local err, errno
- use_tls, err, errno = is_tls_client_hello(socket, deadline and (deadline-monotime()))
- if use_tls == nil then
- return nil, err, errno
- end
- end
- local is_h2 -- tri-state
- if use_tls then
- local ok, err, errno = socket:starttls(self.ctx, deadline and (deadline-monotime()))
- if not ok then
- return nil, err, errno
- end
- local ssl = socket:checktls()
- if ssl and http_tls.has_alpn then
- local proto = ssl:getAlpnSelected()
- if proto == "h2" then
- is_h2 = true
- elseif proto == nil or proto == "http/1.1" then
- is_h2 = false
- else
- return nil, "unexpected ALPN protocol: " .. proto
- end
- end
- end
- -- Still not sure if incoming connection is an HTTP1 or HTTP2 connection
- -- Need to sniff for the h2 connection preface to find out for sure
- if is_h2 == nil then
- local err, errno
- is_h2, err, errno = h2_connection.socket_has_preface(socket, true, deadline and (deadline-monotime()))
- if is_h2 == nil then
- return nil, err, errno
- end
- end
- local conn, err, errno
- if is_h2 then
- conn, err, errno = h2_connection.new(socket, "server", nil, deadline and (deadline-monotime()))
- else
- conn, err, errno = h1_connection.new(socket, "server", 1.1)
- end
- if not conn then
- return nil, err, errno
- end
- return conn, is_h2
-end
-
--- this function *should never throw*
-local function handle_client(conn, on_stream)
- while true do
- local stream, err, errno = conn:get_next_incoming_stream()
- if stream == nil then
- if (err == ce.EPIPE or errno == ce.ECONNRESET or errno == ce.ENOTCONN)
- and (conn.socket == nil or conn.socket:pending() == 0) then
- break
- else
- return nil, err, errno
- end
- end
- on_stream(stream)
- end
- -- wait for streams to complete?
- return true
-end
-
--- Prefer whichever comes first
-local function alpn_select(ssl, protos) -- luacheck: ignore 212
- for _, proto in ipairs(protos) do
- if proto == "h2" or proto == "http/1.1" then
- return proto
- end
- end
- return nil
-end
-
--- create a new self signed cert
-local function new_ctx(host)
- local ctx = http_tls.new_server_context()
- if ctx.setAlpnSelect then
- ctx:setAlpnSelect(alpn_select)
- end
- local crt = x509.new()
- -- serial needs to be unique or browsers will show uninformative error messages
- crt:setSerial(os.time())
- -- use the host we're listening on as canonical name
- local dn = name.new()
- dn:add("CN", host)
- crt:setSubject(dn)
- local alt = altname.new()
- alt:add("DNS", host)
- crt:setSubjectAlt(alt)
- -- lasts for 10 years
- crt:setLifetime(os.time(), os.time()+86400*3650)
- -- can't be used as a CA
- crt:setBasicConstraints{CA=false}
- crt:setBasicConstraintsCritical(true)
- -- generate a new private/public key pair
- local key = pkey.new()
- crt:setPublicKey(key)
- crt:sign(key)
- assert(ctx:setPrivateKey(key))
- assert(ctx:setCertificate(crt))
- return ctx
-end
-
-local server_methods = {
- max_concurrent = math.huge;
- client_timeout = 10;
-}
-local server_mt = {
- __name = "http.server";
- __index = server_methods;
-}
-
-function server_mt:__tostring()
- return string.format("http.server{socket=%s;n_connections=%d}",
- tostring(self.socket), self.n_connections)
-end
-
---[[ Creates a new server object
-
-Takes a table of options:
- - `.socket`: A cqueues socket object
- - `.tls`: `nil`: allow both tls and non-tls connections
- - `true`: allows tls connections only
- - `false`: allows non-tls connections only
- - `.ctx`: an `openssl.ssl.context` object to use for tls connections
- - ` `nil`: a self-signed context will be generated
- - `.max_concurrent`: Maximum number of connections to allow live at a time (default: infinity)
- - `.client_timeout`: Timeout (in seconds) to wait for client to send first bytes and/or complete TLS handshake (default: 10)
-]]
-local function new_server(tbl)
- local socket = assert(tbl.socket)
-
- -- Return errors rather than throwing
- socket:onerror(function(s, op, why, lvl) -- luacheck: ignore 431 212
- return why
- end)
-
- return setmetatable({
- socket = socket;
- tls = tbl.tls;
- ctx = tbl.ctx;
- max_concurrent = tbl.max_concurrent;
- n_connections = 0;
- pause_cond = cc.new();
- paused = true;
- connection_done = cc.new(); -- signalled when connection has been closed
- client_timeout = tbl.client_timeout;
- }, server_mt)
-end
-
---[[
-Extra options:
- - `.family`: protocol family
- - `.host`: address to bind to (required if not `.path`)
- - `.port`: port to bind to (optional if tls isn't `nil`, in which case defaults to 80 for `.tls == false` or 443 if `.tls == true`)
- - `.path`: path to UNIX socket (required if not `.host`)
- - `.v6only`: allow ipv6 only (no ipv4-mapped-ipv6)
- - `.mode`: fchmod or chmod socket after creating UNIX domain socket
- - `.mask`: set and restore umask when binding UNIX domain socket
- - `.unlink`: unlink socket path before binding?
- - `.reuseaddr`: turn on SO_REUSEADDR flag?
- - `.reuseport`: turn on SO_REUSEPORT flag?
-]]
-local function listen(tbl)
- local tls = tbl.tls
- local host = tbl.host
- local path = tbl.path
- assert(host or path, "need host or path")
- local port = tbl.port
- if host and port == nil then
- if tls == true then
- port = "443"
- elseif tls == false then
- port = "80"
- else
- error("need port")
- end
- end
- local ctx = tbl.ctx
- if ctx == nil and tls ~= false then
- if host then
- ctx = new_ctx(host)
- else
- error("Custom OpenSSL context required when using a UNIX domain socket")
- end
- end
- local s = assert(cs.listen{
- family = tbl.family;
- host = host;
- port = port;
- path = path;
- mode = tbl.mode;
- mask = tbl.mask;
- unlink = tbl.unlink;
- reuseaddr = tbl.reuseaddr;
- reuseport = tbl.reuseport;
- v6only = tbl.v6only;
- })
- return new_server {
- socket = s;
- tls = tls;
- ctx = ctx;
- max_concurrent = tbl.max_concurrent;
- client_timeout = tbl.client_timeout;
- }
-end
-
--- Actually wait for and *do* the binding
--- Don't *need* to call this, as if not it will be done lazily
-function server_methods:listen(timeout)
- return self.socket:listen(timeout)
-end
-
-function server_methods:localname()
- return self.socket:localname()
-end
-
-function server_methods:pause()
- self.paused = true
- self.pause_cond:signal()
-end
-
-function server_methods:close()
- self:pause()
- cqueues.poll()
- cqueues.poll()
- self.socket:close()
-end
-
-function server_methods:run(on_stream, cq)
- cq = assert(cq or cqueues.running())
- self.paused = false
- repeat
- if self.n_connections >= self.max_concurrent then
- cqueues.poll(self.connection_done, self.pause_cond)
- if self.paused then
- break
- end
- end
- local socket, accept_errno = self.socket:accept({nodelay = true;}, 0)
- if socket == nil then
- if accept_errno == ce.ETIMEDOUT then
- -- Yield this thread until a client arrives or server paused
- cqueues.poll(self.socket, self.pause_cond)
- elseif accept_errno == ce.EMFILE then
- -- Wait for another request to finish
- if cqueues.poll(self.connection_done, self.pause_cond, hang_timeout) == hang_timeout then
- -- If we're stuck waiting, run a garbage collection sweep
- -- This can prevent a hang
- collectgarbage()
- end
- else
- return nil, ce.strerror(accept_errno), accept_errno
- end
- else
- self.n_connections = self.n_connections + 1
- cq:wrap(function()
- local ok, err
- local conn, is_h2, errno = wrap_socket(self, socket)
- if not conn then
- err = is_h2
- socket:close()
- if errno == ce.ECONNRESET or err == 'read: Connection reset by peer' then
- ok = true
- end
- else
- ok, err = handle_client(conn, on_stream)
- conn:close()
- end
- self.n_connections = self.n_connections - 1
- self.connection_done:signal(1)
- if not ok
- and err ~= ce.EPIPE -- client closed connection
- and err ~= ce.ETIMEDOUT -- an operation timed out
- then
- error(err)
- end
- end)
- end
- until self.paused
- return true
-end
-
-return {
- new = new_server;
- listen = listen;
- mt = server_mt;
-}
\ No newline at end of file