mirror of
				https://github.com/xcat2/confluent.git
				synced 2025-10-31 11:22:28 +00:00 
			
		
		
		
	Draft for starting the databse replication
Does not actually heed the data, or implement ongoing relay of data back and forth.
This commit is contained in:
		| @@ -31,6 +31,28 @@ except ImportError: | ||||
| currentleader = None | ||||
|  | ||||
|  | ||||
| def connect_to_leader(): | ||||
|     remote = socket.create_connection((currentleader, 13001)) | ||||
|     # TLS cert validation is custom and will not pass normal CA vetting | ||||
|     # to override completely in the right place requires enormous effort, so just defer until after connect | ||||
|     remote = ssl.wrap_socket(remote, cert_reqs=ssl.CERT_NONE, keyfile='/etc/confluent/privkey.pem', | ||||
|                              certfile='/etc/confluent/srvcert.pem') | ||||
|     collent = cfm.get_collective_member_by_address(currentleader) | ||||
|     if not util.cert_matches(remote.getpeercert(binary_form=True), collent['fingerprint']): | ||||
|         raise Exception("Certificate mismatch in the collective")  # probably Janeway up to something | ||||
|     tlvdata.send(remote, {'collective': {'operation': 'connect'}}) | ||||
|     keydata = tlvdata.recv(remote) | ||||
|     colldata = tlvdata.recv(remote) | ||||
|     globaldata = tlvdata.recv(remote) | ||||
|     dbsize = tlvdata.recv(remote)['dbsize'] | ||||
|     dbjson = '' | ||||
|     while (len(dbjson) < dbsize): | ||||
|         ndata = remote.recv(dbsize - len(dbjson)) | ||||
|         if not ndata: | ||||
|             raise Exception("Error doing initial DB transfer") | ||||
|         dbjson += ndata | ||||
|  | ||||
|  | ||||
| def handle_connection(connection, cert, request, local=False): | ||||
|     global currentleader | ||||
|     operation = request['operation'] | ||||
| @@ -74,6 +96,7 @@ def handle_connection(connection, cert, request, local=False): | ||||
|                 return | ||||
|             tlvdata.send(connection, {'collective': {'status': 'Success'}}) | ||||
|             currentleader = rsp['collective']['leader'] | ||||
|             eventlet.spawn_n(connect_to_leader()) | ||||
|     if 'enroll' == operation: | ||||
|         mycert = util.get_certificate_from_file('/etc/confluent/srvcert.pem') | ||||
|         proof = base64.b64decode(request['hmac']) | ||||
| @@ -105,6 +128,14 @@ def handle_connection(connection, cert, request, local=False): | ||||
|                          {'error': 'Invalid certificate,' | ||||
|                                    'redo invitation process'}) | ||||
|             return | ||||
|         tlvdata.send(connection, cfm._dump_keys(None, False)) | ||||
|         tlvdata.send(connection, cfm._cfgstore['collective]']) | ||||
|         tlvdata.send(connection, cfm.get_globals()) | ||||
|         cfgdata = cfm.ConfigManager(None)._dump_to_json() | ||||
|         tlvdata.send(connection, {'dbsize': len(cfgdata)}) | ||||
|         connection.write(cfgdata) | ||||
|         tlvdata.send(connection, {'tenants': 0}) # skip the tenants for now, so far unused anyway | ||||
|         cfm.register_cfg_listener(drone, connection) | ||||
|         # ok, we have a connecting member whose certificate checks out | ||||
|         # He needs to bootstrap his configuration and subscribe it to updates | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user