删除了不常用的内容, 并使得Config能与3.10兼容
This commit is contained in:
@@ -34,14 +34,14 @@ class SingletonModel[T](IModel):
|
||||
_InjectInstances:Dict[type,Any] = {}
|
||||
|
||||
@staticmethod
|
||||
def GetInstance(t:Typen[T]) -> T:
|
||||
def GetInstance(t:type) -> T:
|
||||
return SingletonModel._InjectInstances[t]
|
||||
|
||||
@staticmethod
|
||||
def SetInstance(t:Typen[T], obj:T) -> None:
|
||||
def SetInstance(t:type, obj:T) -> None:
|
||||
SingletonModel._InjectInstances[t] = obj
|
||||
|
||||
def __init__(self, t:Typen[T]) -> None:
|
||||
def __init__(self, t:type) -> None:
|
||||
self.typen: type = t
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ class Architecture:
|
||||
return type_ in cls._RegisteredObjects
|
||||
|
||||
@classmethod
|
||||
def Get[T](cls, type_:Typen[T]) -> T:
|
||||
def Get(cls, type_:type) -> Any:
|
||||
return cls._RegisteredObjects[type_]
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -204,18 +204,17 @@ try:
|
||||
except ImportError:
|
||||
InternalImportingThrow("Internal", ["pydantic"])
|
||||
|
||||
type Typen[_T] = type
|
||||
|
||||
type Action = Callable[[], None]
|
||||
type ClosuresCallable[_T] = Union[Callable[[Optional[None]], _T], Typen[_T]]
|
||||
T = TypeVar("T")
|
||||
Action = TypeVar("Action", bound=Callable[[], None])
|
||||
ClosuresCallable = Union[Callable[[Optional[None]], Any], type]
|
||||
|
||||
def AssemblyTypen(obj:Any) -> str:
|
||||
if isinstance(obj, type):
|
||||
return f"{obj.__module__}.{obj.__name__}, "\
|
||||
f"{obj.Assembly() if hasattr(obj, "Assembly") else "Global"}"
|
||||
f"{obj.Assembly() if hasattr(obj, 'Assembly') else 'Global'}"
|
||||
else:
|
||||
return f"{obj.__class__.__module__}.{obj.__class__.__name__}, "\
|
||||
f"{obj.GetAssembly() if hasattr(obj, "GetAssembly") else "Global"}"
|
||||
f"{obj.GetAssembly() if hasattr(obj, 'GetAssembly') else 'Global'}"
|
||||
def ReadAssemblyTypen(
|
||||
assembly_typen: str,
|
||||
*,
|
||||
@@ -232,68 +231,13 @@ def ReadAssemblyTypen(
|
||||
target_type = getattr(importlib.import_module(module_name), class_name)
|
||||
return target_type, assembly_name
|
||||
|
||||
# using as c#: event
|
||||
class ActionEvent[_Call:Callable]:
|
||||
def __init__(self, actions:Sequence[_Call]):
|
||||
super().__init__()
|
||||
self._actions: List[Callable] = [action for action in actions]
|
||||
self.call_indexs: List[int] = [i for i in range(len(actions))]
|
||||
self.last_result: List[Any] = []
|
||||
def CallFuncWithoutCallIndexControl(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
|
||||
try:
|
||||
return self._actions[index](*args, **kwargs)
|
||||
except Exception as ex:
|
||||
return ex
|
||||
def CallFunc(self, index:int, *args, **kwargs) -> Union[Any, Exception]:
|
||||
return self.CallFuncWithoutCallIndexControl(self.call_indexs[index], *args, **kwargs)
|
||||
def _InjectInvoke(self, *args, **kwargs):
|
||||
result:List[Any] = []
|
||||
for index in range(self.CallMaxCount):
|
||||
result.append(self.CallFunc(index, *args, **kwargs))
|
||||
return result
|
||||
def Invoke(self, *args, **kwargs) -> Union[Self, bool]:
|
||||
self.last_result = self._InjectInvoke(*args, **kwargs)
|
||||
return self
|
||||
def InitCallIndex(self):
|
||||
self.call_indexs = [i for i in range(len(self._actions))]
|
||||
def AddAction(self, action:_Call):
|
||||
self._actions.append(action)
|
||||
self.call_indexs.append(len(self._actions)-1)
|
||||
return self
|
||||
def AddActions(self, actions:Sequence[_Call]):
|
||||
for action in actions:
|
||||
self.AddAction(action)
|
||||
return self
|
||||
def _InternalRemoveAction(self, action:_Call):
|
||||
if action in self._actions:
|
||||
index = self._actions.index(action)
|
||||
self._actions.remove(action)
|
||||
self.call_indexs.remove(index)
|
||||
for i in range(len(self.call_indexs)):
|
||||
if self.call_indexs[i] > index:
|
||||
self.call_indexs[i] -= 1
|
||||
return True
|
||||
return False
|
||||
def RemoveAction(self, action:_Call):
|
||||
while self._InternalRemoveAction(action):
|
||||
pass
|
||||
return self
|
||||
def IsValid(self):
|
||||
return not any(isinstance(x, Exception) for x in self.last_result)
|
||||
def __bool__(self):
|
||||
return self.IsValid()
|
||||
@property
|
||||
def CallMaxCount(self):
|
||||
return len(self.call_indexs)
|
||||
@property
|
||||
def ActionCount(self):
|
||||
return len(self._actions)
|
||||
|
||||
# region instance
|
||||
|
||||
# threads
|
||||
|
||||
class atomic[_T]:
|
||||
_T = TypeVar("_T")
|
||||
|
||||
class atomic(Generic[_T]):
|
||||
def __init__(
|
||||
self,
|
||||
value: _T,
|
||||
@@ -320,13 +264,13 @@ class atomic[_T]:
|
||||
return self.FetchAdd(value)
|
||||
def __sub__(self, value:_T):
|
||||
return self.FetchSub(value)
|
||||
def __iadd__(self, value:_T) -> Self:
|
||||
def __iadd__(self, value:_T) -> 'atomic[_T]':
|
||||
self.FetchAdd(value)
|
||||
return self
|
||||
def __isub__(self, value:_T) -> Self:
|
||||
def __isub__(self, value:_T) -> 'atomic[_T]':
|
||||
self.FetchSub(value)
|
||||
return self
|
||||
def __enter__(self) -> Self:
|
||||
def __enter__(self) -> 'atomic[_T]':
|
||||
self._is_in_with = True
|
||||
self.locker.acquire()
|
||||
return self
|
||||
@@ -482,7 +426,7 @@ class PlatformIndicator:
|
||||
"""
|
||||
return "Assets/"
|
||||
|
||||
class DescriptiveIndicator[T]:
|
||||
class DescriptiveIndicator(Generic[T]):
|
||||
def __init__(self, description:str, value:T) -> None:
|
||||
self.descripion : str = description
|
||||
self.value : T = value
|
||||
|
||||
@@ -366,7 +366,7 @@ class EasySave:
|
||||
@overload
|
||||
@staticmethod
|
||||
def Read[T](
|
||||
rtype: Typen[T],
|
||||
rtype: type,
|
||||
file: Optional[ToolFile|str] = None,
|
||||
*,
|
||||
setting: Optional[EasySaveSetting] = None
|
||||
|
||||
@@ -60,7 +60,7 @@ class ToolFile(BaseModel):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
filePath: Union[str, Self],
|
||||
filePath: Union[str, 'ToolFile'],
|
||||
):
|
||||
filePath = os.path.expandvars(str(filePath))
|
||||
if ":" in filePath:
|
||||
@@ -152,7 +152,7 @@ class ToolFile(BaseModel):
|
||||
else:
|
||||
os.remove(self.OriginFullPath)
|
||||
return self
|
||||
def Copy(self, targetPath:Optional[Union[Self, str]]=None):
|
||||
def Copy(self, targetPath:Optional[Union['ToolFile', str]]=None):
|
||||
if targetPath is None:
|
||||
return ToolFile(self.OriginFullPath)
|
||||
if self.Exists() == False:
|
||||
@@ -162,7 +162,7 @@ class ToolFile(BaseModel):
|
||||
target_file = target_file|self.GetFilename()
|
||||
shutil.copy(self.OriginFullPath, str(target_file))
|
||||
return target_file
|
||||
def Move(self, targetPath:Union[Self, str]):
|
||||
def Move(self, targetPath:Union['ToolFile', str]):
|
||||
if self.Exists() is False:
|
||||
raise FileNotFoundError("file not found")
|
||||
target_file = ToolFile(str(targetPath))
|
||||
@@ -171,7 +171,7 @@ class ToolFile(BaseModel):
|
||||
shutil.move(self.OriginFullPath, str(target_file))
|
||||
self.OriginFullPath = target_file.OriginFullPath
|
||||
return self
|
||||
def Rename(self, newpath:Union[Self, str]):
|
||||
def Rename(self, newpath:Union['ToolFile', str]):
|
||||
if self.Exists() is False:
|
||||
raise FileNotFoundError("file not found")
|
||||
newpath = str(newpath)
|
||||
@@ -521,7 +521,7 @@ class ToolFile(BaseModel):
|
||||
self.Create()
|
||||
return self
|
||||
|
||||
def MakeFileInside(self, data:Self, is_delete_source = False):
|
||||
def MakeFileInside(self, data:'ToolFile', is_delete_source = False):
|
||||
if self.IsDir() is False:
|
||||
raise Exception("Cannot make file inside a file, because this object target is not a directory")
|
||||
result:ToolFile = self|data.GetFilename()
|
||||
@@ -643,7 +643,7 @@ class ToolFile(BaseModel):
|
||||
except Exception as e:
|
||||
raise EncryptionError(f"Encryption failed: {str(e)}")
|
||||
|
||||
def decrypt(self, key: str, algorithm: str = 'AES') -> Self:
|
||||
def decrypt(self, key: str, algorithm: str = 'AES') -> 'ToolFile':
|
||||
"""
|
||||
解密文件
|
||||
Args:
|
||||
@@ -739,7 +739,7 @@ class ToolFile(BaseModel):
|
||||
except Exception as e:
|
||||
raise HashError(f"Hash verification failed: {str(e)}")
|
||||
|
||||
def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> Self:
|
||||
def save_hash(self, algorithm: str = 'md5', output_path: Optional[str] = None) -> 'ToolFile':
|
||||
"""
|
||||
保存文件的哈希值到文件
|
||||
Args:
|
||||
@@ -860,7 +860,7 @@ class ToolFile(BaseModel):
|
||||
max_backups: int = 5,
|
||||
backup_format: str = 'zip',
|
||||
include_metadata: bool = True
|
||||
) -> Self:
|
||||
) -> 'ToolFile':
|
||||
"""
|
||||
创建文件或目录的备份
|
||||
Args:
|
||||
@@ -878,7 +878,7 @@ class ToolFile(BaseModel):
|
||||
# 生成备份目录
|
||||
if backup_dir is None:
|
||||
backup_dir = os.path.join(self.GetDir(), '.backup')
|
||||
backup_dir:Self = ToolFile(backup_dir)
|
||||
backup_dir:'ToolFile' = ToolFile(backup_dir)
|
||||
backup_dir.MustExistsPath()
|
||||
|
||||
# 生成备份文件名
|
||||
@@ -934,10 +934,10 @@ class ToolFile(BaseModel):
|
||||
|
||||
def restore_backup(
|
||||
self,
|
||||
backup_file: Union[str, Self],
|
||||
backup_file: Union[str, 'ToolFile'],
|
||||
restore_path: Optional[str] = None,
|
||||
verify_hash: bool = True
|
||||
) -> Self:
|
||||
) -> 'ToolFile':
|
||||
"""
|
||||
从备份恢复文件或目录
|
||||
Args:
|
||||
@@ -948,7 +948,7 @@ class ToolFile(BaseModel):
|
||||
恢复后的文件对象
|
||||
"""
|
||||
if not isinstance(backup_file, ToolFile):
|
||||
backup_file:Self = ToolFile(backup_file)
|
||||
backup_file:'ToolFile' = ToolFile(backup_file)
|
||||
|
||||
if not backup_file.Exists():
|
||||
raise FileNotFoundError(f"Backup file not found: {backup_file.GetFullPath()}")
|
||||
@@ -957,7 +957,7 @@ class ToolFile(BaseModel):
|
||||
# 确定恢复路径
|
||||
if restore_path is None:
|
||||
restore_path = self.GetFullPath()
|
||||
restore_path:Self = ToolFile(restore_path)
|
||||
restore_path:'ToolFile' = ToolFile(restore_path)
|
||||
|
||||
# 解压备份
|
||||
if backup_file.get_extension() == 'zip':
|
||||
@@ -984,7 +984,7 @@ class ToolFile(BaseModel):
|
||||
except Exception as e:
|
||||
raise BackupError(f"Restore failed: {str(e)}")
|
||||
|
||||
def list_backups(self) -> List[Self]:
|
||||
def list_backups(self) -> List['ToolFile']:
|
||||
"""
|
||||
列出所有备份
|
||||
Returns:
|
||||
@@ -994,7 +994,7 @@ class ToolFile(BaseModel):
|
||||
raise FileNotFoundError(f"File not found: {self.GetFullPath()}")
|
||||
|
||||
try:
|
||||
backup_dir:Self = ToolFile(os.path.join(self.GetDir(), '.backup'))
|
||||
backup_dir:'ToolFile' = ToolFile(os.path.join(self.GetDir(), '.backup'))
|
||||
if not backup_dir.Exists():
|
||||
return []
|
||||
|
||||
@@ -1036,7 +1036,7 @@ class ToolFile(BaseModel):
|
||||
execute: Optional[bool] = None,
|
||||
hidden: Optional[bool] = None,
|
||||
recursive: bool = False
|
||||
) -> Self:
|
||||
) -> 'ToolFile':
|
||||
"""
|
||||
设置文件或目录的权限
|
||||
Args:
|
||||
|
||||
Reference in New Issue
Block a user