diff --git a/confluent_client/confluent/client.py b/confluent_client/confluent/client.py index aa30e6a1..07a85423 100644 --- a/confluent_client/confluent/client.py +++ b/confluent_client/confluent/client.py @@ -27,6 +27,8 @@ import confluent.tlvdata as tlvdata SO_PASSCRED = 16 +inflight = False + def _parseserver(string): if ']:' in string: server, port = string[1:].split(']:') @@ -226,6 +228,14 @@ def send_request(operation, path, server, parameters=None): :param server: The socket to send data over :param parameters: Parameters if any to send along with the request """ + global inflight + if inflight: + # calling code failed to complete a transaction, flush and discard + # their unused data + result = tlvdata.recv(server) + while '_requestdone' not in result: + result = tlvdata.recv(server) + inflight = True payload = {'operation': operation, 'path': path} if parameters is not None: payload['parameters'] = parameters @@ -234,6 +244,7 @@ def send_request(operation, path, server, parameters=None): while '_requestdone' not in result: yield result result = tlvdata.recv(server) + inflight = False