mirror of
https://gitlab.com/libvirt/libvirt-python.git
synced 2025-07-21 16:59:25 +03:00
override: Add manual PEP 484 type annotations
Signed-off-by: Philipp Hahn <hahn@univention.de>
This commit is contained in:
committed by
Daniel Berrange
parent
67af8b910b
commit
abbd47f4ea
@ -1,4 +1,4 @@
|
||||
def __del__(self):
|
||||
def __del__(self) -> None:
|
||||
try:
|
||||
if self.cb:
|
||||
libvirtmod.virStreamEventRemoveCallback(self._o)
|
||||
@ -9,7 +9,7 @@
|
||||
libvirtmod.virStreamFree(self._o)
|
||||
self._o = None
|
||||
|
||||
def _dispatchStreamEventCallback(self, events, cbData):
|
||||
def _dispatchStreamEventCallback(self, events: int, cbData: Dict[str, Any]) -> int:
|
||||
"""
|
||||
Dispatches events to python user's stream event callbacks
|
||||
"""
|
||||
@ -19,14 +19,14 @@
|
||||
cb(self, events, opaque)
|
||||
return 0
|
||||
|
||||
def eventAddCallback(self, events, cb, opaque):
|
||||
def eventAddCallback(self, events: int, cb: Callable[['virStream', int, _T], None], opaque: _T) -> None:
|
||||
self.cb = cb
|
||||
cbData = {"stream": self, "cb": cb, "opaque": opaque}
|
||||
ret = libvirtmod.virStreamEventAddCallback(self._o, events, cbData)
|
||||
if ret == -1:
|
||||
raise libvirtError('virStreamEventAddCallback() failed')
|
||||
|
||||
def recvAll(self, handler, opaque):
|
||||
def recvAll(self, handler: Callable[['virStream', bytes, _T], int], opaque: _T) -> None:
|
||||
"""Receive the entire data stream, sending the data to the
|
||||
requested data sink. This is simply a convenient alternative
|
||||
to virStreamRecv, for apps that do blocking-I/O.
|
||||
@ -59,7 +59,7 @@
|
||||
pass
|
||||
raise e
|
||||
|
||||
def sendAll(self, handler, opaque):
|
||||
def sendAll(self, handler: Callable[['virStream', int, _T], bytes], opaque: _T) -> None:
|
||||
"""
|
||||
Send the entire data stream, reading the data from the
|
||||
requested data source. This is simply a convenient alternative
|
||||
@ -92,7 +92,7 @@
|
||||
raise libvirtError("cannot use sendAll with "
|
||||
"nonblocking stream")
|
||||
|
||||
def recv(self, nbytes):
|
||||
def recv(self, nbytes: int) -> bytes:
|
||||
"""Reads a series of bytes from the stream. This method may
|
||||
block the calling application for an arbitrary amount
|
||||
of time.
|
||||
@ -110,7 +110,7 @@
|
||||
raise libvirtError('virStreamRecv() failed')
|
||||
return ret
|
||||
|
||||
def send(self, data):
|
||||
def send(self, data: bytes) -> int:
|
||||
"""Write a series of bytes to the stream. This method may
|
||||
block the calling application for an arbitrary amount
|
||||
of time. Once an application has finished sending data
|
||||
@ -129,7 +129,7 @@
|
||||
raise libvirtError('virStreamSend() failed')
|
||||
return ret
|
||||
|
||||
def recvHole(self, flags=0):
|
||||
def recvHole(self, flags: int = 0) -> int:
|
||||
"""This method is used to determine the length in bytes
|
||||
of the empty space to be created in a stream's target
|
||||
file when uploading or downloading sparsely populated
|
||||
@ -140,7 +140,7 @@
|
||||
raise libvirtError('virStreamRecvHole() failed')
|
||||
return ret
|
||||
|
||||
def sendHole(self, length, flags=0):
|
||||
def sendHole(self, length: int, flags: int = 0) -> int:
|
||||
"""Rather than transmitting empty file space, this method
|
||||
directs the stream target to create length bytes of empty
|
||||
space. This method would be used when uploading or
|
||||
@ -152,7 +152,7 @@
|
||||
raise libvirtError('virStreamSendHole() failed')
|
||||
return ret
|
||||
|
||||
def recvFlags(self, nbytes, flags=0):
|
||||
def recvFlags(self, nbytes: int, flags: int = 0) -> Union[bytes, int]:
|
||||
"""Reads a series of bytes from the stream. This method may
|
||||
block the calling application for an arbitrary amount
|
||||
of time. This is just like recv except it has flags
|
||||
@ -171,7 +171,7 @@
|
||||
raise libvirtError('virStreamRecvFlags() failed')
|
||||
return ret
|
||||
|
||||
def sparseRecvAll(self, handler, holeHandler, opaque):
|
||||
def sparseRecvAll(self, handler: Callable[['virStream', bytes, _T], Union[bytes, int]], holeHandler: Callable[['virStream', int, _T], Optional[int]], opaque: _T) -> None:
|
||||
"""Receive the entire data stream, sending the data to
|
||||
the requested data sink handler and calling the skip
|
||||
holeHandler to generate holes for sparse stream targets.
|
||||
@ -219,7 +219,7 @@
|
||||
self.abort()
|
||||
raise RuntimeError("sparseRecvAll handler returned %d" % ret)
|
||||
|
||||
def sparseSendAll(self, handler, holeHandler, skipHandler, opaque):
|
||||
def sparseSendAll(self, handler: Callable[['virStream', int, _T], Union[bytes, int]], holeHandler: Callable[['virStream', _T], Tuple[bool, int]], skipHandler: Callable[['virStream', int, _T], int], opaque: _T) -> None:
|
||||
"""Send the entire data stream, reading the data from the
|
||||
requested data source. This is simply a convenient
|
||||
alternative to virStreamSend, for apps that do
|
||||
@ -269,6 +269,7 @@
|
||||
if not got:
|
||||
break
|
||||
|
||||
assert isinstance(got, bytes)
|
||||
ret = self.send(got)
|
||||
if ret == -2:
|
||||
raise libvirtError("cannot use sparseSendAll with "
|
||||
|
Reference in New Issue
Block a user