320 lines
9.9 KiB
C++
320 lines
9.9 KiB
C++
#include <algorithm>
|
|
#include <condition_variable>
|
|
#include <cstddef>
|
|
#include <cstdint>
|
|
#include <cupti.h>
|
|
#include <iostream>
|
|
#include <memory>
|
|
#include <spdlog/async.h>
|
|
#include <spdlog/spdlog.h>
|
|
#include <spdlog/sinks/rotating_file_sink.h>
|
|
#include <string>
|
|
#include <mutex>
|
|
#include <thread>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
// device pre alloc memory
|
|
#define DEVICE_BUFFER_POOL_PRE_ALLOC_SIZE 3
|
|
#define DEVICE_BUFFER_PRE_ALLOC_SIZE 3200000
|
|
|
|
// 1s == 1000ms flush
|
|
#define CUPTI_FLUSH_TIME 1000
|
|
|
|
// host pre alloc memory 1mb * 10
|
|
#define CLIENT_BUFFER_SIZE static_cast<std::size_t>(1 * 1024 * 1024)
|
|
#define CLIENT_BUFFER_NUM 10
|
|
#define CLIENT_BUFFER_LIMIT 32
|
|
|
|
namespace {
|
|
|
|
using buffer_ptr = std::unique_ptr<uint8_t[]>;
|
|
|
|
std::mutex g_buffer_pool_mutex;
|
|
std::vector<buffer_ptr> g_buffer_pool(CLIENT_BUFFER_NUM);
|
|
std::size_t g_buffer_pool_cnt = CLIENT_BUFFER_NUM;
|
|
|
|
std::thread g_consume_buffer_from_cupti_task;
|
|
std::condition_variable g_client_buffer_notify;
|
|
std::mutex g_process_buffer_mutex;
|
|
std::vector<std::pair<buffer_ptr, size_t>> g_process_buffer;
|
|
|
|
std::uint8_t g_need_to_stop = 0;
|
|
|
|
// ##################################//
|
|
// prepare the name of dir and path //
|
|
// ##################################//
|
|
const char *_logger_dir = std::getenv("VTIMELINE_LOGGER_DIR");
|
|
const char *_hostname = std::getenv("HOSTNAME");
|
|
const char *_local_rank = std::getenv("LOCAL_RANK");
|
|
const char *_rank = std::getenv("RANK");
|
|
|
|
const std::string g_logger_dir =
|
|
_logger_dir == nullptr ? "/var/log" : std::string(_logger_dir);
|
|
const std::string g_hostname =
|
|
_hostname == nullptr ? "unkown-hostname" : std::string(_hostname);
|
|
const std::string g_local_rank = _local_rank == nullptr ? "-1" : std::string(_local_rank);
|
|
const std::string g_rank = _rank == nullptr ? "-1" : std::string(_rank);
|
|
|
|
void init_spdlog_env() {
|
|
const size_t max_file_size = static_cast<size_t>(200 * 1024 * 1024); // 200M
|
|
const size_t max_files = 5;
|
|
|
|
const size_t q_size = 102400;
|
|
const size_t thread_count = 3;
|
|
|
|
const uint64_t flush_time = 3;
|
|
|
|
static std::mutex env_mutex;
|
|
std::lock_guard<std::mutex> locker(env_mutex);
|
|
|
|
std::string log_dir_name = g_logger_dir + "/cupti/" + g_hostname;
|
|
std::string log_file_name = log_dir_name + "/rank_" + g_local_rank + ".log";
|
|
|
|
auto rotating_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
|
|
log_file_name, max_file_size, max_files);
|
|
|
|
std::vector<spdlog::sink_ptr> sinks{rotating_sink};
|
|
|
|
spdlog::init_thread_pool(q_size, thread_count);
|
|
|
|
auto logger = std::make_shared<spdlog::async_logger>(
|
|
"cupti", sinks.begin(), sinks.end(), spdlog::thread_pool(),
|
|
spdlog::async_overflow_policy::discard_new);
|
|
|
|
spdlog::set_default_logger(logger);
|
|
spdlog::set_pattern("%v");
|
|
spdlog::flush_every(std::chrono::seconds(flush_time));
|
|
}
|
|
|
|
void CUPTIAPI requestBuffer(uint8_t **buffer, size_t *size, size_t *maxNumRecords) {
|
|
*maxNumRecords = 0;
|
|
|
|
{
|
|
std::unique_lock<std::mutex> locker(g_buffer_pool_mutex);
|
|
if (!g_buffer_pool.empty()) {
|
|
*buffer = g_buffer_pool.back().release();
|
|
*size = CLIENT_BUFFER_SIZE;
|
|
g_buffer_pool.pop_back();
|
|
return;
|
|
}
|
|
}
|
|
|
|
if (g_buffer_pool_cnt >= CLIENT_BUFFER_LIMIT) {
|
|
std::cerr << "Buffer pool limit reached, cannot allocate new buffer!\n";
|
|
*buffer = nullptr;
|
|
*size = 0;
|
|
return;
|
|
}
|
|
|
|
// extend buffer
|
|
std::cerr << "Allocating new buffer, pool count: " << g_buffer_pool_cnt << "\n";
|
|
auto buffer_addr = std::make_unique<uint8_t[]>(CLIENT_BUFFER_SIZE);
|
|
|
|
*buffer = buffer_addr.release();
|
|
*size = CLIENT_BUFFER_SIZE;
|
|
g_buffer_pool_cnt += 1;
|
|
}
|
|
|
|
// NOTE: the context and stream_id is NULL, be deprecated as of CUDA 6.0
|
|
void CUPTIAPI processBuffer(CUcontext context [[maybe_unused]],
|
|
uint32_t stream_id [[maybe_unused]], uint8_t *buffer,
|
|
size_t size, size_t valid_size) {
|
|
if (size == 0 || buffer == nullptr) {
|
|
return;
|
|
}
|
|
|
|
{
|
|
std::unique_lock<std::mutex> locker(g_process_buffer_mutex);
|
|
auto item = std::make_pair(buffer_ptr(buffer), valid_size);
|
|
g_process_buffer.emplace_back(std::move(item));
|
|
}
|
|
|
|
g_client_buffer_notify.notify_one();
|
|
}
|
|
|
|
void consume_record_from_buffer(CUpti_Activity *record) {
|
|
switch (record->kind) {
|
|
case CUPTI_ACTIVITY_KIND_KERNEL: {
|
|
CUpti_ActivityKernel9 *kernel = reinterpret_cast<CUpti_ActivityKernel9 *>(record);
|
|
// kernel be put in the queue to ready to submit
|
|
uint64_t start_ts = kernel->start;
|
|
uint64_t end_ts = kernel->end;
|
|
|
|
uint32_t stream_id = kernel->streamId;
|
|
const std::string name(kernel->name);
|
|
|
|
// one rank for one device
|
|
spdlog::info("{},{},{},{},{},{}", start_ts, g_rank, stream_id, "KERNEL", name,
|
|
"B");
|
|
spdlog::info("{},{},{},{},{},{}", end_ts, g_rank, stream_id, "KERNEL", name, "E");
|
|
break;
|
|
}
|
|
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
void consume_buffer_from_cupti() {
|
|
while (true) {
|
|
buffer_ptr buffer_addr;
|
|
size_t valid_size = 0;
|
|
|
|
{
|
|
std::unique_lock<std::mutex> locker(g_process_buffer_mutex);
|
|
if (g_process_buffer.empty()) {
|
|
return;
|
|
}
|
|
auto buffer_item = std::move(g_process_buffer.back());
|
|
g_process_buffer.pop_back();
|
|
|
|
buffer_addr = std::move(buffer_item.first);
|
|
valid_size = buffer_item.second;
|
|
}
|
|
|
|
if (valid_size > 0) {
|
|
CUpti_Activity *record = nullptr;
|
|
auto *buffer = buffer_addr.get();
|
|
do {
|
|
CUptiResult status =
|
|
cuptiActivityGetNextRecord(buffer, valid_size, &record);
|
|
|
|
if (status == CUPTI_SUCCESS) {
|
|
consume_record_from_buffer(record);
|
|
} else {
|
|
break;
|
|
}
|
|
} while (true);
|
|
}
|
|
|
|
{
|
|
std::unique_lock<std::mutex> locker(g_buffer_pool_mutex);
|
|
g_buffer_pool.emplace_back(std::move(buffer_addr));
|
|
}
|
|
}
|
|
}
|
|
|
|
void init_buffer_process_task() {
|
|
// init the buffer pool
|
|
std::generate_n(g_buffer_pool.begin(), CLIENT_BUFFER_NUM, []() {
|
|
return std::make_unique<uint8_t[]>(CLIENT_BUFFER_SIZE);
|
|
});
|
|
|
|
g_consume_buffer_from_cupti_task = std::move(std::thread([]() {
|
|
g_need_to_stop = 0;
|
|
|
|
while (true) {
|
|
{
|
|
std::unique_lock<std::mutex> locker(g_process_buffer_mutex);
|
|
g_client_buffer_notify.wait(locker, []() {
|
|
return !g_process_buffer.empty() || g_need_to_stop;
|
|
});
|
|
}
|
|
|
|
// process record
|
|
consume_buffer_from_cupti();
|
|
|
|
if (g_need_to_stop) {
|
|
return;
|
|
}
|
|
}
|
|
}));
|
|
}
|
|
|
|
bool init_cupti_env() {
|
|
CUpti_SubscriberHandle handler = nullptr;
|
|
CUptiResult result = cuptiSubscribe(&handler, nullptr, nullptr);
|
|
if (result != CUPTI_SUCCESS) {
|
|
std::cerr << "Failed to subscribe CUPTI, error code " << static_cast<int>(result)
|
|
<< "\n";
|
|
return false;
|
|
}
|
|
|
|
size_t attr_value_size = 0;
|
|
|
|
// set device buffer pool limit == pre allocate value
|
|
// set device buffer size, so peak cuda memory size is buffer size * buffer limit
|
|
// set flush time
|
|
// set callbacks to enable the function
|
|
size_t buffer_pool_limit_size = DEVICE_BUFFER_POOL_PRE_ALLOC_SIZE;
|
|
attr_value_size = sizeof(buffer_pool_limit_size);
|
|
result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_POOL_LIMIT,
|
|
&attr_value_size, (void *)&buffer_pool_limit_size);
|
|
if (result != CUPTI_SUCCESS) {
|
|
std::cerr << "Failed to set device buffer pool limit, error code: "
|
|
<< static_cast<int>(result) << "\n";
|
|
return false;
|
|
}
|
|
|
|
result =
|
|
cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_PRE_ALLOCATE_VALUE,
|
|
&attr_value_size, (void *)&buffer_pool_limit_size);
|
|
if (result != CUPTI_SUCCESS) {
|
|
std::cerr << "Failed to set device buffer pre-allocate value, error code: "
|
|
<< static_cast<int>(result) << "\n";
|
|
return false;
|
|
}
|
|
|
|
size_t buffer_size = DEVICE_BUFFER_PRE_ALLOC_SIZE;
|
|
attr_value_size = sizeof(buffer_size);
|
|
result = cuptiActivitySetAttribute(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE,
|
|
&attr_value_size, (void *)&buffer_size);
|
|
if (result != CUPTI_SUCCESS) {
|
|
std::cerr << "Failed to set device buffer size, error code: "
|
|
<< static_cast<int>(result) << "\n";
|
|
return false;
|
|
}
|
|
|
|
result = cuptiActivityRegisterCallbacks(requestBuffer, processBuffer);
|
|
if (result != CUPTI_SUCCESS) {
|
|
std::cerr << "Failed to register activity callbacks, error code: "
|
|
<< static_cast<int>(result) << "\n";
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void enable_cupti_activity() {
|
|
std::array<CUpti_ActivityKind, 2> activity_kinds = {
|
|
CUPTI_ACTIVITY_KIND_KERNEL,
|
|
CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL,
|
|
};
|
|
|
|
for (const CUpti_ActivityKind kind : activity_kinds) {
|
|
CUptiResult result = cuptiActivityEnable(kind);
|
|
if (result != CUPTI_SUCCESS) {
|
|
std::cerr << "Failed to enable activity: " << static_cast<int>(kind)
|
|
<< ", error code: " << static_cast<int>(result) << "\n";
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace
|
|
|
|
extern "C" int enable_vtimeline(void) {
|
|
::init_spdlog_env();
|
|
|
|
::init_buffer_process_task();
|
|
|
|
if (!::init_cupti_env()) {
|
|
return 1;
|
|
}
|
|
|
|
::enable_cupti_activity();
|
|
|
|
return 0;
|
|
}
|
|
|
|
extern "C" int disable_vtimeline(void) {
|
|
cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED);
|
|
|
|
g_need_to_stop = 1;
|
|
g_client_buffer_notify.notify_one();
|
|
|
|
// wait the backgroud task
|
|
g_consume_buffer_from_cupti_task.join();
|
|
|
|
return 0;
|
|
} |