Lockless logging

This commit is contained in:
Dr. Chat 2017-02-04 19:26:39 -06:00
parent c538671c24
commit 65ca7fc416
1 changed files with 81 additions and 34 deletions

View File

@ -14,9 +14,12 @@
#include <atomic> #include <atomic>
#include <cinttypes> #include <cinttypes>
#include <cstdarg> #include <cstdarg>
#include <cstdlib>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include "xenia/base/atomic.h"
#include "xenia/base/debugging.h"
#include "xenia/base/filesystem.h" #include "xenia/base/filesystem.h"
#include "xenia/base/main.h" #include "xenia/base/main.h"
#include "xenia/base/math.h" #include "xenia/base/math.h"
@ -29,8 +32,8 @@
#include "xenia/base/platform_win.h" #include "xenia/base/platform_win.h"
#endif // XE_PLATFORM_WIN32 #endif // XE_PLATFORM_WIN32
DEFINE_string(log_file, "", DEFINE_string(log_file, "xenia.log", "Logs are written to the given file.");
"Logs are written to the given file instead of the default."); DEFINE_bool(log_debugprint, false, "Dump the log to DebugPrint.");
DEFINE_bool(flush_log, true, "Flush log file after each log line batch."); DEFINE_bool(flush_log, true, "Flush log file after each log line batch.");
namespace xe { namespace xe {
@ -42,15 +45,11 @@ thread_local std::vector<char> log_format_buffer_(64 * 1024);
class Logger { class Logger {
public: public:
explicit Logger(const std::wstring& app_name) explicit Logger(const std::wstring& app_name) : running_(true) {
: ring_buffer_(buffer_, kBufferSize), running_(true) {
if (!FLAGS_log_file.empty()) { if (!FLAGS_log_file.empty()) {
auto file_path = xe::to_wstring(FLAGS_log_file.c_str()); auto file_path = xe::to_wstring(FLAGS_log_file.c_str());
xe::filesystem::CreateParentFolder(file_path); xe::filesystem::CreateParentFolder(file_path);
file_ = xe::filesystem::OpenFile(file_path, "wt"); file_ = xe::filesystem::OpenFile(file_path, "wt");
} else {
auto file_path = app_name + L".log";
file_ = xe::filesystem::OpenFile(file_path, "wt");
} }
flush_event_ = xe::threading::Event::CreateAutoResetEvent(false); flush_event_ = xe::threading::Event::CreateAutoResetEvent(false);
@ -70,44 +69,84 @@ class Logger {
void AppendLine(uint32_t thread_id, const char level_char, const char* buffer, void AppendLine(uint32_t thread_id, const char level_char, const char* buffer,
size_t buffer_length) { size_t buffer_length) {
LogLine line; LogLine line;
line.buffer_length = buffer_length;
line.thread_id = thread_id; line.thread_id = thread_id;
line.level_char = level_char; line.level_char = level_char;
line.buffer_length = buffer_length;
// First, run a check and see if we can increment write
// head without any problems. If so, cmpxchg it to reserve some space in the
// ring. If someone beats us, loop around.
//
// Once we have a reservation, write our data and then increment the write
// tail.
size_t size = sizeof(LogLine) + buffer_length;
while (true) { while (true) {
mutex_.lock(); // Attempt to make a reservation.
if (ring_buffer_.write_count() < sizeof(line) + buffer_length) { size_t write_head = write_head_;
// Buffer is full. Stall. size_t read_head = read_head_;
mutex_.unlock();
RingBuffer rb(buffer_, kBufferSize);
rb.set_write_offset(write_head);
rb.set_read_offset(read_head);
if (rb.write_count() < size) {
xe::threading::MaybeYield(); xe::threading::MaybeYield();
continue; continue;
} }
ring_buffer_.Write(&line, sizeof(LogLine));
ring_buffer_.Write(buffer, buffer_length); // We have enough size to make a reservation!
mutex_.unlock(); rb.AdvanceWrite(size);
break; if (xe::atomic_cas(write_head, rb.write_offset(), &write_head_)) {
// Reservation made. Write out logline.
rb.set_write_offset(write_head);
rb.Write(&line, sizeof(LogLine));
rb.Write(buffer, buffer_length);
while (!xe::atomic_cas(write_head, rb.write_offset(), &write_tail_)) {
// Done writing. End the reservation.
xe::threading::MaybeYield();
} }
break;
} else {
// Someone beat us to the chase. Loop around.
continue;
}
}
// Kick the consumer thread
flush_event_->Set(); flush_event_->Set();
} }
private: private:
static const size_t kBufferSize = 32 * 1024 * 1024; static const size_t kBufferSize = 8 * 1024 * 1024;
struct LogLine { struct LogLine {
size_t buffer_length;
uint32_t thread_id; uint32_t thread_id;
char level_char; char level_char;
size_t buffer_length;
}; };
void Write(const char* buf, size_t size) {
if (file_) {
fwrite(buf, 1, size, file_);
}
if (FLAGS_log_debugprint) {
debugging::DebugPrint("%.*s", size, buf);
}
}
void WriteThread() { void WriteThread() {
RingBuffer rb(buffer_, kBufferSize);
while (running_) { while (running_) {
mutex_.lock();
bool did_write = false; bool did_write = false;
while (!ring_buffer_.empty()) { rb.set_write_offset(write_tail_);
while (!rb.empty()) {
did_write = true; did_write = true;
// Read line header and write out the line prefix. // Read line header and write out the line prefix.
LogLine line; LogLine line;
ring_buffer_.Read(&line, sizeof(line)); rb.Read(&line, sizeof(line));
char prefix[] = { char prefix[] = {
line.level_char, line.level_char,
'>', '>',
@ -125,14 +164,16 @@ class Logger {
}; };
std::snprintf(prefix + 3, sizeof(prefix) - 3, "%08" PRIX32 " ", std::snprintf(prefix + 3, sizeof(prefix) - 3, "%08" PRIX32 " ",
line.thread_id); line.thread_id);
fwrite(prefix, 1, sizeof(prefix) - 1, file_); Write(prefix, sizeof(prefix) - 1);
if (line.buffer_length) { if (line.buffer_length) {
// Get access to the line data - which may be split in the ring buffer // Get access to the line data - which may be split in the ring buffer
// - and write it out in parts. // - and write it out in parts.
auto line_range = ring_buffer_.BeginRead(line.buffer_length); auto line_range = rb.BeginRead(line.buffer_length);
fwrite(line_range.first, 1, line_range.first_length, file_); Write(reinterpret_cast<const char*>(line_range.first),
line_range.first_length);
if (line_range.second_length) { if (line_range.second_length) {
fwrite(line_range.second, 1, line_range.second_length, file_); Write(reinterpret_cast<const char*>(line_range.second),
line_range.second_length);
} }
// Always ensure there is a newline. // Always ensure there is a newline.
char last_char = line_range.second char last_char = line_range.second
@ -140,28 +181,33 @@ class Logger {
: line_range.first[line_range.first_length - 1]; : line_range.first[line_range.first_length - 1];
if (last_char != '\n') { if (last_char != '\n') {
const char suffix[1] = {'\n'}; const char suffix[1] = {'\n'};
fwrite(suffix, 1, sizeof(suffix), file_); Write(suffix, 1);
} }
ring_buffer_.EndRead(std::move(line_range)); rb.EndRead(std::move(line_range));
} else { } else {
const char suffix[1] = {'\n'}; const char suffix[1] = {'\n'};
fwrite(suffix, 1, sizeof(suffix), file_); Write(suffix, 1);
} }
rb.set_write_offset(write_tail_);
read_head_ = rb.read_offset();
} }
mutex_.unlock();
if (did_write) { if (did_write) {
if (FLAGS_flush_log) { if (FLAGS_flush_log) {
fflush(file_); fflush(file_);
} }
} }
xe::threading::Wait(flush_event_.get(), true); xe::threading::Wait(flush_event_.get(), true,
std::chrono::milliseconds(1));
} }
} }
FILE* file_ = nullptr; size_t write_head_ = 0;
size_t write_tail_ = 0;
size_t read_head_ = 0;
uint8_t buffer_[kBufferSize]; uint8_t buffer_[kBufferSize];
RingBuffer ring_buffer_; FILE* file_ = nullptr;
std::mutex mutex_;
std::atomic<bool> running_; std::atomic<bool> running_;
std::unique_ptr<xe::threading::Event> flush_event_; std::unique_ptr<xe::threading::Event> flush_event_;
std::unique_ptr<xe::threading::Thread> write_thread_; std::unique_ptr<xe::threading::Thread> write_thread_;
@ -169,7 +215,8 @@ class Logger {
void InitializeLogging(const std::wstring& app_name) { void InitializeLogging(const std::wstring& app_name) {
// We leak this intentionally - lots of cleanup code needs it. // We leak this intentionally - lots of cleanup code needs it.
logger_ = new Logger(app_name); void* mem = _aligned_malloc(sizeof(Logger), 0x10);
logger_ = new (mem) Logger(app_name);
} }
void LogLineFormat(const char level_char, const char* fmt, ...) { void LogLineFormat(const char level_char, const char* fmt, ...) {