2
0
mirror of https://github.com/xcat2/confluent.git synced 2025-01-26 19:10:30 +00:00

Progress client managed filedescriptor

This commit is contained in:
Jarrod Johnson 2021-02-18 14:58:45 -05:00
parent e8778cb992
commit 0b5c4f6f0f
8 changed files with 70 additions and 43 deletions

View File

@ -169,7 +169,8 @@ class Command(object):
raise Exception('Confluent service is not available')
else:
self._connect_tls()
tlvdata.recv(self.connection)
self.protversion = int(tlvdata.recv(self.connection).split(
b'--')[1].strip()[1:])
authdata = tlvdata.recv(self.connection)
if authdata['authpassed'] == 1:
self.authenticated = True
@ -180,10 +181,12 @@ class Command(object):
passphrase = os.environ['CONFLUENT_PASSPHRASE']
self.authenticate(username, passphrase)
def add_file(self, name, handle):
def add_file(self, name, handle, mode):
if self.protversion < 3:
raise Exception('Not supported with connected confluent server')
if not self.unixdomain:
raise Exception('Can only add a file to a unix domain connection')
tlvdata.send(self.connection, {'filename': name}, handle)
tlvdata.send(self.connection, {'filename': name, 'mode': mode}, handle)
def authenticate(self, username, password):
tlvdata.send(self.connection,

View File

@ -22,6 +22,7 @@ import confluent.tlv as tlv
import eventlet.green.socket as socket
from datetime import datetime
import json
import os
import struct
try:
@ -66,6 +67,11 @@ def CMSG_SPACE(length): # bits/socket.h
return ctypes.c_size_t(ret)
class ClientFile(object):
def __init__(self, name, mode, fd):
self.fileobject = os.fdopen(fd, mode)
self.filename = name
libc = ctypes.CDLL(ctypes.util.find_library('c'))
recvmsg = libc.recvmsg
recvmsg.argtypes = [ctypes.c_int, ctypes.POINTER(msghdr), ctypes.c_int]
@ -183,8 +189,8 @@ def recv(handle):
if clev == socket.SOL_SOCKET and ctype == socket.SCM_RIGHTS:
filehandles.fromstring(
cdata[:len(cdata) - len(cdata) % filehandles.itemsize])
print(repr(filehandles))
print(repr(data))
data = json.loads(data)
return ClientFile(data['filename'], data['mode'], filehandles[0])
else:
data = handle.recv(dlen)
while len(data) < dlen:

View File

@ -340,7 +340,10 @@ def osimport(imagefile):
imagefile = os.path.abspath(imagefile)
if c.unixdomain:
ofile = open(imagefile, 'rb')
c.add_file(imagefile, ofile.fileno())
try:
c.add_file(imagefile, ofile.fileno(), 'rb')
except Exception:
pass
importing = False
shortname = None
for rsp in c.create('/deployment/importing/', {'filename': imagefile}):

View File

@ -1138,6 +1138,7 @@ class ConfigManager(object):
return _cfgstore['tenant'][self.tenant]
def __init__(self, tenant, decrypt=False, username=None):
self.clientfiles = {}
global _cfgstore
with _initlock:
if _cfgstore is None:
@ -1172,6 +1173,13 @@ class ConfigManager(object):
self._bg_sync_to_file()
self.wait_for_sync()
def add_client_file(self, clientfile):
self.clientfiles[clientfile.filename] = clientfile.fileobject
def close_client_files(self):
for f in self.clientfiles:
self.clientfiles[f].close()
def get_collective_member(self, name):
return get_collective_member(name)

View File

@ -180,7 +180,8 @@ def handle_deployment(configmanager, inputdata, pathcomponents,
yield imp
return
elif operation == 'create':
importer = osimage.MediaImporter(inputdata['filename'])
importer = osimage.MediaImporter(inputdata['filename'],
configmanager)
yield msg.KeyValueData({'target': importer.targpath,
'name': importer.importkey})
return

View File

@ -351,10 +351,6 @@ def _parse_ssdp(peer, rsp, peerdata):
if __name__ == '__main__':
for rsp in scan(['urn:dmtf-org:service:redfish-rest:1']):
for rsp in scan(['urn:dmtf-org:service:redfish-rest:1'], '10.240.52.189'):
print(repr(rsp))
def fun(a):
print(repr(a))
def byefun(a):
print('bye' + repr(a))
snoop(fun, byefun)

View File

@ -440,10 +440,10 @@ def check_rhel(isoinfo):
return {'name': 'rhel-{0}-{1}'.format(ver, arch), 'method': EXTRACT, 'category': 'el{0}'.format(major)}
def scan_iso(filename):
def scan_iso(archive):
filesizes = {}
filecontents = {}
with libarchive.file_reader(filename) as reader:
with libarchive.fd_reader(archive.fileno()) as reader:
for ent in reader:
if str(ent).endswith('TRANS.TBL'):
continue
@ -456,31 +456,32 @@ def scan_iso(filename):
return filesizes, filecontents
def fingerprint(filename):
with open(filename, 'rb') as archive:
header = archive.read(32768)
archive.seek(32769)
if archive.read(6) == b'CD001\x01':
# ISO image
isoinfo = scan_iso(filename)
name = None
for fun in globals():
if fun.startswith('check_'):
name = globals()[fun](isoinfo)
if name:
return name, isoinfo[0]
return None
else:
sum = hashlib.sha256(header)
if sum.digest() in HEADERSUMS:
archive.seek(32768)
def fingerprint(archive):
header = archive.read(32768)
archive.seek(32769)
if archive.read(6) == b'CD001\x01':
# ISO image
archive.seek(0)
isoinfo = scan_iso(archive)
archive.seek(0)
name = None
for fun in globals():
if fun.startswith('check_'):
name = globals()[fun](isoinfo)
if name:
return name, isoinfo[0]
return None
else:
sum = hashlib.sha256(header)
if sum.digest() in HEADERSUMS:
archive.seek(32768)
chunk = archive.read(32768)
while chunk:
sum.update(chunk)
chunk = archive.read(32768)
while chunk:
sum.update(chunk)
chunk = archive.read(32768)
imginfo = HASHPRINTS.get(sum.hexdigest(), None)
if imginfo:
return imginfo, None
imginfo = HASHPRINTS.get(sum.hexdigest(), None)
if imginfo:
return imginfo, None
def import_image(filename, callback, backend=False):
@ -581,10 +582,15 @@ def generate_stock_profiles(defprofile, distpath, targpath, osname,
class MediaImporter(object):
def __init__(self, media):
def __init__(self, media, cfm=None):
self.worker = None
self.profiles = []
identity = fingerprint(media)
medfile = None
if cfm and media in cfm.clientfiles:
medfile = cfm.clientfiles[media]
else:
medfile = open(medfile, 'rb')
identity = fingerprint(medfile)
if not identity:
raise exc.InvalidArgumentException('Unsupported Media')
self.percent = 0.0
@ -611,6 +617,7 @@ class MediaImporter(object):
if os.path.exists(self.targpath):
raise Exception('{0} already exists'.format(self.targpath))
self.filename = os.path.abspath(media)
self.medfile = medfile
self.importer = eventlet.spawn(self.importmedia)
def stop(self):

View File

@ -175,7 +175,7 @@ def sessionhdl(connection, authname, skipauth=False, cert=None):
cfm = authdata[1]
send_data(connection, {'authpassed': 1})
request = tlvdata.recv(connection)
if request and 'collective' in request:
if request and isinstance(request, dict) and 'collective' in request:
if skipauth:
if not libssl:
tlvdata.send(
@ -216,7 +216,7 @@ def sessionhdl(connection, authname, skipauth=False, cert=None):
'error': 'Unexpected error - ' + str(e)})
send_data(connection, {'_requestdone': 1})
request = tlvdata.recv(connection)
cfm.close_client_files()
def send_response(responses, connection):
if responses is None:
@ -227,6 +227,9 @@ def send_response(responses, connection):
def process_request(connection, request, cfm, authdata, authname, skipauth):
if isinstance(request, tlvdata.ClientFile):
cfm.add_client_file(request)
return
if not isinstance(request, dict):
raise exc.InvalidArgumentException
operation = request['operation']