Online Queries

Online queries on Confluo are executed automatically as new records are written into an Atomic MultiLog. Therefore, these queries need to be pre-defined; the guide on Data Storage describes how we can define filters, aggregates and triggers for online evaluation. In this guide, we will look at how we can retrieve the results of these online queries.

Embedded Mode

We begin with the embedded mode of operation. We work with the assumption that the Atomic MultiLog has already been created, and filters, aggregates and triggers have been added to it as outlined in Data Storage.

To start with, we have a reference to the Atomic MultiLog as follows:

confluo::atomic_multilog* mlog = store.get_atomic_multilog("perf_log");

Querying Pre-defined Filters

We can query a pre-defined filter as follows:

auto record_stream = mlog->query_filter("low_resources", 0, UINT64_MAX);
for (auto s = record_stream; !s.empty(); s = s.tail()) {
  std::cout << s.head().to_string();
}

The first parameter is the name of the filter to be queried, while the second and third parameters are the beginning and end timestamps that bound the records to consider in the filter. Here, 0 and UINT64_MAX cover all possible timestamp values. This operation returns a lazily evaluated record stream which supports functional semantics such as filter, map, etc. See Confluo's Stream API for more details on how to work with lazy streams.
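To make the lazy-evaluation semantics concrete, the following is a minimal Python sketch that does not use Confluo at all. `LazyStream` and `in_time_range` are hypothetical names that only mimic the `head()`/`tail()`/`empty()` interface used in the loop above, and treating both timestamp bounds as inclusive is an assumption rather than something the guide states.

```python
class LazyStream:
    """Hypothetical stand-in mimicking head()/tail()/empty(); not Confluo's class."""

    def __init__(self, iterator):
        self._iter = iter(iterator)
        self._forced = False
        self._head = None
        self._tail = None
        self._empty = False

    def _force(self):
        # Pull at most one element from the source, and only on first access.
        if not self._forced:
            try:
                self._head = next(self._iter)
                self._tail = LazyStream(self._iter)
            except StopIteration:
                self._empty = True
            self._forced = True

    def empty(self):
        self._force()
        return self._empty

    def head(self):
        self._force()
        return self._head

    def tail(self):
        self._force()
        return self._tail

    def _items(self):
        s = self
        while not s.empty():
            yield s.head()
            s = s.tail()

    def filter(self, pred):
        return LazyStream(x for x in self._items() if pred(x))

    def map(self, fn):
        return LazyStream(fn(x) for x in self._items())


examined = []  # timestamps the source has actually looked at


def in_time_range(records, begin, end):
    for ts, value in records:
        examined.append(ts)
        if begin <= ts <= end:  # assumption: both bounds are inclusive
            yield (ts, value)


records = [(1, "a"), (5, "b"), (9, "c"), (12, "d")]
stream = LazyStream(in_time_range(records, 4, 10))
examined_before = list(examined)   # nothing has been pulled yet
first = stream.head()              # scans only up to the first match
examined_after_head = list(examined)

# map() builds another lazy stream; work happens only while iterating.
upper = stream.map(lambda r: r[1].upper())
collected = []
s = upper
while not s.empty():
    collected.append(s.head())
    s = s.tail()
print(collected)
```

In this sketch, constructing the stream examines no records, and asking for the head scans only as far as the first matching record; the remaining records are examined while the mapped stream is consumed.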

Obtaining Pre-defined Aggregates

We can obtain a pre-defined aggregate as follows:

auto value = mlog->get_aggregate("max_latency_ms", 0, UINT64_MAX);
std::cout << value.to_string();

The operation takes the name of the aggregate as its first parameter, while the second and third parameters correspond to the begin and end timestamps, as with pre-defined filters. The query returns a numeric object, which is a wrapper around numeric values in C++.
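As an illustration of why an aggregate must be defined before records arrive, here is a small Python sketch of an online maximum that is updated on every write and queried by time range afterwards. The class `OnlineMax`, its methods, and the time-bucketing scheme are all hypothetical; this is not a description of how Confluo stores aggregates internally.

```python
class OnlineMax:
    """Hypothetical online max aggregate; not Confluo's implementation."""

    def __init__(self, bucket_width):
        self.bucket_width = bucket_width
        self.buckets = {}  # bucket index -> running max for that bucket

    def on_write(self, timestamp, value):
        # Called once per record as it is written; no records are stored.
        b = timestamp // self.bucket_width
        current = self.buckets.get(b)
        if current is None or value > current:
            self.buckets[b] = value

    def get(self, begin, end):
        # Combine every bucket that overlaps [begin, end].
        lo = begin // self.bucket_width
        hi = end // self.bucket_width
        values = [v for b, v in self.buckets.items() if lo <= b <= hi]
        return max(values) if values else None


UINT64_MAX = 2**64 - 1

agg = OnlineMax(bucket_width=10)
for ts, latency_ms in [(3, 20.0), (14, 35.5), (27, 12.0)]:
    agg.on_write(ts, latency_ms)

print(agg.get(0, UINT64_MAX))
print(agg.get(20, 29))
```

Because only the per-bucket maxima are kept, the query never rescans records. The trade-off in this sketch is that time ranges are resolved at bucket granularity.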

Obtaining Alerts from a Pre-defined Trigger

Finally, we can obtain alerts generated by triggers installed on an Atomic MultiLog as follows:

auto alert_stream = mlog->get_alerts(0, UINT64_MAX, "high_latency_trigger");
for (auto s = alert_stream; !s.empty(); s = s.tail()) {
  std::cout << s.head().to_string();
}

The query takes begin and end timestamps as its first and second arguments, and an optional trigger name as its third argument. The query returns a lazy stream over generated alerts for this trigger in the specified time-range.
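The role of the optional trigger name can be sketched in plain Python as follows. The `Alert` tuple and this stand-alone `get_alerts` function are hypothetical illustrations, not Confluo's types; the sketch assumes that omitting the trigger name returns alerts from all triggers and that both timestamp bounds are inclusive, neither of which the guide states explicitly.

```python
from collections import namedtuple

# Hypothetical alert shape, for illustration only.
Alert = namedtuple("Alert", ["timestamp", "trigger_name", "value"])


def get_alerts(alerts, begin, end, trigger_name=None):
    """Lazily yield alerts in [begin, end], optionally for one trigger."""
    for alert in alerts:
        if not (begin <= alert.timestamp <= end):
            continue
        # Assumption: no trigger name means "alerts from every trigger".
        if trigger_name is not None and alert.trigger_name != trigger_name:
            continue
        yield alert


all_alerts = [
    Alert(10, "high_latency_trigger", 1500.0),
    Alert(20, "low_memory_trigger", 0.05),
    Alert(30, "high_latency_trigger", 2100.0),
]

latency_only = list(get_alerts(all_alerts, 0, 2**64 - 1, "high_latency_trigger"))
early_any = list(get_alerts(all_alerts, 0, 20))

for alert in latency_only:
    print(alert)
```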

Stand-alone Mode

The API for the stand-alone mode of operation is quite similar to that of the embedded mode. We focus on the C++ Client API and show the Python equivalent alongside each example; the Java Client API is almost identical.

As with the embedded mode, we work with the assumption that the client is connected to the server, has already created the Atomic MultiLog, and added all relevant filters, aggregates and triggers. Also, the current Atomic MultiLog for the client has been set to perf_log as follows:

C++:

client.set_current_atomic_multilog("perf_log");

Python:

client.set_current_atomic_multilog("perf_log")

Querying Pre-defined Filters

We can query a pre-defined filter as follows:

C++:

auto record_stream = client.query_filter("low_resources", 0, UINT64_MAX);
for (auto s = record_stream; !s.empty(); ++s) {
  std::cout << s.get().to_string();
}

Python:

import sys
# sys.maxsize is available in both Python 2 and Python 3
record_stream = client.query_filter("low_resources", 0, sys.maxsize)
for r in record_stream:
  print(r)

This operation returns a lazy stream of records, which automatically fetches more data from the server as the client consumes it.
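The fetch-on-demand behaviour described above can be sketched with a Python generator that pulls one batch at a time. `FakeServer`, `fetch`, `remote_stream`, and the batch size are all hypothetical stand-ins used to count round trips; they are not part of Confluo's client API, and the actual batching policy may differ.

```python
from itertools import islice


class FakeServer:
    """Hypothetical server stub that counts how many fetches it serves."""

    def __init__(self, records):
        self._records = records
        self.requests = 0

    def fetch(self, offset, count):
        self.requests += 1
        return self._records[offset:offset + count]


def remote_stream(server, batch_size):
    """Yield records one by one, fetching a new batch only when needed."""
    offset = 0
    while True:
        batch = server.fetch(offset, batch_size)
        if not batch:
            return  # server has no more records
        for record in batch:
            yield record
        offset += len(batch)


server = FakeServer(["r%d" % i for i in range(10)])
stream = remote_stream(server, batch_size=4)
requests_before = server.requests          # no round trip yet

first = next(stream)                       # triggers the first fetch
requests_after_first = server.requests

next_four = list(islice(stream, 4))        # crosses into the second batch
requests_after_five = server.requests

rest = list(stream)                        # drains the remaining records
print(first, next_four, rest, server.requests)
```

In this sketch a consumer that stops early never pays for batches it did not read, which is the practical benefit of a lazy stream over fetching the full result up front.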

Obtaining Pre-defined Aggregates

We can obtain a pre-defined aggregate as follows:

C++:

std::string value = client.get_aggregate("max_latency_ms", 0, UINT64_MAX);
std::cout << value;

Python:

import sys
# sys.maxsize is available in both Python 2 and Python 3
value = client.get_aggregate("max_latency_ms", 0, sys.maxsize)
print(value)

The operation returns a string representation of the aggregate.

Obtaining Alerts from a Pre-defined Trigger

Finally, we can obtain alerts generated by triggers installed on an Atomic MultiLog as follows:

C++:

auto alert_stream = client.get_alerts(0, UINT64_MAX, "high_latency_trigger");
for (auto s = alert_stream; !s.empty(); ++s) {
  std::cout << s.get();
}

Python:

import sys
# sys.maxsize is available in both Python 2 and Python 3
alert_stream = client.get_alerts(0, sys.maxsize, "high_latency_trigger")
for a in alert_stream:
  print(a)

Similar to the filter query, this operation returns a lazy stream of alerts, which automatically fetches more data from the server as the client consumes it.