[vtimeline] add tracepoint
This commit is contained in:
parent
66c8ba7c27
commit
bf07c54372
158
tracepoint.py
Normal file
158
tracepoint.py
Normal file
@ -0,0 +1,158 @@
|
||||
import os
|
||||
import queue
|
||||
import socket
|
||||
import atexit
|
||||
import ctypes
|
||||
from pathlib import Path
|
||||
import logging.handlers
|
||||
|
||||
################################
|
||||
### Tracepoint for vtimeline ###
|
||||
################################
|
||||
|
||||
|
||||
_ROTATE_FILE_COUNT = 5
|
||||
_ROTATE_FILE_MAX_SIZE = 200 * 1024 * 1024
|
||||
|
||||
|
||||
class TracePointFormatter(logging.Formatter):
|
||||
def __init__(self):
|
||||
self.pid = os.getenv("RANK", -1) # global rank
|
||||
self.tid = 0
|
||||
|
||||
def format(self, record: logging.LogRecord):
|
||||
# the chrome trace metirc include
|
||||
# name : event name
|
||||
# cat : the event categories
|
||||
# ph : the event type, B-begin E-end
|
||||
# ts : tracing clock timestamp of the event, mciro sencond
|
||||
# pid : pid, or global rank
|
||||
# tid : tid, or stream id
|
||||
|
||||
# must use like below:
|
||||
# logger.info("event_name", extra={"cat": "cat", "pid": pid, "tid": tid, ph: "B"})
|
||||
|
||||
try:
|
||||
format_str = ",".join(
|
||||
[
|
||||
str(int(record.created * 1000000)), # microsecond
|
||||
str(self.pid), # global rank
|
||||
str(self.tid), # stream_id, default is cpu
|
||||
record.cat,
|
||||
record.getMessage(),
|
||||
record.ph,
|
||||
]
|
||||
)
|
||||
except Exception as e:
|
||||
format_str = f"error logger format : {str(e)}"
|
||||
|
||||
return format_str
|
||||
|
||||
|
||||
class TracePoint:
|
||||
def __init__(self, event_name: str, cat_name: str):
|
||||
self.logger = logging.getLogger("TracePoint")
|
||||
self.name = event_name
|
||||
self.cat = cat_name
|
||||
|
||||
def begin(self):
|
||||
self.record(self.name, self.cat, "B")
|
||||
|
||||
def end(self):
|
||||
self.record(self.name, self.cat, "E")
|
||||
|
||||
def record(self, event_name: str, cat_name: str, ph: str):
|
||||
self.logger.info(
|
||||
event_name,
|
||||
extra={
|
||||
"cat": cat_name,
|
||||
"ph": ph,
|
||||
},
|
||||
)
|
||||
|
||||
def __enter__(self):
|
||||
self.begin()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.end()
|
||||
return False
|
||||
|
||||
|
||||
class CUDAVTimeLine:
|
||||
def __init__(
|
||||
self,
|
||||
lib_path="/usr/local/lib/libvtimeline.so",
|
||||
):
|
||||
self.lib = ctypes.CDLL(lib_path)
|
||||
|
||||
self.lib.enable_vtimeline.argtypes = []
|
||||
self.lib.enable_vtimeline.restype = ctypes.c_int
|
||||
|
||||
self.lib.disable_vtimeline.argtypes = []
|
||||
self.lib.disable_vtimeline.restype = ctypes.c_int
|
||||
|
||||
def enable(self):
|
||||
if self.lib.enable_vtimeline() != 0:
|
||||
print("Failed to enable CUDAVTimeline")
|
||||
|
||||
def disable(self):
|
||||
self.lib.disable_vtimeline()
|
||||
|
||||
|
||||
def __create_async_rotating_file_handler(
|
||||
log_file_path: Path, formatter: logging.Formatter
|
||||
):
|
||||
# set up handler
|
||||
# rorate to 5 file, and each file 200MB
|
||||
file_handler = logging.handlers.RotatingFileHandler(
|
||||
filename=log_file_path,
|
||||
mode="a",
|
||||
maxBytes=_ROTATE_FILE_MAX_SIZE,
|
||||
backupCount=_ROTATE_FILE_COUNT,
|
||||
encoding="utf-8",
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
|
||||
queue_buffer = queue.Queue()
|
||||
|
||||
# queue_handler -> listener -> file_hander
|
||||
queue_handler = logging.handlers.QueueHandler(queue_buffer)
|
||||
|
||||
queue_listener = logging.handlers.QueueListener(
|
||||
queue_buffer, file_handler, respect_handler_level=True
|
||||
)
|
||||
queue_listener.start()
|
||||
atexit.register(queue_listener.stop)
|
||||
|
||||
return queue_handler
|
||||
|
||||
|
||||
def __create_logger(log_root_dir: str, logger_name: str):
|
||||
log_dir = Path(log_root_dir) / logger_name / socket.gethostname()
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# set logger
|
||||
logger = logging.getLogger(logger_name)
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.addHandler(
|
||||
__create_async_rotating_file_handler(
|
||||
log_dir / f"{logger_name}_{os.getenv('LOCAL_RANK', -1)}.log",
|
||||
logging.Formatter(
|
||||
fmt="[%(levelname)s][%(process)d][%(name)s][%(asctime)s] %(message)s"
|
||||
),
|
||||
)
|
||||
)
|
||||
logger.propagate = False
|
||||
|
||||
|
||||
def tracepoint_module_setup():
|
||||
log_dir = os.getenv("VTIMELINE_LOGGER_DIR", "/var/log")
|
||||
|
||||
__create_logger(log_dir, "TracePoint")
|
||||
|
||||
|
||||
def cudavtimeline_module_setup():
|
||||
cupti = CUDAVTimeLine()
|
||||
cupti.enable()
|
||||
atexit.register(cupti.disable)
|
@ -68,8 +68,8 @@ void init_spdlog_env() {
|
||||
static std::mutex env_mutex;
|
||||
std::lock_guard<std::mutex> locker(env_mutex);
|
||||
|
||||
std::string log_dir_name = g_logger_dir + "/megatron_cupti/" + g_hostname;
|
||||
std::string log_file_name = log_dir_name + "/megatron_" + g_local_rank + ".log";
|
||||
std::string log_dir_name = g_logger_dir + "/cupti/" + g_hostname;
|
||||
std::string log_file_name = log_dir_name + "/rank_" + g_local_rank + ".log";
|
||||
|
||||
auto rotating_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
|
||||
log_file_name, max_file_size, max_files);
|
||||
@ -277,8 +277,9 @@ bool init_cupti_env() {
|
||||
}
|
||||
|
||||
void enable_cupti_activity() {
|
||||
std::array<CUpti_ActivityKind, 1> activity_kinds = {
|
||||
std::array<CUpti_ActivityKind, 2> activity_kinds = {
|
||||
CUPTI_ACTIVITY_KIND_KERNEL,
|
||||
CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL,
|
||||
};
|
||||
|
||||
for (const CUpti_ActivityKind kind : activity_kinds) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user