Initial commit

This commit is contained in:
2025-10-12 13:48:41 +02:00
commit 35cd34b3ca
17 changed files with 10495 additions and 0 deletions

31
.gitignore vendored Normal file
View File

@@ -0,0 +1,31 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
# NVDA addon
*.nvda-addon
addon/manifest.ini
# SCons
.sconsign.dblite
.sconf_temp/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
desktop.ini
# Build
build/
dist/
*.egg-info/

56
LICENSE Normal file
View File

@@ -0,0 +1,56 @@
GNU GENERAL PUBLIC LICENSE
Version 2, June 1991
Copyright (C) 1989, 1991 Free Software Foundation, Inc.
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
================================================================================
This add-on incorporates the psutil library:
psutil is distributed under the BSD license reproduced below.
Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola'
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the psutil authors nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

136
README.md Normal file
View File

@@ -0,0 +1,136 @@
# CPU Priority Manager for NVDA
An NVDA add-on that allows you to manage CPU affinity and process priority for NVDA processes, improving performance on systems with heterogeneous CPU architectures (e.g., Intel CPUs with Performance and Efficiency cores).
## Features
- **CPU Affinity Control**: Pin NVDA processes to specific CPU cores
- **Priority Management**: Set process priority from Normal to Realtime
- **Easy Configuration**: Integrated settings panel in NVDA preferences
- **Automatic Application**: Settings are applied automatically when NVDA starts
- **Child Process Support**: Applies settings to NVDA child processes as well
## Why Use This Add-on?
Modern AMD Ryzen and Intel CPUs feature a hybrid architecture with:
- **Performance cores (P-cores)**: High-performance cores for demanding tasks
- **Efficiency cores (E-cores)**: Power-efficient cores for background tasks
NVDA performs significantly better when pinned to Performance cores. This add-on allows you to pin NVDA to your CPU's Performance cores for better responsiveness and set higher process priority so NVDA theoretically gets more CPU time
## Installation
1. <a href="https://iamtalon.me/cpuPriority-1.0.0.nvda-addon">Download the .nvda-addon file</a>
2. Open it with NVDA running (press Enter on the file)
3. Follow the installation prompts
4. Restart NVDA
## Configuration
1. Open NVDA Settings (NVDA+N → Preferences → Settings)
2. Navigate to "CPU Priority Manager" category
3. Configure the following options:
### Settings
#### Enable CPU Priority Management
Check this box to activate the add-on's functionality.
#### CPU Cores
Specify which CPU cores NVDA should use, as a comma-separated list (e.g., `4,5,6,7`).
#### Process Priority
Choose the priority level for NVDA processes:
- **Normal**: Standard priority (default Windows behavior)
- **Above Normal**: Higher than normal applications
- **High**: High priority (recommended for most users)
- **Realtime**: Highest priority ⚠️ **Use with caution!**
**Recommended Setting:** High Priority
**Warning:** Realtime priority can make your system unstable if NVDA consumes too much CPU time. Only use this if you understand the implications.
### Applying Changes
After changing settings:
1. Click "OK" or "Apply" to save
2. **Restart NVDA** for changes to take effect
The add-on applies settings during NVDA startup, so a restart is required.
## How It Works
The add-on uses the `psutil` library to:
1. Identify the current NVDA process and its children
2. Set CPU affinity to restrict execution to specified cores
3. Set process priority using Windows priority classes
Settings are stored in your NVDA configuration and support configuration profiles.
## Troubleshooting
### Settings don't seem to apply
- Make sure you've restarted NVDA after changing settings
- Check the NVDA log (NVDA+F1) for any error messages
- Ensure you're running NVDA with sufficient permissions
### Invalid CPU cores error
- Make sure you're using valid core numbers (0 to N-1, where N is your total logical core count)
- Use only numbers and commas, no spaces: `0,1,2,3`
- The add-on will show your system's total core count in the settings panel
### Performance not improving
- Verify your CPU actually has Performance/Efficiency cores
- Try different core combinations
- Check Task Manager to confirm NVDA is running on the specified cores
- Consider starting with "High" priority before trying "Realtime"
### System becomes unstable
- If you set Realtime priority and experience issues:
1. Restart NVDA in safe mode (use `nvda.exe --safe-mode`)
2. Disable the add-on or change priority to "High"
3. Restart NVDA normally
### Configuration Storage
Settings are stored in `nvdaConfig\nvda.ini` under the `[cpuPriority]` section:
```ini
[cpuPriority]
enabled = True
cpuCores = 4,5,6,7
priorityLevel = HIGH
```
### Logging
The add-on logs all actions to the NVDA log. View the log with NVDA+F1 to see:
- When settings are applied
- Which processes were configured
- Any errors encountered
## Building from Source
If you want to build the add-on yourself:
1. You need python
2. Clone or download this repository
3. Install SCons: `uv tool install scons`
4. Run: `scons` in the project directory
5. The `.nvda-addon` file will be created in the project root
## License
This add-on is licensed under the GNU General Public License v2.0.
## Credits
- **psutil library**: Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola
- **NVDA**: Copyright (c) 2006-2025 NV Access
## Contributing
If you think I messed something up, feel free to poke me on <a href="https://dragonscave.space/@talon">Mastodon</a>. I had to close registrations to this forge because of a frankly insane amount of spam BS, so if you want in, let me know and I'll let you in. Alternatively, just fork the thing on your preferred forge and make some edits, tell me about it, and I'll pull in your changes.
## Disclaimer
This add-on modifies system-level process settings, so it's entirely possible something might fall over in weird circumstances. Check the safe mode above if you can't get NVDA to run anymore.
Also, this is my first time working with NVDA addons. I have very little idea of what I'm doing here as of yet. This addon works for me, it might work for you. That's about all I can say. Always happy to hear of improvements I could make, either on Mastodon or just by email or whatever. You can also hit my Discord at TalonTheDragon.

View File

@@ -0,0 +1,204 @@
# -*- coding: UTF-8 -*-
# CPU Priority Manager for NVDA
# Allows setting CPU affinity and process priority for NVDA processes
import os
import sys
# Add psutil to the path
addonDir = os.path.dirname(__file__)
sys.path.insert(0, addonDir)
try:
import psutil
except ImportError:
psutil = None
import globalPluginHandler
import gui
from gui import guiHelper, nvdaControls
from gui.settingsDialogs import SettingsPanel
import config
import wx
import addonHandler
from logHandler import log
addonHandler.initTranslation()
confspec = {
"enabled": "boolean(default=False)",
"cpuCores": "string(default='')",
"priorityLevel": "string(default='HIGH')",
}
config.conf.spec["cpuPriority"] = confspec
class CPUPrioritySettingsPanel(SettingsPanel):
# Translators: Title of the CPU Priority settings panel
title = _("CPU Priority Manager")
def makeSettings(self, settingsSizer):
sHelper = guiHelper.BoxSizerHelper(self, sizer=settingsSizer)
# Translators: Label for checkbox to enable/disable CPU priority management
self.enabledCheckbox = sHelper.addItem(
wx.CheckBox(self, label=_("&Enable CPU priority management"))
)
self.enabledCheckbox.SetValue(config.conf["cpuPriority"]["enabled"])
self.enabledCheckbox.Bind(wx.EVT_CHECKBOX, self.onEnableToggle)
# Translators: Label for text field to specify CPU cores
cpuCoresLabel = _("CPU &cores (comma-separated, e.g., 4,5,6,7):")
self.cpuCoresEdit = sHelper.addLabeledControl(cpuCoresLabel, wx.TextCtrl)
self.cpuCoresEdit.SetValue(config.conf["cpuPriority"]["cpuCores"])
# Translators: Label for priority level dropdown
priorityLabel = _("Process &priority:")
self.priorityChoice = sHelper.addLabeledControl(
priorityLabel,
wx.Choice,
choices=[
# Translators: Normal priority level
_("Normal"),
# Translators: Above normal priority level
_("Above Normal"),
# Translators: High priority level
_("High"),
# Translators: Realtime priority level
_("Realtime (Use with caution)")
]
)
# Map config values to choice indices
priorityMap = {
"NORMAL": 0,
"ABOVE_NORMAL": 1,
"HIGH": 2,
"REALTIME": 3
}
currentPriority = config.conf["cpuPriority"]["priorityLevel"]
self.priorityChoice.SetSelection(priorityMap.get(currentPriority, 2))
if psutil:
try:
cpu_count = psutil.cpu_count(logical=True)
# Translators: Information about available CPU cores
infoText = _("Your system has {count} logical CPU cores (0-{max}).").format(
count=cpu_count,
max=cpu_count - 1
)
sHelper.addItem(wx.StaticText(self, label=infoText))
except Exception:
pass
# Translators: Warning about realtime priority
warningText = _(
"Warning: Setting realtime priority may make your system unstable if NVDA uses too much CPU. "
"Use high priority for most cases. Changes require NVDA restart to take effect."
)
warningLabel = wx.StaticText(self, label=warningText)
warningLabel.Wrap(self.GetSize()[0] - 20)
sHelper.addItem(warningLabel)
self.onEnableToggle(None)
def onEnableToggle(self, evt):
"""Enable or disable controls based on enabled checkbox"""
enabled = self.enabledCheckbox.GetValue()
self.cpuCoresEdit.Enable(enabled)
self.priorityChoice.Enable(enabled)
def onSave(self):
"""Save settings to configuration"""
config.conf["cpuPriority"]["enabled"] = self.enabledCheckbox.GetValue()
config.conf["cpuPriority"]["cpuCores"] = self.cpuCoresEdit.GetValue().strip()
# Map choice indices back to config values
priorityMap = {
0: "NORMAL",
1: "ABOVE_NORMAL",
2: "HIGH",
3: "REALTIME"
}
config.conf["cpuPriority"]["priorityLevel"] = priorityMap[self.priorityChoice.GetSelection()]
class GlobalPlugin(globalPluginHandler.GlobalPlugin):
def __init__(self, *args, **kwargs):
super(GlobalPlugin, self).__init__(*args, **kwargs)
if psutil is None:
log.error("CPU Priority Manager: psutil library not available")
return
gui.settingsDialogs.NVDASettingsDialog.categoryClasses.append(CPUPrioritySettingsPanel)
if config.conf["cpuPriority"]["enabled"]:
self.applyCPUSettings()
def terminate(self):
super(GlobalPlugin, self).terminate()
try:
gui.settingsDialogs.NVDASettingsDialog.categoryClasses.remove(CPUPrioritySettingsPanel)
except (ValueError, AttributeError):
pass
def applyCPUSettings(self):
if psutil is None:
log.error("CPU Priority Manager: psutil not available, cannot apply settings")
return
try:
current_process = psutil.Process()
self.applyToProcess(current_process)
try:
children = current_process.children(recursive=True)
for child in children:
try:
self.applyToProcess(child)
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
log.debug(f"CPU Priority Manager: Could not apply settings to child process {child.pid}: {e}")
except Exception as e:
log.error(f"CPU Priority Manager: Error getting child processes: {e}")
log.info("CPU Priority Manager: Settings applied successfully")
except Exception as e:
log.error(f"CPU Priority Manager: Error applying settings: {e}")
def applyToProcess(self, process):
cpuCoresStr = config.conf["cpuPriority"]["cpuCores"]
if cpuCoresStr:
try:
# Parse CPU cores from comma-separated string
cpuCores = [int(core.strip()) for core in cpuCoresStr.split(',') if core.strip()]
if cpuCores:
process.cpu_affinity(cpuCores)
log.debug(f"CPU Priority Manager: Set CPU affinity for process {process.pid} to cores {cpuCores}")
except ValueError as e:
log.error(f"CPU Priority Manager: Invalid CPU cores format: {e}")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
log.debug(f"CPU Priority Manager: Could not set CPU affinity for process {process.pid}: {e}")
except Exception as e:
log.error(f"CPU Priority Manager: Error setting CPU affinity: {e}")
priorityLevel = config.conf["cpuPriority"]["priorityLevel"]
try:
priorityClass = {
"NORMAL": psutil.NORMAL_PRIORITY_CLASS,
"ABOVE_NORMAL": psutil.ABOVE_NORMAL_PRIORITY_CLASS,
"HIGH": psutil.HIGH_PRIORITY_CLASS,
"REALTIME": psutil.REALTIME_PRIORITY_CLASS
}.get(priorityLevel, psutil.HIGH_PRIORITY_CLASS)
process.nice(priorityClass)
log.debug(f"CPU Priority Manager: Set priority for process {process.pid} to {priorityLevel}")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
log.debug(f"CPU Priority Manager: Could not set priority for process {process.pid}: {e}")
except Exception as e:
log.error(f"CPU Priority Manager: Error setting priority: {e}")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,948 @@
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Common objects shared by __init__.py and _ps*.py modules.
Note: this module is imported by setup.py, so it should not import
psutil or third-party modules.
"""
import collections
import enum
import functools
import os
import socket
import stat
import sys
import threading
import warnings
from collections import namedtuple
from socket import AF_INET
from socket import SOCK_DGRAM
from socket import SOCK_STREAM
try:
from socket import AF_INET6
except ImportError:
AF_INET6 = None
try:
from socket import AF_UNIX
except ImportError:
AF_UNIX = None
PSUTIL_DEBUG = bool(os.getenv('PSUTIL_DEBUG'))
_DEFAULT = object()
# fmt: off
__all__ = [
# OS constants
'FREEBSD', 'BSD', 'LINUX', 'NETBSD', 'OPENBSD', 'MACOS', 'OSX', 'POSIX',
'SUNOS', 'WINDOWS',
# connection constants
'CONN_CLOSE', 'CONN_CLOSE_WAIT', 'CONN_CLOSING', 'CONN_ESTABLISHED',
'CONN_FIN_WAIT1', 'CONN_FIN_WAIT2', 'CONN_LAST_ACK', 'CONN_LISTEN',
'CONN_NONE', 'CONN_SYN_RECV', 'CONN_SYN_SENT', 'CONN_TIME_WAIT',
# net constants
'NIC_DUPLEX_FULL', 'NIC_DUPLEX_HALF', 'NIC_DUPLEX_UNKNOWN', # noqa: F822
# process status constants
'STATUS_DEAD', 'STATUS_DISK_SLEEP', 'STATUS_IDLE', 'STATUS_LOCKED',
'STATUS_RUNNING', 'STATUS_SLEEPING', 'STATUS_STOPPED', 'STATUS_SUSPENDED',
'STATUS_TRACING_STOP', 'STATUS_WAITING', 'STATUS_WAKE_KILL',
'STATUS_WAKING', 'STATUS_ZOMBIE', 'STATUS_PARKED',
# other constants
'ENCODING', 'ENCODING_ERRS', 'AF_INET6',
# named tuples
'pconn', 'pcputimes', 'pctxsw', 'pgids', 'pio', 'pionice', 'popenfile',
'pthread', 'puids', 'sconn', 'scpustats', 'sdiskio', 'sdiskpart',
'sdiskusage', 'snetio', 'snicaddr', 'snicstats', 'sswap', 'suser',
# utility functions
'conn_tmap', 'deprecated_method', 'isfile_strict', 'memoize',
'parse_environ_block', 'path_exists_strict', 'usage_percent',
'supports_ipv6', 'sockfam_to_enum', 'socktype_to_enum', "wrap_numbers",
'open_text', 'open_binary', 'cat', 'bcat',
'bytes2human', 'conn_to_ntuple', 'debug',
# shell utils
'hilite', 'term_supports_colors', 'print_color',
]
# fmt: on
# ===================================================================
# --- OS constants
# ===================================================================
POSIX = os.name == "posix"
WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
MACOS = sys.platform.startswith("darwin")
OSX = MACOS # deprecated alias
FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
OPENBSD = sys.platform.startswith("openbsd")
NETBSD = sys.platform.startswith("netbsd")
BSD = FREEBSD or OPENBSD or NETBSD
SUNOS = sys.platform.startswith(("sunos", "solaris"))
AIX = sys.platform.startswith("aix")
# ===================================================================
# --- API constants
# ===================================================================
# Process.status()
STATUS_RUNNING = "running"
STATUS_SLEEPING = "sleeping"
STATUS_DISK_SLEEP = "disk-sleep"
STATUS_STOPPED = "stopped"
STATUS_TRACING_STOP = "tracing-stop"
STATUS_ZOMBIE = "zombie"
STATUS_DEAD = "dead"
STATUS_WAKE_KILL = "wake-kill"
STATUS_WAKING = "waking"
STATUS_IDLE = "idle" # Linux, macOS, FreeBSD
STATUS_LOCKED = "locked" # FreeBSD
STATUS_WAITING = "waiting" # FreeBSD
STATUS_SUSPENDED = "suspended" # NetBSD
STATUS_PARKED = "parked" # Linux
# Process.net_connections() and psutil.net_connections()
CONN_ESTABLISHED = "ESTABLISHED"
CONN_SYN_SENT = "SYN_SENT"
CONN_SYN_RECV = "SYN_RECV"
CONN_FIN_WAIT1 = "FIN_WAIT1"
CONN_FIN_WAIT2 = "FIN_WAIT2"
CONN_TIME_WAIT = "TIME_WAIT"
CONN_CLOSE = "CLOSE"
CONN_CLOSE_WAIT = "CLOSE_WAIT"
CONN_LAST_ACK = "LAST_ACK"
CONN_LISTEN = "LISTEN"
CONN_CLOSING = "CLOSING"
CONN_NONE = "NONE"
# net_if_stats()
class NicDuplex(enum.IntEnum):
NIC_DUPLEX_FULL = 2
NIC_DUPLEX_HALF = 1
NIC_DUPLEX_UNKNOWN = 0
globals().update(NicDuplex.__members__)
# sensors_battery()
class BatteryTime(enum.IntEnum):
POWER_TIME_UNKNOWN = -1
POWER_TIME_UNLIMITED = -2
globals().update(BatteryTime.__members__)
# --- others
ENCODING = sys.getfilesystemencoding()
ENCODING_ERRS = sys.getfilesystemencodeerrors()
# ===================================================================
# --- namedtuples
# ===================================================================
# --- for system functions
# fmt: off
# psutil.swap_memory()
sswap = namedtuple('sswap', ['total', 'used', 'free', 'percent', 'sin',
'sout'])
# psutil.disk_usage()
sdiskusage = namedtuple('sdiskusage', ['total', 'used', 'free', 'percent'])
# psutil.disk_io_counters()
sdiskio = namedtuple('sdiskio', ['read_count', 'write_count',
'read_bytes', 'write_bytes',
'read_time', 'write_time'])
# psutil.disk_partitions()
sdiskpart = namedtuple('sdiskpart', ['device', 'mountpoint', 'fstype', 'opts'])
# psutil.net_io_counters()
snetio = namedtuple('snetio', ['bytes_sent', 'bytes_recv',
'packets_sent', 'packets_recv',
'errin', 'errout',
'dropin', 'dropout'])
# psutil.users()
suser = namedtuple('suser', ['name', 'terminal', 'host', 'started', 'pid'])
# psutil.net_connections()
sconn = namedtuple('sconn', ['fd', 'family', 'type', 'laddr', 'raddr',
'status', 'pid'])
# psutil.net_if_addrs()
snicaddr = namedtuple('snicaddr',
['family', 'address', 'netmask', 'broadcast', 'ptp'])
# psutil.net_if_stats()
snicstats = namedtuple('snicstats',
['isup', 'duplex', 'speed', 'mtu', 'flags'])
# psutil.cpu_stats()
scpustats = namedtuple(
'scpustats', ['ctx_switches', 'interrupts', 'soft_interrupts', 'syscalls'])
# psutil.cpu_freq()
scpufreq = namedtuple('scpufreq', ['current', 'min', 'max'])
# psutil.sensors_temperatures()
shwtemp = namedtuple(
'shwtemp', ['label', 'current', 'high', 'critical'])
# psutil.sensors_battery()
sbattery = namedtuple('sbattery', ['percent', 'secsleft', 'power_plugged'])
# psutil.sensors_fans()
sfan = namedtuple('sfan', ['label', 'current'])
# fmt: on
# --- for Process methods
# psutil.Process.cpu_times()
pcputimes = namedtuple(
'pcputimes', ['user', 'system', 'children_user', 'children_system']
)
# psutil.Process.open_files()
popenfile = namedtuple('popenfile', ['path', 'fd'])
# psutil.Process.threads()
pthread = namedtuple('pthread', ['id', 'user_time', 'system_time'])
# psutil.Process.uids()
puids = namedtuple('puids', ['real', 'effective', 'saved'])
# psutil.Process.gids()
pgids = namedtuple('pgids', ['real', 'effective', 'saved'])
# psutil.Process.io_counters()
pio = namedtuple(
'pio', ['read_count', 'write_count', 'read_bytes', 'write_bytes']
)
# psutil.Process.ionice()
pionice = namedtuple('pionice', ['ioclass', 'value'])
# psutil.Process.ctx_switches()
pctxsw = namedtuple('pctxsw', ['voluntary', 'involuntary'])
# psutil.Process.net_connections()
pconn = namedtuple(
'pconn', ['fd', 'family', 'type', 'laddr', 'raddr', 'status']
)
# psutil.net_connections() and psutil.Process.net_connections()
addr = namedtuple('addr', ['ip', 'port'])
# ===================================================================
# --- Process.net_connections() 'kind' parameter mapping
# ===================================================================
conn_tmap = {
"all": ([AF_INET, AF_INET6, AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]),
"tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]),
"tcp4": ([AF_INET], [SOCK_STREAM]),
"udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]),
"udp4": ([AF_INET], [SOCK_DGRAM]),
"inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
"inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]),
"inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
}
if AF_INET6 is not None:
conn_tmap.update({
"tcp6": ([AF_INET6], [SOCK_STREAM]),
"udp6": ([AF_INET6], [SOCK_DGRAM]),
})
if AF_UNIX is not None and not SUNOS:
conn_tmap.update({"unix": ([AF_UNIX], [SOCK_STREAM, SOCK_DGRAM])})
# =====================================================================
# --- Exceptions
# =====================================================================
class Error(Exception):
"""Base exception class. All other psutil exceptions inherit
from this one.
"""
__module__ = 'psutil'
def _infodict(self, attrs):
info = collections.OrderedDict()
for name in attrs:
value = getattr(self, name, None)
if value or (name == "pid" and value == 0):
info[name] = value
return info
def __str__(self):
# invoked on `raise Error`
info = self._infodict(("pid", "ppid", "name"))
if info:
details = "({})".format(
", ".join([f"{k}={v!r}" for k, v in info.items()])
)
else:
details = None
return " ".join([x for x in (getattr(self, "msg", ""), details) if x])
def __repr__(self):
# invoked on `repr(Error)`
info = self._infodict(("pid", "ppid", "name", "seconds", "msg"))
details = ", ".join([f"{k}={v!r}" for k, v in info.items()])
return f"psutil.{self.__class__.__name__}({details})"
class NoSuchProcess(Error):
"""Exception raised when a process with a certain PID doesn't
or no longer exists.
"""
__module__ = 'psutil'
def __init__(self, pid, name=None, msg=None):
Error.__init__(self)
self.pid = pid
self.name = name
self.msg = msg or "process no longer exists"
def __reduce__(self):
return (self.__class__, (self.pid, self.name, self.msg))
class ZombieProcess(NoSuchProcess):
"""Exception raised when querying a zombie process. This is
raised on macOS, BSD and Solaris only, and not always: depending
on the query the OS may be able to succeed anyway.
On Linux all zombie processes are querable (hence this is never
raised). Windows doesn't have zombie processes.
"""
__module__ = 'psutil'
def __init__(self, pid, name=None, ppid=None, msg=None):
NoSuchProcess.__init__(self, pid, name, msg)
self.ppid = ppid
self.msg = msg or "PID still exists but it's a zombie"
def __reduce__(self):
return (self.__class__, (self.pid, self.name, self.ppid, self.msg))
class AccessDenied(Error):
"""Exception raised when permission to perform an action is denied."""
__module__ = 'psutil'
def __init__(self, pid=None, name=None, msg=None):
Error.__init__(self)
self.pid = pid
self.name = name
self.msg = msg or ""
def __reduce__(self):
return (self.__class__, (self.pid, self.name, self.msg))
class TimeoutExpired(Error):
"""Raised on Process.wait(timeout) if timeout expires and process
is still alive.
"""
__module__ = 'psutil'
def __init__(self, seconds, pid=None, name=None):
Error.__init__(self)
self.seconds = seconds
self.pid = pid
self.name = name
self.msg = f"timeout after {seconds} seconds"
def __reduce__(self):
return (self.__class__, (self.seconds, self.pid, self.name))
# ===================================================================
# --- utils
# ===================================================================
def usage_percent(used, total, round_=None):
"""Calculate percentage usage of 'used' against 'total'."""
try:
ret = (float(used) / total) * 100
except ZeroDivisionError:
return 0.0
else:
if round_ is not None:
ret = round(ret, round_)
return ret
def memoize(fun):
"""A simple memoize decorator for functions supporting (hashable)
positional arguments.
It also provides a cache_clear() function for clearing the cache:
>>> @memoize
... def foo()
... return 1
...
>>> foo()
1
>>> foo.cache_clear()
>>>
It supports:
- functions
- classes (acts as a @singleton)
- staticmethods
- classmethods
It does NOT support:
- methods
"""
@functools.wraps(fun)
def wrapper(*args, **kwargs):
key = (args, frozenset(sorted(kwargs.items())))
try:
return cache[key]
except KeyError:
try:
ret = cache[key] = fun(*args, **kwargs)
except Exception as err:
raise err from None
return ret
def cache_clear():
"""Clear cache."""
cache.clear()
cache = {}
wrapper.cache_clear = cache_clear
return wrapper
def memoize_when_activated(fun):
"""A memoize decorator which is disabled by default. It can be
activated and deactivated on request.
For efficiency reasons it can be used only against class methods
accepting no arguments.
>>> class Foo:
... @memoize
... def foo()
... print(1)
...
>>> f = Foo()
>>> # deactivated (default)
>>> foo()
1
>>> foo()
1
>>>
>>> # activated
>>> foo.cache_activate(self)
>>> foo()
1
>>> foo()
>>> foo()
>>>
"""
@functools.wraps(fun)
def wrapper(self):
try:
# case 1: we previously entered oneshot() ctx
ret = self._cache[fun]
except AttributeError:
# case 2: we never entered oneshot() ctx
try:
return fun(self)
except Exception as err:
raise err from None
except KeyError:
# case 3: we entered oneshot() ctx but there's no cache
# for this entry yet
try:
ret = fun(self)
except Exception as err:
raise err from None
try:
self._cache[fun] = ret
except AttributeError:
# multi-threading race condition, see:
# https://github.com/giampaolo/psutil/issues/1948
pass
return ret
def cache_activate(proc):
"""Activate cache. Expects a Process instance. Cache will be
stored as a "_cache" instance attribute.
"""
proc._cache = {}
def cache_deactivate(proc):
"""Deactivate and clear cache."""
try:
del proc._cache
except AttributeError:
pass
wrapper.cache_activate = cache_activate
wrapper.cache_deactivate = cache_deactivate
return wrapper
def isfile_strict(path):
"""Same as os.path.isfile() but does not swallow EACCES / EPERM
exceptions, see:
http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
"""
try:
st = os.stat(path)
except PermissionError:
raise
except OSError:
return False
else:
return stat.S_ISREG(st.st_mode)
def path_exists_strict(path):
"""Same as os.path.exists() but does not swallow EACCES / EPERM
exceptions. See:
http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
"""
try:
os.stat(path)
except PermissionError:
raise
except OSError:
return False
else:
return True
def supports_ipv6():
"""Return True if IPv6 is supported on this platform."""
if not socket.has_ipv6 or AF_INET6 is None:
return False
try:
with socket.socket(AF_INET6, socket.SOCK_STREAM) as sock:
sock.bind(("::1", 0))
return True
except OSError:
return False
def parse_environ_block(data):
"""Parse a C environ block of environment variables into a dictionary."""
# The block is usually raw data from the target process. It might contain
# trailing garbage and lines that do not look like assignments.
ret = {}
pos = 0
# localize global variable to speed up access.
WINDOWS_ = WINDOWS
while True:
next_pos = data.find("\0", pos)
# nul byte at the beginning or double nul byte means finish
if next_pos <= pos:
break
# there might not be an equals sign
equal_pos = data.find("=", pos, next_pos)
if equal_pos > pos:
key = data[pos:equal_pos]
value = data[equal_pos + 1 : next_pos]
# Windows expects environment variables to be uppercase only
if WINDOWS_:
key = key.upper()
ret[key] = value
pos = next_pos + 1
return ret
def sockfam_to_enum(num):
"""Convert a numeric socket family value to an IntEnum member.
If it's not a known member, return the numeric value itself.
"""
try:
return socket.AddressFamily(num)
except ValueError:
return num
def socktype_to_enum(num):
"""Convert a numeric socket type value to an IntEnum member.
If it's not a known member, return the numeric value itself.
"""
try:
return socket.SocketKind(num)
except ValueError:
return num
def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None):
"""Convert a raw connection tuple to a proper ntuple."""
if fam in {socket.AF_INET, AF_INET6}:
if laddr:
laddr = addr(*laddr)
if raddr:
raddr = addr(*raddr)
if type_ == socket.SOCK_STREAM and fam in {AF_INET, AF_INET6}:
status = status_map.get(status, CONN_NONE)
else:
status = CONN_NONE # ignore whatever C returned to us
fam = sockfam_to_enum(fam)
type_ = socktype_to_enum(type_)
if pid is None:
return pconn(fd, fam, type_, laddr, raddr, status)
else:
return sconn(fd, fam, type_, laddr, raddr, status, pid)
def broadcast_addr(addr):
"""Given the address ntuple returned by ``net_if_addrs()``
calculates the broadcast address.
"""
import ipaddress
if not addr.address or not addr.netmask:
return None
if addr.family == socket.AF_INET:
return str(
ipaddress.IPv4Network(
f"{addr.address}/{addr.netmask}", strict=False
).broadcast_address
)
if addr.family == socket.AF_INET6:
return str(
ipaddress.IPv6Network(
f"{addr.address}/{addr.netmask}", strict=False
).broadcast_address
)
def deprecated_method(replacement):
"""A decorator which can be used to mark a method as deprecated
'replcement' is the method name which will be called instead.
"""
def outer(fun):
msg = (
f"{fun.__name__}() is deprecated and will be removed; use"
f" {replacement}() instead"
)
if fun.__doc__ is None:
fun.__doc__ = msg
@functools.wraps(fun)
def inner(self, *args, **kwargs):
warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
return getattr(self, replacement)(*args, **kwargs)
return inner
return outer
class _WrapNumbers:
"""Watches numbers so that they don't overflow and wrap
(reset to zero).
"""
def __init__(self):
self.lock = threading.Lock()
self.cache = {}
self.reminders = {}
self.reminder_keys = {}
def _add_dict(self, input_dict, name):
assert name not in self.cache
assert name not in self.reminders
assert name not in self.reminder_keys
self.cache[name] = input_dict
self.reminders[name] = collections.defaultdict(int)
self.reminder_keys[name] = collections.defaultdict(set)
def _remove_dead_reminders(self, input_dict, name):
"""In case the number of keys changed between calls (e.g. a
disk disappears) this removes the entry from self.reminders.
"""
old_dict = self.cache[name]
gone_keys = set(old_dict.keys()) - set(input_dict.keys())
for gone_key in gone_keys:
for remkey in self.reminder_keys[name][gone_key]:
del self.reminders[name][remkey]
del self.reminder_keys[name][gone_key]
def run(self, input_dict, name):
"""Cache dict and sum numbers which overflow and wrap.
Return an updated copy of `input_dict`.
"""
if name not in self.cache:
# This was the first call.
self._add_dict(input_dict, name)
return input_dict
self._remove_dead_reminders(input_dict, name)
old_dict = self.cache[name]
new_dict = {}
for key in input_dict:
input_tuple = input_dict[key]
try:
old_tuple = old_dict[key]
except KeyError:
# The input dict has a new key (e.g. a new disk or NIC)
# which didn't exist in the previous call.
new_dict[key] = input_tuple
continue
bits = []
for i in range(len(input_tuple)):
input_value = input_tuple[i]
old_value = old_tuple[i]
remkey = (key, i)
if input_value < old_value:
# it wrapped!
self.reminders[name][remkey] += old_value
self.reminder_keys[name][key].add(remkey)
bits.append(input_value + self.reminders[name][remkey])
new_dict[key] = tuple(bits)
self.cache[name] = input_dict
return new_dict
def cache_clear(self, name=None):
"""Clear the internal cache, optionally only for function 'name'."""
with self.lock:
if name is None:
self.cache.clear()
self.reminders.clear()
self.reminder_keys.clear()
else:
self.cache.pop(name, None)
self.reminders.pop(name, None)
self.reminder_keys.pop(name, None)
def cache_info(self):
"""Return internal cache dicts as a tuple of 3 elements."""
with self.lock:
return (self.cache, self.reminders, self.reminder_keys)
def wrap_numbers(input_dict, name):
"""Given an `input_dict` and a function `name`, adjust the numbers
which "wrap" (restart from zero) across different calls by adding
"old value" to "new value" and return an updated dict.
"""
with _wn.lock:
return _wn.run(input_dict, name)
_wn = _WrapNumbers()
wrap_numbers.cache_clear = _wn.cache_clear
wrap_numbers.cache_info = _wn.cache_info
# The read buffer size for open() builtin. This (also) dictates how
# much data we read(2) when iterating over file lines as in:
# >>> with open(file) as f:
# ... for line in f:
# ... ...
# Default per-line buffer size for binary files is 1K. For text files
# is 8K. We use a bigger buffer (32K) in order to have more consistent
# results when reading /proc pseudo files on Linux, see:
# https://github.com/giampaolo/psutil/issues/2050
# https://github.com/giampaolo/psutil/issues/708
FILE_READ_BUFFER_SIZE = 32 * 1024
def open_binary(fname):
return open(fname, "rb", buffering=FILE_READ_BUFFER_SIZE)
def open_text(fname):
"""Open a file in text mode by using the proper FS encoding and
en/decoding error handlers.
"""
# See:
# https://github.com/giampaolo/psutil/issues/675
# https://github.com/giampaolo/psutil/pull/733
fobj = open( # noqa: SIM115
fname,
buffering=FILE_READ_BUFFER_SIZE,
encoding=ENCODING,
errors=ENCODING_ERRS,
)
try:
# Dictates per-line read(2) buffer size. Defaults is 8k. See:
# https://github.com/giampaolo/psutil/issues/2050#issuecomment-1013387546
fobj._CHUNK_SIZE = FILE_READ_BUFFER_SIZE
except AttributeError:
pass
except Exception:
fobj.close()
raise
return fobj
def cat(fname, fallback=_DEFAULT, _open=open_text):
"""Read entire file content and return it as a string. File is
opened in text mode. If specified, `fallback` is the value
returned in case of error, either if the file does not exist or
it can't be read().
"""
if fallback is _DEFAULT:
with _open(fname) as f:
return f.read()
else:
try:
with _open(fname) as f:
return f.read()
except OSError:
return fallback
def bcat(fname, fallback=_DEFAULT):
"""Same as above but opens file in binary mode."""
return cat(fname, fallback=fallback, _open=open_binary)
def bytes2human(n, format="%(value).1f%(symbol)s"):
"""Used by various scripts. See: https://code.activestate.com/recipes/578019-bytes-to-human-human-to-bytes-converter/?in=user-4178764.
>>> bytes2human(10000)
'9.8K'
>>> bytes2human(100001221)
'95.4M'
"""
symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
prefix = {}
for i, s in enumerate(symbols[1:]):
prefix[s] = 1 << (i + 1) * 10
for symbol in reversed(symbols[1:]):
if abs(n) >= prefix[symbol]:
value = float(n) / prefix[symbol]
return format % locals()
return format % dict(symbol=symbols[0], value=n)
def get_procfs_path():
"""Return updated psutil.PROCFS_PATH constant."""
return sys.modules['psutil'].PROCFS_PATH
def decode(s):
return s.decode(encoding=ENCODING, errors=ENCODING_ERRS)
# =====================================================================
# --- shell utils
# =====================================================================
@memoize
def term_supports_colors(file=sys.stdout): # pragma: no cover
if os.name == 'nt':
return True
try:
import curses
assert file.isatty()
curses.setupterm()
assert curses.tigetnum("colors") > 0
except Exception: # noqa: BLE001
return False
else:
return True
def hilite(s, color=None, bold=False): # pragma: no cover
"""Return an highlighted version of 'string'."""
if not term_supports_colors():
return s
attr = []
colors = dict(
blue='34',
brown='33',
darkgrey='30',
green='32',
grey='37',
lightblue='36',
red='91',
violet='35',
yellow='93',
)
colors[None] = '29'
try:
color = colors[color]
except KeyError:
msg = f"invalid color {color!r}; choose amongst {list(colors.keys())}"
raise ValueError(msg) from None
attr.append(color)
if bold:
attr.append('1')
return f"\x1b[{';'.join(attr)}m{s}\x1b[0m"
def print_color(
s, color=None, bold=False, file=sys.stdout
): # pragma: no cover
"""Print a colorized version of string."""
if not term_supports_colors():
print(s, file=file)
elif POSIX:
print(hilite(s, color, bold), file=file)
else:
import ctypes
DEFAULT_COLOR = 7
GetStdHandle = ctypes.windll.Kernel32.GetStdHandle
SetConsoleTextAttribute = (
ctypes.windll.Kernel32.SetConsoleTextAttribute
)
colors = dict(green=2, red=4, brown=6, yellow=6)
colors[None] = DEFAULT_COLOR
try:
color = colors[color]
except KeyError:
msg = (
f"invalid color {color!r}; choose between"
f" {list(colors.keys())!r}"
)
raise ValueError(msg) from None
if bold and color <= 7:
color += 8
handle_id = -12 if file is sys.stderr else -11
GetStdHandle.restype = ctypes.c_ulong
handle = GetStdHandle(handle_id)
SetConsoleTextAttribute(handle, color)
try:
print(s, file=file)
finally:
SetConsoleTextAttribute(handle, DEFAULT_COLOR)
def debug(msg):
"""If PSUTIL_DEBUG env var is set, print a debug message to stderr."""
if PSUTIL_DEBUG:
import inspect
fname, lineno, _, _lines, _index = inspect.getframeinfo(
inspect.currentframe().f_back
)
if isinstance(msg, Exception):
if isinstance(msg, OSError):
# ...because str(exc) may contain info about the file name
msg = f"ignoring {msg}"
else:
msg = f"ignoring {msg!r}"
print( # noqa: T201
f"psutil-debug [{fname}:{lineno}]> {msg}", file=sys.stderr
)

View File

@@ -0,0 +1,564 @@
# Copyright (c) 2009, Giampaolo Rodola'
# Copyright (c) 2017, Arnon Yaari
# All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""AIX platform implementation."""
import functools
import glob
import os
import re
import subprocess
import sys
from collections import namedtuple
from . import _common
from . import _psposix
from . import _psutil_aix as cext
from . import _psutil_posix as cext_posix
from ._common import NIC_DUPLEX_FULL
from ._common import NIC_DUPLEX_HALF
from ._common import NIC_DUPLEX_UNKNOWN
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import conn_to_ntuple
from ._common import get_procfs_path
from ._common import memoize_when_activated
from ._common import usage_percent
__extra__all__ = ["PROCFS_PATH"]
# =====================================================================
# --- globals
# =====================================================================
HAS_THREADS = hasattr(cext, "proc_threads")
HAS_NET_IO_COUNTERS = hasattr(cext, "net_io_counters")
HAS_PROC_IO_COUNTERS = hasattr(cext, "proc_io_counters")
PAGE_SIZE = cext_posix.getpagesize()
AF_LINK = cext_posix.AF_LINK
PROC_STATUSES = {
cext.SIDL: _common.STATUS_IDLE,
cext.SZOMB: _common.STATUS_ZOMBIE,
cext.SACTIVE: _common.STATUS_RUNNING,
cext.SSWAP: _common.STATUS_RUNNING, # TODO what status is this?
cext.SSTOP: _common.STATUS_STOPPED,
}
TCP_STATUSES = {
cext.TCPS_ESTABLISHED: _common.CONN_ESTABLISHED,
cext.TCPS_SYN_SENT: _common.CONN_SYN_SENT,
cext.TCPS_SYN_RCVD: _common.CONN_SYN_RECV,
cext.TCPS_FIN_WAIT_1: _common.CONN_FIN_WAIT1,
cext.TCPS_FIN_WAIT_2: _common.CONN_FIN_WAIT2,
cext.TCPS_TIME_WAIT: _common.CONN_TIME_WAIT,
cext.TCPS_CLOSED: _common.CONN_CLOSE,
cext.TCPS_CLOSE_WAIT: _common.CONN_CLOSE_WAIT,
cext.TCPS_LAST_ACK: _common.CONN_LAST_ACK,
cext.TCPS_LISTEN: _common.CONN_LISTEN,
cext.TCPS_CLOSING: _common.CONN_CLOSING,
cext.PSUTIL_CONN_NONE: _common.CONN_NONE,
}
proc_info_map = dict(
ppid=0,
rss=1,
vms=2,
create_time=3,
nice=4,
num_threads=5,
status=6,
ttynr=7,
)
# =====================================================================
# --- named tuples
# =====================================================================
# psutil.Process.memory_info()
pmem = namedtuple('pmem', ['rss', 'vms'])
# psutil.Process.memory_full_info()
pfullmem = pmem
# psutil.Process.cpu_times()
scputimes = namedtuple('scputimes', ['user', 'system', 'idle', 'iowait'])
# psutil.virtual_memory()
svmem = namedtuple('svmem', ['total', 'available', 'percent', 'used', 'free'])
# =====================================================================
# --- memory
# =====================================================================
def virtual_memory():
total, avail, free, _pinned, inuse = cext.virtual_mem()
percent = usage_percent((total - avail), total, round_=1)
return svmem(total, avail, percent, inuse, free)
def swap_memory():
"""Swap system memory as a (total, used, free, sin, sout) tuple."""
total, free, sin, sout = cext.swap_mem()
used = total - free
percent = usage_percent(used, total, round_=1)
return _common.sswap(total, used, free, percent, sin, sout)
# =====================================================================
# --- CPU
# =====================================================================
def cpu_times():
"""Return system-wide CPU times as a named tuple."""
ret = cext.per_cpu_times()
return scputimes(*[sum(x) for x in zip(*ret)])
def per_cpu_times():
"""Return system per-CPU times as a list of named tuples."""
ret = cext.per_cpu_times()
return [scputimes(*x) for x in ret]
def cpu_count_logical():
"""Return the number of logical CPUs in the system."""
try:
return os.sysconf("SC_NPROCESSORS_ONLN")
except ValueError:
# mimic os.cpu_count() behavior
return None
def cpu_count_cores():
cmd = ["lsdev", "-Cc", "processor"]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
stdout, stderr = (x.decode(sys.stdout.encoding) for x in (stdout, stderr))
if p.returncode != 0:
msg = f"{cmd!r} command error\n{stderr}"
raise RuntimeError(msg)
processors = stdout.strip().splitlines()
return len(processors) or None
def cpu_stats():
"""Return various CPU stats as a named tuple."""
ctx_switches, interrupts, soft_interrupts, syscalls = cext.cpu_stats()
return _common.scpustats(
ctx_switches, interrupts, soft_interrupts, syscalls
)
# =====================================================================
# --- disks
# =====================================================================
disk_io_counters = cext.disk_io_counters
disk_usage = _psposix.disk_usage
def disk_partitions(all=False):
"""Return system disk partitions."""
# TODO - the filtering logic should be better checked so that
# it tries to reflect 'df' as much as possible
retlist = []
partitions = cext.disk_partitions()
for partition in partitions:
device, mountpoint, fstype, opts = partition
if device == 'none':
device = ''
if not all:
# Differently from, say, Linux, we don't have a list of
# common fs types so the best we can do, AFAIK, is to
# filter by filesystem having a total size > 0.
if not disk_usage(mountpoint).total:
continue
ntuple = _common.sdiskpart(device, mountpoint, fstype, opts)
retlist.append(ntuple)
return retlist
# =====================================================================
# --- network
# =====================================================================
net_if_addrs = cext_posix.net_if_addrs
if HAS_NET_IO_COUNTERS:
net_io_counters = cext.net_io_counters
def net_connections(kind, _pid=-1):
"""Return socket connections. If pid == -1 return system-wide
connections (as opposed to connections opened by one process only).
"""
families, types = _common.conn_tmap[kind]
rawlist = cext.net_connections(_pid)
ret = []
for item in rawlist:
fd, fam, type_, laddr, raddr, status, pid = item
if fam not in families:
continue
if type_ not in types:
continue
nt = conn_to_ntuple(
fd,
fam,
type_,
laddr,
raddr,
status,
TCP_STATUSES,
pid=pid if _pid == -1 else None,
)
ret.append(nt)
return ret
def net_if_stats():
"""Get NIC stats (isup, duplex, speed, mtu)."""
duplex_map = {"Full": NIC_DUPLEX_FULL, "Half": NIC_DUPLEX_HALF}
names = {x[0] for x in net_if_addrs()}
ret = {}
for name in names:
mtu = cext_posix.net_if_mtu(name)
flags = cext_posix.net_if_flags(name)
# try to get speed and duplex
# TODO: rewrite this in C (entstat forks, so use truss -f to follow.
# looks like it is using an undocumented ioctl?)
duplex = ""
speed = 0
p = subprocess.Popen(
["/usr/bin/entstat", "-d", name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
stdout, stderr = (
x.decode(sys.stdout.encoding) for x in (stdout, stderr)
)
if p.returncode == 0:
re_result = re.search(
r"Running: (\d+) Mbps.*?(\w+) Duplex", stdout
)
if re_result is not None:
speed = int(re_result.group(1))
duplex = re_result.group(2)
output_flags = ','.join(flags)
isup = 'running' in flags
duplex = duplex_map.get(duplex, NIC_DUPLEX_UNKNOWN)
ret[name] = _common.snicstats(isup, duplex, speed, mtu, output_flags)
return ret
# =====================================================================
# --- other system functions
# =====================================================================
def boot_time():
"""The system boot time expressed in seconds since the epoch."""
return cext.boot_time()
def users():
"""Return currently connected users as a list of namedtuples."""
retlist = []
rawlist = cext_posix.users()
localhost = (':0.0', ':0')
for item in rawlist:
user, tty, hostname, tstamp, user_process, pid = item
# note: the underlying C function includes entries about
# system boot, run level and others. We might want
# to use them in the future.
if not user_process:
continue
if hostname in localhost:
hostname = 'localhost'
nt = _common.suser(user, tty, hostname, tstamp, pid)
retlist.append(nt)
return retlist
# =====================================================================
# --- processes
# =====================================================================
def pids():
"""Returns a list of PIDs currently running on the system."""
return [int(x) for x in os.listdir(get_procfs_path()) if x.isdigit()]
def pid_exists(pid):
"""Check for the existence of a unix pid."""
return os.path.exists(os.path.join(get_procfs_path(), str(pid), "psinfo"))
def wrap_exceptions(fun):
"""Call callable into a try/except clause and translate ENOENT,
EACCES and EPERM in NoSuchProcess or AccessDenied exceptions.
"""
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
pid, ppid, name = self.pid, self._ppid, self._name
try:
return fun(self, *args, **kwargs)
except (FileNotFoundError, ProcessLookupError) as err:
# ENOENT (no such file or directory) gets raised on open().
# ESRCH (no such process) can get raised on read() if
# process is gone in meantime.
if not pid_exists(pid):
raise NoSuchProcess(pid, name) from err
raise ZombieProcess(pid, name, ppid) from err
except PermissionError as err:
raise AccessDenied(pid, name) from err
return wrapper
class Process:
"""Wrapper class around underlying C implementation."""
__slots__ = ["_cache", "_name", "_ppid", "_procfs_path", "pid"]
def __init__(self, pid):
self.pid = pid
self._name = None
self._ppid = None
self._procfs_path = get_procfs_path()
def oneshot_enter(self):
self._proc_basic_info.cache_activate(self)
self._proc_cred.cache_activate(self)
def oneshot_exit(self):
self._proc_basic_info.cache_deactivate(self)
self._proc_cred.cache_deactivate(self)
@wrap_exceptions
@memoize_when_activated
def _proc_basic_info(self):
return cext.proc_basic_info(self.pid, self._procfs_path)
@wrap_exceptions
@memoize_when_activated
def _proc_cred(self):
return cext.proc_cred(self.pid, self._procfs_path)
@wrap_exceptions
def name(self):
if self.pid == 0:
return "swapper"
# note: max 16 characters
return cext.proc_name(self.pid, self._procfs_path).rstrip("\x00")
@wrap_exceptions
def exe(self):
# there is no way to get executable path in AIX other than to guess,
# and guessing is more complex than what's in the wrapping class
cmdline = self.cmdline()
if not cmdline:
return ''
exe = cmdline[0]
if os.path.sep in exe:
# relative or absolute path
if not os.path.isabs(exe):
# if cwd has changed, we're out of luck - this may be wrong!
exe = os.path.abspath(os.path.join(self.cwd(), exe))
if (
os.path.isabs(exe)
and os.path.isfile(exe)
and os.access(exe, os.X_OK)
):
return exe
# not found, move to search in PATH using basename only
exe = os.path.basename(exe)
# search for exe name PATH
for path in os.environ["PATH"].split(":"):
possible_exe = os.path.abspath(os.path.join(path, exe))
if os.path.isfile(possible_exe) and os.access(
possible_exe, os.X_OK
):
return possible_exe
return ''
@wrap_exceptions
def cmdline(self):
return cext.proc_args(self.pid)
@wrap_exceptions
def environ(self):
return cext.proc_environ(self.pid)
@wrap_exceptions
def create_time(self):
return self._proc_basic_info()[proc_info_map['create_time']]
@wrap_exceptions
def num_threads(self):
return self._proc_basic_info()[proc_info_map['num_threads']]
if HAS_THREADS:
@wrap_exceptions
def threads(self):
rawlist = cext.proc_threads(self.pid)
retlist = []
for thread_id, utime, stime in rawlist:
ntuple = _common.pthread(thread_id, utime, stime)
retlist.append(ntuple)
# The underlying C implementation retrieves all OS threads
# and filters them by PID. At this point we can't tell whether
# an empty list means there were no connections for process or
# process is no longer active so we force NSP in case the PID
# is no longer there.
if not retlist:
# will raise NSP if process is gone
os.stat(f"{self._procfs_path}/{self.pid}")
return retlist
@wrap_exceptions
def net_connections(self, kind='inet'):
ret = net_connections(kind, _pid=self.pid)
# The underlying C implementation retrieves all OS connections
# and filters them by PID. At this point we can't tell whether
# an empty list means there were no connections for process or
# process is no longer active so we force NSP in case the PID
# is no longer there.
if not ret:
# will raise NSP if process is gone
os.stat(f"{self._procfs_path}/{self.pid}")
return ret
@wrap_exceptions
def nice_get(self):
return cext_posix.getpriority(self.pid)
@wrap_exceptions
def nice_set(self, value):
return cext_posix.setpriority(self.pid, value)
@wrap_exceptions
def ppid(self):
self._ppid = self._proc_basic_info()[proc_info_map['ppid']]
return self._ppid
@wrap_exceptions
def uids(self):
real, effective, saved, _, _, _ = self._proc_cred()
return _common.puids(real, effective, saved)
@wrap_exceptions
def gids(self):
_, _, _, real, effective, saved = self._proc_cred()
return _common.puids(real, effective, saved)
@wrap_exceptions
def cpu_times(self):
t = cext.proc_cpu_times(self.pid, self._procfs_path)
return _common.pcputimes(*t)
@wrap_exceptions
def terminal(self):
ttydev = self._proc_basic_info()[proc_info_map['ttynr']]
# convert from 64-bit dev_t to 32-bit dev_t and then map the device
ttydev = ((ttydev & 0x0000FFFF00000000) >> 16) | (ttydev & 0xFFFF)
# try to match rdev of /dev/pts/* files ttydev
for dev in glob.glob("/dev/**/*"):
if os.stat(dev).st_rdev == ttydev:
return dev
return None
@wrap_exceptions
def cwd(self):
procfs_path = self._procfs_path
try:
result = os.readlink(f"{procfs_path}/{self.pid}/cwd")
return result.rstrip('/')
except FileNotFoundError:
os.stat(f"{procfs_path}/{self.pid}") # raise NSP or AD
return ""
@wrap_exceptions
def memory_info(self):
ret = self._proc_basic_info()
rss = ret[proc_info_map['rss']] * 1024
vms = ret[proc_info_map['vms']] * 1024
return pmem(rss, vms)
memory_full_info = memory_info
@wrap_exceptions
def status(self):
code = self._proc_basic_info()[proc_info_map['status']]
# XXX is '?' legit? (we're not supposed to return it anyway)
return PROC_STATUSES.get(code, '?')
def open_files(self):
# TODO rewrite without using procfiles (stat /proc/pid/fd/* and then
# find matching name of the inode)
p = subprocess.Popen(
["/usr/bin/procfiles", "-n", str(self.pid)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p.communicate()
stdout, stderr = (
x.decode(sys.stdout.encoding) for x in (stdout, stderr)
)
if "no such process" in stderr.lower():
raise NoSuchProcess(self.pid, self._name)
procfiles = re.findall(r"(\d+): S_IFREG.*name:(.*)\n", stdout)
retlist = []
for fd, path in procfiles:
path = path.strip()
if path.startswith("//"):
path = path[1:]
if path.lower() == "cannot be retrieved":
continue
retlist.append(_common.popenfile(path, int(fd)))
return retlist
@wrap_exceptions
def num_fds(self):
if self.pid == 0: # no /proc/0/fd
return 0
return len(os.listdir(f"{self._procfs_path}/{self.pid}/fd"))
@wrap_exceptions
def num_ctx_switches(self):
return _common.pctxsw(*cext.proc_num_ctx_switches(self.pid))
@wrap_exceptions
def wait(self, timeout=None):
return _psposix.wait_pid(self.pid, timeout, self._name)
if HAS_PROC_IO_COUNTERS:
@wrap_exceptions
def io_counters(self):
try:
rc, wc, rb, wb = cext.proc_io_counters(self.pid)
except OSError as err:
# if process is terminated, proc_io_counters returns OSError
# instead of NSP
if not pid_exists(self.pid):
raise NoSuchProcess(self.pid, self._name) from err
raise
return _common.pio(rc, wc, rb, wb)

View File

@@ -0,0 +1,946 @@
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""FreeBSD, OpenBSD and NetBSD platforms implementation."""
import contextlib
import errno
import functools
import os
from collections import defaultdict
from collections import namedtuple
from xml.etree import ElementTree # noqa: ICN001
from . import _common
from . import _psposix
from . import _psutil_bsd as cext
from . import _psutil_posix as cext_posix
from ._common import FREEBSD
from ._common import NETBSD
from ._common import OPENBSD
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import conn_tmap
from ._common import conn_to_ntuple
from ._common import debug
from ._common import memoize
from ._common import memoize_when_activated
from ._common import usage_percent
__extra__all__ = []
# =====================================================================
# --- globals
# =====================================================================
if FREEBSD:
PROC_STATUSES = {
cext.SIDL: _common.STATUS_IDLE,
cext.SRUN: _common.STATUS_RUNNING,
cext.SSLEEP: _common.STATUS_SLEEPING,
cext.SSTOP: _common.STATUS_STOPPED,
cext.SZOMB: _common.STATUS_ZOMBIE,
cext.SWAIT: _common.STATUS_WAITING,
cext.SLOCK: _common.STATUS_LOCKED,
}
elif OPENBSD:
PROC_STATUSES = {
cext.SIDL: _common.STATUS_IDLE,
cext.SSLEEP: _common.STATUS_SLEEPING,
cext.SSTOP: _common.STATUS_STOPPED,
# According to /usr/include/sys/proc.h SZOMB is unused.
# test_zombie_process() shows that SDEAD is the right
# equivalent. Also it appears there's no equivalent of
# psutil.STATUS_DEAD. SDEAD really means STATUS_ZOMBIE.
# cext.SZOMB: _common.STATUS_ZOMBIE,
cext.SDEAD: _common.STATUS_ZOMBIE,
cext.SZOMB: _common.STATUS_ZOMBIE,
# From http://www.eecs.harvard.edu/~margo/cs161/videos/proc.h.txt
# OpenBSD has SRUN and SONPROC: SRUN indicates that a process
# is runnable but *not* yet running, i.e. is on a run queue.
# SONPROC indicates that the process is actually executing on
# a CPU, i.e. it is no longer on a run queue.
# As such we'll map SRUN to STATUS_WAKING and SONPROC to
# STATUS_RUNNING
cext.SRUN: _common.STATUS_WAKING,
cext.SONPROC: _common.STATUS_RUNNING,
}
elif NETBSD:
PROC_STATUSES = {
cext.SIDL: _common.STATUS_IDLE,
cext.SSLEEP: _common.STATUS_SLEEPING,
cext.SSTOP: _common.STATUS_STOPPED,
cext.SZOMB: _common.STATUS_ZOMBIE,
cext.SRUN: _common.STATUS_WAKING,
cext.SONPROC: _common.STATUS_RUNNING,
}
TCP_STATUSES = {
cext.TCPS_ESTABLISHED: _common.CONN_ESTABLISHED,
cext.TCPS_SYN_SENT: _common.CONN_SYN_SENT,
cext.TCPS_SYN_RECEIVED: _common.CONN_SYN_RECV,
cext.TCPS_FIN_WAIT_1: _common.CONN_FIN_WAIT1,
cext.TCPS_FIN_WAIT_2: _common.CONN_FIN_WAIT2,
cext.TCPS_TIME_WAIT: _common.CONN_TIME_WAIT,
cext.TCPS_CLOSED: _common.CONN_CLOSE,
cext.TCPS_CLOSE_WAIT: _common.CONN_CLOSE_WAIT,
cext.TCPS_LAST_ACK: _common.CONN_LAST_ACK,
cext.TCPS_LISTEN: _common.CONN_LISTEN,
cext.TCPS_CLOSING: _common.CONN_CLOSING,
cext.PSUTIL_CONN_NONE: _common.CONN_NONE,
}
PAGESIZE = cext_posix.getpagesize()
AF_LINK = cext_posix.AF_LINK
HAS_PROC_NUM_THREADS = hasattr(cext, "proc_num_threads")
kinfo_proc_map = dict(
ppid=0,
status=1,
real_uid=2,
effective_uid=3,
saved_uid=4,
real_gid=5,
effective_gid=6,
saved_gid=7,
ttynr=8,
create_time=9,
ctx_switches_vol=10,
ctx_switches_unvol=11,
read_io_count=12,
write_io_count=13,
user_time=14,
sys_time=15,
ch_user_time=16,
ch_sys_time=17,
rss=18,
vms=19,
memtext=20,
memdata=21,
memstack=22,
cpunum=23,
name=24,
)
# =====================================================================
# --- named tuples
# =====================================================================
# fmt: off
# psutil.virtual_memory()
svmem = namedtuple(
'svmem', ['total', 'available', 'percent', 'used', 'free',
'active', 'inactive', 'buffers', 'cached', 'shared', 'wired'])
# psutil.cpu_times()
scputimes = namedtuple(
'scputimes', ['user', 'nice', 'system', 'idle', 'irq'])
# psutil.Process.memory_info()
pmem = namedtuple('pmem', ['rss', 'vms', 'text', 'data', 'stack'])
# psutil.Process.memory_full_info()
pfullmem = pmem
# psutil.Process.cpu_times()
pcputimes = namedtuple('pcputimes',
['user', 'system', 'children_user', 'children_system'])
# psutil.Process.memory_maps(grouped=True)
pmmap_grouped = namedtuple(
'pmmap_grouped', 'path rss, private, ref_count, shadow_count')
# psutil.Process.memory_maps(grouped=False)
pmmap_ext = namedtuple(
'pmmap_ext', 'addr, perms path rss, private, ref_count, shadow_count')
# psutil.disk_io_counters()
if FREEBSD:
sdiskio = namedtuple('sdiskio', ['read_count', 'write_count',
'read_bytes', 'write_bytes',
'read_time', 'write_time',
'busy_time'])
else:
sdiskio = namedtuple('sdiskio', ['read_count', 'write_count',
'read_bytes', 'write_bytes'])
# fmt: on
# =====================================================================
# --- memory
# =====================================================================
def virtual_memory():
mem = cext.virtual_mem()
if NETBSD:
total, free, active, inactive, wired, cached = mem
# On NetBSD buffers and shared mem is determined via /proc.
# The C ext set them to 0.
with open('/proc/meminfo', 'rb') as f:
for line in f:
if line.startswith(b'Buffers:'):
buffers = int(line.split()[1]) * 1024
elif line.startswith(b'MemShared:'):
shared = int(line.split()[1]) * 1024
# Before avail was calculated as (inactive + cached + free),
# same as zabbix, but it turned out it could exceed total (see
# #2233), so zabbix seems to be wrong. Htop calculates it
# differently, and the used value seem more realistic, so let's
# match htop.
# https://github.com/htop-dev/htop/blob/e7f447b/netbsd/NetBSDProcessList.c#L162
# https://github.com/zabbix/zabbix/blob/af5e0f8/src/libs/zbxsysinfo/netbsd/memory.c#L135
used = active + wired
avail = total - used
else:
total, free, active, inactive, wired, cached, buffers, shared = mem
# matches freebsd-memory CLI:
# * https://people.freebsd.org/~rse/dist/freebsd-memory
# * https://www.cyberciti.biz/files/scripts/freebsd-memory.pl.txt
# matches zabbix:
# * https://github.com/zabbix/zabbix/blob/af5e0f8/src/libs/zbxsysinfo/freebsd/memory.c#L143
avail = inactive + cached + free
used = active + wired + cached
percent = usage_percent((total - avail), total, round_=1)
return svmem(
total,
avail,
percent,
used,
free,
active,
inactive,
buffers,
cached,
shared,
wired,
)
def swap_memory():
"""System swap memory as (total, used, free, sin, sout) namedtuple."""
total, used, free, sin, sout = cext.swap_mem()
percent = usage_percent(used, total, round_=1)
return _common.sswap(total, used, free, percent, sin, sout)
# =====================================================================
# --- CPU
# =====================================================================
def cpu_times():
"""Return system per-CPU times as a namedtuple."""
user, nice, system, idle, irq = cext.cpu_times()
return scputimes(user, nice, system, idle, irq)
def per_cpu_times():
"""Return system CPU times as a namedtuple."""
ret = []
for cpu_t in cext.per_cpu_times():
user, nice, system, idle, irq = cpu_t
item = scputimes(user, nice, system, idle, irq)
ret.append(item)
return ret
def cpu_count_logical():
"""Return the number of logical CPUs in the system."""
return cext.cpu_count_logical()
if OPENBSD or NETBSD:
def cpu_count_cores():
# OpenBSD and NetBSD do not implement this.
return 1 if cpu_count_logical() == 1 else None
else:
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
# From the C module we'll get an XML string similar to this:
# http://manpages.ubuntu.com/manpages/precise/man4/smp.4freebsd.html
# We may get None in case "sysctl kern.sched.topology_spec"
# is not supported on this BSD version, in which case we'll mimic
# os.cpu_count() and return None.
ret = None
s = cext.cpu_topology()
if s is not None:
# get rid of padding chars appended at the end of the string
index = s.rfind("</groups>")
if index != -1:
s = s[: index + 9]
root = ElementTree.fromstring(s)
try:
ret = len(root.findall('group/children/group/cpu')) or None
finally:
# needed otherwise it will memleak
root.clear()
if not ret:
# If logical CPUs == 1 it's obvious we' have only 1 core.
if cpu_count_logical() == 1:
return 1
return ret
def cpu_stats():
"""Return various CPU stats as a named tuple."""
if FREEBSD:
# Note: the C ext is returning some metrics we are not exposing:
# traps.
ctxsw, intrs, soft_intrs, syscalls, _traps = cext.cpu_stats()
elif NETBSD:
# XXX
# Note about intrs: the C extension returns 0. intrs
# can be determined via /proc/stat; it has the same value as
# soft_intrs thought so the kernel is faking it (?).
#
# Note about syscalls: the C extension always sets it to 0 (?).
#
# Note: the C ext is returning some metrics we are not exposing:
# traps, faults and forks.
ctxsw, intrs, soft_intrs, syscalls, _traps, _faults, _forks = (
cext.cpu_stats()
)
with open('/proc/stat', 'rb') as f:
for line in f:
if line.startswith(b'intr'):
intrs = int(line.split()[1])
elif OPENBSD:
# Note: the C ext is returning some metrics we are not exposing:
# traps, faults and forks.
ctxsw, intrs, soft_intrs, syscalls, _traps, _faults, _forks = (
cext.cpu_stats()
)
return _common.scpustats(ctxsw, intrs, soft_intrs, syscalls)
if FREEBSD:
def cpu_freq():
"""Return frequency metrics for CPUs. As of Dec 2018 only
CPU 0 appears to be supported by FreeBSD and all other cores
match the frequency of CPU 0.
"""
ret = []
num_cpus = cpu_count_logical()
for cpu in range(num_cpus):
try:
current, available_freq = cext.cpu_freq(cpu)
except NotImplementedError:
continue
if available_freq:
try:
min_freq = int(available_freq.split(" ")[-1].split("/")[0])
except (IndexError, ValueError):
min_freq = None
try:
max_freq = int(available_freq.split(" ")[0].split("/")[0])
except (IndexError, ValueError):
max_freq = None
ret.append(_common.scpufreq(current, min_freq, max_freq))
return ret
elif OPENBSD:
def cpu_freq():
curr = float(cext.cpu_freq())
return [_common.scpufreq(curr, 0.0, 0.0)]
# =====================================================================
# --- disks
# =====================================================================
def disk_partitions(all=False):
"""Return mounted disk partitions as a list of namedtuples.
'all' argument is ignored, see:
https://github.com/giampaolo/psutil/issues/906.
"""
retlist = []
partitions = cext.disk_partitions()
for partition in partitions:
device, mountpoint, fstype, opts = partition
ntuple = _common.sdiskpart(device, mountpoint, fstype, opts)
retlist.append(ntuple)
return retlist
disk_usage = _psposix.disk_usage
disk_io_counters = cext.disk_io_counters
# =====================================================================
# --- network
# =====================================================================
net_io_counters = cext.net_io_counters
net_if_addrs = cext_posix.net_if_addrs
def net_if_stats():
"""Get NIC stats (isup, duplex, speed, mtu)."""
names = net_io_counters().keys()
ret = {}
for name in names:
try:
mtu = cext_posix.net_if_mtu(name)
flags = cext_posix.net_if_flags(name)
duplex, speed = cext_posix.net_if_duplex_speed(name)
except OSError as err:
# https://github.com/giampaolo/psutil/issues/1279
if err.errno != errno.ENODEV:
raise
else:
if hasattr(_common, 'NicDuplex'):
duplex = _common.NicDuplex(duplex)
output_flags = ','.join(flags)
isup = 'running' in flags
ret[name] = _common.snicstats(
isup, duplex, speed, mtu, output_flags
)
return ret
def net_connections(kind):
"""System-wide network connections."""
families, types = conn_tmap[kind]
ret = set()
if OPENBSD:
rawlist = cext.net_connections(-1, families, types)
elif NETBSD:
rawlist = cext.net_connections(-1, kind)
else: # FreeBSD
rawlist = cext.net_connections(families, types)
for item in rawlist:
fd, fam, type, laddr, raddr, status, pid = item
nt = conn_to_ntuple(
fd, fam, type, laddr, raddr, status, TCP_STATUSES, pid
)
ret.add(nt)
return list(ret)
# =====================================================================
# --- sensors
# =====================================================================
if FREEBSD:
def sensors_battery():
"""Return battery info."""
try:
percent, minsleft, power_plugged = cext.sensors_battery()
except NotImplementedError:
# See: https://github.com/giampaolo/psutil/issues/1074
return None
power_plugged = power_plugged == 1
if power_plugged:
secsleft = _common.POWER_TIME_UNLIMITED
elif minsleft == -1:
secsleft = _common.POWER_TIME_UNKNOWN
else:
secsleft = minsleft * 60
return _common.sbattery(percent, secsleft, power_plugged)
def sensors_temperatures():
"""Return CPU cores temperatures if available, else an empty dict."""
ret = defaultdict(list)
num_cpus = cpu_count_logical()
for cpu in range(num_cpus):
try:
current, high = cext.sensors_cpu_temperature(cpu)
if high <= 0:
high = None
name = f"Core {cpu}"
ret["coretemp"].append(
_common.shwtemp(name, current, high, high)
)
except NotImplementedError:
pass
return ret
# =====================================================================
# --- other system functions
# =====================================================================
def boot_time():
"""The system boot time expressed in seconds since the epoch."""
return cext.boot_time()
if NETBSD:
try:
INIT_BOOT_TIME = boot_time()
except Exception as err: # noqa: BLE001
# Don't want to crash at import time.
debug(f"ignoring exception on import: {err!r}")
INIT_BOOT_TIME = 0
def adjust_proc_create_time(ctime):
"""Account for system clock updates."""
if INIT_BOOT_TIME == 0:
return ctime
diff = INIT_BOOT_TIME - boot_time()
if diff == 0 or abs(diff) < 1:
return ctime
debug("system clock was updated; adjusting process create_time()")
if diff < 0:
return ctime - diff
return ctime + diff
def users():
"""Return currently connected users as a list of namedtuples."""
retlist = []
rawlist = cext.users() if OPENBSD else cext_posix.users()
for item in rawlist:
user, tty, hostname, tstamp, pid = item
if tty == '~':
continue # reboot or shutdown
nt = _common.suser(user, tty or None, hostname, tstamp, pid)
retlist.append(nt)
return retlist
# =====================================================================
# --- processes
# =====================================================================
@memoize
def _pid_0_exists():
try:
Process(0).name()
except NoSuchProcess:
return False
except AccessDenied:
return True
else:
return True
def pids():
"""Returns a list of PIDs currently running on the system."""
ret = cext.pids()
if OPENBSD and (0 not in ret) and _pid_0_exists():
# On OpenBSD the kernel does not return PID 0 (neither does
# ps) but it's actually querable (Process(0) will succeed).
ret.insert(0, 0)
return ret
if NETBSD:
def pid_exists(pid):
exists = _psposix.pid_exists(pid)
if not exists:
# We do this because _psposix.pid_exists() lies in case of
# zombie processes.
return pid in pids()
else:
return True
elif OPENBSD:
def pid_exists(pid):
exists = _psposix.pid_exists(pid)
if not exists:
return False
else:
# OpenBSD seems to be the only BSD platform where
# _psposix.pid_exists() returns True for thread IDs (tids),
# so we can't use it.
return pid in pids()
else: # FreeBSD
pid_exists = _psposix.pid_exists
def is_zombie(pid):
try:
st = cext.proc_oneshot_info(pid)[kinfo_proc_map['status']]
return PROC_STATUSES.get(st) == _common.STATUS_ZOMBIE
except OSError:
return False
def wrap_exceptions(fun):
"""Decorator which translates bare OSError exceptions into
NoSuchProcess and AccessDenied.
"""
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
pid, ppid, name = self.pid, self._ppid, self._name
try:
return fun(self, *args, **kwargs)
except ProcessLookupError as err:
if is_zombie(pid):
raise ZombieProcess(pid, name, ppid) from err
raise NoSuchProcess(pid, name) from err
except PermissionError as err:
raise AccessDenied(pid, name) from err
except OSError as err:
if pid == 0 and 0 in pids():
raise AccessDenied(pid, name) from err
raise
return wrapper
@contextlib.contextmanager
def wrap_exceptions_procfs(inst):
"""Same as above, for routines relying on reading /proc fs."""
pid, name, ppid = inst.pid, inst._name, inst._ppid
try:
yield
except (ProcessLookupError, FileNotFoundError) as err:
# ENOENT (no such file or directory) gets raised on open().
# ESRCH (no such process) can get raised on read() if
# process is gone in meantime.
if is_zombie(inst.pid):
raise ZombieProcess(pid, name, ppid) from err
else:
raise NoSuchProcess(pid, name) from err
except PermissionError as err:
raise AccessDenied(pid, name) from err
class Process:
"""Wrapper class around underlying C implementation."""
__slots__ = ["_cache", "_name", "_ppid", "pid"]
def __init__(self, pid):
self.pid = pid
self._name = None
self._ppid = None
def _assert_alive(self):
"""Raise NSP if the process disappeared on us."""
# For those C function who do not raise NSP, possibly returning
# incorrect or incomplete result.
cext.proc_name(self.pid)
@wrap_exceptions
@memoize_when_activated
def oneshot(self):
"""Retrieves multiple process info in one shot as a raw tuple."""
ret = cext.proc_oneshot_info(self.pid)
assert len(ret) == len(kinfo_proc_map)
return ret
def oneshot_enter(self):
self.oneshot.cache_activate(self)
def oneshot_exit(self):
self.oneshot.cache_deactivate(self)
@wrap_exceptions
def name(self):
name = self.oneshot()[kinfo_proc_map['name']]
return name if name is not None else cext.proc_name(self.pid)
@wrap_exceptions
def exe(self):
if FREEBSD:
if self.pid == 0:
return '' # else NSP
return cext.proc_exe(self.pid)
elif NETBSD:
if self.pid == 0:
# /proc/0 dir exists but /proc/0/exe doesn't
return ""
with wrap_exceptions_procfs(self):
return os.readlink(f"/proc/{self.pid}/exe")
else:
# OpenBSD: exe cannot be determined; references:
# https://chromium.googlesource.com/chromium/src/base/+/
# master/base_paths_posix.cc
# We try our best guess by using which against the first
# cmdline arg (may return None).
import shutil
cmdline = self.cmdline()
if cmdline:
return shutil.which(cmdline[0]) or ""
else:
return ""
@wrap_exceptions
def cmdline(self):
if OPENBSD and self.pid == 0:
return [] # ...else it crashes
elif NETBSD:
# XXX - most of the times the underlying sysctl() call on
# NetBSD and OpenBSD returns a truncated string. Also
# /proc/pid/cmdline behaves the same so it looks like this
# is a kernel bug.
try:
return cext.proc_cmdline(self.pid)
except OSError as err:
if err.errno == errno.EINVAL:
pid, name, ppid = self.pid, self._name, self._ppid
if is_zombie(self.pid):
raise ZombieProcess(pid, name, ppid) from err
if not pid_exists(self.pid):
raise NoSuchProcess(pid, name, ppid) from err
# XXX: this happens with unicode tests. It means the C
# routine is unable to decode invalid unicode chars.
debug(f"ignoring {err!r} and returning an empty list")
return []
else:
raise
else:
return cext.proc_cmdline(self.pid)
@wrap_exceptions
def environ(self):
return cext.proc_environ(self.pid)
@wrap_exceptions
def terminal(self):
tty_nr = self.oneshot()[kinfo_proc_map['ttynr']]
tmap = _psposix.get_terminal_map()
try:
return tmap[tty_nr]
except KeyError:
return None
@wrap_exceptions
def ppid(self):
self._ppid = self.oneshot()[kinfo_proc_map['ppid']]
return self._ppid
@wrap_exceptions
def uids(self):
rawtuple = self.oneshot()
return _common.puids(
rawtuple[kinfo_proc_map['real_uid']],
rawtuple[kinfo_proc_map['effective_uid']],
rawtuple[kinfo_proc_map['saved_uid']],
)
@wrap_exceptions
def gids(self):
rawtuple = self.oneshot()
return _common.pgids(
rawtuple[kinfo_proc_map['real_gid']],
rawtuple[kinfo_proc_map['effective_gid']],
rawtuple[kinfo_proc_map['saved_gid']],
)
@wrap_exceptions
def cpu_times(self):
rawtuple = self.oneshot()
return _common.pcputimes(
rawtuple[kinfo_proc_map['user_time']],
rawtuple[kinfo_proc_map['sys_time']],
rawtuple[kinfo_proc_map['ch_user_time']],
rawtuple[kinfo_proc_map['ch_sys_time']],
)
if FREEBSD:
@wrap_exceptions
def cpu_num(self):
return self.oneshot()[kinfo_proc_map['cpunum']]
@wrap_exceptions
def memory_info(self):
rawtuple = self.oneshot()
return pmem(
rawtuple[kinfo_proc_map['rss']],
rawtuple[kinfo_proc_map['vms']],
rawtuple[kinfo_proc_map['memtext']],
rawtuple[kinfo_proc_map['memdata']],
rawtuple[kinfo_proc_map['memstack']],
)
memory_full_info = memory_info
@wrap_exceptions
def create_time(self, monotonic=False):
ctime = self.oneshot()[kinfo_proc_map['create_time']]
if NETBSD and not monotonic:
# NetBSD: ctime subject to system clock updates.
ctime = adjust_proc_create_time(ctime)
return ctime
@wrap_exceptions
def num_threads(self):
if HAS_PROC_NUM_THREADS:
# FreeBSD / NetBSD
return cext.proc_num_threads(self.pid)
else:
return len(self.threads())
@wrap_exceptions
def num_ctx_switches(self):
rawtuple = self.oneshot()
return _common.pctxsw(
rawtuple[kinfo_proc_map['ctx_switches_vol']],
rawtuple[kinfo_proc_map['ctx_switches_unvol']],
)
@wrap_exceptions
def threads(self):
# Note: on OpenSBD this (/dev/mem) requires root access.
rawlist = cext.proc_threads(self.pid)
retlist = []
for thread_id, utime, stime in rawlist:
ntuple = _common.pthread(thread_id, utime, stime)
retlist.append(ntuple)
if OPENBSD:
self._assert_alive()
return retlist
@wrap_exceptions
def net_connections(self, kind='inet'):
families, types = conn_tmap[kind]
ret = []
if NETBSD:
rawlist = cext.net_connections(self.pid, kind)
elif OPENBSD:
rawlist = cext.net_connections(self.pid, families, types)
else:
rawlist = cext.proc_net_connections(self.pid, families, types)
for item in rawlist:
fd, fam, type, laddr, raddr, status = item[:6]
if FREEBSD:
if (fam not in families) or (type not in types):
continue
nt = conn_to_ntuple(
fd, fam, type, laddr, raddr, status, TCP_STATUSES
)
ret.append(nt)
self._assert_alive()
return ret
@wrap_exceptions
def wait(self, timeout=None):
return _psposix.wait_pid(self.pid, timeout, self._name)
@wrap_exceptions
def nice_get(self):
return cext_posix.getpriority(self.pid)
@wrap_exceptions
def nice_set(self, value):
return cext_posix.setpriority(self.pid, value)
@wrap_exceptions
def status(self):
code = self.oneshot()[kinfo_proc_map['status']]
# XXX is '?' legit? (we're not supposed to return it anyway)
return PROC_STATUSES.get(code, '?')
@wrap_exceptions
def io_counters(self):
rawtuple = self.oneshot()
return _common.pio(
rawtuple[kinfo_proc_map['read_io_count']],
rawtuple[kinfo_proc_map['write_io_count']],
-1,
-1,
)
@wrap_exceptions
def cwd(self):
"""Return process current working directory."""
# sometimes we get an empty string, in which case we turn
# it into None
if OPENBSD and self.pid == 0:
return "" # ...else it would raise EINVAL
return cext.proc_cwd(self.pid)
nt_mmap_grouped = namedtuple(
'mmap', 'path rss, private, ref_count, shadow_count'
)
nt_mmap_ext = namedtuple(
'mmap', 'addr, perms path rss, private, ref_count, shadow_count'
)
@wrap_exceptions
def open_files(self):
"""Return files opened by process as a list of namedtuples."""
rawlist = cext.proc_open_files(self.pid)
return [_common.popenfile(path, fd) for path, fd in rawlist]
@wrap_exceptions
def num_fds(self):
"""Return the number of file descriptors opened by this process."""
ret = cext.proc_num_fds(self.pid)
if NETBSD:
self._assert_alive()
return ret
# --- FreeBSD only APIs
if FREEBSD:
@wrap_exceptions
def cpu_affinity_get(self):
return cext.proc_cpu_affinity_get(self.pid)
@wrap_exceptions
def cpu_affinity_set(self, cpus):
# Pre-emptively check if CPUs are valid because the C
# function has a weird behavior in case of invalid CPUs,
# see: https://github.com/giampaolo/psutil/issues/586
allcpus = set(range(len(per_cpu_times())))
for cpu in cpus:
if cpu not in allcpus:
msg = f"invalid CPU {cpu!r} (choose between {allcpus})"
raise ValueError(msg)
try:
cext.proc_cpu_affinity_set(self.pid, cpus)
except OSError as err:
# 'man cpuset_setaffinity' about EDEADLK:
# <<the call would leave a thread without a valid CPU to run
# on because the set does not overlap with the thread's
# anonymous mask>>
if err.errno in {errno.EINVAL, errno.EDEADLK}:
for cpu in cpus:
if cpu not in allcpus:
msg = (
f"invalid CPU {cpu!r} (choose between"
f" {allcpus})"
)
raise ValueError(msg) from err
raise
@wrap_exceptions
def memory_maps(self):
return cext.proc_memory_maps(self.pid)
@wrap_exceptions
def rlimit(self, resource, limits=None):
if limits is None:
return cext.proc_getrlimit(self.pid, resource)
else:
if len(limits) != 2:
msg = (
"second argument must be a (soft, hard) tuple, got"
f" {limits!r}"
)
raise ValueError(msg)
soft, hard = limits
return cext.proc_setrlimit(self.pid, resource, soft, hard)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,572 @@
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""macOS platform implementation."""
import errno
import functools
import os
from collections import namedtuple
from . import _common
from . import _psposix
from . import _psutil_osx as cext
from . import _psutil_posix as cext_posix
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import conn_tmap
from ._common import conn_to_ntuple
from ._common import debug
from ._common import isfile_strict
from ._common import memoize_when_activated
from ._common import parse_environ_block
from ._common import usage_percent
__extra__all__ = []
# =====================================================================
# --- globals
# =====================================================================
PAGESIZE = cext_posix.getpagesize()
AF_LINK = cext_posix.AF_LINK
TCP_STATUSES = {
cext.TCPS_ESTABLISHED: _common.CONN_ESTABLISHED,
cext.TCPS_SYN_SENT: _common.CONN_SYN_SENT,
cext.TCPS_SYN_RECEIVED: _common.CONN_SYN_RECV,
cext.TCPS_FIN_WAIT_1: _common.CONN_FIN_WAIT1,
cext.TCPS_FIN_WAIT_2: _common.CONN_FIN_WAIT2,
cext.TCPS_TIME_WAIT: _common.CONN_TIME_WAIT,
cext.TCPS_CLOSED: _common.CONN_CLOSE,
cext.TCPS_CLOSE_WAIT: _common.CONN_CLOSE_WAIT,
cext.TCPS_LAST_ACK: _common.CONN_LAST_ACK,
cext.TCPS_LISTEN: _common.CONN_LISTEN,
cext.TCPS_CLOSING: _common.CONN_CLOSING,
cext.PSUTIL_CONN_NONE: _common.CONN_NONE,
}
PROC_STATUSES = {
cext.SIDL: _common.STATUS_IDLE,
cext.SRUN: _common.STATUS_RUNNING,
cext.SSLEEP: _common.STATUS_SLEEPING,
cext.SSTOP: _common.STATUS_STOPPED,
cext.SZOMB: _common.STATUS_ZOMBIE,
}
kinfo_proc_map = dict(
ppid=0,
ruid=1,
euid=2,
suid=3,
rgid=4,
egid=5,
sgid=6,
ttynr=7,
ctime=8,
status=9,
name=10,
)
pidtaskinfo_map = dict(
cpuutime=0,
cpustime=1,
rss=2,
vms=3,
pfaults=4,
pageins=5,
numthreads=6,
volctxsw=7,
)
# =====================================================================
# --- named tuples
# =====================================================================
# fmt: off
# psutil.cpu_times()
scputimes = namedtuple('scputimes', ['user', 'nice', 'system', 'idle'])
# psutil.virtual_memory()
svmem = namedtuple(
'svmem', ['total', 'available', 'percent', 'used', 'free',
'active', 'inactive', 'wired'])
# psutil.Process.memory_info()
pmem = namedtuple('pmem', ['rss', 'vms', 'pfaults', 'pageins'])
# psutil.Process.memory_full_info()
pfullmem = namedtuple('pfullmem', pmem._fields + ('uss', ))
# fmt: on
# =====================================================================
# --- memory
# =====================================================================
def virtual_memory():
"""System virtual memory as a namedtuple."""
total, active, inactive, wired, free, speculative = cext.virtual_mem()
# This is how Zabbix calculate avail and used mem:
# https://github.com/zabbix/zabbix/blob/master/src/libs/zbxsysinfo/osx/memory.c
# Also see: https://github.com/giampaolo/psutil/issues/1277
avail = inactive + free
used = active + wired
# This is NOT how Zabbix calculates free mem but it matches "free"
# cmdline utility.
free -= speculative
percent = usage_percent((total - avail), total, round_=1)
return svmem(total, avail, percent, used, free, active, inactive, wired)
def swap_memory():
"""Swap system memory as a (total, used, free, sin, sout) tuple."""
total, used, free, sin, sout = cext.swap_mem()
percent = usage_percent(used, total, round_=1)
return _common.sswap(total, used, free, percent, sin, sout)
# =====================================================================
# --- CPU
# =====================================================================
def cpu_times():
"""Return system CPU times as a namedtuple."""
user, nice, system, idle = cext.cpu_times()
return scputimes(user, nice, system, idle)
def per_cpu_times():
"""Return system CPU times as a named tuple."""
ret = []
for cpu_t in cext.per_cpu_times():
user, nice, system, idle = cpu_t
item = scputimes(user, nice, system, idle)
ret.append(item)
return ret
def cpu_count_logical():
"""Return the number of logical CPUs in the system."""
return cext.cpu_count_logical()
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
return cext.cpu_count_cores()
def cpu_stats():
ctx_switches, interrupts, soft_interrupts, syscalls, _traps = (
cext.cpu_stats()
)
return _common.scpustats(
ctx_switches, interrupts, soft_interrupts, syscalls
)
if cext.has_cpu_freq(): # not always available on ARM64
def cpu_freq():
"""Return CPU frequency.
On macOS per-cpu frequency is not supported.
Also, the returned frequency never changes, see:
https://arstechnica.com/civis/viewtopic.php?f=19&t=465002.
"""
curr, min_, max_ = cext.cpu_freq()
return [_common.scpufreq(curr, min_, max_)]
# =====================================================================
# --- disks
# =====================================================================
disk_usage = _psposix.disk_usage
disk_io_counters = cext.disk_io_counters
def disk_partitions(all=False):
"""Return mounted disk partitions as a list of namedtuples."""
retlist = []
partitions = cext.disk_partitions()
for partition in partitions:
device, mountpoint, fstype, opts = partition
if device == 'none':
device = ''
if not all:
if not os.path.isabs(device) or not os.path.exists(device):
continue
ntuple = _common.sdiskpart(device, mountpoint, fstype, opts)
retlist.append(ntuple)
return retlist
# =====================================================================
# --- sensors
# =====================================================================
def sensors_battery():
"""Return battery information."""
try:
percent, minsleft, power_plugged = cext.sensors_battery()
except NotImplementedError:
# no power source - return None according to interface
return None
power_plugged = power_plugged == 1
if power_plugged:
secsleft = _common.POWER_TIME_UNLIMITED
elif minsleft == -1:
secsleft = _common.POWER_TIME_UNKNOWN
else:
secsleft = minsleft * 60
return _common.sbattery(percent, secsleft, power_plugged)
# =====================================================================
# --- network
# =====================================================================
net_io_counters = cext.net_io_counters
net_if_addrs = cext_posix.net_if_addrs
def net_connections(kind='inet'):
"""System-wide network connections."""
# Note: on macOS this will fail with AccessDenied unless
# the process is owned by root.
ret = []
for pid in pids():
try:
cons = Process(pid).net_connections(kind)
except NoSuchProcess:
continue
else:
if cons:
for c in cons:
c = list(c) + [pid]
ret.append(_common.sconn(*c))
return ret
def net_if_stats():
"""Get NIC stats (isup, duplex, speed, mtu)."""
names = net_io_counters().keys()
ret = {}
for name in names:
try:
mtu = cext_posix.net_if_mtu(name)
flags = cext_posix.net_if_flags(name)
duplex, speed = cext_posix.net_if_duplex_speed(name)
except OSError as err:
# https://github.com/giampaolo/psutil/issues/1279
if err.errno != errno.ENODEV:
raise
else:
if hasattr(_common, 'NicDuplex'):
duplex = _common.NicDuplex(duplex)
output_flags = ','.join(flags)
isup = 'running' in flags
ret[name] = _common.snicstats(
isup, duplex, speed, mtu, output_flags
)
return ret
# =====================================================================
# --- other system functions
# =====================================================================
def boot_time():
"""The system boot time expressed in seconds since the epoch."""
return cext.boot_time()
try:
INIT_BOOT_TIME = boot_time()
except Exception as err: # noqa: BLE001
# Don't want to crash at import time.
debug(f"ignoring exception on import: {err!r}")
INIT_BOOT_TIME = 0
def adjust_proc_create_time(ctime):
"""Account for system clock updates."""
if INIT_BOOT_TIME == 0:
return ctime
diff = INIT_BOOT_TIME - boot_time()
if diff == 0 or abs(diff) < 1:
return ctime
debug("system clock was updated; adjusting process create_time()")
if diff < 0:
return ctime - diff
return ctime + diff
def users():
"""Return currently connected users as a list of namedtuples."""
retlist = []
rawlist = cext_posix.users()
for item in rawlist:
user, tty, hostname, tstamp, pid = item
if tty == '~':
continue # reboot or shutdown
if not tstamp:
continue
nt = _common.suser(user, tty or None, hostname or None, tstamp, pid)
retlist.append(nt)
return retlist
# =====================================================================
# --- processes
# =====================================================================
def pids():
ls = cext.pids()
if 0 not in ls:
# On certain macOS versions pids() C doesn't return PID 0 but
# "ps" does and the process is querable via sysctl():
# https://travis-ci.org/giampaolo/psutil/jobs/309619941
try:
Process(0).create_time()
ls.insert(0, 0)
except NoSuchProcess:
pass
except AccessDenied:
ls.insert(0, 0)
return ls
pid_exists = _psposix.pid_exists
def is_zombie(pid):
try:
st = cext.proc_kinfo_oneshot(pid)[kinfo_proc_map['status']]
return st == cext.SZOMB
except OSError:
return False
def wrap_exceptions(fun):
"""Decorator which translates bare OSError exceptions into
NoSuchProcess and AccessDenied.
"""
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
pid, ppid, name = self.pid, self._ppid, self._name
try:
return fun(self, *args, **kwargs)
except ProcessLookupError as err:
if is_zombie(pid):
raise ZombieProcess(pid, name, ppid) from err
raise NoSuchProcess(pid, name) from err
except PermissionError as err:
raise AccessDenied(pid, name) from err
return wrapper
class Process:
"""Wrapper class around underlying C implementation."""
__slots__ = ["_cache", "_name", "_ppid", "pid"]
def __init__(self, pid):
self.pid = pid
self._name = None
self._ppid = None
@wrap_exceptions
@memoize_when_activated
def _get_kinfo_proc(self):
# Note: should work with all PIDs without permission issues.
ret = cext.proc_kinfo_oneshot(self.pid)
assert len(ret) == len(kinfo_proc_map)
return ret
@wrap_exceptions
@memoize_when_activated
def _get_pidtaskinfo(self):
# Note: should work for PIDs owned by user only.
ret = cext.proc_pidtaskinfo_oneshot(self.pid)
assert len(ret) == len(pidtaskinfo_map)
return ret
def oneshot_enter(self):
self._get_kinfo_proc.cache_activate(self)
self._get_pidtaskinfo.cache_activate(self)
def oneshot_exit(self):
self._get_kinfo_proc.cache_deactivate(self)
self._get_pidtaskinfo.cache_deactivate(self)
@wrap_exceptions
def name(self):
name = self._get_kinfo_proc()[kinfo_proc_map['name']]
return name if name is not None else cext.proc_name(self.pid)
@wrap_exceptions
def exe(self):
return cext.proc_exe(self.pid)
@wrap_exceptions
def cmdline(self):
return cext.proc_cmdline(self.pid)
@wrap_exceptions
def environ(self):
return parse_environ_block(cext.proc_environ(self.pid))
@wrap_exceptions
def ppid(self):
self._ppid = self._get_kinfo_proc()[kinfo_proc_map['ppid']]
return self._ppid
@wrap_exceptions
def cwd(self):
return cext.proc_cwd(self.pid)
@wrap_exceptions
def uids(self):
rawtuple = self._get_kinfo_proc()
return _common.puids(
rawtuple[kinfo_proc_map['ruid']],
rawtuple[kinfo_proc_map['euid']],
rawtuple[kinfo_proc_map['suid']],
)
@wrap_exceptions
def gids(self):
rawtuple = self._get_kinfo_proc()
return _common.puids(
rawtuple[kinfo_proc_map['rgid']],
rawtuple[kinfo_proc_map['egid']],
rawtuple[kinfo_proc_map['sgid']],
)
@wrap_exceptions
def terminal(self):
tty_nr = self._get_kinfo_proc()[kinfo_proc_map['ttynr']]
tmap = _psposix.get_terminal_map()
try:
return tmap[tty_nr]
except KeyError:
return None
@wrap_exceptions
def memory_info(self):
rawtuple = self._get_pidtaskinfo()
return pmem(
rawtuple[pidtaskinfo_map['rss']],
rawtuple[pidtaskinfo_map['vms']],
rawtuple[pidtaskinfo_map['pfaults']],
rawtuple[pidtaskinfo_map['pageins']],
)
@wrap_exceptions
def memory_full_info(self):
basic_mem = self.memory_info()
uss = cext.proc_memory_uss(self.pid)
return pfullmem(*basic_mem + (uss,))
@wrap_exceptions
def cpu_times(self):
rawtuple = self._get_pidtaskinfo()
return _common.pcputimes(
rawtuple[pidtaskinfo_map['cpuutime']],
rawtuple[pidtaskinfo_map['cpustime']],
# children user / system times are not retrievable (set to 0)
0.0,
0.0,
)
@wrap_exceptions
def create_time(self, monotonic=False):
ctime = self._get_kinfo_proc()[kinfo_proc_map['ctime']]
if not monotonic:
ctime = adjust_proc_create_time(ctime)
return ctime
@wrap_exceptions
def num_ctx_switches(self):
# Unvoluntary value seems not to be available;
# getrusage() numbers seems to confirm this theory.
# We set it to 0.
vol = self._get_pidtaskinfo()[pidtaskinfo_map['volctxsw']]
return _common.pctxsw(vol, 0)
@wrap_exceptions
def num_threads(self):
return self._get_pidtaskinfo()[pidtaskinfo_map['numthreads']]
@wrap_exceptions
def open_files(self):
if self.pid == 0:
return []
files = []
rawlist = cext.proc_open_files(self.pid)
for path, fd in rawlist:
if isfile_strict(path):
ntuple = _common.popenfile(path, fd)
files.append(ntuple)
return files
@wrap_exceptions
def net_connections(self, kind='inet'):
families, types = conn_tmap[kind]
rawlist = cext.proc_net_connections(self.pid, families, types)
ret = []
for item in rawlist:
fd, fam, type, laddr, raddr, status = item
nt = conn_to_ntuple(
fd, fam, type, laddr, raddr, status, TCP_STATUSES
)
ret.append(nt)
return ret
@wrap_exceptions
def num_fds(self):
if self.pid == 0:
return 0
return cext.proc_num_fds(self.pid)
@wrap_exceptions
def wait(self, timeout=None):
return _psposix.wait_pid(self.pid, timeout, self._name)
@wrap_exceptions
def nice_get(self):
return cext_posix.getpriority(self.pid)
@wrap_exceptions
def nice_set(self, value):
return cext_posix.setpriority(self.pid, value)
@wrap_exceptions
def status(self):
code = self._get_kinfo_proc()[kinfo_proc_map['status']]
# XXX is '?' legit? (we're not supposed to return it anyway)
return PROC_STATUSES.get(code, '?')
@wrap_exceptions
def threads(self):
rawlist = cext.proc_threads(self.pid)
retlist = []
for thread_id, utime, stime in rawlist:
ntuple = _common.pthread(thread_id, utime, stime)
retlist.append(ntuple)
return retlist

View File

@@ -0,0 +1,206 @@
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Routines common to all posix systems."""
import enum
import glob
import os
import signal
import time
from ._common import MACOS
from ._common import TimeoutExpired
from ._common import memoize
from ._common import sdiskusage
from ._common import usage_percent
if MACOS:
from . import _psutil_osx
__all__ = ['pid_exists', 'wait_pid', 'disk_usage', 'get_terminal_map']
def pid_exists(pid):
"""Check whether pid exists in the current process table."""
if pid == 0:
# According to "man 2 kill" PID 0 has a special meaning:
# it refers to <<every process in the process group of the
# calling process>> so we don't want to go any further.
# If we get here it means this UNIX platform *does* have
# a process with id 0.
return True
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
# EPERM clearly means there's a process to deny access to
return True
# According to "man 2 kill" possible error values are
# (EINVAL, EPERM, ESRCH)
else:
return True
Negsignal = enum.IntEnum(
'Negsignal', {x.name: -x.value for x in signal.Signals}
)
def negsig_to_enum(num):
"""Convert a negative signal value to an enum."""
try:
return Negsignal(num)
except ValueError:
return num
def wait_pid(
pid,
timeout=None,
proc_name=None,
_waitpid=os.waitpid,
_timer=getattr(time, 'monotonic', time.time), # noqa: B008
_min=min,
_sleep=time.sleep,
_pid_exists=pid_exists,
):
"""Wait for a process PID to terminate.
If the process terminated normally by calling exit(3) or _exit(2),
or by returning from main(), the return value is the positive integer
passed to *exit().
If it was terminated by a signal it returns the negated value of the
signal which caused the termination (e.g. -SIGTERM).
If PID is not a children of os.getpid() (current process) just
wait until the process disappears and return None.
If PID does not exist at all return None immediately.
If *timeout* != None and process is still alive raise TimeoutExpired.
timeout=0 is also possible (either return immediately or raise).
"""
if pid <= 0:
# see "man waitpid"
msg = "can't wait for PID 0"
raise ValueError(msg)
interval = 0.0001
flags = 0
if timeout is not None:
flags |= os.WNOHANG
stop_at = _timer() + timeout
def sleep(interval):
# Sleep for some time and return a new increased interval.
if timeout is not None:
if _timer() >= stop_at:
raise TimeoutExpired(timeout, pid=pid, name=proc_name)
_sleep(interval)
return _min(interval * 2, 0.04)
# See: https://linux.die.net/man/2/waitpid
while True:
try:
retpid, status = os.waitpid(pid, flags)
except InterruptedError:
interval = sleep(interval)
except ChildProcessError:
# This has two meanings:
# - PID is not a child of os.getpid() in which case
# we keep polling until it's gone
# - PID never existed in the first place
# In both cases we'll eventually return None as we
# can't determine its exit status code.
while _pid_exists(pid):
interval = sleep(interval)
return None
else:
if retpid == 0:
# WNOHANG flag was used and PID is still running.
interval = sleep(interval)
continue
if os.WIFEXITED(status):
# Process terminated normally by calling exit(3) or _exit(2),
# or by returning from main(). The return value is the
# positive integer passed to *exit().
return os.WEXITSTATUS(status)
elif os.WIFSIGNALED(status):
# Process exited due to a signal. Return the negative value
# of that signal.
return negsig_to_enum(-os.WTERMSIG(status))
# elif os.WIFSTOPPED(status):
# # Process was stopped via SIGSTOP or is being traced, and
# # waitpid() was called with WUNTRACED flag. PID is still
# # alive. From now on waitpid() will keep returning (0, 0)
# # until the process state doesn't change.
# # It may make sense to catch/enable this since stopped PIDs
# # ignore SIGTERM.
# interval = sleep(interval)
# continue
# elif os.WIFCONTINUED(status):
# # Process was resumed via SIGCONT and waitpid() was called
# # with WCONTINUED flag.
# interval = sleep(interval)
# continue
else:
# Should never happen.
msg = f"unknown process exit status {status!r}"
raise ValueError(msg)
def disk_usage(path):
"""Return disk usage associated with path.
Note: UNIX usually reserves 5% disk space which is not accessible
by user. In this function "total" and "used" values reflect the
total and used disk space whereas "free" and "percent" represent
the "free" and "used percent" user disk space.
"""
st = os.statvfs(path)
# Total space which is only available to root (unless changed
# at system level).
total = st.f_blocks * st.f_frsize
# Remaining free space usable by root.
avail_to_root = st.f_bfree * st.f_frsize
# Remaining free space usable by user.
avail_to_user = st.f_bavail * st.f_frsize
# Total space being used in general.
used = total - avail_to_root
if MACOS:
# see: https://github.com/giampaolo/psutil/pull/2152
used = _psutil_osx.disk_usage_used(path, used)
# Total space which is available to user (same as 'total' but
# for the user).
total_user = used + avail_to_user
# User usage percent compared to the total amount of space
# the user can use. This number would be higher if compared
# to root's because the user has less space (usually -5%).
usage_percent_user = usage_percent(used, total_user, round_=1)
# NB: the percentage is -5% than what shown by df due to
# reserved blocks that we are currently not considering:
# https://github.com/giampaolo/psutil/issues/829#issuecomment-223750462
return sdiskusage(
total=total, used=used, free=avail_to_user, percent=usage_percent_user
)
@memoize
def get_terminal_map():
"""Get a map of device-id -> path as a dict.
Used by Process.terminal().
"""
ret = {}
ls = glob.glob('/dev/tty*') + glob.glob('/dev/pts/*')
for name in ls:
assert name not in ret, name
try:
ret[os.stat(name).st_rdev] = name
except FileNotFoundError:
pass
return ret

View File

@@ -0,0 +1,733 @@
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Sun OS Solaris platform implementation."""
import errno
import functools
import os
import socket
import subprocess
import sys
from collections import namedtuple
from socket import AF_INET
from . import _common
from . import _psposix
from . import _psutil_posix as cext_posix
from . import _psutil_sunos as cext
from ._common import AF_INET6
from ._common import ENCODING
from ._common import AccessDenied
from ._common import NoSuchProcess
from ._common import ZombieProcess
from ._common import debug
from ._common import get_procfs_path
from ._common import isfile_strict
from ._common import memoize_when_activated
from ._common import sockfam_to_enum
from ._common import socktype_to_enum
from ._common import usage_percent
__extra__all__ = ["CONN_IDLE", "CONN_BOUND", "PROCFS_PATH"]
# =====================================================================
# --- globals
# =====================================================================
PAGE_SIZE = cext_posix.getpagesize()
AF_LINK = cext_posix.AF_LINK
IS_64_BIT = sys.maxsize > 2**32
CONN_IDLE = "IDLE"
CONN_BOUND = "BOUND"
PROC_STATUSES = {
cext.SSLEEP: _common.STATUS_SLEEPING,
cext.SRUN: _common.STATUS_RUNNING,
cext.SZOMB: _common.STATUS_ZOMBIE,
cext.SSTOP: _common.STATUS_STOPPED,
cext.SIDL: _common.STATUS_IDLE,
cext.SONPROC: _common.STATUS_RUNNING, # same as run
cext.SWAIT: _common.STATUS_WAITING,
}
TCP_STATUSES = {
cext.TCPS_ESTABLISHED: _common.CONN_ESTABLISHED,
cext.TCPS_SYN_SENT: _common.CONN_SYN_SENT,
cext.TCPS_SYN_RCVD: _common.CONN_SYN_RECV,
cext.TCPS_FIN_WAIT_1: _common.CONN_FIN_WAIT1,
cext.TCPS_FIN_WAIT_2: _common.CONN_FIN_WAIT2,
cext.TCPS_TIME_WAIT: _common.CONN_TIME_WAIT,
cext.TCPS_CLOSED: _common.CONN_CLOSE,
cext.TCPS_CLOSE_WAIT: _common.CONN_CLOSE_WAIT,
cext.TCPS_LAST_ACK: _common.CONN_LAST_ACK,
cext.TCPS_LISTEN: _common.CONN_LISTEN,
cext.TCPS_CLOSING: _common.CONN_CLOSING,
cext.PSUTIL_CONN_NONE: _common.CONN_NONE,
cext.TCPS_IDLE: CONN_IDLE, # sunos specific
cext.TCPS_BOUND: CONN_BOUND, # sunos specific
}
proc_info_map = dict(
ppid=0,
rss=1,
vms=2,
create_time=3,
nice=4,
num_threads=5,
status=6,
ttynr=7,
uid=8,
euid=9,
gid=10,
egid=11,
)
# =====================================================================
# --- named tuples
# =====================================================================
# psutil.cpu_times()
scputimes = namedtuple('scputimes', ['user', 'system', 'idle', 'iowait'])
# psutil.cpu_times(percpu=True)
pcputimes = namedtuple(
'pcputimes', ['user', 'system', 'children_user', 'children_system']
)
# psutil.virtual_memory()
svmem = namedtuple('svmem', ['total', 'available', 'percent', 'used', 'free'])
# psutil.Process.memory_info()
pmem = namedtuple('pmem', ['rss', 'vms'])
pfullmem = pmem
# psutil.Process.memory_maps(grouped=True)
pmmap_grouped = namedtuple(
'pmmap_grouped', ['path', 'rss', 'anonymous', 'locked']
)
# psutil.Process.memory_maps(grouped=False)
pmmap_ext = namedtuple(
'pmmap_ext', 'addr perms ' + ' '.join(pmmap_grouped._fields)
)
# =====================================================================
# --- memory
# =====================================================================
def virtual_memory():
"""Report virtual memory metrics."""
# we could have done this with kstat, but IMHO this is good enough
total = os.sysconf('SC_PHYS_PAGES') * PAGE_SIZE
# note: there's no difference on Solaris
free = avail = os.sysconf('SC_AVPHYS_PAGES') * PAGE_SIZE
used = total - free
percent = usage_percent(used, total, round_=1)
return svmem(total, avail, percent, used, free)
def swap_memory():
"""Report swap memory metrics."""
sin, sout = cext.swap_mem()
# XXX
# we are supposed to get total/free by doing so:
# http://cvs.opensolaris.org/source/xref/onnv/onnv-gate/
# usr/src/cmd/swap/swap.c
# ...nevertheless I can't manage to obtain the same numbers as 'swap'
# cmdline utility, so let's parse its output (sigh!)
p = subprocess.Popen(
[
'/usr/bin/env',
f"PATH=/usr/sbin:/sbin:{os.environ['PATH']}",
'swap',
'-l',
],
stdout=subprocess.PIPE,
)
stdout, _ = p.communicate()
stdout = stdout.decode(sys.stdout.encoding)
if p.returncode != 0:
msg = f"'swap -l' failed (retcode={p.returncode})"
raise RuntimeError(msg)
lines = stdout.strip().split('\n')[1:]
if not lines:
msg = 'no swap device(s) configured'
raise RuntimeError(msg)
total = free = 0
for line in lines:
line = line.split()
t, f = line[3:5]
total += int(int(t) * 512)
free += int(int(f) * 512)
used = total - free
percent = usage_percent(used, total, round_=1)
return _common.sswap(
total, used, free, percent, sin * PAGE_SIZE, sout * PAGE_SIZE
)
# =====================================================================
# --- CPU
# =====================================================================
def cpu_times():
"""Return system-wide CPU times as a named tuple."""
ret = cext.per_cpu_times()
return scputimes(*[sum(x) for x in zip(*ret)])
def per_cpu_times():
"""Return system per-CPU times as a list of named tuples."""
ret = cext.per_cpu_times()
return [scputimes(*x) for x in ret]
def cpu_count_logical():
"""Return the number of logical CPUs in the system."""
try:
return os.sysconf("SC_NPROCESSORS_ONLN")
except ValueError:
# mimic os.cpu_count() behavior
return None
def cpu_count_cores():
"""Return the number of CPU cores in the system."""
return cext.cpu_count_cores()
def cpu_stats():
"""Return various CPU stats as a named tuple."""
ctx_switches, interrupts, syscalls, _traps = cext.cpu_stats()
soft_interrupts = 0
return _common.scpustats(
ctx_switches, interrupts, soft_interrupts, syscalls
)
# =====================================================================
# --- disks
# =====================================================================
disk_io_counters = cext.disk_io_counters
disk_usage = _psposix.disk_usage
def disk_partitions(all=False):
"""Return system disk partitions."""
# TODO - the filtering logic should be better checked so that
# it tries to reflect 'df' as much as possible
retlist = []
partitions = cext.disk_partitions()
for partition in partitions:
device, mountpoint, fstype, opts = partition
if device == 'none':
device = ''
if not all:
# Differently from, say, Linux, we don't have a list of
# common fs types so the best we can do, AFAIK, is to
# filter by filesystem having a total size > 0.
try:
if not disk_usage(mountpoint).total:
continue
except OSError as err:
# https://github.com/giampaolo/psutil/issues/1674
debug(f"skipping {mountpoint!r}: {err}")
continue
ntuple = _common.sdiskpart(device, mountpoint, fstype, opts)
retlist.append(ntuple)
return retlist
# =====================================================================
# --- network
# =====================================================================
net_io_counters = cext.net_io_counters
net_if_addrs = cext_posix.net_if_addrs
def net_connections(kind, _pid=-1):
"""Return socket connections. If pid == -1 return system-wide
connections (as opposed to connections opened by one process only).
Only INET sockets are returned (UNIX are not).
"""
families, types = _common.conn_tmap[kind]
rawlist = cext.net_connections(_pid)
ret = set()
for item in rawlist:
fd, fam, type_, laddr, raddr, status, pid = item
if fam not in families:
continue
if type_ not in types:
continue
# TODO: refactor and use _common.conn_to_ntuple.
if fam in {AF_INET, AF_INET6}:
if laddr:
laddr = _common.addr(*laddr)
if raddr:
raddr = _common.addr(*raddr)
status = TCP_STATUSES[status]
fam = sockfam_to_enum(fam)
type_ = socktype_to_enum(type_)
if _pid == -1:
nt = _common.sconn(fd, fam, type_, laddr, raddr, status, pid)
else:
nt = _common.pconn(fd, fam, type_, laddr, raddr, status)
ret.add(nt)
return list(ret)
def net_if_stats():
"""Get NIC stats (isup, duplex, speed, mtu)."""
ret = cext.net_if_stats()
for name, items in ret.items():
isup, duplex, speed, mtu = items
if hasattr(_common, 'NicDuplex'):
duplex = _common.NicDuplex(duplex)
ret[name] = _common.snicstats(isup, duplex, speed, mtu, '')
return ret
# =====================================================================
# --- other system functions
# =====================================================================
def boot_time():
"""The system boot time expressed in seconds since the epoch."""
return cext.boot_time()
def users():
"""Return currently connected users as a list of namedtuples."""
retlist = []
rawlist = cext_posix.users()
localhost = (':0.0', ':0')
for item in rawlist:
user, tty, hostname, tstamp, user_process, pid = item
# note: the underlying C function includes entries about
# system boot, run level and others. We might want
# to use them in the future.
if not user_process:
continue
if hostname in localhost:
hostname = 'localhost'
nt = _common.suser(user, tty, hostname, tstamp, pid)
retlist.append(nt)
return retlist
# =====================================================================
# --- processes
# =====================================================================
def pids():
"""Returns a list of PIDs currently running on the system."""
path = get_procfs_path().encode(ENCODING)
return [int(x) for x in os.listdir(path) if x.isdigit()]
def pid_exists(pid):
"""Check for the existence of a unix pid."""
return _psposix.pid_exists(pid)
def wrap_exceptions(fun):
"""Call callable into a try/except clause and translate ENOENT,
EACCES and EPERM in NoSuchProcess or AccessDenied exceptions.
"""
@functools.wraps(fun)
def wrapper(self, *args, **kwargs):
pid, ppid, name = self.pid, self._ppid, self._name
try:
return fun(self, *args, **kwargs)
except (FileNotFoundError, ProcessLookupError) as err:
# ENOENT (no such file or directory) gets raised on open().
# ESRCH (no such process) can get raised on read() if
# process is gone in meantime.
if not pid_exists(pid):
raise NoSuchProcess(pid, name) from err
raise ZombieProcess(pid, name, ppid) from err
except PermissionError as err:
raise AccessDenied(pid, name) from err
except OSError as err:
if pid == 0:
if 0 in pids():
raise AccessDenied(pid, name) from err
raise
raise
return wrapper
class Process:
"""Wrapper class around underlying C implementation."""
__slots__ = ["_cache", "_name", "_ppid", "_procfs_path", "pid"]
def __init__(self, pid):
self.pid = pid
self._name = None
self._ppid = None
self._procfs_path = get_procfs_path()
def _assert_alive(self):
"""Raise NSP if the process disappeared on us."""
# For those C function who do not raise NSP, possibly returning
# incorrect or incomplete result.
os.stat(f"{self._procfs_path}/{self.pid}")
def oneshot_enter(self):
self._proc_name_and_args.cache_activate(self)
self._proc_basic_info.cache_activate(self)
self._proc_cred.cache_activate(self)
def oneshot_exit(self):
self._proc_name_and_args.cache_deactivate(self)
self._proc_basic_info.cache_deactivate(self)
self._proc_cred.cache_deactivate(self)
@wrap_exceptions
@memoize_when_activated
def _proc_name_and_args(self):
return cext.proc_name_and_args(self.pid, self._procfs_path)
@wrap_exceptions
@memoize_when_activated
def _proc_basic_info(self):
if self.pid == 0 and not os.path.exists(
f"{self._procfs_path}/{self.pid}/psinfo"
):
raise AccessDenied(self.pid)
ret = cext.proc_basic_info(self.pid, self._procfs_path)
assert len(ret) == len(proc_info_map)
return ret
@wrap_exceptions
@memoize_when_activated
def _proc_cred(self):
return cext.proc_cred(self.pid, self._procfs_path)
@wrap_exceptions
def name(self):
# note: max len == 15
return self._proc_name_and_args()[0]
@wrap_exceptions
def exe(self):
try:
return os.readlink(f"{self._procfs_path}/{self.pid}/path/a.out")
except OSError:
pass # continue and guess the exe name from the cmdline
# Will be guessed later from cmdline but we want to explicitly
# invoke cmdline here in order to get an AccessDenied
# exception if the user has not enough privileges.
self.cmdline()
return ""
@wrap_exceptions
def cmdline(self):
return self._proc_name_and_args()[1].split(' ')
@wrap_exceptions
def environ(self):
return cext.proc_environ(self.pid, self._procfs_path)
@wrap_exceptions
def create_time(self):
return self._proc_basic_info()[proc_info_map['create_time']]
@wrap_exceptions
def num_threads(self):
return self._proc_basic_info()[proc_info_map['num_threads']]
@wrap_exceptions
def nice_get(self):
# Note #1: getpriority(3) doesn't work for realtime processes.
# Psinfo is what ps uses, see:
# https://github.com/giampaolo/psutil/issues/1194
return self._proc_basic_info()[proc_info_map['nice']]
@wrap_exceptions
def nice_set(self, value):
if self.pid in {2, 3}:
# Special case PIDs: internally setpriority(3) return ESRCH
# (no such process), no matter what.
# The process actually exists though, as it has a name,
# creation time, etc.
raise AccessDenied(self.pid, self._name)
return cext_posix.setpriority(self.pid, value)
@wrap_exceptions
def ppid(self):
self._ppid = self._proc_basic_info()[proc_info_map['ppid']]
return self._ppid
@wrap_exceptions
def uids(self):
try:
real, effective, saved, _, _, _ = self._proc_cred()
except AccessDenied:
real = self._proc_basic_info()[proc_info_map['uid']]
effective = self._proc_basic_info()[proc_info_map['euid']]
saved = None
return _common.puids(real, effective, saved)
@wrap_exceptions
def gids(self):
try:
_, _, _, real, effective, saved = self._proc_cred()
except AccessDenied:
real = self._proc_basic_info()[proc_info_map['gid']]
effective = self._proc_basic_info()[proc_info_map['egid']]
saved = None
return _common.puids(real, effective, saved)
@wrap_exceptions
def cpu_times(self):
try:
times = cext.proc_cpu_times(self.pid, self._procfs_path)
except OSError as err:
if err.errno == errno.EOVERFLOW and not IS_64_BIT:
# We may get here if we attempt to query a 64bit process
# with a 32bit python.
# Error originates from read() and also tools like "cat"
# fail in the same way (!).
# Since there simply is no way to determine CPU times we
# return 0.0 as a fallback. See:
# https://github.com/giampaolo/psutil/issues/857
times = (0.0, 0.0, 0.0, 0.0)
else:
raise
return _common.pcputimes(*times)
@wrap_exceptions
def cpu_num(self):
return cext.proc_cpu_num(self.pid, self._procfs_path)
@wrap_exceptions
def terminal(self):
procfs_path = self._procfs_path
hit_enoent = False
tty = wrap_exceptions(self._proc_basic_info()[proc_info_map['ttynr']])
if tty != cext.PRNODEV:
for x in (0, 1, 2, 255):
try:
return os.readlink(f"{procfs_path}/{self.pid}/path/{x}")
except FileNotFoundError:
hit_enoent = True
continue
if hit_enoent:
self._assert_alive()
@wrap_exceptions
def cwd(self):
# /proc/PID/path/cwd may not be resolved by readlink() even if
# it exists (ls shows it). If that's the case and the process
# is still alive return None (we can return None also on BSD).
# Reference: https://groups.google.com/g/comp.unix.solaris/c/tcqvhTNFCAs
procfs_path = self._procfs_path
try:
return os.readlink(f"{procfs_path}/{self.pid}/path/cwd")
except FileNotFoundError:
os.stat(f"{procfs_path}/{self.pid}") # raise NSP or AD
return ""
@wrap_exceptions
def memory_info(self):
ret = self._proc_basic_info()
rss = ret[proc_info_map['rss']] * 1024
vms = ret[proc_info_map['vms']] * 1024
return pmem(rss, vms)
memory_full_info = memory_info
@wrap_exceptions
def status(self):
code = self._proc_basic_info()[proc_info_map['status']]
# XXX is '?' legit? (we're not supposed to return it anyway)
return PROC_STATUSES.get(code, '?')
@wrap_exceptions
def threads(self):
procfs_path = self._procfs_path
ret = []
tids = os.listdir(f"{procfs_path}/{self.pid}/lwp")
hit_enoent = False
for tid in tids:
tid = int(tid)
try:
utime, stime = cext.query_process_thread(
self.pid, tid, procfs_path
)
except OSError as err:
if err.errno == errno.EOVERFLOW and not IS_64_BIT:
# We may get here if we attempt to query a 64bit process
# with a 32bit python.
# Error originates from read() and also tools like "cat"
# fail in the same way (!).
# Since there simply is no way to determine CPU times we
# return 0.0 as a fallback. See:
# https://github.com/giampaolo/psutil/issues/857
continue
# ENOENT == thread gone in meantime
if err.errno == errno.ENOENT:
hit_enoent = True
continue
raise
else:
nt = _common.pthread(tid, utime, stime)
ret.append(nt)
if hit_enoent:
self._assert_alive()
return ret
@wrap_exceptions
def open_files(self):
retlist = []
hit_enoent = False
procfs_path = self._procfs_path
pathdir = f"{procfs_path}/{self.pid}/path"
for fd in os.listdir(f"{procfs_path}/{self.pid}/fd"):
path = os.path.join(pathdir, fd)
if os.path.islink(path):
try:
file = os.readlink(path)
except FileNotFoundError:
hit_enoent = True
continue
else:
if isfile_strict(file):
retlist.append(_common.popenfile(file, int(fd)))
if hit_enoent:
self._assert_alive()
return retlist
def _get_unix_sockets(self, pid):
"""Get UNIX sockets used by process by parsing 'pfiles' output."""
# TODO: rewrite this in C (...but the damn netstat source code
# does not include this part! Argh!!)
cmd = ["pfiles", str(pid)]
p = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = p.communicate()
stdout, stderr = (
x.decode(sys.stdout.encoding) for x in (stdout, stderr)
)
if p.returncode != 0:
if 'permission denied' in stderr.lower():
raise AccessDenied(self.pid, self._name)
if 'no such process' in stderr.lower():
raise NoSuchProcess(self.pid, self._name)
msg = f"{cmd!r} command error\n{stderr}"
raise RuntimeError(msg)
lines = stdout.split('\n')[2:]
for i, line in enumerate(lines):
line = line.lstrip()
if line.startswith('sockname: AF_UNIX'):
path = line.split(' ', 2)[2]
type = lines[i - 2].strip()
if type == 'SOCK_STREAM':
type = socket.SOCK_STREAM
elif type == 'SOCK_DGRAM':
type = socket.SOCK_DGRAM
else:
type = -1
yield (-1, socket.AF_UNIX, type, path, "", _common.CONN_NONE)
@wrap_exceptions
def net_connections(self, kind='inet'):
ret = net_connections(kind, _pid=self.pid)
# The underlying C implementation retrieves all OS connections
# and filters them by PID. At this point we can't tell whether
# an empty list means there were no connections for process or
# process is no longer active so we force NSP in case the PID
# is no longer there.
if not ret:
# will raise NSP if process is gone
os.stat(f"{self._procfs_path}/{self.pid}")
# UNIX sockets
if kind in {'all', 'unix'}:
ret.extend([
_common.pconn(*conn)
for conn in self._get_unix_sockets(self.pid)
])
return ret
nt_mmap_grouped = namedtuple('mmap', 'path rss anon locked')
nt_mmap_ext = namedtuple('mmap', 'addr perms path rss anon locked')
@wrap_exceptions
def memory_maps(self):
def toaddr(start, end):
return "{}-{}".format(
hex(start)[2:].strip('L'), hex(end)[2:].strip('L')
)
procfs_path = self._procfs_path
retlist = []
try:
rawlist = cext.proc_memory_maps(self.pid, procfs_path)
except OSError as err:
if err.errno == errno.EOVERFLOW and not IS_64_BIT:
# We may get here if we attempt to query a 64bit process
# with a 32bit python.
# Error originates from read() and also tools like "cat"
# fail in the same way (!).
# Since there simply is no way to determine CPU times we
# return 0.0 as a fallback. See:
# https://github.com/giampaolo/psutil/issues/857
return []
else:
raise
hit_enoent = False
for item in rawlist:
addr, addrsize, perm, name, rss, anon, locked = item
addr = toaddr(addr, addrsize)
if not name.startswith('['):
try:
name = os.readlink(f"{procfs_path}/{self.pid}/path/{name}")
except OSError as err:
if err.errno == errno.ENOENT:
# sometimes the link may not be resolved by
# readlink() even if it exists (ls shows it).
# If that's the case we just return the
# unresolved link path.
# This seems an inconsistency with /proc similar
# to: http://goo.gl/55XgO
name = f"{procfs_path}/{self.pid}/path/{name}"
hit_enoent = True
else:
raise
retlist.append((addr, perm, name, rss, anon, locked))
if hit_enoent:
self._assert_alive()
return retlist
@wrap_exceptions
def num_fds(self):
return len(os.listdir(f"{self._procfs_path}/{self.pid}/fd"))
@wrap_exceptions
def num_ctx_switches(self):
return _common.pctxsw(
*cext.proc_num_ctx_switches(self.pid, self._procfs_path)
)
@wrap_exceptions
def wait(self, timeout=None):
return _psposix.wait_pid(self.pid, timeout, self._name)

File diff suppressed because it is too large Load Diff

48
buildVars.py Normal file
View File

@@ -0,0 +1,48 @@
# -*- coding: UTF-8 -*-
# Build customizations
# Change this file instead of sconstruct or manifest files, whenever possible.
# Full getext (please don't change)
_ = lambda x: x
# Add-on information variables
addon_info = {
# add-on Name, internal for nvda
"addon_name": "cpuPriority",
# Add-on summary, usually the user visible name of the addon.
# Translators: Summary for this add-on to be shown on installation and add-on information.
"addon_summary": _("CPU Priority Manager"),
# Add-on description
# Translators: Long description to be shown for this add-on on add-on information from add-ons manager
"addon_description": _("Manages CPU affinity and process priority for NVDA. Allows pinning NVDA to specific CPU cores (e.g., performance cores) and setting process priority to improve responsiveness."),
# version
"addon_version": "1.0.0",
# Author(s)
"addon_author": "Talon <talon@iamtalon.me>",
# URL for the add-on documentation support
"addon_url": "https://code.iamtalon.me/Talon/cpu-affinity-nvda-addon",
# Documentation file name
"addon_docFileName": "readme.html",
# Minimum NVDA version supported (e.g. "2018.3.0", minor version is optional)
"addon_minimumNVDAVersion": "2024.1.0",
# Last NVDA version supported/tested (e.g. "2018.4.0", ideally more recent than minimum version)
"addon_lastTestedNVDAVersion": "2025.3.0",
# Add-on update channel (default is None, denoting stable releases, and for development releases, use "dev"; do not change unless you know what you are doing)
"addon_updateChannel": None,
}
import os.path
# Define the python files that are the sources of your add-on.
# You can use glob expressions here, they will be expanded.
pythonSources = [
os.path.join("addon", "globalPlugins", "cpuPriority", "*.py"),
]
# Files that contain strings for translation. Usually your python sources
i18nSources = pythonSources + ["buildVars.py"]
# Files that will be ignored when building the nvda-addon file
# Paths are relative to the addon directory, not to the root directory of your addon sources.
excludedFiles = []

10
manifest.ini.tpl Normal file
View File

@@ -0,0 +1,10 @@
name = "{addon_name}"
summary = "{addon_summary}"
description = "{addon_description}"
author = "{addon_author}"
url = "{addon_url}"
version = "{addon_version}"
docFileName = "{addon_docFileName}"
minimumNVDAVersion = "{addon_minimumNVDAVersion}"
lastTestedNVDAVersion = "{addon_lastTestedNVDAVersion}"
updateChannel = {addon_updateChannel}

92
sconstruct Normal file
View File

@@ -0,0 +1,92 @@
import os
import sys
import zipfile
import shutil
# Import build variables
sys.path.insert(0, os.getcwd())
import buildVars
# Environment
env = Environment()
# Get addon info
addon_info = buildVars.addon_info
addon_name = addon_info["addon_name"]
addon_version = addon_info["addon_version"]
# Output file name
addon_file = f"{addon_name}-{addon_version}.nvda-addon"
def create_manifest(target, source, env):
"""Create manifest.ini from template"""
template_path = "manifest.ini.tpl"
manifest_path = os.path.join("addon", "manifest.ini")
with open(template_path, "r", encoding="utf-8") as f:
content = f.read()
# Replace placeholders
for key, value in addon_info.items():
placeholder = "{" + key + "}"
# Skip updateChannel if None
if key == "addon_updateChannel" and value is None:
# Remove the entire line
import re
content = re.sub(r'^updateChannel = .*\n?', '', content, flags=re.MULTILINE)
else:
content = content.replace(placeholder, str(value) if value is not None else "")
with open(manifest_path, "w", encoding="utf-8") as f:
f.write(content)
return None
def create_addon(target, source, env):
"""Create the .nvda-addon file (zip archive)"""
addon_dir = "addon"
output_file = str(target[0])
# Remove old addon file if it exists
if os.path.exists(output_file):
os.remove(output_file)
# Create zip file
with zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED) as zf:
for root, dirs, files in os.walk(addon_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, addon_dir)
zf.write(file_path, arcname)
print(f"Created {output_file}")
return None
# Build targets
manifest = env.Command(
os.path.join("addon", "manifest.ini"),
"manifest.ini.tpl",
create_manifest
)
addon = env.Command(
addon_file,
[manifest, "addon"],
create_addon
)
# Set dependencies
env.Depends(addon, manifest)
# Default target
env.Default(addon)
# Clean target - remove generated files
if env.GetOption('clean'):
manifest_path = os.path.join("addon", "manifest.ini")
if os.path.exists(manifest_path):
os.remove(manifest_path)
print(f"Removed {manifest_path}")
if os.path.exists(addon_file):
os.remove(addon_file)
print(f"Removed {addon_file}")

87
test_affinity.py Normal file
View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
Test script to verify NVDA CPU affinity and priority settings
"""
import psutil
import sys
def find_nvda_processes():
nvda_processes = []
for proc in psutil.process_iter(['pid', 'name']):
try:
name = proc.info['name'].lower()
if 'nvda' in name:
nvda_processes.append(proc)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return nvda_processes
def get_priority_name(priority_class):
priority_names = {
psutil.IDLE_PRIORITY_CLASS: "IDLE",
psutil.BELOW_NORMAL_PRIORITY_CLASS: "BELOW_NORMAL",
psutil.NORMAL_PRIORITY_CLASS: "NORMAL",
psutil.ABOVE_NORMAL_PRIORITY_CLASS: "ABOVE_NORMAL",
psutil.HIGH_PRIORITY_CLASS: "HIGH",
psutil.REALTIME_PRIORITY_CLASS: "REALTIME"
}
return priority_names.get(priority_class, f"UNKNOWN ({priority_class})")
def main():
print("NVDA CPU Affinity and Priority Verification")
print()
cpu_count = psutil.cpu_count(logical=True)
print(f"System Information:")
print(f" Total CPU cores (logical): {cpu_count}")
print(f" Available cores: 0-{cpu_count - 1}")
print()
nvda_processes = find_nvda_processes()
if not nvda_processes:
print("ERROR: No NVDA processes found!")
print("Make sure NVDA is running.")
sys.exit(1)
print(f"Found {len(nvda_processes)} NVDA process(es):")
print()
for proc in nvda_processes:
try:
pid = proc.pid
name = proc.name()
try:
affinity = proc.cpu_affinity()
except (psutil.AccessDenied, AttributeError) as e:
affinity = f"ERROR: {e}"
try:
priority = proc.nice()
priority_name = get_priority_name(priority)
except (psutil.AccessDenied, AttributeError) as e:
priority = f"ERROR: {e}"
priority_name = ""
print(f"Process: {name}")
print(f" PID: {pid}")
print(f" CPU Affinity: {affinity}")
if isinstance(affinity, list):
if len(affinity) == cpu_count:
print(f" -> Using ALL cores")
else:
print(f" -> Restricted to {len(affinity)} core(s)")
print(f" Priority: {priority_name}")
print()
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
print(f" ERROR: Could not access process: {e}")
print()
if __name__ == "__main__":
main()