From 63d14e57ca570c55ff2d032b0b76b5d3a255b4ff Mon Sep 17 00:00:00 2001 From: luc Date: Tue, 15 Oct 2024 07:46:38 -0400 Subject: [PATCH] feat: Enhance Kafka setup and improve service dependencies - Replace manual Kafka setup with services-flake's apache-kafka service - Add Zookeeper service as a dependency for Kafka - Introduce kafkaPreStartup script to create the chat_topic - Update PostgreSQL service name from "pg1" to "pg" for consistency - Refine service dependencies: - Kafka now depends on Zookeeper - Topos depends on both PostgreSQL and Kafka - Remove redundant kafkaSetupHook - Adjust Kafka settings for simplified configuration - Include JVM options for Kafka to set heap size These changes streamline the service setup process and ensure proper initialization order, improving the overall reliability and maintainability of the development environment. --- flake.nix | 90 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/flake.nix b/flake.nix index 543c318..d8c95ab 100644 --- a/flake.nix +++ b/flake.nix @@ -69,6 +69,11 @@ configFile = pkgs.copyPathToStore ./config.yaml; yq = pkgs.yq-go; + kafkaPreStartup = '' + echo "Kafka is ready. Creating topic..." + ${pkgs.apacheKafka}/bin/kafka-topics.sh --create --topic chat_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 --if-not-exists + ''; + # Note: This only loads the settings from the repos config file # if one is not already set in the user's .config directory. toposSetupHook = '' @@ -85,48 +90,7 @@ else echo "Config file already exists at $TOPOS_CONFIG_PATH" fi - ''; - - kafkaSetupHook = '' - echo "Starting Kafka in Kraft mode..." - - # Set up necessary environment variables - export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M" - export KAFKA_KRAFT_MODE=true - echo ${pkgs.apacheKafka} - - # Prepare a default config for Kraft mode - if [ ! -f ./kafka.properties ]; then - echo "Initializing Kafka Kraft mode..." - - # Server 1 Kraft - cp ${pkgs.apacheKafka}/config/kraft/server.properties ./server-1.properties - sudo sed -i '57!s/PLAINTEXT/MQ/g' server-1.properties - sudo sed -i '30s/.*/controller.quorum.voters=1@localhost:9091/' server-1.properties - sudo sed -i '78s|.*|log.dirs=/tmp/kraft-combined-logs/server-1|' server-1.properties - sudo sed -i '27s|.*|node.id=1|' server-1.properties - sudo sed -i '42s|.*|listeners=MQ://:9092,CONTROLLER://:9091|' server-1.properties - sudo sed -i '92s|.*|offsets.topic.replication.factor=1|' server-1.properties - sudo sed -i '57s|.*|listener.security.protocol.map=CONTROLLER:PLAINTEXT,MQ:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL|' server-1.properties - - fi - - # Step 1 - KAFKA_CLUSTER_ID="$(${pkgs.apacheKafka}/bin/kafka-storage.sh random-uuid)" - - # Step 2 - ${pkgs.apacheKafka}/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c ./server-1.properties - - # Step 3 - ${pkgs.apacheKafka}/bin/kafka-server-start.sh ./server-1.properties & - - # Step 4 - echo "Kafka environment is ready to use and running in detached terminals." - - # Step 5 - ${pkgs.apacheKafka}/bin/kafka-topics.sh --create --topic chat_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 - - sleep 3 + ${kafkaPreStartup} ''; in @@ -156,7 +120,7 @@ models = [ "phi3" ]; }; - postgres."pg1" = { + postgres."pg" = { enable = true; package = pkgs.postgresql_16.withPackages (p: [ p.pgvector ]); port = 5432; @@ -203,10 +167,48 @@ }; }; + zookeeper."zookeeper".enable = true; + + apache-kafka."kafka" = { + enable = true; + port = 9092; + settings = { + "offsets.topic.replication.factor" = 1; + "zookeeper.connect" = [ "localhost:2181" ]; + }; + # settings = { + # "broker.id" = 1; + # "log.dirs" = [ "/tmp/kraft-combined-logs/server-1" ]; + # "listeners" = [ "PLAINTEXT://localhost:9092" "CONTROLLER://localhost:9091" ]; + # "listener.security.protocol.map" = "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"; + # "advertised.listeners" = "PLAINTEXT://localhost:9092"; + # "controller.quorum.voters" = "1@localhost:9091"; + # "controller.listener.names" = "CONTROLLER"; + # "process.roles" = "broker,controller"; + # "node.id" = 1; + # "offsets.topic.replication.factor" = 1; + # "transaction.state.log.replication.factor" = 1; + # "transaction.state.log.min.isr" = 1; + # "auto.create.topics.enable" = true; + # "num.partitions" = 1; + # }; + # clusterId = "$(${pkgs.apacheKafka}/bin/kafka-storage.sh random-uuid)"; + formatLogDirs = true; + formatLogDirsIgnoreFormatted = true; + jvmOptions = [ + "-Xmx512M" + "-Xms512M" + ]; + }; + topos.enable = true; topos.args = [ "run" ]; }; - settings.processes.topos.depends_on.pg1.condition = "process_healthy"; + settings.processes = { + kafka.depends_on."zookeeper".condition = "process_healthy"; + topos.depends_on.pg.condition = "process_healthy"; + topos.depends_on.kafka.condition = "process_healthy"; + }; }; packages = rec {