profiling: Improve connection management.

There can now be multiple ProfilingSessions profiling the same PID.
For now, the ClientConfiguration of the first one wins.

Test: m
Test: flash walleye
Test: profile system_server

Change-Id: Ia8eef00753a75a449c03b87bd0abf3e75fddafdb
diff --git a/src/profiling/memory/heapprofd_integrationtest.cc b/src/profiling/memory/heapprofd_integrationtest.cc
index 44e4292..bb361ed 100644
--- a/src/profiling/memory/heapprofd_integrationtest.cc
+++ b/src/profiling/memory/heapprofd_integrationtest.cc
@@ -43,11 +43,14 @@
   void TearDown() override { DESTROY_TEST_SOCK(kSocketName); }
 };
 
-// TODO(fmayer): Fix out of tree integration test.
-#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
+// ASAN does not like sendmsg of the stack.
+// TODO(fmayer): Try to fix this more properly.
+#ifndef ADDRESS_SANITIZER
 #define MAYBE_EndToEnd EndToEnd
+#define MAYBE_MultiSession MultiSession
 #else
 #define MAYBE_EndToEnd DISABLED_EndToEnd
+#define MAYBE_MultiSession DISABLED_MultiSession
 #endif
 
 TEST_F(HeapprofdIntegrationTest, MAYBE_EndToEnd) {
@@ -63,14 +66,50 @@
         // TODO(fmayer): Test symbolization and result of unwinding.
         // This check will only work on in-tree builds as out-of-tree
         // libunwindstack is behaving a bit weirdly.
+// TODO(fmayer): Fix out of tree integration test.
+#if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
         BookkeepingRecord bookkeeping_record;
         ASSERT_TRUE(HandleUnwindingRecord(&r, &bookkeeping_record));
         bookkeeping_thread.HandleBookkeepingRecord(&bookkeeping_record);
+#endif
+        base::ignore_result(r);
+        base::ignore_result(bookkeeping_thread);
         done();
       },
       &bookkeeping_thread);
 
-  listener.ExpectPID(getpid(), {kSamplingInterval});
+  auto session = listener.ExpectPID(getpid(), {kSamplingInterval});
+  auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
+  if (!sock->is_listening()) {
+    PERFETTO_ELOG("Socket not listening.");
+    PERFETTO_CHECK(false);
+  }
+  std::thread th([kSamplingInterval] {
+    Client client(kSocketName, 1);
+    SomeFunction(&client);
+    EXPECT_EQ(client.client_config_for_testing().interval, kSamplingInterval);
+  });
+
+  task_runner.RunUntilCheckpoint("done");
+  th.join();
+}
+
+TEST_F(HeapprofdIntegrationTest, MAYBE_MultiSession) {
+  GlobalCallstackTrie callsites;
+  // TODO(fmayer): Actually test the dump.
+  BookkeepingThread bookkeeping_thread("");
+
+  base::TestTaskRunner task_runner;
+  auto done = task_runner.CreateCheckpoint("done");
+  constexpr uint64_t kSamplingInterval = 123;
+  SocketListener listener([&done](UnwindingRecord) { done(); },
+                          &bookkeeping_thread);
+
+  auto session = listener.ExpectPID(getpid(), {kSamplingInterval});
+  // Allow to get a second session, but it will still use the previous
+  // sampling rate.
+  auto session2 = listener.ExpectPID(getpid(), {kSamplingInterval + 1});
+  session = SocketListener::ProfilingSession();
   auto sock = base::UnixSocket::Listen(kSocketName, &listener, &task_runner);
   if (!sock->is_listening()) {
     PERFETTO_ELOG("Socket not listening.");
diff --git a/src/profiling/memory/socket_listener.cc b/src/profiling/memory/socket_listener.cc
index 654fe4e..26e6ecd 100644
--- a/src/profiling/memory/socket_listener.cc
+++ b/src/profiling/memory/socket_listener.cc
@@ -121,24 +121,12 @@
     pid_t pid,
     ClientConfiguration cfg) {
   PERFETTO_DLOG("Expecting connection from %d", pid);
-  bool inserted;
-  std::tie(std::ignore, inserted) = process_info_.emplace(pid, std::move(cfg));
-  if (!inserted)
-    return ProfilingSession(0, nullptr);
-  return ProfilingSession(pid, this);
-}
-
-void SocketListener::ShutdownPID(pid_t pid) {
-  PERFETTO_DLOG("Shutting down connecting from %d", pid);
-  auto it = process_info_.find(pid);
-  if (it == process_info_.end()) {
-    PERFETTO_DFATAL("Shutting down nonexistant pid.");
-    return;
-  }
-  ProcessInfo& process_info = it->second;
-  // Disconnect all sockets for process.
-  for (base::UnixSocket* socket : process_info.sockets)
-    socket->Shutdown(true);
+  decltype(process_info_)::iterator it;
+  std::tie(it, std::ignore) =
+      process_info_.emplace(pid, ProcessInfo{pid, std::move(cfg)});
+  ProcessInfo* process_info = &(it->second);
+  AddProfilingSession(process_info);
+  return ProfilingSession(process_info, this);
 }
 
 void SocketListener::RecordReceived(base::UnixSocket* self,
@@ -171,5 +159,17 @@
                       std::move(weak_metadata)});
 }
 
+void SocketListener::AddProfilingSession(ProcessInfo* process_info) {
+  ++process_info->active_profile_sessions;
+}
+
+void SocketListener::RemoveProfilingSession(ProcessInfo* process_info) {
+  if (--process_info->active_profile_sessions == 0) {
+    for (base::UnixSocket* socket : process_info->sockets)
+      socket->Shutdown(true);
+    process_info_.erase(process_info->pid);
+  }
+}
+
 }  // namespace profiling
 }  // namespace perfetto
diff --git a/src/profiling/memory/socket_listener.h b/src/profiling/memory/socket_listener.h
index 8c2c3e8..d0053f0 100644
--- a/src/profiling/memory/socket_listener.h
+++ b/src/profiling/memory/socket_listener.h
@@ -31,23 +31,36 @@
 namespace profiling {
 
 class SocketListener : public base::UnixSocket::EventListener {
+ private:
+  struct ProcessInfo {
+    ProcessInfo(pid_t p, ClientConfiguration cfg)
+        : pid(p), client_config(std::move(cfg)) {}
+
+    pid_t pid;
+    size_t active_profile_sessions = 0;
+    ClientConfiguration client_config;
+    std::set<base::UnixSocket*> sockets;
+  };
+
  public:
   friend class ProfilingSession;
   class ProfilingSession {
    public:
     friend class SocketListener;
 
+    ProfilingSession() : ProfilingSession(nullptr, nullptr) {}
+
     ProfilingSession(ProfilingSession&& other)
-        : pid_(other.pid_), listener_(other.listener_) {
+        : process_info_(other.process_info_), listener_(other.listener_) {
       other.listener_ = nullptr;
     }
 
     ~ProfilingSession() {
       if (listener_)
-        listener_->ShutdownPID(pid_);
+        listener_->RemoveProfilingSession(process_info_);
     }
     ProfilingSession& operator=(ProfilingSession&& other) {
-      pid_ = other.pid_;
+      process_info_ = other.process_info_;
       listener_ = other.listener_;
       other.listener_ = nullptr;
       return *this;
@@ -59,10 +72,10 @@
     ProfilingSession& operator=(const ProfilingSession&) = delete;
 
    private:
-    ProfilingSession(pid_t pid, SocketListener* listener)
-        : pid_(pid), listener_(listener) {}
+    ProfilingSession(ProcessInfo* process_info, SocketListener* listener)
+        : process_info_(process_info), listener_(listener) {}
 
-    pid_t pid_;
+    ProcessInfo* process_info_;
     SocketListener* listener_ = nullptr;
   };
 
@@ -79,12 +92,6 @@
   ProfilingSession ExpectPID(pid_t pid, ClientConfiguration cfg);
 
  private:
-  struct ProcessInfo {
-    ProcessInfo(ClientConfiguration cfg) : client_config(std::move(cfg)) {}
-    ClientConfiguration client_config;
-    std::set<base::UnixSocket*> sockets;
-  };
-
   struct Entry {
     Entry(std::unique_ptr<base::UnixSocket> s) : sock(std::move(s)) {}
     // Only here for ownership of the object.
@@ -102,7 +109,9 @@
   };
 
   void RecordReceived(base::UnixSocket*, size_t, std::unique_ptr<uint8_t[]>);
-  void ShutdownPID(pid_t pid);
+
+  void AddProfilingSession(ProcessInfo* process_info);
+  void RemoveProfilingSession(ProcessInfo* process_info);
 
   std::map<base::UnixSocket*, Entry> sockets_;
   std::map<pid_t, std::weak_ptr<UnwindingMetadata>> unwinding_metadata_;