Add pipeline_communicate() generator
This generator will be used to perform the send operations during the
execute() step when the connection is in pipeline mode. It can consume
results or send queries, depending on whether the socket is read-ready
or write-ready. Queries to be sent are held in a queue of
Callable[[], None] items, each built by partial application of
pgconn.send_query() and similar functions.
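As an illustration of the queue described above, here is a minimal sketch.
FakePGconn is a hypothetical stand-in that only records calls; it is not
the real pgconn object, and the loop at the end merely drains the queue
where the generator would wait for the socket to be write-ready.

```python
from collections import deque
from functools import partial
from typing import Callable, Deque, List


class FakePGconn:
    """Hypothetical stand-in for a libpq connection; it only records calls."""

    def __init__(self) -> None:
        self.sent: List[bytes] = []

    def send_query(self, command: bytes) -> None:
        self.sent.append(command)


pgconn = FakePGconn()

# Each queue item is a zero-argument callable with its arguments already bound.
commands: Deque[Callable[[], None]] = deque()
commands.append(partial(pgconn.send_query, b"BEGIN"))
commands.append(partial(pgconn.send_query, b"COMMIT"))

# Queuing sends nothing: a command only runs when it is popped and called.
queued_before_send = len(pgconn.sent)
while commands:
    commands.popleft()()
```

Binding the arguments up front lets the consumer treat every queued
operation the same way, whichever send function it wraps.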
We add a couple of unit tests in test_generators.py, along with a test
script taken from PostgreSQL sources. The script demonstrates the use of
pipeline mode where query-send and results-fetch steps are interleaved
without any sync point emitted. (In this test, this works because the
output buffer gets full.) The test writes data to logs. Typically, we'd
get:
enter pipeline
sent BEGIN TRANSACTION
sent DROP TABLE IF EXISTS pq_pipeline_demo
sent CREATE UNLOGGED TABLE pq_pipeline_demo( id serial primary key, itemno integer, int8filler int8)
prepare INSERT INTO pq_pipeline_demo(itemno, int8filler) VALUES ($1, $2) as 'prepare'
sent prepared 'prepare' with [b'10000', b'4611686018427387904']
sent prepared 'prepare' with [b'9999', b'4611686018427387904']
sent prepared 'prepare' with [b'9998', b'4611686018427387904']
sent prepared 'prepare' with [b'9997', b'4611686018427387904']
sent prepared 'prepare' with [b'9996', b'4611686018427387904']
sent prepared 'prepare' with [b'9995', b'4611686018427387904']
sent prepared 'prepare' with [b'9994', b'4611686018427387904']
sent prepared 'prepare' with [b'9993', b'4611686018427387904']
sent prepared 'prepare' with [b'9992', b'4611686018427387904']
...
sent prepared 'prepare' with [b'9690', b'4611686018427387904']
sent prepared 'prepare' with [b'9689', b'4611686018427387904']
sent prepared 'prepare' with [b'9688', b'4611686018427387904']
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
sent prepared 'prepare' with [b'9687', b'4611686018427387904']
sent prepared 'prepare' with [b'9686', b'4611686018427387904']
sent prepared 'prepare' with [b'9685', b'4611686018427387904']
sent prepared 'prepare' with [b'9684', b'4611686018427387904']
sent prepared 'prepare' with [b'9683', b'4611686018427387904']
...
sent prepared 'prepare' with [b'2', b'4611686018427387904']
sent prepared 'prepare' with [b'1', b'4611686018427387904']
sent COMMIT
pipeline sync sent
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
got COMMAND_OK results
...
got COMMAND_OK results
got COMMAND_OK results
got PIPELINE_SYNC results
exit pipeline
We can see that commands are sent until the output buffer is full (the
connection is then read-ready only), then results are fetched until
more commands can be sent, and the cycle repeats.
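
The cycle above can be mimicked with a toy model. This is only a sketch
and not the actual generator: it involves no socket, and the "capacity"
parameter is a hypothetical stand-in for the output buffer filling up.

```python
from collections import deque
from typing import Callable, Deque, List


def toy_communicate(
    commands: Deque[Callable[[], None]], capacity: int
) -> List[str]:
    """Send while "write-ready", otherwise fetch the pending results.

    Toy model: "capacity" (an assumption of this sketch) is the number of
    commands that can be in flight before only reads are possible.
    """
    events: List[str] = []
    pending = 0  # commands sent whose results have not been fetched yet
    while commands or pending:
        write_ready = bool(commands) and pending < capacity
        if write_ready:
            commands.popleft()()
            pending += 1
            events.append("sent")
        else:
            # Buffer "full", or nothing left to send: fetch what is waiting.
            events.extend(["got"] * pending)
            pending = 0
    return events


executed: List[int] = []
queue: Deque[Callable[[], None]] = deque(
    (lambda i=i: executed.append(i)) for i in range(5)
)
events = toy_communicate(queue, capacity=2)
```

With five commands and a capacity of two, the events alternate between
runs of "sent" and runs of "got", as in the log above.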