Quick Start
In this Quick Start, we will take a look at how to download and set up Confluo, load some sample data, and query it.
Prerequisites
- MacOS X or Unix-based OS; Windows is not yet supported.
- C++ compiler that supports C++11 standard (e.g., GCC 5.3 or later)
- CMake 3.2 or later
- Boost 1.58 or later
For the Python client, you will additionally require:
- Python 2.7 or later
- Python Packages: setuptools, six 1.7.2 or later
For the Java client, you will additionally require:
- Java JDK 1.7 or later
- ant 1.6.2 or later
Download and Install
To download and install Confluo, use the following commands:
git clone https://github.com/ucbrise/confluo.git
cd confluo
mkdir build
cd build
cmake ..
make -j && make test && make install
Using Confluo
Confluo can be used in two modes: embedded and stand-alone. In embedded mode, Confluo is used as a header-only C++ library, allowing it to share the address space of the application process. In stand-alone mode, Confluo runs as a daemon server process, and clients communicate with it using the Apache Thrift protocol.
Embedded Mode
To use Confluo in embedded mode, we simply include Confluo's header files under libconfluo/confluo, use the Confluo C++ API in a C++ application, and compile with a modern C++ compiler. The entry-point header file to include is confluo_store.h.
We will first create a new Confluo Store with the data path for it to use as follows:
confluo::confluo_store store("/path/to/data");
We then create a new Atomic MultiLog within the Store (analogous to a database table); this requires three parameters: a name, a schema, and a storage mode:
std::string schema =
    "{ timestamp: LONG,"
    " op_latency_ms: DOUBLE,"
    " cpu_util: DOUBLE,"
    " mem_avail: DOUBLE,"
    " log_msg: STRING(100) }";
auto storage_mode = confluo::storage::IN_MEMORY;
store.create_atomic_multilog("perf_log", schema, storage_mode);
Our schema contains 5 attributes: a signed 8-byte integer timestamp, double-precision floating-point operation latency (in ms), CPU utilization and available memory, and a string log message field (bounded to 100 characters).
We then obtain a reference to our newly created Atomic MultiLog:
confluo::atomic_multilog* mlog = store.get_atomic_multilog("perf_log");
We can define indexes on the Atomic MultiLog as follows:
mlog->add_index("op_latency_ms");
to add an index on the op_latency_ms attribute. We can also install filters as follows:
mlog->add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1");
to capture records that indicate low system resources (CPU utilization > 80% or available memory < 10%), using a filter named low_resources.
Additionally, we can add aggregates on filters as follows:
mlog->add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)");
This adds a new stored aggregate max_latency_ms on the filter low_resources we defined before. In essence, it records the highest operation latency reported in any record that also indicated low available resources.
Finally, we can install a trigger on aggregates as follows:
mlog->install_trigger("high_latency_trigger", "max_latency_ms > 1000");
This installs a trigger high_latency_trigger on the aggregate max_latency_ms, which generates an alert whenever the condition max_latency_ms > 1000 is satisfied, i.e., whenever the maximum operation latency exceeds 1s while available resources are low.
We are now ready to load some data into this multilog:
size_t off1 = mlog->append({"100", "0.5", "0.9", "INFO: Launched 1 tasks"});
size_t off2 = mlog->append({"500", "0.9", "0.05", "WARN: Server {2} down"});
size_t off3 = mlog->append({"1001", "0.9", "0.03", "WARN: Server {2, 4, 5} down"});
Note that the append method takes a vector of strings as its input, where the vector corresponds to a single record. The number of entries in the vector must match the number of attributes in the schema, with the exception of the timestamp: if the timestamp is not provided, Confluo automatically assigns one.
Also note that the operation returns a unique offset corresponding to each append operation. This forms the "key" for records stored in the Atomic MultiLog -- records can be retrieved by specifying their corresponding offsets.
Now we take a look at how we can query the data in the Atomic MultiLog. First, it is straightforward to retrieve records given their offsets:
auto record1 = mlog->read(off1);
auto record2 = mlog->read(off2);
auto record3 = mlog->read(off3);
Each of record1, record2, and record3 is a vector of strings.
We can query indexed attributes as follows:
auto record_stream1 = mlog->execute_filter("cpu_util>0.5 || mem_avail<0.5");
for (auto s = record_stream1; !s.empty(); s = s.tail()) {
  std::cout << s.head().to_string();
}
Note that the operation returns a lazily evaluated stream, which supports functional-style operations such as map and filter. See stream.h for more details.
We can also query the defined filter as follows:
auto record_stream2 = mlog->query_filter("low_resources", 0, UINT64_MAX);
for (auto s = record_stream2; !s.empty(); s = s.tail()) {
  std::cout << s.head().to_string();
}
The first parameter is the name of the filter to be queried, while the second and third parameters are the beginning and end timestamps to consider for records in the filter; we've specified them to capture all possible timestamp values. Like the execute_filter query, this operation also returns a lazily evaluated record stream.
We query aggregates as follows:
auto value = mlog->get_aggregate("max_latency_ms", 0, UINT64_MAX);
std::cout << value.to_string();
The query takes the name of the aggregate as its first parameter, while the second and third parameters correspond to begin and end timestamps, as before. The query returns a numeric object, which is a wrapper around numeric values.
Finally, we can query the alerts generated by the triggers we have installed as follows:
auto alert_stream = mlog->get_alerts(0, UINT64_MAX, "high_latency_trigger");
for (auto s = alert_stream; !s.empty(); s = s.tail()) {
  std::cout << s.head().to_string();
}
The query takes begin and end timestamps as its first and second arguments, and an optional trigger name as its third argument. It returns a lazy stream over the alerts generated by this trigger in the specified time range.
See API docs for C++ and in-depth user guides on Data Storage and Confluo Queries for details on Confluo's supported operations.
Stand-alone Mode
In stand-alone mode, Confluo runs as a daemon server, serving client requests using the Apache Thrift protocol. To start the server, run:
confluod --address=127.0.0.1 --port=9090
Once the server daemon is running, you can query it using the C++, Python or Java client APIs. The client APIs closely resemble the embedded API.
We first create a new client connection to the Confluo daemon:
confluo::rpc::rpc_client client("127.0.0.1", 9090);
from confluo.rpc.client import RpcClient
client = RpcClient("127.0.0.1", 9090)
The first argument to the rpc_client constructor is the server hostname, while the second is the server port.
We then create a new Atomic MultiLog within the Store (analogous to a database table); as before, this requires three parameters: a name for the Atomic MultiLog, a fixed schema, and a storage mode:
std::string schema =
    "{ timestamp: ULONG,"
    " op_latency_ms: DOUBLE,"
    " cpu_util: DOUBLE,"
    " mem_avail: DOUBLE,"
    " log_msg: STRING(100) }";
auto storage_mode = confluo::storage::IN_MEMORY;
client.create_atomic_multilog("perf_log", schema, storage_mode);
from confluo.rpc.storage import StorageMode
schema = """{
timestamp: ULONG,
op_latency_ms: DOUBLE,
cpu_util: DOUBLE,
mem_avail: DOUBLE,
log_msg: STRING(100)
}"""
storage_mode = StorageMode.IN_MEMORY
client.create_atomic_multilog("perf_log", schema, storage_mode)
This operation also internally sets the client's current Atomic MultiLog to the one we just created (i.e., perf_log).
We can define indexes as follows:
client.add_index("op_latency_ms");
client.add_index("op_latency_ms")
We can also install filters as follows:
client.add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1");
client.add_filter("low_resources", "cpu_util>0.8 || mem_avail<0.1")
Additionally, we can add aggregates on filters as follows:
client.add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)");
client.add_aggregate("max_latency_ms", "low_resources", "MAX(op_latency_ms)")
Finally, we can install a trigger on an aggregate as follows:
client.install_trigger("high_latency_trigger", "max_latency_ms > 1000");
client.install_trigger("high_latency_trigger", "max_latency_ms > 1000")
To load data into the Atomic MultiLog:
size_t off1 = client.append({"100", "0.5", "0.9", "INFO: Launched 1 tasks"});
size_t off2 = client.append({"500", "0.9", "0.05", "WARN: Server {2} down"});
size_t off3 = client.append({"1001", "0.9", "0.03", "WARN: Server {2, 4, 5} down"});
off1 = client.append([100.0, 0.5, 0.9, "INFO: Launched 1 tasks"])
off2 = client.append([500.0, 0.9, 0.05, "WARN: Server {2} down"])
off3 = client.append([1001.0, 0.9, 0.03, "WARN: Server {2, 4, 5} down"])
Querying data in the Atomic MultiLog is also similar to the Embedded mode API.
It is straightforward to retrieve records given their offsets:
auto record1 = client.read(off1);
auto record2 = client.read(off2);
auto record3 = client.read(off3);
record1 = client.read(off1)
record2 = client.read(off2)
record3 = client.read(off3)
We can query indexed attributes as follows:
auto record_stream = client.execute_filter("cpu_util>0.5 || mem_avail<0.5");
for (auto s = record_stream; !s.empty(); ++s) {
  std::cout << s.get().to_string();
}
record_stream = client.execute_filter("cpu_util>0.5 || mem_avail<0.5")
for r in record_stream:
    print(r)
We can query a pre-defined filter as follows:
auto record_stream = client.query_filter("low_resources", 0, UINT64_MAX);
for (auto s = record_stream; !s.empty(); ++s) {
  std::cout << s.get().to_string();
}
import sys
record_stream = client.query_filter("low_resources", 0, sys.maxsize)
for r in record_stream:
    print(r)
We can obtain the value of a pre-defined aggregate as follows:
std::string value = client.get_aggregate("max_latency_ms", 0, UINT64_MAX);
std::cout << value;
import sys
value = client.get_aggregate("max_latency_ms", 0, sys.maxsize)
print(value)
Finally, we can obtain alerts generated by triggers installed on an Atomic MultiLog as follows:
auto alert_stream = client.get_alerts(0, UINT64_MAX, "high_latency_trigger");
for (auto s = alert_stream; !s.empty(); ++s) {
  std::cout << s.get();
}
import sys
alert_stream = client.get_alerts(0, sys.maxsize, "high_latency_trigger")
for a in alert_stream:
    print(a)