Contents
1. Kafka Installation
1.1 Install JDK 1.8
1.2 Install ZooKeeper 3.7
1.3 Install Kafka 2.13
2. Command-Line Testing
3. Client Program Development
3.1 Building OpenSSL
3.2 Building librdkafka
3.3 Producer
3.4 Consumer
1. Kafka Installation
A local Kafka environment was set up from the following components:
No. | Name | Notes | Download link
--- | --- | --- | ---
1 | JDK 1.8 | Java development environment | https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
2 | ZooKeeper 3.7 | Distributed application coordination service | http://mirror.bit.edu.cn/apache/zookeeper/
3 | Kafka 2.13 | Kafka development environment | http://kafka.apache.org/downloads.html
1.1 Install JDK 1.8
Step 1: Double-click the installer and follow it through to completion.
Step 2: Add the following environment variables (right-click "This PC" -> "Advanced system settings" -> "Environment Variables"):
JAVA_HOME: C:\Program Files\Java\jdk1.8.0_171 (the JDK installation path)
Path: append ";%JAVA_HOME%\bin" to the existing value
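Alternatively, JAVA_HOME can be set from an elevated command prompt with setx (a sketch using the example path above; /M writes the machine-level environment, and a new cmd window is needed to see the change):
setx /M JAVA_HOME "C:\Program Files\Java\jdk1.8.0_171"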
Step 3: Open cmd and run "java -version" to check the Java version on the system:
1.2 Install ZooKeeper 3.7
Step 1: Extract the archive apache-zookeeper-3.7.0-bin.tar.gz.
Step 2: Open apache-zookeeper-3.7.0-bin\conf, rename zoo_sample.cfg to zoo.cfg, open zoo.cfg in a text editor, and point dataDir at a local data directory, e.g. "./data"
Step 3: Add the following system variables:
ZOOKEEPER_HOME: D:\kafkaPrestduy\apache-zookeeper-3.7.0-bin (the ZooKeeper directory)
Path: append ";%ZOOKEEPER_HOME%\bin;" to the existing value
1.3 Install Kafka 2.13
Step 1: Extract kafka_2.13-3.1.0.tgz.
Step 2: Open kafka_2.13-3.1.0\config and open server.properties in a text editor. Change log.dirs to "./logs"
2. Command-Line Testing
Step 1: Start ZooKeeper. Enter the directory D:\kafkaPrestduy\apache-zookeeper-3.7.0-bin\bin and run:
zkServer.cmd
Step 2: Start Kafka. Enter the directory D:\kafkaPrestduy\kafka_2.13-3.1.0\bin\windows.
Start the broker:
.\kafka-server-start.bat ..\..\config\server.properties
Create a topic:
.\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Start a console producer:
.\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
Start a console consumer:
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
List topics:
.\kafka-topics.bat --list --bootstrap-server localhost:9092
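With the broker up, the new topic's partition and replica placement can also be checked with the --describe option (a usage sketch with the topic created above):
.\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic test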
3. Client Program Development
librdkafka depends on the OpenSSL, zlib, and zstd libraries, so build the dependencies first and then build librdkafka.
Versions used for each library:
Library | Download
--- | ---
openssl-1.0.2a | /index.html
librdkafka-1.4.x | https://github.com/edenhill/librdkafka
zlib | via NuGet (right-click the solution -> Manage NuGet Packages)
zstd | via NuGet (right-click the solution -> Manage NuGet Packages)
Note: the zlib and zstd NuGet packages land in D:\kafkaPrestduy\librdkafka-1.4.x\librdkafka-1.4.x\win32\packages
3.1 Building OpenSSL
OpenSSL is built with VS2019. First, find and download openssl-1.0.2a from the download page (/index.html).
Step 1: Download and install the build tools Perl and NASM (double-click the installers):
Download & Install Perl - ActiveState
https://www.nasm.us/pub/nasm/releasebuilds/2.14rc15/win64/nasm-2.14rc15-installer-x64.exe
Step 2: Choose a build configuration. This walkthrough uses the x64 release build: perl Configure VC-WIN64A
Build type | Platform | Configure command
--- | --- | ---
debug | x86 | perl Configure debug-VC-WIN32 --prefix=xxx
debug | x64 | perl Configure debug-VC-WIN64A --prefix=xxx
release | x86 | perl Configure VC-WIN32 --prefix=xxx
release | x64 | perl Configure VC-WIN64A --prefix=xxx
Note: if NASM is not installed, add no-asm to the Configure command.
The --prefix parameter sets the directory where the built libraries are installed.
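Putting the pieces together, a full x64 release configuration without NASM might look like this (no-asm and --prefix are standard Configure options; the install path is just a placeholder):
perl Configure VC-WIN64A no-asm --prefix=D:\openssl-x64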
Step 3: Create the makefile.
Create the 32-bit makefile: ms\do_ms.bat
Create the 64-bit makefile: ms\do_win64a.bat
Step 4: Build the dynamic and static libraries.
Build the dynamic library:
nmake -f ms\ntdll.mak
nmake -f ms\ntdll.mak test
nmake -f ms\ntdll.mak install
Build the static library:
nmake -f ms\nt.mak
nmake -f ms\nt.mak test
nmake -f ms\nt.mak install
Note: to rebuild, clean first:
nmake -f ms\nt.mak clean
nmake -f ms\ntdll.mak clean
Step 5: Collect the library files from the directory specified by --prefix.
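As a quick sanity check that the build output links, a minimal C program can print the library version (a sketch; it assumes the --prefix include and lib directories have been added to the compiler's search paths):
#include <stdio.h>
#include <openssl/crypto.h> /* SSLeay_version() is the version API in OpenSSL 1.0.x */
int main(void)
{
printf("%s\n", SSLeay_version(SSLEAY_VERSION)); /* prints e.g. "OpenSSL 1.0.2a ..." */
return 0;
}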
3.2 Building librdkafka
Step 1: Download librdkafka from https://github.com/edenhill/librdkafka and extract it.
Step 2: Open \librdkafka-1.4.x\win32\librdkafka.sln with VS2019.
Step 3: Build the librdkafka project. Building it right away hits a few problems:
1. Missing NuGet packages. The build reports:
Error: This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105.
Fix: open the project file (librdkafka.vcxproj) and remove the following block:
<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
<PropertyGroup>
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
</Target>
2. zlib and zstd fail to link.
The \librdkafka-1.4.x\win32\packages directory already contains prebuilt zlib and libzstd binaries, so it is enough to add their include directories and additional library directories to the project and link the .lib files:
zlib headers:
\librdkafka-1.4.x\win32\packages\zlib.v140.windesktop.msvcstl.dyn.rt-dyn.1.2.8.8\build\native\include
zlib libraries:
\librdkafka-1.4.x\win32\packages\zlib.v140.windesktop.msvcstl.dyn.rt-dyn.1.2.8.8\lib\native\v140\windesktop\msvcstl\dyn\rt-dyn\x64\Release
libzstd headers:
\librdkafka-1.4.x\win32\packages\confluent.libzstd.redist.1.3.8-g9f9630f4-test1\build\native\include
libzstd libraries:
\librdkafka-1.4.x\win32\packages\confluent.libzstd.redist.1.3.8-g9f9630f4-test1\build\native\lib\win\x64
3. OpenSSL missing.
Point the project at the OpenSSL build output directory from section 3.1: add its include directory and additional library directory, and link the .lib files.
Step 4: Build librdkafka. The output files are generated in \librdkafka-1.4.x\win32\outdir\v142\x64\Release
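To confirm the generated library and header are usable from a new VS project, a minimal check (a sketch; it assumes the include path points at librdkafka's src directory and the project links the generated rdkafka.lib):
#include <stdio.h>
#include "rdkafka.h"
int main(void)
{
printf("librdkafka version: %s\n", rd_kafka_version_str()); /* human-readable library version */
return 0;
}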
3.3 Producer
A trimmed-down version of the official example:
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include "rdkafka.h"
static volatile sig_atomic_t run = 1;
static void stop(int sig) {
run = 0;
fclose(stdin);
}
/* Delivery report callback: invoked once per produced message to report success or failure. */
static void dr_msg_cb(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque) {
if (rkmessage->err)
{
fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
}
else
{
fprintf(stderr, "%% Message delivered (%d bytes, partition %d)\n", (int)rkmessage->len, rkmessage->partition);
}
}
int main(int argc, char** argv)
{
rd_kafka_t* rk;
rd_kafka_conf_t* conf;
char errstr[512];
char buf[512];
const char* brokers;
const char* topic;
brokers = "localhost:9092";
topic = "test_2";
conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
return 1;
}
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk)
{
fprintf(stderr,"Failed to create new producer: %s\n", errstr);
return 1;
}
signal(SIGINT, stop);
fprintf(stderr,"Type some text and hit enter to produce message\n");
while (run && fgets(buf, sizeof(buf), stdin))
{
size_t len = strlen(buf);
rd_kafka_resp_err_t err;
if (len > 0 && buf[len - 1] == '\n')
{
buf[--len] = '\0';
}
if (len == 0) {
rd_kafka_poll(rk, 0);
continue;
}
retry:
err = rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_VALUE(buf, len),
RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);
if(err)
{
fprintf(stderr,"Failed to produce to topic %s: %s\n",topic, rd_kafka_err2str(err));
if(err == RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
rd_kafka_poll(rk, 1000);
goto retry;
}
}
rd_kafka_poll(rk, 0);
}
rd_kafka_flush(rk, 10 * 1000); /* wait up to 10 seconds for outstanding delivery reports */
if (rd_kafka_outq_len(rk) > 0)
{
fprintf(stderr, "%d message(s) were not delivered\n", rd_kafka_outq_len(rk));
}
rd_kafka_destroy(rk);
return 0;
}
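The example above produces value-only messages. rd_kafka_producev() can also attach a key, which Kafka hashes to pick the partition; a sketch of the produce call with a hypothetical key string (all other variables as in the loop above):
const char* key = "user-42"; /* hypothetical key: records with the same key land in the same partition */
err = rd_kafka_producev(rk,
RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_KEY(key, strlen(key)),
RD_KAFKA_V_VALUE(buf, len),
RD_KAFKA_V_OPAQUE(NULL),
RD_KAFKA_V_END);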
3.4 Consumer
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
#include "rdkafka.h"
static volatile sig_atomic_t run = 1;
static void stop(int sig) {
run = 0;
}
static int is_printable(const char* buf, size_t size) {
size_t i;
for (i = 0; i < size; i++)
if (!isprint((unsigned char)buf[i]))
return 0;
return 1;
}
int main(int argc, char** argv) {
rd_kafka_t* rk=NULL;
rd_kafka_conf_t* conf = NULL;
rd_kafka_resp_err_t err;
char errstr[512];
const char* brokers = NULL;
const char* groupid = NULL;
char** topics = NULL;
int topic_cnt;
rd_kafka_topic_partition_list_t* subscription = NULL;
int i;
brokers = "localhost:9092";
groupid = "101";
char *name = (char*)malloc(10 * sizeof(char)); /* sizeof(char), not sizeof(char*) */
strcpy(name, "test_2");
topics = &name;
topic_cnt = 1;
conf = rd_kafka_conf_new();
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "group.id", groupid,errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (!rk) {
fprintf(stderr, "Failed to create new consumer: % s\n", errstr);
return 1;
}
conf = NULL; /* the configuration object is now owned and freed by the rd_kafka_t instance */
rd_kafka_poll_set_consumer(rk); /* redirect the main event queue to the consumer queue */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
rd_kafka_topic_partition_list_add(subscription,topics[i],RD_KAFKA_PARTITION_UA);
err = rd_kafka_subscribe(rk, subscription);
if (err) {
fprintf(stderr,"Failed to subscribe to %d topics: %s\n",subscription->cnt, rd_kafka_err2str(err));rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(rk);
return 1;
}
fprintf(stderr,"Subscribed to %d topic(s), " "waiting for rebalance and messages...\n",subscription->cnt);
rd_kafka_topic_partition_list_destroy(subscription);
signal(SIGINT, stop);
while (run) {
rd_kafka_message_t* rkm;
rkm = rd_kafka_consumer_poll(rk, 100); /* wait up to 100 ms for a message or event */
if (!rkm)
continue;
if (rkm->err) {
fprintf(stderr,"%% Consumer error: %s\n",rd_kafka_message_errstr(rkm));rd_kafka_message_destroy(rkm);
continue;
}
printf("Message on %s %d at offset %d:\n",rd_kafka_topic_name(rkm->rkt), rkm->partition,rkm->offset);
if (rkm->key && is_printable((const char*)rkm->key, rkm->key_len))
{
printf(" Key: %.*s\n", (int)rkm->key_len, (const char*)rkm->key);
}
else if (rkm->key)
{
printf(" Key: (%d bytes)\n", (int)rkm->key_len);
}
if (rkm->payload && is_printable((const char*)rkm->payload, rkm->len))
{
printf(" Value: %.*s\n", (int)rkm->len, (const char*)rkm->payload);
}
else if (rkm->payload)
{
printf(" Value: (%d bytes)\n", (int)rkm->len);
}
rd_kafka_message_destroy(rkm);
}
fprintf(stderr, "%% Closing consumer\n");
rd_kafka_consumer_close(rk);
/* Destroy the consumer */
rd_kafka_destroy(rk);
return 0;
}
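By default librdkafka auto-commits offsets in the background. For at-least-once processing it is common to turn that off and commit only after a message has been handled; a sketch using standard librdkafka APIs, slotted into the code above at the commented points:
/* During configuration, before rd_kafka_new(): */
rd_kafka_conf_set(conf, "enable.auto.commit", "false", errstr, sizeof(errstr));
/* In the poll loop, after processing and before rd_kafka_message_destroy(rkm): */
rd_kafka_commit_message(rk, rkm, 0); /* 0 = synchronous commit */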
Run result: